OpenHCS Concurrency Modelο
The Problem: Thread Safety in Image Processingο
Image processing pipelines need to process multiple wells in parallel to achieve reasonable throughput for high-content screening. However, parallel execution creates thread safety challenges: shared state, race conditions, and deadlocks. Traditional approaches use complex locking mechanisms that are hard to reason about and prone to bugs. For long-running microscopy workflows (hours or days), even rare race conditions become critical failures.
The Solution: Immutable Compilation + Well-Level Parallelismο
OpenHCS implements a well-level parallelism model with thread isolation and immutable compilation artifacts. This design provides performance while maintaining thread safety through architectural constraints rather than complex locking mechanisms. By compiling pipelines once (single-threaded) and then executing them immutably (multi-threaded), the system eliminates entire classes of concurrency bugs.
Overviewο
The system separates compilation (single-threaded, mutable) from execution (multi-threaded, immutable), ensuring thread safety through design rather than synchronization.
Note: This document describes the actual concurrency implementation. Some advanced features like runtime GPU slot management are planned for future development.
Core Concurrency Philosophyο
Well-Level Parallelismο
Parallel Unit: Each well is processed independently in its own thread
Sequential Within Well: All steps for a well execute sequentially in the same thread
No Cross-Well Dependencies: Wells never share data or coordinate during execution
Immutable Compilation Artifactsο
Compile Once, Execute Many: Step plans compiled once, then immutable during execution
Frozen Contexts: ProcessingContext frozen after compilation, preventing state corruption
Stateless Steps: Step objects stripped of mutable state after compilation
Concurrency Architectureο
Two-Phase Execution Modelο
# Phase 1: Compilation (Single-threaded)
compiled_contexts = orchestrator.compile_pipelines(pipeline_definition, wells)
# Result: Immutable ProcessingContext for each well
# Phase 2: Execution (Multi-threaded)
results = orchestrator.execute_compiled_plate(pipeline_definition, compiled_contexts)
# Result: Parallel execution across wells
Thread Pool Executionο
def execute_compiled_plate(self, pipeline_definition, compiled_contexts, max_workers=None):
"""Execute with configurable parallelism."""
actual_max_workers = max_workers or self.global_config.num_workers
if actual_max_workers > 1 and len(compiled_contexts) > 1:
# Choose executor type based on global config
if self.global_config.use_threading:
# ThreadPoolExecutor for debugging
executor = concurrent.futures.ThreadPoolExecutor(max_workers=actual_max_workers)
else:
# ProcessPoolExecutor for true parallelism
executor = concurrent.futures.ProcessPoolExecutor(max_workers=actual_max_workers)
with executor:
future_to_well_id = {
executor.submit(self._execute_single_well, pipeline_definition, context, visualizer): well_id
for well_id, context in compiled_contexts.items()
}
for future in concurrent.futures.as_completed(future_to_well_id):
well_id = future_to_well_id[future]
try:
result = future.result()
execution_results[well_id] = result
except Exception as exc:
logger.error(f"Well {well_id} exception: {exc}", exc_info=True)
execution_results[well_id] = {"status": "error", "error": str(exc)}
else:
# Sequential execution
for well_id, context in compiled_contexts.items():
execution_results[well_id] = self._execute_single_well(pipeline_definition, context, visualizer)
π Thread Safety Mechanismsο
1. Immutable State Designο
Frozen ProcessingContextο
class ProcessingContext:
"""Context with immutability enforcement."""
def freeze(self):
"""Make context immutable after compilation."""
self._is_frozen = True
def __setattr__(self, name, value):
"""Prevent modification of frozen context."""
if getattr(self, '_is_frozen', False) and name != '_is_frozen':
raise AttributeError(f"Cannot modify '{name}' of frozen ProcessingContext")
super().__setattr__(name, value)
Thread Safety Guarantee: Frozen contexts cannot be modified, eliminating race conditions.
Stateless Step Designο
# Step objects are designed to be stateless after compilation
# They read configuration from immutable step_plans in ProcessingContext
# No mutable state is stored in step objects during execution
class FunctionStep(AbstractStep):
def process(self, context: 'ProcessingContext', step_index: int) -> None:
# Read configuration from immutable context
step_plan = context.step_plans[step_index]
# All execution state comes from context, not step object
Thread Safety Guarantee: Step objects with no mutable state can be safely shared across threads.
2. Thread-Local Resource Isolationο
FileManager Per Threadο
class FileManager:
"""FileManager with strict thread isolation."""
def __init__(self, registry):
# Thread Safety:
# Each FileManager instance must be scoped to a single execution context.
# Do NOT share FileManager instances across pipelines or threads.
# For isolation, create a dedicated registry for each FileManager.
self.registry = registry
self._backend_cache = {} # Per-instance backend cache
Thread Safety Guarantee: Each thread gets its own FileManager instance with isolated backend cache.
Backend Instance Isolationο
def _get_backend(self, backend_name):
"""Get backend with per-FileManager caching."""
# Thread Safety:
# This method is thread-safe for a single FileManager instance.
# Backend instances are NOT shared across FileManager instances.
if backend_name not in self._backend_cache:
backend_class = self.registry[backend_name]
self._backend_cache[backend_name] = backend_class() # New instance per FileManager
return self._backend_cache[backend_name]
Thread Safety Guarantee: Backend instances are never shared between threads.
3. Global Resource Coordinationο
Thread-Safe GPU Registryο
See: GPU Resource Management for comprehensive GPU coordination details.
Concurrency Aspects: - GPU registry access is thread-safe with atomic operations - GPU assignment happens at compilation time, not runtime - No runtime slot acquisition/release needed in current implementation - Registry status queries are protected by locks for consistency
Thread Safety Guarantee: GPU registry access is atomic and consistent across threads.
Memory Backend Isolationο
class MemoryStorageBackend(StorageBackend):
"""Memory backend with per-instance storage."""
def __init__(self):
self._memory_store = {} # Per-instance memory store
self._prefixes = set() # Per-instance namespace tracking
Thread Safety Guarantee: Each thread gets its own memory backend instance with isolated storage.
π Execution Flow Thread Safetyο
Single Well Executionο
def _execute_single_well(self, pipeline_definition, context, visualizer):
"""Execute pipeline for single well - thread-safe by design."""
# 1. Context is frozen (immutable)
assert context.is_frozen()
# 2. Each thread has its own FileManager
filemanager = context.filemanager # Thread-local instance
# 3. GPU assignment handled at compilation time
# No runtime GPU slot management needed
try:
# 4. Sequential step execution within thread
for step in pipeline_definition:
step.process(context) # Step is stateless, context is immutable
return {"status": "success", "well_id": context.well_id}
except Exception as e:
logger.error(f"Pipeline execution failed for well {context.well_id}: {e}")
return {"status": "error", "well_id": context.well_id, "error": str(e)}
FunctionStep Thread Safetyο
def process(self, context: 'ProcessingContext', step_index: int) -> None:
"""FunctionStep execution - thread-safe by design."""
# 1. Read immutable step plan
step_plan = context.step_plans[step_index] # Immutable after compilation
# 2. Use thread-local FileManager
filemanager = context.filemanager # Thread-local instance
# 3. Load data using isolated backends
for file_path in matching_files:
image = filemanager.load_image(file_path, read_backend) # Isolated backend
raw_slices.append(image)
# 4. Process data (pure computation)
result = func(image_stack) # Function operates on local data
# 5. Save data using isolated backends
for i, slice_2d in enumerate(output_slices):
filemanager.save_image(slice_2d, output_path, write_backend) # Isolated backend
π― Concurrency Guaranteesο
What is Thread-Safe:ο
β Immutable Data Structuresο
Frozen ProcessingContext: Cannot be modified after compilation
Step Plans: Immutable dictionaries with execution configuration
Stateless Steps: No mutable state after attribute stripping
β Thread-Local Resourcesο
FileManager Instances: One per thread, never shared
Backend Instances: Isolated per FileManager
Memory Storage: Separate memory store per backend instance
β Atomic Global Operationsο
GPU Registry Access: Protected by locks for atomic updates
Configuration Access: Immutable configuration objects
What Requires Coordination:ο
π GPU Resource Managementο
See: GPU Resource Management for complete GPU coordination architecture.
Concurrency Considerations: - Registry status queries use atomic reads with locks - GPU assignment during compilation phase (thread-safe) - Registry initialization is one-time with thread-safe checks
π Global Configuration Updatesο
Live Config Changes: Coordinated through orchestrator
Registry Initialization: One-time setup with thread-safe checks
β‘ Performance Characteristicsο
Scalability Modelο
# Optimal parallelism calculation
max_workers = min(
num_wells, # Don't create more threads than wells
global_config.num_workers, # Respect configured limit
available_gpu_slots # Don't exceed GPU capacity
)
Resource Utilizationο
CPU Cores: One thread per core (configurable)
GPU Devices: Multiple pipelines per GPU (based on memory capacity)
Memory: Isolated per thread, no sharing overhead
I/O: Parallel disk access across threads
Contention Pointsο
GPU Registry: Minimal contention (fast lock operations)
Disk I/O: Natural parallelism across different directories
Memory Allocation: Thread-local, no contention
π Advanced Concurrency Featuresο
Exception Isolationο
# Exceptions in one well don't affect others
for future in concurrent.futures.as_completed(future_to_well_id):
well_id = future_to_well_id[future]
try:
result = future.result()
execution_results[well_id] = result
except Exception as exc:
# Exception isolated to this well
logger.error(f"Well {well_id} exception: {exc}", exc_info=True)
execution_results[well_id] = {"status": "error", "error": str(exc)}
# Other wells continue processing
Resource Cleanupο
def _execute_single_well(self, pipeline_definition, context, visualizer):
"""Guaranteed resource cleanup per thread."""
try:
# Execute pipeline steps
for step in pipeline_definition:
step.process(context)
return {"status": "success", "well_id": context.well_id}
except Exception as e:
# Exception handling and cleanup
logger.error(f"Pipeline execution failed for well {context.well_id}: {e}")
return {"status": "error", "well_id": context.well_id, "error": str(e)}
Graceful Degradationο
# Automatic fallback to sequential execution
if actual_max_workers <= 1 or len(compiled_contexts) <= 1:
logger.info("Executing wells sequentially")
for well_id, context in compiled_contexts.items():
execution_results[well_id] = self._execute_single_well(pipeline_definition, context, visualizer)
Design Rationaleο
The concurrency model achieves thread safety through architectural constraints rather than complex locking:
Immutable State Designο
Frozen ProcessingContext: Cannot be modified after compilation
Thread Isolation: No shared mutable resources between threads
Minimal Coordination: Only GPU registry requires locking
Error Isolationο
Well-Level Failures: One well failure doesnβt affect others
Resource Cleanup: Guaranteed cleanup per thread
Exception Propagation: Clear error reporting per well
Performance Characteristicsο
Linear Scaling: Performance scales with number of cores/GPUs
No Lock Contention: Minimal synchronization overhead
Resource Efficiency: Optimal utilization of available hardware
Metadata JSON Writingο
During step execution, after data is written to disk or zarr backends, OpenHCS automatically generates openhcs_metadata.json files:
# In FunctionStep.process() after writing output data
if actual_write_backend not in [Backend.OMERO_LOCAL.value, Backend.MEMORY.value]:
from openhcs.microscopes.openhcs import OpenHCSMetadataGenerator
metadata_generator = OpenHCSMetadataGenerator(context.filemanager)
# Create metadata reflecting actual disk state
metadata_generator.create_metadata(
context,
step_plan['output_dir'],
actual_write_backend,
is_main=is_pipeline_output,
plate_root=step_plan['output_plate_root'],
sub_dir=step_plan['sub_dir']
)
This metadata generation is thread-safe because:
Per-Thread FileManager: Each thread has isolated file operations
Atomic Writes: Metadata uses atomic writer to prevent corruption
No Cross-Thread Coordination: Each well writes its own metadata independently
Current Implementation Statusο
Implemented Featuresο
β Two-phase execution model (compilation + execution)
β Well-level parallelism with ThreadPoolExecutor/ProcessPoolExecutor
β ProcessingContext freezing for immutability
β Thread-safe GPU registry with compilation-time assignment
β FileManager thread isolation with per-instance backend cache
β Exception isolation with per-well error handling
β Graceful degradation to sequential execution
Future Enhancementsο
Runtime GPU Slot Management: Dynamic GPU slot acquisition/release during execution (see GPU Resource Management)
Work Stealing: Dynamic load balancing between threads
Pipeline Parallelism: Parallel execution of steps within a well
Distributed Processing: Multi-node execution coordination
Adaptive Threading: Dynamic thread pool sizing based on workload
Memory Pool Management: Shared memory pools for large datasets
This concurrency model achieves thread safety and performance through immutable compilation artifacts and thread-local resource isolation, eliminating the need for complex synchronization logic while maintaining predictable scaling characteristics.