Orchestrator Cleanup Guarantees
Module: openhcs.core.orchestrator.orchestrator
Status: STABLE
Overview
Worker processes accumulate memory and GPU resources when processing multiple wells sequentially. The orchestrator cleanup system guarantees resource cleanup before error returns, preventing memory leaks and zombie workers.
Problem Context
Without guaranteed cleanup, worker processes leak resources:
# Worker processes well A
process_well_A() # Allocates GPU memory, creates VFS mappings
# Worker processes well B
process_well_B() # Allocates MORE GPU memory, MORE VFS mappings
# Worker processes well C → ERROR
process_well_C() # Raises exception
# Without cleanup: GPU memory and VFS from A, B, C still allocated
# Worker process is now a zombie with leaked resources
This causes:
Memory exhaustion: GPU memory fills up across sequential wells
VFS pollution: Memory backend accumulates stale file mappings
Zombie workers: Processes hang with unreleased resources
Solution: Cleanup Before Error Return
The orchestrator guarantees cleanup REGARDLESS of success or failure:
def _execute_axis_with_sequential_combinations(
    pipeline_definition, axis_contexts, visualizer
):
    """Execute sequential combinations with guaranteed cleanup."""
    for combo_idx, (context_key, frozen_context) in enumerate(axis_contexts):
        # Execute this combination
        result = _execute_single_axis_static(
            pipeline_definition, frozen_context, visualizer
        )
        # CRITICAL: Clear VFS and GPU REGARDLESS of success/failure
        # This must happen BEFORE checking result status
        from polystore.base import reset_memory_backend
        from openhcs.core.memory.gpu_cleanup import cleanup_all_gpu_frameworks
        reset_memory_backend()
        if cleanup_all_gpu_frameworks:
            cleanup_all_gpu_frameworks()
        # NOW check if combination failed (after cleanup)
        if not result.success:
            return result  # Return error with clean state
Key insight: Cleanup happens BEFORE the error return, so the worker process is left in a clean state even when a combination fails.
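The ordering can be observed in isolation with a small, self-contained sketch. Everything in it is a hypothetical stand-in: `Result`, `fake_run`, and `fake_cleanup` replace the orchestrator's result object, `_execute_single_axis_static`, and the two cleanup calls, and they only record what was called.

```python
from dataclasses import dataclass


@dataclass
class Result:
    """Hypothetical stand-in for the orchestrator's result object."""
    success: bool
    name: str


def run_sequential(combinations, run_combination, cleanup):
    """Run combinations in order; clean up after each one, then check status."""
    last = None
    for combo in combinations:
        last = run_combination(combo)
        cleanup()  # runs whether or not the combination succeeded
        if not last.success:
            return last  # the failure is returned only after cleanup
    return last


events = []


def fake_run(name):
    # Stand-in: combination "C" fails, all others succeed.
    events.append(f"run {name}")
    return Result(success=(name != "C"), name=name)


def fake_cleanup():
    events.append("cleanup")


outcome = run_sequential(["A", "B", "C", "D"], fake_run, fake_cleanup)
```

In this sketch the recorded events end with a cleanup after the failing combination, and combination "D" is never started.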
Cleanup Components
VFS Reset
The memory backend maintains a virtual file system mapping real paths to in-memory data:
from polystore.base import reset_memory_backend
# Clear all VFS mappings
reset_memory_backend()
What this clears:
In-memory file mappings
Cached metadata
Virtual path registrations
Backend state
Why this matters: Without reset, VFS accumulates mappings from previous wells, causing memory growth and stale data access.
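To illustrate the accumulation, here is a toy in-memory store. `ToyMemoryBackend` is a hypothetical class written for this example only; it is not the polystore memory backend and says nothing about how that backend is implemented.

```python
class ToyMemoryBackend:
    """Hypothetical in-memory store mapping virtual paths to data."""

    def __init__(self):
        self._store = {}

    def save(self, path, data):
        self._store[path] = data

    def load(self, path):
        return self._store[path]

    def reset(self):
        # Drop every mapping so the next well starts from an empty store.
        self._store.clear()

    def __len__(self):
        return len(self._store)


backend = ToyMemoryBackend()

# Without a reset, mappings from well A are still held while well B runs.
backend.save("/plate/A01/img.tif", b"well A pixels")
backend.save("/plate/B01/img.tif", b"well B pixels")
entries_without_reset = len(backend)

# With a reset between wells, only the current well's data is held.
backend.reset()
backend.save("/plate/C01/img.tif", b"well C pixels")
entries_with_reset = len(backend)
```

After the reset, a lookup of a path from an earlier well fails instead of silently returning stale data.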
GPU Framework Cleanup
GPU frameworks (CuPy, PyTorch, TensorFlow) cache memory pools and kernels:
from openhcs.core.memory.gpu_cleanup import cleanup_all_gpu_frameworks
# Clear GPU memory pools and caches
if cleanup_all_gpu_frameworks:
    cleanup_all_gpu_frameworks()
What this clears:
CuPy memory pools
PyTorch cached allocations
TensorFlow graph caches
CUDA context state
Why this matters: GPU memory pools grow across sequential wells. Cleanup returns cached, unused memory to the GPU so that later wells can allocate it.
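As a rough idea of what framework-level cleanup can look like, the sketch below calls the public cache-release functions of CuPy and PyTorch when those packages are importable. `release_gpu_caches` is a hypothetical helper, not the body of `cleanup_all_gpu_frameworks`, and TensorFlow is left out.

```python
def release_gpu_caches():
    """Release cached GPU memory for whichever frameworks are importable.

    Returns the names of the frameworks whose caches were released.
    """
    released = []

    try:
        import cupy
        # Return unused blocks held by CuPy's default memory pools.
        cupy.get_default_memory_pool().free_all_blocks()
        cupy.get_default_pinned_memory_pool().free_all_blocks()
        released.append("cupy")
    except Exception:
        pass  # CuPy is not installed or has no usable device

    try:
        import torch
        if torch.cuda.is_available():
            # Release cached blocks held by PyTorch's caching allocator.
            torch.cuda.empty_cache()
            released.append("torch")
    except Exception:
        pass  # PyTorch is not installed or CUDA could not be queried

    return released


released = release_gpu_caches()
```

These calls only release memory that is cached but unused. Arrays and tensors that are still referenced stay allocated, so references to per-well data need to be dropped before the cleanup runs.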
Cleanup Timing Guarantees
After Each Sequential Combination
# Sequential processing: A → B → C
for combination in sequential_combinations:
    result = process_combination(combination)
    # Cleanup AFTER each combination
    reset_memory_backend()
    cleanup_all_gpu_frameworks()
    # Check result AFTER cleanup
    if not result.success:
        return result  # Clean state guaranteed
Guarantee: Each combination starts with clean VFS and GPU state.
Before Error Return
try:
    result = process_combination(combination)
except Exception as e:
    # Cleanup BEFORE raising exception
    reset_memory_backend()
    cleanup_all_gpu_frameworks()
    raise  # Exception raised with clean state
Guarantee: Exceptions don’t prevent cleanup. Worker process is clean even on errors.
After Successful Completion
# All combinations successful
for combination in combinations:
    result = process_combination(combination)
    reset_memory_backend()
    cleanup_all_gpu_frameworks()
# Final cleanup after all combinations
reset_memory_backend()
cleanup_all_gpu_frameworks()
return success_result
Guarantee: Worker process is clean after successful completion, ready for next task.
Operational Implications
Memory Stability
Cleanup guarantees prevent memory growth across sequential processing:
# Without cleanup
# Well 1: 2GB GPU memory used
# Well 2: 4GB GPU memory used (2GB + 2GB leaked)
# Well 3: 6GB GPU memory used (4GB + 2GB leaked)
# Well 4: OOM error (out of memory)
# With cleanup
# Well 1: 2GB GPU memory used → cleanup → 0GB
# Well 2: 2GB GPU memory used → cleanup → 0GB
# Well 3: 2GB GPU memory used → cleanup → 0GB
# Well 4: 2GB GPU memory used → cleanup → 0GB
Operational benefit: Stable memory usage enables processing large plate datasets without OOM errors.
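The arithmetic in the illustration can be reproduced with a short simulation. The figures are assumptions chosen to match it: 2 GB allocated per well and a 6 GB device, so the fourth well is the first that no longer fits when nothing is released.

```python
def simulate(wells, per_well_gb, capacity_gb, cleanup):
    """Return memory in use per well; raise MemoryError when capacity is exceeded."""
    leaked = 0.0
    usage = []
    for well in range(1, wells + 1):
        in_use = leaked + per_well_gb
        if in_use > capacity_gb:
            raise MemoryError(
                f"well {well}: needs {in_use} GB, capacity {capacity_gb} GB"
            )
        usage.append(in_use)
        # With cleanup nothing carries over; without it everything does.
        leaked = 0.0 if cleanup else in_use
    return usage


with_cleanup = simulate(wells=4, per_well_gb=2, capacity_gb=6, cleanup=True)

try:
    simulate(wells=4, per_well_gb=2, capacity_gb=6, cleanup=False)
    oom_message = None
except MemoryError as exc:
    oom_message = str(exc)
```

With cleanup, usage stays flat at the per-well figure. Without it, the simulated run fails on well 4.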
Worker Process Reuse
Clean workers can be reused for subsequent tasks:
# Worker pool with 4 processes
with ProcessPoolExecutor(max_workers=4) as executor:
    # Process 96-well plate
    for well in wells:
        future = executor.submit(process_well, well)
        # Worker cleanup guaranteed after each well
        # Same worker can process next well without restart
Operational benefit: Worker process reuse reduces overhead from process creation/destruction.
Zombie Worker Prevention
Cleanup before error return prevents zombie workers:
# Worker processes well with error
try:
    process_well(well)
except Exception as e:
    # Cleanup guaranteed before exception propagates
    reset_memory_backend()
    cleanup_all_gpu_frameworks()
    raise
# Worker is clean, can be terminated or reused
Operational benefit: No zombie workers with leaked resources. Clean shutdown guaranteed.
Implementation Patterns
Sequential Combination Processing
def _execute_axis_with_sequential_combinations(
    pipeline_definition, axis_contexts, visualizer
):
    """Execute sequential combinations with cleanup guarantees."""
    for combo_idx, (context_key, frozen_context) in enumerate(axis_contexts):
        # Execute combination
        result = _execute_single_axis_static(
            pipeline_definition, frozen_context, visualizer
        )
        # CRITICAL: Cleanup BEFORE checking result
        reset_memory_backend()
        if cleanup_all_gpu_frameworks:
            cleanup_all_gpu_frameworks()
        # Check result AFTER cleanup
        if not result.success:
            return result
    # NOTE: axis_id is not defined in this excerpt; it is assumed to be
    # available from the surrounding code.
    return ExecutionResult.success(axis_id=axis_id)
Multiprocessing Worker Function
def _worker_process_well(well_context):
    """Worker function with cleanup guarantees."""
    try:
        # Process well
        result = process_pipeline(well_context)
        # Cleanup on success
        reset_memory_backend()
        cleanup_all_gpu_frameworks()
        return result
    except Exception as e:
        # Cleanup on error
        reset_memory_backend()
        cleanup_all_gpu_frameworks()
        # Return error result (don't raise to prevent worker crash)
        return ExecutionResult.failure(error=str(e))
ZMQ Execution Server
The ZMQ execution server ensures cleanup even for remote execution:
class ZMQExecutionServer:
    def execute_pipeline(self, request):
        """Execute pipeline with cleanup guarantees."""
        try:
            result = self.orchestrator.execute_compiled_plate(...)
            # Cleanup after execution
            reset_memory_backend()
            cleanup_all_gpu_frameworks()
            return result
        except Exception as e:
            # Cleanup on error
            reset_memory_backend()
            cleanup_all_gpu_frameworks()
            raise
Key insight: Cleanup guarantees extend to remote execution via ZMQ server.
Implementation Notes
🔬 Source Code:
Sequential cleanup: openhcs/core/orchestrator/orchestrator.py (line 199)
Worker cleanup: openhcs/core/orchestrator/orchestrator.py (lines 200-208)
ZMQ server cleanup: openhcs/runtime/zmq_execution_server.py (line 448)
🏗️ Architecture:
../concepts/memory-backend-system - VFS architecture
gpu-resource-management - GPU cleanup system
concurrency-model - Multiprocessing architecture
📊 Performance:
VFS reset: < 1ms (clears dictionary mappings)
GPU cleanup: 10-100ms (depends on allocated memory)
Total overhead: < 1% of well processing time
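One way to check figures like these on a given machine is to time processing and cleanup separately. The sketch below does that with `time.perf_counter`; `fake_process` and the dictionary it fills are hypothetical stand-ins for a real well and its in-memory state, so the resulting number says nothing about OpenHCS itself.

```python
import time


def measure_overhead(process, cleanup, repeats=5):
    """Return total cleanup time divided by total processing time."""
    processing = 0.0
    cleaning = 0.0
    for _ in range(repeats):
        start = time.perf_counter()
        process()
        mid = time.perf_counter()
        cleanup()
        end = time.perf_counter()
        processing += mid - start
        cleaning += end - mid
    return cleaning / processing


cache = {}


def fake_process():
    # Stand-in workload: fill a dictionary, then sum its values.
    for i in range(10_000):
        cache[i] = i * i
    sum(cache.values())


fraction = measure_overhead(fake_process, cache.clear)
```

When timing real GPU work, keep in mind that CUDA kernel launches are asynchronous. The device should be synchronized before each timestamp, otherwise pending work may be attributed to the cleanup step.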
Key Design Decisions
Why cleanup before error return?
It ensures the worker process is in a clean state even when errors occur, which prevents zombie workers with leaked resources.
Why cleanup after each sequential combination?
Sequential processing accumulates resources across combinations. Per-combination cleanup prevents memory growth.
Why separate VFS and GPU cleanup?
VFS cleanup is always safe. GPU cleanup is conditional (only if GPU frameworks are loaded). Separation enables flexibility.
Common Gotchas
Don’t skip cleanup on errors: Cleanup must happen BEFORE error return
Don’t assume cleanup is automatic: Explicit cleanup calls required
GPU cleanup is conditional: Check that cleanup_all_gpu_frameworks exists before calling it
VFS reset is global: Affects all threads in the worker process
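A note on the existence check: after a plain `from ... import cleanup_all_gpu_frameworks`, the name is always bound to a function, so `if cleanup_all_gpu_frameworks:` always passes. The check only does something if the import can fall back to `None`. The sketch below shows one way that could be arranged. `load_optional` and `cleanup_gpu_if_available` are hypothetical helpers, and whether OpenHCS resolves the function this way is an assumption.

```python
import importlib


def load_optional(module_name, attr):
    """Return module_name.attr, or None when the module or attribute is missing."""
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        return None
    return getattr(module, attr, None)


# Resolved once; None means "no GPU cleanup available in this environment".
cleanup_all_gpu_frameworks = load_optional(
    "openhcs.core.memory.gpu_cleanup", "cleanup_all_gpu_frameworks"
)


def cleanup_gpu_if_available():
    """Call the GPU cleanup hook when present; report whether it ran."""
    if cleanup_all_gpu_frameworks is None:
        return False
    cleanup_all_gpu_frameworks()
    return True
```

Comparing against `None` makes the intent explicit and avoids relying on the truthiness of a function object.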
Debugging Cleanup Issues
Symptom: Memory Growth Across Wells
Cause: Cleanup not called or failing silently
Diagnosis:
# Add logging to verify cleanup
logger.debug("Before cleanup: VFS size = ...")
reset_memory_backend()
logger.debug("After cleanup: VFS size = 0")
Fix: Ensure cleanup is called after each well
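A slightly fuller version of that logging measures the size on both sides of the reset instead of assuming it is zero afterwards. The source does not show how to query the backend's size, so the sketch takes it as a callable. `logged_reset` and `fake_vfs` are hypothetical.

```python
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("cleanup-debug")


def logged_reset(get_size, reset):
    """Call reset() and log the measured size before and after it."""
    before = get_size()
    logger.debug("Before cleanup: VFS size = %d", before)
    reset()
    after = get_size()
    logger.debug("After cleanup: VFS size = %d", after)
    if after != 0:
        logger.warning("Cleanup left %d entries behind", after)
    return before, after


# Hypothetical store standing in for the memory backend's mappings.
fake_vfs = {"/plate/A01/img.tif": b"pixels", "/plate/A01/meta.json": b"{}"}
before, after = logged_reset(lambda: len(fake_vfs), fake_vfs.clear)
```

A warning after the reset points at cleanup that runs but is ineffective, which is a different problem from cleanup that is never called.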
Symptom: Zombie Workers
Cause: Exception preventing cleanup
Diagnosis:
# Check worker process state
ps aux | grep python # Look for zombie processes
Fix: Wrap cleanup in try/finally to guarantee execution
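A minimal sketch of the try/finally approach, packaged as a context manager so the cleanup calls are written once. `guaranteed_cleanup` is a hypothetical helper, and `reset_vfs` and `release_gpu` are stand-ins for `reset_memory_backend` and `cleanup_all_gpu_frameworks` that only record that they ran.

```python
import logging
from contextlib import contextmanager

logger = logging.getLogger("cleanup")


@contextmanager
def guaranteed_cleanup(*cleanup_steps):
    """Run every cleanup step on exit, whether the body returned or raised."""
    try:
        yield
    finally:
        for step in cleanup_steps:
            try:
                step()
            except Exception:
                # Keep going: one failing step must not skip the others.
                logger.exception("cleanup step %s failed", step.__name__)


calls = []


def reset_vfs():  # stand-in for reset_memory_backend
    calls.append("vfs")


def release_gpu():  # stand-in for cleanup_all_gpu_frameworks
    calls.append("gpu")


def process_well(well):
    with guaranteed_cleanup(reset_vfs, release_gpu):
        if well == "C01":
            raise RuntimeError("simulated failure")
        return f"{well} ok"


ok = process_well("A01")
try:
    process_well("C01")
    error = None
except RuntimeError as exc:
    error = str(exc)
```

Both cleanup steps run for the successful well and for the failing one, and the original exception still reaches the caller.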
Symptom: GPU OOM Errors
Cause: GPU cleanup not called or ineffective
Diagnosis:
# Check GPU memory usage
nvidia-smi # Monitor GPU memory across wells
Fix: Verify cleanup_all_gpu_frameworks is called and working
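To track GPU memory across wells from inside Python rather than by watching a terminal, the same tool can be queried programmatically. The sketch assumes `nvidia-smi` is on the PATH and returns `None` otherwise. `gpu_memory_used_mib` is a hypothetical helper.

```python
import shutil
import subprocess


def gpu_memory_used_mib():
    """Return used memory per GPU in MiB, or None when it cannot be queried."""
    if shutil.which("nvidia-smi") is None:
        return None
    try:
        out = subprocess.run(
            ["nvidia-smi", "--query-gpu=memory.used",
             "--format=csv,noheader,nounits"],
            capture_output=True, text=True, check=True, timeout=10,
        ).stdout
        # One line per GPU, each holding an integer number of MiB.
        return [int(line) for line in out.splitlines() if line.strip()]
    except (subprocess.SubprocessError, OSError, ValueError):
        return None


usage = gpu_memory_used_mib()
```

Calling this before and after the cleanup for each well, and logging the difference, shows whether memory is actually being released. Note that `nvidia-smi` reports usage for the whole device, including other processes.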