Commit f4f9f920 authored by nfontrod


src/find_interaction_cluster/nt_and_community.py: creation of a script to check whether or not the nucleotide frequency is randomly distributed among communities of exons
parent efbd8f90
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
Description: The goal of this script is to check whether the nucleotide \
frequency is randomly distributed among the communities of exons
"""
import pandas as pd
import logging
import sqlite3
from .config import ConfigGraph, get_communities
from typing import List
from doctest import testmod
from functools import reduce
from pathlib import Path
from rpy2.robjects import r, pandas2ri
from statsmodels.stats.multitest import multipletests
import numpy as np
from .community_finder import get_projects_name
from ..logging_conf import logging_def
from itertools import product
import multiprocessing as mp
from .sf_and_communities import get_key


def get_nt_frequency(cnx: sqlite3.Connection, list_exon: List[str]
) -> pd.DataFrame:
"""
    Get the frequency of every nucleotide for the exons in list_exon.
    :param cnx: Connection to the ChIA-PET database
    :param list_exon: The list of exons for which we want to get \
    the nucleotide frequencies
    :return: The frequency of each nucleotide for the exons in list_exon
>>> d = get_nt_frequency(sqlite3.connect(ConfigGraph.db_file),
... ["1_1", "1_2"])
>>> d[["id_exon", 'A', 'C', 'G', 'T']]
ft id_exon A C G T
0 1_1 16.63480 34.60803 32.12237 16.63480
1 1_2 16.06426 26.10442 39.75904 18.07229
"""
query = f"""
SELECT ft, id_exon, frequency
FROM cin_exon_frequency
WHERE id_exon IN {tuple(list_exon)}
AND ft_type="nt"
"""
df = pd.read_sql_query(query, cnx)
df = df.pivot_table(index="id_exon", columns="ft", values="frequency")\
.reset_index()
return df
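

# Illustrative helper, not used by the analysis itself: a minimal sketch of
# the long-to-wide reshape performed in get_nt_frequency, using toy
# frequencies instead of the ChIA-PET database (the function name and the
# values below are made up for illustration).
def _pivot_sketch() -> pd.DataFrame:
    """Show how one row per (exon, nucleotide) becomes one row per exon."""
    toy = pd.DataFrame({"id_exon": ["1_1", "1_1", "1_2", "1_2"],
                        "ft": ["A", "T", "A", "T"],
                        "frequency": [20.0, 30.0, 25.0, 15.0]})
    # Same reshape as in get_nt_frequency: exons as rows, nucleotides as columns
    return toy.pivot_table(index="id_exon", columns="ft",
                           values="frequency").reset_index()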


def get_community_table(communities: List[List[str]],
size_threshold: int) -> pd.DataFrame:
"""
    Return a table linking each exon to the name and size of its community.
    :param communities: List of communities of exons
    :param size_threshold: The minimum size a community must \
    have to be considered
    :return: The table of communities
>>> c = [['1_1', '2_5'], ['7_9', '4_19', '3_3']]
>>> get_community_table(c, 3)
community id_exon community_size
0 C1 7_9 3
1 C1 4_19 3
2 C1 3_3 3
"""
dic = {"community": [], "id_exon": [], "community_size": []}
for k, comm in enumerate(communities):
if len(comm) >= size_threshold:
name = f'C{k}'
clen = len(comm)
for exon in comm:
dic["community"].append(name)
dic['id_exon'].append(exon)
dic["community_size"].append(clen)
return pd.DataFrame(dic)
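

# Note: communities are named after their index in the input list, so when
# small communities are filtered out (as in the doctest above, where the first
# community holds only two exons) the numbering of the remaining communities
# is not contiguous and may start at C1 rather than C0.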


def lmm_maker(df: pd.DataFrame, outfile: Path, nt: str) -> float:
"""
    Run the lmm analysis to see whether the frequency of the nucleotide \
    `nt` differs among the communities.
    :param df: The dataframe of nucleotide frequencies and communities
    :param outfile: The name of the output file
    :param nt: The nucleotide of interest
    :return: The p-value of the lmm
"""
pandas2ri.activate()
lmm = r(
"""
require("lme4")
require("DHARMa")
function(data, folder, partial_name) {
null_mod <- lm(%s ~ log(community_size), data=data)
mod <- lmer(%s ~ log(community_size) + (1 | community), data=data)
simulationOutput <- simulateResiduals(fittedModel = mod, n = 250)
            png(paste0(folder, "/diagnostics_", partial_name, ".png"))
plot(simulationOutput)
dev.off()
return(anova(mod, null_mod, test="Chisq"))
}
""" % (nt, nt))
folder = outfile.parent / "diagnostics"
folder.mkdir(parents=True, exist_ok=True)
partial_name = outfile.name.replace('.txt', '')
return lmm(df, str(folder), partial_name)["Pr(>Chisq)"][1]
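

# Rough pure-Python analogue of lmm_maker for readers without rpy2/lme4.
# This is an illustrative sketch only (the function below and the use of
# statsmodels' MixedLM are not part of the original analysis): it mirrors the
# anova(mod, null_mod) call above with a likelihood-ratio test between a
# fixed-effect-only model and a model adding a random intercept per community.
def _lmm_sketch_statsmodels(df: pd.DataFrame, nt: str) -> float:
    import statsmodels.formula.api as smf
    from scipy import stats
    # Null model: nucleotide frequency explained by community size only
    null_mod = smf.ols(f"{nt} ~ np.log(community_size)", data=df).fit()
    # Mixed model: add a random intercept for each community
    mod = smf.mixedlm(f"{nt} ~ np.log(community_size)", data=df,
                      groups=df["community"]).fit(reml=False)
    # Chi-squared test on twice the log-likelihood difference (1 df)
    lr_stat = 2 * (mod.llf - null_mod.llf)
    return float(stats.chi2.sf(lr_stat, df=1))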


def get_stat_nt_communities(df: pd.DataFrame, project: str, weight: int,
global_weight: int, same_gene: bool,
nt: str
) -> pd.Series:
"""
    Compute the lmm p-value testing whether the frequency of the nucleotide
    `nt` differs among the communities of a project.
    :param df: A dataframe containing the frequency of each nucleotide \
    in each exon belonging to a community.
    :param project: The name of the project of interest
    :param weight: The minimum weight of interaction to consider
    :param global_weight: The global weight to consider. If \
    the global weight is equal to 0 then the density figures are calculated \
    by project, else all projects are merged together and the interactions \
    seen in `global_weight` projects are taken into account
    :param same_gene: Say if we consider exons within the \
    same gene as co-localised (True) or not (False) (default False)
    :param nt: The nucleotide of interest
    :return: A series containing the project, the nucleotide and the p-value
"""
logging.debug(f"lmm for {project}, w:{weight}, "
f"g:{global_weight} nt: {nt}")
res = {"project": [], "nt": [], 'pval': []}
outfile = ConfigGraph.get_community_file(project, weight,
global_weight,
same_gene, f"{nt}_stat.txt",
True)
nfolder = outfile.parent / "nt_analysis"
nfolder.mkdir(exist_ok=True, parents=True)
noutfile = nfolder / outfile.name
pval = lmm_maker(df, noutfile, nt)
res['project'] = project
res['nt'] = nt
res['pval'] = pval
return pd.Series(res)


def create_dataframe(project: str, weight: int, global_weight: int,
same_gene: bool) -> pd.DataFrame:
"""
    Build the dataframe of nucleotide frequencies for every exon belonging \
    to a large community.
    :param project: The name of the project of interest
    :param weight: The minimum weight of interaction to consider
    :param global_weight: The global weight to consider. If \
    the global weight is equal to 0 then the density figures are calculated \
    by project, else all projects are merged together and the interactions \
    seen in `global_weight` projects are taken into account
    :param same_gene: Say if we consider exons within the \
    same gene as co-localised (True) or not (False)
    :return: A dataframe containing the frequency of every nucleotide \
    of every exon belonging to a large community
"""
size_threshold = 50
result = ConfigGraph.get_community_file(project, weight, global_weight,
same_gene,
"_communities.txt")
communities = get_communities(result, 0)
ncommunities = [c for c in communities if len(c) >= size_threshold]
cnx = sqlite3.connect(ConfigGraph.db_file)
list_exon = reduce(lambda x, y: x + y, ncommunities)
df = get_nt_frequency(cnx, list_exon)
df_com = get_community_table(communities, size_threshold)
df = df.merge(df_com, how="left", on="id_exon")
return df
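

# Note on create_dataframe: reduce(lambda x, y: x + y, ncommunities) simply
# concatenates the retained communities into a single exon list, e.g.
# reduce(lambda x, y: x + y, [["1_1", "2_5"], ["7_9"]]) gives
# ["1_1", "2_5", "7_9"]; itertools.chain.from_iterable(ncommunities) would be
# an equivalent alternative.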


def multiple_nt_lmm_launcher(ps: int, weights: List[int],
global_weights: List[int],
same_gene: bool, logging_level: str = "DISABLE"):
"""
    Launch the statistical analysis for every project, weight and nucleotide.
    :param ps: The number of processes we want to use
    :param weights: The list of weights of interaction to consider
    :param global_weights: The list of global weights to consider. If \
    a global weight is equal to 0 then the density figures are calculated \
    by project, else all projects are merged together and the interactions \
    seen in `global_weight` projects are taken into account
    :param same_gene: Say if we consider exons within the \
    same gene as co-localised
    :param logging_level: The level of information to display
"""
ConfigGraph.community_folder.mkdir(exist_ok=True, parents=True)
logging_def(ConfigGraph.community_folder, __file__, logging_level)
logging.info("Checking if communities as an effect on nucleotide "
"frequency")
global_weights = list(np.unique(global_weights))
weights = list(np.unique(weights))
projects, dic_project = get_projects_name(global_weights)
nt_list = ["A", "C", "G", "T", "S", "W"]
condition = list(product(projects, weights, nt_list))
processes = {}
pool = mp.Pool(processes=min(ps, len(condition)))
logging.debug("Calculating stats...")
dic_df = {}
for project, weight, nt in condition:
global_weight = dic_project[project]
ckey = get_key(project, weight)
if ckey in dic_df:
df = dic_df[ckey]
else:
df = create_dataframe(project, weight, global_weight, same_gene)
nfile_table = ConfigGraph.get_community_file(project, weight,
global_weight,
same_gene,
f"nt_table.txt",
True)
df.to_csv(nfile_table, sep="\t", index=False)
dic_df[ckey] = df
args = [df, project, weight, global_weight, same_gene, nt]
if ckey not in processes.keys():
processes[ckey] = [pool.apply_async(get_stat_nt_communities, args)]
else:
processes[ckey].append(
pool.apply_async(get_stat_nt_communities, args))
    for p, value in processes.items():
        project, weight = p.split("_")
        results = [proc.get(timeout=None) for proc in value]
pool.close()
pool.join()
fdf = pd.DataFrame(results)
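        # Benjamini-Hochberg correction across every p-value computed for
        # this project/weight (one per nucleotide)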
fdf["padj"] = multipletests(fdf['pval'].values, method='fdr_bh')[1]
outfile = ConfigGraph.get_community_file(project, weight,
dic_project[project],
same_gene, f"lmm-nt_stat.txt",
True)
nfolder = outfile.parent / "nt_analysis"
noutfile = nfolder / outfile.name
fdf.to_csv(noutfile, sep="\t", index=False)


if __name__ == "__main__":
testmod()
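

# Example invocation (illustrative only: the parameter values are placeholders
# and the import path assumes the package root is `src`):
#
#     from src.find_interaction_cluster.nt_and_community import \
#         multiple_nt_lmm_launcher
#     multiple_nt_lmm_launcher(ps=4, weights=[1], global_weights=[0],
#                              same_gene=True, logging_level="INFO")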