OpenHCS Concurrency Model

The Problem: Thread Safety in Image Processing

Image processing pipelines need to process multiple wells in parallel to achieve reasonable throughput for high-content screening. However, parallel execution creates thread safety challenges: shared state, race conditions, and deadlocks. Traditional approaches use complex locking mechanisms that are hard to reason about and prone to bugs. For long-running microscopy workflows (hours or days), even rare race conditions become critical failures.

The Solution: Immutable Compilation + Well-Level Parallelism

OpenHCS implements a well-level parallelism model with thread isolation and immutable compilation artifacts. This design delivers parallel throughput while keeping execution thread-safe through architectural constraints rather than complex locking mechanisms. By compiling pipelines once (single-threaded) and then only reading the compiled artifacts during execution (multi-threaded), the system eliminates entire classes of concurrency bugs, such as data races on shared pipeline state.

Overview

The system separates compilation (single-threaded, mutable) from execution (multi-threaded, immutable), ensuring thread safety through design rather than synchronization.

Note: This document describes the actual concurrency implementation. Some advanced features like runtime GPU slot management are planned for future development.

Core Concurrency Philosophy

Well-Level Parallelism

  • Parallel Unit: Each well is processed independently in its own worker (a thread or a process, depending on the executor configuration)

  • Sequential Within Well: All steps for a well execute sequentially in the same thread

  • No Cross-Well Dependencies: Wells never share data or coordinate during execution

Immutable Compilation Artifacts

  • Compile Once, Execute Many: Step plans compiled once, then immutable during execution

  • Frozen Contexts: ProcessingContext frozen after compilation, preventing state corruption

  • Stateless Steps: Step objects stripped of mutable state after compilation

Concurrency Architecture

Two-Phase Execution Model

# Phase 1: Compilation (Single-threaded)
compiled_contexts = orchestrator.compile_pipelines(pipeline_definition, wells)
# Result: Immutable ProcessingContext for each well

# Phase 2: Execution (Multi-threaded)
results = orchestrator.execute_compiled_plate(pipeline_definition, compiled_contexts)
# Result: Parallel execution across wells

Thread Pool Execution

import concurrent.futures
import logging

logger = logging.getLogger(__name__)

def execute_compiled_plate(self, pipeline_definition, compiled_contexts, max_workers=None, visualizer=None):
    """Execute with configurable parallelism."""

    actual_max_workers = max_workers or self.global_config.num_workers
    execution_results = {}  # well_id -> result dict

    if actual_max_workers > 1 and len(compiled_contexts) > 1:
        # Choose executor type based on global config
        if self.global_config.use_threading:
            # ThreadPoolExecutor for debugging
            executor = concurrent.futures.ThreadPoolExecutor(max_workers=actual_max_workers)
        else:
            # ProcessPoolExecutor for true parallelism
            executor = concurrent.futures.ProcessPoolExecutor(max_workers=actual_max_workers)

        with executor:
            # Submit one task per well and remember which future belongs to which well
            future_to_well_id = {
                executor.submit(self._execute_single_well, pipeline_definition, context, visualizer): well_id
                for well_id, context in compiled_contexts.items()
            }

            for future in concurrent.futures.as_completed(future_to_well_id):
                well_id = future_to_well_id[future]
                try:
                    result = future.result()
                    execution_results[well_id] = result
                except Exception as exc:
                    logger.error(f"Well {well_id} exception: {exc}", exc_info=True)
                    execution_results[well_id] = {"status": "error", "error": str(exc)}
    else:
        # Sequential execution
        for well_id, context in compiled_contexts.items():
            execution_results[well_id] = self._execute_single_well(pipeline_definition, context, visualizer)

    return execution_results

🔒 Thread Safety Mechanisms

1. Immutable State Design

Frozen ProcessingContext

class ProcessingContext:
    """Context with immutability enforcement."""

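    def freeze(self):
        """Make context immutable after compilation."""
        self._is_frozen = True

    def is_frozen(self):
        """Return True once freeze() has been called."""
        return getattr(self, '_is_frozen', False)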

    def __setattr__(self, name, value):
        """Prevent modification of frozen context."""
        if getattr(self, '_is_frozen', False) and name != '_is_frozen':
            raise AttributeError(f"Cannot modify '{name}' of frozen ProcessingContext")
        super().__setattr__(name, value)

Thread Safety Guarantee: Once frozen, a context rejects attribute assignment, so concurrent workers cannot race on its attributes.
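The freeze behavior can be tried in isolation. The following is a minimal sketch that reuses the two methods shown above on a stand-in class; the attribute values (`"A01"`, the `/tmp/out` path) are illustrative assumptions, not values OpenHCS produces.

```python
class ProcessingContext:
    """Minimal stand-in for the context shown above (not the full OpenHCS class)."""

    def freeze(self):
        self._is_frozen = True

    def __setattr__(self, name, value):
        if getattr(self, "_is_frozen", False) and name != "_is_frozen":
            raise AttributeError(f"Cannot modify '{name}' of frozen ProcessingContext")
        super().__setattr__(name, value)


context = ProcessingContext()
context.well_id = "A01"                                # allowed: still compiling
context.step_plans = {0: {"output_dir": "/tmp/out"}}   # hypothetical plan content
context.freeze()

try:
    context.well_id = "B02"                            # rejected: context is frozen
except AttributeError as exc:
    rejected_message = str(exc)

# Caveat of this sketch: freezing guards attribute rebinding only. A nested
# dict such as step_plans is still an ordinary mutable dict, so execution
# code has to treat it as read-only by convention.
```

After `freeze()`, `context.well_id` still reads `"A01"` and the attempted assignment raises `AttributeError`.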

Stateless Step Design

# Step objects are designed to be stateless after compilation
# They read configuration from immutable step_plans in ProcessingContext
# No mutable state is stored in step objects during execution
class FunctionStep(AbstractStep):
    def process(self, context: 'ProcessingContext', step_index: int) -> None:
        # Read configuration from immutable context
        step_plan = context.step_plans[step_index]
        # All execution state comes from context, not step object

Thread Safety Guarantee: Step objects with no mutable state can be safely shared across threads.
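To illustrate why a stateless step can be shared, here is a small sketch in which one step object serves two wells concurrently. `ScaleStep`, the `factor` plan key, and the `pixels` attribute are hypothetical names invented for this example, and the toy `process` returns a value only so the result is easy to inspect.

```python
from concurrent.futures import ThreadPoolExecutor
from types import SimpleNamespace


class ScaleStep:
    """Hypothetical stateless step: it stores no per-run attributes."""

    def process(self, context, step_index):
        plan = context.step_plans[step_index]        # read-only lookup
        return [value * plan["factor"] for value in context.pixels]


# One context per well, built up front (this stands in for compilation).
contexts = {
    "A01": SimpleNamespace(step_plans={0: {"factor": 2}}, pixels=[1, 2, 3]),
    "A02": SimpleNamespace(step_plans={0: {"factor": 10}}, pixels=[4, 5, 6]),
}

shared_step = ScaleStep()  # the same object is used by every worker

with ThreadPoolExecutor(max_workers=2) as pool:
    futures = {
        well_id: pool.submit(shared_step.process, context, 0)
        for well_id, context in contexts.items()
    }
    results = {well_id: future.result() for well_id, future in futures.items()}
```

Because every input comes from the per-well context, the two calls cannot interfere with each other even though they run on the same step instance.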

2. Thread-Local Resource Isolation

FileManager Per Thread

class FileManager:
    """FileManager with strict thread isolation."""

    def __init__(self, registry):
        # Thread Safety:
        #   Each FileManager instance must be scoped to a single execution context.
        #   Do NOT share FileManager instances across pipelines or threads.
        #   For isolation, create a dedicated registry for each FileManager.
        self.registry = registry
        self._backend_cache = {}  # Per-instance backend cache

Thread Safety Guarantee: Each thread gets its own FileManager instance with isolated backend cache.

Backend Instance Isolation

def _get_backend(self, backend_name):
    """Get backend with per-FileManager caching."""
    # Thread Safety:
    #   This method is thread-safe for a single FileManager instance.
    #   Backend instances are NOT shared across FileManager instances.
    if backend_name not in self._backend_cache:
        backend_class = self.registry[backend_name]
        self._backend_cache[backend_name] = backend_class()  # New instance per FileManager

    return self._backend_cache[backend_name]

Thread Safety Guarantee: Backend instances are never shared between threads.
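The per-instance caching described above can be checked with a reduced version of the two excerpts. In this sketch, `DictBackend` is a hypothetical backend class and the registry is assumed to be a plain dict that maps backend names to classes.

```python
class DictBackend:
    """Hypothetical backend that keeps its data in a per-instance dict."""

    def __init__(self):
        self.store = {}


class FileManager:
    """Reduced version of the FileManager shown above."""

    def __init__(self, registry):
        self.registry = registry
        self._backend_cache = {}          # per-instance backend cache

    def _get_backend(self, backend_name):
        if backend_name not in self._backend_cache:
            backend_class = self.registry[backend_name]
            self._backend_cache[backend_name] = backend_class()
        return self._backend_cache[backend_name]


registry = {"memory": DictBackend}        # assumed shape: name -> backend class
fm_well_a = FileManager(registry)
fm_well_b = FileManager(registry)

backend_a = fm_well_a._get_backend("memory")
backend_b = fm_well_b._get_backend("memory")
backend_a.store["img"] = "well A data"

same_within_manager = fm_well_a._get_backend("memory") is backend_a
shared_across_managers = backend_a is backend_b
```

Repeated lookups on one FileManager return the cached backend, while a second FileManager built from the same registry gets its own instance and never sees the first one's data.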

3. Global Resource Coordination

Thread-Safe GPU Registry

See: GPU Resource Management for comprehensive GPU coordination details.

Concurrency Aspects:

  • GPU registry access is thread-safe with atomic operations

  • GPU assignment happens at compilation time, not runtime

  • No runtime slot acquisition/release needed in current implementation

  • Registry status queries are protected by locks for consistency

Thread Safety Guarantee: GPU registry access is atomic and consistent across threads.
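As a rough illustration of lock-protected registry access, the sketch below guards a slot table with a single `threading.Lock`. The class name, method names, and slot counts are assumptions made for this example and do not describe the actual OpenHCS registry; the assignments run from several threads here only to exercise the lock.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class GpuRegistry:
    """Hypothetical lock-protected registry; not the OpenHCS implementation."""

    def __init__(self, slots_per_gpu):
        self._lock = threading.Lock()
        self._free = dict(slots_per_gpu)      # gpu_id -> free pipeline slots

    def assign(self):
        """Reserve a slot on the GPU with the most free slots, or return None."""
        with self._lock:                      # check and decrement as one unit
            gpu_id = max(self._free, key=self._free.get)
            if self._free[gpu_id] == 0:
                return None
            self._free[gpu_id] -= 1
            return gpu_id

    def status(self):
        """Return a consistent snapshot of the free-slot table."""
        with self._lock:
            return dict(self._free)


registry = GpuRegistry({0: 2, 1: 2})
with ThreadPoolExecutor(max_workers=8) as pool:
    assignments = list(pool.map(lambda _: registry.assign(), range(6)))
```

With four slots and six requests, exactly four requests receive a GPU and two receive `None`, regardless of how the threads interleave.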

Memory Backend Isolation

class MemoryStorageBackend(StorageBackend):
    """Memory backend with per-instance storage."""

    def __init__(self):
        self._memory_store = {}  # Per-instance memory store
        self._prefixes = set()   # Per-instance namespace tracking

Thread Safety Guarantee: Each thread gets its own memory backend instance with isolated storage.

🔄 Execution Flow Thread Safety

Single Well Execution

def _execute_single_well(self, pipeline_definition, context, visualizer):
    """Execute pipeline for single well - thread-safe by design."""

    # 1. Context is frozen (immutable)
    assert context.is_frozen()

    # 2. Each thread has its own FileManager
    filemanager = context.filemanager  # Thread-local instance

    # 3. GPU assignment handled at compilation time
    # No runtime GPU slot management needed

    try:
        # 4. Sequential step execution within thread
        for step_index, step in enumerate(pipeline_definition):
            step.process(context, step_index)  # Step is stateless, context is immutable

        return {"status": "success", "well_id": context.well_id}

    except Exception as e:
        logger.error(f"Pipeline execution failed for well {context.well_id}: {e}")
        return {"status": "error", "well_id": context.well_id, "error": str(e)}

FunctionStep Thread Safety

def process(self, context: 'ProcessingContext', step_index: int) -> None:
    """FunctionStep execution - thread-safe by design."""

    # 1. Read immutable step plan
    step_plan = context.step_plans[step_index]  # Immutable after compilation

    # 2. Use thread-local FileManager
    filemanager = context.filemanager  # Thread-local instance

    # 3. Load data using isolated backends
    # (setup of matching_files, read_backend, write_backend, and func is elided in this excerpt)
    raw_slices = []
    for file_path in matching_files:
        image = filemanager.load_image(file_path, read_backend)  # Isolated backend
        raw_slices.append(image)

    # 4. Process data (pure computation)
    # (stacking raw_slices into image_stack and splitting result into output_slices is elided)
    result = func(image_stack)  # Function operates on local data

    # 5. Save data using isolated backends
    for slice_2d in output_slices:
        filemanager.save_image(slice_2d, output_path, write_backend)  # Isolated backend

🎯 Concurrency Guarantees

What is Thread-Safe:

✅ Immutable Data Structures

  • Frozen ProcessingContext: Cannot be modified after compilation

  • Step Plans: Immutable dictionaries with execution configuration

  • Stateless Steps: No mutable state after attribute stripping

✅ Thread-Local Resources

  • FileManager Instances: One per thread, never shared

  • Backend Instances: Isolated per FileManager

  • Memory Storage: Separate memory store per backend instance

✅ Atomic Global Operations

  • GPU Registry Access: Protected by locks for atomic updates

  • Configuration Access: Immutable configuration objects

What Requires Coordination:

🔒 GPU Resource Management

See: GPU Resource Management for complete GPU coordination architecture.

Concurrency Considerations:

  • Registry status queries use atomic reads with locks

  • GPU assignment during compilation phase (thread-safe)

  • Registry initialization is one-time with thread-safe checks

🔒 Global Configuration Updates

  • Live Config Changes: Coordinated through orchestrator

  • Registry Initialization: One-time setup with thread-safe checks

⚡ Performance Characteristics

Scalability Model

# Optimal parallelism calculation
max_workers = min(
    num_wells,                    # Don't create more threads than wells
    global_config.num_workers,    # Respect configured limit
    available_gpu_slots           # Don't exceed GPU capacity
)
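A worked example of the calculation above may help. The helper name `optimal_workers` is hypothetical, and treating a missing GPU limit as "CPU-only" as well as clamping the result to at least 1 are assumptions of this sketch.

```python
def optimal_workers(num_wells, configured_workers, available_gpu_slots=None):
    """Return the worker count implied by the min() rule above.

    available_gpu_slots=None models a CPU-only pipeline (an assumption of
    this sketch). The result never drops below 1, so execution can always
    fall back to a single sequential worker.
    """
    limits = [num_wells, configured_workers]
    if available_gpu_slots is not None:
        limits.append(available_gpu_slots)
    return max(1, min(limits))


# 96 wells, 16 configured workers, 4 GPU slots: the GPU is the bottleneck.
full_plate = optimal_workers(num_wells=96, configured_workers=16, available_gpu_slots=4)

# 3 wells on a CPU-only pipeline: there is no point in more workers than wells.
small_run = optimal_workers(num_wells=3, configured_workers=16)
```

Here `full_plate` evaluates to 4 and `small_run` to 3.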

Resource Utilization

  • CPU Cores: One thread per core (configurable)

  • GPU Devices: Multiple pipelines per GPU (based on memory capacity)

  • Memory: Isolated per thread, no sharing overhead

  • I/O: Parallel disk access across threads

Contention Points

  • GPU Registry: Minimal contention (fast lock operations)

  • Disk I/O: Natural parallelism across different directories

  • Memory Allocation: Thread-local, no contention

🚀 Advanced Concurrency Features

Exception Isolation

# Exceptions in one well don't affect others
for future in concurrent.futures.as_completed(future_to_well_id):
    well_id = future_to_well_id[future]
    try:
        result = future.result()
        execution_results[well_id] = result
    except Exception as exc:
        # Exception isolated to this well
        logger.error(f"Well {well_id} exception: {exc}", exc_info=True)
        execution_results[well_id] = {"status": "error", "error": str(exc)}
        # Other wells continue processing
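The same pattern can be run end to end with a toy task. In this sketch, `process_well` and the well IDs are made up for illustration; one well is forced to fail so the isolation is visible in the results.

```python
import concurrent.futures


def process_well(well_id):
    """Hypothetical per-well task that fails for exactly one well."""
    if well_id == "B02":
        raise ValueError("corrupt image")
    return {"status": "success", "well_id": well_id}


well_ids = ["A01", "B02", "C03"]
execution_results = {}

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    future_to_well_id = {executor.submit(process_well, w): w for w in well_ids}
    for future in concurrent.futures.as_completed(future_to_well_id):
        well_id = future_to_well_id[future]
        try:
            execution_results[well_id] = future.result()
        except Exception as exc:
            # The failure is recorded for this well only.
            execution_results[well_id] = {"status": "error", "error": str(exc)}

statuses = {w: execution_results[w]["status"] for w in well_ids}
```

Wells A01 and C03 report success while B02 carries the error message, because `future.result()` re-raises each exception only when that particular future is inspected.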

Resource Cleanup

def _execute_single_well(self, pipeline_definition, context, visualizer):
    """Guaranteed resource cleanup per thread."""

    try:
        # Execute pipeline steps
        for step_index, step in enumerate(pipeline_definition):
            step.process(context, step_index)

        return {"status": "success", "well_id": context.well_id}

    except Exception as e:
        # Exception handling and cleanup
        logger.error(f"Pipeline execution failed for well {context.well_id}: {e}")
        return {"status": "error", "well_id": context.well_id, "error": str(e)}
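The excerpt above shows the error path; one common way to make a cleanup step unconditional in Python is a `finally` clause. The sketch below assumes a hypothetical `release` callback and uses plain callables and dicts in place of step objects and contexts, so it illustrates the pattern rather than the OpenHCS code.

```python
def execute_single_well(steps, context, release):
    """Run steps in order and always call release(context) afterwards."""
    try:
        for step_index, step in enumerate(steps):
            step(context, step_index)
        return {"status": "success", "well_id": context["well_id"]}
    except Exception as exc:
        return {"status": "error", "well_id": context["well_id"], "error": str(exc)}
    finally:
        release(context)   # runs on both the success and the error path


released = []


def record_release(context):
    released.append(context["well_id"])


def passing_step(context, step_index):
    pass


def failing_step(context, step_index):
    raise RuntimeError("out of memory")


ok = execute_single_well([passing_step], {"well_id": "A01"}, record_release)
failed = execute_single_well([failing_step], {"well_id": "A02"}, record_release)
```

Both wells end up in `released`, even though the second one failed partway through.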

Graceful Degradation

# Automatic fallback to sequential execution
if actual_max_workers <= 1 or len(compiled_contexts) <= 1:
    logger.info("Executing wells sequentially")
    for well_id, context in compiled_contexts.items():
        execution_results[well_id] = self._execute_single_well(pipeline_definition, context, visualizer)

Design Rationale

The concurrency model achieves thread safety through architectural constraints rather than complex locking:

Immutable State Design

  • Frozen ProcessingContext: Cannot be modified after compilation

  • Thread Isolation: No shared mutable resources between threads

  • Minimal Coordination: Only GPU registry requires locking

Error Isolation

  • Well-Level Failures: One well failure doesn't affect others

  • Resource Cleanup: Guaranteed cleanup per thread

  • Exception Propagation: Clear error reporting per well

Performance Characteristics

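  • Linear Scaling: Throughput is designed to scale with the number of cores/GPUs, because wells never coordinate during execution

  • Low Lock Contention: The GPU registry is the only shared resource that requires locking

  • Resource Efficiency: Worker count is bounded by the number of wells, the configured worker limit, and available GPU slots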

Metadata JSON Writing

During step execution, after data is written to disk or zarr backends, OpenHCS automatically generates openhcs_metadata.json files:

# In FunctionStep.process() after writing output data
if actual_write_backend not in [Backend.OMERO_LOCAL.value, Backend.MEMORY.value]:
    from openhcs.microscopes.openhcs import OpenHCSMetadataGenerator
    metadata_generator = OpenHCSMetadataGenerator(context.filemanager)

    # Create metadata reflecting actual disk state
    metadata_generator.create_metadata(
        context,
        step_plan['output_dir'],
        actual_write_backend,
        is_main=is_pipeline_output,
        plate_root=step_plan['output_plate_root'],
        sub_dir=step_plan['sub_dir']
    )

This metadata generation is thread-safe because:

  • Per-Thread FileManager: Each thread has isolated file operations

  • Atomic Writes: Metadata uses atomic writer to prevent corruption

  • No Cross-Thread Coordination: Each well writes its own metadata independently
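For readers unfamiliar with atomic writes, the usual technique is to write to a temporary file in the target directory and then rename it over the destination. The sketch below shows that technique with the standard library; the function name and the payload keys are assumptions, and this is not the OpenHCS atomic writer itself.

```python
import json
import os
import tempfile


def write_json_atomically(path, payload):
    """Write payload so readers see the old file or the new one, never a partial one."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as handle:
            json.dump(payload, handle, indent=2)
            handle.flush()
            os.fsync(handle.fileno())       # push the bytes to disk before renaming
        os.replace(tmp_path, path)          # atomic rename within one filesystem
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)             # do not leave a stray temp file behind
        raise


with tempfile.TemporaryDirectory() as plate_dir:
    target = os.path.join(plate_dir, "openhcs_metadata.json")
    write_json_atomically(target, {"well": "A01", "backend": "disk"})
    with open(target) as handle:
        written = json.load(handle)
    leftovers = [name for name in os.listdir(plate_dir) if name.endswith(".tmp")]
```

The temporary file lives in the same directory as the target so that `os.replace` stays on one filesystem, which is what makes the rename atomic.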

Current Implementation Status

Implemented Features

  • ✅ Two-phase execution model (compilation + execution)

  • ✅ Well-level parallelism with ThreadPoolExecutor/ProcessPoolExecutor

  • ✅ ProcessingContext freezing for immutability

  • ✅ Thread-safe GPU registry with compilation-time assignment

  • ✅ FileManager thread isolation with per-instance backend cache

  • ✅ Exception isolation with per-well error handling

  • ✅ Graceful degradation to sequential execution

Future Enhancements

  1. Runtime GPU Slot Management: Dynamic GPU slot acquisition/release during execution (see GPU Resource Management)

  2. Work Stealing: Dynamic load balancing between threads

  3. Pipeline Parallelism: Parallel execution of steps within a well

  4. Distributed Processing: Multi-node execution coordination

  5. Adaptive Threading: Dynamic thread pool sizing based on workload

  6. Memory Pool Management: Shared memory pools for large datasets

This concurrency model achieves thread safety and performance through immutable compilation artifacts and thread-local resource isolation, eliminating the need for complex synchronization logic while maintaining predictable scaling characteristics.