Skip to content
Snippets Groups Projects
Commit 51b6c367 authored by Bertache's avatar Bertache
Browse files

doctest & binary mask

parent 18ae689c
Branches
No related tags found
No related merge requests found
import sys
import os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../hi-classifier")))
from positionfrag import get_fragments_for_read
from positionfrag import get_fragments_for_read, get_re_site_for_read
from readmanager import flag_to_strand
def flag_to_strand(flag):
......@@ -42,3 +42,15 @@ print("Read de 950, longueur 100 :", get_fragments_for_read("chr1", 950, 1050, r
print("Read de 2100, longueur 50 :", get_fragments_for_read("chr1", 2100, 2150, restr_index))
# Cas 3 : Un read qui ne chevauche aucun fragment (hors limites)
print("Read de 5000, longueur 100 :", get_fragments_for_read("chr1", 5000, 5100, restr_index))
print()
print()
# Test pour get_fragments_for_read
print("Test get_re_site_for_read:")
print("RE théorique : (1, 1000)")
print("Read de 950, Sens :", get_re_site_for_read("chr1", 950, "+", restr_index))
print("RE théorique : (2, 2000)")
print("Read de 2100, Antisens :", get_re_site_for_read("chr1", 2100, "-", restr_index))
print("RE théorique : (3, 3000) / Rien")
print("Read de 5000, Antisens :", get_re_site_for_read("chr1", 5000, "-", restr_index))
import re
import os
import sys
import tempfile
from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from bisect import bisect_left
# import sys
# import os
# from ..hi_classifier.digest import find_restriction_sites, build_restriction_index_from_records
sys.path.insert(
0,
os.path.abspath(
os.path.join(os.path.dirname(__file__), "../hi-classifier")
),
)
from digest import build_restriction_index_from_records
import sys
import os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../hi-classifier")))
from digest import find_restriction_sites, build_restriction_index_from_records
def get_nearest_restriction_site(chrom, pos, orientation, restr_index):
def test_build_restriction_index_from_fasta():
"""
Retourne le tuple (site_num, position) du site de restriction le plus proche à partir
de la position 'pos' sur le chromosome 'chrom' dans la direction indiquée par 'orientation' ("+" ou "-").
- Pour orientation "+": retourne le premier site dont la position est >= pos.
- Pour orientation "-": retourne le dernier site dont la position est <= pos.
Si aucun site ne correspond, retourne None.
Teste la fonction build_restriction_index_from_records
en créant un FASTA temporaire avec 2 séquences.
"""
sites = restr_index[chrom]["sites"] # Liste de tuples (site_num, pos)
if not sites:
return None
# Extraire uniquement les positions
positions = [s[1] for s in sites]
if orientation == "+":
idx = bisect_left(positions, pos)
if idx < len(positions):
return sites[idx]
else:
return sites[-1]
elif orientation == "-":
idx = bisect_left(positions, pos)
if idx == 0:
return sites[0]
else:
return sites[idx-1]
else:
raise ValueError("Orientation inconnue : " + orientation)
# --- Tests sur des enregistrements synthétiques ---
def test_find_restriction_sites():
# Exemple avec une séquence synthétique et un motif simple
seq = "AAAGCTTCCCAAGCTTGGGAAGCTT"
pattern = "AAGCTT" # motif de HindIII
sites = find_restriction_sites(seq, pattern)
print("Test find_restriction_sites:")
print(" Sites trouvés:", sites)
# Attendu : [3, 12, 19] si indexé à partir de 0
def test_build_restriction_index_from_records():
# Création de deux enregistrements synthétiques pour deux chromosomes fictifs
seq1 = "AAAGCTTCCCAAGCTTGGGAAGCTT" # pour "chr1"
seq2 = "TTTAAGCTTGGGTTTAAGCTTCCC" # pour "chr2"
# 1) Créer deux séquences de test
seq1 = "AXAAGCTTCCCXAAGCTTGGGXAAGCTT" # contiendra le motif "AAGCTT"
seq2 = "TTTXAAGCTTGGGTTTXAAGCTTCCC"
record1 = SeqRecord(Seq(seq1), id="chr1")
record2 = SeqRecord(Seq(seq2), id="chr2")
records = [record1, record2]
enzyme_pattern = "AAGCTT"
index = build_restriction_index_from_records(records, enzyme_pattern)
# 2) Écrire ces séquences dans un fichier FASTA temporaire
with tempfile.NamedTemporaryFile("w", delete=False) as tmp_fasta:
# On utilise SeqIO pour écrire en format FASTA
SeqIO.write([record1, record2], tmp_fasta, "fasta")
fasta_path = tmp_fasta.name # chemin du fichier
print("Test build_restriction_index_from_records:")
for chrom in index:
# 3) Appeler la fonction en lui passant le chemin du fichier FASTA
enzyme_pattern = "X"
restr_index = build_restriction_index_from_records(
fasta_path, enzyme_pattern
)
# 4) Vérifier le contenu du dictionnaire restr_index
# Par exemple, afficher ou tester quelques valeurs
print("TEST: test_build_restriction_index_from_fasta")
for chrom in restr_index:
print(f" Chromosome: {chrom}")
print(" Sites de restriction:")
for site in index[chrom]["sites"]:
print(f" Site {site[0]} à la position {site[1]} bp")
print(" Fragments:")
for frag in index[chrom]["fragments"]:
print(f" Fragment {frag[0]} : [{frag[1][0]}, {frag[1][1]}]")
print(" Sites found:", restr_index[chrom]["sites"])
print(" Fragments:", restr_index[chrom]["fragments"])
def test_get_nearest_restriction_site():
# Utilisation des mêmes enregistrements synthétiques que précédemment
seq1 = "AAAGCTTCCCAAGCTTGGGAAGCTT"
record1 = SeqRecord(Seq(seq1), id="chr1")
records = [record1]
enzyme_pattern = "AAGCTT"
index = build_restriction_index_from_records(records, enzyme_pattern)
# 5) (Optionnel) on peut supprimer le fichier temporaire si on veut
# via os.remove(fasta_path)
pos_test = 10 # Position de test
site_forward = get_nearest_restriction_site("chr1", pos_test, "+", index)
site_reverse = get_nearest_restriction_site("chr1", pos_test, "-", index)
print("Test get_nearest_restriction_site:")
print(f" Pour pos {pos_test} en orientation '+': site {site_forward[0]} à {site_forward[1]} bp")
print(f" Pour pos {pos_test} en orientation '-': site {site_reverse[0]} à {site_reverse[1]} bp")
if __name__ == "__main__":
test_find_restriction_sites()
print()
test_build_restriction_index_from_records()
print()
test_get_nearest_restriction_site()
test_build_restriction_index_from_fasta()
......@@ -102,18 +102,16 @@ R1 = read_bam("./Res_R1.bam")
R2 = read_bam("./Res_R2.bam")
Erreur = False
i = 0
for el1, el2 in zip(R1, R2):
i += 1
if el1[0] != el2[0]:
Erreur = True
print(
"Il y a une erreur, les lectures sont différentes, l'outil est donc non fiable"
)
else:
print()
print(el1[1])
print()
print()
print(el1[0])
if not Erreur:
print("Test unitaire réussi avec succès !")
from typing import Tuple
def signal_handler(sig, frame, out_f, out_r):
"""
Handle termination signals to gracefully terminate processes.
......@@ -15,9 +18,22 @@ def signal_handler(sig, frame, out_f, out_r):
sys.exit()
def partitionning(num_processes):
def partitionning(num_processes: int) -> Tuple[int, int]:
"""
Partition the number of threads for writing and fragmenting.
Parameters:
num_processes (int): The total number of available CPU cores.
Examples:
>>> partitionning(4)
(2, 2)
>>> partitionning(6)
(2, 4)
>>> partitionning(2)
Traceback (most recent call last):
...
ValueError: You need at least 3 cores
"""
if num_processes > 2:
write_processes = 2
......@@ -29,6 +45,13 @@ def partitionning(num_processes):
def check_data(data):
"""
Examples:
>>> check_data([(0, 'Hello'), (1, 'Hello')])
True
>>> check_data([(0, 'Hello'), (1, 'World')])
False
"""
if data[0][1] != data[1][1]:
return False
return True
def read_to_tuple(read):
from multiprocessing import Queue
from typing import Any, List, Union
def read_to_tuple(read: Any) -> List[Union[None, str, int, List[int]]]:
"""
Transforme un objet pysam.AlignedSegment en une liste contenant
les informations essentielles pour chaque alignement. Si un attribut
est absent, la valeur par défaut (None ou une liste vide pour les qualités)
sera utilisée.
Parameters:
read (Any): Objet représentant la lecture BAM (p. ex. pysam.AlignedSegment).
Returns:
List[Union[None, str, int, List[int]]]: Liste des informations
essentielles de l'alignement.
"""
# Récupération sécurisée de chaque attribut avec une valeur par défaut
query_name = getattr(read, "query_name", None)
......@@ -38,16 +49,21 @@ def read_to_tuple(read):
def read_bam_pair(
bam_for_file, bam_rev_file, input_queue, Header_Queue, num_processes
):
bam_for_file: str,
bam_rev_file: str,
input_queue: Queue,
Header_Queue: Queue,
num_processes: int,
) -> None:
"""
Read simultaneously two BAM files and put read pairs into an input queue.
Parameters:
bam_for_file (str): Path to the forward BAM file.
bam_rev_file (str): Path to the reverse BAM file.
Input_Queue (Queue): Queue to store read pairs.
TFrag (int): Number of fragmenting threads.
input_queue (Queue): Queue to store read pairs.
Header_Queue (Queue): Queue to store headers from both BAM files.
num_processes (int): Number of processes (pour gérér la fin du flux).
"""
import sys
import traceback
......
import re
from typing import Any, Dict, List, Tuple, Union
from Bio import SeqIO
def find_restriction_sites(seq, pattern):
def find_restriction_sites(seq: Union[str, Any], pattern: str) -> List[int]:
"""
Recherche toutes les positions d'occurrence du motif (pattern)
Recherche toutes les positions d'occurrence du motif (pattern) dans la séquence seq.
Parameters:
seq (Union[str, Any]): Séquence dans laquelle rechercher (chaîne ou objet ayant une représentation str).
pattern (str): Motif à rechercher.
Returns:
List[int]: Liste triée des positions (0-index) où le motif est trouvé.
Examples:
>>> find_restriction_sites("GAATTCGAATTC", "GAATTC")
[0, 6]
>>> find_restriction_sites("AAAAAA", "TTTT") # Motif non présent
[]
>>> find_restriction_sites("AAAAAAAOAAAAAAAOAAAAAAOO", "O")
[7, 15, 22, 23]
"""
seq_str = str(seq).upper()
pattern = pattern.upper()
......@@ -11,11 +29,37 @@ def find_restriction_sites(seq, pattern):
return sorted(sites)
def find_restriction_sites_from_list(seq, ligation_site_list):
def find_restriction_sites_from_list(
seq: Union[str, Any], ligation_site_list: List[Tuple[re.Pattern, int]]
) -> List[int]:
"""
Recherche toutes les positions d'occurrence pour une séquence (seq) en utilisant une liste
de motifs pré-compilés (ligation_site_list). Chaque élément de cette liste est un tuple (compiled_regex, length).
Retourne une liste triée et sans doublons des positions (0-indexées).
Parameters:
seq (Union[str, Any]): Séquence dans laquelle rechercher.
ligation_site_list (List[Tuple[re.Pattern, int]]): Liste de tuples (regex compilée, longueur du site).
Returns:
List[int]: Liste triée et unique des positions d'occurrence.
Examples:
>>> import re
>>> test_seq = "GATATCATCGATATC"
>>> site_list = [(re.compile("GATATC"), 6), (re.compile("CGA"), 3)]
>>> find_restriction_sites_from_list(test_seq, site_list)
[0, 8, 9]
>>> import re
>>> test_seq = "AAAAAAAOAAAAAAAAA"
>>> site_list = [(re.compile("AOA"), 3), (re.compile("AAAAAAA"), 7)]
>>> find_restriction_sites_from_list(test_seq, site_list)
[0, 6, 8]
>>> import re
>>> test_seq = "XXXXXXXXXXXXXXX"
>>> site_list = [(re.compile("CCCCCC"), 6), (re.compile("UUUUU"), 5)]
>>> find_restriction_sites_from_list(test_seq, site_list)
[]
"""
seq_str = str(seq).upper()
positions = []
......@@ -24,7 +68,9 @@ def find_restriction_sites_from_list(seq, ligation_site_list):
return sorted(set(positions))
def build_restriction_index_from_records(records, enzyme_input):
def build_restriction_index_from_records(
fasta_path: str, enzyme_input: Union[str, List[Tuple[re.Pattern, int]]]
) -> Dict[str, Dict[str, Any]]:
"""
Construit l'index des sites de restriction à partir d'une liste d'enregistrements (SeqRecord).
......@@ -34,8 +80,18 @@ def build_restriction_index_from_records(records, enzyme_input):
Pour chaque chromosome, retourne un dictionnaire contenant :
- "sites": une liste de tuples (site_num, position), numérotée à partir de 1.
- "fragments": une liste de tuples (frag_num, (start, end)) définissant les intervalles des fragments.
Returns:
Dict[str, Dict[str, Any]]:
Clé = record.id (nom de chromosome/contig)
Valeur = {
"sites": [(1, pos1), (2, pos2), ...],
"fragments": [(1, (0, pos1)), (2, (pos1, pos2)), ...]
}
"""
restr_index = {}
records = list(SeqIO.parse(fasta_path, "fasta"))
restr_index: Dict[str, Dict[str, Any]] = {}
for record in records:
chrom = record.id
seq = record.seq
......
This diff is collapsed.
......@@ -69,7 +69,7 @@ def get_fragments_for_read(chrom, start, end, restr_index):
return overlapping_fragments
def annotate_paired_read(paired_read, restr_index):
def annotate_intra_paired_read(paired_read, restr_index):
"""
Pour une instance de PairedRead, met à jour ses attributs avec les informations extraites
à partir de l'index de restriction.
......
......@@ -16,7 +16,7 @@ def process_items(
import sys
from auxiliary import check_data
from positionfrag import annotate_paired_read
from positionfrag import annotate_intra_paired_read
from readmanager import PairedRead
try:
......@@ -35,10 +35,19 @@ def process_items(
current_pair.mapq_1 >= mapq_score
and current_pair.mapq_2 >= mapq_score
):
if current_pair.chrom_1 == current_pair.chrom_2:
# Récupération des informations sur la paire de read
annotate_paired_read(current_pair, dict_digested)
# Cherchez la présence des sites de restriction et de ligation
# Si fragment multiple
annotate_intra_paired_read(current_pair, dict_digested)
else:
#
# Pour le moment, il n'y a pas de classification pour les contacts extrachromosomique
#
# Idée : Prendre uniquement les catégories Up, Down...
# Et considérer toujours que la première lecture est issue
# du chr au chiffre le plus faible !
pass
output_queue.put([data[0], data[1]])
else:
......
......@@ -18,14 +18,28 @@ import re
CIGAR_PATTERN = re.compile(r"(\d+)([MIDNSHP=X])")
def cigar_reference_length(cigar_str):
def cigar_reference_length(cigar_str: str) -> int:
"""
Calcule la longueur consommée sur la référence à partir
d'une chaîne CIGAR (ex.: "10S40M2D10M5I30M", "100M2I3D50M", etc.).
On ajoute le nombre associé aux opérations qui consomment la référence :
M, I, S, =, X
et on le reste conformément à https://samtools.github.io/hts-specs/SAMv1.pdf
Selon ce code, on ajoute le nombre associé aux opérations
qui consomment la référence : M, I, S, =, X.
Les autres (D, N, H, P, etc.) ne sont pas ajoutées au total,
même si dans la spécification SAM, certaines (comme D) consomment
effectivement la référence.
Examples:
>>> cigar_reference_length("10S40M2D10M5I30M")
95
>>> cigar_reference_length("100M2I3D50M")
152
>>> cigar_reference_length("")
0
>>> cigar_reference_length(None)
0
>>> cigar_reference_length("4M")
4
"""
if not cigar_str:
return 0
......@@ -34,7 +48,7 @@ def cigar_reference_length(cigar_str):
tokens = CIGAR_PATTERN.findall(cigar_str)
for num_str, op in tokens:
n = int(num_str)
# Seules ces opérations consomment la référence
# Seules ces opérations sont ajoutées dans cette implémentation
if op in ("M", "I", "S", "=", "X"):
length += n
return length
......@@ -44,17 +58,64 @@ def flag_to_strand(flag):
"""
Retourne '+' si le read est sur le brin positif,
'-' si le read est sur le brin négatif (bit 0x10).
Examples:
>>> flag_to_strand(0) # Ni 0x10 ni 0x4 => brin positif
'+'
>>> flag_to_strand(16) # 0x10 => brin négatif
'-'
>>> flag_to_strand(40) # Lecture unmapped, on ne voit pas 0x10 => brin positif
'+'
>>> flag_to_strand(20) # 20 = 0x14 => 0x10 (negatif) + 0x4 (unmapped)
'-'
"""
return "-" if (flag and 0x10) else "+"
return "-" if (flag & 0x10) else "+"
class PairedRead:
"""
Classe stockant une paire de lectures (read_1, read_2).
- 'start_1' et 'start_2' : coordonnée la plus à gauche dans le génome (champ POS).
- 'strand_1' et 'strand_2' : brin '+' ou '-'.
- 'end_1' et 'end_2' : calculés en fonction du brin.
- On impose (si même chromosome) que read1 soit le plus amont.
Chaque 'read_tuple_i' est un tuple au format suivant:
[
query_name, # Nom de la lecture
flag, # Flag de l'alignement
reference_name, # Nom de la référence (chromosome)
reference_start, # Position de début sur la référence
mapping_quality, # Qualité de l'alignement
cigarstring, # Chaîne CIGAR
next_reference_name,# ...
next_reference_start,# ...
template_length, # Longueur du fragment (tlen)
query_sequence, # Séquence de la lecture
query_qualities, # Qualités (liste d'entiers) ou None
]
Les attributs calculés incluent:
- start_1, end_1, strand_1
- start_2, end_2, strand_2
- interchrom (booléen), etc.
Examples:
>>> # read1 => brin +, commence à 100, CIGAR=50M => end_1=149
>>> read1 = ("r1", 0, "chr1", 100, 30, "50M", None, None, 0, "ACGT")
>>> # read2 => brin -, commence à 200, CIGAR=10M => end_2=191
>>> read2 = ("r2", 16, "chr1", 200, 30, "10M", None, None, 0, "ACGT")
>>> pr = PairedRead(read1, read2)
>>> pr
<PairedRead r1/r2 chr=chr1 strand=(+,-) pos=(100-149, 200-191)>
>>> pr.interchrom
False
>>> # Vérifions l'ordre: read1 est plus amont => on n'a pas besoin de swap
...
>>> # Si on inversait les positions:
>>> read1b = ("r1b", 0, "chr1", 500, 30, "50M", None, None, 0, "ACGT")
>>> read2b = ("r2b", 16, "chr1", 100, 30, "10M", None, None, 0, "ACGT")
>>> prb = PairedRead(read1b, read2b)
>>> prb
<PairedRead r2b/r1b chr=chr1 strand=(-,+) pos=(100-91, 500-549)>
>>> # On voit que read2b/r1b ont été inversés pour que la coord la plus à gauche soit first.
"""
def __init__(self, read_tuple_1, read_tuple_2):
......@@ -131,7 +192,22 @@ class PairedRead:
self.interchrom = self.chrom_1 != self.chrom_2
def _swap_reads(self):
"""Échange toutes les infos de read1 et read2."""
"""
Échange toutes les infos de read1 et read2 si read2 est plus en amont.
Examples:
>>> # On donne à read1 chr1, start=500, et à read2 chr2, start=100
>>> # => le code ne va pas faire le swap dans __init__ car chrom_1 != chrom_2
>>> from readmanager import PairedRead
>>> read1 = ("r1", 0, "chr1", 500, 30, "50M", None, None, 0, "ACGT")
>>> read2 = ("r2", 16, "chr2", 100, 30, "50M", None, None, 0, "ACGT")
>>> pr = PairedRead(read1, read2)
>>> pr.query_name_1, pr.query_name_2
('r1', 'r2')
>>> pr._swap_reads()
>>> pr.query_name_1, pr.query_name_2
('r2', 'r1')
"""
# On échange : query_name, flag, chrom, start, end, mapq, cigar, strand
(self.query_name_1, self.query_name_2) = (
self.query_name_2,
......@@ -150,6 +226,17 @@ class PairedRead:
(self.seq1, self.seq2) = (self.seq2, self.seq1)
def __repr__(self):
"""
Génère une représentation textuelle de l'objet PairedRead.
Examples:
>>> from readmanager import PairedRead
>>> read1 = ("r1", 0, "chr1", 100, 30, "50M", None, None, 0, "ACGT")
>>> read2 = ("r2", 16, "chr1", 200, 30, "50M", None, None, 0, "ACGT")
>>> pr = PairedRead(read1, read2)
>>> pr
<PairedRead r1/r2 chr=chr1 strand=(+,-) pos=(100-149, 200-151)>
"""
return (
f"<PairedRead {self.query_name_1}/{self.query_name_2} "
f"chr={self.chrom_1 if not self.interchrom else 'inter'} "
......
import re
import sys
from typing import Any, List, Tuple, Union
from Bio.Restriction import RestrictionBatch
from Bio.Seq import Seq
def CaseAdaptation(List_Enzyme: Union[str, List[str]]) -> List[str]:
"""
Case sensitive enzymes adaptation.
Convertit les noms d'enzymes (en minuscule, majuscule ou mélange) vers
les noms exacts attendus par Bio.Restriction. Si l'argument est une chaîne,
il est converti en liste à un seul élément.
Returns:
List[str]: Liste d'enzymes adaptées en respectant la casse reconnue.
Examples:
>>> # Exemple avec l'enzyme "arima" (majuscule/minuscule mal gérée)
>>> result = CaseAdaptation("arima")
>>> print(result)
['DpnII', 'HinfI']
"""
if isinstance(List_Enzyme, str):
List_Enzyme = [List_Enzyme]
for i, enzyme in enumerate(List_Enzyme):
enzyme_lower = enzyme.lower()
if enzyme_lower == "hindiii":
List_Enzyme[i] = "HindIII"
elif enzyme_lower == "dpnii":
List_Enzyme[i] = "DpnII"
elif enzyme_lower == "bglii":
List_Enzyme[i] = "BglII"
elif enzyme_lower == "mboi":
List_Enzyme[i] = "MboI"
elif enzyme_lower == "arima":
# Double enzyme
List_Enzyme[i] = "DpnII"
List_Enzyme.append("HinfI")
return List_Enzyme
def FindLigaSite(
List_Enzyme: List[str], borderless: bool = False
) -> List[Tuple[re.Pattern, int]]:
"""
Finds the ligation sites for a given list of enzymes and their length.
Bio.Restriction elucide() renvoie par exemple "N^GATC_N" pour DpnII,
où:
- '^' marque le site de coupure forward
- '_' marque le site de coupure reverse
- 'N' peut signifier une base quelconque (non spécifiée).
Le 'give site' (partie avant la coupure reverse) est purifié en supprimant
'^' et d'éventuels 'N' redondants.
Le 'accept site' (partie après la coupure forward) est purifié en supprimant
'_' et d'éventuels 'N' redondants à la fin.
Ligation site = give_site + accept_site (en remplaçant éventuellement 'N' par '.').
Si le site n'est pas palindromique, on ajoute également son reverse complement.
Examples:
>>> # Exemple avec DpnII, qui a un site palindromique GATC
>>> result = FindLigaSite(["DpnII"])
>>> result
[(re.compile('GATCGATC'), 4)]
"""
restriction_batch = RestrictionBatch(List_Enzyme)
give_list: List[str] = []
accept_list: List[str] = []
ligation_site_list: List[Tuple[re.Pattern, int]] = []
for enz in restriction_batch:
site = enz.elucidate()
fw_cut = site.find("^")
rev_cut = site.find("_")
# Purify give site (jusqu'à rev_cut)
give_site = site[:rev_cut].replace("^", "")
# Retirer d'éventuels 'N' redondants en début
while give_site and give_site[0] == "N":
give_site = give_site[1:]
give_list.append(give_site)
# Purify accept site (à partir de fw_cut+1)
accept_site = site[fw_cut + 1 :].replace("_", "")
# Retirer d'éventuels 'N' redondants en fin
while accept_site and accept_site[-1] == "N":
accept_site = accept_site[:-1]
accept_list.append(accept_site)
# Combine give_site et accept_site
for give_site in give_list:
for accept_site in accept_list:
ligation_site = (give_site + accept_site).replace("N", ".")
compiled_regex = re.compile(ligation_site)
# Longueur selon le paramètre borderless
if borderless:
length = len(give_site) + len(accept_site)
else:
length = len(give_site)
ligation_site_list.append((compiled_regex, length))
# Reverse complement
reverse_complement_site = str(
Seq(ligation_site).reverse_complement()
)
if ligation_site != reverse_complement_site:
compiled_reverse_regex = re.compile(
reverse_complement_site.replace("N", ".")
)
if borderless:
length = len(give_site) + len(accept_site)
else:
length = len(accept_site)
ligation_site_list.append((compiled_reverse_regex, length))
return ligation_site_list
def get_restriction_site_patterns(
enzymes: List[str],
) -> List[Tuple[re.Pattern, int]]:
"""
Récupère pour chaque enzyme de la liste son site de restriction « purifié »
ainsi que son reverse complément s'il est différent, sous la forme
de motifs regex compilés (et leur longueur).
Si le site est, par exemple, "N^GATC_N", la partie effective devient "GATC"
(après suppression de '^', '_' et des 'N' superflus).
Parameters:
enzymes (List[str]): Liste des noms d'enzymes (ex: ["HindIII", "DpnII"]).
Returns:
List[Tuple[re.Pattern, int]]:
Liste de tuples (motif_compilé, longueur).
Ex: [(re.compile("GATC"), 4), (re.compile("GATC"), 4)] pour un palindrome.
Examples:
>>> # Exemple avec DpnII, qui a un site palindromique GATC
>>> result = get_restriction_site_patterns(["DpnII"])
>>> result
[(re.compile('.GATC.'), 6)]
"""
results: List[Tuple[re.Pattern, int]] = []
restriction_batch = RestrictionBatch(enzymes)
for enzyme in restriction_batch:
site_raw = enzyme.elucidate()
site_clean = site_raw.replace("^", "").replace("_", "")
# Remplacer 'N' par '.' pour autoriser n'importe quelle base
pattern_fw = re.compile(site_clean.replace("N", "."))
results.append((pattern_fw, len(site_clean)))
# Vérifier le RC
rc_site = str(Seq(site_clean).reverse_complement())
if rc_site != site_clean:
pattern_rc = re.compile(rc_site.replace("N", "."))
results.append((pattern_rc, len(rc_site)))
return results
def SearchInDataBase(Enzymes: str) -> Tuple[List[Any], List[Any]]:
"""
Recherche l'enzyme dans la "base" en adaptant sa casse, puis
récupère à la fois les sites de ligation (via FindLigaSite) et
les sites de restriction (via get_restriction_site_patterns),
pour finalement imprimer et retourner ces informations.
Parameters:
Enzymes (str): Nom(s) d'enzyme(s) séparés par des virgules,
ou chaîne spéciale "No restriction enzyme found".
Returns:
Tuple[List[str], List[str]]:
- Première liste : motifs (en clair) pour les sites de ligation
- Deuxième liste : motifs (en clair) pour les sites de restriction
Examples:
>>> # Exemple avec DpnII, qui a un site palindromique GATC
>>> result = SearchInDataBase("dpnii")
Ligation sites: GATCGATC
>>> result
([(re.compile('GATCGATC'), 4)], [(re.compile('.GATC.'), 6)])
>>> # Exemple avec DpnII, qui a un site palindromique GATC
>>> result = SearchInDataBase("arima")
Ligation sites: GATCGATC
Ligation sites: GATCA.TC
Ligation sites: GA.TGATC
Ligation sites: GA.TGATC
Ligation sites: GATCA.TC
Ligation sites: GA.TA.TC
>>> result
([(re.compile('GATCGATC'), 4), (re.compile('GATCA.TC'), 4), (re.compile('GA.TGATC'), 4), (re.compile('GA.TGATC'), 4), (re.compile('GATCA.TC'), 4), (re.compile('GA.TA.TC'), 4)], [(re.compile('.GATC.'), 6), (re.compile('GA.TC'), 5)])
"""
if Enzymes == "No restriction enzyme found":
print(Enzymes)
sys.exit(0)
else:
ListEnzyme = Enzymes.split(",")
try:
# On adapte la casse
adapted = CaseAdaptation(ListEnzyme)
# Sites de ligation
ligation_site_list = FindLigaSite(adapted)
# Sites de restriction
restriction_site_list = get_restriction_site_patterns(adapted)
# Affichage des sites de ligation sous forme de motifs en clair
if len(ligation_site_list) > 1:
for el in ligation_site_list:
# el = (re.compile(...), longueur)
print(f"Ligation sites: {el[0].pattern}", flush=True)
else:
print(
f"Ligation sites: {ligation_site_list[0][0].pattern}",
flush=True,
)
return ligation_site_list, restriction_site_list
except Exception:
print("Error in enzyme identification")
raise NameError
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment