Fiji Streaming System

Overview

Pipeline visualization with Fiji/ImageJ requires real-time data streaming to external processes without blocking pipeline execution. The Fiji streaming system provides automatic hyperstack creation and PyImageJ integration for leveraging the ImageJ/Fiji ecosystem while maintaining OpenHCS’s performance characteristics.

The Fiji Integration Challenge: ImageJ/Fiji uses a different dimensional model (CZT: Channels, Z-slices, Time-frames) than OpenHCS’s component-based system. Additionally, hyperstack building is computationally expensive (~2 seconds per stack), which could block pipeline execution if not handled properly.

The OpenHCS Solution: A thin-wrapper architecture where OpenHCS composes external streaming infrastructure. Generic transport is provided by zmqruntime, payload semantics and receiver-core utilities are provided by polystore, and OpenHCS provides runtime wiring and process management.

Canonical Abstraction Docs

For the internals of the generic abstractions, see the external module docs:

  • external/zmqruntime/docs/source/architecture/viewer_streaming_architecture.rst

  • external/PolyStore/docs/source/architecture/streaming_receiver_projection.rst

This page focuses on OpenHCS-specific viewer wrapper behavior.

Key Innovation: Shared memory IPC with proper lifecycle management ensures zero-copy data transfer while preventing memory leaks. The publisher closes handles after successful sends, while the receiver unlinks shared memory after copying data.

Architecture Components

FijiStreamingBackend

Location: external/PolyStore/src/polystore/fiji_stream.py

The streaming backend sends image data to Fiji viewers using the ZeroMQ publish/subscribe pattern:

class FijiStreamingBackend(StreamingBackend):
    """Fiji streaming backend with ZMQ publisher pattern."""

    def save_batch(self, data_list, file_paths, **kwargs):
        # Create shared memory for zero-copy transfer
        # (simplified: `data` is one array taken from data_list)
        shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
        shm_array = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
        shm_array[:] = data[:]

        # Send metadata + shared memory name (not data)
        message = {
            'images': [{
                'shm_name': shm.name,
                'shape': data.shape,
                'dtype': str(data.dtype),
                'component_metadata': metadata
            }],
            'display_config': config
        }
        publisher.send_json(message, flags=zmq.NOBLOCK)

        # Clean up publisher's handle after successful send
        shm.close()  # Receiver will unlink after copying

Key Features:

  • Zero-copy transfer: Shared memory IPC for large image arrays

  • Non-blocking sends: zmq.NOBLOCK flag prevents pipeline blocking

  • High water mark: Increased to 10000 to buffer slow hyperstack building

  • Proper cleanup: Publisher closes handles after send, receiver unlinks after copy
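The metadata envelope described above can be sketched with the standard library alone. The helper names build_image_message and parse_image_entry are hypothetical and are not part of polystore; only the field names are taken from the listing above. The sketch shows one detail worth keeping in mind: a shape tuple becomes a JSON list in transit, so the receiver should convert it back to a tuple before rebuilding an array.

```python
import json


def build_image_message(shm_name, shape, dtype, component_metadata, display_config):
    """Build the JSON envelope; pixel data stays in shared memory (hypothetical helper)."""
    return json.dumps({
        "images": [{
            "shm_name": shm_name,
            "shape": list(shape),  # JSON has no tuple type
            "dtype": dtype,
            "component_metadata": component_metadata,
        }],
        "display_config": display_config,
    })


def parse_image_entry(raw):
    """Recover what the receiver needs to attach to the block (hypothetical helper)."""
    entry = json.loads(raw)["images"][0]
    return entry["shm_name"], tuple(entry["shape"]), entry["dtype"]


# The shared memory name here is a placeholder, not a real block.
raw = build_image_message(
    "psm_example", (512, 512), "uint16",
    {"well": "A01", "channel": "DAPI"}, {"lut": "Grays"},
)
name, shape, dtype = parse_image_entry(raw)
```

Because only the name, shape, and dtype cross the socket, the message size stays constant regardless of image size.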

FijiViewerServer

Location: openhcs/runtime/fiji_viewer_server.py (OpenHCS wrapper)

The viewer server receives images and displays them via PyImageJ, inheriting from ZMQServer ABC:

class FijiViewerServer(ZMQServer):
    """ZMQ server for Fiji viewer with PyImageJ integration."""

    def start(self):
        # Initialize PyImageJ in this process
        import imagej
        self.ij = imagej.init(mode='interactive')
        self.ij.ui().showUI()

    def process_image_message(self, message):
        # Attach to shared memory
        shm = shared_memory.SharedMemory(name=shm_name)
        data = np.ndarray(shape, dtype, buffer=shm.buf).copy()
        shm.close()
        shm.unlink()  # Clean up shared memory

        # Build hyperstack from component metadata
        hyperstack = self._build_hyperstack(images, display_config)
        self.ij.ui().show(hyperstack)

Key Features:

  • PyImageJ integration: Uses imagej.init() for native Fiji functionality

  • Hyperstack building: Automatic CZT dimension mapping from component metadata

  • Dual-channel ZMQ: Control channel (REQ/REP) + data channel (PUB/SUB)

  • Ping/pong handshake: Inherited from ZMQServer ABC for connection verification

FijiStreamVisualizer

Location: openhcs/runtime/fiji_stream_visualizer.py (OpenHCS wrapper)

Manages Fiji viewer process lifecycle, following the same architecture as NapariStreamVisualizer:

class FijiStreamVisualizer:
    """Manages Fiji viewer instance for real-time visualization."""

    def start_viewer(self, async_mode: bool = False):
        # Check for existing viewer on same port
        if self._try_connect_to_existing_viewer():
            return

        # Spawn new viewer process
        if self.persistent:
            # Detached subprocess survives parent termination
            self.process = _spawn_detached_fiji_process(port, title, config)
        else:
            # multiprocessing.Process so tests can clean up the child
            self.process = multiprocessing.Process(
                target=_fiji_viewer_server_process,
                args=(port, title, config, log_file)
            )
            self.process.start()

        # Wait for ping/pong handshake
        if not async_mode:
            self._wait_for_server_ready()

Key Features:

  • Persistent viewers: Detached subprocesses survive parent termination

  • Viewer reuse: Connects to existing viewers before spawning new ones

  • Async startup: Background thread startup for non-blocking initialization

  • Process management: Graceful shutdown with timeout and force-kill fallback
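The "graceful shutdown with timeout and force-kill fallback" behavior can be illustrated with a minimal sketch. It uses subprocess.Popen and a sleeping child as a stand-in for the viewer process; the function name stop_process and its return values are assumptions for illustration, not the OpenHCS implementation.

```python
import subprocess
import sys


def stop_process(proc, timeout=5.0):
    """Ask a child process to exit, then force-kill it if it does not stop in time.

    Returns "already-exited", "terminated", or "killed" (hypothetical helper).
    """
    if proc.poll() is not None:
        return "already-exited"
    proc.terminate()  # polite request (SIGTERM on POSIX)
    try:
        proc.wait(timeout=timeout)
        return "terminated"
    except subprocess.TimeoutExpired:
        proc.kill()  # fallback: cannot be ignored by the child
        proc.wait()  # reap the process so it does not linger as a zombie
        return "killed"


# Stand-in for a viewer: a child interpreter that just sleeps.
viewer = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
outcome = stop_process(viewer, timeout=5.0)
```

Waiting after kill() matters: without it the parent never collects the exit status.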

Shared Memory Lifecycle

The shared memory lifecycle is critical for preventing memory leaks while maintaining zero-copy performance:

Publisher (FijiStreamingBackend):
1. Create shared memory block
2. Copy data into shared memory
3. Send message with shared memory name
4. Close handle (but don't unlink)
5. Remove from tracking dict

Receiver (FijiViewerServer):
1. Receive message with shared memory name
2. Attach to shared memory
3. Copy data from shared memory
4. Close handle
5. Unlink shared memory (cleanup)

Why This Works: The publisher closes its handle immediately after sending, preventing handle accumulation. The receiver unlinks the shared memory after copying, ensuring cleanup even if the publisher crashes.
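The two lifecycles above can be sketched end to end with the standard library. This is a simplified illustration: it moves raw bytes rather than NumPy arrays, runs both sides in one process, and the function names publish and receive are hypothetical. It relies on POSIX behavior, where a named block persists after close() until it is unlinked; on Windows a block is released once all handles are closed, so the hand-off would need a different arrangement there.

```python
from multiprocessing import shared_memory


def publish(payload: bytes) -> dict:
    """Publisher side: create, copy in, close the handle, do not unlink."""
    shm = shared_memory.SharedMemory(create=True, size=len(payload))
    shm.buf[:len(payload)] = payload
    descriptor = {"shm_name": shm.name, "nbytes": len(payload)}
    shm.close()  # drop this process's handle; the block itself still exists
    return descriptor


def receive(descriptor: dict) -> bytes:
    """Receiver side: attach, copy out, close the handle, unlink the block."""
    shm = shared_memory.SharedMemory(name=descriptor["shm_name"])
    try:
        # The block may be larger than requested (page rounding),
        # so slice by the size carried in the message.
        data = bytes(shm.buf[:descriptor["nbytes"]])
    finally:
        shm.close()
        shm.unlink()  # the receiver owns final cleanup
    return data


descriptor = publish(b"\x01\x02\x03\x04")
copied = receive(descriptor)
```

Copying before close() is what makes the unlink safe: after it, the receiver holds no reference into the shared block.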

Previous Bug: The publisher never closed handles after successful sends, causing the _shared_memory_blocks dict to grow indefinitely with stale references. This was fixed in commit 6bd32f1.

Hyperstack Building

Component to CZT Mapping

OpenHCS uses component-based dimensions (well, site, channel, z_index, timepoint), while Fiji uses CZT (Channels, Z-slices, Time-frames). The system automatically maps between these:

# Component metadata from OpenHCS
metadata = {
    'well': 'A01',
    'site': 's1',
    'channel': 'DAPI',
    'z_index': 'z003',
    'timepoint': 't001'
}

# Automatic CZT mapping based on FijiDisplayConfig
config = FijiDisplayConfig(
    channel_mode=FijiDimensionMode.SLICE,  # C dimension
    z_index_mode=FijiDimensionMode.STACK,  # Z dimension
    timepoint_mode=FijiDimensionMode.STACK # T dimension
)

# Result: Hyperstack with nChannels=1, nSlices=N, nFrames=M

Dimension Modes:

  • STACK: Component becomes a hyperstack dimension (C, Z, or T)

  • SLICE: Component values shown as separate slices

  • SEPARATE: Component values shown as separate images
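One way to picture the mapping is the sketch below. It assumes that each distinct component value is assigned a zero-based index in sorted order, and that only components set to STACK contribute to the hyperstack dimensions. DimensionMode, index_components, and hyperstack_dims are hypothetical stand-ins, not the OpenHCS API.

```python
from enum import Enum


class DimensionMode(Enum):
    """Hypothetical stand-in for FijiDimensionMode."""
    STACK = "stack"
    SLICE = "slice"
    SEPARATE = "separate"


def index_components(images, component):
    """Map each distinct component value to a zero-based index, in sorted order."""
    values = sorted({img[component] for img in images})
    return {value: i for i, value in enumerate(values)}


def hyperstack_dims(images, modes):
    """Return (nChannels, nSlices, nFrames); non-STACK components count as 1."""
    dims = []
    for component in ("channel", "z_index", "timepoint"):
        if modes[component] is DimensionMode.STACK:
            dims.append(len(index_components(images, component)))
        else:
            dims.append(1)
    return tuple(dims)


images = [
    {"channel": ch, "z_index": z, "timepoint": t}
    for ch in ("DAPI", "GFP")
    for z in ("z001", "z002", "z003")
    for t in ("t001", "t002")
]
modes = {
    "channel": DimensionMode.SLICE,
    "z_index": DimensionMode.STACK,
    "timepoint": DimensionMode.STACK,
}
dims = hyperstack_dims(images, modes)
z_lookup = index_components(images, "z_index")
```

With these modes the result has one channel, three slices, and two frames, matching the nChannels=1, nSlices=N, nFrames=M pattern in the example above.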

Hyperstack Creation

The server groups images by (step_name, well) and builds hyperstacks:

def _build_hyperstack(self, images, display_config):
    # Group images by CZT coordinates
    czt_map = {}
    for img in images:
        c = self._get_channel_index(img['metadata'])
        z = self._get_z_index(img['metadata'])
        t = self._get_timepoint_index(img['metadata'])
        czt_map[(c, z, t)] = img['data']

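    # Create ImageJ ImageStack
    stack = self.ij.py.to_java(first_image)
    # ImageJ expects planes in XYCZT order: channel varies fastest, then Z, then T,
    # so sort by (t, z, c) rather than (c, z, t)
    for (c, z, t), data in sorted(czt_map.items(), key=lambda kv: kv[0][::-1]):
        stack.addSlice(self.ij.py.to_java(data))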

    # Convert to hyperstack
    imp = ImagePlus(title, stack)
    imp.setDimensions(nChannels, nSlices, nFrames)

    # Apply LUT and display mode
    if nChannels > 1:
        comp = CompositeImage(imp, CompositeImage.COMPOSITE)
        return comp
    return imp

Performance Note: Hyperstack building takes ~2 seconds per stack due to ImageJ’s internal processing. The non-blocking send pattern ensures this doesn’t block the pipeline.
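Plane ordering deserves a short illustration, because setDimensions only reinterprets a flat stack. ImageJ's default hyperstack order is XYCZT: the channel index varies fastest, then Z, then T. The sketch below is independent of PyImageJ; stack_index and order_for_stack are hypothetical helper names, and the coordinates are zero-based.

```python
def stack_index(c, z, t, n_channels, n_slices):
    """1-based position in the flat ImageJ stack for zero-based (c, z, t)."""
    return t * n_slices * n_channels + z * n_channels + c + 1


def order_for_stack(czt_map):
    """Return (c, z, t) keys in the order planes must be appended to the stack."""
    return sorted(czt_map, key=lambda czt: (czt[2], czt[1], czt[0]))


# Two channels, three Z-slices, two time-frames; values stand in for image planes.
czt_map = {
    (c, z, t): f"plane c{c} z{z} t{t}"
    for c in range(2)
    for z in range(3)
    for t in range(2)
}
ordered = order_for_stack(czt_map)
positions = [stack_index(c, z, t, n_channels=2, n_slices=3) for (c, z, t) in ordered]
```

Appending planes in this order yields consecutive positions 1 through 12, so the dimensions declared afterwards line up with the data.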

ZeroMQ Communication Pattern

Dual-Channel Architecture

Fiji streaming uses the same dual-channel pattern as Napari and OMERO integrations:

Control Channel (REQ/REP):
- Port: fiji_port + 1000 (e.g., 6556 for data port 5556)
- Purpose: Ping/pong handshake, status queries
- Pattern: Synchronous request/response

Data Channel (PUB/SUB):
- Port: fiji_port (e.g., 5556)
- Purpose: Image streaming
- Pattern: Asynchronous publish/subscribe

Benefits:

  • Control messages don’t block data streaming

  • Reliable handshake before data transfer

  • Independent scaling of control and data throughput
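The port convention above can be written down as a small helper. The offset of 1000 comes from the description above; the names CONTROL_PORT_OFFSET, control_port, and endpoints, and the tcp:// endpoint format, are assumptions for illustration.

```python
CONTROL_PORT_OFFSET = 1000  # control port = data port + 1000, per the convention above


def control_port(data_port: int) -> int:
    """Derive the control-channel port from the data-channel port."""
    if not 0 < data_port <= 65535 - CONTROL_PORT_OFFSET:
        raise ValueError(f"data port {data_port} leaves no room for a control port")
    return data_port + CONTROL_PORT_OFFSET


def endpoints(host: str, data_port: int) -> dict:
    """Return both channel endpoints for one viewer (hypothetical helper)."""
    return {
        "data": f"tcp://{host}:{data_port}",
        "control": f"tcp://{host}:{control_port(data_port)}",
    }


viewer_endpoints = endpoints("localhost", 5556)
```

Deriving one port from the other means a viewer is fully identified by a single configured number.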

Non-Blocking Sends

The publisher uses zmq.NOBLOCK to prevent pipeline blocking:

try:
    publisher.send_json(message, flags=zmq.NOBLOCK)
    # Clean up shared memory handles after successful send
    for img in batch_images:
        shm = self._shared_memory_blocks.pop(img['shm_name'])
        shm.close()
except zmq.Again:
    # Fiji viewer busy, drop batch and clean up
    logger.warning("Fiji viewer busy, dropped batch")
    for img in batch_images:
        shm = self._shared_memory_blocks.pop(img['shm_name'])
        shm.close()
        shm.unlink()  # Unlink since receiver never got it

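Why This Works: If the message cannot be queued immediately, the non-blocking send raises zmq.Again instead of waiting. The publisher then drops the batch and both closes and unlinks its shared memory, because no receiver will ever see those blocks. This prevents pipeline blocking while ensuring no memory leaks.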
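The drop-and-clean-up policy can be demonstrated without ZeroMQ. The sketch below is an analogy, not the polystore code: a bounded queue.Queue stands in for the socket's send buffer, queue.Full plays the role of zmq.Again, and the class name DropOnFullSender and its attributes are hypothetical.

```python
import queue


class DropOnFullSender:
    """Non-blocking sender that drops work instead of waiting (illustrative only)."""

    def __init__(self, capacity):
        self._outbox = queue.Queue(maxsize=capacity)
        self.pending = {}   # resources awaiting hand-off, like _shared_memory_blocks
        self.released = []  # names cleaned up locally because nobody received them

    def send(self, name, resource):
        """Return True if queued, False if dropped. Never blocks."""
        self.pending[name] = resource
        try:
            self._outbox.put_nowait(name)
        except queue.Full:
            # Nobody will receive this, so the sender must release it itself.
            self.pending.pop(name)
            self.released.append(name)
            return False
        # Handed off: forget the local reference, the consumer owns cleanup now.
        self.pending.pop(name)
        return True


sender = DropOnFullSender(capacity=2)
results = [sender.send(f"block-{i}", object()) for i in range(4)]
```

In both branches the tracking dict ends up empty; the only difference is who is responsible for releasing the resource.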

Configuration System

The Fiji integration uses OpenHCS’s lazy configuration framework with placeholder inheritance:

@dataclass(frozen=True)
class FijiStreamingConfig(StreamingConfig, FijiDisplayConfig):
    fiji_port: int = 5556
    fiji_host: str = 'localhost'

    # Inherited from FijiDisplayConfig:
    lut: FijiLUT = FijiLUT.GRAYS
    channel_mode: FijiDimensionMode = FijiDimensionMode.SLICE
    z_index_mode: FijiDimensionMode = FijiDimensionMode.STACK
    timepoint_mode: FijiDimensionMode = FijiDimensionMode.STACK
    site_mode: FijiDimensionMode = FijiDimensionMode.SEPARATE

Lazy Configuration: Steps can use LazyFijiStreamingConfig() to inherit from pipeline-level defaults, enabling centralized configuration with per-step overrides.
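The general idea of placeholder inheritance can be sketched with plain dataclasses. This is not OpenHCS's lazy configuration framework, which this page does not show; it assumes that an unset field is represented by None and falls back to the pipeline-level value. StreamingDefaults, LazyStreamingOverrides, and resolve are hypothetical names, and the LUT is simplified to a string.

```python
from dataclasses import dataclass, fields, replace
from typing import Optional


@dataclass(frozen=True)
class StreamingDefaults:
    """Pipeline-level values (hypothetical stand-in for FijiStreamingConfig)."""
    fiji_port: int = 5556
    fiji_host: str = "localhost"
    lut: str = "Grays"


@dataclass(frozen=True)
class LazyStreamingOverrides:
    """Step-level values; None means "inherit from the pipeline"."""
    fiji_port: Optional[int] = None
    fiji_host: Optional[str] = None
    lut: Optional[str] = None


def resolve(defaults, overrides):
    """Apply only the fields the step set explicitly on top of the defaults."""
    explicit = {
        f.name: getattr(overrides, f.name)
        for f in fields(overrides)
        if getattr(overrides, f.name) is not None
    }
    return replace(defaults, **explicit)


pipeline_defaults = StreamingDefaults(fiji_port=5600)
step_config = resolve(pipeline_defaults, LazyStreamingOverrides(lut="Fire"))
```

The step changes only the LUT, while the port set at the pipeline level and the untouched host carry through.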

See Also