diff --git a/hi-classifier/__init__.py b/hi-classifier/__init__.py
deleted file mode 100644
index 8b137891791fe96927ad78e64b0aad7bded08bdc..0000000000000000000000000000000000000000
--- a/hi-classifier/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-
diff --git a/hi-classifier/auxiliary.py b/hi-classifier/auxiliary.py
deleted file mode 100644
index 3500294867767444510cd97ea4ccd0e204c1b154..0000000000000000000000000000000000000000
--- a/hi-classifier/auxiliary.py
+++ /dev/null
@@ -1,135 +0,0 @@
-from typing import Tuple
-
-
-def signal_handler(sig, frame, out_f, out_r):
-    """
-    Handle termination signals to gracefully terminate processes.
-
-    Parameters:
-        sig (int): Signal number.
-        frame (frame object): Current stack frame.
-        out_f (subprocess.Popen): Process for the forward output.
-        out_r (subprocess.Popen): Process for the reverse output.
-    """
-    import sys
-
-    out_f.terminate()
-    out_r.terminate()
-    sys.exit()
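# Editor's note (not part of the deleted file): signal.signal() invokes
# handlers as handler(sig, frame), so the extra out_f/out_r arguments above
# must be bound before registration. A minimal sketch, assuming out_f and
# out_r are subprocess.Popen objects (register_handlers is illustrative):
import signal
from functools import partial

def register_handlers(out_f, out_r):
    # Bind the two extra arguments so the result matches the (sig, frame) signature.
    handler = partial(signal_handler, out_f=out_f, out_r=out_r)
    signal.signal(signal.SIGINT, handler)
    signal.signal(signal.SIGTERM, handler)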
- """ - for read in read_item: - if ( - (read[2] == "*") or (read[2] is None) or (read[2] == -1) - ): # reference_name - return False - if ( - (int(read[3]) == 0) or (read[3] is None) or (read[3] == -1) - ): # reference_start - return False - if ( - (read[5] == "*") or (read[5] is None) or (read[5] == -1) - ): # cigarstring - return False - return True - -def terminaison( - output_valide_queue, - output_other_queue, - output_del_queue, - Counts_Dang_Intra_Queue, - Counts_Biot_Intra_Queue, - Counts_Dang_Inter_Queue, - Counts_Biot_Inter_Queue, - Counts_Undigested_Site_Queue, - Counts_Dangling_Intra, - Counts_Biotin_Intra, - Counts_Dangling_Inter, - Counts_Biotin_Inter, - Counts_Undigested_Site, -): - """ - Ajout des dernières informations calculés et fermeture des queues de traitement - """ - # Intra - Counts_Dang_Intra_Queue.put(Counts_Dangling_Intra) - Counts_Biot_Intra_Queue.put(Counts_Biotin_Intra) - # Inter - Counts_Dang_Inter_Queue.put(Counts_Dangling_Inter) - Counts_Biot_Inter_Queue.put(Counts_Biotin_Inter) - # Undigested - Counts_Undigested_Site_Queue.put(Counts_Undigested_Site) - - # Closing queues - output_valide_queue.put(None) - output_other_queue.put(None) - # To del - output_del_queue.put(None) - - # Intra - Counts_Dang_Intra_Queue.put(None) - Counts_Biot_Intra_Queue.put(None) - # Inter - Counts_Dang_Inter_Queue.put(None) - Counts_Biot_Inter_Queue.put(None) - # Undigested - Counts_Undigested_Site_Queue.put(None) diff --git a/hi-classifier/bam.py b/hi-classifier/bam.py deleted file mode 100644 index f80d3ada679e538391de2201feff125bb831349b..0000000000000000000000000000000000000000 --- a/hi-classifier/bam.py +++ /dev/null @@ -1,102 +0,0 @@ -from multiprocessing import Queue -from typing import Any, List, Union - - -def read_to_tuple(read: Any) -> List[Union[None, str, int, List[int]]]: - """ - Transforme un objet pysam.AlignedSegment en une liste contenant - les informations essentielles pour chaque alignement. Si un attribut - est absent, la valeur par défaut (None ou une liste vide pour les qualités) - sera utilisée. - - Parameters: - read (Any): Objet représentant la lecture BAM (p. ex. pysam.AlignedSegment). - - Returns: - List[Union[None, str, int, List[int]]]: Liste des informations - essentielles de l'alignement. 
- """ - # Récupération sécurisée de chaque attribut avec une valeur par défaut - query_name = getattr(read, "query_name", None) - flag = getattr(read, "flag", None) - reference_name = getattr(read, "reference_name", None) - reference_start = getattr(read, "reference_start", None) - if reference_start is not None: - reference_start = int(reference_start) - mapping_quality = getattr(read, "mapping_quality", None) - cigarstring = getattr(read, "cigarstring", None) - next_reference_name = getattr(read, "next_reference_name", None) - next_reference_start = getattr(read, "next_reference_start", None) - template_length = getattr(read, "template_length", None) - query_sequence = getattr(read, "query_sequence", None) - - # Pour les qualités, on renvoie une liste si elles existent, sinon None - query_qualities = getattr(read, "query_qualities", None) - if query_qualities is not None: - query_qualities = list(query_qualities) - - return [ - query_name, # Nom de la lecture - flag, # Flag de l'alignement - str(reference_name), # Nom de la référence (chromosome) - reference_start, # Position de début sur la référence - mapping_quality, # Qualité de l'alignement - cigarstring, # Chaîne CIGAR - next_reference_name, # Nom de la référence pour la paire suivante - next_reference_start, # Position de la paire suivante - template_length, # Longueur du fragment (tlen) - query_sequence, # Séquence de la lecture - query_qualities, # Qualités (liste d'entiers) ou None - ] - - -def read_bam_pair( - bam_for_file: str, - bam_rev_file: str, - input_queue: Queue, - Header_Queue: Queue, - compute_processes: int, -) -> None: - """ - Read simultaneously two BAM files and put read pairs into an input queue. - - Parameters: - bam_for_file (str): Path to the forward BAM file. - bam_rev_file (str): Path to the reverse BAM file. - input_queue (Queue): Queue to store read pairs. - Header_Queue (Queue): Queue to store headers from both BAM files. - compute_processes (int): Number of processes (pour gérér la fin du flux). 
- """ - import sys - import traceback - - import pysam - - print("Start reading process") - try: - with ( - pysam.AlignmentFile(bam_for_file, "rb") as bam_for, - pysam.AlignmentFile(bam_rev_file, "rb") as bam_rev, - ): - for i in range(3): # Num of writing process - Header_Queue.put( - [bam_for.header.to_dict(), bam_rev.header.to_dict()] - ) - - for read_for, read_rev in zip(bam_for, bam_rev): - if read_for and read_rev: - # Convert read objects to serializable format - read_for = read_to_tuple(read_for) - read_rev = read_to_tuple(read_rev) - # print(read_for, flush=True) # ----> 1000 / 1000 lignes - input_queue.put([read_for, read_rev]) - - except Exception as e: - print(f"Error: with {bam_for_file} or {bam_rev_file}, {e}") - traceback.print_exc() - sys.exit(1) - - finally: - print("Ending reading process") - for _ in range(compute_processes): - input_queue.put(None) diff --git a/hi-classifier/classify.py b/hi-classifier/classify.py deleted file mode 100644 index 82ccb955b502bb199a406ba516b9d735da24b30c..0000000000000000000000000000000000000000 --- a/hi-classifier/classify.py +++ /dev/null @@ -1,168 +0,0 @@ -import os -import signal -import sys -import time -from multiprocessing import Queue - -from auxiliary import partitionning -from bam import read_bam_pair -from digest import build_restriction_index_from_records -from processing import process_items -from processmanager import ProcessManager -from restrictionsite import SearchInDataBase -from stats import compact_and_save_counts -from writer import write_bam_pair - -def categorize(args, logging): - """ - Main function to orchestrate the reading, processing, and writing of BAM files to FastQ. - - Parameters: - args (argparse.Namespace): Namespace object containing command-line arguments. - """ - bam_for_file = args.bam_for_file - bam_rev_file = args.bam_rev_file - output = args.output - num_threads = args.num_threads - mapq_score = args.mapq_score - enzyme: str = args.enzyme - fasta_ref = args.fasta_ref - len_max = args.len_max - tolerance = int(args.tolerance) - - ligation_site_list, restriction_site_list = SearchInDataBase(enzyme) - if restriction_site_list is None: - logging.error("Ligation site list is empty.") - sys.exit(0) - - dict_digested = build_restriction_index_from_records( - fasta_ref, restriction_site_list - ) - if dict_digested is None: - logging.error("Restriction index is empty.") - sys.exit(0) - - if not os.path.exists(bam_for_file) or not os.path.exists(bam_rev_file): - logging.error("BAM's file does not exist.") - sys.exit() - - Input_Queue = Queue() - Header_Queue = Queue() - Output_Queue_Valide = Queue() - Output_Queue_Other = Queue() - Counts_Dang_Intra_Queue = Queue() - Counts_Biot_Intra_Queue = Queue() - Counts_Dang_Inter_Queue = Queue() - Counts_Biot_Inter_Queue = Queue() - Counts_Undigested_Site_Queue = Queue() - Output_Queue_Del = Queue() - write_processes, compute_processes, reading_process = partitionning( - num_threads - ) - - manager = ProcessManager() - # Set up signal handlers - signal.signal(signal.SIGINT, manager.handle_signal) - signal.signal(signal.SIGTERM, manager.handle_signal) - - print("Start multiprocessing") - print() - try: - # Start worker processes - manager.start_worker( - target=read_bam_pair, - args=( - bam_for_file, - bam_rev_file, - Input_Queue, - Header_Queue, - compute_processes, - ), - ) - - # Process for processing items - [ - manager.start_worker( - target=process_items, - args=( - Input_Queue, - Output_Queue_Valide, - Output_Queue_Other, - Output_Queue_Del, - 
diff --git a/hi-classifier/classify.py b/hi-classifier/classify.py
deleted file mode 100644
index 82ccb955b502bb199a406ba516b9d735da24b30c..0000000000000000000000000000000000000000
--- a/hi-classifier/classify.py
+++ /dev/null
@@ -1,168 +0,0 @@
-import os
-import signal
-import sys
-import time
-from multiprocessing import Queue
-
-from auxiliary import partitionning
-from bam import read_bam_pair
-from digest import build_restriction_index_from_records
-from processing import process_items
-from processmanager import ProcessManager
-from restrictionsite import SearchInDataBase
-from stats import compact_and_save_counts
-from writer import write_bam_pair
-
-
-def categorize(args, logging):
-    """
-    Main function orchestrating the reading, processing, and writing of the
-    BAM files.
-
-    Parameters:
-        args (argparse.Namespace): Namespace object containing command-line arguments.
-        logging: Logger used to report errors.
-    """
-    bam_for_file = args.bam_for_file
-    bam_rev_file = args.bam_rev_file
-    output = args.output
-    num_threads = args.num_threads
-    mapq_score = args.mapq_score
-    enzyme: str = args.enzyme
-    fasta_ref = args.fasta_ref
-    len_max = args.len_max
-    tolerance = int(args.tolerance)
-
-    ligation_site_list, restriction_site_list = SearchInDataBase(enzyme)
-    if restriction_site_list is None:
-        logging.error("Restriction site list is empty.")
-        sys.exit(1)
-
-    dict_digested = build_restriction_index_from_records(
-        fasta_ref, restriction_site_list
-    )
-    if dict_digested is None:
-        logging.error("Restriction index is empty.")
-        sys.exit(1)
-
-    if not os.path.exists(bam_for_file) or not os.path.exists(bam_rev_file):
-        logging.error("BAM file does not exist.")
-        sys.exit(1)
-
-    Input_Queue = Queue()
-    Header_Queue = Queue()
-    Output_Queue_Valide = Queue()
-    Output_Queue_Other = Queue()
-    Counts_Dang_Intra_Queue = Queue()
-    Counts_Biot_Intra_Queue = Queue()
-    Counts_Dang_Inter_Queue = Queue()
-    Counts_Biot_Inter_Queue = Queue()
-    Counts_Undigested_Site_Queue = Queue()
-    Output_Queue_Del = Queue()
-
-    write_processes, compute_processes, reading_process = partitionning(
-        num_threads
-    )
-
-    manager = ProcessManager()
-    # Set up signal handlers
-    signal.signal(signal.SIGINT, manager.handle_signal)
-    signal.signal(signal.SIGTERM, manager.handle_signal)
-
-    print("Start multiprocessing")
-    print()
-    try:
-        # Start the reading process
-        manager.start_worker(
-            target=read_bam_pair,
-            args=(
-                bam_for_file,
-                bam_rev_file,
-                Input_Queue,
-                Header_Queue,
-                compute_processes,
-            ),
-        )
-
-        # Processes classifying the read pairs
-        for _ in range(compute_processes):
-            manager.start_worker(
-                target=process_items,
-                args=(
-                    Input_Queue,
-                    Output_Queue_Valide,
-                    Output_Queue_Other,
-                    Output_Queue_Del,
-                    Counts_Dang_Intra_Queue,
-                    Counts_Biot_Intra_Queue,
-                    Counts_Dang_Inter_Queue,
-                    Counts_Biot_Inter_Queue,
-                    Counts_Undigested_Site_Queue,
-                    mapq_score,
-                    dict_digested,
-                    ligation_site_list,
-                    restriction_site_list,
-                    len_max,
-                    tolerance,
-                ),
-            )
-
-        # Process writing "Dangling" pairs
-        manager.start_worker(
-            target=write_bam_pair,
-            args=(
-                Output_Queue_Other,
-                Header_Queue,
-                output,
-                compute_processes,
-                "Dangling",
-            ),
-        )
-        # Process writing "Valid" pairs
-        manager.start_worker(
-            target=write_bam_pair,
-            args=(
-                Output_Queue_Valide,
-                Header_Queue,
-                output,
-                compute_processes,
-                "Valid",
-            ),
-        )
-
-        # Process writing "ToDel" pairs
-        manager.start_worker(
-            target=write_bam_pair,
-            args=(
-                Output_Queue_Del,
-                Header_Queue,
-                output,
-                compute_processes,
-                "ToDel",
-            ),
-        )
-
-        # Gather statistics about events
-        merged_counts = compact_and_save_counts(
-            Counts_Dang_Intra_Queue,
-            Counts_Biot_Intra_Queue,
-            Counts_Dang_Inter_Queue,
-            Counts_Biot_Inter_Queue,
-            Counts_Undigested_Site_Queue,
-            output,
-            compute_processes,
-            plot=True,
-        )
-
-        # Monitor processes
-        while manager.running():
-            if not manager.check_processes():
-                sys.exit(1)
-            time.sleep(0.5)
-
-    except Exception as e:
-        print(f"Error in categorize: {e}")
-        manager.shutdown()
-        sys.exit(1)
-
-    finally:
-        print("Treatment done successfully!")
-        manager.shutdown()
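# Editor's note (not part of the deleted file): processmanager.py is not shown
# in this diff. A hypothetical minimal ProcessManager consistent with the
# calls made above (start_worker, handle_signal, running, check_processes,
# shutdown); it is a sketch, not the project's implementation:
from multiprocessing import Process

class MinimalProcessManager:
    def __init__(self):
        self.workers = []
        self._running = True

    def start_worker(self, target, args):
        p = Process(target=target, args=args)
        p.start()
        self.workers.append(p)

    def handle_signal(self, sig, frame):
        self.shutdown()

    def running(self):
        # True while at least one worker is alive and no shutdown was requested
        return self._running and any(p.is_alive() for p in self.workers)

    def check_processes(self):
        # False if any worker died with a non-zero exit code
        return all(p.exitcode in (None, 0) for p in self.workers)

    def shutdown(self):
        self._running = False
        for p in self.workers:
            if p.is_alive():
                p.terminate()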
diff --git a/hi-classifier/digest.py b/hi-classifier/digest.py
deleted file mode 100644
index 97ad304d4c3992f7cc591a69f9a12bf4972b92b7..0000000000000000000000000000000000000000
--- a/hi-classifier/digest.py
+++ /dev/null
@@ -1,125 +0,0 @@
-import re
-from typing import Any, Dict, List, Tuple, Union
-
-from Bio import SeqIO
-
-
-def find_restriction_sites(seq: Union[str, Any], pattern: str) -> List[int]:
-    """
-    Find every position at which the motif (pattern) occurs in the sequence seq.
-
-    Parameters:
-        seq (Union[str, Any]): Sequence to search (a string, or any object with a str representation).
-        pattern (str): Motif to look for.
-
-    Returns:
-        List[int]: Sorted list of (0-based) positions where the motif is found.
-
-    Examples:
-        >>> find_restriction_sites("GAATTCGAATTC", "GAATTC")
-        [0, 6]
-        >>> find_restriction_sites("AAAAAA", "TTTT")  # Motif absent
-        []
-        >>> find_restriction_sites("AAAAAAAOAAAAAAAOAAAAAAOO", "O")
-        [7, 15, 22, 23]
-    """
-    seq_str = str(seq).upper()
-    pattern = pattern.upper()
-    sites = [m.start() for m in re.finditer(pattern, seq_str)]
-    return sorted(sites)
-
-
-def find_restriction_sites_from_list(
-    seq: Union[str, Any], ligation_site_list: List[Tuple[re.Pattern, int]]
-) -> List[int]:
-    """
-    Find every occurrence position in a sequence (seq) using a list of
-    pre-compiled motifs (ligation_site_list). Each element of the list is a
-    tuple (compiled_regex, length). Returns a sorted, duplicate-free list of
-    0-based positions.
-
-    Parameters:
-        seq (Union[str, Any]): Sequence to search.
-        ligation_site_list (List[Tuple[re.Pattern, int]]): List of tuples (compiled regex, site length).
-
-    Returns:
-        List[int]: Sorted, unique list of occurrence positions.
-
-    Examples:
-        >>> import re
-        >>> test_seq = "GATATCATCGATATC"
-        >>> site_list = [(re.compile("GATATC"), 6), (re.compile("CGA"), 3)]
-        >>> find_restriction_sites_from_list(test_seq, site_list)
-        [0, 8, 9]
-        >>> test_seq = "AAAAAAAOAAAAAAAAA"
-        >>> site_list = [(re.compile("AOA"), 3), (re.compile("AAAAAAA"), 7)]
-        >>> find_restriction_sites_from_list(test_seq, site_list)
-        [0, 6, 8]
-        >>> test_seq = "XXXXXXXXXXXXXXX"
-        >>> site_list = [(re.compile("CCCCCC"), 6), (re.compile("UUUUU"), 5)]
-        >>> find_restriction_sites_from_list(test_seq, site_list)
-        []
-    """
-    seq_str = str(seq).upper()
-    positions = []
-    for regex, length in ligation_site_list:
-        positions.extend([m.start() for m in re.finditer(regex, seq_str)])
-    return sorted(set(positions))
-
-
-def build_restriction_index_from_records(
-    fasta_path: str, enzyme_input: Union[str, List[Tuple[re.Pattern, int]]]
-) -> Dict[str, Dict[str, Any]]:
-    """
-    Build the restriction-site index from a list of records (SeqRecord).
-
-    If enzyme_input is a string, sites are found with find_restriction_sites.
-    If enzyme_input is a list (e.g. as returned by SearchInDataBase),
-    find_restriction_sites_from_list is used.
-
-    For each chromosome, returns a dictionary containing:
-      - "sites": a list of tuples (site_num, position), numbered from 1.
-      - "fragments": a list of tuples (frag_num, (start, end)) defining the fragment intervals.
-
-    Returns:
-        Dict[str, Dict[str, Any]]:
-            Key = record.id (chromosome/contig name)
-            Value = {
-                "sites": [(1, pos1), (2, pos2), ...],
-                "fragments": [(1, (0, pos1)), (2, (pos1, pos2)), ...]
-            }
-    """
-    records = list(SeqIO.parse(fasta_path, "fasta"))
-
-    restr_index: Dict[str, Dict[str, Any]] = {}
-    for record in records:
-        chrom = record.id
-        seq = record.seq
-        if isinstance(enzyme_input, str):
-            sites_positions = find_restriction_sites(seq, enzyme_input)
-        else:
-            # enzyme_input is assumed to be a list of (compiled_regex, length) tuples
-            sites_positions = find_restriction_sites_from_list(
-                seq, enzyme_input
-            )
-
-        # Build the numbered list of sites
-        sites = [(i + 1, pos) for i, pos in enumerate(sites_positions)]
-
-        # Build the fragment intervals
-        fragments = []
-        frag_num = 1
-        if sites_positions:
-            fragments.append((frag_num, (0, sites_positions[0])))
-            frag_num += 1
-            for i in range(len(sites_positions) - 1):
-                fragments.append(
-                    (frag_num, (sites_positions[i], sites_positions[i + 1]))
-                )
-                frag_num += 1
-            fragments.append((frag_num, (sites_positions[-1], len(seq))))
-        else:
-            fragments.append((frag_num, (0, len(seq))))
-
-        restr_index[chrom] = {"sites": sites, "fragments": fragments}
-    return restr_index
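# Editor's note (not part of the deleted file): build_restriction_index_from_records
# has no doctest; a worked sketch of the structure it produces. For a record
# "chr1" of length 500 with sites found at 100, 200, 300 and 400, the index is:
expected_index = {
    "chr1": {
        "sites": [(1, 100), (2, 200), (3, 300), (4, 400)],
        "fragments": [
            (1, (0, 100)),    # before the first site
            (2, (100, 200)),
            (3, (200, 300)),
            (4, (300, 400)),
            (5, (400, 500)),  # after the last site, up to len(seq)
        ],
    }
}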
diff --git a/hi-classifier/event_collector.py b/hi-classifier/event_collector.py
deleted file mode 100644
index 03425da19e78cd1ef40a7e70b7aa9af4a52a1660..0000000000000000000000000000000000000000
--- a/hi-classifier/event_collector.py
+++ /dev/null
@@ -1,452 +0,0 @@
-def update_restriction_counts(
-    paired_read,
-    label,
-    counts_vl_intra,
-    counts_dl_intra,
-    counts_vl_inter,
-    counts_dl_inter,
-):
-    """
-    Update four restriction-site count dictionaries, distinguishing
-    intra-chromosomal from inter-chromosomal read pairs.
-
-    For an annotated read pair (paired_read), depending on the classification label:
-      - If the label class is in {"Up", "Down", "Close", "Joined", "SelfCircle"},
-        the "valid" counters for sites re_1 and re_2 are incremented.
-      - If the event contains "DLLeft", the counter for site re_proximal_1 is incremented.
-      - If the event contains "DLRight", the counter for site re_proximal_2 is incremented.
-      - If the event contains "DLBoth", the counters for both sites are incremented.
-
-    The dictionaries are split into two groups depending on whether the pair
-    is intra- or inter-chromosomal.
-
-    Parameters:
-        paired_read (PairedRead): The annotated PairedRead object, with attributes:
-            - chrom_1 (and chrom_2): chromosome names (str)
-            - re_1 and re_2: tuples (site_num, position)
-            - re_proximal_1 and re_proximal_2: tuples (site_num, position)
-            - interchrom (bool): True if the pair is inter-chromosomal
-        label (str): Classification label (e.g. "Up", "Close-DLLeft", "Far-DLRight", ...)
-        counts_vl_intra (dict): Valid-cut counters for intra-chromosomal pairs.
-        counts_dl_intra (dict): Dangling-cut counters for intra-chromosomal pairs.
-        counts_vl_inter (dict): Valid-cut counters for inter-chromosomal pairs.
-        counts_dl_inter (dict): Dangling-cut counters for inter-chromosomal pairs.
-
-    Returns:
-        tuple: (counts_vl_intra, counts_dl_intra, counts_vl_inter, counts_dl_inter)
-
-    Examples:
-        >>> restr_index = {
-        ...     "chr1": {
-        ...         "sites": [
-        ...             (1, 100),
-        ...             (2, 200),
-        ...             (3, 300),
-        ...             (4, 400),
-        ...             (5, 500)
-        ...         ],
-        ...         "fragments": [
-        ...             (1, (0, 100)),
-        ...             (2, (100, 200)),
-        ...             (3, (200, 300)),
-        ...             (4, (300, 400)),
-        ...             (5, (400, 500))
-        ...         ]
-        ...     },
-        ...     "chr2": {
-        ...         "sites": [
-        ...             (1, 100),
-        ...             (2, 200),
-        ...             (3, 300),
-        ...             (4, 400)
-        ...         ],
-        ...         "fragments": [
-        ...             (1, (0, 100)),
-        ...             (2, (100, 200)),
-        ...             (3, (200, 300)),
-        ...             (4, (300, 400)),
-        ...             (5, (400, 500))
-        ...         ]
-        ...     }
-        ... }
-        >>> from readmanager import PairedRead
-        >>> from positionfrag import annotate_paired_read
-        >>> from filter_interchrom import Classified_inter
-        >>> from filter_intrachrom import Classified_intra
-        >>> read1 = ("r1", 16, "chr1", 202, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 0, "chr1", 401, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> undigested = annotate_paired_read(pr, restr_index, tolerance=1)
-        >>> pr.re_1
-        (1, 100)
-        >>> pr.re_2
-        (5, 500)
-        >>> pr.re_proximal_1
-        (2, 200)
-        >>> pr.re_proximal_2
-        (4, 400)
-        >>> pr.re_intern_1
-        (2, 200)
-        >>> pr.re_intern_2
-        (5, 500)
-        >>> undigested = annotate_paired_read(pr, restr_index, tolerance=2)
-        >>> undigested
-        False
-        >>> undigested = annotate_paired_read(pr, restr_index, tolerance=1)
-        >>> undigested
-        ['chr1', (2, 200)]
-        >>> undigested = annotate_paired_read(pr, restr_index, tolerance=0)
-        >>> undigested
-        ['chr1', (2, 200)]
-        >>>
-        >>> read1 = ("r1", 16, "chr1", 202, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 16, "chr1", 401, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> undigested = annotate_paired_read(pr, restr_index, tolerance=0)
-        >>> undigested
-        ['chr1', (2, 200), 'chr1', (4, 400)]
-        >>> undigested = annotate_paired_read(pr, restr_index, tolerance=3)
-        >>> undigested
-        False
-        >>> label = Classified_inter(pr, tolerance=3)
-        >>> label
-        'Down-DLBoth'
-        >>> counts_vl_intra = {}
-        >>> counts_dl_intra = {}
-        >>> counts_vl_inter = {}
-        >>> counts_dl_inter = {}
-        >>> counts_vl_intra, counts_dl_intra, counts_vl_inter, counts_dl_inter = update_restriction_counts(pr, label, counts_vl_intra, counts_dl_intra, counts_vl_inter, counts_dl_inter)
-        >>> counts_vl_intra
-        {'chr1': {100: 1, 300: 1}}
-        >>> counts_dl_inter
-        {}
-        >>> counts_vl_intra, counts_dl_intra, counts_vl_inter, counts_dl_inter = update_restriction_counts(pr, label, counts_vl_intra, counts_dl_intra, counts_vl_inter, counts_dl_inter)
-        >>> counts_vl_intra
-        {'chr1': {100: 2, 300: 2}}
-        >>> counts_dl_intra
-        {'chr1': {200: 2, 400: 2}}
-        >>> counts_vl_intra, counts_dl_intra, counts_vl_inter, counts_dl_inter = update_restriction_counts(pr, label, counts_vl_intra, counts_dl_intra, counts_vl_inter, counts_dl_inter)
-        >>> counts_vl_intra
-        {'chr1': {100: 3, 300: 3}}
-        >>> counts_dl_intra
-        {'chr1': {200: 3, 400: 3}}
-        >>> counts_vl_inter
-        {}
-        >>> counts_dl_inter
-        {}
-        >>> # Inter-chromosomal case
-        >>> read1 = ("r1", 16, "chr1", 201, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 0, "chr2", 310, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> annotate_paired_read(pr, restr_index, tolerance=2)
-        False
-        >>> label = Classified_inter(pr, tolerance=2)
-        >>> counts_vl_intra = {}
-        >>> counts_dl_intra = {}
-        >>> counts_vl_inter = {}
-        >>> counts_dl_inter = {}
-        >>> counts_vl_intra, counts_dl_intra, counts_vl_inter, counts_dl_inter = update_restriction_counts(pr, label, counts_vl_intra, counts_dl_intra, counts_vl_inter, counts_dl_inter)
-        >>> counts_vl_inter
-        {'chr1': {100: 1}, 'chr2': {400: 1}}
-        >>> counts_dl_inter
-        {'chr1': {200: 1}, 'chr2': {}}
-        >>> # Add the same element again!
-        >>> counts_vl_intra, counts_dl_intra, counts_vl_inter, counts_dl_inter = update_restriction_counts(pr, label, counts_vl_intra, counts_dl_intra, counts_vl_inter, counts_dl_inter)
-        >>> counts_vl_inter
-        {'chr1': {100: 2}, 'chr2': {400: 2}}
-        >>> counts_dl_inter
-        {'chr1': {200: 2}, 'chr2': {}}
-        >>> counts_vl_intra
-        {}
-        >>> counts_dl_intra
-        {}
-    """
-
-    def increment_count(count_dict, chrom, pos):
-        if chrom not in count_dict:
-            count_dict[chrom] = {}
-        count_dict[chrom][pos] = count_dict[chrom].get(pos, 0) + 1
-
-    current_class = label.split("-")[0]
-    current_event = label.split("-")[1]
-
-    if not paired_read.interchrom:
-        # Both reads are on the same chromosome
-        chrom = paired_read.chrom_1
-
-        # Make sure the chromosome is present in the dictionaries
-        if chrom not in counts_vl_intra:
-            counts_vl_intra[chrom] = {}
-        if chrom not in counts_dl_intra:
-            counts_dl_intra[chrom] = {}
-
-        # If the label indicates a valid contact, increment re_1 and re_2
-        if current_class in {"Up", "Down", "Close", "Joined", "SelfCircle"}:
-            for site in [paired_read.re_1, paired_read.re_2]:
-                if site is not None:
-                    pos = site[1]
-                    increment_count(counts_vl_intra, chrom, pos)
-
-        # If the event contains "DLLeft", increment re_proximal_1
-        if "DLLeft" in current_event:
-            if paired_read.re_proximal_1 is not None:
-                pos = paired_read.re_proximal_1[1]
-                increment_count(counts_dl_intra, chrom, pos)
-
-        # If the event contains "DLRight", increment re_proximal_2
-        if "DLRight" in current_event:
-            if paired_read.re_proximal_2 is not None:
-                pos = paired_read.re_proximal_2[1]
-                increment_count(counts_dl_intra, chrom, pos)
-
-        # If the event contains "DLBoth", increment both
-        if "DLBoth" in current_event:
-            if paired_read.re_proximal_1 is not None:
-                pos = paired_read.re_proximal_1[1]
-                increment_count(counts_dl_intra, chrom, pos)
-            if paired_read.re_proximal_2 is not None:
-                pos = paired_read.re_proximal_2[1]
-                increment_count(counts_dl_intra, chrom, pos)
-
-    else:
-        # The reads are on different chromosomes
-        chrom1 = paired_read.chrom_1
-        chrom2 = paired_read.chrom_2
-
-        # Make sure both chromosomes are present in the dictionaries
-        if chrom1 not in counts_vl_inter:
-            counts_vl_inter[chrom1] = {}
-            counts_dl_inter[chrom1] = {}
-        if chrom2 not in counts_vl_inter:
-            counts_vl_inter[chrom2] = {}
-            counts_dl_inter[chrom2] = {}
-
-        # If the label indicates a valid contact, increment re_1 and re_2
-        if current_class in {
-            "Up",
-            "Down",
-            "Close",
-            "Far",
-            "Joined",
-            "SelfCircle",
-        }:
-            site_1 = paired_read.re_1
-            site_2 = paired_read.re_2
-            if site_1 is not None:
-                pos = site_1[1]
-                increment_count(counts_vl_inter, chrom1, pos)
-
-            if site_2 is not None:
-                pos = site_2[1]
-                increment_count(counts_vl_inter, chrom2, pos)
-
-        # If the event contains "DLLeft", increment re_proximal_1
-        if "DLLeft" in current_event:
-            if paired_read.re_proximal_1 is not None:
-                pos = paired_read.re_proximal_1[1]
-                increment_count(counts_dl_inter, chrom1, pos)
-
-        # If the event contains "DLRight", increment re_proximal_2
-        if "DLRight" in current_event:
-            if paired_read.re_proximal_2 is not None:
-                pos = paired_read.re_proximal_2[1]
-                increment_count(counts_dl_inter, chrom2, pos)
-        # If the event contains "DLBoth", increment both
-        if "DLBoth" in current_event:
-            if paired_read.re_proximal_1 is not None:
-                pos = paired_read.re_proximal_1[1]
-                increment_count(counts_dl_inter, chrom1, pos)
-            if paired_read.re_proximal_2 is not None:
-                pos = paired_read.re_proximal_2[1]
-                increment_count(counts_dl_inter, chrom2, pos)
-
-    return counts_vl_intra, counts_dl_intra, counts_vl_inter, counts_dl_inter
-
-
-def update_undigested_counts(undigested, counts_undigested_site):
-    """
-    Update the counts_undigested_site dictionary with undigested-site counts.
-
-    The undigested parameter must be either:
-      - False, if no undigested site was detected,
-      - a list of the form [chrom, re_intern] for a single undigested site,
-      - a list of the form [chrom1, re_intern1, chrom2, re_intern2] for two undigested sites.
-
-    Each re_intern is a tuple whose second element is the site position.
-
-    The counts_undigested_site dictionary is organized as:
-        { chromosome: { site_position: count, ... } }
-
-    Returns:
-        dict: The updated dictionary.
-
-    Examples:
-        >>> counts = {}
-        >>> undigested = ["chr1", (3, 300)]
-        >>> update_undigested_counts(undigested, counts)
-        {'chr1': {300: 1}}
-        >>> undigested = ["chr1", (3, 300), "chr2", (4, 400)]
-        >>> update_undigested_counts(undigested, counts)
-        {'chr1': {300: 2}, 'chr2': {400: 1}}
-        >>> update_undigested_counts(False, counts)
-        {'chr1': {300: 2}, 'chr2': {400: 1}}
-    """
-    if not undigested:
-        return counts_undigested_site
-
-    # A two-element list describes a single site.
-    if len(undigested) == 2:
-        chrom, re_intern = undigested
-        pos = re_intern[1]
-        if chrom not in counts_undigested_site:
-            counts_undigested_site[chrom] = {}
-        counts_undigested_site[chrom][pos] = (
-            counts_undigested_site[chrom].get(pos, 0) + 1
-        )
-    # A four-element list describes two sites (possibly on two different chromosomes).
-    elif len(undigested) == 4:
-        chrom1, re_intern1, chrom2, re_intern2 = undigested
-        pos1 = re_intern1[1]
-        pos2 = re_intern2[1]
-        if chrom1 not in counts_undigested_site:
-            counts_undigested_site[chrom1] = {}
-        if chrom2 not in counts_undigested_site:
-            counts_undigested_site[chrom2] = {}
-        counts_undigested_site[chrom1][pos1] = (
-            counts_undigested_site[chrom1].get(pos1, 0) + 1
-        )
-        counts_undigested_site[chrom2][pos2] = (
-            counts_undigested_site[chrom2].get(pos2, 0) + 1
-        )
-    else:
-        raise ValueError(
-            "The 'undigested' structure must contain 0, 2 or 4 elements."
-        )
-    return counts_undigested_site
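# Editor's note (not part of the deleted file): each compute process keeps its
# own nested {chrom: {pos: count}} dictionaries, pushed onto queues by
# terminaison() and combined by stats.compact_and_save_counts (not shown in
# this diff). A hypothetical merge of two such dictionaries, illustrative only:
def merge_counts(acc, part):
    # Add the per-position counts of `part` into the accumulator `acc`.
    for chrom, positions in part.items():
        chrom_counts = acc.setdefault(chrom, {})
        for pos, n in positions.items():
            chrom_counts[pos] = chrom_counts.get(pos, 0) + n
    return acc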
-
-
-def length_checker(current_pair, label, undigested, len_max):
-    """
-    Check whether the theoretical length of the sequenced fragment exceeds
-    the 'len_max' threshold.
-
-    The theoretical length is computed differently depending on the contact type (label):
-
-    - For valid contacts ("Up", "Down", "Close", "Far"):
-        length = |RE_1 - start_1| + |RE_2 - start_2|
-      where RE_1 and RE_2 (obtained via annotate_paired_read) are the
-      restriction sites closest to the end of read1 and read2 (RE_Valide).
-
-    - For the "Joined" contact:
-        length = |start_2 - start_1|
-
-    - For "SelfCircle":
-        length = |RE_2 - RE_1|
-      (where RE_1 and RE_2 are the restriction sites associated with the ends)
-
-    - For contacts that are exclusively dangling (e.g. "DLLeft", "DLBoth", ...):
-        length = |start_2 - start_1|
-
-    If the computed theoretical length exceeds len_max, the pair is considered too long:
-      - the label is replaced with "Other-None"
-      - undigested is set to False
-
-    Parameters:
-        current_pair (PairedRead): Object providing, for each read:
-            - start_1, start_2: start positions
-            - re_1, re_2: tuples (site_num, position) of the RE_Valide
-              (the site closest to the end of the read)
-        label (str): Classification label (e.g. "Up", "Close-DLLeft", "Joined", "SelfCircle", ...)
-        undigested (bool): Initial flag (True if the pair carries undigested-site information)
-        len_max (int): Maximum allowed length for the pair.
-
-    Returns:
-        tuple: The updated (label, undigested).
-
-    Examples:
-        >>> class DummyPair:
-        ...     chrom_1 = "chr1"
-        ...     chrom_2 = "chr1"
-        ...     start_1 = 100
-        ...     start_2 = 300
-        ...     re_1 = (2, 150)  # RE_Valide for read1
-        ...     re_2 = (3, 350)  # RE_Valide for read2
-        >>> pair = DummyPair()
-        >>> label, undigested = length_checker(pair, "Up", True, 150)
-        >>> # Computation: |150 - 100| + |350 - 300| = 50 + 50 = 100 < 150
-        >>> label
-        'Up'
-        >>> undigested
-        True
-        >>> # Example 2: same contact, but the length is too large
-        >>> label, undigested = length_checker(pair, "Up", True, 80)
-        >>> label
-        'Other-None'
-        >>> undigested
-        False
-        >>> # Example 3: a "Joined" contact
-        >>> pair.start_1 = 100; pair.start_2 = 400
-        >>> label, undigested = length_checker(pair, "Joined", True, 250)
-        >>> # |400 - 100| = 300 > 250
-        >>> label
-        'Other-None'
-        >>> undigested
-        False
-        >>> # Example 4: a "SelfCircle" contact
-        >>> pair.re_1 = (2, 150); pair.re_2 = (2, 140)
-        >>> label, undigested = length_checker(pair, "SelfCircle", True, 20)
-        >>> # |140 - 150| = 10 < 20
-        >>> label
-        'SelfCircle'
-        >>> undigested
-        True
-        >>> # Example 5: a contact with dangling ("DLLeft")
-        >>> class DummyPair:
-        ...     chrom_1 = "chr1"
-        ...     chrom_2 = "chr1"
-        ...     start_1 = 100
-        ...     start_2 = 140
-        ...     re_1 = (2, 150)  # RE_Valide for read1
-        ...     re_2 = (3, 350)  # RE_Valide for read2
-        >>> pair = DummyPair()
-        >>> label, undigested = length_checker(pair, "DL-DLLeft", True, 50)
-        >>> label
-        'DL-DLLeft'
-        >>> undigested
-        True
-        >>> label, undigested = length_checker(pair, "DL-DLLeft", True, 30)
-        >>> label
-        'Other-None'
-        >>> undigested
-        False
-    """
-    too_long = False
-    theoretical_length = None
-    current_class = label.split("-")[0]
-
-    if current_class in {"Up", "Down", "Close", "Far"}:
-        # Valid contacts: distance between the RE_Valide site and the start of each read
-        if current_pair.re_1 is None or current_pair.re_2 is None:
-            theoretical_length = float("inf")
-        else:
-            theoretical_length = abs(
-                int(current_pair.re_1[1]) - int(current_pair.start_1)
-            ) + abs(int(current_pair.re_2[1]) - int(current_pair.start_2))
-    elif (current_class == "Joined") or ("DL" in current_class):
-        theoretical_length = abs(
-            int(current_pair.start_2) - int(current_pair.start_1)
-        )
-    elif current_class == "SelfCircle":
-        if current_pair.re_1 is None or current_pair.re_2 is None:
-            theoretical_length = float("inf")
-        else:
-            theoretical_length = abs(
-                int(current_pair.re_2[1]) - int(current_pair.re_1[1])
-            )
-    else:
-        too_long = True
-
-    if (theoretical_length is not None) and (theoretical_length > len_max):
-        too_long = True
-
-    if too_long:
-        label = "Other-None"
-        undigested = False
-
-    return label, undigested
diff --git a/hi-classifier/filter_interchrom.py b/hi-classifier/filter_interchrom.py
deleted file mode 100644
index a3a47a9e3744128e22ac39ef1bd1da93d7aca4b6..0000000000000000000000000000000000000000
--- a/hi-classifier/filter_interchrom.py
+++ /dev/null
@@ -1,230 +0,0 @@
-from filter_intrachrom import (
-    condition_dangling_both,
-    condition_dangling_left,
-    condition_dangling_right,
-    condition_orientation_convergent,
-    condition_orientation_divergent,
-    condition_orientation_identical_down,
-    condition_orientation_identical_up,
-)
-
-
-def compute_mask_inter(paired_read, tolerance=3):
-    """
-    Compute a binary mask (dictionary of booleans) for the read pair.
-    The mask holds the conditions evaluated for:
-      - orientation: Convergent, Divergent, Identical (up/down)
-      - the presence of dangling ends (left and right)
-
-    Evaluates a set of conditions on the PairedRead and returns a dictionary
-    of booleans indicating whether each condition holds.
-
-    Parameters:
-        paired_read (PairedRead): The read pair to analyze.
-        tolerance (int): Tolerance used for the 'dangling' conditions.
-
-    Returns:
-        Dict[str, bool]: A dictionary with the results of the conditions, e.g.:
-            {
-                "orientation_convergent": ...,
-                "orientation_divergent": ...,
-                "orientation_identical_up": ...,
-                "orientation_identical_down": ...,
-                "dangling_left": ...,
-                "dangling_right": ...,
-                "dangling_both": ...
-            }
-
-    Examples:
-        >>> restr_index = {
-        ...     "chr1": {
-        ...         "sites": [
-        ...             (1, 100),
-        ...             (2, 200),
-        ...             (3, 300),
-        ...             (4, 400)
-        ...         ],
-        ...         "fragments": [
-        ...             (1, (0, 100)),
-        ...             (2, (100, 200)),
-        ...             (3, (200, 300)),
-        ...             (4, (300, 400)),
-        ...             (5, (400, 500))
-        ...         ]
-        ...     }
-        ... }
-        >>> from readmanager import PairedRead
-        >>> from positionfrag import annotate_paired_read
-        >>> read1 = ("r1", 0, "chr1", 120, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 0, "chr1", 210, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> undigested = annotate_paired_read(pr, restr_index, tolerance=0)
-        >>> compute_mask_inter(pr, tolerance=3)
-        {'orientation_convergent': False, 'orientation_divergent': False, 'orientation_identical_up': True, 'orientation_identical_down': False, 'dangling_left': False, 'dangling_right': False, 'dangling_both': False}
-    """
-    mask = {}
-
-    # -- Based on orientation --
-    mask["orientation_convergent"] = condition_orientation_convergent(
-        paired_read
-    )
-    mask["orientation_divergent"] = condition_orientation_divergent(
-        paired_read
-    )
-    mask["orientation_identical_up"] = condition_orientation_identical_up(
-        paired_read
-    )
-    mask["orientation_identical_down"] = condition_orientation_identical_down(
-        paired_read
-    )
-
-    # -- Based on dangling (nearby restriction sites) --
-    mask["dangling_left"] = condition_dangling_left(paired_read, tolerance)
-    mask["dangling_right"] = condition_dangling_right(paired_read, tolerance)
-    mask["dangling_both"] = condition_dangling_both(paired_read, tolerance)
-
-    return mask
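# Editor's note (not part of the deleted file): the doctests here derive
# strand_1/strand_2 from the SAM FLAG field (0x10 = read reverse-complemented).
# A minimal sketch of that decoding, assuming PairedRead does the equivalent:
def strand_from_flag(flag):
    # "-" when bit 0x10 (reverse strand) is set, "+" otherwise
    return "-" if flag & 0x10 else "+"

# Example: strand_from_flag(0) == "+", strand_from_flag(16) == "-"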
-
-
-def Classified_inter(paired_read, tolerance=3):
-    """
-    Classify the read pair (paired_read) into the valid-contact categories
-    and append, as secondary information, the dangling status (DLLeft,
-    DLRight, DLBoth).
-
-    Main priority (orientation):
-      - If orientation_divergent: "Close"
-      - If orientation_identical_up: "Up"
-      - If orientation_convergent: "Far"
-      - If orientation_identical_down: "Down"
-      - Otherwise: "Algorithmic_Error" (the four cases above are exhaustive)
-
-    Then, if a dangling condition holds, the label is completed with a suffix
-    naming the condition(s) ("DLLeft", "DLRight" or "DLBoth").
-
-    Returns:
-        str: The classification label, e.g. "Close-DLLeft" or "Far-None".
-
-    Examples:
-        >>> restr_index = {
-        ...     "chr1": {
-        ...         "sites": [
-        ...             (1, 100),
-        ...             (2, 200),
-        ...             (3, 300),
-        ...             (4, 400)
-        ...         ],
-        ...         "fragments": [
-        ...             (1, (0, 100)),
-        ...             (2, (100, 200)),
-        ...             (3, (200, 300)),
-        ...             (4, (300, 400)),
-        ...             (5, (400, 500))
-        ...         ]
-        ...     },
-        ...     "chr2": {
-        ...         "sites": [
-        ...             (1, 100),
-        ...             (2, 200),
-        ...             (3, 300),
-        ...             (4, 400)
-        ...         ],
-        ...         "fragments": [
-        ...             (1, (0, 100)),
-        ...             (2, (100, 200)),
-        ...             (3, (200, 300)),
-        ...             (4, (300, 400)),
-        ...             (5, (400, 500))
-        ...         ]
-        ...     }
-        ... }
-        >>> from readmanager import PairedRead
-        >>> from positionfrag import annotate_paired_read
-        >>> read1 = ("r1", 16, "chr1", 121, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 0, "chr2", 310, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> undigested = annotate_paired_read(pr, restr_index, tolerance=0)
-        >>> Classified_inter(pr, tolerance=3)
-        'Close-None'
-        >>> Classified_inter(pr, tolerance=10)
-        'Close-DLRight'
-        >>> # Test "Up" without dangling
-        >>> read1 = ("r1", 0, "chr1", 121, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 0, "chr2", 310, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> undigested = annotate_paired_read(pr, restr_index, tolerance=0)
-        >>> Classified_inter(pr, tolerance=3)
-        'Up-None'
-        >>> # Test "Far" without dangling
-        >>> read1 = ("r1", 0, "chr2", 121, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 16, "chr1", 310, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> undigested = annotate_paired_read(pr, restr_index, tolerance=0)
-        >>> Classified_inter(pr, tolerance=3)
-        'Close-None'
-        >>> # Test "Down" without dangling
-        >>> read1 = ("r1", 16, "chr2", 101, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 16, "chr1", 310, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> undigested = annotate_paired_read(pr, restr_index, tolerance=0)
-        >>> pr
-        <PairedRead r2/r1 chr=inter strand=(-,-) pos=(310-260, 101-51)>
-        >>> pr = PairedRead(read2, read1)
-        >>> undigested = annotate_paired_read(pr, restr_index, tolerance=0)
-        >>> pr
-        <PairedRead r2/r1 chr=inter strand=(-,-) pos=(310-260, 101-51)>
-        >>> Classified_inter(pr, tolerance=3)
-        'Down-DLRight'
-        >>> # Test the dangling suffix: simulate a "Close" case with dangling_left
-        >>> read1 = ("r1", 16, "chr1", 101, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 0, "chr2", 310, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> undigested = annotate_paired_read(pr, restr_index, tolerance=0)
-        >>> Classified_inter(pr, tolerance=3)
-        'Close-DLLeft'
-        >>> # Test the dangling_both suffix: takes priority over the others
-        >>> read1 = ("r1", 16, "chr1", 100, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 0, "chr2", 300, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> undigested = annotate_paired_read(pr, restr_index, tolerance=0)
-        >>> Classified_inter(pr, tolerance=3)
-        'Close-DLBoth'
-        >>> # Test the dangling_right suffix alone
-        >>> read1 = ("r1", 0, "chr1", 120, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 16, "chr2", 300, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> undigested = annotate_paired_read(pr, restr_index, tolerance=0)
-        >>> Classified_inter(pr, tolerance=3)
-        'Far-DLRight'
-    """
-    mask = compute_mask_inter(paired_read, tolerance)
-
-    # Main label according to orientation
-    if mask["orientation_divergent"]:
-        main_label = "Close"
-    elif mask["orientation_identical_up"]:
-        main_label = "Up"
-    elif mask["orientation_convergent"]:
-        main_label = "Far"
-    elif mask["orientation_identical_down"]:
-        main_label = "Down"
-    else:
-        main_label = "Algorithmic_Error"
-
-    # Secondary information about dangling
-    if mask["dangling_both"]:
-        dl_labels = "DLBoth"
-    elif mask["dangling_left"]:
-        dl_labels = "DLLeft"
-    elif mask["dangling_right"]:
-        dl_labels = "DLRight"
-    else:
-        dl_labels = "None"
-
-    return main_label + "-" + dl_labels
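# Editor's note (not part of the deleted file): labels produced by
# Classified_inter / Classified_intra always follow the "<class>-<event>"
# convention, which update_restriction_counts splits on "-". For example:
label = "Close-DLLeft"
current_class, current_event = label.split("-")
assert current_class == "Close" and current_event == "DLLeft"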
diff --git a/hi-classifier/filter_intrachrom.py b/hi-classifier/filter_intrachrom.py
deleted file mode 100644
index e7f5b400c0232bfc929efc1521d5ab214061c0e0..0000000000000000000000000000000000000000
--- a/hi-classifier/filter_intrachrom.py
+++ /dev/null
@@ -1,810 +0,0 @@
-############################################
-# CONDITIONS BASED ON SINGLE FRAGMENTS
-############################################
-
-
-def condition_single_fragment(paired_read):
-    """
-    Return True if each read is assigned to EXACTLY ONE fragment.
-
-    Examples:
-        >>> from readmanager import PairedRead
-        >>> # Build a dummy PairedRead
-        >>> read1 = ("r1", 0, "chr1", 100, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 0, "chr1", 200, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> pr.fragment_1 = [(3, (100,200))]
-        >>> pr.fragment_2 = [(4, (201,300))]
-        >>> condition_single_fragment(pr)
-        True
-
-        >>> pr.fragment_1 = [(3, (100,200))]
-        >>> pr.fragment_2 = [(4, (201,300)), (5, (301, 400))]
-        >>> condition_single_fragment(pr)
-        False
-    """
-    return (len(paired_read.fragment_1) == 1) and (
-        len(paired_read.fragment_2) == 1
-    )
-
-
-def condition_frag_identical(paired_read):
-    """
-    For a single-fragment pair, return True if the fragment number of read1
-    equals that of read2.
-
-    Examples:
-        >>> from readmanager import PairedRead
-        >>> read1 = ("r1", 0, "chr1", 100, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 0, "chr1", 200, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> pr.fragment_1 = [(3, (100,200))]
-        >>> pr.fragment_2 = [(3, (201,300))]  # same fragment number: 3
-        >>> condition_frag_identical(pr)
-        True
-        >>> pr.fragment_2 = [(4, (201,300))]
-        >>> condition_frag_identical(pr)
-        False
-    """
-    if condition_single_fragment(paired_read):
-        frag1_num = paired_read.fragment_1[0][0]
-        frag2_num = paired_read.fragment_2[0][0]
-        return frag1_num == frag2_num
-    return False
-
-
-def condition_frag_adjacent(paired_read):
-    """
-    For a single-fragment pair, return True if the fragment numbers differ
-    by exactly 1.
-
-    Examples:
-        >>> from readmanager import PairedRead
-        >>> read1 = ("r1", 0, "chr1", 100, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 0, "chr1", 300, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> pr.fragment_1 = [(10, (100,200))]
-        >>> pr.fragment_2 = [(9, (201,300))]
-        >>> condition_frag_adjacent(pr)
-        True
-        >>> pr.fragment_2 = [(11, (201,300))]
-        >>> condition_frag_adjacent(pr)
-        True
-        >>> pr.fragment_2 = [(12, (201,300))]
-        >>> condition_frag_adjacent(pr)
-        False
-    """
-    if condition_single_fragment(paired_read):
-        frag1_num = paired_read.fragment_1[0][0]
-        frag2_num = paired_read.fragment_2[0][0]
-        return abs(frag1_num - frag2_num) == 1
-    return False
-
-
-def condition_frag_neither(paired_read):
-    """
-    For a single-fragment pair, return True if the fragments are neither
-    identical nor adjacent.
-
-    Examples:
-        >>> from readmanager import PairedRead
-        >>> read1 = ("r1", 0, "chr1", 100, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 0, "chr1", 300, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> pr.fragment_1 = [(10, (100,200))]
-        >>> pr.fragment_2 = [(12, (201,300))]
-        >>> condition_frag_neither(pr)
-        True
-        >>> pr.fragment_2 = [(10, (201,300))]
-        >>> condition_frag_neither(pr)
-        False
-    """
-    if condition_single_fragment(paired_read):
-        frag1_num = paired_read.fragment_1[0][0]
-        frag2_num = paired_read.fragment_2[0][0]
-        return (frag1_num != frag2_num) and (abs(frag1_num - frag2_num) != 1)
-    return False
-
-
-############################################
-# CONDITIONS BASED ON MULTIPLE FRAGMENTS
-############################################
-
-
-def condition_multiple_fragment(paired_read):
-    """
-    Return True if at least one read is assigned to something other than
-    exactly one fragment (i.e. the pair is not 'single fragment').
-
-    Examples:
-        >>> from readmanager import PairedRead
-        >>> read1 = ("r1", 0, "chr1", 100, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 0, "chr1", 200, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> pr.fragment_1 = [(3, (100,200)), (4, (200,300))]
-        >>> pr.fragment_2 = [(5, (300,400))]
-        >>> condition_multiple_fragment(pr)
-        True
-        >>> pr.fragment_1 = [(3, (100,200))]
-        >>> condition_multiple_fragment(pr)
-        False
-    """
-    return (len(paired_read.fragment_1) != 1) or (
-        len(paired_read.fragment_2) != 1
-    )
-
-
-def condition_frag_multiple_identical(paired_read):
-    """
-    For a non-single pair, return True if at least one fragment of read1 is
-    identical to at least one fragment of read2.
-
-    Examples:
-        >>> from readmanager import PairedRead
-        >>> read1 = ("r1", 0, "chr1", 100, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 0, "chr1", 200, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> pr.fragment_1 = [(2, (100,200)), (3, (200,300))]
-        >>> pr.fragment_2 = [(1, (50,99)), (3, (200,300))]
-        >>> # pr is "non-single" (fragment_1 has 2 elements)
-        >>> condition_frag_multiple_identical(pr)
-        True
-        >>> pr.fragment_2 = [(1, (50,99)), (4, (301,400))]
-        >>> condition_frag_multiple_identical(pr)
-        False
-    """
-    if not condition_single_fragment(paired_read):
-        for frag1 in paired_read.fragment_1:
-            for frag2 in paired_read.fragment_2:
-                if frag1[0] == frag2[0]:
-                    return True
-    return False
-
-
-def condition_frag_multiple_adjacent(paired_read):
-    """
-    For a non-single pair, return True if at least one combination of
-    fragments between read1 and read2 is identical or adjacent (fragment
-    numbers differ by at most 1).
-
-    Examples:
-        >>> from readmanager import PairedRead
-        >>> read1 = ("r1", 0, "chr1", 100, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 0, "chr1", 200, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> pr.fragment_1 = [(2, (100,200)), (3, (200,300))]
-        >>> pr.fragment_2 = [(2, (100,200)), (4, (301,400))]
-        >>> condition_frag_multiple_adjacent(pr)
-        True
-        >>> pr.fragment_2 = [(1, (50,99))]
-        >>> condition_frag_multiple_adjacent(pr)
-        True
-        >>> pr.fragment_2 = [(5, (401,500))]
-        >>> condition_frag_multiple_adjacent(pr)
-        False
-    """
-    if not condition_single_fragment(paired_read):
-        for frag1 in paired_read.fragment_1:
-            for frag2 in paired_read.fragment_2:
-                if abs(frag1[0] - frag2[0]) <= 1:
-                    return True
-        return False
-    return False
-
-
-def condition_frag_multiple_neither(paired_read):
-    """
-    For a non-single pair, return True if no fragment of read1 is a neighbor
-    of any fragment of read2 (neither identical nor adjacent).
-
-    Examples:
-        >>> from readmanager import PairedRead
-        >>> read1 = ("r1", 0, "chr1", 100, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 0, "chr1", 200, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> pr.fragment_1 = [(2, (100,200)), (3, (200,300))]
-        >>> pr.fragment_2 = [(5, (400,500)), (6, (501,600))]
-        >>> condition_frag_multiple_neither(pr)
-        True
-        >>> pr.fragment_2 = [(3, (200,300)), (5, (400,500))]
-        >>> condition_frag_multiple_neither(pr)
-        False
-    """
-    if not condition_single_fragment(paired_read):
-        for frag1 in paired_read.fragment_1:
-            for frag2 in paired_read.fragment_2:
-                if (frag1[0] == frag2[0]) or (abs(frag1[0] - frag2[0]) == 1):
-                    return False
-        return True
-    return False
-
-
-############################################
-# CONDITIONS BASED ON ORIENTATION
-############################################
-
-
-def condition_orientation_convergent(paired_read):
-    """
-    Convergent: read1 on '+' and read2 on '-'.
-
-    Examples:
-        >>> from readmanager import PairedRead
-        >>> read1 = ("r1", 0, "chr1", 100, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 0x10, "chr1", 200, 30, "50M", None, None, 0, "ACGT")  # 0x10 => reverse strand
-        >>> pr = PairedRead(read1, read2)
-        >>> condition_orientation_convergent(pr)
-        True
-        >>> condition_orientation_divergent(pr)
-        False
-    """
-    return paired_read.strand_1 == "+" and paired_read.strand_2 == "-"
-
-
-def condition_orientation_divergent(paired_read):
-    """
-    Divergent: read1 on '-' and read2 on '+'.
-
-    Examples:
-        >>> from readmanager import PairedRead
-        >>> read1 = ("r1", 0x10, "chr1", 100, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 0, "chr1", 200, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> condition_orientation_divergent(pr)
-        True
-        >>> condition_orientation_convergent(pr)
-        False
-    """
-    return paired_read.strand_1 == "-" and paired_read.strand_2 == "+"
-
-
-def condition_orientation_identical_up(paired_read):
-    """
-    Identical: both reads are on the same strand, '+'.
-
-    Examples:
-        >>> from readmanager import PairedRead
-        >>> read1 = ("r1", 0, "chr1", 100, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 0, "chr1", 200, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> condition_orientation_identical_up(pr)
-        True
-        >>> condition_orientation_identical_down(pr)
-        False
-    """
-    return (paired_read.strand_1 == "+") and (paired_read.strand_2 == "+")
-
-
-def condition_orientation_identical_down(paired_read):
-    """
-    Identical: both reads are on the same strand, '-'.
-
-    Examples:
-        >>> from readmanager import PairedRead
-        >>> read1 = ("r1", 0x10, "chr1", 100, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 0x10, "chr1", 200, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> condition_orientation_identical_down(pr)
-        True
-    """
-    return (paired_read.strand_1 == "-") and (paired_read.strand_2 == "-")
-
-
-############################################
-# CONDITIONS BASED ON DANGLING ENDS (NEARBY RESTRICTION SITES)
-############################################
-
-
-def condition_dangling_left(paired_read, TOLERANCE):
-    """
-    Dangling Left: the start of read1 must lie on a restriction site.
-    Here, paired_read.re_proximal_1 is assumed to hold the restriction site
-    assigned to read1.
-
-    Examples:
-        >>> from readmanager import PairedRead
-        >>> read1 = ("r1", 0, "chr1", 105, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 0, "chr1", 200, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> pr.re_proximal_1 = (1, 100)  # restriction site at 100
-        >>> condition_dangling_left(pr, TOLERANCE=10)
-        True
-        >>> condition_dangling_left(pr, TOLERANCE=4)
-        False
-    """
-    return abs(paired_read.start_1 - paired_read.re_proximal_1[1]) <= TOLERANCE
-
-
-def condition_dangling_right(paired_read, TOLERANCE):
-    """
-    Dangling Right: the start of read2 must lie on a restriction site.
-
-    Examples:
-        >>> from readmanager import PairedRead
-        >>> read1 = ("r1", 0, "chr1", 100, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 0, "chr1", 208, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> pr.re_proximal_2 = (1, 210)
-        >>> condition_dangling_right(pr, TOLERANCE=5)
-        True
-        >>> condition_dangling_right(pr, TOLERANCE=1)
-        False
-    """
-    return abs(paired_read.start_2 - paired_read.re_proximal_2[1]) <= TOLERANCE
-
-
-def condition_dangling_both(paired_read, TOLERANCE):
-    """
-    Dangling Both: both reads must lie on restriction sites.
-
-    Examples:
-        >>> from readmanager import PairedRead
-        >>> read1 = ("r1", 0, "chr1", 105, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 16, "chr1", 208, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> pr.re_proximal_1 = (1, 100)
-        >>> pr.re_proximal_2 = (2, 210)
-        >>> condition_dangling_both(pr, TOLERANCE=5)
-        True
-        >>> condition_dangling_both(pr, TOLERANCE=1)
-        False
-    """
-    return condition_dangling_left(
-        paired_read, TOLERANCE
-    ) and condition_dangling_right(paired_read, TOLERANCE)
-
-
-############################################
-# CONDITIONS BASED ON RESTRICTION SITES
-############################################
-
-
-def condition_identical_RE(paired_read):
-    """
-    Identical RE: both reads relate to the same restriction site.
-
-    Examples:
-        >>> from readmanager import PairedRead
-        >>> read1 = ("r1", 0, "chr1", 100, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 0, "chr1", 200, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> pr.re_1 = (10, 100)  # (site_num, position)
-        >>> pr.re_2 = (10, 200)
-        >>> condition_identical_RE(pr)
-        True
-        >>> pr.re_2 = (11, 200)
-        >>> condition_identical_RE(pr)
-        False
-    """
-    return paired_read.re_1[0] == paired_read.re_2[0]
-
-
-def condition_adjacent_RE(paired_read):
-    """
-    Adjacent RE: the reads relate to adjacent restriction sites (site
-    numbers differ by exactly 1).
-
-    Examples:
-        >>> from readmanager import PairedRead
-        >>> read1 = ("r1", 0, "chr1", 100, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 0, "chr1", 200, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> pr.re_1 = (10, 100)
-        >>> pr.re_2 = (11, 200)
-        >>> condition_adjacent_RE(pr)
-        True
-        >>> pr.re_2 = (12, 200)
-        >>> condition_adjacent_RE(pr)
-        False
-    """
-    return abs(paired_read.re_1[0] - paired_read.re_2[0]) == 1
-
-
-def condition_distant_RE(paired_read):
-    """
-    Distant RE: the reads relate to restriction sites that are more than one
-    site apart.
-
-    Examples:
-        >>> from readmanager import PairedRead
-        >>> read1 = ("r1", 0, "chr1", 100, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 0, "chr1", 200, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> pr.re_1 = (10, 100)
-        >>> pr.re_2 = (13, 200)
-        >>> condition_distant_RE(pr)
-        True
-        >>> pr.re_2 = (11, 200)
-        >>> condition_distant_RE(pr)
-        False
-    """
-    return abs(paired_read.re_1[0] - paired_read.re_2[0]) > 1
-
-
-def compute_mask(paired_read, tolerance=3):
-    """
-    Compute a binary mask (dictionary of booleans) for the read pair.
-    The mask holds the conditions evaluated for:
-      - fragments (single or multiple): Identical, Adjacent, Neither
-      - orientation: Convergent, Divergent, Identical
-      - the presence of dangling ends (left and right)
-
-    Evaluates a set of conditions on the PairedRead and returns a dictionary
-    of booleans indicating whether each condition holds.
-
-    Parameters:
-        paired_read (PairedRead): The read pair to analyze.
-        tolerance (int): Tolerance used for the 'dangling' conditions.
-
-    Returns:
-        Dict[str, bool]: A dictionary with the results of the conditions, e.g.:
-            {
-                "single_fragment": True/False,
-                "frag_identical": ...,
-                "frag_adjacent": ...,
-                "frag_neither": ...,
-                "multiple_fragment": ...,
-                "frag_multiple_identical": ...,
-                "frag_multiple_adjacent": ...,
-                "frag_multiple_neither": ...,
-                "orientation_convergent": ...,
-                "orientation_divergent": ...,
-                "orientation_identical_up": ...,
-                "orientation_identical_down": ...,
-                "dangling_left": ...,
-                "dangling_right": ...,
-                "dangling_both": ...,
-                "re_identical": ...,
-                "re_adjacent": ...,
-                "re_distant": ...
-            }
-    """
- } - """ - mask = {} - - # -- Basé sur les fragments simples -- - mask["single_fragment"] = condition_single_fragment(paired_read) - mask["frag_identical"] = condition_frag_identical(paired_read) - mask["frag_adjacent"] = condition_frag_adjacent(paired_read) - mask["frag_neither"] = condition_frag_neither(paired_read) - - # -- Basé sur les fragments multiples -- - mask["multiple_fragment"] = condition_multiple_fragment(paired_read) - mask["frag_multiple_identical"] = condition_frag_multiple_identical( - paired_read - ) - mask["frag_multiple_adjacent"] = condition_frag_multiple_adjacent( - paired_read - ) - mask["frag_multiple_neither"] = condition_frag_multiple_neither( - paired_read - ) - - # -- Basé sur l'orientation -- - mask["orientation_convergent"] = condition_orientation_convergent( - paired_read - ) - mask["orientation_divergent"] = condition_orientation_divergent( - paired_read - ) - mask["orientation_identical_up"] = condition_orientation_identical_up( - paired_read - ) - mask["orientation_identical_down"] = condition_orientation_identical_down( - paired_read - ) - - # -- Basé sur le "dangling" (sites de restriction proches) -- - mask["dangling_left"] = condition_dangling_left(paired_read, tolerance) - mask["dangling_right"] = condition_dangling_right(paired_read, tolerance) - mask["dangling_both"] = condition_dangling_both(paired_read, tolerance) - - # -- Basé sur les sites de restriction (re_1, re_2) -- - mask["re_identical"] = condition_identical_RE(paired_read) - mask["re_adjacent"] = condition_adjacent_RE(paired_read) - mask["re_distant"] = condition_distant_RE(paired_read) - - return mask - - -def Classified_intra(paired_read, tolerance=3): - """ - Classifie la paire de lectures (paired_read) selon les catégories décrites. - Elle s'appuie sur la fonction compute_mask(paired_read, tolerance) - qui renvoie un dictionnaire de conditions booléennes. - - Ordre d'évaluation (prioritaire) : - 1) Contact valide et biotinylés (condition_distant_RE == True) - - Close : orientation_divergent, single_fragment => ni identique ni adjacent - - Up : orientation_identical_up, single_fragment => ni identique ni adjacent - - Far : orientation_convergent, single_fragment => ni identique ni adjacent - - Down : orientation_identical_down, single_fragment => ni identique ni adjacent - - Si fragment_multiple => on n'applique pas de condition supplémentaire dans cet exemple - - 2) Contact informatif et biotinylés (condition_distant_RE == False) - - SelfCircle : orientation_divergent + re_adjacent + single_fragment + frag_identical - - Joined : orientation_convergent + re_identical - * si frag_multiple => frag_multiple_adjacent ou frag_multiple_identical - * si frag_simple => frag_adjacent - - 3) Semi-biotinylés (single_fragment + frag_identical + orientation_convergent) - - Dangling Left : condition_dangling_left - - Dangling Right : condition_dangling_right - - Dangling Both : condition_dangling_both - - 4) Combinaisons (Joined × Dangling) - exemple - - Joined × Dangling Left : ... - - Joined × Dangling Right : ... - - Joined × Dangling Both : ... - (Ici on peut faire un label combiné ou un label distinct) - - 5) Divers => "Other" - - Returns: - str: Le label de classification (ex: "Close", "Joined", "Semi-Biot Right", etc.) - - Examples: - >>> from readmanager import PairedRead - >>> from positionfrag import annotate_paired_read - >>> restr_index = { - ... "chr1": { - ... "sites": [ - ... (1, 100), - ... (2, 200), - ... (3, 300), - ... (4, 400) - ... ], - ... "fragments": [ - ... 
-    ...             (1, (0, 100)),
-    ...             (2, (100, 200)),
-    ...             (3, (200, 300)),
-    ...             (4, (300, 400)),
-    ...             (5, (400, 500))
-    ...         ]
-    ...     }
-    ... }
-    >>> read1 = ("r1", 0, "chr1", 101, 30, "50M", None, None, 0, "ACGT")
-    >>> read2 = ("r2", 0, "chr1", 300, 30, "50M", None, None, 0, "ACGT")
-    >>> pr = PairedRead(read1, read2)
-    >>> # Annotate pr with restr_index
-    >>> undigested = annotate_paired_read(pr, restr_index, tolerance=0)
-    >>> pr.re_1
-    (2, 200)
-    >>> pr.strand_1
-    '+'
-    >>> pr.start_1
-    101
-    >>> pr.end_1
-    151
-    >>> pr.fragment_1
-    [(2, (100, 200))]
-    >>> pr.re_2
-    (4, 400)
-    >>> pr.strand_2
-    '+'
-    >>> pr.start_2
-    300
-    >>> pr.end_2
-    350
-    >>> pr.re_1
-    (2, 200)
-    >>> pr.re_proximal_1
-    (1, 100)
-    >>> pr.re_2
-    (4, 400)
-    >>> pr.re_proximal_2
-    (3, 300)
-    >>> pr.fragment_2
-    [(4, (300, 400))]
-    >>> # Now call Classified_intra
-    >>> label = Classified_intra(pr, tolerance=5)
-    >>> label
-    'Up-DLBoth'
-    >>> #################################################################
-    >>> # Example 1: "Close"
-    >>> # Conditions: re_distant True, orientation_divergent True,
-    >>> # single_fragment True, frag_identical False, frag_adjacent False.
-    >>> read1 = ("r1", 16, "chr1", 256, 30, "50M", None, None, 0, "ACGT")  # read1 on '-'
-    >>> read2 = ("r2", 0, "chr1", 326, 30, "50M", None, None, 0, "ACGT")
-    >>> pr = PairedRead(read1, read2)
-    >>> undigested = annotate_paired_read(pr, restr_index, tolerance=0)
-    >>> label = Classified_intra(pr, tolerance=5)
-    >>> label
-    'Close-None'
-    >>> #################################################################
-    >>> # Example 2: "Up"
-    >>> read1 = ("r1", 0, "chr1", 42, 30, "50M", None, None, 0, "ACGT")
-    >>> read2 = ("r2", 0, "chr1", 265, 30, "50M", None, None, 0, "ACGT")
-    >>> pr = PairedRead(read1, read2)
-    >>> undigested = annotate_paired_read(pr, restr_index, tolerance=0)
-    >>> label = Classified_intra(pr, tolerance=5)
-    >>> label
-    'Up-None'
-    >>> #################################################################
-    >>> # Example 3: "Far"
-    >>> read1 = ("r1", 0, "chr1", 42, 30, "50M", None, None, 0, "ACGT")
-    >>> read2 = ("r2", 18, "chr1", 358, 30, "50M", None, None, 0, "ACGT")
-    >>> pr = PairedRead(read1, read2)
-    >>> undigested = annotate_paired_read(pr, restr_index, tolerance=0)
-    >>> label = Classified_intra(pr, tolerance=5)
-    >>> label
-    'Far-None'
-    >>> #################################################################
-    >>> # Example 4: "Joined-DLLeft"
-    >>> read1 = ("r1", 0, "chr1", 102, 30, "50M", None, None, 0, "ACGT")
-    >>> read2 = ("r2", 16, "chr1", 291, 30, "70M", None, None, 0, "ACGT")
-    >>> pr = PairedRead(read1, read2)
-    >>> undigested = annotate_paired_read(pr, restr_index, tolerance=0)
-    >>> label = Classified_intra(pr, tolerance=4)
-    >>> label
-    'Joined-DLLeft'
-    >>> label = Classified_intra(pr, tolerance=1)
-    >>> label
-    'Joined-None'
-    >>> label = Classified_intra(pr, tolerance=10)
-    >>> label
-    'Joined-DLBoth'
-    >>> #################################################################
-    >>> # Example 5: "SelfCircle"
-    >>> read1 = ("r1", 48, "chr1", 140, 30, "32M", None, None, 0, "ACGT")
-    >>> read2 = ("r2", 0, "chr1", 151, 30, "45M", None, None, 0, "ACGT")
-    >>> pr = PairedRead(read1, read2)
-    >>> undigested = annotate_paired_read(pr, restr_index, tolerance=0)
-    >>> label = Classified_intra(pr, tolerance=5)
-    >>> label
-    'SelfCircle-None'
-    >>> #################################################################
-    >>> # Example 6: "Joined" (informative)
-    >>> read1 = ("r1", 0, "chr1", 120, 30, "50M", None, None, 0, "ACGT")
-    >>> read2 = ("r2", 16, "chr1", 254, 30, "50M", None, None, 0, "ACGT")
-    >>> pr = PairedRead(read1, read2)
-    >>> undigested = annotate_paired_read(pr, restr_index, tolerance=0)
-    >>> label = Classified_intra(pr, tolerance=5)
-    >>> label
-    'Joined-None'
-    >>> #################################################################
-    >>> # Example 7: "DLLeft"
-    >>> read1 = ("r1", 0, "chr1", 110, 30, "50M", None, None, 0, "ACGT")
-    >>> read2 = ("r2", 16, "chr1", 185, 30, "30M", None, None, 0, "ACGT")
-    >>> pr = PairedRead(read1, read2)
-    >>> undigested = annotate_paired_read(pr, restr_index, tolerance=0)
-    >>> label = Classified_intra(pr, tolerance=1)
-    >>> label
-    'Other-None'
-    >>> label = Classified_intra(pr, tolerance=10)
-    >>> label
-    'DL-DLLeft'
-    >>> #################################################################
-    >>> # Example 8: "DLBoth"
-    >>> read1 = ("r1", 0, "chr1", 100, 30, "50M", None, None, 0, "ACGT")
-    >>> read2 = ("r2", 16, "chr1", 195, 30, "50M", None, None, 0, "ACGT")
-    >>> pr = PairedRead(read1, read2)
-    >>> undigested = annotate_paired_read(pr, restr_index, tolerance=0)
-    >>> label = Classified_intra(pr, tolerance=5)
-    >>> label
-    'DL-DLBoth'
-    >>> #################################################################
-    >>> # Example 9: "Other"
-    >>> read1 = ("r1", 0, "chr1", 100, 30, "50M", None, None, 0, "ACGT")
-    >>> read2 = ("r2", 0, "chr1", 100, 30, "50M", None, None, 0, "ACGT")
-    >>> pr = PairedRead(read1, read2)
-    >>> undigested = annotate_paired_read(pr, restr_index, tolerance=0)
-    >>> label = Classified_intra(pr, tolerance=5)
-    >>> label
-    'Other-None'
-    """
-    mask = compute_mask(paired_read, tolerance)
-
-    # ---------------------------------------------------------
-    # 1) VALID CONTACT (re_distant == True)
-    # ---------------------------------------------------------
-    if mask["re_distant"]:
-        # Sub-classify by orientation and by being a single fragment
-        # => neither identical nor adjacent.
-        # NOTE: the logic can be adapted if fragment_multiple is allowed.
-
-        if mask["orientation_divergent"]:
-            # => "Close" if single and not identical/adjacent
-            if mask["dangling_both"]:
-                return "Close-DLBoth"
-            elif mask["dangling_left"]:
-                return "Close-DLLeft"
-            elif mask["dangling_right"]:
-                return "Close-DLRight"
-            else:
-                return "Close-None"  # default
-
-        elif mask["orientation_identical_up"]:
-            # => "Up"
-            if mask["dangling_both"]:
-                return "Up-DLBoth"
-            elif mask["dangling_left"]:
-                return "Up-DLLeft"
-            elif mask["dangling_right"]:
-                return "Up-DLRight"
-            else:
-                return "Up-None"
-
-        elif mask["orientation_convergent"]:
-            # => "Far"
-            if mask["dangling_both"]:
-                return "Far-DLBoth"
-            elif mask["dangling_left"]:
-                return "Far-DLLeft"
-            elif mask["dangling_right"]:
-                return "Far-DLRight"
-            else:
-                return "Far-None"
-
-        elif mask["orientation_identical_down"]:
-            # => "Down"
-            if mask["dangling_both"]:
-                return "Down-DLBoth"
-            elif mask["dangling_left"]:
-                return "Down-DLLeft"
-            elif mask["dangling_right"]:
-                return "Down-DLRight"
-            else:
-                return "Down-None"
-
-        # If no orientation matched, fall through to an algorithmic-error label
-        return "Erreur_Algorythmique"
-
-    # ---------------------------------------------------------
-    # 2) INFORMATIVE CONTACT (re_distant == False)
-    # ---------------------------------------------------------
-    # a) SelfCircle: orientation_divergent + re_adjacent + single_fragment + frag_identical
-    if (
-        mask["orientation_divergent"]
-        and mask["re_adjacent"]
-        and mask["single_fragment"]
-        and mask["frag_identical"]
-    ):
-        return "SelfCircle-None"
-
-    # b) Joined: orientation_convergent + re_identical
-    if mask["orientation_convergent"] and mask["re_identical"]:
-        # Single fragment => must be adjacent
-        if mask["single_fragment"] and mask["frag_adjacent"]:
-            # Specific combination:
-            if mask["dangling_both"]:
-                return "Joined-DLBoth"
-            elif mask["dangling_left"]:
-                return "Joined-DLLeft"
-            elif mask["dangling_right"]:
-                return "Joined-DLRight"
-            else:
-                return "Joined-None"
-
-        # Multiple fragments => frag_multiple_adjacent or frag_multiple_identical
-        if mask["multiple_fragment"] and (
-            mask["frag_multiple_adjacent"] or mask["frag_multiple_identical"]
-        ):
-            return "Joined-None"
-
-    # ---------------------------------------------------------
-    # 3) SEMI-BIOTINYLATED: single + frag_identical + orientation_convergent
-    #    + dangling
-    # ---------------------------------------------------------
-    if (
-        mask["single_fragment"]
-        and mask["frag_identical"]
-        and mask["orientation_convergent"]
-    ):
-        # Check whether it is dangling left, right, or both
-        dl = mask["dangling_left"]
-        dr = mask["dangling_right"]
-        db = mask["dangling_both"]  # generally = dl & dr
-
-        if db:
-            return "DL-DLBoth"
-        elif dl:
-            return "DL-DLLeft"
-        elif dr:
-            return "DL-DLRight"
-        else:
-            return "Other-None"
-
-    # ---------------------------------------------------------
-    # 4) MISCELLANEOUS => "Other"
-    # ---------------------------------------------------------
-    return "Other-None"
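Every label returned by Classified_intra follows a "Category-DanglingSide" grammar, which downstream code relies on (cf. `label.split("-")[0]` in processing.py). A small hypothetical helper showing the decomposition:

    def split_label(label: str) -> tuple:
        # "Up-DLBoth" -> ("Up", "DLBoth"); "Other-None" -> ("Other", "None")
        category, _, dangling = label.partition("-")
        return category, dangling

    assert split_label("Joined-DLLeft") == ("Joined", "DLLeft")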
diff --git a/hi-classifier/main.py b/hi-classifier/main.py
deleted file mode 100644
index 4ef99567f154b4c359483c91a0f24509600b2e18..0000000000000000000000000000000000000000
--- a/hi-classifier/main.py
+++ /dev/null
@@ -1,106 +0,0 @@
-"""
-This script is the entry point of the Hi-Classifier project, designed to
-process paired-end Hi-C BAM files and classify read pairs according to
-restriction enzyme sites.
-
-Copyright © 2025 Samir Bertache
-
-SPDX-License-Identifier: AGPL-3.0-or-later
-
-===============================================================================
-
-This program is free software: you can redistribute it and/or modify it under
-the terms of the GNU Affero General Public License as published by the
-Free Software Foundation, either version 3 of the License, or (at your option)
-any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
-See the GNU Affero General Public License for more details.
-
-You should have received a copy of the GNU Affero General Public License
-along with this program. If not, see <https://www.gnu.org/licenses/>.
-"""
-
-import argparse
-import logging
-
-from classify import categorize
-
-# Setup logging
-logging.basicConfig(level=logging.INFO)
-__version__ = "0.0.1"
-
-
-def main_cli(logging):
-    parser = argparse.ArgumentParser(
-        description="Process Hi-C BAM files to filter and classify them."
-    )
-    parser.add_argument(
-        "--bam_for_file",
-        type=str,
-        help="Path to forward BAM file.",
-        required=True,
-    )
-    parser.add_argument(
-        "--bam_rev_file",
-        type=str,
-        help="Path to reverse BAM file.",
-        required=True,
-    )
-    parser.add_argument(
-        "--output",
-        type=str,
-        help='Path/name of the output files, following this format: output + "_R1/2_" + TypeOfPair',
-        required=True,
-    )
-    parser.add_argument(
-        "--num_threads",
-        type=int,
-        help="Total number of threads.",
-        required=True,
-    )
-    parser.add_argument(
-        "--mapq_score",
-        type=int,
-        help="Minimum MAPQ score required to keep a read pair.",
-        required=True,
-    )
-    parser.add_argument(
-        "--enzyme",
-        type=str,
-        help="Enzyme(s) used for the Hi-C experiment.",
-        required=True,
-    )
-    parser.add_argument(
-        "--fasta_ref",
-        type=str,
-        help="Reference genome.",
-        required=True,
-    )
-    parser.add_argument(
-        "--len_max",
-        type=int,
-        help="Maximum length of the theoretical sequenced fragments.",
-        required=False,
-        default=1500,
-    )
-    parser.add_argument(
-        "--tolerance",
-        type=int,
-        help="Tolerance in bp for calling a dangling end.",
-        required=False,
-        default=1,
-    )
-    parser.add_argument(
-        "--version", action="version", version=f"%(prog)s {__version__}"
-    )
-
-    args = parser.parse_args()
-    categorize(args, logging)
-
-
-if __name__ == "__main__":
-    main_cli(logging)
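For reference, a plausible invocation of this CLI (file names are placeholders):

    python hi-classifier/main.py \
        --bam_for_file sample_R1.bam --bam_rev_file sample_R2.bam \
        --output results/sample --num_threads 8 --mapq_score 30 \
        --enzyme DpnII --fasta_ref genome.fa --len_max 1500 --tolerance 3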
diff --git a/hi-classifier/positionfrag.py b/hi-classifier/positionfrag.py
deleted file mode 100644
index f69cc6ec72339cc410493f599b89f8f4bf335b14..0000000000000000000000000000000000000000
--- a/hi-classifier/positionfrag.py
+++ /dev/null
@@ -1,471 +0,0 @@
-from bisect import bisect_left
-
-
-def get_re_site_for_read(chrom, pos, strand, restr_index):
-    """
-    For a position 'pos' on chromosome 'chrom' and a given strand ('+' or '-'),
-    return the tuple (site_num, position) of the closest restriction site in
-    the read's direction, ignoring a site located exactly at pos (the first
-    base of the read).
-
-    - For a forward read ('+'): return the first site whose position is > pos
-      (falling back to the last site if pos lies beyond all sites).
-    - For a reverse read ('-'): return the last site whose position is < pos,
-      or None if there is none.
-
-    The starting position itself is not taken into account; this makes it
-    possible to analyse valid reads that start exactly on a restriction site!
-
-    Examples:
-    >>> restr_index = {
-    ...     "chr1": {
-    ...         "sites": [
-    ...             (1, 100),
-    ...             (2, 200),
-    ...             (3, 300),
-    ...             (4, 400),
-    ...             (5, 500)
-    ...         ],
-    ...         "fragments": []
-    ...     }
-    ... }
-    >>> get_re_site_for_read("chr1", 50, "+", restr_index)
-    (1, 100)
-    >>> get_re_site_for_read("chr1", 250, "+", restr_index)
-    (3, 300)
-    >>> get_re_site_for_read("chr1", 450, "+", restr_index)
-    (5, 500)
-    >>> get_re_site_for_read("chr1", 200, "-", restr_index)
-    (1, 100)
-    >>> get_re_site_for_read("chr1", 450, "-", restr_index)
-    (4, 400)
-    """
-    sites = restr_index[chrom]["sites"]  # List of (site_num, position) tuples
-    if not sites:
-        return None
-    positions = [s[1] for s in sites]
-    if strand == "+":
-        idx = bisect_left(positions, pos)
-        if idx < len(positions):
-            if positions[idx] == pos:
-                # If the site found is exactly at pos, return the next one (if any)
-                if idx < len(positions) - 1:
-                    return sites[idx + 1]
-                else:
-                    return None
-            return sites[idx]
-        else:
-            return sites[-1]
-    elif strand == "-":
-        idx = bisect_left(positions, pos)
-        if idx == 0:
-            return None
-        else:
-            if positions[idx - 1] == pos:
-                if idx - 1 > 0:
-                    return sites[idx - 2]
-                else:
-                    return None
-            return sites[idx - 1]
-    else:
-        raise ValueError("Unknown orientation: " + strand)
-
-
-def get_proximal_re_site_for_read(chrom, pos, strand, restr_index):
-    """
-    For a position 'pos' on chromosome 'chrom' and a given strand ('+' or '-'),
-    return the tuple (site_num, position) of the closest restriction site in
-    the read's direction.
-    - For a forward read ('+'): return the first site whose position is >= pos.
-    - For a reverse read ('-'): return the last site whose position is <= pos.
-
-    Examples:
-    >>> # Define a dummy restriction index
-    >>> restr_index = {
-    ...     "chr1": {
-    ...         "sites": [
-    ...             (1, 100),
-    ...             (2, 200),
-    ...             (3, 300),
-    ...             (4, 400)
-    ...         ],
-    ...         "fragments": [
-    ...             (1, (0, 100)),
-    ...             (2, (100, 200)),
-    ...             (3, (200, 300)),
-    ...             (4, (300, 400))
-    ...         ]
-    ...     }
-    ... }
-    >>> # Forward test ('+')
-    >>> get_proximal_re_site_for_read("chr1", 200, "+", restr_index)
-    (2, 200)
-    >>> get_proximal_re_site_for_read("chr1", 450, "+", restr_index)
-    (4, 400)
-    >>> # Reverse test ('-')
-    >>> get_proximal_re_site_for_read("chr1", 249, "-", restr_index)
-    (2, 200)
-    >>> get_proximal_re_site_for_read("chr1", 450, "-", restr_index)
-    (4, 400)
-    """
-    sites = restr_index[chrom]["sites"]  # List of (site_num, position) tuples
-    if not sites:
-        return None
-    positions = [s[1] for s in sites]
-    if strand == "+":
-        idx = bisect_left(positions, pos)
-        if idx < len(positions):
-            return sites[idx]
-        else:
-            return sites[-1]
-    elif strand == "-":
-        idx = bisect_left(positions, pos)
-        if idx == 0:
-            return sites[0]
-        else:
-            return sites[idx - 1]
-    else:
-        raise ValueError("Unknown orientation: " + strand)
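Both lookups above hinge on bisect_left, which returns the first index whose value is >= pos; sites[idx] is therefore the next site for a '+' read and sites[idx - 1] the previous one for a '-' read. A tiny illustration:

    from bisect import bisect_left

    positions = [100, 200, 300, 400]
    print(bisect_left(positions, 250))  # 2 -> next site is positions[2] == 300
    print(bisect_left(positions, 200))  # 1 -> an exact hit lands on the site itself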
-
-
-def get_re_proximal(chrom, pos, strand, restr_index):
-    """
-    For a position 'pos' on chromosome 'chrom', return the closest restriction
-    site, comparing the candidate sites for both orientations.
-
-    Sens = restriction site for the '+' (forward) strand
-    AntiSens = restriction site for the '-' (reverse) strand
-
-    Returns the tuple (site_num, position) whose position is closest to pos;
-    on a tie, the AntiSens site is returned.
-
-    Examples:
-    >>> restr_index = {
-    ...     "chr1": {
-    ...         "sites": [
-    ...             (1, 100),
-    ...             (2, 200),
-    ...             (3, 300),
-    ...             (4, 400)
-    ...         ],
-    ...         "fragments": [
-    ...             (1, (0, 100)),
-    ...             (2, (100, 200)),
-    ...             (3, (200, 300)),
-    ...             (4, (300, 400))
-    ...         ]
-    ...     }
-    ... }
-    >>> # pos=210 => forward side => (3,300), distance=90
-    >>> # reverse side => (2,200), distance=10
-    >>> # => (2,200) is expected as the closest site
-    >>> get_re_proximal("chr1", 210, "+", restr_index)
-    (2, 200)
-    >>> # pos=240 => forward => (3,300), distance=60
-    >>> # reverse => (2,200), distance=40 => the AntiSens site is returned
-    >>> get_re_proximal("chr1", 240, "+", restr_index)
-    (2, 200)
-    """
-    Sens = get_proximal_re_site_for_read(chrom, pos, "+", restr_index)
-    AntiSens = get_proximal_re_site_for_read(chrom, pos, "-", restr_index)
-
-    if Sens is None:
-        return AntiSens
-    if AntiSens is None:
-        return Sens
-
-    if abs(Sens[1] - pos) < abs(AntiSens[1] - pos):
-        return Sens
-    else:
-        return AntiSens
-
-
-def get_fragments_for_read(chrom, start, end, restr_index):
-    """
-    Return the list of fragments (each fragment being a tuple
-    (frag_num, (frag_start, frag_end))) in which the read lies.
-    The read interval is defined as [min(start, end), max(start, end)).
-
-    More than one overlapping base is required for a fragment to count.
-
-    Examples:
-    >>> restr_index = {
-    ...     "chr1": {
-    ...         "sites": [
-    ...             (1, 100),
-    ...             (2, 200),
-    ...             (3, 300),
-    ...             (4, 400)
-    ...         ],
-    ...         "fragments": [
-    ...             (1, (0, 100)),
-    ...             (2, (100, 200)),
-    ...             (3, (200, 300)),
-    ...             (4, (300, 400))
-    ...         ]
-    ...     }
-    ... }
-    >>> # The read covers [150, 250], so it overlaps fragments 2 (100,200) and 3 (200,300)
-    >>> get_fragments_for_read("chr1", 150, 250, restr_index)
-    [(2, (100, 200)), (3, (200, 300))]
-    >>> # The read covers [50, 99], overlapping fragment 1 (0,100)
-    >>> get_fragments_for_read("chr1", 99, 50, restr_index)
-    [(1, (0, 100))]
-    """
-    low = min(start, end)
-    high = max(start, end)
-    fragments = restr_index[chrom]["fragments"]
-    overlapping_fragments = []
-    for frag in fragments:
-        frag_num, (frag_start, frag_end) = frag
-        if low < frag_end and high > frag_start:
-            overlapping_fragments.append(frag)
-    return overlapping_fragments
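get_fragments_for_read scans every fragment of the chromosome linearly. Because the fragment list is sorted by start coordinate, a bisect-based variant could skip most of it; a sketch under that assumption (hypothetical helper, same overlap test as above):

    from bisect import bisect_right

    def fragments_overlapping(fragments, low, high):
        # fragments: list of (frag_num, (start, end)) sorted by start; interval [low, high)
        starts = [frag[1][0] for frag in fragments]
        first = max(bisect_right(starts, low) - 1, 0)  # last fragment starting at or before low
        out = []
        for frag in fragments[first:]:
            frag_start, frag_end = frag[1]
            if frag_start >= high:
                break  # every later fragment starts even further right
            if low < frag_end and high > frag_start:
                out.append(frag)
        return out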
-
-
-def check_repeat_event(paired_read, tolerance):
-    """
-    Check for and eliminate duplicates in the counting of undigested sites,
-    to avoid double counting when the same event shows up both as an internal
-    site and as a dangling end.
-
-    Under some conditions the internal site (re_intern) can be identical to
-    the proximal site (re_proximal). If so, and if the dangling criterion
-    (defined by the tolerance in bp) is met, the event is considered already
-    counted through the dangling ends and the internal-site count is
-    cancelled.
-
-    Parameters:
-        paired_read (object): Object holding the read-pair information.
-            It must have the following attributes:
-            - re_intern_1 and re_proximal_1 for the first read
-            - re_intern_2 and re_proximal_2 for the second read
-        tolerance (int): Tolerance in bp used to apply the dangling criterion
-            and avoid redundant counting.
-
-    Returns:
-        tuple: (site_non_digested_left, site_non_digested_right)
-            The updated flags, where a site is disabled (False) if it is
-            redundant with a dangling end according to the tolerance.
-
-    Examples:
-    >>> # For read1, re_intern_1 (400) differs from re_1 (500), so the left
-    >>> # flag is raised; but abs(400 - start_1) = 0 <= tolerance, so the
-    >>> # event is treated as a dangling end and the flag is cancelled.
-    >>> class Dummy:
-    ...     pass
-    >>> pr = Dummy()
-    >>> pr.start_1 = 400
-    >>> pr.re_intern_1 = (4, 400)
-    >>> pr.re_proximal_1 = (4, 400)
-    >>> pr.re_1 = (5, 500)  # Different from re_intern_1 -> flag set to True
-    >>> pr.start_2 = 797
-    >>> pr.re_intern_2 = (8, 800)
-    >>> pr.re_proximal_2 = (8, 800)
-    >>> pr.re_2 = (8, 800)
-    >>> # Call with tolerance=2: for read1, site_non_digested_left
-    >>> # is reset to False.
-    >>> check_repeat_event(pr, tolerance=2)
-    (False, False)
-    >>> pr.re_2 = (9, 900)
-    >>> check_repeat_event(pr, tolerance=0)
-    (False, True)
-    >>> check_repeat_event(pr, tolerance=3)
-    (False, False)
-    >>> pr.start_1 = 400
-    >>> pr.re_intern_1 = (4, 400)
-    >>> pr.re_proximal_1 = (4, 400)
-    >>> pr.re_1 = (5, 500)  # Different from re_intern_1 -> flag set to True
-    >>> pr.start_2 = 797
-    >>> pr.re_intern_2 = (8, 800)
-    >>> pr.re_proximal_2 = (8, 800)
-    >>> pr.re_2 = (9, 900)
-    >>> check_repeat_event(pr, tolerance=4)
-    (False, False)
-    >>> check_repeat_event(pr, tolerance=1)
-    (False, True)
-    """
-    site_non_digested_left = False
-    site_non_digested_right = False
-
-    if paired_read.re_1 != paired_read.re_intern_1:
-        site_non_digested_left = True
-    if paired_read.re_2 != paired_read.re_intern_2:
-        site_non_digested_right = True
-
-    if site_non_digested_left:
-        if abs(paired_read.re_intern_1[1] - paired_read.start_1) <= tolerance:
-            site_non_digested_left = False
-    if site_non_digested_right:
-        if abs(paired_read.re_intern_2[1] - paired_read.start_2) <= tolerance:
-            site_non_digested_right = False
-
-    return site_non_digested_left, site_non_digested_right
-
-
-def annotate_paired_read(paired_read, restr_index, tolerance):
-    """
-    For a PairedRead instance, update its attributes with the information
-    extracted from the restriction index.
-
-    For each read (read1 and read2), three restriction sites are determined:
-
-    - RE_Proximal: the restriction site closest to the start of the read, in
-      the read's direction. (Used to count digested but unligated sites,
-      a.k.a. dangling ends.)
-
-    - RE: the restriction site closest to the end of the read (the last
-      mapped element in the read's direction). This site is the working
-      basis for the classification.
-
-    - RE_Interne: the first restriction site strictly downstream of the read
-      start (an internal site of the read). It is used to count undigested,
-      unligated sites: if it differs from RE, it carries the undigested-site
-      information.
-
-    In addition, the function extracts the list of fragments covering the
-    read's [start, end) interval.
-
-    NB: the interval is defined as [start, end).
-
-    Parameters:
-        paired_read (PairedRead): The PairedRead object to annotate.
-        restr_index (dict): Index of restriction sites per chromosome, e.g.:
-            {
-                "chr1": {
-                    "sites": [(1, 100), (2, 200), (3, 300), (4, 400)],
-                    "fragments": [(1, (0, 100)), (2, (100, 200)), (3, (200, 300)),
-                                  (4, (300, 400)), (5, (400, 500))]
-                },
-                "chr2": { ... }
-            }
-
-    Updates paired_read by adding the following attributes for each read:
-    - re_1 and re_2: RE_Valide (RE), the site closest to the end of the read.
-    - re_proximal_1 and re_proximal_2: the restriction site closest to the start (in the read's direction).
-    - re_intern_1 and re_intern_2: RE_Interne, the internal site of the read.
-    - fragment_1 and fragment_2: list of fragments covering the [start, end) interval.
-
-    Examples:
-    >>> # Simulate an index + a PairedRead
-    >>> from readmanager import PairedRead, flag_to_strand
-    >>> restr_index = {
-    ...     "chr1": {
-    ...         "sites": [
-    ...             (1, 100),
-    ...             (2, 200),
-    ...             (3, 300),
-    ...             (4, 400),
-    ...             (5, 500)
-    ...         ],
-    ...         "fragments": [
-    ...             (1, (0, 100)),
-    ...             (2, (100, 200)),
-    ...             (3, (200, 300)),
-    ...             (4, (300, 400)),
-    ...             (5, (400, 500))
-    ...         ]
-    ...     }
-    ... }
-    >>> read1 = ("r1", 0, "chr1", 249, 30, "50M", None, None, 0, "ACGT")
-    >>> read2 = ("r2", 16, "chr1", 447, 30, "50M", None, None, 0, "ACGT")  # reverse strand
-    >>> pr = PairedRead(read1, read2)
-    >>> annotate_paired_read(pr, restr_index, tolerance=0)
-    ['chr1', (4, 400)]
-    >>> pr.re_1, pr.re_2
-    ((3, 300), (3, 300))
-    >>> pr.re_2
-    (3, 300)
-    >>> pr.re_intern_1, pr.re_intern_2
-    ((3, 300), (4, 400))
-    >>> pr.re_proximal_1, pr.re_proximal_2
-    ((2, 200), (4, 400))
-    >>> pr.fragment_1
-    [(3, (200, 300))]
-    >>> pr.fragment_2
-    [(4, (300, 400)), (5, (400, 500))]
-    >>> read1 = ("r1", 18, "chr1", 239, 30, "50M", None, None, 0, "ACGT")  # reverse strand
-    >>> read2 = ("r2", 16, "chr1", 419, 30, "50H10M", None, None, 0, "ACGT")  # reverse strand
-    >>> pr = PairedRead(read1, read2)
-    >>> annotate_paired_read(pr, restr_index, tolerance=0)
-    ['chr1', (2, 200)]
-    >>> pr.re_intern_1, pr.re_intern_2
-    ((2, 200), (4, 400))
-    >>> pr.re_1, pr.re_2
-    ((1, 100), (4, 400))
-    >>> pr.fragment_1
-    [(2, (100, 200)), (3, (200, 300))]
-    >>> pr.fragment_2
-    [(5, (400, 500))]
-    >>> read1 = ("r1", 0, "chr1", 198, 30, "50M", None, None, 0, "ACGT")
-    >>> read2 = ("r2", 16, "chr1", 447, 30, "50M", None, None, 0, "ACGT")  # reverse strand
-    >>> pr = PairedRead(read1, read2)
-    >>> annotate_paired_read(pr, restr_index, tolerance=1)
-    ['chr1', (2, 200), 'chr1', (4, 400)]
-    >>> annotate_paired_read(pr, restr_index, tolerance=3)
-    ['chr1', (4, 400)]
-    >>> read1 = ("r1", 0, "chr1", 198, 30, "50M", None, None, 0, "ACGT")
-    >>> read2 = ("r2", 0, "chr1", 407, 30, "50M", None, None, 0, "ACGT")
-    >>> pr = PairedRead(read1, read2)
-    >>> annotate_paired_read(pr, restr_index, tolerance=1)
-    ['chr1', (2, 200)]
-    >>> annotate_paired_read(pr, restr_index, tolerance=3)
-    False
-    >>> read1 = ("r1", 0, "chr1", 1198, 30, "50M", None, None, 0, "ACGT")
-    >>> read2 = ("r2", 0, "chr1", 4017, 30, "50M", None, None, 0, "ACGT")
-    >>> pr = PairedRead(read1, read2)
-    >>> annotate_paired_read(pr, restr_index, tolerance=3)
-    False
-    """
-    from readmanager import verify_re_sites
-
-    # Read 1
-    chrom1 = paired_read.chrom_1
-    pos1 = paired_read.start_1
-    end1 = paired_read.end_1
-    strand1 = paired_read.strand_1
-    paired_read.re_intern_1 = get_re_site_for_read(
-        chrom1, pos1, strand1, restr_index
-    )
-    paired_read.re_1 = get_re_site_for_read(chrom1, end1, strand1, restr_index)
-    paired_read.re_proximal_1 = get_re_proximal(
-        chrom1, pos1, strand1, restr_index
-    )
-    paired_read.fragment_1 = get_fragments_for_read(
-        chrom1, paired_read.start_1, paired_read.end_1, restr_index
-    )
-
-    # Read 2
-    chrom2 = paired_read.chrom_2
-    pos2 = paired_read.start_2
-    end2 = paired_read.end_2
-    strand2 = paired_read.strand_2
-    paired_read.re_intern_2 = get_re_site_for_read(
-        chrom2, pos2, strand2, restr_index
-    )
-    paired_read.re_2 = get_re_site_for_read(chrom2, end2, strand2, restr_index)
-    paired_read.re_proximal_2 = get_re_proximal(
-        chrom2, pos2, strand2, restr_index
-    )
-    paired_read.fragment_2 = get_fragments_for_read(
-        chrom2, paired_read.start_2, paired_read.end_2, restr_index
-    )
-
-    if verify_re_sites(paired_read):
-        # Once annotated, check that there is no redundancy:
-        site_non_digested_left, site_non_digested_right = check_repeat_event(
-            paired_read, tolerance
-        )
-
-        if site_non_digested_left and site_non_digested_right:
-            return [
-                paired_read.chrom_1,
-                paired_read.re_intern_1,
-                paired_read.chrom_2,
-                paired_read.re_intern_2,
-            ]
-        elif site_non_digested_left:
-            return [paired_read.chrom_1, paired_read.re_intern_1]
-        elif site_non_digested_right:
-            return [paired_read.chrom_2, paired_read.re_intern_2]
-        else:
-            return False
-    else:
-        return False
diff --git a/hi-classifier/processing.py b/hi-classifier/processing.py
deleted file mode 100644
index 8a7d39c381a965905b6a6ea9b2f5a4a4b2d483f3..0000000000000000000000000000000000000000
--- a/hi-classifier/processing.py
+++ /dev/null
@@ -1,216 +0,0 @@
-
-def get_label(current_pair, dict_digested, tolerance):
-    """
-    Intra- and inter-chromosomal specific annotation.
-    Checks the annotation (presence of the restriction sites).
-    Retrieves the label.
-    Retrieves the non-digestion information.
-    """
-    from filter_interchrom import Classified_inter
-    from filter_intrachrom import Classified_intra
-    from positionfrag import annotate_paired_read
-    from readmanager import verify_re_sites
-
-    try:
-        if current_pair.chrom_1 == current_pair.chrom_2:
-            # Gather information about the read pair
-            undigested = annotate_paired_read(
-                current_pair, dict_digested, tolerance
-            )
-            if not verify_re_sites(current_pair):
-                label = False
-                undigested = False
-            else:
-                label = Classified_intra(current_pair, tolerance)
-
-        elif current_pair.interchrom:
-            undigested = annotate_paired_read(
-                current_pair,
-                dict_digested,
-                tolerance,
-            )
-            if not verify_re_sites(current_pair):
-                label = False
-                undigested = False
-            else:
-                label = Classified_inter(current_pair, tolerance)
-        else:
-            raise ValueError("Chromosome name mismatch")
-
-        return label, undigested
-
-    except Exception as e:
-        print(f"Error in labelling: {e}")
-        return False, False
-
-
-def sending_to_queue(
-    data, label, output_valide_queue, output_other_queue, output_del_queue
-):
-    """
-    Send the pair to its specific queue.
-    """
-    if label:
-        class_current = label.split("-")[0]
-        if class_current in {
-            "Up",
-            "Down",
-            "Close",
-            "Far",
-        }:
-            # Send the pair to the valid queue
-            output_valide_queue.put((label, [data[0], data[1]]))
-        elif class_current in {"DL", "Joined", "SelfCircle"}:
-            # Send the pair to the dangling queue
-            output_other_queue.put((label, [data[0], data[1]]))
-        else:
-            label = "Other-None"
-            output_del_queue.put((label, [data[0], data[1]]))
-
-    # Send the pair to the discard queue
-    else:
-        label = "Other-None"
-        output_del_queue.put((label, [data[0], data[1]]))
-
-
-def qualitad(current_pair, mapq_score_min):
-    """
-    Check that the minimum required mapping quality is met for both reads.
-    """
-    mapq_score_1 = current_pair.mapq_1
-    mapq_score_2 = current_pair.mapq_2
-
-    if mapq_score_2 and mapq_score_1:
-        if (mapq_score_1 > mapq_score_min) and (mapq_score_2 > mapq_score_min):
-            return True
-    return False
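A toy check of the MAPQ filter (minimal sketch; `SimpleNamespace` stands in for a PairedRead). Note that the comparison is strict and that a MAPQ of 0 is falsy, so it is rejected outright:

    from types import SimpleNamespace

    good = SimpleNamespace(mapq_1=42, mapq_2=37)
    bad = SimpleNamespace(mapq_1=42, mapq_2=0)
    print(qualitad(good, 30), qualitad(bad, 30))
    # -> True False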
-
-
-def process_items(
-    input_queue,
-    output_valide_queue,
-    output_other_queue,
-    output_del_queue,
-    Counts_Dang_Intra_Queue,
-    Counts_Biot_Intra_Queue,
-    Counts_Dang_Inter_Queue,
-    Counts_Biot_Inter_Queue,
-    Counts_Undigested_Site_Queue,
-    mapq_score,
-    dict_digested,
-    ligation_site_list,
-    restriction_site_list,
-    len_max,
-    tolerance=1,
-):
-    """
-    Classify read pairs coming from input_queue and dispatch them, together
-    with the per-process counting dictionaries, to the output queues.
-
-    Parameters:
-        input_queue (Queue): Queue to get read pairs from.
-        output_valide_queue, output_other_queue, output_del_queue (Queue):
-            Queues receiving the processed read pairs.
-    """
-    import sys
-
-    from auxiliary import check_data, terminaison, pair_complete
-    from event_collector import (
-        length_checker,
-        update_restriction_counts,
-        update_undigested_counts,
-    )
-    from readmanager import PairedRead
-
-    try:
-        print("Start processing")
-        # Create the counting dictionaries of the current process:
-        # Intrachromosomal
-        Counts_Dangling_Intra = dict()
-        Counts_Biotin_Intra = dict()
-        # Interchromosomal
-        Counts_Biotin_Inter = dict()
-        Counts_Dangling_Inter = dict()
-        # Undigested
-        Counts_Undigested_Site = dict()
-
-        while True:
-            data = input_queue.get()
-            label = False
-
-            if data is None:
-                terminaison(
-                    output_valide_queue,
-                    output_other_queue,
-                    output_del_queue,
-                    Counts_Dang_Intra_Queue,
-                    Counts_Biot_Intra_Queue,
-                    Counts_Dang_Inter_Queue,
-                    Counts_Biot_Inter_Queue,
-                    Counts_Undigested_Site_Queue,
-                    Counts_Dangling_Intra,
-                    Counts_Biotin_Intra,
-                    Counts_Dangling_Inter,
-                    Counts_Biotin_Inter,
-                    Counts_Undigested_Site,
-                )
-                print("Ending processing")
-                break
-            if check_data(data) and pair_complete(data):
-                try:
-                    current_pair = PairedRead(data[0], data[1])
-                    if qualitad(current_pair, mapq_score):
-                        # Get information about the pair
-                        label, undigested = get_label(
-                            current_pair, dict_digested, tolerance
-                        )
-
-                        if label:
-                            # Check length validity
-                            label, undigested = length_checker(
-                                current_pair, label, undigested, len_max
-                            )
-
-                            # Count undigested sites on the read
-                            if undigested:
-                                Counts_Undigested_Site = (
-                                    update_undigested_counts(
-                                        undigested, Counts_Undigested_Site
-                                    )
-                                )
-
-                            # Update all counts related to digested sites
-                            if "Other" not in label:
-                                (
-                                    Counts_Biotin_Intra,
-                                    Counts_Dangling_Intra,
-                                    Counts_Biotin_Inter,
-                                    Counts_Dangling_Inter,
-                                ) = update_restriction_counts(
-                                    paired_read=current_pair,
-                                    label=label,
-                                    counts_vl_intra=Counts_Biotin_Intra,
-                                    counts_dl_intra=Counts_Dangling_Intra,
-                                    counts_vl_inter=Counts_Biotin_Inter,
-                                    counts_dl_inter=Counts_Dangling_Inter,
-                                )
-
-                except Exception as e:
-                    print(f"Error in PairedRead: {e}")
-
-                # Send the pair to its specific queue (valid, dangling, or other)
-                sending_to_queue(
-                    data,
-                    label,
-                    output_valide_queue,
-                    output_other_queue,
-                    output_del_queue,
-                )
-
-    except ValueError as e:
-        print(f"Error in process_items: {e}")
-        sys.exit()
diff --git a/hi-classifier/processmanager.py b/hi-classifier/processmanager.py
deleted file mode 100644
index b86740ff7eca5a9a0c80af583adc56880370c9bb..0000000000000000000000000000000000000000
--- a/hi-classifier/processmanager.py
+++ /dev/null
@@ -1,63 +0,0 @@
-import os
-import sys
-import traceback
-from multiprocessing import Process, Queue
-
-
-class WorkerProcess(Process):
-    def __init__(self, target, args, error_queue):
-        super().__init__(target=target, args=args)
-        self.error_queue = error_queue
-
-    def run(self):
-        try:
-            super().run()
-        except Exception as e:
-            print(f"Worker error: {e}")
-            self.error_queue.put((str(e), traceback.format_exc()))
-            sys.exit(1)
-
-
-class ProcessManager:
-    def __init__(self):
-        self.processes = []
-        self.error_queue = Queue()
-        self.main_pid = os.getpid()  # Store the parent process PID
-
-    def start_worker(self, target, args):
-        worker = WorkerProcess(target, args, self.error_queue)
-        self.processes.append(worker)
-        worker.start()
-
-    def running(self):
-        return any(p.is_alive() for p in self.processes)
-
-    def check_processes(self):
-        for p in self.processes:
-            if not p.is_alive() and p.exitcode != 0:
-                print("Worker process crashed!")
-                return False
-
-        if not self.error_queue.empty():
-            error, tb = self.error_queue.get()
-            print(f"Worker error: {error}")
-            self.shutdown()
-        return True
-
-    def shutdown(self):
-        # Only run the shutdown in the original parent process
-        if os.getpid() != self.main_pid:
-            return
-        for p in self.processes:
-            try:
-                if p.is_alive():
-                    p.terminate()
-                p.join()
-            except AssertionError:
-                # On error, ignore and continue
-                pass
-
-    def handle_signal(self, signum, frame):
-        print("Received shutdown signal")
-        self.shutdown()
-        os._exit(1)  # Avoid terminal errors on Ctrl+C
diff --git a/hi-classifier/readmanager.py b/hi-classifier/readmanager.py
deleted file mode 100644
index 1586e7cd7259c021a4be7e18f08418bae842c800..0000000000000000000000000000000000000000
--- a/hi-classifier/readmanager.py
+++ /dev/null
@@ -1,330 +0,0 @@
-import re
-
-# [
-#     query_name,           # Read name
-#     flag,                 # Alignment flag
-#     reference_name,       # Reference (chromosome) name
-#     reference_start,      # Start position on the reference
-#     mapping_quality,      # Mapping quality
-#     cigarstring,          # CIGAR string
-#     next_reference_name,  # Mate's reference name
-#     next_reference_start, # Mate's position
-#     template_length,      # Fragment length (tlen)
-#     query_sequence,       # Read sequence
-#     query_qualities,      # Qualities (list of ints) or None
-# ]
-
-# Extract all (count, operation) pairs from the CIGAR
-CIGAR_PATTERN = re.compile(r"(\d+)([MIDNSHP=X])")
-
-
-def cigar_reference_length(cigar_str: str) -> int:
-    """
-    Compute the length consumed on the reference from a CIGAR string
-    (e.g. "10S40M2D10M5I30M", "100M2I3D50M", etc.).
-
-    In this implementation, the counts of the operations M, I, =, X are
-    added up (soft clipping remains to be investigated). The others
-    (D, N, H, P, etc.) are not added to the total, even though in the SAM
-    specification some of them (such as D) do consume the reference.
-
-    Examples:
-    >>> cigar_reference_length("10S40M2D10M5I30M")
-    85
-    >>> cigar_reference_length("100M2I3D50M")
-    152
-    >>> cigar_reference_length("")
-    0
-    >>> cigar_reference_length(None)
-    0
-    >>> cigar_reference_length("4M")
-    4
-    >>> cigar_reference_length("4X15M")
-    19
-    >>> cigar_reference_length("8H4M8H")
-    4
-    """
-    if not cigar_str:
-        return 0
-
-    length = 0
-    tokens = CIGAR_PATTERN.findall(cigar_str)
-    for num_str, op in tokens:
-        n = int(num_str)
-        # Only these operations are added in this implementation
-        if op in ("M", "I", "=", "X"):
-            length += n
-    return length
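For comparison, a variant that follows the SAM specification (counting M/D/N/=/X, the operations that consume the reference) would look like this; it is a sketch, not the module's behaviour:

    def cigar_reference_length_sam(cigar_str: str) -> int:
        # Reference-consuming operations per the SAM specification.
        if not cigar_str:
            return 0
        return sum(int(num) for num, op in CIGAR_PATTERN.findall(cigar_str)
                   if op in ("M", "D", "N", "=", "X"))

    # "100M2I3D50M": the module's version returns 152, the SAM version 153.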
-
-
-def flag_to_strand(flag):
-    """
-    Return '+' if the read is on the forward strand,
-    '-' if the read is on the reverse strand (bit 0x10).
-
-    Examples:
-    >>> flag_to_strand(0)   # Neither 0x10 nor 0x4 => forward strand
-    '+'
-    >>> flag_to_strand(16)  # 0x10 => reverse strand
-    '-'
-    >>> flag_to_strand(40)  # 40 = 0x28 => no 0x10 bit => forward strand
-    '+'
-    >>> flag_to_strand(20)  # 20 = 0x14 => 0x10 (reverse) + 0x4 (unmapped)
-    '-'
-    >>> flag_to_strand(18)  # 18 = 0x12 => 0x10 (reverse) + 0x2 (proper pair)
-    '-'
-    >>> flag_to_strand(12)  # 12 = 0xC => 0x4 (unmapped) + 0x8 (mate unmapped)
-    '+'
-    """
-    return "-" if (flag & 0x10) else "+"
-
-
-class PairedRead:
-    """
-    Class storing a pair of reads (read_1, read_2).
-
-    Each 'read_tuple_i' is a tuple with the following format:
-    [
-        query_name,           # Read name
-        flag,                 # Alignment flag
-        reference_name,       # Reference (chromosome) name
-        reference_start,      # Start position on the reference
-        mapping_quality,      # Mapping quality
-        cigarstring,          # CIGAR string
-        next_reference_name,  # ...
-        next_reference_start, # ...
-        template_length,      # Fragment length (tlen)
-        query_sequence,       # Read sequence
-        query_qualities,      # Qualities (list of ints) or None
-    ]
-
-    Computed attributes include:
-    - start_1, end_1, strand_1
-    - start_2, end_2, strand_2
-    - interchrom (boolean), etc.
-
-    Examples:
-    >>> # read1 => '+' strand, starts at 100, CIGAR=50M => end_1=150
-    >>> read1 = ("r1", 0, "chr1", 100, 30, "50M", None, None, 0, "ACGT")
-    >>> # read2 => '-' strand, starts at 200, CIGAR=10M => end_2=190
-    >>> read2 = ("r2", 16, "chr1", 200, 30, "10M", None, None, 0, "ACGT")
-    >>> pr = PairedRead(read1, read2)
-    >>> pr
-    <PairedRead r1/r2 chr=chr1 strand=(+,-) pos=(100-150, 200-190)>
-    >>> pr.interchrom
-    False
-    >>> # Check the ordering: read1 is leftmost => no swap needed
-    ...
-    >>> # With the positions inverted:
-    >>> read1b = ("r1b", 0, "chr1", 500, 30, "50M", None, None, 0, "ACGT")
-    >>> read2b = ("r2b", 16, "chr1", 100, 30, "10M", None, None, 0, "ACGT")
-    >>> prb = PairedRead(read1b, read2b)
-    >>> prb
-    <PairedRead r2b/r1b chr=chr1 strand=(-,+) pos=(100-90, 500-550)>
-    >>> # read2b/read1b were swapped so that the leftmost coordinate comes first.
-    """
-
-    def __init__(self, read_tuple_1, read_tuple_2):
-        # ---------- Raw read1 info ----------
-        self.query_name_1 = read_tuple_1[0]
-        self.flag_1 = int(read_tuple_1[1])
-        self.chrom_1 = read_tuple_1[2]
-        self.start_1 = int(read_tuple_1[3])
-        self.mapq_1 = read_tuple_1[4]
-        self.cigar_1 = read_tuple_1[5]
-        self.seq1 = read_tuple_1[9]
-        # Determine the strand
-        self.strand_1 = flag_to_strand(self.flag_1)
-        self.re_1 = None
-        self.re_intern_1 = None
-        self.fragment_1 = None
-        self.re_proximal_1 = None
-
-        # ---------- Raw read2 info ----------
-        self.query_name_2 = read_tuple_2[0]
-        self.flag_2 = int(read_tuple_2[1])
-        self.chrom_2 = read_tuple_2[2]
-        self.start_2 = int(read_tuple_2[3])
-        self.mapq_2 = read_tuple_2[4]
-        self.cigar_2 = read_tuple_2[5]
-        self.seq2 = read_tuple_2[9]
-
-        self.strand_2 = flag_to_strand(self.flag_2)
-        self.re_2 = None
-        self.re_intern_2 = None
-        self.fragment_2 = None
-        self.re_proximal_2 = None
-
-        # --- Compute end_1 according to self.strand_1 ---
-        self.end_1 = None
-        if self.start_1 is not None and self.cigar_1:
-            ref_len_1 = cigar_reference_length(self.cigar_1)
-            if self.strand_1 == "+":
-                # Forward strand => extends to the right
-                self.end_1 = self.start_1 + ref_len_1
-            else:
-                # Reverse strand => extends to the left
-                self.end_1 = self.start_1 - ref_len_1
-
-        # --- Compute end_2 according to self.strand_2 ---
-        self.end_2 = None
-        if self.start_2 is not None and self.cigar_2:
-            ref_len_2 = cigar_reference_length(self.cigar_2)
-            if self.strand_2 == "+":
-                self.end_2 = self.start_2 + int(ref_len_2)
-            else:
-                self.end_2 = self.start_2 - int(ref_len_2)
-
-        # Flag interchromosomal pairs
-        self.interchrom = self.chrom_1 != self.chrom_2
-
-        # --- Enforce the ordering on the same chromosome ---
-        if (
-            self.chrom_1 == self.chrom_2
-            and self.start_1 is not None
-            and self.start_2 is not None
-        ):
-            # Compare the leftmost coordinates
-            leftmost_1 = (
-                min(self.start_1, self.end_1)
-                if self.end_1 is not None
-                else self.start_1
-            )
-            leftmost_2 = (
-                min(self.start_2, self.end_2)
-                if self.end_2 is not None
-                else self.start_2
-            )
-
-            if leftmost_2 < leftmost_1:
-                self._swap_reads()
-
-        # --- Enforce the ordering across different chromosomes ---
-        if self.chrom_1 > self.chrom_2:
-            # Lexicographic, not numeric, order!
-            self._swap_reads()
-
-    def _swap_reads(self):
-        """
-        Swap all the information of read1 and read2 when read2 lies further upstream.
-
-        Examples:
-        >>> # read1 on chr1 at start=500, read2 on chr1 at start=100:
-        >>> # __init__ has already swapped them so the leftmost read comes first.
-        >>> from readmanager import PairedRead
-        >>> read1 = ("r1", 0, "chr1", 500, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 16, "chr1", 100, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> pr.query_name_1, pr.query_name_2
-        ('r2', 'r1')
-        >>> pr._swap_reads()
-        >>> pr.query_name_1, pr.query_name_2
-        ('r1', 'r2')
-        >>> # Across chromosomes, __init__ swaps on lexicographic chromosome order.
-        >>> read1 = ("r1", 0, "chr3", 500, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 16, "chr2", 100, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> pr.query_name_1, pr.query_name_2
-        ('r2', 'r1')
-        >>> pr._swap_reads()
-        >>> pr.query_name_1, pr.query_name_2
-        ('r1', 'r2')
-        """
-        # Swap: query_name, flag, chrom, start, end, mapq, cigar, strand
-        (self.query_name_1, self.query_name_2) = (
-            self.query_name_2,
-            self.query_name_1,
-        )
-        (self.flag_1, self.flag_2) = (self.flag_2, self.flag_1)
-        (self.chrom_1, self.chrom_2) = (self.chrom_2, self.chrom_1)
-        (self.start_1, self.start_2) = (self.start_2, self.start_1)
-        (self.end_1, self.end_2) = (self.end_2, self.end_1)
-        (self.mapq_1, self.mapq_2) = (self.mapq_2, self.mapq_1)
-        (self.cigar_1, self.cigar_2) = (self.cigar_2, self.cigar_1)
-        (self.strand_1, self.strand_2) = (self.strand_2, self.strand_1)
-
-        (self.re_1, self.re_2) = (self.re_2, self.re_1)
-        # Also swap the internal sites so annotations stay consistent
-        (self.re_intern_1, self.re_intern_2) = (self.re_intern_2, self.re_intern_1)
-        (self.fragment_1, self.fragment_2) = (self.fragment_2, self.fragment_1)
-        (self.seq1, self.seq2) = (self.seq2, self.seq1)
-        (self.re_proximal_1, self.re_proximal_2) = (
-            self.re_proximal_2,
-            self.re_proximal_1,
-        )
-
-    def __repr__(self):
-        """
-        Build a textual representation of the PairedRead object.
-
-        Examples:
-        >>> from readmanager import PairedRead
-        >>> read1 = ("r1", 0, "chr1", 100, 30, "50M", None, None, 0, "ACGT")
-        >>> read2 = ("r2", 16, "chr1", 200, 30, "50M", None, None, 0, "ACGT")
-        >>> pr = PairedRead(read1, read2)
-        >>> pr
-        <PairedRead r1/r2 chr=chr1 strand=(+,-) pos=(100-150, 200-150)>
-        """
-        return (
-            f"<PairedRead {self.query_name_1}/{self.query_name_2} "
-            f"chr={self.chrom_1 if not self.interchrom else 'inter'} "
-            f"strand=({self.strand_1},{self.strand_2}) "
-            f"pos=({self.start_1}-{self.end_1}, {self.start_2}-{self.end_2})>"
-        )
-
-
-def verify_re_sites(paired_read):
-    """
-    Check that all the expected restriction sites are present for a read pair.
-
-    For each PairedRead, six restriction sites are expected to be defined:
-    - For the first read (read1):
-      - re_proximal_1: the restriction site closest to the start of the read.
-      - re_intern_1: the internal restriction site (used to count undigested sites).
-      - re_1: the restriction site closest to the end of the read.
-    - For the second read (read2):
-      - re_proximal_2: the restriction site closest to the start of the read.
-      - re_intern_2: the internal restriction site.
-      - re_2: the restriction site closest to the end of the read.
-
-    Parameters:
-        paired_read (PairedRead): PairedRead instance, assumed to have been
-            annotated with the restriction sites beforehand.
-
-    Returns:
-        bool: True if all six sites are present (not None), False otherwise.
-
-    Examples:
-    >>> # Example where all the sites are present
-    >>> read1 = ("r1", 0, "chr1", 100, 30, "50M", None, None, 0, "ACGT")
-    >>> read2 = ("r2", 16, "chr1", 200, 30, "50M", None, None, 0, "ACGT")
-    >>> pr = PairedRead(read1, read2)
-    >>> # Simulate the restriction-site annotation
-    >>> pr.re_proximal_1 = (1, 100)
-    >>> pr.re_intern_1 = (2, 200)
-    >>> pr.re_1 = (3, 300)
-    >>> pr.re_proximal_2 = (4, 400)
-    >>> pr.re_intern_2 = (5, 500)
-    >>> pr.re_2 = (6, 600)
-    >>> verify_re_sites(pr)
-    True
-
-    >>> # Example with a missing site
-    >>> pr.re_intern_2 = None
-    >>> verify_re_sites(pr)
-    False
-    """
-    missing = []
-    attributes = [
-        "re_proximal_1",
-        "re_intern_1",
-        "re_1",
-        "re_proximal_2",
-        "re_intern_2",
-        "re_2",
-    ]
-    for attr in attributes:
-        if getattr(paired_read, attr) is None:
-            missing.append(attr)
-    return len(missing) == 0
diff --git a/hi-classifier/restrictionsite.py b/hi-classifier/restrictionsite.py
deleted file mode 100644
index bbcfc11561319dfa18120f974128e06af0912fe8..0000000000000000000000000000000000000000
--- a/hi-classifier/restrictionsite.py
+++ /dev/null
@@ -1,244 +0,0 @@
-import re
-import sys
-from typing import Any, List, Tuple, Union
-
-from Bio.Restriction import RestrictionBatch
-from Bio.Seq import Seq
-
-
-def CaseAdaptation(List_Enzyme: Union[str, List[str]]) -> List[str]:
-    """
-    Case-sensitive enzyme-name adaptation.
-
-    Converts enzyme names (lowercase, uppercase, or mixed) to the exact names
-    expected by Bio.Restriction. If the argument is a string, it is converted
-    to a single-element list.
-
-    Returns:
-        List[str]: List of enzyme names adapted to the recognised casing.
-
-    Examples:
-    >>> # Example with the "arima" alias (mixed-case input)
-    >>> result = CaseAdaptation("arima")
-    >>> print(result)
-    ['DpnII', 'HinfI']
-    """
-    if isinstance(List_Enzyme, str):
-        List_Enzyme = [List_Enzyme]
-
-    for i, enzyme in enumerate(List_Enzyme):
-        enzyme_lower = enzyme.lower()
-        if enzyme_lower == "hindiii":
-            List_Enzyme[i] = "HindIII"
-        elif enzyme_lower == "dpnii":
-            List_Enzyme[i] = "DpnII"
-        elif enzyme_lower == "bglii":
-            List_Enzyme[i] = "BglII"
-        elif enzyme_lower == "mboi":
-            List_Enzyme[i] = "MboI"
-        elif enzyme_lower == "arima":
-            # Double enzyme
-            List_Enzyme[i] = "DpnII"
-            List_Enzyme.append("HinfI")
-    return List_Enzyme
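CaseAdaptation normalises user input before it reaches Bio.Restriction; note the special "arima" alias, which expands to two enzymes:

    print(CaseAdaptation("hindiii"))           # ['HindIII']
    print(CaseAdaptation(["dpnii", "arima"]))  # ['DpnII', 'DpnII', 'HinfI']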
-
-
-def FindLigaSite(
-    List_Enzyme: List[str], borderless: bool = False
-) -> List[Tuple[re.Pattern, int]]:
-    """
-    Finds the ligation sites for a given list of enzymes and their length.
-
-    Bio.Restriction's elucidate() returns, for example, "N^GATC_N" for DpnII,
-    where:
-    - '^' marks the forward cut site
-    - '_' marks the reverse cut site
-    - 'N' stands for any (unspecified) base.
-
-    The 'give site' (the part before the reverse cut) is purified by removing
-    '^' and any redundant 'N's.
-    The 'accept site' (the part after the forward cut) is purified by removing
-    '_' and any trailing redundant 'N's.
-
-    Ligation site = give_site + accept_site (with 'N' replaced by '.').
-    If the site is not palindromic, its reverse complement is added as well.
-
-    Examples:
-    >>> # Example with DpnII, whose GATC site is palindromic
-    >>> result = FindLigaSite(["DpnII"])
-    >>> result
-    [(re.compile('GATCGATC'), 4)]
-    """
-    restriction_batch = RestrictionBatch(List_Enzyme)
-    give_list: List[str] = []
-    accept_list: List[str] = []
-    ligation_site_list: List[Tuple[re.Pattern, int]] = []
-
-    for enz in restriction_batch:
-        site = enz.elucidate()
-        fw_cut = site.find("^")
-        rev_cut = site.find("_")
-
-        # Purify the give site (up to rev_cut)
-        give_site = site[:rev_cut].replace("^", "")
-        # Strip any leading redundant 'N's
-        while give_site and give_site[0] == "N":
-            give_site = give_site[1:]
-        give_list.append(give_site)
-
-        # Purify the accept site (from fw_cut + 1)
-        accept_site = site[fw_cut + 1 :].replace("_", "")
-        # Strip any trailing redundant 'N's
-        while accept_site and accept_site[-1] == "N":
-            accept_site = accept_site[:-1]
-        accept_list.append(accept_site)
-
-    # Combine give_site and accept_site
-    for give_site in give_list:
-        for accept_site in accept_list:
-            ligation_site = (give_site + accept_site).replace("N", ".")
-            compiled_regex = re.compile(ligation_site)
-
-            # Length depends on the borderless parameter
-            if borderless:
-                length = len(give_site) + len(accept_site)
-            else:
-                length = len(give_site)
-
-            ligation_site_list.append((compiled_regex, length))
-
-            # Reverse complement
-            reverse_complement_site = str(
-                Seq(ligation_site).reverse_complement()
-            )
-            if ligation_site != reverse_complement_site:
-                compiled_reverse_regex = re.compile(
-                    reverse_complement_site.replace("N", ".")
-                )
-                if borderless:
-                    length = len(give_site) + len(accept_site)
-                else:
-                    length = len(accept_site)
-                ligation_site_list.append((compiled_reverse_regex, length))
-
-    return ligation_site_list
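When a junction is not palindromic, its reverse complement is added as a second pattern. Some degenerate sites are their own reverse complement, so nothing extra is added; HinfI's purified site, for instance (assuming Biopython is available):

    from Bio.Seq import Seq

    site = "GANTC"  # HinfI site after purification
    print(str(Seq(site).reverse_complement()))  # 'GANTC' -> self-complementary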
-
-
-def get_restriction_site_patterns(
-    enzymes: List[str],
-) -> List[Tuple[re.Pattern, int]]:
-    """
-    For each enzyme in the list, retrieve its "purified" restriction site,
-    plus its reverse complement when it differs, as compiled regex patterns
-    (together with their length).
-
-    If the site is, say, "N^GATC_N", the effective part becomes "GATC"
-    (after removing '^', '_', and the surrounding superfluous 'N's).
-
-    Parameters:
-        enzymes (List[str]): List of enzyme names (e.g. ["HindIII", "DpnII"]).
-
-    Returns:
-        List[Tuple[re.Pattern, int]]:
-            List of (compiled_pattern, length) tuples.
-            E.g. [(re.compile("GATC"), 4)] for DpnII (palindromic).
-
-    Examples:
-    >>> # Example with DpnII, whose GATC site is palindromic
-    >>> result = get_restriction_site_patterns(["DpnII"])
-    >>> result
-    [(re.compile('GATC'), 4)]
-    """
-    results: List[Tuple[re.Pattern, int]] = []
-    restriction_batch = RestrictionBatch(enzymes)
-
-    for enzyme in restriction_batch:
-        site_raw = enzyme.elucidate()
-        # 1) Remove '^' and '_' (if present)
-        site_clean = site_raw.replace("^", "").replace("_", "")
-
-        # 2) Strip leading 'N's
-        while site_clean.startswith("N"):
-            site_clean = site_clean[1:]
-        # 3) Strip trailing 'N's
-        while site_clean.endswith("N"):
-            site_clean = site_clean[:-1]
-
-        # The final pattern
-        pattern_fw = re.compile(site_clean)
-        results.append((pattern_fw, len(site_clean)))
-
-        # 4) Compute the reverse complement
-        rc_site = str(Seq(site_clean).reverse_complement())
-        if rc_site != site_clean:
-            pattern_rc = re.compile(rc_site)
-            results.append((pattern_rc, len(rc_site)))
-
-    return results
-
-
-def SearchInDataBase(Enzymes: str) -> Tuple[List[Any], List[Any]]:
-    """
-    Look up the enzyme in the "database" after adapting its case, then
-    retrieve both the ligation sites (via FindLigaSite) and the restriction
-    sites (via get_restriction_site_patterns), finally printing and
-    returning this information.
-
-    Parameters:
-        Enzymes (str): Enzyme name(s), comma-separated,
-            or the special string "No restriction enzyme found".
-
-    Returns:
-        Tuple[List[str], List[str]]:
-            - First list: patterns for the ligation sites
-            - Second list: patterns for the restriction sites
-
-    Examples:
-    >>> # Example with DpnII, whose GATC site is palindromic
-    >>> result = SearchInDataBase("dpnii")
-    Ligation sites: GATCGATC
-    >>> result
-    ([(re.compile('GATCGATC'), 4)], [(re.compile('GATC'), 4)])
-
-    >>> # Example with Arima (DpnII + HinfI)
-    >>> result = SearchInDataBase("arima")
-    Ligation sites: GATCGATC
-    Ligation sites: GATCA.TC
-    Ligation sites: GA.TGATC
-    Ligation sites: GA.TGATC
-    Ligation sites: GATCA.TC
-    Ligation sites: GA.TA.TC
-    >>> result
-    ([(re.compile('GATCGATC'), 4), (re.compile('GATCA.TC'), 4), (re.compile('GA.TGATC'), 4), (re.compile('GA.TGATC'), 4), (re.compile('GATCA.TC'), 4), (re.compile('GA.TA.TC'), 4)], [(re.compile('GATC'), 4), (re.compile('GANTC'), 5)])
-    """
-    if Enzymes == "No restriction enzyme found":
-        print(Enzymes)
-        sys.exit(0)
-    else:
-        ListEnzyme = Enzymes.split(",")
-
-    try:
-        # Adapt the case
-        adapted = CaseAdaptation(ListEnzyme)
-        # Ligation sites
-        ligation_site_list = FindLigaSite(adapted)
-        # Restriction sites
-        restriction_site_list = get_restriction_site_patterns(adapted)
-
-        # Print the ligation sites as plain-text patterns
-        # (iterate over the ligation list, as shown in the doctests above)
-        if len(ligation_site_list) > 1:
-            for el in ligation_site_list:
-                # el = (compiled regex, length)
-                print(f"Ligation sites: {el[0].pattern}", flush=True)
-        else:
-            print(
-                f"Ligation sites: {ligation_site_list[0][0].pattern}",
-                flush=True,
-            )
-
-        return ligation_site_list, restriction_site_list
-
-    except Exception:
-        print("Error in enzyme identification")
-        raise NameError
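A worked example tying the pieces together for DpnII (elucidated site "N^GATC_N": give = accept = "GATC", hence the GATCGATC junction), assuming Biopython is installed:

    from Bio.Restriction import RestrictionBatch

    enz = next(iter(RestrictionBatch(["DpnII"])))
    print(enz.elucidate())           # 'N^GATC_N'
    print(FindLigaSite(["DpnII"]))   # [(re.compile('GATCGATC'), 4)]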
diff --git a/hi-classifier/stats.py b/hi-classifier/stats.py
deleted file mode 100644
index bbe3bd768769649f2de6c5792c2f041133fb7dd5..0000000000000000000000000000000000000000
--- a/hi-classifier/stats.py
+++ /dev/null
@@ -1,232 +0,0 @@
-import json
-import os
-
-import matplotlib.pyplot as plt
-
-
-def merge_count_dicts(dict_a, dict_b):
-    """
-    Merge two counting dictionaries of the form {chrom: {pos: count}},
-    summing the values for identical positions.
-
-    Parameters:
-        dict_a (dict): First dictionary.
-        dict_b (dict): Second dictionary.
-
-    Returns:
-        dict: Merged dictionary.
-
-    Examples:
-    >>> a = {"chr1": {100: 2, 200: 3}}
-    >>> b = {"chr1": {100: 1, 300: 4}, "chr2": {150: 5}}
-    >>> merge_count_dicts(a, b)
-    {'chr1': {100: 3, 200: 3, 300: 4}, 'chr2': {150: 5}}
-    """
-    # NB: shallow copy; dict_a's inner dictionaries are updated in place
-    merged = dict_a.copy()
-    for chrom, subdict in dict_b.items():
-        if chrom not in merged:
-            merged[chrom] = {}
-        for pos, count in subdict.items():
-            merged[chrom][pos] = merged[chrom].get(pos, 0) + count
-    return merged
-
-
-def merge_count_dicts_from_queue(q, compute_processes):
-    """
-    Drain a queue containing counting dictionaries and return a merged
-    dictionary, summing the counters for each position.
-
-    Parameters:
-        q (multiprocessing.Queue): Queue holding dictionaries of the form
-            {chrom: {pos: count}}.
-        compute_processes (int): Number of producer processes, i.e. the
-            number of None sentinels to expect.
-
-    Returns:
-        dict: Merged dictionary.
-
-    Examples:
-    >>> from multiprocessing import Queue
-    >>> q = Queue()
-    >>> q.put({"chr1": {100: 2, 200: 3}})
-    >>> q.put({"chr1": {100: 1, 300: 4}, "chr2": {150: 5}})
-    >>> q.put(None)
-    >>> merge_count_dicts_from_queue(q, 1)
-    {'chr1': {100: 3, 200: 3, 300: 4}, 'chr2': {150: 5}}
-    >>> q = Queue()
-    >>> q.put({"chr1": {100: 2, 200: 3}})
-    >>> q.put({"chr1": {100: 1, 300: 4}, "chr2": {150: 5}})
-    >>> q.put(None)
-    >>> q.put(None)
-    >>> merge_count_dicts_from_queue(q, 2)
-    {'chr1': {100: 3, 200: 3, 300: 4}, 'chr2': {150: 5}}
-    """
-    merged = {}
-    num_none = 0
-    while True:
-        dictio = q.get()
-        if dictio is None:
-            num_none += 1
-            # Once every None sentinel has been received, all the
-            # dictionaries have been collected, so stop.
-            if num_none == compute_processes:
-                break
-        else:
-            merged = merge_count_dicts(merged, dictio)
-    return merged
-
-
-def compact_and_save_counts(
-    counts_dang_intra_queue,
-    counts_biot_intra_queue,
-    counts_dang_inter_queue,
-    counts_biot_inter_queue,
-    counts_undigested_site_queue,
-    output_prefix,
-    compute_processes,
-    plot=True,
-):
-    """
-    Merge and save the counting dictionaries held in five queues, then
-    generate per-chromosome plots and global plots.
-
-    The JSON files are written to the "Counts" subdirectory.
-    The PNG images for each group go to "Figures/<group>/",
-    and the global plots go to "Global/Dangling/" and "Global/Valid/".
-
-    Returns:
-        dict: Dictionary grouping the merged dictionaries of each group.
-        Example:
-        {
-            "vl_intra": {...},
-            "dl_intra": {...},
-            "vl_inter": {...},
-            "dl_inter": {...},
-            "undigested": {...}
-        }
-    """
-    # Create the output directories
-    print("Start creating output stats")
-    os.makedirs("Counts", exist_ok=True)
-    os.makedirs("Figures", exist_ok=True)
-    for group in [
-        "vl_intra",
-        "dl_intra",
-        "vl_inter",
-        "dl_inter",
-        "undigested",
-    ]:
-        os.makedirs(os.path.join("Figures", group), exist_ok=True)
-    os.makedirs(os.path.join("Global", "Dangling"), exist_ok=True)
-    os.makedirs(os.path.join("Global", "Valid"), exist_ok=True)
-
-    # Merge the dictionaries coming from each queue
-    merged_vl_intra = merge_count_dicts_from_queue(
-        counts_biot_intra_queue, compute_processes
-    )
-    merged_dl_intra = merge_count_dicts_from_queue(
-        counts_dang_intra_queue, compute_processes
-    )
-    merged_vl_inter = merge_count_dicts_from_queue(
-        counts_biot_inter_queue, compute_processes
-    )
-    merged_dl_inter = merge_count_dicts_from_queue(
-        counts_dang_inter_queue, compute_processes
-    )
-    merged_undigested = merge_count_dicts_from_queue(
-        counts_undigested_site_queue, compute_processes
-    )
-
-    merged_counts = {
-        "vl_intra": merged_vl_intra,
-        "dl_intra": merged_dl_intra,
-        "vl_inter": merged_vl_inter,
-        "dl_inter": merged_dl_inter,
-        "undigested": merged_undigested,
-    }
-
-    # Save each dictionary as JSON in the "Counts" directory
-    for group, data in merged_counts.items():
-        filename = os.path.join(
-            "Counts", f"{output_prefix}_count_{group}.json"
-        )
-        with open(filename, "w") as f:
-            json.dump(data, f, indent=2)
-
-    # Generate the per-chromosome plots for each group and save them to "Figures/<group>/"
-    if plot:
-        for group, data in merged_counts.items():
-            for chrom, subdict in data.items():
-                if subdict:
-                    positions = sorted(subdict.keys())
-                    counts = [subdict[pos] for pos in positions]
-                    plt.figure(figsize=(10, 6))
-                    plt.plot(positions, counts)
-                    plt.yscale("log")
-                    plt.xlabel(f"Position on {chrom}")
-                    plt.ylabel("Read count (log scale)")
-                    plt.title(
-                        f"Count distribution for {group} on {chrom}"
-                    )
-                    plt.tight_layout()
-                    png_file = os.path.join(
-                        "Figures",
-                        group,
-                        f"{output_prefix}_{group}_{chrom}.png",
-                    )
-                    plt.savefig(png_file)
-                    plt.close()
-
-    # Generate the global plots
-    # Merge the data for the "Valid" groups (vl_intra + vl_inter)
-    valid_global = {}
-    for d in [merged_vl_intra, merged_vl_inter]:
-        valid_global = merge_count_dicts(valid_global, d)
-    # Merge the data for the "Dangling" groups (dl_intra + dl_inter)
-    dangling_global = {}
-    for d in [merged_dl_intra, merged_dl_inter]:
-        dangling_global = merge_count_dicts(dangling_global, d)
-
-    if plot:
-        # Global plot for Valid
-        for chrom, subdict in valid_global.items():
-            if subdict:
-                positions = sorted(subdict.keys())
-                counts = [subdict[pos] for pos in positions]
-                plt.figure(figsize=(10, 6))
-                plt.plot(positions, counts, marker="o")
-                plt.yscale("log")
-                plt.xlabel(f"Position on {chrom}")
-                plt.ylabel("Read count (log scale)")
-                plt.title(
-                    f"Global distribution of valid sites on {chrom}"
-                )
-                plt.tight_layout()
-                png_file = os.path.join(
-                    "Global",
-                    "Valid",
-                    f"{output_prefix}_global_valid_{chrom}.png",
-                )
-                plt.savefig(png_file)
-                plt.close()
diff --git a/hi-classifier/writer.py b/hi-classifier/writer.py
deleted file mode 100644
index f5016a1e8dc045f45469cad2b439aab516469559..0000000000000000000000000000000000000000
--- a/hi-classifier/writer.py
+++ /dev/null
@@ -1,173 +0,0 @@
-def open_output(output, Header_Queue, Name):
-    """
-    Open the output BAM files for writing.
-
-    Parameters:
-        output (str): Prefix of the output BAM paths.
-        Header_Queue (Queue): Queue delivering the pair of headers
-            (forward, reverse) to use for the BAM files.
-        Name (str): Suffix identifying this writer (e.g. a category name).
-
-    Returns:
-        bam_out_for, bam_out_rev: pysam.AlignmentFile objects opened for
-        writing, plus the header pair consumed from the queue.
-    """
-    import signal
-    import sys
-
-    import pysam
-
-    Header = Header_Queue.get()
-    output_forward = str(output) + "_R1_" + str(Name) + ".bam"
-    output_reverse = str(output) + "_R2_" + str(Name) + ".bam"
-
-    # Open output files for writing
-    bam_out_for = pysam.AlignmentFile(output_forward, "wb", header=Header[0])
-    bam_out_rev = pysam.AlignmentFile(output_reverse, "wb", header=Header[1])
-
-    # Register signal handlers so the BAM files are closed cleanly
-    # if the writer process is interrupted or suspended.
-    def my_signal_handler(sig, frame):
-        bam_out_for.close()
-        bam_out_rev.close()
-        sys.exit(1)
-
-    signal.signal(signal.SIGINT, my_signal_handler)
-    signal.signal(signal.SIGTSTP, my_signal_handler)
-
-    return bam_out_for, bam_out_rev, Header
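The Header_Queue contract above implies that the reader side seeds the queue with one (forward, reverse) header pair before the writer starts. A minimal sketch of that hand-off, assuming hypothetical input files in1.bam and in2.bam:

    from multiprocessing import Queue

    import pysam

    header_queue = Queue()
    with pysam.AlignmentFile("in1.bam", "rb") as bam_for, \
         pysam.AlignmentFile("in2.bam", "rb") as bam_rev:
        # One (forward, reverse) header pair per writer process.
        header_queue.put((bam_for.header.to_dict(), bam_rev.header.to_dict()))

    out_f, out_r, header = open_output("sample", header_queue, "valid")
    # Produces sample_R1_valid.bam and sample_R2_valid.bam.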
- """ - import pysam - - # Valeurs par défaut pour chaque attribut - defaults = [ - "", # query_name - None, # flag - "*", # reference_name (choisissez "*" si aucune réf) - 0, # reference_start - 0, # mapping_quality - "*", # cigarstring - "*", # next_reference_name - 0, # next_reference_start - 0, # template_length - "", # query_sequence - None, # query_qualities - ] - # Compléter data à 11 éléments si nécessaire - values = [ - data[i] if i < len(data) and data[i] is not None else defaults[i] - for i in range(11) - ] - - # Correction du champ reference_name : - # Si la valeur est None ou la chaîne "None", on utilise la valeur par défaut ("*") - ref_name = values[2] - if ref_name in (None, "None"): - ref_name = defaults[2] - values[2] = ref_name - - # Crée le segment en passant le header du template - Line = pysam.AlignedSegment(template.header) - - # Affectation des champs avec conversion si nécessaire - Line.query_name = str(values[0]) + "_" + type - Line.flag = int(values[1]) - Line.reference_name = str(values[2]) - Line.reference_start = int(values[3]) - Line.mapping_quality = int(values[4]) - Line.cigarstring = str(values[5]) - Line.next_reference_name = str(values[6]) - Line.next_reference_start = int(values[7]) - Line.template_length = int(values[8]) - Line.query_sequence = str(values[9]) - Line.query_qualities = values[10] # Si None, pysam gère - - return Line - - -def write_bam_pair( - output_queue, - Header_Queue, - output, - num_process, - Name, -): - """ - Write FastQ file pairs to the output using data from the output queue. - - Parameters: - output_queue (Queue): Queue to get processed read pairs. - out_f (subprocess.Popen): Process for the forward output. - out_r (subprocess.Popen): Process for the reverse output. - num_process (int): Number of fragmenting threads. - """ - import sys - - out_f, out_r, header = open_output(output, Header_Queue, Name) - print("Start writing") - - while num_process > 0: - try: - data = output_queue.get() - # print(data[0]) - if data is None: - num_process -= 1 - continue # Passe à l'itération suivante - - else: - # print(data[0], flush=True) - type = data[0].split("-")[0] - # Retour en pysam.AlignedSegment - Forward_Line = From_list_to_pysam_AlignedSegment( - data[1][0], out_f, type - ) - Reverse_Line = From_list_to_pysam_AlignedSegment( - data[1][1], out_r, type - ) - - if out_f is not None and out_r is not None: - out_f.write(Forward_Line) - out_r.write(Reverse_Line) - else: - raise ValueError( - "Error: Streaming output process is not running." - ) - - except Exception as e: - print(f"Error in write_pairs: {e}") - manage_errors(out_f, out_r) - sys.exit(1) - - print("Ending writing processes") - ensure_ending(out_f, out_r) - - -def ensure_ending(out_f, out_r): - out_f.close() - out_r.close() - - -def manage_errors(out_f, out_r): - """ - Gestion simplifiée des erreurs pour la fermeture des fichiers BAM. - """ - try: - if out_f is not None: - out_f.close() - except Exception as e: - print(f"Erreur lors de la fermeture du fichier forward : {e}") - - try: - if out_r is not None: - out_r.close() - except Exception as e: - print(f"Erreur lors de la fermeture du fichier reverse : {e}")