Source code for ark.analysis.neighborhood_analysis

import os
import warnings

import matplotlib.pylab as plt
import numpy as np
import pandas as pd
import seaborn as sns
import xarray as xr
from tqdm.notebook import tqdm
from alpineer import misc_utils

import ark.settings as settings
from ark.analysis import spatial_analysis_utils


[docs]def create_neighborhood_matrix(all_data, dist_mat_dir, included_fovs=None, distlim=50, self_neighbor=False, fov_col=settings.FOV_ID, cell_label_col=settings.CELL_LABEL, cell_type_col=settings.CELL_TYPE): """Calculates the number of neighbor phenotypes for each cell. Args: all_data (pandas.DataFrame): data for all fovs. Includes the columns for fov, label, and cell phenotype. dist_mat_dir (str): directory containing the distance matrices included_fovs (list): fovs to include in analysis. If argument is none, default is all fovs used. distlim (int): cell proximity threshold. Default is 50. self_neighbor (bool): If true, cell counts itself as a neighbor in the analysis. Default is False. fov_col (str): column with the cell fovs. cell_label_col (str): column with the cell labels. cell_type_col (str): column with the cell types. Returns: pandas.DataFrame: DataFrame containing phenotype counts per cell tupled with DataFrame containing phenotype frequencies of counts per phenotype/total phenotypes for each cell """ # Set up input and parameters if included_fovs is None: included_fovs = all_data[fov_col].unique() # Check if included fovs found in fov_col misc_utils.verify_in_list(fov_names=included_fovs, unique_fovs=all_data[fov_col].unique()) # Subset just the fov, label, and cell phenotype columns all_neighborhood_data = all_data[ [fov_col, cell_label_col, cell_type_col] ].reset_index(drop=True) # Extract the cell phenotypes cluster_names = all_neighborhood_data[cell_type_col].drop_duplicates() # Get the total number of phenotypes cluster_num = len(cluster_names) included_columns = [fov_col, cell_label_col, cell_type_col] # Initialize empty matrices for cell neighborhood data cell_neighbor_counts = pd.DataFrame( np.zeros((all_neighborhood_data.shape[0], cluster_num + len(included_columns))) ) # Replace the first, second (possibly third) columns of cell_neighbor_counts cell_neighbor_counts[list(range(len(included_columns)))] = \ all_neighborhood_data[included_columns] cols = included_columns + list(cluster_names) # Rename the columns to match cell phenotypes cell_neighbor_counts.columns = cols cell_neighbor_freqs = cell_neighbor_counts.copy(deep=True) with tqdm(total=len(included_fovs), desc="Neighbors Matrix Generation", unit="FOVs") \ as neighbor_mat_progress: for fov in included_fovs: neighbor_mat_progress.set_postfix(FOV=fov) # Subsetting expression matrix to only include patients with correct fov label current_fov_idx = all_neighborhood_data.loc[:, fov_col] == fov current_fov_neighborhood_data = all_neighborhood_data[current_fov_idx] # Get the subset of phenotypes included in the current fov fov_cluster_names = current_fov_neighborhood_data[cell_type_col].drop_duplicates() # Retrieve fov-specific distance matrix from distance matrix dictionary dist_matrix = xr.load_dataarray(os.path.join(dist_mat_dir, str(fov) + '_dist_mat.xr')) # Get cell_neighbor_counts and cell_neighbor_freqs for fovs counts, freqs = spatial_analysis_utils.compute_neighbor_counts( current_fov_neighborhood_data, dist_matrix, distlim, self_neighbor, cell_label_col=cell_label_col, cluster_name_col=cell_type_col) # Add to neighbor counts+freqs for only matching phenos between fov and whole dataset cell_neighbor_counts.loc[current_fov_neighborhood_data.index, fov_cluster_names] \ = counts cell_neighbor_freqs.loc[current_fov_neighborhood_data.index, fov_cluster_names]\ = freqs neighbor_mat_progress.update(1) # Remove cells that have no neighbors within the distlim total_cell_count = cell_neighbor_counts.shape[0] keep_cells = cell_neighbor_counts.drop(included_columns, axis=1).sum(axis=1) != 0 cell_neighbor_counts = cell_neighbor_counts.loc[keep_cells].reset_index(drop=True) cell_neighbor_freqs = cell_neighbor_freqs.loc[keep_cells].reset_index(drop=True) # issue warning if more than 5% of cells are dropped if (cell_neighbor_counts.shape[0] / total_cell_count) < 0.95: warnings.warn(UserWarning("More than 5% of cells have no neighbor within the provided " "radius and have been omitted. We suggest increasing the " "distlim value to reduce the number of cells excluded from " "analysis.")) return cell_neighbor_counts, cell_neighbor_freqs
[docs]def generate_cluster_matrix_results(all_data, neighbor_mat, cluster_num, seed=42, excluded_channels=None, included_fovs=None, cluster_label_col=settings.KMEANS_CLUSTER, fov_col=settings.FOV_ID, cell_type_col=settings.CELL_TYPE, label_col=settings.CELL_LABEL, pre_channel_col=settings.PRE_CHANNEL_COL, post_channel_col=settings.POST_CHANNEL_COL): """Generate the cluster info on all_data using k-means clustering on neighbor_mat. cluster_num has to be picked based on visualizations from compute_cluster_metrics. Args: all_data (pandas.DataFrame): data including fovs, cell labels, and cell expression matrix for all markers neighbor_mat (pandas.DataFrame): a neighborhood matrix, created from create_neighborhood_matrix cluster_num (int): the optimal k to pass into k-means clustering to generate the final clusters and corresponding results seed (int): the random seed to set for k-means clustering excluded_channels (list): all channel names to be excluded from analysis included_fovs (list): fovs to include in analysis. If argument is None, default is all fovs used cluster_label_col (str): the name of the cluster label col we will create for neighborhood clusters fov_col (str): the name of the column in all_data and neighbor_mat indicating the fov cell_type_col (str): the name of the column in all_data indicating the cell type label_col (str): the name of the column in all_data indicating cell label pre_channel_col (str): the name of the column in all_data right before the first channel column post_channel_col (str): the name of the column in all_data right after the last channel column Returns: tuple (pandas.DataFrame, pandas.DataFrame, pandas.DataFrame): - the expression matrix with the corresponding cluster labels attached, will only include fovs included in the analysis - an a x b count matrix (a = # of clusters, b = # of cell types) with cluster ids indexed row-wise and cell types indexed column-wise, indicates number of cell types that are within each cluster - an a x c mean matrix (a = # of clusters, c = # of markers) with cluster ids indexed row-wise and markers indexed column-wise, indicates the mean marker expression for each cluster id """ # get fovs if included_fovs is None: included_fovs = neighbor_mat[fov_col].unique() # check if included fovs found in fov_col misc_utils.verify_in_list(fov_names=included_fovs, unique_fovs=all_data[fov_col].unique()) # check if all excluded column names found in all_data if excluded_channels is not None: misc_utils.verify_in_list(columns_to_exclude=excluded_channels, column_names=all_data.columns) # make sure number of clusters specified is valid if cluster_num < 2: raise ValueError("Invalid k provided for clustering") # subset neighbor mat neighbor_mat_data_all = neighbor_mat[neighbor_mat[fov_col].isin(included_fovs)] neighbor_mat_data = neighbor_mat_data_all.drop([fov_col, label_col, cell_type_col], axis=1) # generate cluster labels cluster_labels = spatial_analysis_utils.generate_cluster_labels( neighbor_mat_data, cluster_num, seed=seed) # add labels to neighbor mat neighbor_mat_data_all[cluster_label_col] = cluster_labels # subset for data in cell table we want to keep all_data_clusters = all_data[all_data[fov_col].isin(included_fovs)] # combine with neighborhood data all_data_clusters = all_data_clusters.merge( neighbor_mat_data_all[[fov_col, label_col, cluster_label_col]], on=[fov_col, label_col]) # create a count pivot table with cluster_label_col as row and cell_type_col as column group_by_cell_type = all_data_clusters.groupby( [cluster_label_col, cell_type_col]).size().reset_index(name="count") num_cell_type_per_cluster = group_by_cell_type.pivot( index=cluster_label_col, columns=cell_type_col, values="count").fillna(0).astype(int) # annotate index with "Cluster" to make visualization clear that these are cluster labels num_cell_type_per_cluster.index = ["Cluster" + str(c) for c in num_cell_type_per_cluster.index] # subsets the expression matrix to only have channel columns channel_start = np.where(all_data_clusters.columns == pre_channel_col)[0][0] + 1 channel_end = np.where(all_data_clusters.columns == post_channel_col)[0][0] cluster_label_colnum = np.where(all_data_clusters.columns == cluster_label_col)[0][0] all_data_markers_clusters = \ all_data_clusters.iloc[:, list(range(channel_start, channel_end)) + [cluster_label_colnum]] # drop excluded channels if excluded_channels is not None: all_data_markers_clusters = all_data_markers_clusters.drop(excluded_channels, axis=1) # create a mean pivot table with cluster_label_col as row and channels as column mean_marker_exp_per_cluster = all_data_markers_clusters.groupby([cluster_label_col]).mean() # annotate index with "Cluster" to make visualization clear that these are cluster labels mean_marker_exp_per_cluster.index = ["Cluster" + str(c) for c in mean_marker_exp_per_cluster.index] return all_data_clusters, num_cell_type_per_cluster, mean_marker_exp_per_cluster
[docs]def compute_cluster_metrics_inertia(neighbor_mat, min_k=2, max_k=10, seed=42, included_fovs=None, fov_col=settings.FOV_ID, label_col=settings.CELL_LABEL, cell_col=settings.CELL_TYPE): """Produce k-means clustering metrics to help identify optimal number of clusters using inertia Args: neighbor_mat (pandas.DataFrame): a neighborhood matrix, created from create_neighborhood_matrix min_k (int): the minimum k we want to generate cluster statistics for, must be at least 2 max_k (int): the maximum k we want to generate cluster statistics for, must be at least 2 seed (int): the random seed to set for k-means clustering included_fovs (list): fovs to include in analysis. If argument is none, default is all fovs used. fov_col (str): the name of the column in neighbor_mat indicating the fov label_col (str): the name of the column in neighbor_mat indicating the label cell_col (str): column with the cell phenotpype Returns: xarray.DataArray: an xarray with dimensions (num_k_values) where num_k_values is the range of integers from 2 to max_k included, contains the metric scores for each value in num_k_values """ # set included_fovs to everything if not set if included_fovs is None: included_fovs = neighbor_mat[fov_col].unique() # make sure the user specifies a positive k if min_k < 2 or max_k < 2: raise ValueError("Invalid k provided for clustering") # check if included fovs found in fov_col misc_utils.verify_in_list(fov_names=included_fovs, unique_fovs=neighbor_mat[fov_col].unique()) # subset neighbor_mat accordingly, and drop the columns we don't need neighbor_mat_data = neighbor_mat[neighbor_mat[fov_col].isin(included_fovs)] neighbor_mat_data = neighbor_mat_data.drop([fov_col, label_col, cell_col], axis=1) # generate the cluster score information neighbor_cluster_stats = spatial_analysis_utils.compute_kmeans_inertia( neighbor_mat_data=neighbor_mat_data, min_k=min_k, max_k=max_k, seed=seed) return neighbor_cluster_stats
[docs]def compute_cluster_metrics_silhouette(neighbor_mat, min_k=2, max_k=10, seed=42, included_fovs=None, fov_col=settings.FOV_ID, label_col=settings.CELL_LABEL, cell_col=settings.CELL_TYPE, subsample=None): """Produce k-means clustering metrics to help identify optimal number of clusters using Silhouette score Args: neighbor_mat (pandas.DataFrame): a neighborhood matrix, created from create_neighborhood_matrix min_k (int): the minimum k we want to generate cluster statistics for, must be at least 2 max_k (int): the maximum k we want to generate cluster statistics for, must be at least 2 seed (int): the random seed to set for k-means clustering included_fovs (list): fovs to include in analysis. If argument is none, default is all fovs used. fov_col (str): the name of the column in neighbor_mat indicating the fov label_col (str): the name of the column in neighbor_mat indicating the label cell_col (str): column with the cell phenotype subsample (int): the number of cells that will be sampled from each neighborhood cluster for calculating Silhouette score If None, all cells will be used Returns: xarray.DataArray: an xarray with dimensions (num_k_values) where num_k_values is the range of integers from 2 to max_k included, contains the metric scores for each value in num_k_values """ # set included_fovs to everything if not set if included_fovs is None: included_fovs = neighbor_mat[fov_col].unique() # make sure the user specifies a positive k if min_k < 2 or max_k < 2: raise ValueError("Invalid k provided for clustering") # check if included fovs found in fov_col misc_utils.verify_in_list(fov_names=included_fovs, unique_fovs=neighbor_mat[fov_col].unique()) # subset neighbor_mat accordingly, and drop the columns we don't need neighbor_mat_data = neighbor_mat[neighbor_mat[fov_col].isin(included_fovs)] neighbor_mat_data = neighbor_mat_data.drop([fov_col, label_col, cell_col], axis=1) # generate the cluster score information neighbor_cluster_stats = spatial_analysis_utils.compute_kmeans_silhouette( neighbor_mat_data=neighbor_mat_data, min_k=min_k, max_k=max_k, seed=seed, subsample=subsample ) return neighbor_cluster_stats
[docs]def compute_cell_ratios(neighbors_mat, target_cells, reference_cells, fov_list, bin_number=10, cell_col=settings.CELL_TYPE, fov_col=settings.FOV_ID, label_col=settings.CELL_LABEL): """ Computes the target/reference and reference/target ratios for each FOV Args: neighbors_mat (pandas.DataFrame): a neighborhood matrix, created from create_neighborhood_matrix target_cells (list): invading cell phenotypes reference_cells (list): expected cell phenotypes fov_list (list): names of the fovs to compare bin_number (int): number of bins to use in histogram cell_col (str): column with the cell phenotype fov_col (str): column with the fovs label_col (str): column with the cell labels Returns: tuple(list, list): - the target/reference ratios of each FOV - the reference/target ratios of each FOV """ targ_ref_ratio, ref_targ_ratio = np.empty(0), np.empty(0) for fov in fov_list: # subset neighbors mat by fov, drop fov name and labels neighbors_mat_fov = neighbors_mat[neighbors_mat[fov_col] == fov] misc_utils.verify_in_list(provided_column_names=[cell_col, fov_col, label_col], cell_neighbors_columns=neighbors_mat.columns) neighbors_mat_fov = neighbors_mat_fov.drop(columns=[fov_col, label_col]) # get number of target and reference cells in sample target_total = neighbors_mat_fov[neighbors_mat_fov[cell_col].isin(target_cells)].shape[0] reference_total = neighbors_mat_fov[ neighbors_mat_fov[cell_col].isin(reference_cells)].shape[0] if target_total == 0 or reference_total == 0: targ_ref_ratio = np.append(targ_ref_ratio, np.nan) ref_targ_ratio = np.append(ref_targ_ratio, np.nan) else: targ_ref_ratio = np.append(targ_ref_ratio, target_total / reference_total) ref_targ_ratio = np.append(ref_targ_ratio, reference_total / target_total) # remove nan values for plotting targ_ref_remove_nan = targ_ref_ratio[~np.isnan(targ_ref_ratio)] ref_targ_remove_nan = ref_targ_ratio[~np.isnan(ref_targ_ratio)] # filter values targ_ref_filter = targ_ref_remove_nan[targ_ref_remove_nan < 15] ref_targ_filter = ref_targ_remove_nan[ref_targ_remove_nan < 15] # create ratio plots sns.set(rc={'figure.figsize': (16, 4)}) fig, (ax1, ax2) = plt.subplots(1, 2) fig.suptitle("Population 1 / Population 2 Ratios") ax1.boxplot(targ_ref_filter, 0, 'c', vert=False) ax1.set(xlabel='Ratio') ax2.hist(targ_ref_filter, bins=bin_number) ax2.set(xlabel='Ratio', ylabel='Count') fig2, (ax3, ax4) = plt.subplots(1, 2) fig2.suptitle("Population 2 / Population 1 Ratios") ax3.boxplot(ref_targ_filter, 0, 'c', vert=False) ax3.set(xlabel='Ratio') ax4.hist(ref_targ_filter, bins=bin_number) ax4.set(xlabel='Ratio', ylabel='Count') ratio_data = pd.DataFrame(list(zip(fov_list, targ_ref_ratio)), columns=['fov', 'cell_ratio']) return ratio_data
[docs]def compute_mixing_score(fov_neighbors_mat, target_cells, reference_cells, mixing_type, ratio_threshold=5, cell_count_thresh=200, cell_col=settings.CELL_TYPE, fov_col=settings.FOV_ID, label_col=settings.CELL_LABEL): """ Compute and return the mixing score for the specified target/reference cell types Args: fov_neighbors_mat (pandas.DataFrame): a neighborhood matrix, created from create_neighborhood_matrix and subsetted for 1 fov target_cells (list): invading cell phenotypes reference_cells (list): expected cell phenotypes mixing_type (str): "homogeneous" or "percent", homogeneous is a symmetrical calculation ratio_threshold (int): maximum ratio of cell_types required to calculate a mixing score, under this labeled "cold" cell_count_thresh (int): minimum number of total cells from both populations to calculate a mixing score, under this labeled "cold" cell_col (str): column with the cell phenotype fov_col (str): column with the fovs label_col (str): column with the cell labels Returns: float: the mixing score for the FOV """ # read in fov cell neighbors, drop fov, cell label, and cell type columns misc_utils.verify_in_list(provided_column_names=[cell_col, fov_col, label_col], cell_neighbors_columns=fov_neighbors_mat.columns) fov_neighbors_mat = fov_neighbors_mat.drop(columns=[fov_col, label_col]) # cell types validation overlap = [cell for cell in target_cells if cell in reference_cells] if overlap: raise ValueError(f"The following cell types were included in both the target and reference" f" populations: {overlap}") all_cells = fov_neighbors_mat[cell_col].unique() # mixing_type validation if mixing_type not in ['percent', 'homogeneous']: raise ValueError(f'Please provide a valid mixing_type: "percent" or "homogeneous".') # get number of target and reference cells in sample target_total = fov_neighbors_mat[fov_neighbors_mat[cell_col].isin(target_cells)].shape[0] ref_total = fov_neighbors_mat[fov_neighbors_mat[cell_col].isin(reference_cells)].shape[0] # check cell count threshold if (target_total + ref_total) < cell_count_thresh: return np.nan, (target_total + ref_total) elif ref_total == 0 or target_total == 0: return np.nan, (target_total + ref_total) # check ratio threshold if ref_total/target_total > ratio_threshold or target_total/ref_total > ratio_threshold: return np.nan, (target_total + ref_total) # condense to total number of cell type interactions fov_neighbors_mat[cell_col] = fov_neighbors_mat[cell_col].replace(target_cells, 'target') fov_neighbors_mat[cell_col] = fov_neighbors_mat[cell_col].replace(reference_cells, 'reference') interactions_mat = fov_neighbors_mat.groupby(by=[cell_col]).sum(numeric_only=True) # combine cell interactions by target and reference populations interactions_mat['target'] = [0] * interactions_mat.shape[0] interactions_mat['reference'] = [0] * interactions_mat.shape[0] for target_cell in target_cells: if target_cell in all_cells: interactions_mat['target'] = interactions_mat['target'] + interactions_mat[target_cell] for reference_cell in reference_cells: if reference_cell in all_cells: interactions_mat['reference'] = interactions_mat['reference'] + \ interactions_mat[reference_cell] # count interactions reference_target = interactions_mat.loc['target', 'reference'] target_target = interactions_mat.loc['target', 'target'] reference_reference = interactions_mat.loc['reference', 'reference'] # mixing score calculation if mixing_type == 'percent': # percent mixing mixing_score = reference_target / (reference_target + target_target) elif mixing_type == 'homogeneous': # homogenous mixing mixing_score = reference_target / (target_target + reference_reference) return mixing_score, (target_total + ref_total)