Skip to content
Snippets Groups Projects
Commit 322c33d8 authored by nfontrod's avatar nfontrod
Browse files

src/nt_composition/make_nt_correlation.py: creation of a script that create...

src/nt_composition/make_nt_correlation.py: creation of a script that create density figures to see if regulated exons have the same composition as their co-localized exons
parent 1f8d5bef
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
Description: Make density graphics for one particular nucleotides \
on a given project. The figure should present in x-axis \
the frequency of an exon for a nucleotide N and in y-axis the \
mean frequency of N in every other co-localized exons.
"""
import sqlite3
import pandas as pd
import logging
import numpy as np
import doctest
from .config import ConfigNt
from ..logging_conf import logging_def
from tqdm import tqdm
from typing import Dict, Tuple
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.stats import linregress
from itertools import product
class NoInteractionError(Exception):
pass
def get_project_colocalisation(cnx: sqlite3.Connection, project: str
) -> np.array:
"""
Get the interactions in project `project`
:param cnx: Connection to chia-pet database
:param project: The project of interest
:return: The table containing the number of interaction by projects
"""
logging.debug('Recovering interaction')
query = "SELECT exon1, exon2 " \
"FROM cin_exon_interaction " \
f"WHERE id_project = '{project}'"
c = cnx.cursor()
c.execute(query)
result = np.asarray(c.fetchall())
if len(result) == 0:
msg = f'No interaction found in project {project}'
logging.exception(msg)
raise NoInteractionError(msg)
logging.debug(result[0:5])
return result
def get_interacting_exon(arr: np.array) -> np.array:
"""
get every co-localised exons within arr.
:param arr: The list of interaction within a project
:return: The list of uniq exons interacting with each other
>>> arr = np.array([['1_1', '5_1'], ['1_2', '1_1'], ['2_3', '2_4']])
>>> list(get_interacting_exon(arr))
['1_1', '1_2', '2_3', '2_4', '5_1']
"""
return np.unique(arr.flatten())
def get_all_exon_interacting_with_another(exon: str, arr: np.array
) -> np.array:
"""
From an exons, get the other exons directly co-localised with it.
:param exon: An exon
:param arr: An array of interaction
:return: The list of exons interacting with `exon`
>>> arr = np.array([['1_1', '5_1'], ['1_2', '1_1'], ['2_3', '2_4']])
>>> list(get_all_exon_interacting_with_another('1_1', arr))
['5_1', '1_2']
"""
exons = arr[(arr[:, 1] == exon) | (arr[:, 0] == exon)].flatten()
return exons[exons != exon]
def get_frequency_dic(cnx: sqlite3.Connection, nt: str, ft_type: str
) -> Dict[str, float]:
"""
Get the frequency of a nucleotide for every exon in \
the chia-pet database.
:param cnx: Connection to chia-pet database
:param nt: The nucleotide of interest
:param ft_type: The type of feature of interest
:return: The frequency dic
"""
logging.debug('Calculating frequency table')
query = "SELECT id_exon, frequency " \
"FROM cin_exon_frequency " \
f"WHERE ft = '{nt}' " \
f"AND ft_type = '{ft_type}'"
c = cnx.cursor()
c.execute(query)
result = c.fetchall()
if len(result) == 0:
msg = f'No Frequencies found for {nt} ({ft_type})'
logging.exception(msg)
raise NoInteractionError(msg)
dic = {}
for exon, freq in result:
dic[exon] = freq
logging.debug({k: dic[k] for k in list(dic.keys())[0:5]})
return dic
def create_density_table(arr_interaction: np.array, dic_freq: Dict[str, float],
) -> pd.DataFrame:
"""
Create the density table, a table showing the frequency of \
a nucleotide in every exon in a chia-pet projet and the mean \
frequency of every other co-localised exons.
:param arr_interaction: array of interaction between exons.
:param dic_freq: The frequency dataframe.
:return: The density table
"""
logging.debug('Calculating density table')
exons_list = get_interacting_exon(arr_interaction)
dic = {'exon': [], 'freq_exon': [], 'freq_coloc_exon': [], 'oexon': []}
pbar = tqdm(exons_list, desc="Getting frequencies...")
for exon in pbar:
freq_ex = dic_freq[exon]
oexon = get_all_exon_interacting_with_another(exon, arr_interaction)
freq_oexon = np.nanmean(np.asarray([dic_freq[ex] for ex in oexon],
dtype=float))
if freq_ex is not None and not np.isnan(freq_oexon):
dic['exon'].append(exon)
dic['freq_exon'].append(freq_ex)
dic['freq_coloc_exon'].append(freq_oexon)
dic['oexon'].append(", ".join(oexon))
df = pd.DataFrame(dic)
logging.debug(df.head())
return df
def create_density_fig(df: pd.DataFrame, project: str, ft_type: str, nt: str
) -> Tuple[float, float]:
"""
Compute a density file showing if the nucleotide frequency of \
an exons correlates with the frequency in other co-localised exons.
:param df: The dataframe containing frequency of exons and \
co-localized exons.
:param project: The name of the project where the co-localisation \
where observed
:param ft_type: The type of feature of interest
:param nt: The nucleotide of interest
:return: The correlation and the p-value
"""
logging.debug('Creating the density figure')
sns.set()
sns.set_context('talk')
plt.figure(figsize=(20, 12))
df2 = df[['freq_exon', 'freq_coloc_exon']].copy()
s, i, r, p, stderr = linregress(df.freq_exon, df.freq_coloc_exon)
sns.kdeplot(df2.freq_exon, df2.freq_coloc_exon, shade=True,
shade_lowest=False,
cmap="YlOrBr")
plt.plot(df.freq_exon, i + s * df.freq_exon, 'r',
label=f'r={round(r,4)}, p={round(p, 4)}')
plt.legend()
plt.xlabel(f"Freq of {nt} in an exon")
plt.ylabel(f"Freq of {nt} in co-localized exons")
plt.title(f'Freq of {nt} of exons and their co-localized partner in '
f'{project}')
plt.savefig(ConfigNt.get_density_file(project, ft_type, nt, fig=True))
plt.close()
return r, p
def create_density_figure(nt: str, ft_type: str,
project: str, logging_level: str = "DISABLE"
) -> Tuple[float, float]:
"""
Create a density figure for the project `project` for the nucleotide \
`nt`.
:param project: The chia-pet project of interest
:param nt: The nucleotide used to build the figure
:param ft_type: The type of feature of interest
:param logging_level: The level of information to display
:return: The correlation and the p-value
"""
logging_def(ConfigNt.interaction, __file__, logging_level)
outfile = ConfigNt.get_density_file(project, ft_type, nt, fig=False)
if not outfile.is_file():
cnx = sqlite3.connect(ConfigNt.db_file)
arr_interaction = get_project_colocalisation(cnx, project)
dic_freq = get_frequency_dic(cnx, nt, ft_type)
df = create_density_table(arr_interaction, dic_freq)
df.to_csv(outfile, sep="\t", index=False)
else:
logging.debug(f'The file {outfile} exist, recovering data ...')
df = pd.read_csv(outfile, sep="\t")
return create_density_fig(df, project, ft_type, nt)
def create_scatterplot(df_cor: pd.DataFrame, ft_type: str, ft: str):
"""
Create a scatterplot showing the R-value according to the \
number of interaction.
:param df_cor: The dataframe of correlation.
:param ft_type: The feature type of interest
:param ft: The feature of interest
"""
sns.set()
sns.set_context('talk')
plt.figure(figsize=(20, 12))
df_cor = df_cor[(df_cor['ft_type'] == ft_type) &
(df_cor['ft'] == ft)].copy()
sns.scatterplot(x='cor', y='nb_interaction', data=df_cor)
left, right = plt.xlim()
plt.text(df_cor.cor + right * 0.03, df_cor.nb_interaction, )
plt.xlabel(f"Correlation for {ft} ({ft_type}) in project")
plt.ylabel("Number of total interaction in projects")
plt.title(f'Project correlation for {ft} ({ft_type}) '
f'according to the number of interaction in projects')
plt.savefig(ConfigNt.get_density_recap(ft_type, ft, fig=True))
plt.close()
def create_all_frequency_figures(logging_level: str = "DISABLE"):
"""
Make density figure for every selected projects.
:param logging_level: The level of data to display.
"""
logging_def(ConfigNt.interaction, __file__, logging_level)
di = pd.read_csv(ConfigNt.interaction_file, sep="\t")
dic = {"project": [], 'ft_type': [], 'ft': [], 'nb_interaction': [],
'cor': [], 'pval': []}
with open(ConfigNt.selected_project, 'r') as f:
projects = f.read().splitlines()
nt_list = ['A', 'C', 'G', 'T', 'S', 'W']
param = product(projects, nt_list, ['nt'])
for project, nt, ft_type in param:
logging.info(f'Working on {project}, {ft_type}, {nt}')
r, p = create_density_figure(nt, ft_type, project)
tmp = {"project": project, "ft_type": ft_type,
"ft": nt, "cor": r, "pval": p,
'nb_interaction': di[di['projects'] == project].iloc[0, 1]}
for k in tmp.keys():
dic[k].append(tmp[k])
df_corr = pd.DataFrame(dic)
df_corr.to_csv(ConfigNt.density_folder / "density_recap.txt", sep="\t")
create_scatterplot(df_corr, "nt", "S")
if __name__ == "__main__":
doctest.testmod()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment