import numba as nb
import itertools
import os
import pathlib
import re
from typing import List, Literal, Union, Sequence
from numpy.typing import ArrayLike, DTypeLike
from numpy import ma
import feather
import natsort as ns
import numpy as np
import pandas as pd
import skimage.io as io
from alpineer import data_utils, image_utils, io_utils, load_utils, misc_utils
from alpineer.settings import EXTENSION_TYPES
from tqdm.notebook import tqdm_notebook as tqdm
import xarray as xr
from ark import settings
from skimage.segmentation import find_boundaries
from pandas.core.groupby.generic import DataFrameGroupBy
from anndata import AnnData, read_zarr
from anndata.experimental import AnnCollection
from anndata.experimental.multi_files._anncollection import ConvertType
from typing import Optional
try:
from typing import TypedDict, Unpack
except ImportError:
from typing_extensions import TypedDict, Unpack
def save_fov_mask(fov, data_dir, mask_data, sub_dir=None, name_suffix=''):
    """Saves a provided cluster label mask overlay for a FOV.

    The mask is written as `<fov><name_suffix>.tiff` inside `data_dir` (or inside
    `data_dir/sub_dir` when `sub_dir` is given).

    Args:
        fov (str):
            The FOV to save
        data_dir (str):
            The directory to save the cluster mask
        mask_data (numpy.ndarray):
            The cluster mask data for the FOV
        sub_dir (Optional[str]):
            The subdirectory to save the masks in. If specified images are saved to
            "data_dir/sub_dir". If `sub_dir = None` the images are saved to `"data_dir"`.
            Defaults to `None`.
        name_suffix (str):
            Specify what to append at the end of every fov.
    """
    # data_dir validation (delegated to alpineer's io_utils)
    io_utils.validate_paths(data_dir)

    # joining with an empty string keeps the target inside data_dir itself
    save_dir = os.path.join(data_dir, '' if sub_dir is None else sub_dir)

    # exist_ok=True replaces the former "check, then create" sequence, which could fail
    # if another worker created the directory between the check and the makedirs call
    os.makedirs(save_dir, exist_ok=True)

    # the file name is the fov name with the name suffix appended
    fov_file = fov + name_suffix + '.tiff'
    image_utils.save_image(os.path.join(save_dir, fov_file), mask_data)
def erode_mask(seg_mask: np.ndarray, **kwargs) -> np.ndarray:
    """
    Erodes the edges of the labels in a segmentation mask.

    Every pixel flagged as a boundary by `skimage.segmentation.find_boundaries` is set to 0;
    all other pixels keep their original label. Other keyword arguments get passed to
    `skimage.segmentation.find_boundaries`.

    Args:
        seg_mask (np.ndarray): The segmentation mask to erode.

    Returns:
        np.ndarray: The eroded segmentation mask
    """
    boundary = find_boundaries(label_img=seg_mask, **kwargs)
    # keep the label wherever no boundary was detected, write 0 everywhere else
    return np.where(boundary == 0, seg_mask, 0)
class ClusterMaskData:
    """
    Holds the FOV / segmentation label / cluster label triplets for the whole cohort,
    together with an integer id assigned to every distinct cluster label and a per-FOV
    lookup from segmentation label to cluster id.
    """

    # names of the relevant columns in the mapping table
    fov_column: str
    label_column: str
    cluster_column: str
    # naturally sorted FOV names found in the data
    unique_fovs: List[str]
    # name of the generated integer id column ("cluster_id")
    cluster_id_column: str
    # id reserved for labels without a cluster: largest cluster id + 1
    unassigned_id: int
    # largest cluster id that was assigned
    n_clusters: int
    # the full mapping table, including one background row per FOV
    mapping: pd.DataFrame

    def __init__(
        self, data: pd.DataFrame, fov_col: str, label_col: str, cluster_col: str
    ) -> None:
        """
        Builds the cluster id table and the segmentation-label-to-cluster mapping.

        Args:
            data (pd.DataFrame):
                A cell table with the cell label column and the cluster column.
            fov_col (str):
                The name of the column in the cell table that contains the FOV ID.
            label_col (str):
                The name of the column in the cell table that contains the cell label.
            cluster_col (str):
                The name of the column in the cell table that contains the cluster label.
        """
        self.fov_column: str = fov_col
        self.label_column: str = label_col
        self.cluster_column: str = cluster_col
        self.cluster_id_column: str = "cluster_id"

        # dtypes enforced on the mapping table, both before and after adding background rows
        column_dtypes = {
            self.fov_column: str,
            self.label_column: np.int32,
            self.cluster_id_column: np.int32,
        }

        # only the fov ID, segmentation label and cluster label are needed
        subset: pd.DataFrame = data[
            [self.fov_column, self.label_column, self.cluster_column]
        ].copy()

        # the cluster column may be non-numeric (i.e. string), so every distinct cluster
        # gets an integer id: 1, 2, ... in ascending order of the cluster label
        name_to_id = (
            pd.DataFrame({self.cluster_column: subset[self.cluster_column].unique()})
            .sort_values(by=self.cluster_column)
            .reset_index(drop=True)
        )
        name_to_id[self.cluster_id_column] = (name_to_id.index + 1).astype(np.int32)
        self.cluster_name_id = name_to_id

        # attach the integer ids to every row of the mapping table
        subset = subset.merge(right=self.cluster_name_id, on=self.cluster_column)
        subset = subset.astype(column_dtypes)

        self.unique_fovs: List[str] = ns.natsorted(subset[self.fov_column].unique().tolist())

        largest_id = subset[self.cluster_id_column].max()
        self.unassigned_id: np.int32 = np.int32(largest_id + 1)
        self.n_clusters: int = largest_id

        # for each FOV, segmentation label 0 (background) maps to cluster label 0 / id 0
        n_fovs = len(self.unique_fovs)
        zero_filled = {
            column: np.repeat(0, repeats=n_fovs)
            for column in (self.label_column, self.cluster_column, self.cluster_id_column)
        }
        background_rows: pd.DataFrame = pd.DataFrame(
            data={self.fov_column: self.unique_fovs, **zero_filled}
        )

        combined = pd.concat(objs=[subset, background_rows]).astype(column_dtypes)

        # sort by FOV first, then by segmentation label
        self.mapping = combined.sort_values(by=[self.fov_column, self.label_column])

    def fov_mapping(self, fov: str) -> pd.DataFrame:
        """Returns the mapping for a specific FOV.

        Args:
            fov (str):
                The FOV to get the mapping for.

        Returns:
            pd.DataFrame:
                The rows of the mapping belonging to the FOV, with a fresh index.
        """
        misc_utils.verify_in_list(requested_fov=[fov], all_fovs=self.unique_fovs)
        belongs_to_fov = self.mapping[self.fov_column] == fov
        return self.mapping[belongs_to_fov].reset_index(drop=True)

    @property
    def cluster_names(self) -> List[str]:
        """Returns the cluster names.

        Returns:
            List[str]:
                The cluster names, in the order of their integer ids.
        """
        names = self.cluster_name_id[self.cluster_column]
        return names.tolist()
def label_cells_by_cluster(
    fov: str,
    cmd: ClusterMaskData,
    label_map: Union[np.ndarray, xr.DataArray],
) -> np.ndarray:
    """Translates a cell-ID labeled image into a cluster-ID labeled image, using the
    clustering assignment stored in `cmd`.

    Args:
        fov (str):
            The FOV to relabel
        cmd (ClusterMaskData):
            A dataclass containing the cell data, cell label column, cluster column and the
            mapping from the segmentation label to the cluster label for a given FOV.
        label_map (Union[numpy.ndarray, xarray.DataArray]):
            label map for a single FOV.

    Returns:
        numpy.ndarray:
            The image with new designated label assignments (`int16`). Segmentation labels
            without an entry in the FOV's mapping receive `cmd.unassigned_id`.
    """
    # the fov must be one of the FOVs known to the cluster mask data
    misc_utils.verify_in_list(fov_name=[fov], all_data_fovs=cmd.unique_fovs)

    # condense extraneous axes; a DataArray is additionally unwrapped to its numpy values
    squeezed = label_map.squeeze()
    if isinstance(label_map, xr.DataArray):
        squeezed = squeezed.values
    labeled_image: np.ndarray = squeezed.astype(np.int32)

    fov_clusters: pd.DataFrame = cmd.fov_mapping(fov=fov)

    # Numba typed dictionary: segmentation label -> cluster id
    label_to_cluster = nb.typed.Dict.empty(
        key_type=nb.types.int32,
        value_type=nb.types.int32,
    )
    segmentation_labels = fov_clusters[cmd.label_column]
    cluster_ids = fov_clusters[cmd.cluster_id_column]
    for seg_label, cluster_id in zip(segmentation_labels, cluster_ids):
        label_to_cluster[np.int32(seg_label)] = np.int32(cluster_id)

    relabeled_image: np.ndarray = relabel_segmentation(
        mapping=label_to_cluster,
        unassigned_id=cmd.unassigned_id,
        labeled_image=labeled_image,
        _dtype=np.int32,
    )
    return relabeled_image.astype(np.int16)
def map_segmentation_labels(
    labels: Union[pd.Series, np.ndarray],
    values: Union[pd.Series, np.ndarray],
    label_map: ArrayLike,
    unassigned_id: float = 0,
) -> np.ndarray:
    """
    Maps an image consisting of segmentation labels to an image consisting of a particular type of
    statistic, metric, or value of interest.

    Args:
        labels (Union[pd.Series, np.ndarray]): The segmentation labels.
        values (Union[pd.Series, np.ndarray]): The values to map to the segmentation labels.
            When given as a `pd.Series`, NaNs are replaced with 0.
        label_map (ArrayLike): The segmentation labels as an image to map to.
        unassigned_id (int | float, optional): A default value to assign when there exists no
            1-to-1 mapping from a label in the label_map to a label in the `labels` argument.
            Defaults to 0.

    Returns:
        np.ndarray: Returns the mapped image (`float64`).
    """
    # condense extraneous axes; a DataArray is additionally unwrapped to its numpy values
    squeezed = label_map.squeeze()
    if isinstance(label_map, xr.DataArray):
        squeezed = squeezed.values
    labeled_image: np.ndarray = squeezed.astype(np.int32)

    if isinstance(labels, pd.Series):
        labels = labels.to_numpy(dtype=np.int32)

    if isinstance(values, pd.Series):
        # handle NaNs, replace with 0
        as_float = values.to_numpy(dtype=np.float64)
        values = ma.fix_invalid(as_float, fill_value=0).data

    # Numba typed dictionary: segmentation label -> value of interest
    label_to_value = nb.typed.Dict.empty(
        key_type=nb.types.int32, value_type=nb.types.float64
    )
    for seg_label, mapped_value in zip(labels, values):
        label_to_value[seg_label] = mapped_value

    return relabel_segmentation(
        mapping=label_to_value,
        unassigned_id=unassigned_id,
        labeled_image=labeled_image,
        _dtype=np.float64,
    )
@nb.njit(parallel=True)
def relabel_segmentation(
    mapping: nb.typed.typeddict,
    unassigned_id: np.int32,
    labeled_image: np.ndarray,
    _dtype: DTypeLike = np.float64,
) -> np.ndarray:
    """
    Relabels a labeled segmentation image according to the provided values.

    Each pixel of `labeled_image` is looked up in `mapping`; pixels whose label is not a key
    of the mapping receive `unassigned_id`.

    Args:
        mapping (nb.typed.typeddict):
            A Numba typed dictionary mapping segmentation labels to the new values
            (e.g. cluster labels).
        unassigned_id (np.int32):
            The label given to a pixel with no associated cluster.
        labeled_image (np.ndarray):
            The labeled segmentation image. It is indexed as `[i, j]`, so it must be 2D.
        _dtype (DTypeLike, optional):
            The data type of the relabeled image. Defaults to `np.float64`.

    Returns:
        np.ndarray: The relabeled segmentation image, same shape as `labeled_image`.
    """
    # every element is written in the loops below, so an uninitialized buffer is sufficient
    relabeled_image: np.ndarray = np.empty(shape=labeled_image.shape, dtype=_dtype)
    # NOTE(review): Numba is understood to parallelize only the outermost prange of a nest
    # and to treat the inner one as a plain range -- confirm against the Numba docs.
    for i in nb.prange(labeled_image.shape[0]):
        for j in nb.prange(labeled_image.shape[1]):
            relabeled_image[i, j] = mapping.get(labeled_image[i, j], unassigned_id)
    return relabeled_image
def generate_cluster_mask(
        fov: str,
        seg_dir: Union[str, pathlib.Path],
        cmd: ClusterMaskData,
        seg_suffix: str = "_whole_cell.tiff",
        erode: bool = True,
        **kwargs) -> np.ndarray:
    """For a fov, create a mask labeling each cell with their SOM or meta cluster label

    Args:
        fov (str):
            The fov to relabel
        seg_dir (str):
            The path to the segmentation data
        cmd (ClusterMaskData):
            A dataclass containing the cell data, cell label column, cluster column and the
            mapping from the segmentation label to the cluster label for a given FOV.
        seg_suffix (str):
            The suffix that the segmentation images use. Defaults to `'_whole_cell.tiff'`.
        erode (bool):
            Whether to erode the edges of the segmentation mask. Defaults to `True`.
        **kwargs:
            Accepted for call compatibility; not used by this function.

    Returns:
        numpy.ndarray:
            The image where values represent cell cluster labels.
    """
    # path checking
    io_utils.validate_paths([seg_dir])

    # load the whole cell segmentation labels for the FOV and select the FOV's slice
    segmentation_imgs = load_utils.load_imgs_from_dir(
        data_dir=seg_dir,
        files=[fov + seg_suffix],
        xr_dim_name='compartments',
        xr_channel_names=['whole_cell'],
        trim_suffix=seg_suffix.split('.')[0],
    )
    label_map = segmentation_imgs.loc[fov, ...]

    if erode:
        # the erosion settings are fixed here
        label_map = erode_mask(label_map, connectivity=2, mode="thick", background=0)

    # translate the segmentation labels into cluster ids
    return label_cells_by_cluster(fov=fov, cmd=cmd, label_map=label_map)
def generate_and_save_cell_cluster_masks(
    fovs: List[str],
    save_dir: Union[pathlib.Path, str],
    seg_dir: Union[pathlib.Path, str],
    cell_data: pd.DataFrame,
    cluster_id_to_name_path: Union[pathlib.Path, str],
    fov_col: str = settings.FOV_ID,
    label_col: str = settings.CELL_LABEL,
    cell_cluster_col: str = settings.CELL_TYPE,
    seg_suffix: str = "_whole_cell.tiff",
    sub_dir: str = None,
    name_suffix: str = "",
):
    """Generates cell cluster masks and saves them for downstream analysis.

    As a side effect, the CSV at `cluster_id_to_name_path` is rewritten with a `cluster_id`
    column holding the integer ids used in the generated masks.

    Args:
        fovs (List[str]):
            A list of fovs to generate and save cell masks for.
        save_dir (Union[pathlib.Path, str]):
            The directory to save the generated cell cluster masks.
        seg_dir (Union[pathlib.Path, str]):
            The path to the segmentation data.
        cell_data (pd.DataFrame):
            The cell data with both cell SOM and meta cluster assignments.
        cluster_id_to_name_path (Union[str, pathlib.Path]): A path to a CSV identifying the
            cell cluster to manually-defined name mapping this is output by the remapping
            visualization found in `metacluster_remap_gui`.
        fov_col (str, optional):
            The column name containing the FOV IDs . Defaults to `settings.FOV_ID` (`"fov"`).
        label_col (str, optional):
            The column name containing the cell label. Defaults to
            `settings.CELL_LABEL` (`"label"`).
        cell_cluster_col (str, optional):
            Whether to assign SOM or meta clusters. Needs to be `"cell_som_cluster"` or
            `"cell_meta_cluster"`. Defaults to `settings.CELL_TYPE` (`"cell_meta_cluster"`).
        seg_suffix (str, optional):
            The suffix that the segmentation images use. Defaults to `"_whole_cell.tiff"`.
        sub_dir (str, optional):
            The subdirectory to save the images in. If specified images are saved to
            `"data_dir/sub_dir"`. If `sub_dir = None` the images are saved to `"data_dir"`.
            Defaults to `None`.
        name_suffix (str, optional):
            Specify what to append at the end of every cell mask. Defaults to `""`.
    """
    cmd = ClusterMaskData(
        data=cell_data,
        fov_col=fov_col,
        label_col=label_col,
        cluster_col=cell_cluster_col,
    )

    # read in the gui cluster mapping file
    gui_map = pd.read_csv(cluster_id_to_name_path)

    # one row per (cluster label, cluster id) pair as created by ClusterMaskData
    id_columns = [cmd.cluster_column, cmd.cluster_id_column]
    cluster_map = cmd.mapping.filter(id_columns).drop_duplicates()

    # discard a stale cluster_id column if the file already has one, otherwise do nothing
    gui_map = gui_map.drop(columns="cluster_id", errors="ignore")

    # add a cluster_id column corresponding to the new mask integers and write it back
    gui_map.merge(cluster_map, on=[cmd.cluster_column], how="left").to_csv(
        cluster_id_to_name_path, index=False
    )

    # create the cell cluster masks across each fov
    with tqdm(total=len(fovs), desc="Cell Cluster Mask Generation", unit="FOVs") as progress:
        for fov in fovs:
            progress.set_postfix(FOV=fov)

            # generate the cell mask for the FOV, then save it
            fov_mask: np.ndarray = generate_cluster_mask(
                fov=fov, seg_dir=seg_dir, cmd=cmd, seg_suffix=seg_suffix
            )
            save_fov_mask(
                fov,
                data_dir=save_dir,
                mask_data=fov_mask,
                sub_dir=sub_dir,
                name_suffix=name_suffix,
            )

            progress.update(1)
def generate_pixel_cluster_mask(fov, base_dir, tiff_dir, chan_file_path,
                                pixel_data_dir, cluster_mapping,
                                pixel_cluster_col='pixel_meta_cluster'):
    """For a fov, create a mask labeling each pixel with their SOM or meta cluster label

    Args:
        fov (str):
            The fov to relabel
        base_dir (str):
            The path to the data directory
        tiff_dir (str):
            The path to the tiff data
        chan_file_path (str):
            The path to the sample channel file to load (`tiff_dir` as root).
            Used to determine dimensions of the pixel mask.
        pixel_data_dir (str):
            The path to the data with full pixel data.
            This data should also have the SOM and meta cluster labels appended.
        cluster_mapping (pd.DataFrame)
            Dataframe detailing which meta_cluster IDs map to which cluster_id
        pixel_cluster_col (str):
            Whether to assign SOM or meta clusters
            needs to be `'pixel_som_cluster'` or `'pixel_meta_cluster'`

    Returns:
        numpy.ndarray:
            The image overlaid with pixel cluster labels
    """
    pixel_dir = os.path.join(base_dir, pixel_data_dir)
    sample_channel = os.path.join(tiff_dir, chan_file_path)

    # path checking
    io_utils.validate_paths([tiff_dir, sample_channel, pixel_dir])

    # verify the pixel_cluster_col provided is valid
    misc_utils.verify_in_list(
        provided_cluster_col=[pixel_cluster_col],
        valid_cluster_cols=['pixel_som_cluster', 'pixel_meta_cluster']
    )

    # verify the fov is valid: its feather file has to exist in the pixel data directory
    misc_utils.verify_in_list(
        provided_fov_file=[fov + '.feather'],
        consensus_fov_files=os.listdir(pixel_dir)
    )

    # the sample channel file is only read to determine the size of the pixel cluster mask
    channel_data = np.squeeze(io.imread(sample_channel))
    n_rows, n_cols = channel_data.shape[0], channel_data.shape[1]

    # use int16 to allow for Photoshop loading
    img_data = np.zeros((n_rows, n_cols), dtype='int16')

    fov_data = feather.read_dataframe(os.path.join(pixel_dir, fov + '.feather'))

    # cluster labels per pixel, as integers rather than floats
    pixel_labels = fov_data[pixel_cluster_col].astype(int).tolist()

    # convert the (row, column) pixel coordinates to 1D indices
    flat_positions = fov_data['row_index'].values * n_cols + fov_data['column_index'].values

    # relabel meta_cluster numbers with cluster_id
    id_table = cluster_mapping.drop_duplicates()[[pixel_cluster_col, 'cluster_id']]
    label_to_id = dict(zip(id_table[pixel_cluster_col], id_table['cluster_id']))
    pixel_ids = [label_to_id[label] for label in pixel_labels]

    # assign each coordinate of the mask to its respective cluster id
    flat_mask = img_data.ravel()
    flat_mask[flat_positions] = pixel_ids
    return flat_mask.reshape(img_data.shape)
def generate_and_save_pixel_cluster_masks(fovs: List[str],
                                          base_dir: Union[pathlib.Path, str],
                                          save_dir: Union[pathlib.Path, str],
                                          tiff_dir: Union[pathlib.Path, str],
                                          chan_file: Union[pathlib.Path, str],
                                          pixel_data_dir: Union[pathlib.Path, str],
                                          cluster_id_to_name_path: Union[pathlib.Path, str],
                                          pixel_cluster_col: str = 'pixel_meta_cluster',
                                          sub_dir: str = None,
                                          name_suffix: str = ''):
    """Generates pixel cluster masks and saves them for downstream analysis.

    As a side effect, the CSV at `cluster_id_to_name_path` is rewritten with a `cluster_id`
    column holding the integer ids used in the generated masks.

    Args:
        fovs (List[str]):
            A list of fovs to generate and save pixel masks for.
        base_dir (Union[pathlib.Path, str]):
            The path to the data directory.
        save_dir (Union[pathlib.Path, str]):
            The directory to save the generated pixel cluster masks.
        tiff_dir (Union[pathlib.Path, str]):
            The path to the directory with the tiff data.
        chan_file (Union[pathlib.Path, str]):
            The path to the channel file inside each FOV folder (FOV folder as root).
            Used to determine dimensions of the pixel mask.
        pixel_data_dir (Union[pathlib.Path, str]):
            The path to the data with full pixel data.
            This data should also have the SOM and meta cluster labels appended.
        cluster_id_to_name_path (Union[str, pathlib.Path]): A path to a CSV identifying the
            pixel cluster to manually-defined name mapping this is output by the remapping
            visualization found in `metacluster_remap_gui`.
        pixel_cluster_col (str, optional):
            The name of the cluster column used to label the pixels; it must be a column of
            the CSV at `cluster_id_to_name_path`. Defaults to 'pixel_meta_cluster'.
        sub_dir (str, optional):
            The subdirectory to save the images in. If specified images are saved to
            `"data_dir/sub_dir"`. If `sub_dir = None` the images are saved to `"data_dir"`.
            Defaults to `None`.
        name_suffix (str, optional):
            Specify what to append at the end of every pixel mask. Defaults to `''`.
    """
    # read in the gui cluster mapping file
    gui_map = pd.read_csv(cluster_id_to_name_path)

    # number the distinct cluster labels 1..n in ascending order of the label
    cluster_map = gui_map.copy()[[pixel_cluster_col]]
    cluster_map = cluster_map.drop_duplicates()
    cluster_map = cluster_map.sort_values(by=[pixel_cluster_col])
    cluster_map["cluster_id"] = list(range(1, len(cluster_map) + 1))

    # discard a stale cluster_id column if the file already has one, otherwise do nothing
    gui_map = gui_map.drop(columns="cluster_id", errors="ignore")

    # add a cluster_id column corresponding to the new mask integers and write it back
    updated_cluster_map = gui_map.merge(cluster_map, on=[pixel_cluster_col], how="left")
    updated_cluster_map.to_csv(cluster_id_to_name_path, index=False)

    # create the pixel cluster masks across each fov
    with tqdm(total=len(fovs), desc="Pixel Cluster Mask Generation", unit="FOVs") as progress:
        for fov in fovs:
            progress.set_postfix(FOV=fov)

            # generate the pixel mask; the channel file inside the fov dir gives the dimensions
            fov_mask: np.ndarray = generate_pixel_cluster_mask(
                fov=fov,
                base_dir=base_dir,
                tiff_dir=tiff_dir,
                chan_file_path=os.path.join(fov, chan_file),
                pixel_data_dir=pixel_data_dir,
                pixel_cluster_col=pixel_cluster_col,
                cluster_mapping=updated_cluster_map,
            )

            # save the pixel mask generated
            save_fov_mask(
                fov,
                data_dir=save_dir,
                mask_data=fov_mask,
                sub_dir=sub_dir,
                name_suffix=name_suffix,
            )

            progress.update(1)
def generate_and_save_neighborhood_cluster_masks(
    fovs: List[str],
    save_dir: Union[pathlib.Path, str],
    seg_dir: Union[pathlib.Path, str],
    neighborhood_data: pd.DataFrame,
    fov_col: str = settings.FOV_ID,
    label_col: str = settings.CELL_LABEL,
    cluster_col: str = settings.KMEANS_CLUSTER,
    seg_suffix: str = "_whole_cell.tiff",
    xr_channel_name="label",
    sub_dir: Union[pathlib.Path, str] = None,
    name_suffix: str = "",
):
    """Generates neighborhood cluster masks and saves them for downstream analysis.

    Args:
        fovs (List[str]):
            A list of fovs to generate and save neighborhood masks for.
        save_dir (Union[pathlib.Path, str]):
            The directory to save the generated neighborhood cluster masks.
        seg_dir (Union[pathlib.Path, str]):
            The path to the segmentation data.
        neighborhood_data (pd.DataFrame):
            Contains the neighborhood cluster assignments for each cell.
        fov_col (str, optional):
            The column name containing the FOV IDs . Defaults to `settings.FOV_ID` (`"fov"`).
        label_col (str, optional):
            The column name containing the cell label. Defaults to `settings.CELL_LABEL`
            (`"label"`).
        cluster_col (str, optional):
            The column name containing the cluster label. Defaults to `settings.KMEANS_CLUSTER`
            (`"kmeans_neighborhood"`).
        seg_suffix (str, optional):
            The suffix that the segmentation images use. Defaults to `'_whole_cell.tiff'`
        xr_channel_name (str):
            Channel name for segmented data array.
        sub_dir (str, optional):
            The subdirectory to save the images in. If specified images are saved to
            `"data_dir/sub_dir"`. If `sub_dir = None` the images are saved to `"data_dir"`.
            Defaults to `None`.
        name_suffix (str, optional):
            Specify what to append at the end of every neighborhood mask. Defaults to `''`.
    """
    cmd = ClusterMaskData(
        data=neighborhood_data,
        fov_col=fov_col,
        label_col=label_col,
        cluster_col=cluster_col,
    )

    # create the neighborhood cluster masks across each fov
    with tqdm(
        total=len(fovs), desc="Neighborhood Cluster Mask Generation", unit="FOVs"
    ) as progress:
        for fov in fovs:
            progress.set_postfix(FOV=fov)

            # load in the label map for the FOV
            segmentation_imgs = load_utils.load_imgs_from_dir(
                seg_dir,
                files=[fov + seg_suffix],
                xr_channel_names=[xr_channel_name],
                trim_suffix=seg_suffix.split(".")[0],
            )
            label_map = segmentation_imgs.loc[fov, ..., :]

            # generate the neighborhood mask for the FOV, then save it
            fov_mask: np.ndarray = label_cells_by_cluster(fov, cmd, label_map)
            save_fov_mask(
                fov,
                data_dir=save_dir,
                mask_data=fov_mask,
                sub_dir=sub_dir,
                name_suffix=name_suffix,
            )

            progress.update(1)
def split_img_stack(stack_dir, output_dir, stack_list, indices, names, channels_first=True):
    """Splits the channels in a given directory of images into separate files

    For every stack a folder named after the stack file (without its extension) is created
    in the output_dir, and each requested channel is saved there under its given name.

    Args:
        stack_dir (str):
            where we read the input files
        output_dir (str):
            where we write the split channel data
        stack_list (list):
            the names of the files we want to read from stack_dir
        indices (list):
            the indices we want to pull data from
        names (list):
            the corresponding names of the channels
        channels_first (bool):
            whether we index at the beginning or end of the array
    """
    for stack_name in stack_list:
        img_stack = io.imread(os.path.join(stack_dir, stack_name))

        # one output folder per stack
        stack_base_name = os.path.splitext(stack_name)[0]
        img_dir = os.path.join(output_dir, stack_base_name)
        os.makedirs(img_dir)

        for position, channel_index in enumerate(indices):
            # the channel axis is either the first or the last axis of the stack
            if channels_first:
                channel = img_stack[channel_index, ...]
            else:
                channel = img_stack[..., channel_index]

            image_utils.save_image(os.path.join(img_dir, names[position]), channel)
def stitch_images_by_shape(data_dir, stitched_dir, img_sub_folder=None, channels=None,
                           segmentation=False, clustering=False):
    """ Creates stitched images for the specified channels based on the FOV folder names

    Args:
        data_dir (str):
            path to directory containing images
        stitched_dir (str):
            path to directory to save stitched images to; it must not exist yet
        img_sub_folder (str):
            optional name of image sub-folder within each fov
        channels (list):
            optional list of imgs to load, otherwise loads all imgs
        segmentation (bool):
            if stitching images from the single segmentation dir
        clustering (bool or str):
            if stitching images from the single pixel or cell mask dir, specify 'pixel' / 'cell'

    Raises:
        ValueError:
            If `clustering` is set to anything other than 'pixel' / 'cell', if no FOVs are
            found, if `stitched_dir` already exists, or if a FOV name does not contain an
            RnCm pattern.
    """
    io_utils.validate_paths(data_dir)

    # no img_sub_folder, change to empty string to read directly from base folder
    if img_sub_folder is None or img_sub_folder == '':
        img_sub_folder = ""

    if clustering and clustering not in ['pixel', 'cell']:
        raise ValueError('If stitching images from the pixie pipeline, the clustering arg must be '
                         'set to either \"pixel\" or \"cell\".')

    # masks live in one flat directory, raw images in one folder per fov
    single_dir = any([segmentation, clustering])

    # retrieve valid fov names
    if segmentation:
        seg_suffix = '_whole_cell.tiff'
        fovs = ns.natsorted(io_utils.list_files(data_dir, substrs=seg_suffix))
        fovs = io_utils.extract_delimited_names(fovs, delimiter=seg_suffix)
    elif clustering:
        mask_suffix = f'_{clustering}_mask.tiff'
        fovs = ns.natsorted(io_utils.list_files(data_dir, substrs=mask_suffix))
        fovs = io_utils.extract_delimited_names(fovs, delimiter=mask_suffix)
    else:
        fovs = ns.natsorted(io_utils.list_folders(data_dir))
        # ignore previous toffy stitching in fov directory
        if 'stitched_images' in fovs:
            fovs.remove('stitched_images')

    if len(fovs) == 0:
        raise ValueError(f"No FOVs found in directory, {data_dir}.")

    # check previous stitching
    if os.path.exists(stitched_dir):
        raise ValueError(f"The {stitched_dir} directory already exists.")

    # every fov name has to contain a row/column tag such as R1C2
    search_term = re.compile(r"(R\+?\d+)(C\+?\d+)")
    bad_fov_names = [fov for fov in fovs if search_term.search(fov) is None]
    if len(bad_fov_names) > 0:
        raise ValueError(f"Invalid FOVs found in directory, {data_dir}. FOV names "
                         f"{bad_fov_names} should have the form RnCm.")

    # retrieve all extracted channel names, using the first fov as the reference
    first_fov = fovs[0]
    if single_dir:
        channel_imgs = io_utils.list_files(data_dir, substrs=first_fov+'_')
        channel_imgs = [chan.split(first_fov + '_')[1] for chan in channel_imgs]
    else:
        channel_imgs = io_utils.list_files(
            dir_name=os.path.join(data_dir, first_fov, img_sub_folder),
            substrs=EXTENSION_TYPES["IMAGE"])

    # verify the channel list if provided, otherwise use every channel found
    available_channels = io_utils.remove_file_extensions(channel_imgs)
    if channels is None:
        channels = available_channels
    else:
        misc_utils.verify_in_list(channel_inputs=channels,
                                  valid_channels=available_channels)

    file_ext = os.path.splitext(channel_imgs[0])[1]
    expected_tiles = load_utils.get_tiled_fov_names(fovs, return_dims=True)

    # save new images to the stitched_images, one channel at a time
    for chan, tile in itertools.product(channels, expected_tiles):
        prefix, expected_fovs, num_rows, num_cols = tile
        if prefix == "":
            prefix = "unnamed_tile"

        stitched_subdir = os.path.join(stitched_dir, prefix)
        if not os.path.exists(stitched_subdir):
            os.makedirs(stitched_subdir)

        # file_ext[1:] drops the leading dot of the extension
        image_data = load_utils.load_tiled_img_data(data_dir, fovs, expected_fovs, chan,
                                                    single_dir=single_dir,
                                                    file_ext=file_ext[1:],
                                                    img_sub_folder=img_sub_folder)
        stitched_data = data_utils.stitch_images(image_data, num_cols)
        current_img = stitched_data.loc['stitched_image', :, :, chan].values

        stitched_path = os.path.join(stitched_subdir, chan + '_stitched' + file_ext)
        image_utils.save_image(stitched_path, current_img)
def _convert_ct_fov_to_adata(fov_group: DataFrameGroupBy, var_names: list[str], obs_names: list[str], save_dir: os.PathLike) -> str:
    """Converts the cell table for a single FOV to an `AnnData` object and saves it to disk as a
    `Zarr` store named `<fov>.zarr`.

    Parameters
    ----------
    fov_group : DataFrameGroupBy
        The cell table subset on a single FOV.
    var_names: list[str]
        The marker names to extract from the cell table. These columns make up `.X`.
    obs_names: list[str]
        The cell-level measurements and properties to extract from the cell table. The two
        centroid columns among them are moved to `.obsm["spatial"]`.
    save_dir: os.PathLike
        The directory to save the `AnnData` object to.

    Returns
    -------
    str
        The path of the saved `AnnData` object.
    """
    # natural-sort the cells of the FOV by their segmentation label
    sorted_group = fov_group.sort_values(by=settings.CELL_LABEL, key=ns.natsort_key)
    fov_pd: pd.DataFrame = sorted_group.reset_index()

    fov_id: str = fov_pd[settings.FOV_ID].iloc[0]

    # FOV name plus segmentation label gives an index that is unique across FOVs
    fov_pd.index = [f"{fov_id}_{int(label)}" for label in fov_pd[settings.CELL_LABEL]]

    # the X matrix
    marker_table: pd.DataFrame = fov_pd[var_names]

    # the obs dataframe, with the cell label as integer and the fov as string
    obs_dtypes = {settings.CELL_LABEL: int, settings.FOV_ID: str}
    obs_table: pd.DataFrame = fov_pd[obs_names].astype(obs_dtypes)
    obs_table["cell_meta_cluster"] = pd.Categorical(obs_table["cell_meta_cluster"].astype(str))

    # move centroids from obs to obsm["spatial"]
    centroid_columns = [settings.CENTROID_0, settings.CENTROID_1]
    spatial_table = obs_table[centroid_columns].rename(
        columns={settings.CENTROID_0: "centroid_y", settings.CENTROID_1: "centroid_x"}
    )
    obs_table = obs_table.drop(columns=centroid_columns)

    adata: AnnData = AnnData(X=marker_table, obs=obs_table, obsm={"spatial": spatial_table})

    # convert any extra string labels to categorical if it's beneficial
    adata.strings_to_categoricals()

    zarr_path = pathlib.Path(save_dir, f"{fov_id}.zarr")
    adata.write_zarr(zarr_path, chunks=(1000, 1000))
    return zarr_path.as_posix()
class ConvertToAnnData:
    """ A class which converts the Cell Table `.csv` file to a series of `AnnData` objects,
    one object per FOV.

    The default parameters stored in the `.obs` slot include:
        - `area`
        - `cell_meta_cluster`
        - `centroid_dif`
        - `convex_area`
        - `convex_hull_resid`
        - `eccentricity`
        - `fov`
        - `major_axis_equiv_diam_ratio`

    Visit the Data Types document to see the full list of parameters.

    The default parameters stored in the `.obsm` slot (under the `"spatial"` key) include:
        - `centroid_x`
        - `centroid_y`

    Args:
        cell_table_path (os.PathLike): The path to the cell table.
        markers (list[str], "auto"): The markers to extract and store in `.X`. Defaults to "auto",
            which will extract all markers.
        extra_obs_parameters (list[str], optional): Extra parameters to load in `.obs`. Defaults
            to None.
    """

    def __init__(self, cell_table_path: os.PathLike,
                 markers: Union[list[str], Literal["auto"]] = "auto",
                 extra_obs_parameters: list[str] = None) -> None:
        io_utils.validate_paths(paths=cell_table_path)

        # Read in the cell table
        cell_table: pd.DataFrame = pd.read_csv(cell_table_path)
        ct_columns = cell_table.columns

        # The marker columns sit strictly between the Pre and Post channel columns,
        # every column after the Post channel column is a candidate for `.obs`
        marker_index_start: int = ct_columns.get_loc(settings.PRE_CHANNEL_COL) + 1
        marker_index_stop: int = ct_columns.get_loc(settings.POST_CHANNEL_COL)
        obs_index_start: int = marker_index_stop + 1

        all_markers: list[str] = ct_columns[marker_index_start:marker_index_stop].to_list()
        all_parameters: list[str] = ct_columns[obs_index_start:].to_list()

        if markers == "auto":
            # Default to all markers based on settings Pre and Post channel column values
            markers = all_markers
        else:
            # Verify that the correct markers exist
            misc_utils.verify_in_list(requested_markers=markers, all_markers=all_markers)
        self.var_names = markers

        # Verify extra obs parameters
        if extra_obs_parameters:
            misc_utils.verify_in_list(requested_parameters=extra_obs_parameters,
                                      all_parameters=all_parameters)
        else:
            extra_obs_parameters = []

        # The extra parameters are validated against `all_parameters`, which is itself part
        # of the list, so they would otherwise show up twice. Duplicate names select
        # duplicate columns from the cell table, hence names are de-duplicated here while
        # keeping their first-seen order.
        obs_names = list(dict.fromkeys([
            settings.CELL_LABEL,
            settings.CELL_SIZE,
            *all_parameters,
            *extra_obs_parameters,
        ]))

        # Use "area" as the default area id instead of settings.CELL_SIZE to account for
        # non-cellular observations (ez_seg, fiber, etc...)
        if settings.CELL_SIZE in obs_names:
            obs_names.remove(settings.CELL_SIZE)
            if "area" not in obs_names:
                cell_table = cell_table.rename(columns={settings.CELL_SIZE: "area"})
                obs_names.append("area")

        self.obs_names: list[str] = obs_names
        self.cell_table = cell_table

    def convert_to_adata(
        self,
        save_dir: os.PathLike,
    ) -> dict[str, str]:
        """Converts the cell table to a FOV-level `AnnData` object, and saves the results as
        a `Zarr` store to disk in the `save_dir`.

        Args:
            save_dir (os.PathLike): The directory to save the `AnnData` objects to. It is
                created (including parents) if it does not exist.

        Returns:
            dict[str, str]: A dictionary containing the names of the FOVs and the paths where
            they were saved.
        """
        if not isinstance(save_dir, pathlib.Path):
            save_dir = pathlib.Path(save_dir)
        save_dir.mkdir(parents=True, exist_ok=True)

        n_unique_fovs = self.cell_table[settings.FOV_ID].nunique()

        # registers `progress_apply` on pandas objects so the conversion reports progress
        tqdm.pandas(desc="Converting Cell Table to AnnData Tables", total=n_unique_fovs, unit="FOVs")

        # one Zarr store per FOV; the result is indexed by FOV and holds the saved paths
        result: pd.Series = self.cell_table.groupby(by=settings.FOV_ID, sort=True).progress_apply(
            lambda x: _convert_ct_fov_to_adata(
                x, var_names=self.var_names, obs_names=self.obs_names, save_dir=save_dir
            ),
        )
        return result.to_dict()
class AnnCollectionKwargs(TypedDict, total=False):
    """Keyword arguments accepted by `load_anndatas` and forwarded to `AnnCollection`.

    The dictionary is declared with `total=False` so that every key is optional: the
    type is used with `Unpack` to annotate `**kwargs`, and a total TypedDict would make
    type checkers demand all of these keywords on every call.
    """

    # NOTE(review): the meaning of each key is defined by `anndata.experimental.AnnCollection`;
    # consult its documentation rather than relying on the names alone.
    join_obs: Optional[Literal["inner", "outer"]]
    join_obsm: Optional[Literal["inner"]]
    join_vars: Optional[Literal["inner"]]
    label: Optional[str]
    keys: Optional[Sequence[str]]
    index_unique: Optional[str]
    convert: Optional[ConvertType]
    harmonize_dtypes: bool
    indices_strict: bool
def load_anndatas(anndata_dir: os.PathLike, **anncollection_kwargs: Unpack[AnnCollectionKwargs]) -> AnnCollection:
    """Lazily loads a directory of `AnnData` objects into an `AnnCollection`. The concatenation
    happens across the `.obs` axis.

    Every `*.zarr` entry directly inside `anndata_dir` is read, in natural sort order, and
    keyed by its file stem.

    For `AnnCollection` kwargs, see
    https://anndata.readthedocs.io/en/latest/generated/anndata.experimental.AnnCollection.html

    Args:
        anndata_dir (os.PathLike): The directory containing the `AnnData` objects.

    Returns:
        AnnCollection: The `AnnCollection` containing the `AnnData` objects.
    """
    if isinstance(anndata_dir, pathlib.Path):
        zarr_dir = anndata_dir
    else:
        zarr_dir = pathlib.Path(anndata_dir)

    adata_zarr_stores = {}
    for store_path in ns.natsorted(zarr_dir.glob("*.zarr")):
        adata_zarr_stores[store_path.stem] = read_zarr(store_path)

    return AnnCollection(adatas=adata_zarr_stores, **anncollection_kwargs)