Source code for ark.analysis.spatial_analysis_utils

import os

import numpy as np
import pandas as pd
import scipy
import sklearn.metrics
import xarray as xr
from alpineer import io_utils, misc_utils
from scipy.spatial.distance import cdist
from sklearn.cluster import KMeans
from statsmodels.stats.multitest import multipletests
from tqdm.notebook import tqdm

import ark.settings as settings


[docs]def calc_dist_matrix(cell_table, save_path, fov_id=settings.FOV_ID, label_id=settings.CELL_LABEL, centroid_ids=(settings.CENTROID_0, settings.CENTROID_1)): """Generate matrix of distances between center of pairs of cells. Saves each one individually to `save_path`. Args: cell_table (str): data frame with fov, label, and centroid information save_path (str): path to save the distance matrices fov_id (str): the column name containing the fov label_id (str): the column name containing the cell label centroid_ids (tuple): the columns identifying the centroids of each cell """ # check that both label_dir and save_path exist io_utils.validate_paths([save_path]) # load all the file names in label_dir fovs = cell_table[fov_id].unique() # iterate for each fov with tqdm(total=len(fovs), desc="Distance Matrix Generation", unit="FOVs") \ as dist_mat_progress: for fov in fovs: dist_mat_progress.set_postfix(FOV=fov) fov_table = cell_table[cell_table[fov_id] == fov] # get centroid and label info centroids = [(row[centroid_ids[0]], row[centroid_ids[1]]) for indx, row in fov_table.iterrows()] centroid_labels = list(fov_table[label_id]) # generate the distance matrix, then assign centroid_labels as coords dist_matrix = cdist(centroids, centroids).astype(np.float32) dist_mat_xarr = xr.DataArray(dist_matrix, coords=[centroid_labels, centroid_labels]) # save the distance matrix to save_path dist_mat_xarr.to_netcdf( os.path.join(save_path, fov + '_dist_mat.xr'), format='NETCDF3_64BIT' ) dist_mat_progress.update(1)
[docs]def append_distance_features_to_dataset(fov, dist_matrix, cell_table, distance_columns): """Appends selected distance features as 'cells' in distance matrix and cell table Args: fov (str): the name of the FOV dist_matrix (xarray.DataArray): a cells x cells matrix with the euclidian distance between centers of corresponding cells for the FOV cell_table (pd.DataFrame): Table of cell features. Must contain provided distance columns distance_columns (List[str]): List of column names which store feature distance. These must exist in cell_table Returns: (pd.DataFrame, dict): Updated cell_table, and distance matricie indexed by fov name """ misc_utils.verify_in_list(distance_columns=distance_columns, valid_columns=cell_table.columns) num_cell_types = max(list(cell_table[settings.CELL_TYPE].astype("category").cat.codes)) + 1 dist_list = [] fov_cells = cell_table.loc[cell_table[settings.FOV_ID] == fov] num_labels = max(fov_cells[settings.CELL_LABEL]) for i, dist_col in enumerate(distance_columns): dist_list.append(pd.DataFrame([{ settings.FOV_ID: fov, settings.CELL_LABEL: num_labels + i + 1, settings.CELL_TYPE: dist_col, settings.CELL_TYPE_NUM: num_cell_types + i + 1, }])) coords = ( [max(dist_matrix.dim_0.values) + i + 1], fov_cells[settings.CELL_LABEL].values[:] ) dist_matrix = xr.concat([dist_matrix, xr.DataArray( fov_cells[dist_col].values[np.newaxis, :], coords=coords )], dim='dim_0') dist_matrix = xr.concat([dist_matrix, xr.DataArray( fov_cells[dist_col].values[:, np.newaxis], coords=(coords[1], coords[0]) )], dim='dim_1') distance_features = pd.concat(dist_list) cell_table = pd.concat([cell_table, distance_features]) return cell_table, dist_matrix
[docs]def get_pos_cell_labels_channel(thresh, current_fov_channel_data, cell_labels, current_marker): """For channel enrichment, finds positive labels that match the current phenotype or identifies cells with positive expression values for the current marker (greater than the marker threshold). Args: thresh (int): current threshold for marker current_fov_channel_data (pandas.DataFrame): expression data for column markers for current patient cell_labels (pandas.DataFrame): the column of cell labels for current patient current_marker (str): the current marker that the positive labels are being found for Returns: list: List of all the positive labels""" # Subset only cells that are positive for the given marker marker1posinds = current_fov_channel_data[current_marker] > thresh # Get the cell labels of the positive cells mark1poslabels = cell_labels[marker1posinds] return mark1poslabels
[docs]def get_pos_cell_labels_cluster(pheno, current_fov_neighborhood_data, cell_label_col, cell_type_col): """For cluster enrichment, finds positive labels that match the current phenotype or identifies cells with positive expression values for the current marker (greater than the marker threshold). Args: pheno (str): the current cell phenotype current_fov_neighborhood_data (pandas.DataFrame): data for the current patient cell_label_col (str): the name of the column indicating the cell label cell_type_col (str): the name of the column indicating the cell type Returns: list: List of all the positive labels""" # Subset only cells that are of the same phenotype pheno1posinds = current_fov_neighborhood_data[cell_type_col] == pheno # Get the cell labels of the cells of the phenotype mark1poslabels = current_fov_neighborhood_data.loc[:, cell_label_col][pheno1posinds] return mark1poslabels
[docs]def compute_close_cell_num(dist_mat, dist_lim, analysis_type, current_fov_data=None, current_fov_channel_data=None, cluster_ids=None, cell_types_analyze=None, thresh_vec=None, cell_label_col=settings.CELL_LABEL, cell_type_col=settings.CELL_TYPE_NUM): """Finds positive cell labels and creates matrix with counts for cells positive for corresponding markers. Computes close_num matrix for both Cell Label and Threshold spatial analyses. This function loops through all the included markers in the patient data and identifies cell labels positive for corresponding markers. It then subsets the distance matrix to only include these positive cells and records interactions based on whether cells are close to each other (within the dist_lim). It then stores the number of interactions in the index of close_num corresponding to both markers (for instance markers 1 and 2 would be in index [0, 1]). Args: dist_mat (numpy.ndarray): cells x cells matrix with the euclidian distance between centers of corresponding cells dist_lim (int): threshold for spatial enrichment distance proximity analysis_type (str): type of analysis, must be either cluster or channel current_fov_data (pandas.DataFrame): data for specific patient in expression matrix current_fov_channel_data (pandas.DataFrame): data of only column markers for Channel Analysis cluster_ids (numpy.ndarray): all the cell phenotypes in Cluster Analysis cell_types_analyze (list): a list of the cell types we wish to analyze, if None we set it equal to all cell types thresh_vec (numpy.ndarray): matrix of thresholds column for markers cell_label_col (str): the name of the column containing the cell labels cell_type_col (str): the name of the column containing the cell type numbers Returns: numpy.ndarray: 2D array containing marker x marker matrix with counts for cells positive for corresponding markers, as well as a list of number of cell labels for marker 1 """ # assert our analysis type is valid good_analyses = ["cluster", "channel"] misc_utils.verify_in_list(analysis_type=analysis_type, good_analyses=good_analyses) # Initialize variables cell_labels = [] # Subset data based on analysis type if analysis_type == "channel": # Subsetting the column with the cell labels cell_labels = current_fov_data[cell_label_col] # assign the dimension of close_num respective to type of analysis if analysis_type == "channel": num = len(thresh_vec) else: num = len(cluster_ids) # Create close_num, marker1_num, and marker2_num close_num = np.zeros((num, num), dtype=np.uint16) mark1_num = [] mark1poslabels = [] dist_mat_bin = xr.DataArray( ((dist_mat.values < dist_lim) & (dist_mat.values > 0)).astype(np.uint8), coords=dist_mat.coords ) for j in range(num): if analysis_type == "cluster": mark1poslabels.append( get_pos_cell_labels_cluster(pheno=cluster_ids[j], current_fov_neighborhood_data=current_fov_data, cell_label_col=cell_label_col, cell_type_col=cell_type_col)) else: mark1poslabels.append( get_pos_cell_labels_channel(thresh=thresh_vec[j], current_fov_channel_data=current_fov_channel_data, cell_labels=cell_labels, current_marker=current_fov_channel_data.columns[j])) mark1_num.append(len(mark1poslabels[j])) # iterating k from [j, end] cuts out 1/2 the steps (while symmetric) for j, m1n in enumerate(mark1_num): for k, m2n in enumerate(mark1_num[j:], j): dist_mat_bin_subset = dist_mat_bin.loc[ mark1poslabels[j].values, mark1poslabels[k].values ].values count_close_num_hits = np.sum(dist_mat_bin_subset, dtype=np.uint16) close_num[j, k] = count_close_num_hits # symmetry :) close_num[k, j] = close_num[j, k] return close_num, mark1_num, mark1poslabels
[docs]def compute_neighbor_counts(current_fov_neighborhood_data, dist_matrix, distlim, self_neighbor=False, cell_label_col=settings.CELL_LABEL, cluster_name_col=settings.CELL_TYPE): """Calculates the number of neighbor phenotypes for each cell. The cell counts itself as a neighbor if self_neighbor=True. Args: current_fov_neighborhood_data (pandas.DataFrame): data for the current fov, including the cell labels, cell phenotypes, and cell phenotype dist_matrix (numpy.ndarray): cells x cells matrix with the euclidian distance between centers of corresponding cells distlim (int): threshold for distance proximity self_neighbor (bool): If true, cell counts itself as a neighbor in the analysis. cell_label_col (str): Column name with the cell labels cluster_name_col (str): Column name with the cell types Returns: tuple (pandas.DataFrame, pandas.DataFrame): - phenotype counts per cell - phenotype frequencies of counts per total for each cell """ # subset our distance matrix based on the cell labels provided cell_labels = current_fov_neighborhood_data[cell_label_col].values cell_dist_mat = dist_matrix.loc[cell_labels, cell_labels].values # binarize distance matrix cell_dist_mat_bin = np.zeros(cell_dist_mat.shape) cell_dist_mat_bin[cell_dist_mat < distlim] = 1 # remove cell as it's own neighbor if not self_neighbor: cell_dist_mat_bin[cell_dist_mat == 0] = 0 # get num_neighbors for freqs num_neighbors = np.sum(cell_dist_mat_bin, axis=0) # create the 'phenotype has cell?' matrix, excluding non cell-label rows pheno_has_cell_pd = pd.get_dummies(current_fov_neighborhood_data.loc[:, cluster_name_col]) pheno_names_from_tab = pheno_has_cell_pd.columns.values pheno_has_cell = pheno_has_cell_pd.to_numpy().T # dot binarized 'is neighbor?' matrix with pheno_has_cell to get counts counts = pheno_has_cell.dot(cell_dist_mat_bin).T counts_pd = pd.DataFrame(counts, columns=pheno_names_from_tab) counts_pd = counts_pd.set_index(current_fov_neighborhood_data.index.copy()) # there may be errors if num_neighbors = 0, suppress these warnings np.seterr(invalid='ignore') # compute freqs with num_neighbors freqs = counts.T / num_neighbors freqs_pd = pd.DataFrame(freqs.T, columns=pheno_names_from_tab) freqs_pd = freqs_pd.set_index(current_fov_neighborhood_data.index.copy()) # change na's to 0, will remove these cells before clustering freqs_pd = freqs_pd.fillna(0) return counts_pd, freqs_pd
[docs]def compute_kmeans_inertia(neighbor_mat_data, min_k=2, max_k=10, seed=42): """For a given neighborhood matrix, cluster and compute inertia using k-means clustering from the range of k=min_k to max_k Args: neighbor_mat_data (pandas.DataFrame): neighborhood matrix data with only the desired fovs min_k (int): the minimum k we want to generate cluster statistics for, must be at least 2 max_k (int): the maximum k we want to generate cluster statistics for, must be at least 2 seed (int): the random seed to set for k-means clustering Returns: xarray.DataArray: contains a single dimension, `cluster_num`, which indicates the inertia when `cluster_num` was set as k for k-means clustering """ # create array we can store the results of each k for clustering coords = [np.arange(min_k, max_k + 1)] dims = ["cluster_num"] stats_raw_data = np.zeros(max_k - min_k + 1) cluster_stats = xr.DataArray(stats_raw_data, coords=coords, dims=dims) # iterate over each k value pb_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]' for n in tqdm(range(min_k, max_k + 1), bar_format=pb_format): cluster_fit = KMeans(n_clusters=n, random_state=seed, n_init='auto').fit(neighbor_mat_data) cluster_stats.loc[n] = cluster_fit.inertia_ return cluster_stats
[docs]def compute_kmeans_silhouette(neighbor_mat_data, min_k=2, max_k=10, seed=42, subsample=None): """For a given neighborhood matrix, cluster and compute Silhouette score using k-means from the range of k=min_k to max_k Args: neighbor_mat_data (pandas.DataFrame): neighborhood matrix data with only the desired fovs min_k (int): the minimum k we want to generate cluster statistics for, must be at least 2 max_k (int): the maximum k we want to generate cluster statistics for, must be at least 2 seed (int): the random seed to set for k-means clustering subsample (int): the number of cells that will be sampled from each neighborhood cluster for calculating Silhouette score If None, all cells will be used Returns: xarray.DataArray: contains a single dimension, `cluster_num`, which indicates the Silhouette score when `cluster_num` was set as k for k-means clustering """ # create array we can store the results of each k for clustering coords = [np.arange(min_k, max_k + 1)] dims = ["cluster_num"] stats_raw_data = np.zeros(max_k - min_k + 1) cluster_stats = xr.DataArray(stats_raw_data, coords=coords, dims=dims) # iterate over each k value pb_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]' for n in tqdm(range(min_k, max_k + 1), bar_format=pb_format): cluster_fit = KMeans(n_clusters=n, random_state=seed, n_init='auto').fit(neighbor_mat_data) cluster_labels = cluster_fit.labels_ sub_dat = neighbor_mat_data.copy() sub_dat["cluster"] = cluster_labels if subsample is not None: # Subsample each cluster sub_dat = sub_dat.groupby("cluster").apply( lambda x: x.sample( subsample, replace=len(x) < subsample, random_state=seed) ).reset_index(drop=True) cluster_score = sklearn.metrics.silhouette_score(sub_dat.drop("cluster", axis=1), sub_dat["cluster"], metric="euclidean") cluster_stats.loc[n] = cluster_score return cluster_stats
[docs]def generate_cluster_labels(neighbor_mat_data, cluster_num, seed=42): """Run k-means clustering with k=cluster_num Give the same data, given several runs the clusters will always be the same, but the labels assigned will likely be different Args: neighbor_mat_data (pandas.DataFrame): neighborhood matrix data with only the desired fovs cluster_num (int): the k we want to use when running k-means clustering seed (int): the random seed to set for k-means clustering Returns: numpy.ndarray: the neighborhood cluster labels assigned to each cell in neighbor_mat_data """ cluster_fit = KMeans(n_clusters=cluster_num, random_state=seed, n_init=10).\ fit(neighbor_mat_data) # Add 1 to avoid cluster number 0 cluster_labels = cluster_fit.labels_ + 1 return cluster_labels