Newer
Older
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
Description: The goal of this program is to create communites \
of genes or exons with HipMCL by changing the inflation parameter \
and see which values of inflation parameter gives the best result \
in term of modularity, number of communities detected, size of those \
communities
"""
import sqlite3
from .config import ConfigGraph
from typing import List
import logging
import pandas as pd
import numpy as np
import lazyparser as lp
from pathlib import Path
import seaborn as sns
from math import log
from .community_finder import logging_def, get_project_colocalisation, \
create_graph, find_communities
def write_input(arr_interaction: np.array, outfile: Path, use_weight: bool):
"""
:param arr_interaction: Each couples of co-localized feature within a \
project.
:param outfile: the input file
:param use_weight: Say if we want to write the weight into the result file.
:return:
"""
with outfile.open('w') as f:
for exon1, exon2, cweight in arr_interaction:
if not use_weight:
cweight = 1
f.write(f"{exon1}\t{exon2}\t{cweight}\n")
def get_out_name(weight: int, global_weight: int, inflation: float,
project: str = "", same_gene=True, feature: str = "exon",
use_weight: bool = False, cell_line: str = "ALL"):
"""
return the output file where the communities are stored.
:param project: The name of the project of interest
:param weight: The minimum weight of interaction to consider
:param global_weight: The global weight to consider. if \
the global weight is equal to 0 then then density figure are calculated \
by project, else all projet are merge together and the interaction \
seen in `global_weight` project are taken into account
:param inflation: The inflation parameter of HipMCL program
:param same_gene: Say if we consider as co-localised, exons within the \
same gene (True) or not (False) (default False)
:param feature: The feature we want to analyse (default 'exon')
:param use_weight: Say if we want to write the weight into the result file.
:param cell_line: Interactions are recovered only from project made \
on this cell line.
:return: The file containing communities,
the input of hiMCL and the output
"""
w = "weigthed" if use_weight else "unweigthed"
cell = f"_{cell_line}" if cell_line != "ALL" else ""
if global_weight != 0:
project = f"global-weight-{global_weight}"
output = ConfigGraph.community_calibration_folder / "community_files" / \
f"{project}_weight-{weight}_same_gene-{same_gene}_{feature}_" \
f"{inflation}{cell}_{w}.txt"
input_hip = output.parent / "hipMCL_files" / \
output.name.replace(".txt", "_input.txt")
output_hip = input_hip.parent / input_hip.name.replace("_input.txt",
"_output.txt")
output.parent.mkdir(exist_ok=True, parents=True)
input_hip.parent.mkdir(exist_ok=True, parents=True)
return output, input_hip, output_hip
def get_figname(weight: int, global_weight: int,
project: str = "", same_gene=True, feature: str = "exon",
use_weight: bool = False, cell_line: str = "ALL"):
"""
return the output file where the communities are stored.
:param project: The name of the project of interest
:param weight: The minimum weight of interaction to consider
:param global_weight: The global weight to consider. if \
the global weight is equal to 0 then then density figure are calculated \
by project, else all projet are merge together and the interaction \
seen in `global_weight` project are taken into account
:param same_gene: Say if we consider as co-localised, exons within the \
same gene (True) or not (False) (default False)
:param feature: The feature we want to analyse (default 'exon')
:param use_weight: Say if we want to write the weight into the result file.
:param cell_line: Interactions are recovered only from project made \
on this cell line.
:return:
"""
w = "weigthed" if use_weight else "unweigthed"
cell = f"_{cell_line}" if cell_line != "ALL" else ""
if global_weight != 0:
project = f"global-weight-{global_weight}"
return ConfigGraph.community_calibration_folder / \
f"{project}_weight-{weight}_same_gene-{same_gene}_{feature}" \
f"{cell}_{w}.pdf"
def community_finder(weight: int, global_weight: int, inflation: float,
project: str = "", same_gene=True, feature: str = "exon",
use_weight: bool = False, cell_line: str = "ALL"):
"""
Find communities inside co-localisation between exons found in \
a ChIA-PET project.
:param project: The name of the project of interest
:param weight: The minimum weight of interaction to consider
:param global_weight: The global weight to consider. if \
the global weight is equal to 0 then then density figure are calculated \
by project, else all projet are merge together and the interaction \
seen in `global_weight` project are taken into account
:param inflation: The inflation parameter of HipMCL program
:param same_gene: Say if we consider as co-localised, exons within the \
same gene (True) or not (False) (default False)
:param use_weight: Say if we want to write the weight into the result file.
:param feature: The feature we want to analyse (default 'exon')
:param cell_line: The cell line chosen
"""
inflation = round(inflation, 2)
logging.info(f"Working with inflation {inflation}")
outfile, in_hipmcl, out_hipmcl = get_out_name(
weight, global_weight, inflation, project, same_gene, feature,
use_weight, cell_line)
if outfile.is_file():
return pd.read_csv(outfile, sep="\t")
cnx = sqlite3.connect(ConfigGraph.db_file)
interaction = get_project_colocalisation(cnx, project, weight,
global_weight, same_gene, True,
level=feature, cell=cell_line)
write_input(interaction, in_hipmcl, use_weight)
graph = create_graph(interaction)
df, dic_community = find_communities(graph, project, in_hipmcl, out_hipmcl,
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
feature, inflation=inflation,
compute_ec_cov=False)
logging.debug('Writing results ...')
df.to_csv(outfile, sep="\t", index=False)
return df
def create_dataframe(list_df: List[pd.DataFrame], list_inflation: np.array
) -> pd.DataFrame:
"""
Create a pandas dataframe indicating for each inflation, the number \
of community created and the modularity of the communities.
:param list_df: A list of dataframe of community
:param list_inflation: The list of inflation values
:return: A dataframe containing the inflation column, the \
number of community created, the modularity of those community and \
the median size of the community
>>> d = pd.DataFrame({"community": ["C1", "C2"], "nodes": [10, 20],
... "edges": [5, 7], "modularity": [0.95, 0.95]})
>>> create_dataframe([d, d], [1.5, 2])
inflation modularity community_number median_size
0 1.5 0.95 2 15.0
1 2.0 0.95 2 15.0
"""
dic = {"inflation": list_inflation, "modularity": [],
"community_number": [], "median_size": []}
for n, i in enumerate(list_inflation):
dic["modularity"].append(list_df[n]["modularity"][0])
dic["community_number"].append(list_df[n].shape[0])
dic["median_size"].append(np.median(list_df[n]["nodes"].to_list()))
df = pd.DataFrame(dic)
df["inflation"] = df["inflation"].round(2).astype(str)
return df
def create_community_size_dataframe(list_df: List[pd.DataFrame],
list_inflation: np.array
) -> pd.DataFrame:
"""
Create a pandas dataframe indicating for each inflation, the \
size of each communities
:param list_df: A list of dataframe of community
:param list_inflation: The list of inflation values
:return: A dataframe containing the inflation column, the \
size of each community
>>> d = pd.DataFrame({"community": ["C1", "C2"], "nodes": [10, 20],
... "edges": [5, 7], "modularity": [0.95, 0.95]})
>>> create_community_size_dataframe([d, d], [1.5, 2])
inflation community_size
0 1.5 10
1 1.5 20
2 2.0 10
3 2.0 20
"""
dic = {"inflation": [], "community_size": []}
for n, i in enumerate(list_inflation):
cdf = list_df[n]
vect_size = cdf["nodes"].to_list()
dic["community_size"] += vect_size
dic["inflation"] += [i] * len(vect_size)
df = pd.DataFrame(dic)
df["inflation"] = df["inflation"].round(2).astype(str)
return df
def create_scatter(df_infl: pd.DataFrame, fig_name: Path) -> None:
"""
Create a figure indicating the modularity of communities for each \
inflation parameter.
:param df_infl: A dataframe containing a column inflation, modularity \
community_number and median_size
:param fig_name: The file where the figure will be stored
"""
sns.set(context="poster")
g = sns.catplot(x="inflation", y="modularity", data=df_infl, aspect=1.7,
height=12, palette=["red"] * df_infl.shape[0],
kind="point")
xrange = g.ax.get_xlim()
ax2 = g.ax.twinx()
df_infl.plot(x="inflation", y="community_number",
color=(0.2, 0.8, 0.2, 0.4), ax=ax2,
kind="scatter", zorder=55, s=275)
ax2.set_ylabel('community_number', color="green")
ax2.tick_params(axis='y', labelcolor="green")
ax2.grid(False)
ax3 = g.ax.twinx()
ax3.set_ylabel('median_size', color="purple")
ax3.spines["right"].set_position(("axes", 1.1))
ax3.spines["right"].set_visible(True)
df_infl.plot(x="inflation", y="median_size",
color=(0.8, 0.2, 0.8, 0.4), ax=ax3,
kind="scatter", zorder=55, s=275)
ax3.tick_params(axis='y', labelcolor="purple")
ax3.grid(False)
g.ax.set_xlim(xrange)
g.savefig(fig_name)
def create_community_size_fig(df_infl: pd.DataFrame, fig_name: Path) -> None:
"""
Create a figure indicating the size of communities for each \
inflation parameter.
:param df_infl: A dataframe containing a column inflation, and \
community_size
:param fig_name: The file where the figure will be stored
"""
sns.set(context="poster")
df_infl["community_size"] = df_infl["community_size"].astype(float)
df_infl = df_infl[df_infl["community_size"] > 0]
df_infl["community_size"] = [log(x) for x in
df_infl["community_size"].to_list()]
g = sns.catplot(x="inflation", y="community_size", data=df_infl,
aspect=1.7, height=12, kind="violin")
g.savefig(fig_name)
@lp.parse(weight=range(1, 11), global_weight=range(11),
feature=('gene', 'exon'), istart="1.1 <= istart < 2.5",
istop="1.1 < istop <= 2.5", istep="0 < istep <= 1")
def make_calibration(weight: int, global_weight: int, istart: float = 1.1,
istop: float = 2.5, istep: float = 0.1, project: str = "",
same_gene=True, feature: str = "exon",
use_weight: bool = False, cell_line: str = "ALL",
logging_level: str = "INFO"):
logging_def(ConfigGraph.output_folder, __file__, logging_level)
inflations = np.arange(istart, istop + istep, istep)
list_df = [
community_finder(weight, global_weight, i, project, same_gene, feature,
use_weight=use_weight, cell_line=cell_line)
for i in inflations
]
df_infl = create_dataframe(list_df, inflations)
df_size = create_community_size_dataframe(list_df, inflations)
figname = get_figname(weight, global_weight, project, same_gene, feature,
use_weight, cell_line)
create_scatter(df_infl, figname)
create_community_size_fig(df_size, figname.parent /
figname.name.replace(".pdf", "_sizes.pdf"))
if __name__ == "__main__":
make_calibration()