Core Module
The core module contains the main classes for flow execution.
FlowEngine
The main orchestrator for executing flows.
FlowEngine
FlowEngine(config: FlowConfig, components: dict[str, BaseComponent], evaluator: Optional[ConditionEvaluator] = None, validate_types: bool = True, checkpoint_store: Any = None, hooks: list[Any] | None = None)
Executes a flow defined by a configuration.
The engine:
1. Loads components and their configurations
2. Executes steps in order
3. Evaluates conditions for conditional steps
4. Handles errors according to step configuration
5. Tracks execution metadata
Example
# Define components
components = {
    "fetch": FetchComponent("fetch"),
    "process": ProcessComponent("process"),
    "save": SaveComponent("save"),
}
# Load configuration
config = ConfigLoader.load("flow.yaml")
# Create engine
engine = FlowEngine(config, components)
# Execute flow
context = FlowContext()
result = engine.execute(context)
print(result.to_json())
Initialize the flow engine.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | FlowConfig | Parsed flow configuration | required |
| components | dict[str, BaseComponent] | Dictionary mapping component names to instances | required |
| evaluator | Optional[ConditionEvaluator] | Optional custom condition evaluator | None |
| validate_types | bool | If True (default), validates that component instances match their declared type paths in the config | True |
| checkpoint_store | Any | Optional CheckpointStore for pause/resume support | None |
| hooks | list[Any] \| None | Optional list of ExecutionHook instances for lifecycle callbacks | None |
Raises:
| Type | Description |
|---|---|
| FlowExecutionError | If components are missing or invalid |
| ConfigurationError | If validate_types is True and types don't match |
execute
Execute the flow.
For sequential flows: executes all steps in order. For conditional flows: executes only the first step whose condition matches (first-match branching, like a switch/case statement). For graph flows: executes DAG with topological ordering and port routing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| context | Optional[FlowContext] | Optional existing context (creates new if None) | None |
| input_data | Any | Optional input data to attach to context | None |
Returns:
| Type | Description |
|---|---|
| FlowContext | Final flow context with all accumulated data |
Raises:
| Type | Description |
|---|---|
| FlowExecutionError | If execution fails and fail_fast is True |
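The first-match rule for conditional flows can be illustrated with a small, self-contained sketch. It does not use the engine's real classes: `Step`, `run_first_match`, and the lambda conditions are hypothetical names introduced only for this illustration.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Step:
    """Hypothetical stand-in for a conditional step (not the library's type)."""
    name: str
    condition: Callable[[dict[str, Any]], bool]


def run_first_match(steps: list[Step], data: dict[str, Any]) -> Optional[str]:
    """Return the name of the first step whose condition holds, or None."""
    for step in steps:
        if step.condition(data):
            # Later steps are never evaluated, as in a switch/case statement.
            return step.name
    return None


steps = [
    Step("premium", lambda d: d.get("tier") == "premium"),
    Step("trial", lambda d: d.get("tier") == "trial"),
    Step("default", lambda d: True),  # catch-all branch
]
selected = run_first_match(steps, {"tier": "trial"})
print(selected)
```

Ordering matters under this rule: a catch-all condition placed first would shadow every later step.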
resume
Resume a suspended flow from checkpoint.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| checkpoint_id | str | ID from a previous suspended execution | required |
| resume_data | dict[str, Any] \| None | Data to inject (e.g., approval decision) | None |
Returns:
| Type | Description |
|---|---|
| FlowContext | FlowContext with execution completed from suspension point |
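The suspend-then-resume cycle can be sketched without the real engine. Everything below is a hypothetical stand-in: the `run` function, the handler functions, and the plain-dict state are assumptions of this sketch, and a JSON round trip takes the place of a checkpoint store. Treating the suspending node as already completed is also an assumption, not documented behavior.

```python
import json
from typing import Any, Callable

# Hypothetical handler type: receives the shared data dict and returns True
# to request suspension (standing in for a component calling suspend()).
Handler = Callable[[dict[str, Any]], bool]


def run(order: list[str], handlers: dict[str, Handler], state: dict[str, Any]) -> dict[str, Any]:
    """Run nodes in order, skipping those already in state["completed_nodes"]."""
    state["suspended"] = False
    for node in order:
        if node in state["completed_nodes"]:
            continue  # finished before the suspension, so it is not re-run
        wants_pause = handlers[node](state["data"])
        # Assumption of this sketch: a suspending node counts as completed.
        state["completed_nodes"].append(node)
        if wants_pause:
            state["suspended"] = True
            state["suspended_at_node"] = node
            break
    return state


def fetch(data: dict[str, Any]) -> bool:
    data["fetched"] = True
    return False


def approve(data: dict[str, Any]) -> bool:
    return "approved" not in data  # pause until a decision has been injected


def save(data: dict[str, Any]) -> bool:
    data["saved"] = data["approved"]
    return False


order = ["fetch", "approve", "save"]
handlers = {"fetch": fetch, "approve": approve, "save": save}

first = run(order, handlers, {"data": {}, "completed_nodes": []})

# Persist the snapshot (the JSON round trip stands in for a checkpoint store),
# then inject the resume data and continue from the suspension point.
snapshot = json.loads(json.dumps(first))
snapshot["data"].update({"approved": True})
final = run(order, handlers, snapshot)
print(final["completed_nodes"], final["data"])
```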
validate
Validate the flow configuration.
Returns:
| Type | Description |
|---|---|
| list[str] | List of validation errors (empty if valid) |
dry_run
Perform a dry run without executing components.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| context | Optional[FlowContext] | Optional context for condition evaluation | None |
Returns:
| Type | Description |
|---|---|
| list[str] | List of steps/nodes that would be executed |
validate_component_types
Validate that component instances match their declared types.
Returns:
| Type | Description |
|---|---|
| list[str] | List of type mismatch errors (empty if all valid) |
from_config
classmethod
from_config(config: FlowConfig, evaluator: Optional[ConditionEvaluator] = None, registry: Optional[ComponentRegistry] = None, checkpoint_store: Any = None, hooks: list[Any] | None = None) -> FlowEngine
Create a FlowEngine by auto-instantiating components from config.
This method loads component classes from their type paths and creates instances automatically. Use this when you want YAML-complete flows.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | FlowConfig | Parsed flow configuration | required |
| evaluator | Optional[ConditionEvaluator] | Optional custom condition evaluator | None |
| registry | Optional[ComponentRegistry] | Optional pre-configured component registry | None |
| checkpoint_store | Any | Optional CheckpointStore for pause/resume | None |
| hooks | list[Any] \| None | Optional list of ExecutionHook instances | None |
Returns:
| Type | Description |
|---|---|
| FlowEngine | Configured FlowEngine instance |
Raises:
| Type | Description |
|---|---|
| ConfigurationError | If component types cannot be loaded |
BaseComponent
Abstract base class for all components.
BaseComponent
Bases: ABC
Abstract base class for all flow components.
Components are the building blocks of flows. Each component:
- Has a unique name within the flow
- Receives configuration at initialization
- Processes context and returns updated context
Lifecycle
- __init__(name) - Instance creation
- init(config) - Configuration initialization (once)
- setup(context) - Pre-processing (each run)
- process(context) - Main logic (each run)
- teardown(context) - Post-processing (each run)
Example
class GreetingComponent(BaseComponent):
    def init(self, config: dict) -> None:
        super().init(config)
        self.greeting = config.get("greeting", "Hello")

    def process(self, context: FlowContext) -> FlowContext:
        name = context.get("name", "World")
        context.set("message", f"{self.greeting}, {name}!")
        return context
Initialize component with a name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Unique identifier for this component instance | required |
is_async
property
True if this component overrides process_async with custom logic.
init
Initialize component with configuration.
Called once when the flow is set up. Override to perform one-time setup like creating clients or loading resources.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | dict[str, Any] | Configuration dictionary from YAML | required |
setup
Prepare for processing.
Called at the start of each flow execution, before process(). Override to perform per-run setup.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| context | FlowContext | The current flow context | required |
process
abstractmethod
Execute component logic.
The main processing method. Must be implemented by subclasses.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| context | FlowContext | Current flow context with accumulated data | required |
Returns:
| Type | Description |
|---|---|
| FlowContext | Updated flow context (may be the same instance) |
process_async
async
Async processing. Override for native async operations.
Default implementation calls the synchronous process(), so existing synchronous components keep working without changes.
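A minimal sketch of how such a default could be arranged is shown here. The `Base`, `SyncUpper`, and `AsyncUpper` classes are hypothetical stand-ins, and detecting an override by comparing methods is an assumption of this sketch rather than the library's confirmed approach to `is_async`.

```python
import asyncio
from typing import Any


class Base:
    """Hypothetical stand-in for a component base class."""

    def process(self, context: dict[str, Any]) -> dict[str, Any]:
        raise NotImplementedError

    async def process_async(self, context: dict[str, Any]) -> dict[str, Any]:
        # Default: delegate to the synchronous implementation.
        return self.process(context)

    @property
    def is_async(self) -> bool:
        # True only when a subclass supplies its own process_async.
        return type(self).process_async is not Base.process_async


class SyncUpper(Base):
    def process(self, context: dict[str, Any]) -> dict[str, Any]:
        context["text"] = context["text"].upper()
        return context


class AsyncUpper(SyncUpper):
    async def process_async(self, context: dict[str, Any]) -> dict[str, Any]:
        await asyncio.sleep(0)  # placeholder for real awaitable I/O
        return self.process(context)


sync_result = asyncio.run(SyncUpper().process_async({"text": "hi"}))
async_result = asyncio.run(AsyncUpper().process_async({"text": "hi"}))
print(SyncUpper().is_async, AsyncUpper().is_async)
```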
teardown
Cleanup after processing.
Called after process() completes, even if it raised an exception. Override to perform cleanup like closing connections.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| context | FlowContext | The current flow context | required |
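The per-run ordering of setup(), process(), and teardown() can be sketched with a try/finally block. `RecordingComponent` and `run_once` are hypothetical names for this illustration, and calling setup() outside the try block is an assumption; the engine's real handling of setup() failures is not described here.

```python
from typing import Any


class RecordingComponent:
    """Hypothetical stand-in that records which lifecycle hooks ran."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail
        self.calls: list[str] = []

    def setup(self, context: dict[str, Any]) -> None:
        self.calls.append("setup")

    def process(self, context: dict[str, Any]) -> dict[str, Any]:
        self.calls.append("process")
        if self.fail:
            raise RuntimeError("simulated failure")
        return context

    def teardown(self, context: dict[str, Any]) -> None:
        self.calls.append("teardown")


def run_once(component: RecordingComponent, context: dict[str, Any]) -> dict[str, Any]:
    """Run one execution; teardown runs even when process raises."""
    component.setup(context)
    try:
        return component.process(context)
    finally:
        component.teardown(context)


ok = RecordingComponent()
run_once(ok, {})

failing = RecordingComponent(fail=True)
try:
    run_once(failing, {})
except RuntimeError:
    pass  # the error still propagates after teardown has run

print(ok.calls, failing.calls)
```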
validate_config
Validate component configuration.
Override to add custom validation logic.
Returns:
| Type | Description |
|---|---|
| list[str] | List of validation error messages (empty if valid) |
health_check
Check if component is healthy.
Override to add custom health checking.
Returns:
| Type | Description |
|---|---|
| bool | True if component is operational |
check_deadline
Check if the execution deadline has been exceeded.
Call this periodically in long-running process() methods to cooperatively support timeout enforcement. If the deadline has passed, raises FlowTimeoutError.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| context | FlowContext | The current flow context | required |
Raises:
| Type | Description |
|---|---|
| FlowTimeoutError | If the deadline has been exceeded |
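Cooperative timeout checking might look roughly like the sketch below. The `FlowTimeoutError` class here is a local stand-in, and the free functions `check_deadline` and `process_items` are hypothetical; the only detail taken from this reference is that the deadline is an absolute `time.time()` value that may be unset.

```python
import time
from typing import Optional


class FlowTimeoutError(Exception):
    """Local stand-in for the library's FlowTimeoutError."""


def check_deadline(deadline: Optional[float]) -> None:
    """Raise if an absolute deadline (time.time() based) has passed."""
    if deadline is not None and time.time() > deadline:
        raise FlowTimeoutError("execution deadline exceeded")


def process_items(items: list[int], deadline: Optional[float]) -> list[int]:
    """Long-running loop that checks the deadline before each unit of work."""
    results = []
    for item in items:
        check_deadline(deadline)
        results.append(item * 2)
    return results


print(process_items([1, 2, 3], deadline=time.time() + 60))

try:
    process_items([1, 2, 3], deadline=time.time() - 1)  # already expired
except FlowTimeoutError as exc:
    print(f"timed out: {exc}")
```

Because the check is cooperative, a loop that never calls it cannot be interrupted this way, so the check belongs inside the innermost long-running loop.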
set_output_port
Signal which output port to activate.
Use in components with multiple outputs (e.g., Condition, Switch). The graph executor will route to edges matching this port.
FlowContext
Context object passed through all components.
FlowContext
dataclass
FlowContext(data: DotDict = DotDict(), metadata: ExecutionMetadata = ExecutionMetadata(), input: Any = None, _active_port: Optional[str] = None)
Context object passed through all components in a flow.
The context accumulates data as it passes through components. Each component can read from and write to the context.
Attributes:
| Name | Type | Description |
|---|---|---|
| data | DotDict | Main data container with attribute-style access |
| metadata | ExecutionMetadata | Execution metadata (timings, errors, etc.) |
| input | Any | Optional initial input data |
set
Set a value in the data container.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Key to set | required |
| value | Any | Value to store | required |
get
Get a value from the data container.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Key to retrieve | required |
| default | Any | Default value if key not found | None |
Returns:
| Type | Description |
|---|---|
| Any | Value or default |
has
Check if a key exists in the data container.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Key to check | required |
Returns:
| Type | Description |
|---|---|
| bool | True if key exists |
delete
Delete a key from the data container.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Key to delete | required |
set_port
Set the active output port for the current node.
Called by components like Condition to signal which branch to take. The graph executor reads this to determine edge routing.
get_active_port
Get the active output port (used by graph executor).
suspend
Suspend execution at the current node.
Called by components like HumanApproval to pause the workflow. The executor checks metadata.suspended after each node.
to_dict
Convert context to dictionary.
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Dictionary representation of context |
to_json
Serialize context to JSON string.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| indent | int | JSON indentation level | 2 |
Returns:
| Type | Description |
|---|---|
| str | JSON string representation |
from_dict
classmethod
Create context from dictionary.
Supports full round-trip: from_dict(to_dict()) preserves all fields.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | dict[str, Any] | Dictionary with context data | required |
Returns:
| Type | Description |
|---|---|
| FlowContext | New FlowContext instance |
from_json
classmethod
Create context from JSON string.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| json_str | str | JSON string representation | required |
Returns:
| Type | Description |
|---|---|
| FlowContext | New FlowContext instance |
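The round-trip property of the serialization methods can be illustrated with a reduced stand-in. `MiniContext` is a hypothetical class with only two fields; the real FlowContext also carries metadata, which this sketch leaves out.

```python
import json
from dataclasses import dataclass, field
from typing import Any


@dataclass
class MiniContext:
    """Hypothetical, reduced stand-in for a serializable context."""
    data: dict[str, Any] = field(default_factory=dict)
    input: Any = None

    def to_dict(self) -> dict[str, Any]:
        return {"data": dict(self.data), "input": self.input}

    @classmethod
    def from_dict(cls, raw: dict[str, Any]) -> "MiniContext":
        return cls(data=dict(raw.get("data", {})), input=raw.get("input"))

    def to_json(self, indent: int = 2) -> str:
        return json.dumps(self.to_dict(), indent=indent)

    @classmethod
    def from_json(cls, json_str: str) -> "MiniContext":
        return cls.from_dict(json.loads(json_str))


original = MiniContext(data={"name": "World"}, input=[1, 2])
restored = MiniContext.from_json(original.to_json())
print(restored == original)
```

A round trip through JSON only preserves values that JSON can represent; tuples, for example, come back as lists.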
copy
Create a shallow copy of the context.
Returns:
| Type | Description |
|---|---|
| FlowContext | New FlowContext with copied data |
DotDict
Dictionary with attribute-style access.
DotDict
Allows accessing nested dictionary values using dot notation instead of bracket notation.
Initialize DotDict with optional data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | Optional[dict[str, Any]] | Initial dictionary data (defaults to empty dict) | None |
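Attribute-style access over nested dictionaries can be sketched as follows. `MiniDotDict` is a hypothetical, reduced illustration and not the library's DotDict; in particular, wrapping nested dicts on read and sharing the underlying dict are assumptions of this sketch.

```python
from typing import Any, Optional


class MiniDotDict:
    """Hypothetical sketch of a dict wrapper with attribute-style access."""

    def __init__(self, data: Optional[dict[str, Any]] = None) -> None:
        # Bypass our own __setattr__ so "_data" becomes a real attribute.
        object.__setattr__(self, "_data", data if data is not None else {})

    def __getattr__(self, key: str) -> Any:
        # Only called when normal attribute lookup fails.
        data = object.__getattribute__(self, "_data")
        try:
            value = data[key]
        except KeyError:
            raise AttributeError(key) from None
        # Wrap nested dicts so chained access (a.b.c) keeps working.
        return MiniDotDict(value) if isinstance(value, dict) else value

    def __setattr__(self, key: str, value: Any) -> None:
        self._data[key] = value

    def get(self, key: str, default: Any = None) -> Any:
        return self._data.get(key, default)

    def to_dict(self) -> dict[str, Any]:
        return dict(self._data)  # shallow copy of the underlying data


cfg = MiniDotDict({"user": {"name": "Ada"}})
first_name = cfg.user.name      # nested read via dot notation
cfg.retries = 3                 # top-level write
cfg.user.name = "Grace"         # nested write reaches the shared inner dict
print(first_name, cfg.to_dict())
```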
get
Get value with default fallback.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Key to retrieve | required |
| default | Any | Default value if key not found | None |
Returns:
| Type | Description |
|---|---|
| Any | Value at key or default |
keys
Get all keys in the data.
Returns:
| Type | Description |
|---|---|
| list[str] | List of keys |
values
Get all values in the data.
Returns:
| Type | Description |
|---|---|
| list[Any] | List of values |
items
Get all key-value pairs.
Returns:
| Type | Description |
|---|---|
| list[tuple[str, Any]] | List of (key, value) tuples |
update
Update with dictionary values.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | dict[str, Any] | Dictionary to merge | required |
to_dict
Convert to regular dictionary.
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Copy of underlying data dictionary |
ExecutionMetadata
Tracks timing, errors, and execution state.
ExecutionMetadata
dataclass
ExecutionMetadata(flow_id: str = (lambda: str(uuid.uuid4()))(), started_at: datetime = _utc_now(), completed_at: Optional[datetime] = None, step_timings: list[StepTiming] = list(), skipped_components: list[str] = list(), errors: list[dict[str, Any]] = list(), condition_errors: list[dict[str, Any]] = list(), deadline: Optional[float] = None, deadline_checked: bool = False, suspended: bool = False, suspended_at_node: Optional[str] = None, suspension_reason: Optional[str] = None, completed_nodes: list[str] = list(), node_visit_counts: dict[str, int] = dict(), iteration_count: int = 0, loop_timings: list[dict[str, Any]] = list(), max_iterations_reached: bool = False, converged: bool = False, _step_counter: int = 0)
Metadata about flow execution.
Tracks timing information, errors, and which components were skipped during flow execution.
Attributes:
| Name | Type | Description |
|---|---|---|
| flow_id | str | Unique identifier for this flow execution |
| started_at | datetime | When execution started |
| completed_at | Optional[datetime] | When execution completed (None if still running) |
| step_timings | list[StepTiming] | List of timing info for each executed step |
| skipped_components | list[str] | Names of components skipped due to conditions |
| errors | list[dict[str, Any]] | List of error details from failed components |
| condition_errors | list[dict[str, Any]] | List of condition evaluation errors |
| deadline | Optional[float] | Absolute time (time.time()) by which current step must complete. Set by engine before each step, cleared after. Components can check this via BaseComponent.check_deadline() to cooperatively timeout. |
component_timings
property
Get aggregated timings by component name.
If a component runs multiple times, returns the sum of all durations.
Returns:
| Type | Description |
|---|---|
| dict[str, float] | Dictionary mapping component names to total execution time |
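The aggregation described for component_timings can be sketched as a sum grouped by component name. `Timing` and `aggregate_by_component` are hypothetical names; `Timing` keeps only the two fields this sketch needs.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Timing:
    """Hypothetical, reduced stand-in for a per-step timing record."""
    component: str
    duration: float  # seconds


def aggregate_by_component(timings: list[Timing]) -> dict[str, float]:
    """Sum durations per component, so repeated runs are combined."""
    totals: defaultdict[str, float] = defaultdict(float)
    for timing in timings:
        totals[timing.component] += timing.duration
    return dict(totals)


timings = [
    Timing("fetch", 0.5),
    Timing("process", 0.25),
    Timing("fetch", 0.25),  # second run of the same component
]
totals = aggregate_by_component(timings)
print(totals)
```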
has_errors
property
Check if any errors were recorded.
Returns:
| Type | Description |
|---|---|
| bool | True if there are recorded errors |
has_condition_errors
property
Check if any condition evaluation errors were recorded.
Returns:
| Type | Description |
|---|---|
| bool | True if there are recorded condition errors |
total_duration
property
Get total execution duration in seconds.
Returns:
| Type | Description |
|---|---|
| Optional[float] | Duration in seconds, or None if not completed |
completed_nodes
class-attribute
instance-attribute
add_error
Record an error from a component.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| component | str | Name of the component that errored | required |
| error | Exception | The exception that was raised | required |
add_condition_error
Record a condition evaluation error.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| component | str | Name of the component with the failed condition | required |
| error | Exception | The exception that was raised | required |
| condition | str | The condition expression that failed | required |
record_timing
record_timing(component: str, seconds: float, started_at: Optional[datetime] = None, step_index: Optional[int] = None) -> None
Record execution time for a step.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| component | str | Name of the component | required |
| seconds | float | Execution time in seconds | required |
| started_at | Optional[datetime] | When the step started (defaults to now) | None |
| step_index | Optional[int] | Position in the flow definition (0-based). If not provided, uses execution order as step_index for backward compatibility. | None |
StepTiming
Timing information for a single step execution.
StepTiming
dataclass
StepTiming(step_index: int, component: str, duration: float, started_at: datetime, execution_order: int = 0)
Attributes:
| Name | Type | Description |
|---|---|---|
| step_index | int | Index of the step in the flow definition (0-based, matches config order) |
| component | str | Name of the component executed |
| duration | float | Execution time in seconds |
| started_at | datetime | When the step started executing |
| execution_order | int | Order in which this step was executed (0-based, skips excluded) |
GraphExecutor
Executes graph-type flows using topological ordering with port-based routing.
GraphExecutor
GraphExecutor(nodes: list[GraphNodeConfig], edges: list[GraphEdgeConfig], components: dict[str, BaseComponent], settings: FlowSettings, hooks: list[Any] | None = None)
Executes a graph-type flow using topological ordering with port-based routing.
For DAG graphs, uses topological sort (Kahn's algorithm) for execution order. For cyclic graphs, uses a ready-queue executor with iteration limits.
Key semantics:
- Root nodes (no incoming edges) execute first; these are triggers/entry points
- Port routing: When a component sets active_port, only matching edges propagate
- Unreachable nodes: If no incoming edge was activated, the node is skipped
- Completed nodes: Nodes in completed_nodes are skipped (for resume support)
execute
Execute the graph flow.
Dispatches to the DAG executor for acyclic graphs, or the cyclic executor for graphs containing cycles.
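For the acyclic case, the combination of Kahn's algorithm and port routing can be sketched as below. This is an illustration under stated assumptions, not the executor's code: `topological_order` and `select_executed` are hypothetical helpers, edges are modeled as `(source, port, target)` tuples, and a node that sets no port is assumed to activate all of its outgoing edges.

```python
from collections import deque
from typing import Optional

Edge = tuple[str, str, str]  # (source node, output port, target node)


def topological_order(nodes: list[str], edges: list[Edge]) -> list[str]:
    """Kahn's algorithm: repeatedly emit nodes with no remaining inputs."""
    indegree = {node: 0 for node in nodes}
    successors: dict[str, list[str]] = {node: [] for node in nodes}
    for src, _port, dst in edges:
        successors[src].append(dst)
        indegree[dst] += 1
    queue = deque(node for node in nodes if indegree[node] == 0)  # roots
    order = []
    while queue:
        node = queue.popleft()
        order.append(node)
        for nxt in successors[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                queue.append(nxt)
    if len(order) != len(nodes):
        raise ValueError("graph contains a cycle")
    return order


def select_executed(order: list[str], edges: list[Edge],
                    active_ports: dict[str, Optional[str]]) -> list[str]:
    """Walk the order; run roots and nodes with an activated incoming edge."""
    has_incoming = {dst for _src, _port, dst in edges}
    activated: set[str] = set()
    executed = []
    for node in order:
        if node in has_incoming and node not in activated:
            continue  # unreachable: no incoming edge was activated
        executed.append(node)
        port = active_ports.get(node)  # None means no port was set
        for src, edge_port, dst in edges:
            if src == node and (port is None or edge_port == port):
                activated.add(dst)
    return executed


nodes = ["start", "check", "approve", "reject", "notify"]
edges = [
    ("start", "out", "check"),
    ("check", "true", "approve"),
    ("check", "false", "reject"),
    ("approve", "out", "notify"),
    ("reject", "out", "notify"),
]
order = topological_order(nodes, edges)
executed = select_executed(order, edges, {"check": "true"})
print(order)
print(executed)
```

In this example the "reject" node appears in the topological order but is skipped at run time, because the only edge leading to it was not activated.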
ExecutionHook
Protocol for step lifecycle hooks. Implement any or all methods.
ExecutionHook
Checkpoint
Serializable snapshot of flow execution state for suspend/resume.
Checkpoint
dataclass
Checkpoint(flow_config: dict[str, Any], context: dict[str, Any], created_at: str = (lambda: datetime.now(timezone.utc).isoformat())(), checkpoint_id: str = (lambda: str(uuid.uuid4()))())
CheckpointStore
Abstract base class for checkpoint persistence.
CheckpointStore
Bases: ABC
Abstract checkpoint persistence.
load
abstractmethod
Load a checkpoint by ID. Returns None if not found.
InMemoryCheckpointStore
In-memory implementation of CheckpointStore.
InMemoryCheckpointStore
Bases: CheckpointStore
Default in-memory store for testing. Production uses DB-backed store.
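A dictionary-backed store along these lines could look like the sketch below. `SnapshotStore` and `DictSnapshotStore` are hypothetical stand-ins; only the load-returns-None behavior comes from this reference, while the `save` method and its signature are assumptions made for illustration.

```python
import copy
from abc import ABC, abstractmethod
from typing import Any, Optional


class SnapshotStore(ABC):
    """Hypothetical stand-in for an abstract checkpoint store."""

    @abstractmethod
    def save(self, checkpoint_id: str, snapshot: dict[str, Any]) -> None:
        """Persist a snapshot (assumed method, not documented above)."""

    @abstractmethod
    def load(self, checkpoint_id: str) -> Optional[dict[str, Any]]:
        """Return the stored snapshot, or None if the ID is unknown."""


class DictSnapshotStore(SnapshotStore):
    """Keeps snapshots in a plain dict; contents are lost on process exit."""

    def __init__(self) -> None:
        self._items: dict[str, dict[str, Any]] = {}

    def save(self, checkpoint_id: str, snapshot: dict[str, Any]) -> None:
        # Deep copy so later mutation by the caller cannot alter the snapshot.
        self._items[checkpoint_id] = copy.deepcopy(snapshot)

    def load(self, checkpoint_id: str) -> Optional[dict[str, Any]]:
        found = self._items.get(checkpoint_id)
        return copy.deepcopy(found) if found is not None else None


store = DictSnapshotStore()
snapshot = {"context": {"data": {"step": 1}}}
store.save("abc", snapshot)
snapshot["context"]["data"]["step"] = 99  # mutation after saving
loaded = store.load("abc")
print(loaded, store.load("missing"))
```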