Source code for mmcci.CCIData_class

import pandas as pd
import numpy as np
import anndata as ad
import pickle
from typing import Dict, List, Optional, Union
from copy import deepcopy
import json
from tqdm import tqdm

from . import tl


class CCIData:
    """Container for Cell-Cell Interaction (CCI) results of a single sample.

    Attributes:
        metadata (Dict): Sample metadata. Always contains the keys
            ``'n_spots'`` and ``'platform'`` (possibly ``None``) plus anything
            supplied through ``other_metadata``.
        assays (Dict): Mapping of assay name -> assay dictionary. An assay
            dictionary may hold ``'cci_scores'`` and ``'p_values'`` (each a
            dict of LR pair -> DataFrame) as well as single DataFrames such
            as ``'network'``, ``'network_p_values'`` or ``'overall'``.
        adata (AnnData): Optional AnnData object attached to the sample.
    """

    def __init__(
        self,
        cci_scores: Optional[Dict] = None,
        p_values: Optional[Dict] = None,
        n_spots: Optional[int] = None,
        platform: Optional[str] = None,
        network: Optional[pd.DataFrame] = None,
        network_p_values: Optional[pd.DataFrame] = None,
        adata: Optional["ad.AnnData"] = None,
        other_metadata: Optional[Dict] = None,
        assays: Optional[Dict] = None,
    ):
        """Create a CCIData object.

        Args:
            cci_scores: LR pair -> CCI score DataFrame, stored in the 'raw'
                assay.
            p_values: LR pair -> p-value DataFrame, stored in the 'raw' assay.
            n_spots: Number of spots in the sample.
            platform: Name of the platform the sample comes from.
            network: Pre-computed network, stored in the 'raw' assay.
            network_p_values: P-values of ``network``, stored in the 'raw'
                assay.
            adata: AnnData object to attach.
            other_metadata: Extra metadata merged into ``metadata``; its keys
                override ``n_spots`` / ``platform`` when they collide.
            assays: Complete assays dictionary. When truthy it replaces the
                'raw' assay built from the arguments above.
        """
        self.metadata = {}
        self.assays = {}
        self.adata = adata

        self.assays['raw'] = {}
        self.metadata['n_spots'] = n_spots
        self.metadata['platform'] = platform

        if other_metadata:
            self.metadata.update(other_metadata)
        if cci_scores is not None:
            self.assays['raw']['cci_scores'] = cci_scores
        if p_values is not None:
            self.assays['raw']['p_values'] = p_values
        if network is not None:
            self.assays['raw']['network'] = network
        if network_p_values is not None:
            self.assays['raw']['network_p_values'] = network_p_values
        if assays:
            self.assays = assays

    def __repr__(self):
        """Print assays (with number of LR pairs) and metadata"""
        assays = {}
        for assay, content in self.assays.items():
            if 'cci_scores' in content:
                assays[assay] = f"{len(content['cci_scores'])} LR pairs"
            elif 'network' in content:
                assays[assay] = "network"
            else:
                assays[assay] = "none"

        if self.adata is not None:
            return (
                f"CCIData object with assays: {assays}, "
                f"metadata: {self.metadata}, and AnnData object"
            )
        return f"CCIData object with assays: {assays} and metadata: {self.metadata}"

    def __str__(self):
        """Print assays (with number of LR pairs) and metadata"""
        return self.__repr__()

    def get_sample_metadata(self, sample_id: str) -> Dict:
        """Look up an entry of the metadata dictionary.

        Args:
            sample_id: Key to look up in ``metadata``.

        Returns:
            The stored value, or None if the key is absent.
        """
        return self.metadata.get(sample_id)

    def get_sample_n_spots(self, sample_id: str) -> Optional[int]:
        """Get the number of spots.

        Args:
            sample_id: Only used when ``metadata['n_spots']`` is a dictionary,
                in which case it is the key to look up.

        Returns:
            The number of spots, or None if it is not known.
        """
        n_spots = self.metadata.get('n_spots')
        if isinstance(n_spots, dict):
            return n_spots.get(sample_id)
        return n_spots

    def get_sample_cci_scores(self, sample_id: str) -> Optional[pd.DataFrame]:
        """Get one CCI score DataFrame from the 'raw' assay.

        NOTE(review): the object has no per-sample storage, so ``sample_id``
        is treated as a key of ``assays['raw']['cci_scores']`` -- confirm this
        matches what callers expect.

        Args:
            sample_id: Key to look up in the raw CCI scores.

        Returns:
            The matching DataFrame, or None if there is none.
        """
        return self.assays.get('raw', {}).get('cci_scores', {}).get(sample_id)

    def get_sample_p_values(self, sample_id: str) -> Optional[pd.DataFrame]:
        """Get one p-value DataFrame from the 'raw' assay.

        NOTE(review): the object has no per-sample storage, so ``sample_id``
        is treated as a key of ``assays['raw']['p_values']`` -- confirm this
        matches what callers expect.

        Args:
            sample_id: Key to look up in the raw p-values.

        Returns:
            The matching DataFrame, or None if there is none.
        """
        return self.assays.get('raw', {}).get('p_values', {}).get(sample_id)

    def get_adata(self) -> Optional["ad.AnnData"]:
        """Get AnnData object

        Returns:
            AnnData object
        """
        return self.adata

    def get_cell_types(self, assay: str = "raw") -> List[str]:
        """Get cell types in a sample

        Cell types are read from the row index of every CCI score DataFrame,
        or from the row index of the network when the assay has no CCI scores.

        Args:
            assay: Assay to get cell types from

        Returns:
            List of unique cell types in the sample (unordered)

        Raises:
            ValueError: If the assay has neither 'cci_scores' nor 'network'.
        """
        content = self.assays[assay]
        cell_types = []
        if 'cci_scores' in content:
            for df in content['cci_scores'].values():
                cell_types.extend(df.index)
        elif 'network' in content:
            cell_types.extend(content['network'].index)
        else:
            raise ValueError("No cell types found in sample.")
        return list(set(cell_types))

    def copy(self) -> 'CCIData':
        """Create a deep copy of the CCIData object

        Returns:
            Copy of the CCIData object
        """
        return CCIData(
            assays=deepcopy(self.assays),
            other_metadata=deepcopy(self.metadata),
            adata=deepcopy(self.adata),
        )

    @staticmethod
    def _rename_assay_labels(assay_content: Dict, replacements: Dict[str, str]) -> None:
        """Rename row/column labels of every DataFrame of one assay in place."""
        for value in assay_content.values():
            if isinstance(value, pd.DataFrame):
                value.rename(index=replacements, columns=replacements, inplace=True)
            elif isinstance(value, dict):
                for df in value.values():
                    if isinstance(df, pd.DataFrame):
                        df.rename(
                            index=replacements, columns=replacements, inplace=True
                        )

    def rename_cell_types(self, replacements: Dict[str, str], assay=None) -> 'CCIData':
        """Renames cell types in a CCIData.

        Args:
            replacements (dict): A dictionary of replacements, where the keys
                are the old cell type names and the values are the new cell
                type names.
            assay (str): The assay to rename the cell types in. If None, all
                assays are renamed.

        Returns:
            CCIData: A new CCIData object with the cell types renamed.
        """
        renamed_cci_data = self.copy()
        if assay is not None:
            targets = [assay]
        else:
            targets = list(renamed_cci_data.assays)

        for name in targets:
            # Dispatch on the value type so that single-DataFrame entries
            # (network, overall, network_p_values, ...) and dicts of
            # DataFrames (cci_scores, p_values) are both handled.
            self._rename_assay_labels(renamed_cci_data.assays[name], replacements)
        return renamed_cci_data

    @staticmethod
    def _merge_labels(
        df: pd.DataFrame, cell_types: List[str], new_cell_type: str, how: str
    ) -> pd.DataFrame:
        """Collapse the given row/column labels of ``df`` into one label.

        ``how`` is the name of the groupby reduction ('sum' or 'min') applied
        first across the merged rows and then across the merged columns. The
        merged label is placed last on both axes.
        """
        mapping = {cell_type: new_cell_type for cell_type in cell_types}
        relabelled = df.rename(index=mapping, columns=mapping)

        # Rows sharing a label are reduced first, then the same is done for
        # columns by working on the transpose.
        by_rows = getattr(relabelled.groupby(level=0, sort=False), how)()
        merged = getattr(by_rows.T.groupby(level=0, sort=False), how)().T

        def _new_last(labels):
            kept = [label for label in labels if label != new_cell_type]
            if len(kept) != len(labels):
                kept.append(new_cell_type)
            return kept

        return merged.loc[_new_last(list(merged.index)), _new_last(list(merged.columns))]

    def merge_cell_types(
        self, cell_types: List[str], new_cell_type: str, assay=None
    ) -> 'CCIData':
        """Merges cell types in a CCIData.

        Scores ('network', 'overall', 'cci_scores') of the merged cell types
        are summed; p-values ('p_values', 'network_p_values') take the minimum
        over the merged cell types. Other keys are left untouched.

        Args:
            cell_types (list): A list of cell types to merge.
            new_cell_type (str): The name of the new cell type after merging.
            assay (str): The assay to merge the cell types in. If None, all
                assays are merged.

        Returns:
            CCIData: A new CCIData object with the cell types merged.
        """
        merged_cci_data = self.copy()
        if assay is not None:
            targets = [assay]
        else:
            targets = list(merged_cci_data.assays)

        for name in targets:
            content = merged_cci_data.assays[name]
            for key in list(content):
                if key in ('network', 'overall'):
                    content[key] = self._merge_labels(
                        content[key], cell_types, new_cell_type, 'sum'
                    )
                elif key == 'network_p_values':
                    content[key] = self._merge_labels(
                        content[key], cell_types, new_cell_type, 'min'
                    )
                elif key in ('cci_scores', 'p_values'):
                    how = 'sum' if key == 'cci_scores' else 'min'
                    content[key] = {
                        lr_pair: self._merge_labels(df, cell_types, new_cell_type, how)
                        for lr_pair, df in content[key].items()
                    }
        return merged_cci_data

    def subset_lrs(
        self, lr_pairs: List[str], assay="raw", new_assay="subset"
    ) -> 'CCIData':
        """Subsets the LR pairs in a CCIData object.

        Args:
            lr_pairs (list): A list of LR pairs to include in the subsetted
                data.
            assay (str): The assay to take the LR pairs from.
            new_assay (str): The name of the new assay after subsetting.

        Returns:
            CCIData: A new CCIData object with the subset stored under
            ``new_assay``.

        Raises:
            KeyError: If an LR pair is missing from the source assay.
        """
        subsetted_cci_data = self.copy()
        source = self.assays[assay]
        subset = {
            'cci_scores': {lr: source['cci_scores'][lr] for lr in lr_pairs}
        }
        # p-values are optional in an assay; only subset them when present.
        if 'p_values' in source:
            subset['p_values'] = {lr: source['p_values'][lr] for lr in lr_pairs}
        subsetted_cci_data.assays[new_assay] = subset
        return subsetted_cci_data

    def scale(self, scale_factor: float, assay="raw", new_assay="scaled") -> 'CCIData':
        """Scales the CCI scores in a CCIData object.

        Args:
            scale_factor (float): The factor to scale the CCI scores by.
            assay (str): The assay whose CCI scores are scaled.
            new_assay (str): The name of the new assay after scaling.

        Returns:
            CCIData: A new CCIData object with the scaled scores stored under
            ``new_assay``. This object is not modified.
        """
        scaled_cci_data = self.copy()
        source = self.assays[assay]
        # Deep copy so the new assay shares no containers with this object;
        # a shallow copy would let the writes below leak into self.
        scaled_assay = deepcopy(source)
        scaled_assay['cci_scores'] = {
            lr_pair: df * scale_factor
            for lr_pair, df in source['cci_scores'].items()
        }
        scaled_cci_data.assays[new_assay] = scaled_assay
        return scaled_cci_data

    def scale_by_nspots(self, assay="raw", new_assay="scaled") -> 'CCIData':
        """Scales the CCI scores in a CCIData object by the number of spots.

        The scale factor is ``1e6 / metadata['n_spots']``.

        Args:
            assay (str): The assay whose CCI scores are scaled.
            new_assay (str): The name of the new assay after scaling.

        Returns:
            CCIData: A new CCIData object with the CCI scores scaled.

        Raises:
            ValueError: If 'n_spots' is missing from the metadata or is None.
        """
        # __init__ always creates the key, so a missing value shows up as None.
        n_spots = self.metadata.get('n_spots')
        if n_spots is None:
            raise ValueError("'n_spots' not found in metadata.")
        scale_factor = 1e6 / n_spots
        return self.scale(scale_factor, assay, new_assay)

    def filter_by_p_vals(
        self, cutoff: float = 0.05, assay="raw", new_assay="filtered"
    ) -> 'CCIData':
        """Filters the CCI scores in a CCIData object by p-value.

        Every score whose p-value is greater than ``cutoff`` is set to 0.

        Args:
            cutoff (float): The p-value cutoff to filter the CCI scores by.
            assay (str): The assay whose CCI scores are filtered.
            new_assay (str): The name of the new assay after filtering.

        Returns:
            CCIData: A new CCIData object with the filtered scores stored
            under ``new_assay``. This object is not modified.
        """
        filtered_cci_data = self.copy()
        source = self.assays[assay]
        filtered_assay = deepcopy(source)
        filtered_scores = {}
        for lr_pair, df in source['cci_scores'].items():
            p_vals = source['p_values'][lr_pair].reindex(
                index=df.index, columns=df.columns
            )
            # mask() returns a new frame, leaving the source scores intact.
            filtered_scores[lr_pair] = df.mask(p_vals > cutoff, 0)
        filtered_assay['cci_scores'] = filtered_scores
        filtered_cci_data.assays[new_assay] = filtered_assay
        return filtered_cci_data

    def calc_overall(self, assay="raw", name="overall", normalize=True) -> 'CCIData':
        """Calculates the overall CCI scores in a CCIData object.

        LR pairs whose scores do not sum to a positive value are skipped. The
        accumulated network is divided by its grand total before it is stored.

        Args:
            assay (str): The assay to calculate the overall CCI scores in.
            name (str): The key the overall network is stored under.
            normalize (bool): If True, divide each LR network by its own sum
                before adding it. Defaults to True.

        Returns:
            CCIData: A new CCIData object with the overall CCI scores
            calculated.

        Raises:
            ValueError: If no LR pair has a positive total score.
        """
        overall_cci_data = self.copy()
        sample = overall_cci_data.assays[assay]['cci_scores']
        total = None

        for lr_pair in list(sample):
            df_sum = sample[lr_pair].sum().sum()
            if not df_sum > 0:
                continue
            if total is not None:
                # The aligned frame is written back into the copy, as before.
                # NOTE(review): presumably align_dataframes gives both frames
                # the same labels -- its implementation is not visible here.
                total, sample[lr_pair] = tl.align_dataframes(total, sample[lr_pair])
                if normalize:
                    total = total + sample[lr_pair] / df_sum
                else:
                    total = total + sample[lr_pair]
            elif normalize:
                total = sample[lr_pair] / df_sum
            else:
                total = sample[lr_pair]
            total = total.fillna(0)

        if total is None:
            raise ValueError(
                f"No LR pairs with positive CCI scores found in assay {assay}."
            )

        total = total / total.sum().sum()
        total = total.fillna(0)
        overall_cci_data.assays[assay][name] = total
        return overall_cci_data

    def get_lr_proportions(
        self,
        sender: str = None,
        reciever: str = None,
        assay: str = "raw",
        key: str = "cci_scores",
    ) -> dict:
        """Calculates the proportion of each LR pair in a sample for a
        specific cell type sender and receiver.

        When only one of sender/receiver is given, scores are summed across
        all cell types on the other side. LR pairs that lack the requested
        cell type, or whose selected scores are all zero, are left out.

        Args:
            sender (str): The sender cell type.
            reciever (str): The receiver cell type.
            assay (str) (optional): The assay to use. Defaults to 'raw'.
            key (str) (optional): The key to use. Defaults to 'cci_scores'.

        Returns:
            dict: LR pairs mapped to their share of the total score, sorted
            from largest to smallest. Empty if the total score is zero.

        Raises:
            ValueError: If the assay or key does not exist, or if neither a
                sender nor a receiver is given.
        """
        if assay not in self.assays:
            raise ValueError(f"Assay {assay} not found in sample.")
        if key not in self.assays[assay]:
            raise ValueError(f"Key {key} not found in sample.")
        if sender is None and reciever is None:
            raise ValueError("Please provide a sender or receiver cell type.")

        scores = {}
        for lr_pair, df in self.assays[assay][key].items():
            # Non-DataFrame entries are skipped; the stored data is not edited.
            if not isinstance(df, pd.DataFrame):
                continue
            if sender is not None and sender not in df.index:
                continue
            if reciever is not None and reciever not in df.columns:
                continue
            rows = [sender] if sender is not None else slice(None)
            cols = [reciever] if reciever is not None else slice(None)
            selected = df.loc[rows, cols]
            if (selected == 0).all().all():
                continue
            scores[lr_pair] = selected.sum().sum()

        total = sum(scores.values())
        if total == 0:
            return {}

        lr_props = {lr_pair: score / total for lr_pair, score in scores.items()}
        return dict(sorted(lr_props.items(), key=lambda item: item[1], reverse=True))

    def get_p_vals(
        self,
        assay: str = "raw",
        key: str = "p_values",
        sender: str = None,
        reciever: str = None,
    ) -> dict:
        """Returns the p-values for each LR pair in a sample for a specific
        sender and receiver cell type.

        Args:
            assay (str) (optional): The assay to use. Defaults to 'raw'.
            key (str) (optional): The key to use. Defaults to 'p_values'.
            sender (str) (optional): The sender cell type. Defaults to None.
            reciever (str) (optional): The receiver cell type. Defaults to
                None.

        Returns:
            dict: A dictionary of LR pairs and their p-values. LR pairs that
            lack the sender or receiver are left out.

        Raises:
            ValueError: If the assay or key does not exist, or if the sender
                or the receiver is missing.
        """
        if assay not in self.assays:
            raise ValueError(f"Assay {assay} not found in sample.")
        if key not in self.assays[assay]:
            raise ValueError(f"Key {key} not found in sample.")
        if sender is None or reciever is None:
            raise ValueError("Please provide a sender and receiver cell type.")

        p_vals = {}
        for lr_pair, df in self.assays[assay][key].items():
            if not isinstance(df, pd.DataFrame):
                continue
            if sender not in df.index or reciever not in df.columns:
                continue
            value = df.at[sender, reciever]
            # NOTE(review): entries equal to exactly 0 are dropped, as in the
            # previous implementation -- confirm that 0 means "no value" here
            # rather than a genuine p-value of zero.
            if value == 0:
                continue
            p_vals[lr_pair] = value
        return p_vals

    def save(self, path: str):
        """Saves CCIData object to JSON or pkl file.

        Paths ending in '.pkl' are pickled. Any other path is written as
        JSON, with '.json' appended when the path does not already end in it.

        Args:
            path (str): Path to save the JSON or pkl file
        """
        if path.endswith('.pkl'):
            with open(path, 'wb') as f:
                pickle.dump(self, f)
            return

        if not path.endswith('.json'):
            path += '.json'
        with open(path, 'w') as f:
            json.dump(self.to_dict(), f)

    def to_dict(self) -> dict:
        """Convert CCIData object to a dictionary of plain containers.

        DataFrames are converted with ``DataFrame.to_dict()``; every other
        value, including the metadata, is passed through unchanged. The
        AnnData object is not included.

        Returns:
            dict: Dictionary with the keys 'metadata' and 'assays'.
        """
        converted_assays = {}
        for assay_name, assay in self.assays.items():
            converted = {}
            for key, value in assay.items():
                if key in ['cci_scores', 'p_values']:
                    # Dict of DataFrames
                    converted[key] = {k: v.to_dict() for k, v in value.items()}
                elif isinstance(value, pd.DataFrame):
                    # Single DataFrame
                    converted[key] = value.to_dict()
                else:
                    converted[key] = value
            converted_assays[assay_name] = converted

        return {'metadata': self.metadata, 'assays': converted_assays}

    def create_pathway_assay(
        self,
        assay: str = "raw",
        gsea_results: pd.DataFrame = None,
        strict: bool = True,
        cutoff: float = 0.05,
        assay_name="pathway",
    ) -> 'CCIData':
        """Creates a pathway assay from GSEA results.

        For every term whose adjusted p-value passes the cutoff, the CCI
        scores of the matching LR pairs are added up and divided by their
        grand total. Terms without any matching LR pair are skipped.

        Args:
            assay (str): The assay to use. Defaults to 'raw'.
            gsea_results (pd.DataFrame): GSEA results; the columns 'Term',
                'Adjusted P-value' and 'Genes' (';'-separated) are read.
            strict (bool): If True, both ligand and receptor must be in the
                gene list. If False, only one must be in the gene list.
                Defaults to True.
            cutoff (float): The p-value cutoff to filter the GSEA results by.
                Defaults to 0.05.
            assay_name (str): The name of the new assay. Defaults to
                'pathway'.

        Returns:
            CCIData: A new CCIData object with the pathway assay.

        Raises:
            ValueError: If ``gsea_results`` is not provided.
        """
        if gsea_results is None:
            raise ValueError("Please provide GSEA results.")

        copy = self.copy()
        lr_scores = self.assays[assay]['cci_scores']
        grouped_cci_scores = {}

        with tqdm(total=len(gsea_results), desc="Converting to pathways") as pbar:
            for term in gsea_results['Term']:
                filtered_df = gsea_results[gsea_results['Term'] == term]
                if filtered_df['Adjusted P-value'].values[0] > cutoff:
                    pbar.update(1)
                    continue

                genes = set()
                for gene_string in filtered_df['Genes'].tolist():
                    genes.update(gene_string.lower().split(";"))

                total = None
                for lr_key, df in lr_scores.items():
                    # Split on the first underscore only, so keys with extra
                    # underscores do not raise on unpacking.
                    lig, _, rec = lr_key.lower().partition("_")
                    if strict:
                        matched = lig in genes and rec in genes
                    else:
                        matched = lig in genes or rec in genes
                    if not matched or not df.sum().sum() > 0:
                        continue
                    if total is None:
                        total = df.fillna(0)
                    else:
                        # fill_value keeps scores whose labels exist in only
                        # one of the two frames.
                        total = total.add(df, fill_value=0).fillna(0)

                if total is None:
                    pbar.update(1)
                    continue

                total = total / total.sum().sum()
                total = total.fillna(0)
                grouped_cci_scores[term] = total
                pbar.update(1)

        copy.assays[assay_name] = {'cci_scores': grouped_cci_scores}
        return copy