Skip to content
Snippets Groups Projects
Commit c04d7383 authored by nfontrod's avatar nfontrod
Browse files

src/find_interaction_cluster/clip_figures/communities_similarities.py: Create...

src/find_interaction_cluster/clip_figures/communities_similarities.py: Create the adjusted rand index heatmap of the different communities
parent 8907d939
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
Description: The goal of this script is to load many community files \
generated with different techniques and compute the adjusted rand index \
between each pair of communities to see their level of similarity
"""
from typing import List
from cdlib.evaluation import adjusted_rand_index
from pathlib import Path
import pandas as pd
from ..config import Config
from functools import reduce
from operator import add
import numpy as np
from itertools import product
import seaborn as sns
import matplotlib.pyplot as plt
from .config import ConfigClip
from .clip_launcher_4_many_communities import find_or_create_community, \
logging_def
import lazyparser as lp
import logging
class Partition:
    """
    Wrap a list of communities (partitions of graph nodes) and record \
    whether any node belongs to more than one community.
    """
    def __init__(self, my_list: List) -> None:
        """
        :param my_list: A list of sublist of nodes
        """
        self.communities = my_list
        self.overlap = self.is_overlapping()

    def is_overlapping(self) -> bool:
        """
        Check whether a node appears in more than one community.

        :return: True if partitions overlap each other, False else

        >>> Partition([[1, 2, 3], [4, 5]]).overlap
        False
        >>> Partition([[1, 2, 3], [1, 4, 5]]).overlap
        True
        """
        flat = reduce(add, self.communities)
        nodes, counts = np.unique(flat, return_counts=True)
        result = len(flat) != len(nodes)
        if result:
            # Report duplicated nodes through the logger rather than print():
            # writing the DataFrames to stdout broke this method's own
            # doctest (extra output before `True`) and polluted the output
            # of every caller.
            d = pd.DataFrame({"count": counts, "genes": nodes})
            logging.debug(d[d["count"] != 1])
        return result
def load_communities(community_files: Path, feature: str) -> List[List]:
    """
    Load the communities inside a files.

    :param community_files: A community files
    :param feature: The feature of interest (gene or exon)
    :return: The list of communities found in community file

    >>> load_communities(Config.tests_files / "test_community_file.txt",
    ... "gene") == [[415, 416, 421, 422, 423, 433, 441, 475, 481, 502, 511],
    ... [10123, 10209, 8812, 9140, 9166]]
    True
    """
    table = pd.read_csv(community_files, sep="\t")
    # The column is named after the feature with a trailing 's'
    # (gene -> genes, exon -> exons); each cell is a ", "-separated list.
    raw_entries = table[f"{feature}s"].to_list()
    if feature == "gene":
        # Gene identifiers are numeric: convert each node to int.
        return [[int(node) for node in entry.split(", ")]
                for entry in raw_entries]
    return [entry.split(", ") for entry in raw_entries]
def filter_communities(communities: List[List], threshold: int = 0):
    """
    Remove every community with a size lower than `threshold`.

    :param communities: A list of community
    :param threshold: A size threshold below which communities are removed
    :return: The communities passing the threshold

    >>> filter_communities([[1, 2, 3], [5, 6], [7]], 0)
    [[1, 2, 3], [5, 6], [7]]
    >>> filter_communities([[1, 2, 3], [5, 6], [7]], 1)
    [[1, 2, 3], [5, 6], [7]]
    >>> filter_communities([[1, 2, 3], [5, 6], [7]], 2)
    [[1, 2, 3], [5, 6]]
    >>> filter_communities([[1, 2, 3], [5, 6], [7]], 3)
    [[1, 2, 3]]
    >>> filter_communities([[1, 2, 3], [5, 6], [7]], 4)
    []
    """
    # Keep only the communities whose size reaches the threshold.
    return list(filter(lambda community: len(community) >= threshold,
                       communities))
def add_missing_nodes(communities1: List[List], communities2: List[List]
                      ) -> List[List]:
    """
    Add every missing nodes that are present in communities2 but not in \
    communities1.

    :param communities1: A list of communities
    :param communities2: Another list of communities
    :return: The communities in communities1 that contains every nodes \
    in communities2

    >>> add_missing_nodes([[1, 2, 3], [4, 5]], [[1, 2], [3, 4]])
    [[1, 2, 3], [4, 5]]
    >>> add_missing_nodes([[1, 2, 3], [4, 5]], [[1, 2], [3, 4, 8]])
    [[1, 2, 3], [4, 5], [8]]
    >>> add_missing_nodes([[1, 2, 3], [4, 5]], [[1, 2, 15], [3, 4, 8],
    ... [9, 10]])
    [[1, 2, 3], [4, 5], [8], [9], [10], [15]]
    """
    # A set gives O(1) membership tests; the original `node not in array`
    # scanned the whole numpy array once per node (O(n*m)).
    known_nodes = set(reduce(add, communities1))
    # np.unique sorts the nodes, which fixes the (sorted) order of the
    # singleton communities appended below.
    c2f = np.unique(reduce(add, communities2))
    missing_nodes = [node for node in c2f if node not in known_nodes]
    # Each missing node becomes its own singleton community.
    return communities1 + [[node] for node in missing_nodes]
def get_rand_index(community_file1: Path, community_file2: Path,
                   threshold: int, feature: str) -> float:
    """
    Get the adjusted rand index from two community files.

    :param community_file1: A community files
    :param community_file2: Another community files
    :param threshold: A size threshold below which communities are removed
    :param feature: The feature of interest (gene or exon)
    :return: The adjusted rand index
    """
    # Load both community files and drop the communities that are too small.
    communities1 = filter_communities(
        load_communities(community_file1, feature), threshold)
    communities2 = filter_communities(
        load_communities(community_file2, feature), threshold)
    # Both partitions must cover the same node set before being compared:
    # complete each one with singleton communities for the other's nodes
    # (the second call intentionally uses the already-completed first list).
    communities1 = add_missing_nodes(communities1, communities2)
    communities2 = add_missing_nodes(communities2, communities1)
    score = adjusted_rand_index(Partition(communities1),
                                Partition(communities2)).score
    return score
def create_matrix(list_files: List[Path], list_name: List[str],
                  threshold: int, feature: str) -> pd.DataFrame:
    """
    Build the matrix of pairwise adjusted rand index between community files.

    :param list_files: The list of community files
    :param list_name: The list of the name for the files given in `list_files`
    :param threshold: A size threshold below which communities are removed
    :param feature: The feature of interest (gene or exon)
    :return: The matrix of adjusted rand index
    :raises IndexError: If `list_files` and `list_name` differ in length

    >>> lf = [Config.tests_files / "test_community_file.txt"] * 2
    >>> create_matrix(lf, ['rep1', 'rep2'], 0, "gene")
          rep1  rep2
    rep1   1.0   1.0
    rep2   1.0   1.0
    """
    if len(list_name) != len(list_files):
        # (previous message had an unbalanced parenthesis)
        raise IndexError(f"The length of list_name ({len(list_name)}) and "
                         f"list_files ({len(list_files)}) should be the same!")
    size = len(list_files)
    matrix = np.zeros([size, size], float)
    for row, col in product(range(size), repeat=2):
        if col < row:
            # The adjusted rand index is symmetric: reuse the value computed
            # for the transposed cell instead of recomputing it.
            matrix[row][col] = matrix[col][row]
            continue
        logging.info(f" working on {row}, {col}")
        matrix[row][col] = get_rand_index(list_files[row], list_files[col],
                                          threshold, feature)
    return pd.DataFrame(matrix, columns=list_name, index=list_name)
def make_heatmap(matrix: pd.DataFrame, outfile: Path) -> None:
    """
    Create an heatmap of adjusted rand index.

    :param matrix: A matrix of adjusted rand index
    :param outfile: File where the figure will be created
    """
    sns.set(context="talk")
    plt.figure(figsize=(10, 10))
    # Annotate every cell with its rand index, rounded to 2 decimals.
    heatmap_axis = sns.heatmap(matrix, linewidths=.5, annot=True, fmt=".2f")
    heatmap_axis.tick_params(axis='both', labelsize=8)
    plt.suptitle("Adjusted Rand Index of some communities")
    # Leave room for the (potentially long) row/column labels.
    plt.subplots_adjust(left=0.2, bottom=0.2)
    plt.savefig(outfile)
@lp.parse(feature=["gene", "exon"], inflation="1 <= inflation <= 5")
def create_rand_index_heatmap(weight: int, global_weight: int,
                              same_gene: bool = True, inflation: float = 1.2,
                              threshold: int = 0, cell_line: str = "ALL",
                              feature: str = "gene") -> None:
    """
    Create the heatmap of adjusted rand index between community files.

    :param weight: The minimum weight of interaction to consider
    :param global_weight: The global weight to consider. if \
    the global weight is equal to 0 then then density figure are calculated \
    by project, else all project are merge together and the interaction \
    seen in `global_weight` project are taken into account
    :param same_gene: Say if we consider as co-localised, exons within the \
    same gene (True) or not (False) (default True)
    :param inflation: The inflation parameter (default 1.2)
    :param cell_line: Interactions are only selected from projects made on \
    a specific cell line (ALL to disable this filter). (default ALL)
    :param threshold: A size threshold below which communities are removed \
    (default 0).
    :param feature: The feature of interest (gene or exon) (default 'gene')
    """
    logging_def(ConfigClip.output_folder, __file__, "INFO")
    # Work on copies: ConfigClip.communities / communities_name are shared
    # class attributes, and deleting from the aliased lists (as the code
    # previously did) mutated the config in place, corrupting it for any
    # later call in the same process.
    communities = list(ConfigClip.communities)
    names = list(ConfigClip.communities_name)
    if threshold < 10:
        del communities[2]
        del names[2]
    logging.info("recovering HIPMCL files")
    # Empty entries mark communities that must be (re)computed with HIPMCL.
    for i in range(len(communities)):
        if communities[i] == "":
            names[i] = f"HIPMCL_g{global_weight}_w{weight}_{inflation}"
            communities[i] = find_or_create_community(
                "", weight, global_weight, same_gene, inflation,
                cell_line, feature)
    logging.info("Creating matrix")
    matrix = create_matrix(communities, names, threshold, feature)
    out_files = [ConfigClip.output_folder /
                 f"rand_index_heatmap_t{threshold}_{feature}.{ext}"
                 for ext in ["txt", "pdf"]]
    matrix.to_csv(out_files[0], sep="\t")
    make_heatmap(matrix, out_files[1])


if __name__ == "__main__":
    create_rand_index_heatmap()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment