System Integration: VFS, Memory Types, and Compilation
The Problem: Fragmented Data Processing Systems
Scientific image processing requires managing multiple concerns simultaneously: where data is stored (disk, OMERO, cloud), what format it’s in (NumPy, PyTorch, CuPy), and how to process it efficiently (GPU allocation, memory staging). Without integration, these systems become isolated silos, forcing users to write glue code and manage conversions manually. This creates brittle pipelines that break when switching storage backends or computational libraries.
The Solution: Integrated Three-Layer Architecture
OpenHCS integrates three core systems (VFS, Memory Types, Compilation) into a cohesive architecture in which each layer handles one concern and passes its results to the next. A configuration management layer sits on top of them in the stack shown below. The same pipeline code can therefore run against different storage backends and computational libraries without modification.
Overview
This document describes how the three core systems of OpenHCS work together to provide seamless data processing: the Virtual File System (VFS), the Memory Type System, and the Pipeline Compilation System.
Note: This document describes the actual system integration implementation. Some optimization strategies are planned for future development.
Integration Architecture
The Four-Layer Architecture Stack
┌─────────────────────────────────────────────────────────────┐
│ Configuration Management │
│ • Hierarchical config flow (Global → Context → Steps) │
│ • Immutable dataclasses with pull-based access │
│ • Live configuration updates and validation │
│ • Backend selection and path planning configuration │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Pipeline Compilation │
│ • Function pattern resolution │
│ • Memory type extraction from decorators │
│ • Step plan generation and validation │
│ • Resource allocation (GPU, backends) │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Memory Type System │
│ • Cross-library array conversion (numpy ↔ torch ↔ cupy) │
│ • GPU device management and placement │
│ • Stack/unstack operations with type conversion │
│ • Strict validation and error handling │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Virtual File System (VFS) │
│ • Backend abstraction (disk, memory, zarr) │
│ • Format-specific serialization/deserialization │
│ • Location transparency and path virtualization │
│ • Type-aware storage operations │
└─────────────────────────────────────────────────────────────┘
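The configuration layer's "immutable dataclasses with pull-based access" can be illustrated with a minimal sketch. The class names, field names, and the `resolve` helper below are hypothetical and are not the OpenHCS API; the sketch only assumes that a step-level value, when set, overrides the global default.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class GlobalConfig:
    """Hypothetical global defaults (not the real OpenHCS config class)."""
    read_backend: str = "disk"
    write_backend: str = "disk"
    gpu_id: Optional[int] = None


@dataclass(frozen=True)
class StepConfig:
    """Hypothetical per-step overrides; None means 'inherit from global'."""
    write_backend: Optional[str] = None
    gpu_id: Optional[int] = None


def resolve(global_cfg, step_cfg, field_name):
    """Pull a value on demand: step override first, then the global default."""
    value = getattr(step_cfg, field_name, None)
    return value if value is not None else getattr(global_cfg, field_name)


global_cfg = GlobalConfig()
step_cfg = StepConfig(write_backend="memory")

resolved_write = resolve(global_cfg, step_cfg, "write_backend")  # step override
resolved_read = resolve(global_cfg, step_cfg, "read_backend")    # global default

# Frozen dataclasses are never mutated; an update produces a new instance.
updated_cfg = replace(global_cfg, gpu_id=0)
```

Because values are resolved when they are read rather than copied down at construction time, swapping in `updated_cfg` changes what later lookups return without touching any step object.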
Data Flow Coordination
The three systems coordinate to provide end-to-end data processing:
Compilation Phase: Determines what conversions are needed
Execution Phase: Orchestrates conversions and processing
Storage Phase: Handles persistence and retrieval
Compilation-Time Integration
Memory Type Planning
During compilation, the system extracts memory type requirements and plans conversions:
# Phase 1: Extract memory types from function decorators
def analyze_function_memory_requirements(func_pattern):
    """Extract memory types from function patterns."""
    if isinstance(func_pattern, dict):
        # Component-specific: different memory types per component
        return {
            component: {
                'input_type': getattr(func, 'input_memory_type'),
                'output_type': getattr(func, 'output_memory_type')
            }
            for component, func in func_pattern.items()
        }
    elif isinstance(func_pattern, list):
        # Sequential: validate consistency across chain
        memory_types = []
        for func in func_pattern:
            input_type = getattr(func, 'input_memory_type')
            output_type = getattr(func, 'output_memory_type')
            memory_types.append((input_type, output_type))
        # Validate chain compatibility: each output must match the next input
        for i in range(len(memory_types) - 1):
            if memory_types[i][1] != memory_types[i + 1][0]:
                raise MemoryTypeError(
                    f"Incompatible memory types in chain: function {i} outputs "
                    f"{memory_types[i][1]!r}, function {i + 1} expects "
                    f"{memory_types[i + 1][0]!r}"
                )
        return memory_types[0][0], memory_types[-1][1]  # First input, last output
    else:
        # Single function
        return (
            getattr(func_pattern, 'input_memory_type'),
            getattr(func_pattern, 'output_memory_type'),
        )
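To make the attribute lookup above concrete, here is a minimal sketch of how a decorator could attach `input_memory_type` and `output_memory_type` to a function, and how a chain check would then read them. The `memory_types` decorator and the example functions are hypothetical; OpenHCS's actual decorators may differ.

```python
class MemoryTypeError(Exception):
    """Raised when adjacent functions in a chain disagree on memory type."""


def memory_types(input_type, output_type):
    """Hypothetical decorator that tags a function with its memory types."""
    def decorator(func):
        func.input_memory_type = input_type
        func.output_memory_type = output_type
        return func
    return decorator


@memory_types("numpy", "torch")
def to_gpu(stack):
    return stack


@memory_types("torch", "torch")
def denoise(stack):
    return stack


@memory_types("numpy", "numpy")
def cpu_filter(stack):
    return stack


def chain_memory_types(funcs):
    """Return (first input type, last output type) for a sequential chain."""
    pairs = [(f.input_memory_type, f.output_memory_type) for f in funcs]
    for i in range(len(pairs) - 1):
        if pairs[i][1] != pairs[i + 1][0]:
            raise MemoryTypeError(
                f"function {i} outputs {pairs[i][1]!r}, "
                f"function {i + 1} expects {pairs[i + 1][0]!r}"
            )
    return pairs[0][0], pairs[-1][1]


chain_types = chain_memory_types([to_gpu, denoise])

try:
    chain_memory_types([to_gpu, cpu_filter])
    chain_error = None
except MemoryTypeError as exc:
    chain_error = str(exc)
```

The first chain is compatible (torch output feeds a torch input); the second fails at compile time because `cpu_filter` expects numpy input.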
Backend Selection Coordination
The compiler coordinates memory types with VFS backend selection:
def plan_backend_selection(step_position, last_position, memory_types, data_size):
    """Coordinate backend selection with memory type requirements."""
    # First step: must read from disk
    if step_position == 0:
        read_backend = "disk"
    else:
        # Intermediate steps can use memory backend
        read_backend = "memory"
    # Last step: must write to disk
    if step_position == last_position:
        write_backend = "disk"
    else:
        # GPU memory types benefit from memory backend
        if memory_types['output_type'] in GPU_MEMORY_TYPES:
            write_backend = "memory"  # Keep on GPU
        else:
            write_backend = "memory"  # Avoid disk I/O
    return read_backend, write_backend
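Applied to a whole pipeline, the rule above reduces to "disk at the edges, memory in between". The following sketch assumes only that rule; `plan_backends` is a hypothetical helper, not part of OpenHCS.

```python
def plan_backends(num_steps):
    """Return a (read_backend, write_backend) pair for each step position."""
    last_position = num_steps - 1
    plans = []
    for position in range(num_steps):
        # Only the first step reads from disk; later steps read what the
        # previous step left in the memory backend.
        read_backend = "disk" if position == 0 else "memory"
        # Only the last step persists its output to disk.
        write_backend = "disk" if position == last_position else "memory"
        plans.append((read_backend, write_backend))
    return plans


three_step_plan = plan_backends(3)
single_step_plan = plan_backends(1)
```

For a three-step pipeline this yields disk→memory, memory→memory, memory→disk; a single-step pipeline reads from and writes to disk.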
Step Plan Generation
The compiler generates comprehensive step plans that coordinate all systems:
step_plan = {
    # Basic metadata
    "step_name": "GPU Image Processing",
    "step_type": "FunctionStep",
    "well_id": "A01",
    # VFS configuration
    "input_dir": "/workspace/A01/input",
    "output_dir": "/workspace/A01/step1_out",
    "read_backend": "disk",        # From backend planner
    "write_backend": "memory",     # From backend planner
    # Memory type configuration
    "input_memory_type": "torch",  # From function decorator
    "output_memory_type": "torch", # From function decorator
    "gpu_id": 0,                   # From GPU resource planner
    # Function pattern configuration
    "func_pattern": gpu_processing_func,
    "variable_components": ["site"],
    "group_by": "channel",
    # Special I/O configuration
    "special_inputs": {
        "positions": {
            "path": "/vfs/positions.pkl",
            "backend": "memory"
        }
    },
    "special_outputs": {
        "metadata": {
            "path": "/vfs/metadata.pkl",
            "backend": "memory"
        }
    }
}
Runtime Integration
Complete Execution Flow
During execution, the three systems work together seamlessly:
from pathlib import Path

def process(self, context: ProcessingContext, step_index: int):
    """Complete execution flow showing system integration (FunctionStep.process)."""
    step_plan = context.step_plans[step_index]
    # 1. VFS: Load images from storage
    # (matching_files: the input files selected for this step; how they are
    # resolved is not shown here)
    raw_slices = []
    for file_path in matching_files:
        # VFS handles format-specific deserialization
        image = context.filemanager.load_image(
            file_path,
            step_plan['read_backend']
        )
        raw_slices.append(image)  # Usually numpy arrays from TIFF
    # 2. Memory System: Stack with type conversion
    image_stack = stack_slices(
        slices=raw_slices,
        memory_type=step_plan['input_memory_type'],  # torch
        gpu_id=step_plan.get('gpu_id')               # 0 or None
    )
    # Result: torch.Tensor on GPU 0
    # 3. Load special inputs (if any)
    special_kwargs = {}
    for input_name, input_config in step_plan['special_inputs'].items():
        # VFS loads from specified backend
        special_data = context.filemanager.load(
            input_config['path'],
            input_config['backend']
        )
        special_kwargs[input_name] = special_data
    # 4. Execute function in native memory type
    # Function pattern resolution handled by prepare_patterns_and_functions
    result_stack = self._execute_function_core(image_stack, step_plan, context, **special_kwargs)
    # Function operates entirely in torch on GPU
    # 5. Handle special outputs (if any)
    # Check for a tuple explicitly: arrays and tensors also define __len__,
    # so a length test alone would mistake a plain image stack for a tuple.
    if isinstance(result_stack, tuple) and len(result_stack) > 1:
        main_result = result_stack[0]
        special_outputs = result_stack[1:]
        for value, output_config in zip(special_outputs, step_plan['special_outputs'].values()):
            # VFS saves to specified backend
            context.filemanager.save(
                value,
                output_config['path'],
                output_config['backend']
            )
    else:
        main_result = result_stack
    # 6. Memory System: Unstack with type conversion
    output_slices = unstack_slices(
        array=main_result,
        memory_type=step_plan['output_memory_type'],  # torch
        gpu_id=step_plan.get('gpu_id')                # 0 or None
    )
    # Result: List of torch.Tensor on GPU 0
    # 7. VFS: Save images to storage
    for i, slice_2d in enumerate(output_slices):
        # output_dir is stored as a string in the step plan, so wrap it in Path
        output_path = Path(step_plan['output_dir']) / f"processed_{i:03d}.tif"
        # VFS handles memory type conversion for disk storage
        context.filemanager.save_image(
            slice_2d,                    # torch.Tensor
            output_path,
            step_plan['write_backend']   # memory or disk
        )
        # If disk: automatically converts torch → numpy → TIFF
        # If memory: stores torch.Tensor directly
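Steps 2 and 6 can be illustrated with a NumPy-only sketch of the stack/unstack round trip. The functions below are simplified stand-ins for `stack_slices` and `unstack_slices`, with no memory type conversion or GPU placement; their names and validation rules are assumptions, not the OpenHCS implementation.

```python
import numpy as np


def stack_slices_numpy(slices):
    """Stack a list of 2D arrays into one 3D array of shape (Z, Y, X)."""
    for index, slice_2d in enumerate(slices):
        if slice_2d.ndim != 2:
            raise ValueError(f"slice {index} is {slice_2d.ndim}D, expected 2D")
    return np.stack(slices, axis=0)


def unstack_slices_numpy(stack):
    """Split a 3D array back into a list of 2D slices along the first axis."""
    if stack.ndim != 3:
        raise ValueError(f"stack is {stack.ndim}D, expected 3D")
    return [stack[i] for i in range(stack.shape[0])]


# Three 4x5 slices filled with 0, 1 and 2, standing in for loaded images.
slices = [np.full((4, 5), i, dtype=np.uint16) for i in range(3)]
stack = stack_slices_numpy(slices)

processed = stack * 2  # Stand-in for the step's processing function
output_slices = unstack_slices_numpy(processed)
```

The processing function sees a single 3D array, while storage on either side deals only in 2D slices; that split is what lets the VFS and the memory system evolve independently.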
Automatic Conversion Points
The systems automatically handle conversions at key integration points:
VFS ↔ Memory Type Integration
# VFS automatically detects and converts memory types for disk storage
import jax
import tifffile
import torch

memory_store = {}  # In-memory backend storage (path → array)

def save_image_with_conversion(data, path, backend):
    if backend == "disk":
        # Convert any memory type to numpy for TIFF
        if isinstance(data, torch.Tensor):
            numpy_data = data.cpu().numpy()
        elif isinstance(data, jax.Array):  # JAX (arrays have no device_get attribute)
            numpy_data = jax.device_get(data)
        elif hasattr(data, 'get'):  # CuPy
            numpy_data = data.get()
        else:
            numpy_data = data  # Already numpy
        # VFS handles TIFF serialization
        tifffile.imwrite(path, numpy_data)
    elif backend == "memory":
        # Store in original memory type
        memory_store[path] = data
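The backend dispatch itself can be sketched with the standard library alone. The classes below are hypothetical and use pickle on disk purely for illustration; only the `save(data, path, backend)` and `load(path, backend)` call shapes are taken from the execution flow shown earlier.

```python
import pickle
import tempfile
from pathlib import Path


class MemoryBackend:
    """Keeps objects in a dict keyed by virtual path; no serialization."""
    def __init__(self):
        self._store = {}

    def save(self, data, path):
        self._store[str(path)] = data

    def load(self, path):
        return self._store[str(path)]


class DiskBackend:
    """Serializes objects to real files (pickle here, for illustration only)."""
    def save(self, data, path):
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        with open(path, "wb") as handle:
            pickle.dump(data, handle)

    def load(self, path):
        with open(path, "rb") as handle:
            return pickle.load(handle)


class MiniFileManager:
    """Routes each call to the backend named by the caller."""
    def __init__(self):
        self._backends = {"memory": MemoryBackend(), "disk": DiskBackend()}

    def _backend(self, name):
        if name not in self._backends:
            raise ValueError(f"Unsupported backend: {name!r}")
        return self._backends[name]

    def save(self, data, path, backend):
        self._backend(backend).save(data, path)

    def load(self, path, backend):
        return self._backend(backend).load(path)


filemanager = MiniFileManager()
filemanager.save({"x": 1}, "/vfs/positions.pkl", "memory")
from_memory = filemanager.load("/vfs/positions.pkl", "memory")

with tempfile.TemporaryDirectory() as tmp_dir:
    disk_path = Path(tmp_dir) / "out" / "metadata.pkl"
    filemanager.save([1, 2, 3], disk_path, "disk")
    from_disk = filemanager.load(disk_path, "disk")
```

Calling code names a backend as a string and never touches the storage mechanism, which is what allows a step plan to switch `write_backend` between `"memory"` and `"disk"` without changing the step.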
Memory Type ↔ Compilation Integration
# Compilation system coordinates memory types across steps
def plan_inter_step_conversions(pipeline_steps):
    """Plan memory type conversions between pipeline steps."""
    conversions = []
    for i in range(len(pipeline_steps) - 1):
        current_step = pipeline_steps[i]
        next_step = pipeline_steps[i + 1]
        current_output = current_step.output_memory_type
        next_input = next_step.input_memory_type
        if current_output != next_input:
            # Plan conversion in the intermediate storage
            conversion = {
                'from_type': current_output,
                'to_type': next_input,
                'location': 'intermediate_storage',
                'method': 'automatic'
            }
            conversions.append(conversion)
    return conversions
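As a usage sketch, the planner can be run on stand-in step objects. `SimpleNamespace` is used here only to supply the two attributes the function reads; real pipeline steps are assumed to expose the same attributes.

```python
from types import SimpleNamespace


def plan_inter_step_conversions(pipeline_steps):
    """Plan memory type conversions between adjacent pipeline steps."""
    conversions = []
    for current_step, next_step in zip(pipeline_steps, pipeline_steps[1:]):
        if current_step.output_memory_type != next_step.input_memory_type:
            conversions.append({
                "from_type": current_step.output_memory_type,
                "to_type": next_step.input_memory_type,
                "location": "intermediate_storage",
                "method": "automatic",
            })
    return conversions


def make_step(memory_type):
    """Stand-in step whose input and output share one memory type."""
    return SimpleNamespace(
        input_memory_type=memory_type,
        output_memory_type=memory_type,
    )


mixed_pipeline = [make_step("numpy"), make_step("torch"), make_step("numpy")]
uniform_pipeline = [make_step("torch"), make_step("torch"), make_step("torch")]

mixed_conversions = plan_inter_step_conversions(mixed_pipeline)
uniform_conversions = plan_inter_step_conversions(uniform_pipeline)
```

The mixed pipeline needs two conversions (numpy→torch, then torch→numpy), while the uniform torch pipeline needs none.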
Performance Optimization
Conversion Minimization
The integrated system minimizes unnecessary conversions:
# Optimal: Keep data in same memory type across steps
pipeline = [
    FunctionStep(func=torch_preprocess),   # torch → torch
    FunctionStep(func=torch_process),      # torch → torch
    FunctionStep(func=torch_postprocess)   # torch → torch
]
# Result: No memory type conversions, data stays on GPU

# Suboptimal: Mixed memory types cause conversions
pipeline = [
    FunctionStep(func=numpy_preprocess),   # numpy → numpy
    FunctionStep(func=torch_process),      # torch → torch (conversion!)
    FunctionStep(func=numpy_postprocess)   # numpy → numpy (conversion!)
]
# Result: 2 conversions (numpy→torch, torch→numpy)
Backend Selection Strategy
def optimize_backend_selection(step_plans):
    """Optimize backend selection for performance."""
    for i, step_plan in enumerate(step_plans):
        # GPU memory types benefit from memory backend
        if step_plan['input_memory_type'] in GPU_MEMORY_TYPES:
            if i > 0:  # Not first step
                step_plan['read_backend'] = 'memory'
        if step_plan['output_memory_type'] in GPU_MEMORY_TYPES:
            if i < len(step_plans) - 1:  # Not last step
                step_plan['write_backend'] = 'memory'
        # Large data benefits from streaming
        if step_plan['estimated_data_size'] > LARGE_DATA_THRESHOLD:
            step_plan['streaming_enabled'] = True
Error Handling and Validation
Cross-System Validation
The integrated system validates compatibility across all layers:
def validate_system_integration(step_plans):
    """Validate integration across VFS, memory types, and compilation."""
    for step_plan in step_plans:
        # Validate memory type compatibility
        if step_plan['input_memory_type'] not in SUPPORTED_MEMORY_TYPES:
            raise ValidationError(f"Unsupported input memory type: {step_plan['input_memory_type']}")
        # Validate backend compatibility
        if step_plan['read_backend'] not in SUPPORTED_BACKENDS:
            raise ValidationError(f"Unsupported read backend: {step_plan['read_backend']}")
        # Validate GPU requirements
        if step_plan['input_memory_type'] in GPU_MEMORY_TYPES:
            if step_plan['gpu_id'] is None:
                raise ValidationError("GPU memory type requires gpu_id")
        # Validate special I/O paths
        for special_input in step_plan.get('special_inputs', {}).values():
            if not validate_vfs_path(special_input['path'], special_input['backend']):
                raise ValidationError(f"Invalid special input path: {special_input['path']}")
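A variation on the same checks is to collect every problem in a step plan instead of raising on the first one, which can make compile-time reports more useful. This is a sketch under assumptions: the three constant sets below are illustrative values inferred from this document, not the actual OpenHCS constants, and `collect_plan_errors` is a hypothetical helper.

```python
# Assumed values for illustration only.
SUPPORTED_MEMORY_TYPES = {"numpy", "torch", "cupy"}
GPU_MEMORY_TYPES = {"torch", "cupy"}
SUPPORTED_BACKENDS = {"disk", "memory", "zarr"}


def collect_plan_errors(step_plan):
    """Return a list of human-readable problems found in one step plan."""
    errors = []
    for key in ("input_memory_type", "output_memory_type"):
        if step_plan.get(key) not in SUPPORTED_MEMORY_TYPES:
            errors.append(f"Unsupported {key}: {step_plan.get(key)!r}")
    for key in ("read_backend", "write_backend"):
        if step_plan.get(key) not in SUPPORTED_BACKENDS:
            errors.append(f"Unsupported {key}: {step_plan.get(key)!r}")
    uses_gpu = step_plan.get("input_memory_type") in GPU_MEMORY_TYPES
    if uses_gpu and step_plan.get("gpu_id") is None:
        errors.append("GPU memory type requires gpu_id")
    return errors


valid_plan = {
    "input_memory_type": "torch",
    "output_memory_type": "torch",
    "read_backend": "disk",
    "write_backend": "memory",
    "gpu_id": 0,
}
# Same plan with an unknown backend and no GPU assigned.
invalid_plan = dict(valid_plan, read_backend="s3", gpu_id=None)

valid_errors = collect_plan_errors(valid_plan)
invalid_errors = collect_plan_errors(invalid_plan)
```

The valid plan produces an empty list; the invalid plan reports both the unknown read backend and the missing `gpu_id` in a single pass.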
Error Recovery
def handle_conversion_errors(data, source_type, target_type, allow_fallback=True):
    """Handle memory type conversion errors with fallback strategies."""
    try:
        # Attempt direct conversion
        return convert_memory_type(data, source_type, target_type)
    except MemoryConversionError as e:
        if allow_fallback:
            # Fallback to CPU roundtrip
            logger.warning(f"Direct conversion failed, using CPU fallback: {e}")
            cpu_data = convert_to_cpu(data, source_type)
            return convert_from_cpu(cpu_data, target_type)
        else:
            raise
    except Exception as e:
        # Log detailed error information
        logger.error(f"Conversion failed: {source_type} → {target_type}")
        logger.error(f"Data shape: {getattr(data, 'shape', 'unknown')}")
        logger.error(f"Data type: {type(data)}")
        raise MemoryConversionError(f"Failed to convert {source_type} to {target_type}") from e
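The fallback path can be exercised in isolation with stub converters. In this sketch the converters are passed in as plain callables, which is an assumption made to keep the example self-contained; `convert_with_fallback` and `failing_direct` are hypothetical names.

```python
import logging

logger = logging.getLogger("conversion_sketch")


class MemoryConversionError(Exception):
    """Raised when a memory type conversion cannot be performed."""


def convert_with_fallback(data, direct, to_cpu, from_cpu, allow_fallback=True):
    """Try the direct conversion first, then fall back to a CPU round trip."""
    try:
        return direct(data)
    except MemoryConversionError as exc:
        if not allow_fallback:
            raise
        logger.warning("Direct conversion failed, using CPU fallback: %s", exc)
        return from_cpu(to_cpu(data))


def failing_direct(data):
    """Stub for a direct conversion path that is unavailable."""
    raise MemoryConversionError("no direct path")


# list and tuple stand in for the CPU staging and target conversions.
fallback_result = convert_with_fallback(
    [1, 2], failing_direct, to_cpu=list, from_cpu=tuple
)

try:
    convert_with_fallback(
        [1, 2], failing_direct, to_cpu=list, from_cpu=tuple, allow_fallback=False
    )
    strict_raised = False
except MemoryConversionError:
    strict_raised = True
```

With fallback enabled, the failed direct conversion is logged and the CPU round trip supplies the result; with `allow_fallback=False` the original error propagates unchanged.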
Current Implementation Status
Implemented Features
✅ Four-layer architecture stack with Configuration, Compilation, Memory Types, and VFS
✅ Comprehensive step plan generation with backend and memory type coordination
✅ MaterializationFlagPlanner for intelligent backend selection
✅ Memory type extraction and validation during compilation
✅ VFS integration with FileManager and multiple storage backends
✅ Runtime execution flow with stack/unstack operations and type conversion
✅ Special I/O integration using VFS memory backend for cross-step communication
✅ Cross-system validation and error handling
Future Enhancements
Automatic Memory Type Selection: Based on data size and available resources
Streaming Processing: Handle datasets larger than memory across all systems
Performance Optimization: Intelligent backend selection and conversion minimization
Distributed Processing: Coordinate memory types across multiple nodes
Advanced Error Recovery: Fallback strategies for conversion failures
Memory Pool Management: Efficient GPU memory reuse across steps
Resource Prediction: Predict memory and storage requirements before execution