Skip to content
Snippets Groups Projects
Commit 103fe022 authored by nfontrod's avatar nfontrod
Browse files

src/db_utils/fill_TF_tables.py: add a file to create two new tables, cin_de_event and cin_project_tf

parent 9eba6f19
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
Description: The goal of this script is to build the tables used to \
store transcription factor (TF) data. One table contains \
metadata about every TF project and the other contains every gene \
differentially expressed when the TF is removed by shRNA
"""
from .config import Config
import pandas as pd
from typing import Dict, Tuple, List
from pathlib import Path
from ..logging_conf import logging_def
import logging
from .populate_database import populate_df
def create_cin_project_tf() -> Tuple[pd.DataFrame, Dict]:
    """
    Create the cin_project_TF table and return it with a dictionary
    containing the link between EN_id and cin_id.

    The metadata are read from the comma-separated file
    ``Config.tf_metadata``. The columns ``EN_ID``, ``Accession``,
    ``Target_of_assay`` and ``Biosample_term_name`` of this file are used.

    :return: The cin_project_TF table and a dictionary that maps each
        ``EN_ID`` of the metadata file to the ``id`` given to the same row
        in the cin_project_TF table (ids start at 1 and follow the row
        order of the metadata file)

    >>> t, d = create_cin_project_tf()
    >>> t.head()  # doctest: +NORMALIZE_WHITESPACE
       id             project_name  source_db  db_id_project  tf_name  cl_name
    0   1    NKRF_ENCSR231PWH_K562     Encode    ENCSR231PWH     NKRF     K562
    1   2   FUBP1_ENCSR608IXR_K562     Encode    ENCSR608IXR    FUBP1     K562
    2   3   NUP35_ENCSR953IQF_K562     Encode    ENCSR953IQF    NUP35     K562
    3   4   MARK2_ENCSR016OIX_K562     Encode    ENCSR016OIX    MARK2     K562
    4   5  DNAJC2_ENCSR577OVP_K562     Encode    ENCSR577OVP   DNAJC2     K562
    >>> {k: d[k] for k in list(d.keys())[0:5]}
    {1: 1, 4: 2, 5: 3, 7: 4, 10: 5}
    """
    metadata = pd.read_csv(Config.tf_metadata, sep=",")
    new_id = list(range(1, metadata.shape[0] + 1))
    # to_list() gives plain Python scalars, so the dictionary keys are not
    # numpy scalar objects.
    dic_link = dict(zip(metadata["EN_ID"].to_list(), new_id))

    accession = metadata["Accession"]
    tf_name = metadata["Target_of_assay"]
    cl_name = metadata["Biosample_term_name"]
    # The table is built in a single constructor call instead of adding
    # columns to a column subset of `metadata`: nothing is ever written into
    # a slice of another frame, and the final column order is explicit.
    table = pd.DataFrame({
        "id": new_id,
        "project_name": tf_name + "_" + accession + "_" + cl_name,
        "source_db": "Encode",
        "db_id_project": accession,
        "tf_name": tf_name,
        "cl_name": cl_name,
    })
    return table, dic_link
def get_de_files() -> List[Path]:
    """
    Recover the list of files used to create the cin_de_event.

    The files are those matching the pattern ``*/condition*.csv`` under
    ``Config.tf_folder``, i.e. csv files whose name starts with
    ``condition`` and that are located one folder below ``Config.tf_folder``.
    They are returned in the order produced by ``Path.glob``.

    :return: the list of files used to create the cin_de_event

    >>> r = get_de_files()
    >>> len(r)
    49
    >>> [a.name for a in r[0:2]]
    ['condition_ZC3H8_CTRL_sig.csv', 'condition_NUP35_CTRL_sig.csv']
    """
    de_pattern = "*/condition*.csv"
    matching_files = Config.tf_folder.glob(de_pattern)
    return list(matching_files)
def create_cin_de_table(list_files: List[Path], dic_id: Dict,
                        project_table: pd.DataFrame) -> pd.DataFrame:
    """
    Create the cin_DE_table.

    For each file, the EN id of its project is parsed from the name of the
    parent folder (the text before the first ``_``, without the ``EN``
    prefix). The second ``_``-separated field of the file name must be the
    transcription factor registered for this project in ``project_table``.
    The column ``id_gene`` of the files, when present, is renamed
    ``gene_id``.

    :param list_files: The list of files used to build the cin_DE_table
    :param dic_id: Dictionary linking each EN id to its cin id
    :param project_table: The table containing project metadata (only its
        ``id`` and ``tf_name`` columns are used here)
    :return: The cin_DE_table: the rows of every file, in the order of
        ``list_files``, with an ``id`` column numbering the rows from 1
    :raises ValueError: if ``list_files`` is empty, if a project id is
        missing from ``project_table`` or if the transcription factor of a
        project is not the one found in the name of its file
    :raises KeyError: if the EN id of a file is not in ``dic_id``

    >>> lf = get_de_files()
    >>> t, di = create_cin_project_tf()
    >>> r = create_cin_de_table(lf, di, t)
    >>> r.head()[["gene_id", "id_project", "baseMean", "log2FoldChange"]]  # doctest: +NORMALIZE_WHITESPACE
       gene_id  id_project   baseMean  log2FoldChange
    0     1925          47  58.406946        6.663470
    1    10885          47  12.850435       -5.844456
    2     6393          47  20.921393       -5.524661
    3     9154          47  26.319160        4.897672
    4    11462          47  54.610968       -4.600871
    >>> r.head()[["pvalue", "padj"]]  # doctest: +NORMALIZE_WHITESPACE
             pvalue          padj
    0  5.632504e-10  9.098382e-09
    1  1.626786e-04  8.666283e-04
    2  2.508788e-06  2.051380e-05
    3  4.327869e-07  4.241582e-06
    4  1.398879e-15  4.614459e-14
    """
    if not list_files:
        # pd.concat would fail later with a message unrelated to the cause.
        raise ValueError("No differential expression file given to build "
                         "the cin_de_event table !")
    good_cols = ["gene_id", "id_project", "baseMean", "log2FoldChange",
                 "pvalue", "padj"]
    # The id -> tf_name lookup does not depend on the file: build it once
    # instead of filtering project_table for every file. setdefault keeps
    # the first row when an id is repeated, as a positional lookup would.
    tf_by_project: Dict = {}
    for project_id, name in zip(project_table["id"].tolist(),
                                project_table["tf_name"].tolist()):
        tf_by_project.setdefault(project_id, name)

    df_list = []
    for cfile in list_files:
        en_id = int(cfile.parent.name.split("_", 1)[0].replace("EN", ""))
        cin_id = dic_id[en_id]
        if cin_id not in tf_by_project:
            raise ValueError(f"No project with id {cin_id} in the project "
                             f"table (file {cfile.name}) !")
        tf_name = tf_by_project[cin_id]
        name_fields = cfile.name.split("_")
        # A file name without '_' holds no transcription factor at all: it
        # is reported with the same error as a wrong transcription factor.
        if len(name_fields) < 2 or tf_name != name_fields[1]:
            raise ValueError(f"The transcription factor {tf_name} is not in "
                             f"cfile {cfile.name} !")
        # The file is only parsed once its name has been validated.
        df = pd.read_csv(cfile, sep=",").rename(columns={"id_gene": "gene_id"})
        df["id_project"] = cin_id
        df_list.append(df[good_cols])

    df_final = pd.concat(df_list, axis=0, ignore_index=True)
    # 1-based row identifier, stored as the first column.
    df_final.insert(0, "id", range(1, df_final.shape[0] + 1))
    return df_final
def get_tf_tables() -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Get the cin_project_TF and the cin_DE_event Table.

    When both ``Config.tf_output_metadata`` and ``Config.tf_output_de``
    already exist, the tables are loaded from these tab-separated files.
    Otherwise they are built with ``create_cin_project_tf`` and
    ``create_cin_de_table`` and written to these two files before being
    returned.

    :return: the cin_project_TF and the cin_DE_event Table

    >>> p, d = get_tf_tables()
    >>> p.head()  # doctest: +NORMALIZE_WHITESPACE
       id             project_name  source_db  db_id_project  tf_name  cl_name
    0   1    NKRF_ENCSR231PWH_K562     Encode    ENCSR231PWH     NKRF     K562
    1   2   FUBP1_ENCSR608IXR_K562     Encode    ENCSR608IXR    FUBP1     K562
    2   3   NUP35_ENCSR953IQF_K562     Encode    ENCSR953IQF    NUP35     K562
    3   4   MARK2_ENCSR016OIX_K562     Encode    ENCSR016OIX    MARK2     K562
    4   5  DNAJC2_ENCSR577OVP_K562     Encode    ENCSR577OVP   DNAJC2     K562
    >>> d.head()[["gene_id", "id_project", "baseMean", "log2FoldChange"]]  # doctest: +NORMALIZE_WHITESPACE
       gene_id  id_project   baseMean  log2FoldChange
    0     1925          47  58.406946        6.663470
    1    10885          47  12.850435       -5.844456
    2     6393          47  20.921393       -5.524661
    3     9154          47  26.319160        4.897672
    4    11462          47  54.610968       -4.600871
    """
    if Config.tf_output_de.is_file() and Config.tf_output_metadata.is_file():
        cin_project_tf = pd.read_csv(Config.tf_output_metadata, sep="\t")
        cin_de_event = pd.read_csv(Config.tf_output_de, sep="\t")
        return cin_project_tf, cin_de_event

    # Both output files need an existing folder: create the folder of each
    # of them (they may differ), together with any missing ancestor.
    for output_file in (Config.tf_output_de, Config.tf_output_metadata):
        output_file.parent.mkdir(parents=True, exist_ok=True)
    list_files = get_de_files()
    cin_project_tf, dic_id = create_cin_project_tf()
    cin_de_event = create_cin_de_table(list_files, dic_id, cin_project_tf)
    cin_project_tf.to_csv(Config.tf_output_metadata, sep="\t", index=False)
    cin_de_event.to_csv(Config.tf_output_de, sep="\t", index=False)
    return cin_project_tf, cin_de_event
def fill_tf_data(logging_level: str = 'DISABLE') -> None:
    """
    Fill the tables cin_de_event and cin_project_tf.

    The two tables are obtained with ``get_tf_tables`` and handed to
    ``populate_df``, cin_project_tf first and cin_de_event second.

    :param logging_level: The value forwarded to ``logging_def`` to set up
        the logging of this script
    """
    logging_def(Config.results, __file__, logging_level)
    project_table, de_table = get_tf_tables()
    steps = (
        ('Filling cin_project_tf', 'cin_project_tf', project_table),
        ('Filling cin_de_event', 'cin_de_event', de_table),
    )
    for message, table_name, content in steps:
        logging.debug(message)
        populate_df(table=table_name, df=content, clean='y')
# Executed as a script: run the examples embedded in the docstrings above.
if __name__ == "__main__":
    from doctest import testmod
    testmod()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment