Commit b7bcf85b authored by Marco Uderzo

Optimized clustering_score() with more efficient logic and multicore processing

parent 32e0f0b5
Before (sequential implementation, removed):

def clustering_score(original_adata, score_value='bic', clustering_algorithm='leiden', dim_reduction='pca', min_res=0.1, max_res=2.0, step=0.1, plot=True):
    # calinski_harabasz
    import numpy as nu
    import scanpy as sc
    import pandas as pd
    from sklearn.metrics import calinski_harabasz_score

    if dim_reduction == 'pca':
        dim_reduction = 'X_pca'
    elif dim_reduction == 'umap':
        dim_reduction = 'X_umap'
    else:
        print('please choose pca or umap as dimensionality reduction')
        exit

    res = list(nu.arange(min_res, max_res, step))
    score = []
    n_clus = []

    for r in res:
        index = res.index(r)
        adata = original_adata.copy()
        string_r = str(r)
        clustering_name = '%s_res%s' % (clustering_algorithm, string_r)
        print('Clustering by using the resolution %.2f, step %i of %i' % (r, index + 1, len(res)))
        if clustering_algorithm == 'leiden':
            sc.tl.leiden(adata, key_added="%s_res%s" % (clustering_algorithm, string_r), resolution=r)
        elif clustering_algorithm == 'louvain':
            sc.tl.louvain(adata, key_added="%s_res%s" % (clustering_algorithm, string_r), resolution=r)
        else:
            print('please choose louvain or leiden as clustering_algorithm')
            exit

        if score_value == 'bic':
            score_name = 'BIC score'
            n_points = len(adata.obs[clustering_name])
            n_clusters = len(set(adata.obs[clustering_name]))
            n_dimensions = adata.obsm[dim_reduction].shape[1]
            n_parameters = (n_clusters - 1) + (n_dimensions * n_clusters) + 1
            loglikelihood = 0
            for cluster_id in set(adata.obs[clustering_name]):
                cluster_mask = (adata.obs[clustering_name] == cluster_id)
                X_cluster = adata.obsm[dim_reduction][cluster_mask]
                n_points_cluster = len(X_cluster)
                centroid = nu.mean(X_cluster, axis=0)
                variance = nu.sum((X_cluster - centroid) ** 2) / (len(X_cluster) - 1)
                loglikelihood += \
                    n_points_cluster * nu.log(n_points_cluster) \
                    - n_points_cluster * nu.log(n_points) \
                    - n_points_cluster * n_dimensions / 2 * nu.log(2 * nu.pi * variance) \
                    - (n_points_cluster - 1) / 2
            bic = loglikelihood - (n_parameters / 2) * nu.log(n_points)
            bic = -2 * bic
            score.append(bic)
            n_clus.append(n_clusters)
            del adata.obs[clustering_name]
        elif score_value == 'calinski':
            score_name = 'Calinski-Harabasz score'
            n_clusters = len(set(adata.obs[clustering_name]))
            n_clus.append(n_clusters)
            score.append(-1 * calinski_harabasz_score(adata.obsm[dim_reduction], adata.obs[clustering_name]))

    df = pd.DataFrame()
    df[score_name] = score
    df['resolution'] = res
    df['n_clus'] = n_clus
    if plot == True:
        df.plot(x='resolution', y=score_name)
    print('\n')
    return df, res[nu.argmin(score)]

After (parallelized implementation, added):

import os
import multiprocessing

import numpy as np
import pandas as pd
import scanpy as sc
from joblib import Parallel, delayed
from sklearn.metrics import calinski_harabasz_score

sc.settings.verbosity = 0


def compute_clustering_score(r, original_adata, score_value, clustering_algorithm, dim_reduction):
    """Compute the clustering score for a single resolution (runs in a worker process)."""
    adata = original_adata.copy()
    clustering_name = f"{clustering_algorithm}_res{r:.2f}"
    print(f"Clustering using resolution {r:.2f} (PID: {os.getpid()})")

    # Perform clustering
    if clustering_algorithm == 'leiden':
        sc.tl.leiden(adata, key_added=clustering_name, resolution=r)
    elif clustering_algorithm == 'louvain':
        sc.tl.louvain(adata, key_added=clustering_name, resolution=r)
    else:
        raise ValueError("Please choose 'leiden' or 'louvain' as clustering_algorithm")

    # Compute score
    n_clusters = adata.obs[clustering_name].nunique()
    n_points = len(adata.obs[clustering_name])
    n_dimensions = adata.obsm[dim_reduction].shape[1]

    if score_value == 'bic':
        n_parameters = (n_clusters - 1) + (n_dimensions * n_clusters) + 1
        # Spherical-Gaussian log-likelihood; degenerate clusters
        # (fewer than two points, or zero summed variance) are skipped
        loglikelihood = sum(
            len(X_cluster) * np.log(len(X_cluster))
            - len(X_cluster) * np.log(n_points)
            - (len(X_cluster) * n_dimensions / 2) * np.log(2 * np.pi * variance)
            - (len(X_cluster) - 1) / 2
            for cluster_id in adata.obs[clustering_name].unique()
            if (X_cluster := adata.obsm[dim_reduction][(adata.obs[clustering_name] == cluster_id).values]).shape[0] > 1
            and (variance := np.var(X_cluster, axis=0).sum()) > 0
        )
        score_value = -2 * (loglikelihood - (n_parameters / 2) * np.log(n_points))
    elif score_value == 'calinski':
        score_value = -1 * calinski_harabasz_score(adata.obsm[dim_reduction], adata.obs[clustering_name])

    return r, score_value, n_clusters


def clustering_score(original_adata, score_value='bic', clustering_algorithm='leiden',
                     dim_reduction='pca', min_res=0.1, max_res=2.0, step=0.1, plot=True):
    """Compute clustering scores over a range of resolutions in parallel using Joblib."""
    sc.settings.verbosity = 0  # Suppress Scanpy verbosity

    # Validate dimensionality reduction
    if dim_reduction == 'pca':
        dim_reduction = 'X_pca'
    elif dim_reduction == 'umap':
        dim_reduction = 'X_umap'
    else:
        raise ValueError("Please choose 'pca' or 'umap' as dim_reduction")

    # Generate resolution range
    resolutions = np.arange(min_res, max_res, step)

    # Determine number of worker processes
    num_cores = min(multiprocessing.cpu_count(), len(resolutions))  # Avoid excess processes
    print(f"Using {num_cores}/{multiprocessing.cpu_count()} CPU cores")

    print(f"Starting parallel computation with Joblib using {num_cores} cores...")
    results = Parallel(n_jobs=num_cores, backend="loky")(
        delayed(compute_clustering_score)(r, original_adata, score_value, clustering_algorithm, dim_reduction)
        for r in resolutions
    )

    # Convert results to a DataFrame
    df = pd.DataFrame(results, columns=['resolution', score_value, 'n_clusters'])

    # Plot score against resolution
    if plot:
        df.plot(x='resolution', y=score_value)

    # Return the DataFrame and the resolution minimizing the score
    best_res = df.iloc[df[score_value].idxmin()]['resolution']
    print(f"\nBest resolution: {best_res:.2f}")
    return df, best_res
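For reference, the BIC that both versions compute corresponds to a spherical-Gaussian likelihood per cluster (an x-means-style criterion; this reading is inferred from the code, not stated in the commit). With N total points in d dimensions, K clusters, and cluster c containing n_c points with summed per-dimension variance \sigma_c^2:

    \log L = \sum_{c} \left[ n_c \ln n_c - n_c \ln N - \frac{n_c d}{2} \ln\!\left(2\pi\sigma_c^2\right) - \frac{n_c - 1}{2} \right]

    \mathrm{BIC} = -2\left(\log L - \frac{p}{2}\ln N\right), \qquad p = (K - 1) + dK + 1

Lower is better, which is why the function minimizes the score; the Calinski-Harabasz score is negated so the same argmin selection applies to both criteria.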
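A minimal usage sketch (not part of the commit; the dataset and preprocessing steps here are illustrative assumptions). Leiden and Louvain cluster on the neighbors graph, so PCA and neighbors must already be computed on the AnnData object before calling clustering_score:

import scanpy as sc

# Illustrative setup: any preprocessed AnnData with PCA and neighbors works
adata = sc.datasets.pbmc3k()                  # small public 10x dataset
sc.pp.filter_genes(adata, min_cells=3)
sc.pp.normalize_total(adata, target_sum=1e4)
sc.pp.log1p(adata)
sc.pp.pca(adata, n_comps=50)                  # fills adata.obsm['X_pca']
sc.pp.neighbors(adata)                        # required by leiden/louvain

# Scan resolutions 0.1..2.0 in parallel; the lowest BIC wins
df, best_res = clustering_score(adata, score_value='bic',
                                clustering_algorithm='leiden',
                                dim_reduction='pca')
print(df)
print(f"Selected resolution: {best_res:.2f}")

Note that with the loky backend each worker process receives its own pickled copy of original_adata (and the function copies it again internally), so peak memory scales with the number of worker processes.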