Source code for ark.utils.deepcell_service_utils
import os
import time
import warnings
from io import BytesIO
from json import JSONDecodeError
from pathlib import Path
from urllib.parse import unquote_plus
from zipfile import ZIP_DEFLATED, ZipFile
import numpy as np
import requests
from alpineer import image_utils, io_utils, load_utils, misc_utils
from tifffile import imread
from tqdm.notebook import tqdm
def zip_input_files(deepcell_input_dir, fov_group, batch_num):
    """Helper function which handles zipping the batch fov images into a single zip file.

    An archive that already exists at the target path is reused as-is, so a
    batch zipped by an earlier run is not rebuilt.

    Args:
        deepcell_input_dir (str): path to where deepcell input image files are stored
        fov_group (list): list of fovs to process in this batch
        batch_num (int): the batch number

    Returns:
        str: path to deepcell input zip file

    Raises:
        OSError: if one of the `<fov>.tiff` files cannot be added to the archive
            (e.g. `FileNotFoundError`); the incomplete archive is deleted first.
    """
    zip_path = os.path.join(deepcell_input_dir, f'fovs_batch_{batch_num}.zip')

    # create zip files, skip any existing
    if os.path.exists(zip_path):
        return zip_path

    try:
        with ZipFile(zip_path, 'w', compression=ZIP_DEFLATED) as zip_obj:
            for fov in fov_group:
                # file has .tiff extension
                basename = fov + '.tiff'
                zip_obj.write(os.path.join(deepcell_input_dir, basename), basename)
    except BaseException:
        # A half-written archive would pass the existence check above on the
        # next call and be uploaded as if it were complete, so drop it before
        # letting the error propagate.
        if os.path.exists(zip_path):
            os.remove(zip_path)
        raise

    return zip_path
def extract_deepcell_response(deepcell_output_dir, fov_group, batch_num, wc_suffix, nuc_suffix):
    """Unpack the segmentation masks contained in one DeepCell response zip.

    Every member of `deepcell_response_fovs_batch_<batch_num>.zip` is renamed
    with the requested suffix, converted by `_convert_deepcell_seg_masks`,
    squeezed, and saved next to the zip with a `.tiff` extension. A warning is
    issued for each fov in `fov_group` whose whole cell or nuclear member is
    absent from the archive.

    Args:
        deepcell_output_dir (str):
            path to where deepcell output zips are stored
        fov_group (list):
            list of fovs to process in this batch
        batch_num (int):
            the batch number
        wc_suffix (str):
            Suffix for whole cell DeepCell output filename. e.g. for fovX, DeepCell output
            should be `<fovX>+suffix.tif`.
            Whole cell DeepCell files by default get suffixed with `'feature_0'`,
            it will be renamed to this arg.
        nuc_suffix (str):
            Suffix for nuclear DeepCell output filename. e.g. for fovX, DeepCell output
            should be `<fovX>+suffix.tif`.
            Nuclear DeepCell files by default get suffixed with `'feature_1'`,
            it will be renamed to this arg.
    """
    response_zip = os.path.join(
        deepcell_output_dir, f"deepcell_response_fovs_batch_{batch_num}.zip")

    with ZipFile(response_zip, "r") as archive:
        member_names = archive.namelist()

        for member in member_names:
            # members are only ever suffixed with feature_0.tif or feature_1.tif,
            # so anything that is not whole cell is handled as nuclear
            is_whole_cell = '_feature_0.tif' in member
            renamed = (
                member.replace('_feature_0', wc_suffix) if is_whole_cell
                else member.replace('_feature_1', nuc_suffix)
            )

            # DeepCell uses .tif; the extra 'f' yields the .tiff extension
            out_path = os.path.join(deepcell_output_dir, renamed) + 'f'

            # pull the raw file bytes out of the zip and store them as an integer mask
            raw_bytes = archive.read(member)
            mask = _convert_deepcell_seg_masks(raw_bytes).squeeze()
            image_utils.save_image(out_path, mask)

        # report every fov whose expected outputs are missing from the archive
        for fov in fov_group:
            if fov + '_feature_0.tif' not in member_names:
                warnings.warn(f'Deep Cell whole cell output file was not found for {fov}.')
            if fov + '_feature_1.tif' not in member_names:
                warnings.warn(f'Deep Cell nuclear output file was not found for {fov}.')
def create_deepcell_output(deepcell_input_dir, deepcell_output_dir, fovs=None,
                           wc_suffix='_whole_cell', nuc_suffix='_nuclear',
                           host='https://deepcell.org', job_type='mesmer',
                           scale=1.0, timeout=300, zip_size=5):
    """Handles all of the necessary data manipulation for running deepcell tasks.

    Creates .zip files (to be used as input for DeepCell),
    calls run_deepcell_direct on each of them,
    and extracts zipped output files to the specified output location.
    Batches whose response zip already exists in deepcell_output_dir are skipped.

    Args:
        deepcell_input_dir (str):
            Location of preprocessed files (assume deepcell_input_dir contains <fov>.tiff
            for each fov in fovs list). This should not be a GoogleDrivePath.
        deepcell_output_dir (str):
            Location to save DeepCell output (as .tiff)
        fovs (list):
            List of fovs in preprocessing pipeline. if None, all .tiff files
            in deepcell_input_dir will be considered as input fovs. Default: None
        wc_suffix (str):
            Suffix for whole cell DeepCell output filename. e.g. for fovX, DeepCell output
            should be `<fovX>+suffix.tif`.
            Whole cell DeepCell files by default get suffixed with `'feature_0'`,
            it will be renamed to this arg.
        nuc_suffix (str):
            Suffix for nuclear DeepCell output filename. e.g. for fovX, DeepCell output
            should be `<fovX>+suffix.tif`.
            Nuclear DeepCell files by default get suffixed with `'feature_1'`,
            it will be renamed to this arg.
        host (str):
            Hostname and port for the kiosk-frontend API server
            Default: 'https://deepcell.org'
        job_type (str):
            Name of job workflow (mesmer, segmentation, tracking)
            Default: 'mesmer'
        scale (float):
            Value to rescale data by
            Default: 1.0
        timeout (int):
            Approximate seconds until timeout.
            Default: 5 minutes (300)
        zip_size (int):
            Maximum number of files to include in zip. Must be at least 1.
            Default: 5

    Raises:
        ValueError:
            Raised if scale cannot be converted to a float, if zip_size is
            smaller than 1, or if there is some fov X (from fovs list) s.t.
            the file <deepcell_input_dir>/fovX.tiff does not exist
    """
    # check that scale arg can be converted to a float; float() raises
    # TypeError (not ValueError) for inputs such as None or a list
    try:
        scale = float(scale)
    except (TypeError, ValueError) as err:
        raise ValueError("Scale argument must be a number") from err

    # a batch size below 1 cannot partition the fovs
    if zip_size < 1:
        raise ValueError("zip_size must be a positive integer")

    # extract all the files from deepcell_input_dir
    input_files = io_utils.list_files(deepcell_input_dir, substrs=['.tiff'])

    # set fovs equal to input_files if not already set
    if fovs is None:
        fovs = input_files

    # now extract only the names of the fovs without the file extension
    fovs = io_utils.remove_file_extensions(fovs)

    # make sure that all fovs actually exist in the list of input_files
    misc_utils.verify_in_list(
        fovs=fovs,
        deepcell_input_files=io_utils.remove_file_extensions(input_files))

    # partition fovs for smaller zip file batching
    fov_groups = [
        fovs[first:first + zip_size]
        for first in range(0, len(fovs), zip_size)
    ]

    print(f'Processing tiffs in {len(fov_groups)} batches...')

    unprocessed_fovs = {}
    for batch_num, fov_group in enumerate(fov_groups, start=1):
        # create zipped input files
        input_zip_path = zip_input_files(deepcell_input_dir, fov_group, batch_num)

        batch_filename = Path(input_zip_path).name
        output_zip_path = os.path.join(deepcell_output_dir, "deepcell_response_" + batch_filename)
        if os.path.exists(output_zip_path):
            print(f"Skipping previously processed batch_{batch_num}.")
            continue

        # upload to deepcell, retrying failed attempts until the time budget is spent
        total_time, status = 0, 0
        start = time.time()
        while not os.path.exists(output_zip_path) and total_time < timeout:
            # pass the zip file to deepcell.org
            status = run_deepcell_direct(
                input_zip_path, deepcell_output_dir, host, job_type, scale, timeout
            )

            # successful deepcell response
            if status == 0:
                # extract segmentation masks from deepcell output
                extract_deepcell_response(deepcell_output_dir, fov_group, batch_num, wc_suffix,
                                          nuc_suffix)
                break
            total_time = time.time() - start

        if status != 0:
            unprocessed_fovs[batch_num] = fov_group
        if total_time >= timeout:
            print(f"This batch exceeded the allotted processing time of {timeout / 60} minutes "
                  f"and will be skipped.")

    if unprocessed_fovs:
        print("\nThe following batches were not processed:")
        for batch, batch_fovs in unprocessed_fovs.items():
            print(f"fovs_batch_{batch} {batch_fovs}")
def run_deepcell_direct(input_dir, output_dir, host='https://deepcell.org',
                        job_type='mesmer', scale=1.0, timeout=300):
    """Uses direct calls to DeepCell API and saves output to output_dir.

    Uploads one zip file, starts a prediction job for it, polls the job status
    every 3 seconds, and on completion writes the returned archive to
    `<output_dir>/deepcell_response_<zip file name>`.

    Args:
        input_dir (str):
            path of the .zip file to upload (opened as a single file)
        output_dir (str):
            location to save deepcell output (as .zip)
        host (str):
            Hostname and port for the kiosk-frontend API server.
            Default: 'https://deepcell.org'
        job_type (str):
            Name of job workflow (mesmer, segmentation, tracking).
        scale (float):
            Value to rescale data by
            Default: 1.0
        timeout (int):
            Approximate seconds until timeout.
            Default: 5 minutes (300)

    Returns:
        int:
            0 if the response zip was downloaded; 1 if the upload failed or
            returned invalid JSON, if polling reached the timeout, or if
            DeepCell reported failures.
    """
    # upload zip file
    upload_url = host + "/api/upload"
    filename = Path(input_dir).name

    with open(input_dir, mode='rb') as f:
        upload_fields = {
            'file': (filename, f.read(), 'application/zip'),
        }

    try:
        upload_response = requests.post(
            upload_url,
            timeout=timeout,
            files=upload_fields
        )
    except (requests.ConnectionError, requests.ReadTimeout):
        return 1

    # handles the case if the endpoint returns an invalid JSON
    # indicating an internal API error
    try:
        upload_response = upload_response.json()
    except JSONDecodeError:
        return 1

    # call prediction
    # NOTE(review): this request and the ones below pass no `timeout`, so an
    # unresponsive server is not bounded by the `timeout` argument - confirm
    # whether that is intended.
    predict_url = host + '/api/predict'
    predict_response = requests.post(
        predict_url,
        json={
            'jobForm': {"scale": scale},
            'imageName': filename,
            'imageUrl': upload_response['imageURL'],
            'jobType': job_type,
            'uploadedName': upload_response['uploadedName']
        }
    ).json()
    predict_hash = predict_response['hash']

    # check redis every 3 seconds
    redis_url = host + '/api/redis'

    batch_num = (io_utils.remove_file_extensions([filename])[0]).split("_")[-1]
    print(f'Segmentation progress for batch_{batch_num}:')
    progress_bar = tqdm(total=100,
                        bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]')
    pbar_last = 0
    total_time = 0
    redis_response = None

    # the finally clause closes the progress bar even when a request or a
    # malformed response raises in the middle of polling
    try:
        while total_time < timeout:
            redis_response = requests.post(
                redis_url,
                json={
                    'hash': predict_hash,
                    'key': ["status", "progress", "output_url", "reason", "failures"]
                }
            ).json()
            job_status = redis_response['value'][0]

            if job_status == 'done':
                # make sure progress bar shows 100%
                pbar_next = int(redis_response['value'][1])
                progress_bar.update(max(pbar_next - pbar_last, 0))
                break

            # update progress bar here
            if job_status == 'waiting':
                pbar_next = int(redis_response['value'][1])
                if pbar_next > pbar_last:
                    progress_bar.update(max(pbar_next - pbar_last, 0))
                    pbar_last = pbar_next

            if job_status not in ['done', 'waiting', 'new']:
                print(redis_response['value'])

            time.sleep(3.0)
            # only the sleep is counted, which is why the timeout is approximate
            total_time += 3
    finally:
        progress_bar.close()

    # the polling loop ran out of time without seeing a finished job
    if total_time >= timeout:
        return 1

    # when done, download result or examine errors
    failures = redis_response['value'][4]
    # truthiness also covers a missing (None) failures field
    if failures:
        # error happened
        print(f"Encountered Failure(s): {unquote_plus(failures)}")
        return 1

    deepcell_output = requests.get(redis_response['value'][2], allow_redirects=True)

    with open(os.path.join(output_dir, "deepcell_response_" + filename), mode="wb") as f:
        f.write(deepcell_output.content)

    # being kind and sending an expire signal to deepcell
    expire_url = redis_url + '/expire'
    requests.post(
        expire_url,
        json={
            'hash': predict_hash,
            'expireIn': 90,
        }
    )

    return 0
# TODO: Add metadata for channel name (eliminates need for fixed-order channels)
def generate_deepcell_input(data_dir, tiff_dir, nuc_channels, mem_channels, fovs,
                            is_mibitiff=False, img_sub_folder="TIFs", dtype="int16"):
    """Build the two-plane DeepCell input image for each fov and save it.

    For every fov the requested channels are loaded, the nuclear channels are
    summed into plane 0 and the membrane channels into plane 1 (a plane with
    no channels stays zero), and the result is written to
    `<data_dir>/<fov_name>.tiff` (channels first).
    At least one of nuc_channels or mem_channels must be specified.

    Args:
        data_dir (str):
            location to save deepcell input tifs
        tiff_dir (str):
            directory containing folders of images, is_mibitiff determines what type
        nuc_channels (list):
            nuclear channels to be summed over
        mem_channels (list):
            membrane channels to be summed over
        fovs (list):
            list of folders or MIBItiff files to load imgs from
        is_mibitiff (bool):
            if the images are of type MIBITiff
        img_sub_folder (str):
            if is_mibitiff is False, define the image subfolder for each fov
            ignored if is_mibitiff is True
        dtype (str/type):
            optional specifier of image type. Not referenced in this function
            body; the output array takes the dtype of the loaded data.

    Raises:
        ValueError:
            Raised if nuc_channels and mem_channels are both None or empty
    """
    # at least one of the two channel lists has to carry something
    if not (nuc_channels or mem_channels):
        raise ValueError('Either nuc_channels or mem_channels should be non-empty.')

    # every channel to load: nuclear first, then membrane, with None entries dropped
    requested = (nuc_channels or []) + (mem_channels or [])
    channels = [name for name in requested if name is not None]

    for fov in fovs:
        # load only this fov, from a MIBItiff file or from a folder tree
        if is_mibitiff:
            loaded = load_utils.load_imgs_from_mibitiff(
                tiff_dir, mibitiff_files=[fov], channels=channels
            )
        else:
            loaded = load_utils.load_imgs_from_tree(
                tiff_dir, img_sub_folder=img_sub_folder, fovs=[fov], channels=channels
            )

        fov_name = loaded.fovs.values[0]
        stacked = np.zeros((2, loaded.shape[1], loaded.shape[2]), dtype=loaded.dtype)

        # plane 0 holds the nuclear sum, plane 1 the membrane sum
        for plane, plane_channels in enumerate((nuc_channels, mem_channels)):
            if plane_channels:
                selected = loaded.loc[fov_name, :, :, plane_channels].values
                stacked[plane] = np.sum(selected, axis=2)

        image_utils.save_image(os.path.join(data_dir, f"{fov_name}.tiff"), stacked)
def _convert_deepcell_seg_masks(seg_mask: bytes) -> np.ndarray:
    """Reads a DeepCell mask from raw file bytes and casts it to `np.int32`.

    The masks provided by deepcell are expected to be `np.float32`.

    Args:
        seg_mask (bytes): The output of deep cell's segmentation algorithm as file bytes.

    Returns:
        np.ndarray: The segmentation masks as `np.int32`
    """
    # wrap the bytes in a file-like object so the image reader can decode them
    mask_buffer = BytesIO(seg_mask)
    return imread(mask_buffer).astype("int32")