Skip to content
Snippets Groups Projects
Commit 39d972e6 authored by nfontrod's avatar nfontrod
Browse files

src/find_interaction_cluster/community_finder.py: creation of lighten_color...

src/find_interaction_cluster/community_finder.py: creation of lighten_color and modification of find_communities to add some data to the communities found and to change the color of the communities according to those data + creation of multiple_community_launcher to find communities for many projects or weights
parent edef0215
Branches
No related tags found
No related merge requests found
...@@ -19,13 +19,11 @@ import logging ...@@ -19,13 +19,11 @@ import logging
import plotly.graph_objects as go import plotly.graph_objects as go
import plotly import plotly
from pathlib import Path from pathlib import Path
from typing import Tuple, Dict from typing import Tuple, Dict, List
import matplotlib.cm as cm import matplotlib.cm as cm
from matplotlib.colors import rgb2hex from matplotlib.colors import to_rgba
from itertools import product
import multiprocessing as mp
class BadProjectName(Exception):
    """Error raised when a project name is inconsistent with the \
    global weight."""
def get_nodes(interaction: np.array) -> np.array: def get_nodes(interaction: np.array) -> np.array:
...@@ -56,6 +54,19 @@ def create_graph(interaction: np.array) -> nx.Graph: ...@@ -56,6 +54,19 @@ def create_graph(interaction: np.array) -> nx.Graph:
return graph return graph
def lighten_color(color: np.array, lighten: float = 1):
    """
    Build the ``rgba(r, g, b, a)`` string describing a color.

    :param color: A color understood by matplotlib ``to_rgba`` \
    (e.g. a tuple of 3 float)
    :param lighten: The value written as 4th component of the \
    rgba string.
    :return: The string 'rgba(r, g, b, lighten)' where r, g and b are \
    the color channels scaled by 255 and truncated to integers
    """
    # Only the three color channels are kept; the 4th component returned
    # by to_rgba is replaced by `lighten`.
    red, green, blue = (int(channel * 255)
                        for channel in to_rgba(color)[:3])
    return f'rgba({red}, {green}, {blue}, {lighten})'
def find_communities(graph: nx.Graph, project: str def find_communities(graph: nx.Graph, project: str
) -> Tuple[pd.DataFrame, Dict]: ) -> Tuple[pd.DataFrame, Dict]:
""" """
...@@ -67,24 +78,37 @@ def find_communities(graph: nx.Graph, project: str ...@@ -67,24 +78,37 @@ def find_communities(graph: nx.Graph, project: str
to wich each exon belong to wich each exon belong
""" """
logging.debug("Finding community ...") logging.debug("Finding community ...")
communities_generator = community.girvan_newman(graph) communities_generator = community.label_propagation_communities(graph)
communities = next(communities_generator) communities = list(communities_generator)
dic_community = {} dic_community = {}
cov = community.coverage(graph, communities) cov = round(community.coverage(graph, communities), 2)
perf = community.performance(graph, communities) perf = np.nan # community.performance(graph, communities)
d = {'community': [], 'size': [], 'cov': [], 'perf': [], 'exons': []} d = {'community': [], 'nodes': [], 'edges' : [], 'EC': [], 'HCS': [],
colors = cm.rainbow(np.linspace(0, 1, len(communities))) '%E vs E in complete G': [],
'cov': [], 'perf': [], 'exons': []}
colors = cm.hsv(np.linspace(0, 1, len(communities)))
for k, c in enumerate(communities): for k, c in enumerate(communities):
clen = len(c) subg = nx.subgraph(graph, c)
nb_nodes = len(c)
nb_edges = len(subg.edges)
edge_connectivity = nx.edge_connectivity(subg)
is_hc = 'yes' if edge_connectivity > nb_nodes / 2 else 'no'
for exon in c: for exon in c:
dic_community[exon] = {'num': f'C{k + 1}', dic_community[exon] = {'num': f'C{k + 1}',
'col': rgb2hex(colors[k][:3]) 'col': lighten_color(colors[k])
if clen > 2 else 'white'} if is_hc == 'yes' else
(lighten_color(colors[k], 0.1) if nb_nodes > 2 else 'white')
}
d['community'].append(f'C{k + 1}') d['community'].append(f'C{k + 1}')
d['size'].append(len(list(c))) d['nodes'].append(nb_nodes)
d['edges'].append(nb_edges)
d['EC'].append(edge_connectivity)
d['HCS'].append(is_hc)
d['%E vs E in complete G'].append(round(
nb_edges / (nb_nodes * (nb_nodes - 1) / 2) * 100, 2))
d['exons'].append(', '.join(list(c))) d['exons'].append(', '.join(list(c)))
d['cov'].append(round(cov, 5)) d['cov'].append(cov)
d['perf'].append(round(perf, 5)) d['perf'].append(perf)
d['project'] = [project] * len(d['community']) d['project'] = [project] * len(d['community'])
df = pd.DataFrame(d) df = pd.DataFrame(d)
return df, dic_community return df, dic_community
...@@ -220,16 +244,6 @@ def community_finder(weight: int, global_weight: int, project: str = "", ...@@ -220,16 +244,6 @@ def community_finder(weight: int, global_weight: int, project: str = "",
""" """
ConfigGraph.output_folder.mkdir(exist_ok=True, parents=True) ConfigGraph.output_folder.mkdir(exist_ok=True, parents=True)
logging_def(ConfigGraph.output_folder, __file__, logging_level) logging_def(ConfigGraph.output_folder, __file__, logging_level)
if project != "" and global_weight != 0:
msg = "A project name was given given along with a " \
"global weight != 0 ! You should not give a project name if " \
"you want to concider many project together"
logging.exception(msg)
raise BadProjectName(msg)
if project == "" and global_weight == 0:
msg = "A project name must be given when global weight == 0"
logging.exception(msg)
raise BadProjectName
cnx = sqlite3.connect(ConfigGraph.db_file) cnx = sqlite3.connect(ConfigGraph.db_file)
interaction = get_project_colocalisation(cnx, project, weight, interaction = get_project_colocalisation(cnx, project, weight,
global_weight, same_gene, True) global_weight, same_gene, True)
...@@ -242,3 +256,82 @@ def community_finder(weight: int, global_weight: int, project: str = "", ...@@ -242,3 +256,82 @@ def community_finder(weight: int, global_weight: int, project: str = "",
same_gene, is_fig=True) same_gene, is_fig=True)
fig_title = get_figure_title(project, weight, global_weight, same_gene) fig_title = get_figure_title(project, weight, global_weight, same_gene)
create_figure(graph, figure, dic_community, fig_title) create_figure(graph, figure, dic_community, fig_title)
def get_projects(global_weight: int) -> List[str]:
    """
    Get projects name.

    :param global_weight: The global weight to consider. if \
    the global weight is equal to 0 then the project names are read \
    from the `cin_projects` table of the database, else a single \
    name 'Global-weight-<global_weight>' is returned.
    :return: The list of the project to consider
    """
    if global_weight != 0:
        return [f'Global-weight-{global_weight}']
    # NOTE(review): the query only selects one hard-coded sample
    # ('GSM1872888'); this looks like a debugging restriction - confirm
    # whether every sample of cin_projects should be returned instead.
    query = "SELECT DISTINCT id_sample " \
            "FROM cin_projects " \
            "WHERE id_sample = 'GSM1872888' "
    cnx = sqlite3.connect(ConfigGraph.db_file)
    try:
        c = cnx.cursor()
        c.execute(query)
        rows = c.fetchall()
        c.close()
    finally:
        # Always release the database handle, even if the query fails.
        cnx.close()
    # fetchall gives one 1-tuple per row: flatten them into a flat list.
    return list(np.asarray(rows).flatten())
def get_projects_name(global_weights: List[int]) -> Tuple[List[str], Dict]:
    """
    Collect the project names obtained for each global weight, and \
    remember which global weight produced each name.

    :param global_weights: The list of global weights to consider. \
    Each of them is given to `get_projects` to get the corresponding \
    project names.
    :return: The project names (in the order of `global_weights`) and \
    a dictionary linking each name to its corresponding global weight.
    """
    weight_by_project = {}
    names = []
    for current_weight in global_weights:
        found = get_projects(current_weight)
        names.extend(found)
        # A name returned for several weights keeps the last weight seen.
        weight_by_project.update(dict.fromkeys(found, current_weight))
    return names, weight_by_project
def multiple_community_launcher(ps: int, weights: List[int],
                                global_weights: List[int],
                                same_gene: bool,
                                logging_level: str = "DISABLE"):
    """
    Run `community_finder` in parallel for every (project, weight) pair.

    :param ps: The number of processes we want to use.
    :param weights: The list of weights of interaction to consider
    :param global_weights: The list of global weights to consider. \
    They are turned into project names with `get_projects_name`.
    :param same_gene: Say if we consider as co-localised exon within the \
    same gene
    :param logging_level: Level of information to display
    """
    ConfigGraph.community_folder.mkdir(exist_ok=True, parents=True)
    logging_def(ConfigGraph.community_folder, __file__, logging_level)
    # Duplicated weights would only launch the same job twice.
    global_weights = list(np.unique(global_weights))
    weights = list(np.unique(weights))
    projects, dic_project = get_projects_name(global_weights)
    condition = list(product(projects, weights))
    if not condition:
        # mp.Pool refuses to start with 0 process: nothing to do here.
        logging.warning('No project/weight combination to process')
        return
    # The context manager terminates the workers on exit, so every
    # result must be collected inside the `with` block.
    with mp.Pool(processes=min(ps, len(condition))) as pool:
        processes = []
        for project, weight in condition:
            global_weight = dic_project[project]
            logging.info(f'Finding community for project : {project}, '
                         f'global_weight : {global_weight}, weight: {weight}')
            args = [weight, global_weight, project, same_gene]
            processes.append(pool.apply_async(community_finder, args))
        for proc in processes:
            # get() re-raises any exception that occurred in the worker.
            proc.get(timeout=None)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment