Multiprocessing Coordination System

Overview

Traditional OpenHCS multiprocessing was hardcoded to use wells as the parallelization axis. The MultiprocessingCoordinator eliminates this assumption by providing axis-agnostic task coordination that can use any component for parallelization.

class MultiprocessingCoordinator(Generic[T]):
    def __init__(self, config: ComponentConfiguration[T]):
        self.config = config
        self.axis = config.multiprocessing_axis

    def create_tasks(self, orchestrator, axis_filter=None) -> Dict[str, Task[T]]:
        """Create tasks for multiprocessing axis values."""
        axis_values = orchestrator.get_component_keys(self.axis, axis_filter)
        return {value: Task(axis_value=value, context=orchestrator.create_context(value))
                for value in axis_values}

This enables optimal parallelization strategies - wells, timepoints, batches, or any other component - without code changes.

Task Coordination

The coordinator creates and executes tasks for any multiprocessing axis.

Task Creation

def create_tasks(self, orchestrator, axis_filter=None) -> Dict[str, Task[T]]:
    """Create tasks for multiprocessing axis values."""
    # Get axis values from orchestrator using the multiprocessing axis component directly
    axis_values = orchestrator.get_component_keys(self.axis, axis_filter)

    if not axis_values:
        logger.warning(f"No {self.axis.value} values found for multiprocessing")
        return {}

    # Create tasks
    tasks = {}
    for axis_value in axis_values:
        context = orchestrator.create_context(axis_value)
        tasks[axis_value] = Task(axis_value=axis_value, context=context)

    return tasks

Tasks are created dynamically based on the available component values for the configured multiprocessing axis.

Task Execution

def execute_tasks(self, tasks: Dict[str, Task[T]], pipeline_definition: List[Any],
                 executor, processor_func) -> Dict[str, Any]:
    """Execute tasks using the provided executor."""
    if not tasks:
        logger.warning("No tasks to execute")
        return {}

    logger.info(f"Executing {len(tasks)} tasks using {type(executor).__name__}")

    # Submit tasks to executor
    future_to_axis = {}
    for axis_value, task in tasks.items():
        future = executor.submit(processor_func, task.context, pipeline_definition)
        future_to_axis[future] = axis_value

    # Collect results
    results = {}
    for future in concurrent.futures.as_completed(future_to_axis):
        axis_value = future_to_axis[future]
        try:
            result = future.result()
            results[axis_value] = result
            logger.debug(f"Task completed for {self.axis.value}: {axis_value}")
        except Exception as e:
            logger.error(f"Task failed for {self.axis.value} {axis_value}: {e}")
            results[axis_value] = None

    return results

The coordinator handles task submission, execution monitoring, and result collection generically.

Configuration Examples

Different parallelization strategies through component configuration.

# Well-based parallelization (traditional)
well_config = ComponentConfigurationFactory.create_configuration(
    StandardComponents,
    multiprocessing_axis=StandardComponents.WELL,
    default_variable=[StandardComponents.SITE],
    default_group_by=StandardComponents.CHANNEL
)

# Timepoint-based parallelization (temporal analysis)
temporal_config = ComponentConfigurationFactory.create_configuration(
    TimeSeriesComponents,
    multiprocessing_axis=TimeSeriesComponents.TIMEPOINT,
    default_variable=[TimeSeriesComponents.WELL, TimeSeriesComponents.SITE],
    default_group_by=TimeSeriesComponents.CHANNEL
)

Common Gotchas:

  • Component keys are cached on initialization - call clear_component_cache() if input directory changes

  • Task contexts must be serializable for ProcessPoolExecutor

  • Axis values are discovered from actual data, not enum definitions