Production Examples

This section shows real OpenHCS workflows used in production, demonstrating the full capabilities of the platform for complex bioimage analysis tasks.

All examples are based on actual production scripts generated by the OpenHCS TUI and used for real research projects.

Complete Cell Analysis Pipeline

This example shows a complete workflow for cell counting and neurite tracing analysis, demonstrating:

  • GPU-accelerated preprocessing with PyTorch and CuPy

  • Multi-channel analysis with dict patterns

  • Special I/O for cross-step communication

  • ZARR backend for large dataset handling

  • Production configuration with memory management

from openhcs.core.orchestrator.orchestrator import PipelineOrchestrator
from openhcs.core.steps.function_step import FunctionStep
from openhcs.core.config import GlobalPipelineConfig, PathPlanningConfig, VFSConfig
from openhcs.constants.constants import VariableComponents, Backend

# Import analysis functions
from openhcs.processing.backends.analysis.cell_counting_cpu import count_cells_single_channel
from openhcs.processing.backends.analysis.skan_axon_analysis import skan_axon_skeletonize_and_analyze
from openhcs.processing.backends.processors.torch_processor import stack_percentile_normalize
from openhcs.processing.backends.processors.cupy_processor import tophat, create_composite

def create_production_pipeline():
    """Create a production-ready cell analysis pipeline."""

    # Production configuration
    global_config = GlobalPipelineConfig(
        num_workers=5,
        path_planning=PathPlanningConfig(
            output_dir_suffix="_analysis",
            global_output_folder="/data/analysis_results/",
            materialization_results_path="results"
        ),
        vfs=VFSConfig(
            intermediate_backend=Backend.MEMORY,
            materialization_backend=Backend.ZARR
        )
    )

    # Define the analysis pipeline
    steps = [
        # Step 1: GPU-accelerated normalization (PyTorch)
        FunctionStep(
            func=stack_percentile_normalize,
            variable_components=[VariableComponents.CHANNEL],
            percentile_low=1.0,
            percentile_high=99.0,
            clip_negative=True
        ),

        # Step 2: Morphological processing (CuPy)
        FunctionStep(
            func=tophat,
            variable_components=[VariableComponents.CHANNEL],
            footprint_size=15
        ),

        # Step 3: Create composite for visualization
        FunctionStep(
            func=create_composite,
            variable_components=[VariableComponents.CHANNEL],
            weights={'1': 0.7, '2': 0.3},
            normalize=True
        ),

        # Step 4: Multi-channel analysis with special outputs
        FunctionStep(
            func={
                '1': count_cells_single_channel,  # DAPI channel - cell counting
                '2': skan_axon_skeletonize_and_analyze  # GFP channel - neurite tracing
            },
            variable_components=[VariableComponents.CHANNEL],
            # Cell counting parameters (channel 1)
            detection_method=DetectionMethod.BLOB_LOG,
            min_sigma=1.0,
            max_sigma=10.0,
            threshold=0.1,
            # Neurite analysis parameters (channel 2)
            analysis_dimension=AnalysisDimension.TWO_D,
            min_branch_length=10.0,
            summarize=True
        )
    ]

    return global_config, steps

GPU Memory Type Conversion Example

This example demonstrates OpenHCS’s automatic memory type conversion between different GPU frameworks:

from openhcs.core.steps.function_step import FunctionStep
from openhcs.processing.backends.processors.torch_processor import gaussian_filter_torch
from openhcs.processing.backends.processors.cupy_processor import sobel_filter_cupy
from openhcs.processing.backends.analysis.pyclesperanto_registry import threshold_otsu_cle

# Pipeline with automatic GPU memory conversion
gpu_pipeline = [
    # PyTorch processing
    FunctionStep(
        func=gaussian_filter_torch,  # Input: any type → Output: torch tensors
        sigma=2.0
    ),

    # Automatic conversion: torch → cupy (zero-copy on GPU)
    FunctionStep(
        func=sobel_filter_cupy,  # Input: torch → Output: cupy arrays
        axis=0
    ),

    # Automatic conversion: cupy → pyclesperanto (zero-copy on GPU)
    FunctionStep(
        func=threshold_otsu_cle,  # Input: cupy → Output: pyclesperanto arrays
        # Otsu threshold automatically calculated
    )
]

Large Dataset Processing with ZARR

For datasets larger than available RAM, OpenHCS automatically uses ZARR compression:

from openhcs.core.config import ZarrConfig, ZarrCompressor, ZarrChunkStrategy

# Configuration for 100GB+ datasets
large_dataset_config = GlobalPipelineConfig(
    num_workers=8,
    zarr=ZarrConfig(
        compressor=ZarrCompressor.LZ4,  # Fast compression
        chunk_strategy=ZarrChunkStrategy.ADAPTIVE,  # Automatic chunking
        compression_level=1  # Balance speed vs size
    ),
    vfs=VFSConfig(
        intermediate_backend=Backend.MEMORY,
        materialization_backend=Backend.ZARR,
        memory_limit_gb=32  # Automatic ZARR when exceeded
    )
)

TUI-Generated Script Structure

Scripts generated by the OpenHCS TUI follow this production-ready pattern:

#!/usr/bin/env python3
"""
OpenHCS Pipeline Script - Generated from TUI
Generated: 2025-07-21 01:27:14
"""

import sys
from pathlib import Path

# Add OpenHCS to path (for development installations)
sys.path.insert(0, "/path/to/openhcs")

from openhcs.core.orchestrator.orchestrator import PipelineOrchestrator
# ... imports for all used functions

def create_pipeline():
    """Create and return the pipeline configuration."""

    # Plate paths (actual data directories)
    plate_paths = ['/path/to/microscopy/data']

    # Production configuration
    global_config = GlobalPipelineConfig(...)

    # Pipeline steps
    steps = [...]

    return plate_paths, steps, global_config

def main():
    """Main execution function."""
    plate_paths, steps, global_config = create_pipeline()

    # Create orchestrator
    orchestrator = PipelineOrchestrator(
        plate_paths=plate_paths,
        steps=steps,
        global_config=global_config
    )

    # Execute pipeline
    orchestrator.run()

if __name__ == "__main__":
    main()

Key Production Features

Self-Contained Scripts: Generated scripts include all necessary imports and can be run independently.

Error Handling: Production scripts include comprehensive error handling and logging.

Resource Management: Automatic GPU memory management and worker process coordination.

Reproducibility: All parameters and configurations are explicitly specified.

Monitoring: Integration with OpenHCS logging system for production monitoring.

Scalability: Designed to handle datasets from MB to 100GB+ with automatic backend selection.

Next Steps