Multiprocessing Coordination System
Overview
Traditional OpenHCS multiprocessing was hardcoded to use wells as the parallelization axis. The MultiprocessingCoordinator eliminates this assumption by providing axis-agnostic task coordination that can use any component for parallelization.
class MultiprocessingCoordinator(Generic[T]):
def __init__(self, config: ComponentConfiguration[T]):
self.config = config
self.axis = config.multiprocessing_axis
def create_tasks(self, orchestrator, axis_filter=None) -> Dict[str, Task[T]]:
"""Create tasks for multiprocessing axis values."""
axis_values = orchestrator.get_component_keys(self.axis, axis_filter)
return {value: Task(axis_value=value, context=orchestrator.create_context(value))
for value in axis_values}
This enables optimal parallelization strategies - wells, timepoints, batches, or any other component - without code changes.
Task Coordination
The coordinator creates and executes tasks for any multiprocessing axis.
Task Creation
def create_tasks(self, orchestrator, axis_filter=None) -> Dict[str, Task[T]]:
"""Create tasks for multiprocessing axis values."""
# Get axis values from orchestrator using the multiprocessing axis component directly
axis_values = orchestrator.get_component_keys(self.axis, axis_filter)
if not axis_values:
logger.warning(f"No {self.axis.value} values found for multiprocessing")
return {}
# Create tasks
tasks = {}
for axis_value in axis_values:
context = orchestrator.create_context(axis_value)
tasks[axis_value] = Task(axis_value=axis_value, context=context)
return tasks
Tasks are created dynamically based on the available component values for the configured multiprocessing axis.
Task Execution
def execute_tasks(self, tasks: Dict[str, Task[T]], pipeline_definition: List[Any],
executor, processor_func) -> Dict[str, Any]:
"""Execute tasks using the provided executor."""
if not tasks:
logger.warning("No tasks to execute")
return {}
logger.info(f"Executing {len(tasks)} tasks using {type(executor).__name__}")
# Submit tasks to executor
future_to_axis = {}
for axis_value, task in tasks.items():
future = executor.submit(processor_func, task.context, pipeline_definition)
future_to_axis[future] = axis_value
# Collect results
results = {}
for future in concurrent.futures.as_completed(future_to_axis):
axis_value = future_to_axis[future]
try:
result = future.result()
results[axis_value] = result
logger.debug(f"Task completed for {self.axis.value}: {axis_value}")
except Exception as e:
logger.error(f"Task failed for {self.axis.value} {axis_value}: {e}")
results[axis_value] = None
return results
The coordinator handles task submission, execution monitoring, and result collection generically.
Configuration Examples
Different parallelization strategies through component configuration.
# Well-based parallelization (traditional)
well_config = ComponentConfigurationFactory.create_configuration(
StandardComponents,
multiprocessing_axis=StandardComponents.WELL,
default_variable=[StandardComponents.SITE],
default_group_by=StandardComponents.CHANNEL
)
# Timepoint-based parallelization (temporal analysis)
temporal_config = ComponentConfigurationFactory.create_configuration(
TimeSeriesComponents,
multiprocessing_axis=TimeSeriesComponents.TIMEPOINT,
default_variable=[TimeSeriesComponents.WELL, TimeSeriesComponents.SITE],
default_group_by=TimeSeriesComponents.CHANNEL
)
Common Gotchas:
Component keys are cached on initialization - call
clear_component_cache()if input directory changesTask contexts must be serializable for ProcessPoolExecutor
Axis values are discovered from actual data, not enum definitions