Pipeline Compilation Workflow

OpenHCS uses a 5-phase compilation system that transforms pipeline definitions into optimized execution plans before any data is processed. This guide explains how the compilation process works and how it integrates with the rest of the system.

Compilation Overview

The pipeline compilation follows a multi-pass compiler architecture similar to programming language compilers:

  1. Declaration Phase: Functions declare their contracts via decorators

  2. Compilation Phase: 5-phase compiler builds execution plans

  3. Execution Phase: Stateless execution against immutable contexts

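Because every plan is built and validated before any processing begins, the compiler can plan I/O paths, check memory contracts, and assign resources up front, so errors surface at compile time rather than partway through a run.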

The 5 Compilation Phases

The PipelineCompiler executes five sequential phases for each well:

# Actual compilation sequence from orchestrator
for well_id in wells_to_process:
    context = self.create_context(well_id)

    # 5-Phase Compilation
    resolved_steps, step_state_map = PipelineCompiler.initialize_step_plans_for_context(
        context,
        pipeline_definition,
        metadata_writer=is_responsible,
        plate_path=self.plate_path,
        steps_already_resolved=False,
    )
    PipelineCompiler.declare_zarr_stores_for_context(
        context, resolved_steps, self
    )
    PipelineCompiler.plan_materialization_flags_for_context(
        context, resolved_steps, self
    )
    PipelineCompiler.validate_memory_contracts_for_context(
        context,
        resolved_steps,
        self,
        step_state_map=step_state_map,
    )
    PipelineCompiler.assign_gpu_resources_for_context(context, resolved_steps, self)

    context.freeze()
    compiled_contexts[well_id] = context

Phase 1: Step Plan Initialization

Purpose: Establishes the data flow topology and I/O paths

Implementation: PipelineCompiler.initialize_step_plans_for_context()

Key Operations:

  - Creates step plans for each pipeline step
  - Registers global/orchestrator/step ObjectState scopes and resolves their saved values before metadata injection
  - Calls PipelinePathPlanner.prepare_pipeline_paths() for path resolution
  - Returns (resolved_steps, step_state_map) so later phases can access the exact configuration snapshot used during resolution (streaming defaults, visualizer settings, etc.)
  - Handles special I/O path linking between steps
  - Sets up chainbreaker status for steps that break the pipeline flow

# Path planning example
steps = [
    FunctionStep(func=normalize_images, name="normalize"),
    FunctionStep(func=segment_cells, name="segment"),
    FunctionStep(func=count_cells, name="count")
]

# Phase 1 creates step plans:
# step_plans = {
#     "normalize_id": {
#         "input_dir": "/data/plate/well_A01",
#         "output_dir": "/memory/normalize_output",
#         "special_inputs": {},
#         "special_outputs": {}
#     },
#     "segment_id": {
#         "input_dir": "/memory/normalize_output",
#         "output_dir": "/memory/segment_output",
#         ...
#     }
# }
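
The chaining shown in the comment above can be sketched in a few lines. This is a minimal illustration, not the PipelinePathPlanner implementation: the function name plan_paths, the memory_root parameter, and the use of step names as plan keys are all assumptions made for the example.

```python
from pathlib import PurePosixPath


def plan_paths(step_names, plate_input_dir, memory_root="/memory"):
    """Toy path planner (hypothetical): each step reads what the previous step wrote."""
    plans = {}
    current_input = str(PurePosixPath(plate_input_dir))
    for name in step_names:
        # Assumed naming scheme: <memory_root>/<step name>_output
        output_dir = str(PurePosixPath(memory_root) / f"{name}_output")
        plans[name] = {
            "input_dir": current_input,
            "output_dir": output_dir,
            "special_inputs": {},
            "special_outputs": {},
        }
        # The next step's input is this step's output.
        current_input = output_dir
    return plans


plans = plan_paths(["normalize", "segment", "count"], "/data/plate/well_A01")
for name, plan in plans.items():
    print(name, plan["input_dir"], "->", plan["output_dir"])
```

Only the first step reads from the plate directory; every later step's input_dir is the previous step's output_dir, which is the topology Phase 1 establishes.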

Phase 2: ZARR Store Declaration

Purpose: Configures ZARR storage for large datasets

Implementation: PipelineCompiler.declare_zarr_stores_for_context()

Key Operations:

  - Identifies steps that require ZARR storage (large outputs, final results)
  - Configures ZARR store parameters (compression, chunking)
  - Sets up shared ZARR stores across wells for efficiency

# ZARR metadata for large datasets
# Note: This is a plain metadata dict, not the ZarrConfig dataclass
if will_use_zarr:
    step_plan["zarr_config"] = {
        "all_wells": all_wells,
        "needs_initialization": True
    }

Phase 3: Materialization Flag Planning

Purpose: Determines read/write backends for each step

Implementation: PipelineCompiler.plan_materialization_flags_for_context()

Key Operations:

  - Sets read backends (disk for first step, memory for intermediate steps)
  - Sets write backends (memory for intermediate, materialization backend for final)
  - Handles backend compatibility with microscope formats

# Backend selection logic
for i, step in enumerate(pipeline_definition):
    if i == 0:  # First step
        step_plan["read_backend"] = "disk"  # Read from microscope files
    else:
        step_plan["read_backend"] = "memory"  # Read from previous step

    if i == len(pipeline_definition) - 1:  # Last step
        step_plan["write_backend"] = "zarr"  # Final output: the materialization backend (zarr in this example)
    else:
        step_plan["write_backend"] = "memory"  # Intermediate output
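
The selection rule above can be written as a small self-contained function. This is a sketch under stated assumptions: plan_backends and its materialization_backend parameter are hypothetical names, and the real phase also handles microscope-format compatibility, which is omitted here.

```python
def plan_backends(step_ids, materialization_backend="zarr"):
    """Toy backend planner (hypothetical): disk in, memory between steps, materialized out."""
    if not step_ids:
        raise ValueError("pipeline must contain at least one step")
    last_index = len(step_ids) - 1
    plans = {}
    for i, step_id in enumerate(step_ids):
        plans[step_id] = {
            # Only the first step reads the original files from disk.
            "read_backend": "disk" if i == 0 else "memory",
            # Only the last step writes to the materialization backend.
            "write_backend": materialization_backend if i == last_index else "memory",
        }
    return plans


backend_plans = plan_backends(["normalize", "segment", "count"])
single_step_plans = plan_backends(["only_step"], materialization_backend="disk")
```

Note the single-step case: the same step is both first and last, so it reads from disk and writes directly to the materialization backend.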

Phase 4: Memory Contract Validation

Purpose: Validates memory types and stores function patterns

Implementation: PipelineCompiler.validate_memory_contracts_for_context()

Key Operations:

  - Validates memory type compatibility between steps
  - Stores resolved function patterns in step plans
  - Injects memory type conversion information

# Memory contract validation
memory_types = FuncStepContractValidator.validate_pipeline(steps, context)

# Injects into step plans:
# step_plan.update({
#     "input_memory_type": "numpy",
#     "output_memory_type": "torch",
#     "func": resolved_function_pattern
# })
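
To make the idea of a memory contract concrete, here is a toy validator. It is not FuncStepContractValidator: the memory_types decorator, TOY_MEMORY_TYPES, MemoryContractError, and the needs_conversion flag are hypothetical names chosen for this sketch.

```python
TOY_MEMORY_TYPES = {"numpy", "cupy", "torch"}  # assumption: a small illustrative set


class MemoryContractError(ValueError):
    """Raised when a function has no valid memory type contract (hypothetical)."""


def memory_types(input_type, output_type):
    """Toy decorator that records a function's memory contract as attributes."""
    def decorator(func):
        func.input_memory_type = input_type
        func.output_memory_type = output_type
        return func
    return decorator


def validate_pipeline(funcs):
    """Check every function declares a contract and flag where a conversion is needed."""
    plans = []
    previous_output = None
    for func in funcs:
        in_type = getattr(func, "input_memory_type", None)
        out_type = getattr(func, "output_memory_type", None)
        if in_type not in TOY_MEMORY_TYPES or out_type not in TOY_MEMORY_TYPES:
            raise MemoryContractError(f"{func.__name__} has no valid memory type contract")
        plans.append({
            "func": func,
            "input_memory_type": in_type,
            "output_memory_type": out_type,
            # A conversion is needed when the previous output type differs from this input type.
            "needs_conversion": previous_output is not None and previous_output != in_type,
        })
        previous_output = out_type
    return plans


@memory_types("numpy", "numpy")
def normalize(stack):
    return stack


@memory_types("torch", "torch")
def segment(stack):
    return stack


contract_plans = validate_pipeline([normalize, segment])
```

An undecorated function fails validation immediately, which mirrors the goal of rejecting a pipeline before any data is touched.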

Phase 5: GPU Resource Assignment

Purpose: Assigns GPU resources and validates GPU memory requirements

Implementation: PipelineCompiler.assign_gpu_resources_for_context()

Key Operations:

  - Assigns GPU devices to GPU-enabled steps
  - Validates GPU memory requirements
  - Sets up GPU resource scheduling

# GPU resource assignment
for step_id, step_plan in context.step_plans.items():
    if step_plan["output_memory_type"] in VALID_GPU_MEMORY_TYPES:
        step_plan["gpu_id"] = assigned_gpu_id
        step_plan["gpu_memory_required"] = estimated_memory
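
A runnable variant of the loop above might look like the following. The round-robin policy, the GPU_MEMORY_TYPES set, and the assign_gpus function are assumptions made for illustration; the actual compiler may use a different assignment strategy, and the memory estimation step is left out.

```python
from itertools import cycle

GPU_MEMORY_TYPES = {"cupy", "torch"}  # assumption: stand-in for VALID_GPU_MEMORY_TYPES


def assign_gpus(step_plans, available_gpu_ids):
    """Toy assignment (hypothetical): give GPU steps a device ID in round-robin order."""
    gpu_steps = [
        step_id
        for step_id, plan in step_plans.items()
        if plan["output_memory_type"] in GPU_MEMORY_TYPES
    ]
    if gpu_steps and not available_gpu_ids:
        raise RuntimeError("pipeline has GPU steps but no GPU is available")
    gpu_cycle = cycle(available_gpu_ids)
    for step_id in gpu_steps:
        step_plans[step_id]["gpu_id"] = next(gpu_cycle)
    return step_plans


gpu_plans = assign_gpus(
    {
        "normalize": {"output_memory_type": "numpy"},
        "segment": {"output_memory_type": "torch"},
        "count": {"output_memory_type": "cupy"},
    },
    available_gpu_ids=[0, 1],
)
```

CPU-only steps are left without a gpu_id key, which is why the execution code reads it with step_plan.get('gpu_id') rather than indexing directly.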

Context Freezing and Immutability

After compilation, contexts are frozen to ensure immutability during execution:

# After all compilation phases
context.freeze()

# Context becomes immutable - no further modifications allowed
# Execution phase operates on frozen, validated contexts
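
One way a freeze() method can enforce immutability is sketched below. ToyContext and FrozenContextError are hypothetical and only illustrate the pattern; the real ProcessingContext may be implemented differently.

```python
from types import MappingProxyType


class FrozenContextError(AttributeError):
    """Raised when a frozen context is modified (hypothetical)."""


class ToyContext:
    """Minimal freezable context: mutable during compilation, read-only afterwards."""

    def __init__(self, well_id):
        # Bypass our own __setattr__ so the flag exists before any checks run.
        object.__setattr__(self, "_frozen", False)
        self.well_id = well_id
        self.step_plans = {}

    def freeze(self):
        # Replace the plan dict with a read-only view, then lock attribute assignment.
        read_only_plans = MappingProxyType(dict(self.step_plans))
        object.__setattr__(self, "step_plans", read_only_plans)
        object.__setattr__(self, "_frozen", True)

    def __setattr__(self, name, value):
        if self._frozen:
            raise FrozenContextError(f"cannot set '{name}' on a frozen context")
        object.__setattr__(self, name, value)


toy_context = ToyContext("A01")
toy_context.step_plans["normalize"] = {"read_backend": "disk"}
toy_context.freeze()
```

In this sketch only attribute assignment and top-level step_plans entries are protected; the nested per-step dicts remain ordinary dicts, so a stricter version would need to wrap those as well.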

Function Pattern Resolution

The compilation system resolves complex function patterns:

Single Function Pattern:

FunctionStep(func=normalize_images)
# Resolves to: normalize_images(image_stack)

Function with Parameters:

FunctionStep(func=(normalize_images, {'percentile': 99.0}))
# Resolves to: normalize_images(image_stack, percentile=99.0)

Function Chain Pattern:

FunctionStep(func=[
    (normalize_images, {'percentile': 99.0}),
    (apply_filter, {'sigma': 2.0})
])
# Resolves to: apply_filter(normalize_images(image_stack, percentile=99.0), sigma=2.0)

Dict Pattern (Channel-specific):

FunctionStep(func={
    '1': count_nuclei,
    '2': trace_neurites
}, variable_components=[VariableComponents.CHANNEL])
# Resolves to different functions per channel
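
The four patterns above can all be normalized into plain callables. The resolver below is a simplified sketch, not the OpenHCS resolution logic: resolve_pattern and _as_call are hypothetical helpers, and simple numeric functions stand in for image operations.

```python
def _as_call(entry):
    """Turn `func` or `(func, kwargs)` into a (func, kwargs) pair."""
    if callable(entry):
        return entry, {}
    func, kwargs = entry
    return func, dict(kwargs)


def resolve_pattern(pattern):
    """Resolve a single, parameterized, chain, or dict pattern into callables."""
    if isinstance(pattern, dict):
        # Dict pattern: resolve each component's pattern independently.
        return {key: resolve_pattern(value) for key, value in pattern.items()}
    entries = pattern if isinstance(pattern, list) else [pattern]
    calls = [_as_call(entry) for entry in entries]

    def run(data):
        # Chain pattern: feed each function's output into the next one.
        for func, kwargs in calls:
            data = func(data, **kwargs)
        return data

    return run


def add(value, amount=1):
    return value + amount


def scale(value, factor=2):
    return value * factor


single = resolve_pattern(add)
with_params = resolve_pattern((add, {"amount": 10}))
chain = resolve_pattern([(add, {"amount": 3}), (scale, {"factor": 2})])
per_channel = resolve_pattern({"1": add, "2": scale})
```

Treating a single function as a one-element chain keeps the resolver to one code path, and the dict pattern falls out by recursion.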

Special I/O Integration

The compilation system handles cross-step communication:

Special Outputs Declaration:

@special_outputs("positions", "metadata")
def generate_positions(image_stack):
    return processed_stack, positions, metadata

Special Inputs Consumption:

@special_inputs("positions")
def assemble_images(image_tiles, positions):
    return assembled_image

Compilation Integration:

# Compilation links special I/O paths
step_plans["generate_positions"]["special_outputs"] = {
    "positions": "/memory/positions_data",
    "metadata": "/memory/metadata_data"
}

step_plans["assemble_images"]["special_inputs"] = {
    "positions": "/memory/positions_data"
}
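
The linking step can be illustrated with simplified stand-ins for the decorators. Everything here is an assumption for the sake of the example: the toy_ prefixed decorators, the link_special_io function, and the <memory_root>/<key>_data path scheme are not taken from the OpenHCS source.

```python
def toy_special_outputs(*names):
    """Stand-in decorator: record the special outputs a function produces."""
    def decorator(func):
        func.special_outputs = names
        return func
    return decorator


def toy_special_inputs(*names):
    """Stand-in decorator: record the special inputs a function consumes."""
    def decorator(func):
        func.special_inputs = names
        return func
    return decorator


def link_special_io(funcs, memory_root="/memory"):
    """Assign a path to each special output and hand it to later consumers."""
    declared = {}  # special key -> path written by an earlier step
    plans = {}
    for func in funcs:
        plan = {"special_inputs": {}, "special_outputs": {}}
        for key in getattr(func, "special_inputs", ()):
            if key not in declared:
                raise KeyError(f"{func.__name__} needs '{key}' but no earlier step produces it")
            plan["special_inputs"][key] = declared[key]
        for key in getattr(func, "special_outputs", ()):
            path = f"{memory_root}/{key}_data"
            declared[key] = path
            plan["special_outputs"][key] = path
        plans[func.__name__] = plan
    return plans


@toy_special_outputs("positions", "metadata")
def generate_positions(image_stack):
    return image_stack, [], {}


@toy_special_inputs("positions")
def assemble_images(image_tiles, positions):
    return image_tiles


io_plans = link_special_io([generate_positions, assemble_images])
```

Because the linker walks the steps in order, a consumer placed before its producer fails at compile time instead of at execution time.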

Execution Phase Integration

Compiled contexts enable stateless execution:

# Execution retrieves everything from frozen context
def process(self, context: ProcessingContext):
    step_id = get_step_id(self)
    step_plan = context.step_plans[step_id]

    # All execution parameters from compilation
    func_from_plan = step_plan['func']
    input_memory_type = step_plan['input_memory_type']
    output_memory_type = step_plan['output_memory_type']
    gpu_id = step_plan.get('gpu_id')

    # Execute with compiled parameters
    result = func_from_plan(input_data)

Performance Benefits

The compilation system provides significant performance benefits:

  - Validation at Compile Time: Catch errors before processing begins
  - Resource Optimization: Optimal GPU and memory allocation
  - Path Optimization: Efficient I/O path planning
  - Memory Type Planning: Minimize conversions between memory types
  - Parallel Compilation: Compile multiple wells simultaneously

Error Handling

Compilation failures are caught early with detailed error messages:

try:
    compiled_contexts = orchestrator.compile_pipelines(pipeline_definition)
except ValidationError as e:
    print(f"Pipeline validation failed: {e}")
    # Fix pipeline before execution
except MemoryTypeError as e:
    print(f"Memory type incompatibility: {e}")
    # Adjust memory types or add conversions

Best Practices

Pipeline Design:

  - Group functions by memory type to minimize conversions
  - Use special I/O for cross-step communication
  - Design for parallel execution across wells

Function Development:

  - Always use memory type decorators
  - Declare special inputs/outputs explicitly
  - Follow 3D array output conventions

Resource Management:

  - Configure appropriate ZARR settings for large datasets
  - Set memory limits to trigger automatic ZARR usage
  - Use GPU memory types for compute-intensive operations
