Skip to content
Snippets Groups Projects
Commit 1476593a authored by nfontrod's avatar nfontrod
Browse files

src/figures_utils/tf_function.py: add function to handle transcription factors

parent cab687f0
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
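"""
Description: Functions retrieving, from the ChIA-PET database, the genes
differentially expressed in the projects linked to a transcription factor.
"""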
import sqlite3
from typing import List, Tuple
from .config_figures import Config
import numpy as np
def get_projects_links_to_a_tf(cnx: sqlite3.Connection,
                               tf_name: str) -> List:
    """
    Retrieve the ID of every project associated with a given transcription
    factor.

    :param cnx: connection to the ChIA-PET database
    :param tf_name: name of the transcription factor of interest
    :return: the ``id`` values of the rows of table cin_project_tf whose
    ``tf_name`` equals the given name. E.g: [7, 30, 96, 135]

    >>> get_projects_links_to_a_tf(sqlite3.connect(Config.db_file), "NKRF")
    [1, 37]
    """
    sql = """SELECT id
             FROM cin_project_tf
             WHERE tf_name = ?"""
    rows = cnx.cursor().execute(sql, (tf_name,)).fetchall()
    # Each row holds a single column: flatten them into a plain list of IDs.
    project_ids = []
    for row in rows:
        project_ids.append(row[0])
    return project_ids
def get_de_events(cnx: sqlite3.Connection, id_project: int,
                  fc: float = 0.4) -> List:
    """
    Get every gene regulated (down or up) according to a particular project.

    A row of table cin_de_event is kept when it belongs to the project, its
    padj is at most 0.05 and its log2FoldChange is >= fc or <= -fc. A gene
    is labelled 'down' when its log2FoldChange is negative, 'up' otherwise.

    :param cnx: connexion to the ChIA-PET database
    :param id_project: a project ID of the table cin_project_tf
    :param fc: The minimum log2foldChange threshold
    :return: each sublist corresponds to a gene (gene_regulation +
    gene_id), e.g: ['down', 18673]

    >>> v = get_de_events(sqlite3.connect(Config.db_file), 29)
    >>> len(v)
    3718
    >>> len([x for x in v if x[0] == "down"])
    1712
    >>> len([x for x in v if x[0] == "up"])
    2006
    >>> v[0:3]
    [['down', 1185], ['up', 5405], ['up', 9675]]
    >>> v = get_de_events(sqlite3.connect(Config.db_file), 1)
    >>> len(v)
    4889
    >>> len([x for x in v if x[0] == "down"])
    2380
    >>> len([x for x in v if x[0] == "up"])
    2509
    >>> v[0:3]
    [['up', 6774], ['up', 13033], ['up', 1663]]
    """
    # The threshold is bound as a query parameter instead of being formatted
    # into the SQL text: a negative value would otherwise be rendered as
    # '--<value>', which SQLite reads as the start of a comment, and a
    # non-numeric value would be spliced into the statement.
    threshold = float(fc)
    query = """SELECT log2FoldChange, gene_id
               FROM cin_de_event
               WHERE id_project = ?
               AND (log2FoldChange >= ? OR log2FoldChange <= ?)
               AND padj <= 0.05"""
    cursor = cnx.cursor()
    cursor.execute(query, (id_project, threshold, -threshold))
    return [["down" if log2fc < 0 else "up", gene_id]
            for log2fc, gene_id in cursor.fetchall()]
def washing_events(gene_list: List) -> List:
    """
    Remove redundant genes and genes showing different regulations.

    A gene reported several times with the same regulation is kept once; a
    gene reported with two different regulations is discarded, and stays
    discarded even if it shows up again later in the list. Genes are
    returned in the order of their first occurrence, with their gene_id
    left as given.

    :param gene_list: each sublist corresponds to a gene (gene_regulation +
    gene_id), e.g: ['down', 18673]
    :return new_gene_list: each sublist corresponds to a gene (gene_regulation
    + gene_id), e.g: ['down', 18962].
    Every gene regulated by a transcription factor in different projects
    without redundancy.

    >>> washing_events([["down", 1], ["down", 2], ["down", 2], ["down", 3],
    ... ["down", 3], ["up", 8], ["up", 8], ["up", 2], ["up", 1]])
    [['down', 3], ['up', 8]]
    """
    # Every gene id met so far, including those discarded for conflicting
    # regulations. A set keeps each membership test constant-time.
    seen_ids = set()
    # gene_id -> regulation for the genes still considered consistent.
    kept = {}
    for gene in gene_list:
        regulation, gene_id = gene[0], gene[1]
        if gene_id not in seen_ids:
            seen_ids.add(gene_id)
            kept[gene_id] = regulation
        elif gene_id in kept and kept[gene_id] != regulation:
            # Same gene, other regulation: drop it. It remains in seen_ids
            # so that a later occurrence cannot bring it back.
            del kept[gene_id]
    return [[regulation, gene_id] for gene_id, regulation in kept.items()]
def get_every_events_4_a_tf(cnx: sqlite3.Connection, tf_name: str,
                            regulation: str) -> Tuple:
    """
    Collect the differential expression events of a transcription factor.

    The events of every project linked to the transcription factor are
    gathered, cleaned with washing_events, then filtered on the wanted
    regulation. Any regulation other than 'up' or 'down' keeps every
    remaining gene.

    :param cnx: connexion to the ChIA-PET database
    :param tf_name: the transcription factor name
    :param regulation: up, down or both.
    :return: A tuple of two items:
        * A dictionary with a single key, '<tf_name>_<regulation>', whose
        value is the list of the selected gene ids
        * A str: that same key followed by '_' and the number of selected
        genes

    >>> a, b = get_every_events_4_a_tf(sqlite3.connect(Config.db_file),
    ... "DDX59", "down")
    >>> len(a["DDX59_down"])
    1712
    >>> a["DDX59_down"][0:5]
    [1185, 4246, 12598, 17765, 6342]
    >>> b
    'DDX59_down_1712'
    >>> a, b = get_every_events_4_a_tf(sqlite3.connect(Config.db_file),
    ... "NKRF", "down")
    >>> len(a["NKRF_down"])
    2756
    >>> b
    'NKRF_down_2756'
    >>> a, b = get_every_events_4_a_tf(sqlite3.connect(Config.db_file),
    ... "NKRF", "both")
    >>> len(a["NKRF_both"])
    5516
    """
    # Pool the events of all the projects studying this factor.
    all_events = []
    for project in get_projects_links_to_a_tf(cnx, tf_name):
        all_events.extend(get_de_events(cnx, project))
    unique_events = washing_events(all_events)
    # Only 'up' and 'down' restrict the selection.
    one_direction = regulation in ("up", "down")
    selected = [event[1] for event in unique_events
                if not one_direction or event[0] == regulation]
    label = "_".join((tf_name, regulation))
    summary = "_".join((label, str(len(selected))))
    return {label: selected}, summary
if __name__ == "__main__":
    # Run as a script: execute the examples embedded in the docstrings.
    from doctest import testmod
    testmod()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment