Source code for ark.phenotyping.pixel_meta_clustering
import multiprocessing
import os
from functools import partial
from shutil import move, rmtree
import feather
import numpy as np
import pandas as pd
from alpineer import io_utils, misc_utils
from pyarrow.lib import ArrowInvalid
from ark.phenotyping import cluster_helpers, pixel_cluster_utils
multiprocessing.set_start_method('spawn', force=True)
[docs]def run_pixel_consensus_assignment(pixel_data_path, pixel_cc_obj, fov):
"""Helper function to assign pixel consensus clusters
Args:
pixel_data_path (str):
The path to the pixel data directory
pixel_cc_obj (ark.phenotyping.cluster_helpers.PixieConsensusCluster):
The pixel consensus cluster object
fov (str):
The name of the FOV to process
Returns:
tuple (str, int):
The name of the FOV as well as the return code
"""
# get the path to the fov
fov_path = os.path.join(pixel_data_path, fov + '.feather')
# read in the fov data with SOM labels
try:
fov_data = feather.read_dataframe(fov_path)
# this indicates this fov file is corrupted
except (ArrowInvalid, OSError, IOError):
return fov, 1
# assign the consensus labels to fov_data
fov_data = pixel_cc_obj.assign_consensus_labels(fov_data)
# resave the data with the meta cluster labels assigned
temp_path = os.path.join(pixel_data_path + '_temp', fov + '.feather')
feather.write_dataframe(fov_data, temp_path, compression='uncompressed')
return fov, 0
[docs]def pixel_consensus_cluster(fovs, channels, base_dir, max_k=20, cap=3,
data_dir='pixel_mat_data',
pc_chan_avg_som_cluster_name='pixel_channel_avg_som_cluster.csv',
multiprocess=False, batch_size=5, seed=42, overwrite=False):
"""Run consensus clustering algorithm on pixel-level summed data across channels
Saves data with consensus cluster labels to `data_dir`.
Args:
fovs (list):
The list of fovs to subset on
channels (list):
The list of channels to subset on
base_dir (str):
The path to the data directory
max_k (int):
The number of consensus clusters
cap (int):
z-score cap to use when hierarchical clustering
data_dir (str):
Name of the directory which contains the full preprocessed pixel data.
This data should also have the SOM cluster labels appended from `cluster_pixels`.
pc_chan_avg_som_cluster_name (str):
Name of file to save the channel-averaged results across all SOM clusters to
multiprocess (bool):
Whether to use multiprocessing or not
batch_size (int):
The number of FOVs to process in parallel, ignored if `multiprocess` is `False`
seed (int):
The random seed to set for consensus clustering
overwrite (bool):
If set, force overwrites the meta labels in all the FOVs
Returns:
cluster_helpers.PixieConsensusCluster:
The consensus cluster object containing the SOM to meta mapping
"""
# define the paths to the data
pixel_data_path = os.path.join(base_dir, data_dir)
som_cluster_avg_path = os.path.join(base_dir, pc_chan_avg_som_cluster_name)
# path validation
io_utils.validate_paths([pixel_data_path, som_cluster_avg_path])
# if overwrite flag set, run on all FOVs in data_dir
if overwrite:
print('Overwrite flag set, reassigning meta cluster labels to all FOVs')
os.mkdir(pixel_data_path + '_temp')
fovs_list = io_utils.remove_file_extensions(
io_utils.list_files(pixel_data_path, substrs='.feather')
)
# otherwise, only assign meta clusters to FOVs that don't already have them
else:
fovs_list = pixel_cluster_utils.find_fovs_missing_col(
base_dir, data_dir, 'pixel_meta_cluster'
)
# make sure fovs_list only contain fovs that exist in the master fovs list specified
fovs_list = list(set(fovs_list).intersection(fovs))
# if there are no FOVs left without meta labels don't run function
if len(fovs_list) == 0:
print("There are no more FOVs to assign meta labels to, skipping")
return
# if meta cluster labeling is only partially complete, inform the user of restart
if len(fovs_list) < len(fovs):
print("Restarting meta cluster label assignment from fov %s, "
"%d fovs left to process" % (fovs_list[0], len(fovs_list)))
# consensus clustering setup
pixel_cc = cluster_helpers.PixieConsensusCluster(
'pixel', som_cluster_avg_path, channels, max_k=max_k, cap=cap
)
# z-score and cap the data
print("z-score scaling and capping data")
pixel_cc.scale_data()
# set random seed for consensus clustering
np.random.seed(seed)
# run consensus clustering
print("Running consensus clustering")
pixel_cc.run_consensus_clustering()
# generate the the som to meta cluster map
pixel_cc.generate_som_to_meta_map()
# define variable to keep track of number of fovs processed
fovs_processed = 0
# define the partial function to iterate over
fov_data_func = partial(
run_pixel_consensus_assignment, pixel_data_path, pixel_cc
)
# use the som to meta mapping to assign meta cluster values to data in data_path
print("Mapping pixel data to consensus cluster labels")
# TODO: this multiprocess logic will be duplicated in several places, should be own function
if multiprocess:
with multiprocessing.get_context('spawn').Pool(batch_size) as fov_data_pool:
for fov_batch in [fovs_list[i:(i + batch_size)]
for i in range(0, len(fovs_list), batch_size)]:
fov_statuses = fov_data_pool.map(fov_data_func, fov_batch)
for fs in fov_statuses:
if fs[1] == 1:
print("The data for FOV %s has been corrupted, skipping" % fs[0])
fovs_processed -= 1
# update number of fovs processed
fovs_processed += len(fov_batch)
print("Processed %d fovs" % fovs_processed)
else:
for fov in fovs_list:
fov_status = fov_data_func(fov)
if fov_status[1] == 1:
print("The data for FOV %s has been corrupted, skipping" % fov_status[0])
fovs_processed -= 1
# update number of fovs processed
fovs_processed += 1
# update every 10 FOVs, or at the very end
if fovs_processed % 10 == 0 or fovs_processed == len(fovs_list):
print("Processed %d fovs" % fovs_processed)
# remove the data directory and rename the temp directory to the data directory
rmtree(pixel_data_path)
move(pixel_data_path + '_temp', pixel_data_path)
return pixel_cc
[docs]def generate_meta_avg_files(fovs, channels, base_dir, pixel_cc, data_dir='pixel_mat_data',
pc_chan_avg_som_cluster_name='pixel_channel_avg_som_cluster.csv',
pc_chan_avg_meta_cluster_name='pixel_channel_avg_meta_cluster.csv',
num_fovs_subset=100, seed=42, overwrite=False):
"""Computes and saves the average channel expression across pixel meta clusters.
Assigns meta cluster labels to the data stored in `pc_chan_avg_som_cluster_name`.
Args:
fovs (list):
The list of fovs to subset on
channels (list):
The list of channels to subset on
base_dir (str):
The path to the data directory
pixel_cc (cluster_helpers.PixieConsensusCluster):
The consensus cluster object containing the SOM to meta mapping
data_dir (str):
Name of the directory which contains the full preprocessed pixel data.
This data should also have the SOM cluster labels appended from `cluster_pixels`.
pc_chan_avg_som_cluster_name (str):
Name of file to save the channel-averaged results across all SOM clusters to
pc_chan_avg_meta_cluster_name (str):
Name of file to save the channel-averaged results across all meta clusters to
num_fovs_subset (float):
The number of FOVs to subset on for meta cluster channel averaging
seed (int):
The random seed to use for subsetting FOVs
overwrite (bool):
If set, force overwrites the existing average channel expression file if it exists
"""
# define the paths to the data
som_cluster_avg_path = os.path.join(base_dir, pc_chan_avg_som_cluster_name)
meta_cluster_avg_path = os.path.join(base_dir, pc_chan_avg_meta_cluster_name)
# path validation
io_utils.validate_paths([som_cluster_avg_path])
# if the channel meta average file already exists and the overwrite flag isn't set, skip
if os.path.exists(meta_cluster_avg_path):
if not overwrite:
print("Already generated meta cluster channel average file, skipping")
return
print("Overwrite flag set, regenerating meta cluster channel average file")
# compute average channel expression for each pixel meta cluster
# and the number of pixels per meta cluster
print("Computing average channel expression across pixel meta clusters")
pixel_channel_avg_meta_cluster = pixel_cluster_utils.compute_pixel_cluster_channel_avg(
fovs,
channels,
base_dir,
'pixel_meta_cluster',
pixel_cc.max_k,
data_dir,
num_fovs_subset=num_fovs_subset,
seed=seed,
keep_count=True
)
# save pixel_channel_avg_meta_cluster
pixel_channel_avg_meta_cluster.to_csv(
meta_cluster_avg_path,
index=False
)
# merge metacluster assignments in
print("Mapping meta cluster values onto average channel expression across pixel SOM clusters")
pixel_channel_avg_som_cluster = pd.read_csv(som_cluster_avg_path)
# this happens if the overwrite flag is set with previously generated data, need to overwrite
if 'pixel_meta_cluster' in pixel_channel_avg_som_cluster.columns.values:
pixel_channel_avg_som_cluster = pixel_channel_avg_som_cluster.drop(
columns='pixel_meta_cluster'
)
pixel_channel_avg_som_cluster["pixel_som_cluster"] =\
pixel_channel_avg_som_cluster["pixel_som_cluster"].astype(int)
pixel_channel_avg_som_cluster = pd.merge_asof(
pixel_channel_avg_som_cluster, pixel_cc.mapping, on="pixel_som_cluster"
)
# resave channel-averaged results across all pixel SOM clusters with metacluster assignments
pixel_channel_avg_som_cluster.to_csv(
som_cluster_avg_path,
index=False
)
[docs]def update_pixel_meta_labels(pixel_data_path, pixel_remapped_dict,
pixel_renamed_meta_dict, fov):
"""Helper function to reassign meta cluster names based on remapping scheme to a FOV
Args:
pixel_data_path (str):
The path to the pixel data drectory
pixel_remapped_dict (dict):
The mapping from pixel SOM cluster to pixel meta cluster label (not renamed)
pixel_renamed_meta_dict (dict):
The mapping from pixel meta cluster label to renamed pixel meta cluster name
fov (str):
The name of the FOV to process
Returns:
tuple (str, int):
The name of the FOV as well as the return code
"""
# get the path to the fov
fov_path = os.path.join(pixel_data_path, fov + '.feather')
# read in the fov data with SOM and meta cluster labels
try:
fov_data = feather.read_dataframe(fov_path)
# this indicates this fov file is corrupted
except (ArrowInvalid, OSError, IOError):
return fov, 1
# ensure that no SOM clusters are missing from the mapping
misc_utils.verify_in_list(
fov_som_labels=fov_data['pixel_som_cluster'].unique(),
som_labels_in_mapping=list(pixel_remapped_dict.keys())
)
# assign the new meta cluster labels
fov_data['pixel_meta_cluster'] = fov_data['pixel_som_cluster'].map(
pixel_remapped_dict
)
# assign the renamed meta cluster names
fov_data['pixel_meta_cluster_rename'] = fov_data['pixel_meta_cluster'].map(
pixel_renamed_meta_dict
)
# resave the data with the new meta cluster labels
temp_path = os.path.join(pixel_data_path + '_temp', fov + '.feather')
feather.write_dataframe(fov_data, temp_path, compression='uncompressed')
return fov, 0
[docs]def apply_pixel_meta_cluster_remapping(fovs, channels, base_dir,
pixel_data_dir,
pixel_remapped_name,
multiprocess=False, batch_size=5):
"""Apply the meta cluster remapping to the data in `pixel_data_dir`.
Args:
fovs (list):
The list of fovs to subset on
channels (list):
The list of channels to subset on
base_dir (str):
The path to the data directories
pixel_data_dir (str):
Name of directory with the full pixel data.
This data should also have the SOM cluster labels appended from `cluster_pixels`
and the meta cluster labels appended from `pixel_consensus_cluster`.
pixel_remapped_name (str):
Name of the file containing the pixel SOM clusters to their remapped meta clusters
multiprocess (bool):
Whether to use multiprocessing or not
batch_size (int):
The number of FOVs to process in parallel
"""
# define the data paths
pixel_data_path = os.path.join(base_dir, pixel_data_dir)
pixel_remapped_path = os.path.join(base_dir, pixel_remapped_name)
# file path validation
io_utils.validate_paths([pixel_data_path, pixel_remapped_path])
# read in the remapping
pixel_remapped_data = pd.read_csv(pixel_remapped_path)
# assert the correct columns are contained
misc_utils.verify_in_list(
required_cols=['pixel_som_cluster', 'pixel_meta_cluster', 'pixel_meta_cluster_rename'],
remapped_data_cols=pixel_remapped_data.columns.values,
)
# create the mapping from pixel SOM to pixel meta cluster
pixel_remapped_dict = dict(
pixel_remapped_data[
['pixel_som_cluster', 'pixel_meta_cluster']
].values
)
# ensure no duplicated renamed meta clusters make it in
cluster_helpers.verify_unique_meta_clusters(pixel_remapped_data, meta_cluster_type="pixel")
# create the mapping from pixel meta cluster to renamed pixel meta cluster
pixel_renamed_meta_dict = dict(pixel_remapped_data[
['pixel_meta_cluster', 'pixel_meta_cluster_rename']
].drop_duplicates().values)
# define the partial function to iterate over
fov_data_func = partial(
update_pixel_meta_labels, pixel_data_path,
pixel_remapped_dict, pixel_renamed_meta_dict
)
# if it doesn't already exist, create a temporary directory to write the data
if not os.path.exists(pixel_data_path + '_temp'):
os.mkdir(pixel_data_path + '_temp')
fov_list = fovs
# otherwise, re-run only for unprocessed FOVs
else:
# NOTE: this will only test the else statement in find_fovs_missing_col
fov_list = pixel_cluster_utils.find_fovs_missing_col(
base_dir, pixel_data_dir, 'pixel_meta_cluster_rename'
)
print("Restarting meta cluster remapping assignment from %s, "
"%d fovs left to process" % (fov_list[0], len(fov_list)))
# define variable to keep track of number of fovs processed
fovs_processed = 0
print("Using re-mapping scheme to re-label pixel meta clusters")
if multiprocess:
# define the multiprocessing context
with multiprocessing.get_context('spawn').Pool(batch_size) as fov_data_pool:
# asynchronously generate and save the pixel matrices per FOV
for fov_batch in [fov_list[i:(i + batch_size)]
for i in range(0, len(fov_list), batch_size)]:
fov_statuses = fov_data_pool.map(fov_data_func, fov_batch)
for fs in fov_statuses:
if fs[1] == 1:
print("The data for FOV %s has been corrupted, skipping" % fs[0])
fovs_processed -= 1
# update number of fovs processed
fovs_processed += len(fov_batch)
print("Processed %d fovs" % fovs_processed)
else:
for fov in fov_list:
fov_status = fov_data_func(fov)
if fov_status[1] == 1:
print("The data for FOV %s has been corrupted, skipping" % fov_status[0])
fovs_processed -= 1
# update number of fovs processed
fovs_processed += 1
# update every 10 FOVs, or at the very end
if fovs_processed % 10 == 0 or fovs_processed == len(fov_list):
print("Processed %d fovs" % fovs_processed)
# remove the data directory and rename the temp directory to the data directory
rmtree(pixel_data_path)
move(pixel_data_path + '_temp', pixel_data_path)
[docs]def generate_remap_avg_files(fovs, channels, base_dir, pixel_data_dir, pixel_remapped_name,
pc_chan_avg_som_cluster_name, pc_chan_avg_meta_cluster_name,
num_fovs_subset=100, seed=42):
"""Resaves the re-mapped consensus data to `pixel_data_dir` and re-runs the
average channel expression per pixel meta cluster computation.
Re-maps the pixel SOM clusters to meta clusters in `pc_chan_avg_som_cluster_name`.
Args:
fovs (list):
The list of fovs to subset on
channels (list):
The list of channels to subset on
base_dir (str):
The path to the data directories
pixel_data_dir (str):
Name of directory with the full pixel data.
This data should also have the SOM cluster labels appended from `cluster_pixels`
and the meta cluster labels appended from `pixel_consensus_cluster`.
pixel_remapped_name (str):
Name of the file containing the pixel SOM clusters to their remapped meta clusters
pc_chan_avg_som_cluster_name (str):
Name of the file containing the channel-averaged results across all SOM clusters
pc_chan_avg_meta_cluster_name (str):
Name of the file containing the channel-averaged results across all meta clusters
num_fovs_subset (float):
The number of FOVs to subset on for meta cluster channel averaging
seed (int):
The random seed to use for subsetting FOVs
"""
# define the data paths
pixel_remapped_path = os.path.join(base_dir, pixel_remapped_name)
som_cluster_avg_path = os.path.join(base_dir, pc_chan_avg_som_cluster_name)
meta_cluster_avg_path = os.path.join(base_dir, pc_chan_avg_meta_cluster_name)
# file path validation
io_utils.validate_paths([pixel_remapped_path, som_cluster_avg_path, meta_cluster_avg_path])
# read in the remapping
# TODO: define a separate function for this duplicated logic, OOP will help greatly (soon...)
pixel_remapped_data = pd.read_csv(pixel_remapped_path)
# create the mapping from pixel SOM to pixel meta cluster
pixel_remapped_dict = dict(
pixel_remapped_data[
['pixel_som_cluster', 'pixel_meta_cluster']
].values
)
# create the mapping from pixel meta cluster to renamed pixel meta cluster
pixel_renamed_meta_dict = dict(
pixel_remapped_data[
['pixel_meta_cluster', 'pixel_meta_cluster_rename']
].drop_duplicates().values
)
# re-compute average channel expression for each pixel meta cluster
# and the number of pixels per meta cluster, add renamed meta cluster column in
print("Re-computing average channel expression across pixel meta clusters")
pixel_channel_avg_meta_cluster = pixel_cluster_utils.compute_pixel_cluster_channel_avg(
fovs,
channels,
base_dir,
'pixel_meta_cluster',
len(pixel_remapped_data['pixel_meta_cluster'].unique()),
pixel_data_dir,
num_fovs_subset=num_fovs_subset,
seed=seed,
keep_count=True
)
pixel_channel_avg_meta_cluster['pixel_meta_cluster_rename'] = \
pixel_channel_avg_meta_cluster['pixel_meta_cluster'].map(pixel_renamed_meta_dict)
# re-save the pixel channel average meta cluster table
pixel_channel_avg_meta_cluster.to_csv(meta_cluster_avg_path, index=False)
# re-assign pixel meta cluster labels back to the pixel channel average som cluster table
pixel_channel_avg_som_cluster = pd.read_csv(som_cluster_avg_path)
print("Re-assigning meta cluster column in pixel SOM cluster average channel expression table")
pixel_channel_avg_som_cluster['pixel_meta_cluster'] = \
pixel_channel_avg_som_cluster['pixel_som_cluster'].map(pixel_remapped_dict)
pixel_channel_avg_som_cluster['pixel_meta_cluster_rename'] = \
pixel_channel_avg_som_cluster['pixel_meta_cluster'].map(pixel_renamed_meta_dict)
# re-save the pixel channel average som cluster table
pixel_channel_avg_som_cluster.to_csv(som_cluster_avg_path, index=False)