Fiji Streaming System
Overview
Pipeline visualization with Fiji/ImageJ requires real-time data streaming to external processes without blocking pipeline execution. The Fiji streaming system provides automatic hyperstack creation and PyImageJ integration for leveraging the ImageJ/Fiji ecosystem while maintaining OpenHCS’s performance characteristics.
The Fiji Integration Challenge: ImageJ/Fiji uses a different dimensional model (CZT: Channels, Z-slices, Time-frames) than OpenHCS’s component-based system. Additionally, hyperstack building is computationally expensive (~2 seconds per stack), which could block pipeline execution if not handled properly.
The OpenHCS Solution: A thin-wrapper architecture where OpenHCS composes
external streaming infrastructure. Generic transport is provided by
zmqruntime, payload semantics and receiver-core utilities are provided by
polystore, and OpenHCS provides runtime wiring and process management.
Canonical Abstraction Docs
For generic abstraction internals, see external module docs:
external/zmqruntime/docs/source/architecture/viewer_streaming_architecture.rst
external/PolyStore/docs/source/architecture/streaming_receiver_projection.rst
This page focuses on OpenHCS-specific viewer wrapper behavior.
Key Innovation: Shared memory IPC with explicit lifecycle management provides zero-copy transport (only metadata and the shared memory name travel over ZeroMQ) while preventing memory leaks. The publisher closes its handle after a successful send, and the receiver unlinks the shared memory segment after copying the data out.
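The close/unlink split described above can be sketched with the standard library alone. This is a minimal illustration, not the PolyStore implementation: `publish`, `receive`, and the `nbytes` message key are hypothetical names, and raw bytes stand in for image arrays.

```python
from multiprocessing import shared_memory


def publish(payload: bytes) -> dict:
    """Publisher side: copy the payload into a new segment and describe it."""
    shm = shared_memory.SharedMemory(create=True, size=len(payload))
    shm.buf[:len(payload)] = payload
    message = {"shm_name": shm.name, "nbytes": len(payload)}
    # Close only this process's handle; on POSIX the segment stays alive
    # until some process unlinks it.
    shm.close()
    return message


def receive(message: dict) -> bytes:
    """Receiver side: attach by name, copy out, then destroy the segment."""
    shm = shared_memory.SharedMemory(name=message["shm_name"])
    try:
        # The segment may be larger than requested (page rounding),
        # so slice to the size recorded in the message.
        data = bytes(shm.buf[:message["nbytes"]])
    finally:
        shm.close()
        shm.unlink()
    return data
```

Calling `receive(publish(b"pixels"))` returns the original bytes. The message carries the payload size for the same reason the real message carries shape and dtype: the receiver cannot rely on the segment's reported size.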
Architecture Components
FijiStreamingBackend
Location: external/PolyStore/src/polystore/fiji_stream.py
The streaming backend sends image data to Fiji viewers using the ZeroMQ publish/subscribe pattern:
class FijiStreamingBackend(StreamingBackend):
    """Fiji streaming backend with ZMQ publisher pattern."""

    def save_batch(self, data_list, file_paths, **kwargs):
        # Create shared memory for zero-copy transfer
        shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
        shm_array = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
        shm_array[:] = data[:]

        # Send metadata + shared memory name (not data)
        message = {
            'images': [{
                'shm_name': shm.name,
                'shape': data.shape,
                'dtype': str(data.dtype),
                'component_metadata': metadata
            }],
            'display_config': config
        }
        publisher.send_json(message, flags=zmq.NOBLOCK)

        # Clean up publisher's handle after successful send
        shm.close()  # Receiver will unlink after copying
Key Features:
Zero-copy transfer: Shared memory IPC for large image arrays
Non-blocking sends: zmq.NOBLOCK flag prevents pipeline blocking
High water mark: Increased to 10000 to buffer slow hyperstack building
Proper cleanup: Publisher closes handles after send, receiver unlinks after copy
FijiViewerServer
Location: openhcs/runtime/fiji_viewer_server.py (OpenHCS wrapper)
The viewer server receives images and displays them via PyImageJ, inheriting from ZMQServer ABC:
class FijiViewerServer(ZMQServer):
    """ZMQ server for Fiji viewer with PyImageJ integration."""

    def start(self):
        # Initialize PyImageJ in this process
        import imagej
        self.ij = imagej.init(mode='interactive')
        self.ij.ui().showUI()

    def process_image_message(self, message):
        # Attach to shared memory
        shm = shared_memory.SharedMemory(name=shm_name)
        data = np.ndarray(shape, dtype, buffer=shm.buf).copy()
        shm.close()
        shm.unlink()  # Clean up shared memory

        # Build hyperstack from component metadata
        hyperstack = self._build_hyperstack(images, metadata)
        self.ij.ui().show(hyperstack)
Key Features:
PyImageJ integration: Uses imagej.init() for native Fiji functionality
Hyperstack building: Automatic CZT dimension mapping from component metadata
Dual-channel ZMQ: Control channel (REQ/REP) + data channel (PUB/SUB)
Ping/pong handshake: Inherited from the ZMQServer ABC for connection verification
FijiStreamVisualizer
Location: openhcs/runtime/fiji_stream_visualizer.py (OpenHCS wrapper)
Manages Fiji viewer process lifecycle, following the same architecture as NapariStreamVisualizer:
class FijiStreamVisualizer:
    """Manages Fiji viewer instance for real-time visualization."""

    def start_viewer(self, async_mode: bool = False):
        # Check for existing viewer on same port
        if self._try_connect_to_existing_viewer():
            return

        # Spawn new viewer process
        if self.persistent:
            # Detached subprocess survives parent termination
            self.process = _spawn_detached_fiji_process(port, title, config)
        else:
            # Multiprocessing.Process for test cleanup
            self.process = multiprocessing.Process(
                target=_fiji_viewer_server_process,
                args=(port, title, config, log_file)
            )
            self.process.start()

        # Wait for ping/pong handshake
        if not async_mode:
            self._wait_for_server_ready()
Key Features:
Persistent viewers: Detached subprocesses survive parent termination
Viewer reuse: Connects to existing viewers before spawning new ones
Async startup: Background thread startup for non-blocking initialization
Process management: Graceful shutdown with timeout and force-kill fallback
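The "graceful shutdown with timeout and force-kill fallback" behavior can be sketched as follows. This is an illustration under assumptions, not the OpenHCS code: `stop_process` is a hypothetical helper, and `subprocess.Popen` stands in for whichever process handle the visualizer holds (`multiprocessing.Process` offers the analogous `terminate()`, `join(timeout)`, and `kill()` calls).

```python
import subprocess
import sys


def stop_process(proc: subprocess.Popen, timeout: float = 5.0) -> str:
    """Ask the process to exit; force-kill it if it does not comply in time."""
    if proc.poll() is not None:
        return "already-exited"
    proc.terminate()  # polite request (SIGTERM on POSIX)
    try:
        proc.wait(timeout=timeout)
        return "terminated"
    except subprocess.TimeoutExpired:
        proc.kill()  # forceful fallback (SIGKILL on POSIX)
        proc.wait()  # reap the process so it does not linger as a zombie
        return "killed"


if __name__ == "__main__":
    # Stand-in for a viewer process: a child that would otherwise sleep for a minute.
    child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
    print(stop_process(child))
```

Returning a status string rather than raising lets the caller log which path was taken, which is useful when diagnosing viewers that ignore the polite request.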
Hyperstack Building
Component to CZT Mapping
OpenHCS uses component-based dimensions (well, site, channel, z_index, timepoint), while Fiji uses CZT (Channels, Z-slices, Time-frames). The system automatically maps between these:
# Component metadata from OpenHCS
metadata = {
    'well': 'A01',
    'site': 's1',
    'channel': 'DAPI',
    'z_index': 'z003',
    'timepoint': 't001'
}

# Automatic CZT mapping based on FijiDisplayConfig
config = FijiDisplayConfig(
    channel_mode=FijiDimensionMode.SLICE,   # C dimension
    z_index_mode=FijiDimensionMode.STACK,   # Z dimension
    timepoint_mode=FijiDimensionMode.STACK  # T dimension
)
# Result: Hyperstack with nChannels=1, nSlices=N, nFrames=M
Dimension Modes:
STACK: Component becomes a hyperstack dimension (C, Z, or T)
SLICE: Component values shown as separate slices
SEPARATE: Component values shown as separate images
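One way to picture the component-to-CZT mapping is to assign each distinct component value a position along its axis. The sketch below assumes all three components use STACK mode and that values sort correctly as strings (true for zero-padded labels such as 'z003'). The names `AXES`, `axis_index`, and `czt_coordinates` are hypothetical and are not part of the OpenHCS API.

```python
AXES = ("channel", "z_index", "timepoint")  # components mapped to C, Z, T


def axis_index(images, component):
    """Map each distinct component value to a 0-based position on its axis."""
    values = sorted({img[component] for img in images})
    return {value: position for position, value in enumerate(values)}


def czt_coordinates(images):
    """Return the axis sizes (C, Z, T) and one (c, z, t) tuple per image."""
    lookups = {name: axis_index(images, name) for name in AXES}
    dims = tuple(len(lookups[name]) for name in AXES)
    coords = [tuple(lookups[name][img[name]] for name in AXES) for img in images]
    return dims, coords


if __name__ == "__main__":
    images = [
        {"channel": "GFP", "z_index": "z001", "timepoint": "t001"},
        {"channel": "DAPI", "z_index": "z002", "timepoint": "t001"},
        {"channel": "DAPI", "z_index": "z001", "timepoint": "t001"},
    ]
    print(czt_coordinates(images))
```

For the three sample images this yields axis sizes of 2 channels, 2 slices, and 1 frame, with 'DAPI' sorting before 'GFP' on the channel axis.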
Hyperstack Creation
The server groups images by (step_name, well) and builds hyperstacks:
def _build_hyperstack(self, images, display_config):
    # Group images by CZT coordinates
    czt_map = {}
    for img in images:
        c = self._get_channel_index(img['metadata'])
        z = self._get_z_index(img['metadata'])
        t = self._get_timepoint_index(img['metadata'])
        czt_map[(c, z, t)] = img['data']

    # Create ImageJ ImageStack
    stack = self.ij.py.to_java(first_image)
    # ImageJ hyperstacks store planes with C varying fastest, then Z, then T,
    # so sort keys as (t, z, c) rather than (c, z, t)
    for key in sorted(czt_map, key=lambda czt: czt[::-1]):
        stack.addSlice(self.ij.py.to_java(czt_map[key]))

    # Convert to hyperstack
    imp = ImagePlus(title, stack)
    imp.setDimensions(nChannels, nSlices, nFrames)

    # Apply LUT and display mode
    if nChannels > 1:
        comp = CompositeImage(imp, CompositeImage.COMPOSITE)
        return comp
    return imp
Performance Note: Hyperstack building takes ~2 seconds per stack due to ImageJ’s internal processing. The non-blocking send pattern ensures this doesn’t block the pipeline.
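When planes are added to a flat stack before `setDimensions` is called, their order matters: ImageJ's default hyperstack layout places channels fastest, then Z-slices, then time-frames. The sketch below reproduces that index arithmetic in plain Python. `stack_index` and `order_for_hyperstack` are hypothetical helper names, and the formula reflects ImageJ's documented `ImagePlus.getStackIndex` behavior as I understand it.

```python
def stack_index(c, z, t, n_channels, n_slices):
    """1-based stack position for 1-based (c, z, t), channels varying fastest."""
    return (t - 1) * n_channels * n_slices + (z - 1) * n_channels + c


def order_for_hyperstack(czt_map):
    """Return planes ordered so that c varies fastest, then z, then t."""
    ordered_keys = sorted(czt_map, key=lambda czt: (czt[2], czt[1], czt[0]))
    return [czt_map[key] for key in ordered_keys]


if __name__ == "__main__":
    # Two channels, three slices: the second frame starts at position 7.
    print(stack_index(1, 1, 2, n_channels=2, n_slices=3))
```

Sorting `(c, z, t)` tuples in their natural order would instead make the channel the slowest-varying axis, which scrambles planes once `setDimensions` reinterprets the flat stack.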
ZeroMQ Communication Pattern
Dual-Channel Architecture
Fiji streaming uses the same dual-channel pattern as the Napari and OMERO integrations:
Control Channel (REQ/REP):
- Port: fiji_port + 1000 (e.g., 6556 for data port 5556)
- Purpose: Ping/pong handshake, status queries
- Pattern: Synchronous request/response
Data Channel (PUB/SUB):
- Port: fiji_port (e.g., 5556)
- Purpose: Image streaming
- Pattern: Asynchronous publish/subscribe
Benefits:
Control messages don’t block data streaming
Reliable handshake before data transfer
Independent scaling of control and data throughput
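The port convention and the "handshake before data transfer" step can be sketched without ZeroMQ. In this illustration `viewer_endpoints` and `wait_until_ready` are hypothetical names, and `ping` is any caller-supplied callable that returns True once the control channel answers. The real handshake lives in the ZMQServer ABC and is not reproduced here.

```python
import time

CONTROL_PORT_OFFSET = 1000  # control port = data port + 1000, as described above


def viewer_endpoints(data_port, host="localhost"):
    """Derive both endpoint addresses from the configured data port."""
    return {
        "data": f"tcp://{host}:{data_port}",
        "control": f"tcp://{host}:{data_port + CONTROL_PORT_OFFSET}",
    }


def wait_until_ready(ping, timeout=10.0, interval=0.1):
    """Poll `ping` until it returns True or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        if ping():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


if __name__ == "__main__":
    print(viewer_endpoints(5556))
```

Using `time.monotonic()` for the deadline keeps the timeout stable even if the system clock is adjusted while the viewer is starting.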
Non-Blocking Sends
The publisher uses zmq.NOBLOCK to prevent pipeline blocking:
try:
    publisher.send_json(message, flags=zmq.NOBLOCK)

    # Clean up shared memory handles after successful send
    for img in batch_images:
        shm = self._shared_memory_blocks.pop(img['shm_name'])
        shm.close()
except zmq.Again:
    # Fiji viewer busy, drop batch and clean up
    logger.warning("Fiji viewer busy, dropped batch")
    for img in batch_images:
        shm = self._shared_memory_blocks.pop(img['shm_name'])
        shm.close()
        shm.unlink()  # Unlink since receiver never got it
Why This Works: If the receiver’s buffer is full (zmq.Again exception), the publisher drops the batch and cleans up shared memory. This prevents pipeline blocking while ensuring no memory leaks.
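The ownership rule behind this pattern (whoever fails to hand off a segment must unlink it) can be exercised without a running viewer. The sketch below is an analogy, not the streaming code: a bounded `queue.Queue` stands in for the socket's send buffer, `queue.Full` stands in for `zmq.Again`, and `send_or_drop` is a hypothetical name.

```python
import queue
from multiprocessing import shared_memory


def send_or_drop(channel: queue.Queue, payload: bytes) -> bool:
    """Try to hand a shared memory segment to the receiver; drop it if busy."""
    shm = shared_memory.SharedMemory(create=True, size=len(payload))
    shm.buf[:len(payload)] = payload
    message = {"shm_name": shm.name, "nbytes": len(payload)}
    try:
        channel.put_nowait(message)  # never blocks the caller
    except queue.Full:
        # Nobody will ever read this segment, so the sender must destroy it.
        shm.close()
        shm.unlink()
        return False
    # Handed off: the receiver now owns the segment and will unlink it.
    shm.close()
    return True
```

With a queue of `maxsize=1`, the first call returns True and the second returns False until the receiver drains the queue. In both cases the sender ends up holding no open handle.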
Configuration System
The Fiji integration uses OpenHCS’s lazy configuration framework with placeholder inheritance:
@dataclass(frozen=True)
class FijiStreamingConfig(StreamingConfig, FijiDisplayConfig):
    fiji_port: int = 5556
    fiji_host: str = 'localhost'

    # Inherited from FijiDisplayConfig:
    lut: FijiLUT = FijiLUT.GRAYS
    channel_mode: FijiDimensionMode = FijiDimensionMode.SLICE
    z_index_mode: FijiDimensionMode = FijiDimensionMode.STACK
    timepoint_mode: FijiDimensionMode = FijiDimensionMode.STACK
    site_mode: FijiDimensionMode = FijiDimensionMode.SEPARATE
Lazy Configuration: Steps can use LazyFijiStreamingConfig() to inherit from pipeline-level defaults, enabling centralized configuration with per-step overrides.
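The idea of per-step overrides falling back to pipeline-level defaults can be illustrated with plain dataclasses. This is only a conceptual sketch: OpenHCS's lazy configuration framework is not shown in this page, so the classes `DisplayDefaults` and `LazyDisplayOverrides`, the `resolve` function, and the use of `None` as the "inherit" placeholder are all assumptions made for illustration.

```python
from dataclasses import dataclass, fields, replace
from typing import Optional


@dataclass(frozen=True)
class DisplayDefaults:
    """Pipeline-level values (hypothetical stand-in for a concrete config)."""
    lut: str = "Grays"
    channel_mode: str = "SLICE"
    z_index_mode: str = "STACK"


@dataclass(frozen=True)
class LazyDisplayOverrides:
    """Step-level values; None means 'inherit from the pipeline'."""
    lut: Optional[str] = None
    channel_mode: Optional[str] = None
    z_index_mode: Optional[str] = None


def resolve(overrides: LazyDisplayOverrides, defaults: DisplayDefaults) -> DisplayDefaults:
    """Apply only the explicitly set step-level fields on top of the defaults."""
    explicit = {
        f.name: getattr(overrides, f.name)
        for f in fields(overrides)
        if getattr(overrides, f.name) is not None
    }
    return replace(defaults, **explicit)
```

A step that sets only `lut="Fire"` resolves to the pipeline's dimension modes with the overridden LUT, so changing a pipeline default later affects every step that did not override that field.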
See Also
External Integrations Overview - Overview of all OpenHCS integrations
Napari Streaming System - Napari streaming (similar architecture)
Fiji Viewer Management - Fiji viewer management guide