Core Module

The core module contains the main classes for flow execution.


FlowEngine

The main orchestrator for executing flows.

FlowEngine

FlowEngine(config: FlowConfig, components: dict[str, BaseComponent], evaluator: Optional[ConditionEvaluator] = None, validate_types: bool = True, checkpoint_store: Any = None, hooks: list[Any] | None = None)

Executes a flow defined by a configuration.

The engine:
  1. Loads components and their configurations
  2. Executes steps in order
  3. Evaluates conditions for conditional steps
  4. Handles errors according to step configuration
  5. Tracks execution metadata

Example
# Define components
components = {
    "fetch": FetchComponent("fetch"),
    "process": ProcessComponent("process"),
    "save": SaveComponent("save"),
}

# Load configuration
config = ConfigLoader.load("flow.yaml")

# Create engine
engine = FlowEngine(config, components)

# Execute flow
context = FlowContext()
result = engine.execute(context)

print(result.to_json())

Initialize the flow engine.

Parameters:

Name Type Description Default
config FlowConfig

Parsed flow configuration

required
components dict[str, BaseComponent]

Dictionary mapping component names to instances

required
evaluator Optional[ConditionEvaluator]

Optional custom condition evaluator

None
validate_types bool

If True (default), validates that component instances match their declared type paths in the config

True
checkpoint_store Any

Optional CheckpointStore for pause/resume support

None
hooks list[Any] | None

Optional list of ExecutionHook instances for lifecycle callbacks

None

Raises:

Type Description
FlowExecutionError

If components are missing or invalid

ConfigurationError

If validate_types is True and types don't match

execute

execute(context: Optional[FlowContext] = None, input_data: Any = None) -> FlowContext

Execute the flow.

Behavior depends on the flow type:
  - Sequential flows: executes all steps in order.
  - Conditional flows: executes only the first step whose condition matches (first-match branching, like a switch/case statement).
  - Graph flows: executes the DAG in topological order with port routing.
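To make the first-match rule for conditional flows concrete, here is a minimal sketch. It does not use the library: `Step` and `select_first_match` are hypothetical names, and conditions are plain callables rather than whatever expression syntax the ConditionEvaluator accepts.

```python
from __future__ import annotations

from typing import Any, Callable

# Hypothetical stand-in: a step is a (name, condition) pair.
Step = tuple[str, Callable[[dict[str, Any]], bool]]


def select_first_match(steps: list[Step], data: dict[str, Any]) -> str | None:
    """Return the name of the first step whose condition holds, or None."""
    for name, condition in steps:
        if condition(data):
            return name  # later conditions are never evaluated
    return None


steps: list[Step] = [
    ("premium", lambda d: d.get("tier") == "premium"),
    ("standard", lambda d: d.get("tier") == "standard"),
    ("fallback", lambda d: True),
]
chosen = select_first_match(steps, {"tier": "standard"})
```

Ordering matters under this rule: a catch-all condition placed first would shadow every step after it.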

Parameters:

Name Type Description Default
context Optional[FlowContext]

Optional existing context (creates new if None)

None
input_data Any

Optional input data to attach to context

None

Returns:

Type Description
FlowContext

Final flow context with all accumulated data

Raises:

Type Description
FlowExecutionError

If execution fails and fail_fast is True

resume

resume(checkpoint_id: str, resume_data: dict[str, Any] | None = None) -> FlowContext

Resume a suspended flow from checkpoint.

Parameters:

Name Type Description Default
checkpoint_id str

ID from a previous suspended execution

required
resume_data dict[str, Any] | None

Data to inject (e.g., approval decision)

None

Returns:

Type Description
FlowContext

FlowContext with execution completed from suspension point

validate

validate() -> list[str]

Validate the flow configuration.

Returns:

Type Description
list[str]

List of validation errors (empty if valid)

dry_run

dry_run(context: Optional[FlowContext] = None) -> list[str]

Perform a dry run without executing components.

Parameters:

Name Type Description Default
context Optional[FlowContext]

Optional context for condition evaluation

None

Returns:

Type Description
list[str]

List of steps/nodes that would be executed

validate_component_types

validate_component_types() -> list[str]

Validate that component instances match their declared types.

Returns:

Type Description
list[str]

List of type mismatch errors (empty if all valid)

from_config classmethod

from_config(config: FlowConfig, evaluator: Optional[ConditionEvaluator] = None, registry: Optional[ComponentRegistry] = None, checkpoint_store: Any = None, hooks: list[Any] | None = None) -> FlowEngine

Create a FlowEngine by auto-instantiating components from config.

This method loads component classes from their type paths and creates instances automatically. Use this when you want YAML-complete flows.

Parameters:

Name Type Description Default
config FlowConfig

Parsed flow configuration

required
evaluator Optional[ConditionEvaluator]

Optional custom condition evaluator

None
registry Optional[ComponentRegistry]

Optional pre-configured component registry

None
checkpoint_store Any

Optional CheckpointStore for pause/resume

None
hooks list[Any] | None

Optional list of ExecutionHook instances

None

Returns:

Type Description
FlowEngine

Configured FlowEngine instance

Raises:

Type Description
ConfigurationError

If component types cannot be loaded

Example
config = ConfigLoader.load("flow.yaml")
engine = FlowEngine.from_config(config)
result = engine.execute()

BaseComponent

Abstract base class for all components.

BaseComponent

BaseComponent(name: str)

Bases: ABC

Abstract base class for all flow components.

Components are the building blocks of flows. Each component:
  - Has a unique name within the flow
  - Receives configuration at initialization
  - Processes context and returns updated context

Lifecycle
  1. __init__(name) - Instance creation
  2. init(config) - Configuration initialization (once)
  3. setup(context) - Pre-processing (each run)
  4. process(context) - Main logic (each run)
  5. teardown(context) - Post-processing (each run)
Example
class GreetingComponent(BaseComponent):
    def init(self, config: dict) -> None:
        super().init(config)
        self.greeting = config.get("greeting", "Hello")

    def process(self, context: FlowContext) -> FlowContext:
        name = context.get("name", "World")
        context.set("message", f"{self.greeting}, {name}!")
        return context

Initialize component with a name.

Parameters:

Name Type Description Default
name str

Unique identifier for this component instance

required

name property

name: str

Component's unique name.

config property

config: dict[str, Any]

Component's configuration dictionary.

is_initialized property

is_initialized: bool

Whether init() has been called.

is_async property

is_async: bool

True if this component overrides process_async with custom logic.

init

init(config: dict[str, Any]) -> None

Initialize component with configuration.

Called once when the flow is set up. Override to perform one-time setup like creating clients or loading resources.

Parameters:

Name Type Description Default
config dict[str, Any]

Configuration dictionary from YAML

required

setup

setup(context: FlowContext) -> None

Prepare for processing.

Called at the start of each flow execution, before process(). Override to perform per-run setup.

Parameters:

Name Type Description Default
context FlowContext

The current flow context

required

process abstractmethod

process(context: FlowContext) -> FlowContext

Execute component logic.

The main processing method. Must be implemented by subclasses.

Parameters:

Name Type Description Default
context FlowContext

Current flow context with accumulated data

required

Returns:

Type Description
FlowContext

Updated flow context (may be the same instance)

process_async async

process_async(context: FlowContext) -> FlowContext

Async processing. Override for native async operations.

The default implementation calls the synchronous process(), so components that only implement process() keep working unchanged.
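One way the default delegation and the is_async property could fit together is sketched below. This is an assumption about the mechanism, not the library's code: `Base`, `SyncOnly`, and `NativeAsync` are hypothetical stand-ins, and the context is a plain dict.

```python
import asyncio
from typing import Any


class Base:
    """Hypothetical stand-in for a component base class."""

    def process(self, context: dict[str, Any]) -> dict[str, Any]:
        raise NotImplementedError

    async def process_async(self, context: dict[str, Any]) -> dict[str, Any]:
        # Default: delegate to the synchronous implementation.
        return self.process(context)

    @property
    def is_async(self) -> bool:
        # True only when a subclass replaced process_async.
        return type(self).process_async is not Base.process_async


class SyncOnly(Base):
    def process(self, context: dict[str, Any]) -> dict[str, Any]:
        context["mode"] = "sync"
        return context


class NativeAsync(Base):
    async def process_async(self, context: dict[str, Any]) -> dict[str, Any]:
        await asyncio.sleep(0)  # placeholder for real async I/O
        context["mode"] = "async"
        return context


sync_result = asyncio.run(SyncOnly().process_async({}))
async_result = asyncio.run(NativeAsync().process_async({}))
```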

teardown

teardown(context: FlowContext) -> None

Cleanup after processing.

Called after process() completes, even if it raised an exception. Override to perform cleanup like closing connections.

Parameters:

Name Type Description Default
context FlowContext

The current flow context

required
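The guarantee that teardown() runs even when process() raises is the behavior of a try/finally block. The sketch below shows that pattern with a hypothetical `RecordingComponent` and `run_once` driver. It assumes setup() runs outside the protected region, which the reference does not state.

```python
class RecordingComponent:
    """Hypothetical component that records which lifecycle methods ran."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def setup(self, context: dict) -> None:
        self.calls.append("setup")

    def process(self, context: dict) -> dict:
        self.calls.append("process")
        raise ValueError("simulated failure")

    def teardown(self, context: dict) -> None:
        self.calls.append("teardown")


def run_once(component: RecordingComponent, context: dict) -> dict:
    """Drive one setup/process/teardown cycle (assumed ordering)."""
    component.setup(context)
    try:
        return component.process(context)
    finally:
        # Runs whether process() returned or raised.
        component.teardown(context)


component = RecordingComponent()
try:
    run_once(component, {})
except ValueError:
    pass  # the error still propagates after teardown
```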

validate_config

validate_config() -> list[str]

Validate component configuration.

Override to add custom validation logic.

Returns:

Type Description
list[str]

List of validation error messages (empty if valid)

health_check

health_check() -> bool

Check if component is healthy.

Override to add custom health checking.

Returns:

Type Description
bool

True if component is operational

check_deadline

check_deadline(context: FlowContext) -> None

Check if the execution deadline has been exceeded.

Call this periodically in long-running process() methods to cooperatively support timeout enforcement. If the deadline has passed, raises FlowTimeoutError.

Parameters:

Name Type Description Default
context FlowContext

The current flow context

required

Raises:

Type Description
FlowTimeoutError

If the deadline has been exceeded

Example
def process(self, context: FlowContext) -> FlowContext:
    for item in large_dataset:
        self.check_deadline(context)  # Check periodically
        process_item(item)
    return context
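The check itself can be pictured as a comparison against an absolute time.time() value, which is how ExecutionMetadata.deadline is described. The following is a sketch under that assumption. The standalone `check_deadline` function and the local `FlowTimeoutError` class are stand-ins, not the library's implementations.

```python
from __future__ import annotations

import time


class FlowTimeoutError(Exception):
    """Stand-in for the library's exception of the same name."""


def check_deadline(deadline: float | None, now: float | None = None) -> None:
    """Raise FlowTimeoutError if the absolute deadline has passed."""
    if deadline is None:
        return  # no deadline configured for this step
    current = time.time() if now is None else now
    if current > deadline:
        raise FlowTimeoutError(f"deadline exceeded by {current - deadline:.3f}s")


check_deadline(None)                # no-op
check_deadline(time.time() + 60.0)  # still within budget

timed_out = False
try:
    check_deadline(time.time() - 1.0)
except FlowTimeoutError:
    timed_out = True
```

Because the check is cooperative, a component that never calls it is not interrupted by it.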

set_output_port

set_output_port(context: FlowContext, port: str) -> None

Signal which output port to activate.

Use in components with multiple outputs (e.g., Condition, Switch). The graph executor will route to edges matching this port.
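A rough sketch of how port-based edge selection might work follows. `Edge` and `next_targets` are hypothetical, and two behaviors are assumptions: when no port is active every outgoing edge propagates, and an edge without a port never matches an active port.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Edge:
    """Hypothetical edge record; the real GraphEdgeConfig may differ."""

    source: str
    target: str
    port: str | None = None


def next_targets(edges: list[Edge], node_id: str, active_port: str | None) -> list[str]:
    """Return targets of the outgoing edges that should propagate."""
    targets = []
    for edge in edges:
        if edge.source != node_id:
            continue
        # Assumption: no active port means all outgoing edges fire.
        if active_port is None or edge.port == active_port:
            targets.append(edge.target)
    return targets


edges = [
    Edge("check", "approve", port="true"),
    Edge("check", "reject", port="false"),
]
taken = next_targets(edges, "check", "true")
```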


FlowContext

Context object passed through all components.

FlowContext dataclass

FlowContext(data: DotDict = DotDict(), metadata: ExecutionMetadata = ExecutionMetadata(), input: Any = None, _active_port: Optional[str] = None)

Context object passed through all components in a flow.

The context accumulates data as it passes through components. Each component can read from and write to the context.

Attributes:

Name Type Description
data DotDict

Main data container with attribute-style access

metadata ExecutionMetadata

Execution metadata (timings, errors, etc.)

input Any

Optional initial input data

Example
context = FlowContext()

# Set values
context.set("user", {"name": "Alice", "age": 30})

# Get values with dot notation
print(context.data.user.name)  # "Alice"

# Check for values
print(context.get("missing", "default"))  # "default"

# Serialize for debugging
print(context.to_json())

data class-attribute instance-attribute

data: DotDict = field(default_factory=DotDict)

metadata class-attribute instance-attribute

metadata: ExecutionMetadata = field(default_factory=ExecutionMetadata)

input class-attribute instance-attribute

input: Any = None

set

set(key: str, value: Any) -> None

Set a value in the data container.

Parameters:

Name Type Description Default
key str

Key to set

required
value Any

Value to store

required

get

get(key: str, default: Any = None) -> Any

Get a value from the data container.

Parameters:

Name Type Description Default
key str

Key to retrieve

required
default Any

Default value if key not found

None

Returns:

Type Description
Any

Value or default

has

has(key: str) -> bool

Check if a key exists in the data container.

Parameters:

Name Type Description Default
key str

Key to check

required

Returns:

Type Description
bool

True if key exists

delete

delete(key: str) -> None

Delete a key from the data container.

Parameters:

Name Type Description Default
key str

Key to delete

required

set_port

set_port(port: str) -> None

Set the active output port for the current node.

Called by components like Condition to signal which branch to take. The graph executor reads this to determine edge routing.

get_active_port

get_active_port() -> Optional[str]

Get the active output port (used by graph executor).

clear_port

clear_port() -> None

Reset active port (called by executor between nodes).

suspend

suspend(node_id: str, reason: str = '') -> None

Suspend execution at the current node.

Called by components like HumanApproval to pause the workflow. The executor checks metadata.suspended after each node.
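The interaction between suspension and resume can be sketched as a loop that checks a suspended flag after each node and skips nodes already listed in completed_nodes. Everything here is a stand-in (`Meta`, `run`, the node callables). In particular, recording the suspending node as completed is an assumption made so that the resumed run continues past it.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Callable


@dataclass
class Meta:
    """Hypothetical subset of the execution metadata."""

    suspended: bool = False
    suspended_at_node: str | None = None
    completed_nodes: list[str] = field(default_factory=list)


def run(nodes: list[tuple[str, Callable[[Meta], None]]], meta: Meta) -> None:
    for node_id, action in nodes:
        if node_id in meta.completed_nodes:
            continue  # already done before the suspension
        action(meta)
        meta.completed_nodes.append(node_id)  # assumption: suspender counts as done
        if meta.suspended:
            return  # stop here; a later call resumes


def approve(meta: Meta) -> None:
    meta.suspended = True
    meta.suspended_at_node = "approve"


nodes = [("fetch", lambda m: None), ("approve", approve), ("save", lambda m: None)]
meta = Meta()
run(nodes, meta)                      # stops after "approve"
first_pass = list(meta.completed_nodes)
meta.suspended = False                # e.g. after an approval decision arrives
run(nodes, meta)                      # continues with "save"
```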

to_dict

to_dict() -> dict[str, Any]

Convert context to dictionary.

Returns:

Type Description
dict[str, Any]

Dictionary representation of context

to_json

to_json(indent: int = 2) -> str

Serialize context to JSON string.

Parameters:

Name Type Description Default
indent int

JSON indentation level

2

Returns:

Type Description
str

JSON string representation

from_dict classmethod

from_dict(data: dict[str, Any]) -> FlowContext

Create context from dictionary.

Supports full round-trip: from_dict(to_dict()) preserves all fields.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary with context data

required

Returns:

Type Description
FlowContext

New FlowContext instance

from_json classmethod

from_json(json_str: str) -> FlowContext

Create context from JSON string.

Parameters:

Name Type Description Default
json_str str

JSON string representation

required

Returns:

Type Description
FlowContext

New FlowContext instance

copy

copy() -> FlowContext

Create a shallow copy of the context.

Returns:

Type Description
FlowContext

New FlowContext with copied data


DotDict

Dictionary with attribute-style access.

DotDict

DotDict(data: Optional[dict[str, Any]] = None)

Dictionary with attribute-style access.

Allows accessing nested dictionary values using dot notation instead of bracket notation.

Example
d = DotDict({"user": {"name": "Alice"}})
print(d.user.name)  # "Alice"
d.user.age = 30
print(d.to_dict())  # {"user": {"name": "Alice", "age": 30}}
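For readers curious how attribute-style access over nested dictionaries can be built, here is a minimal sketch. `MiniDotDict` is a hypothetical illustration, not the library's DotDict. It wraps nested dicts by reference so that writes through a nested wrapper reach the underlying data, which also means the dict passed in is mutated.

```python
from __future__ import annotations

import copy
from typing import Any


class MiniDotDict:
    """Hypothetical attribute-access wrapper around a plain dict."""

    def __init__(self, data: dict[str, Any] | None = None) -> None:
        # Bypass our own __setattr__ to store the backing dict.
        object.__setattr__(self, "_data", data if data is not None else {})

    def __getattr__(self, key: str) -> Any:
        if key.startswith("_"):
            raise AttributeError(key)
        try:
            value = self._data[key]
        except KeyError:
            raise AttributeError(key) from None
        # Wrap nested dicts by reference so writes propagate.
        return MiniDotDict(value) if isinstance(value, dict) else value

    def __setattr__(self, key: str, value: Any) -> None:
        self._data[key] = value

    def to_dict(self) -> dict[str, Any]:
        return copy.deepcopy(self._data)


d = MiniDotDict({"user": {"name": "Alice"}})
d.user.age = 30
snapshot = d.to_dict()
```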

Initialize DotDict with optional data.

Parameters:

Name Type Description Default
data Optional[dict[str, Any]]

Initial dictionary data (defaults to empty dict)

None

get

get(key: str, default: Any = None) -> Any

Get value with default fallback.

Parameters:

Name Type Description Default
key str

Key to retrieve

required
default Any

Default value if key not found

None

Returns:

Type Description
Any

Value at key or default

keys

keys() -> list[str]

Get all keys in the data.

Returns:

Type Description
list[str]

List of keys

values

values() -> list[Any]

Get all values in the data.

Returns:

Type Description
list[Any]

List of values

items

items() -> list[tuple[str, Any]]

Get all key-value pairs.

Returns:

Type Description
list[tuple[str, Any]]

List of (key, value) tuples

update

update(data: dict[str, Any]) -> None

Update with dictionary values.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary to merge

required

to_dict

to_dict() -> dict[str, Any]

Convert to regular dictionary.

Returns:

Type Description
dict[str, Any]

Copy of underlying data dictionary


ExecutionMetadata

Tracks timing, errors, and execution state.

ExecutionMetadata dataclass

ExecutionMetadata(flow_id: str = (lambda: str(uuid.uuid4()))(), started_at: datetime = _utc_now(), completed_at: Optional[datetime] = None, step_timings: list[StepTiming] = list(), skipped_components: list[str] = list(), errors: list[dict[str, Any]] = list(), condition_errors: list[dict[str, Any]] = list(), deadline: Optional[float] = None, deadline_checked: bool = False, suspended: bool = False, suspended_at_node: Optional[str] = None, suspension_reason: Optional[str] = None, completed_nodes: list[str] = list(), node_visit_counts: dict[str, int] = dict(), iteration_count: int = 0, loop_timings: list[dict[str, Any]] = list(), max_iterations_reached: bool = False, converged: bool = False, _step_counter: int = 0)

Metadata about flow execution.

Tracks timing information, errors, and which components were skipped during flow execution.

Attributes:

Name Type Description
flow_id str

Unique identifier for this flow execution

started_at datetime

When execution started

completed_at Optional[datetime]

When execution completed (None if still running)

step_timings list[StepTiming]

List of timing info for each executed step

skipped_components list[str]

Names of components skipped due to conditions

errors list[dict[str, Any]]

List of error details from failed components

condition_errors list[dict[str, Any]]

List of condition evaluation errors

deadline Optional[float]

Absolute time (as returned by time.time()) by which the current step must complete. Set by the engine before each step and cleared afterwards. Components can check it via BaseComponent.check_deadline() to time out cooperatively.

flow_id class-attribute instance-attribute

flow_id: str = field(default_factory=lambda: str(uuid4()))

started_at class-attribute instance-attribute

started_at: datetime = field(default_factory=_utc_now)

completed_at class-attribute instance-attribute

completed_at: Optional[datetime] = None

step_timings class-attribute instance-attribute

step_timings: list[StepTiming] = field(default_factory=list)

skipped_components class-attribute instance-attribute

skipped_components: list[str] = field(default_factory=list)

errors class-attribute instance-attribute

errors: list[dict[str, Any]] = field(default_factory=list)

condition_errors class-attribute instance-attribute

condition_errors: list[dict[str, Any]] = field(default_factory=list)

component_timings property

component_timings: dict[str, float]

Get aggregated timings by component name.

If a component runs multiple times, returns the sum of all durations.

Returns:

Type Description
dict[str, float]

Dictionary mapping component names to total execution time
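The aggregation described above amounts to summing durations per component name. A minimal sketch, using (component, duration) tuples in place of StepTiming objects and a hypothetical `aggregate_timings` helper:

```python
from collections import defaultdict


def aggregate_timings(timings: list[tuple[str, float]]) -> dict[str, float]:
    """Sum durations (in seconds) per component name."""
    totals: dict[str, float] = defaultdict(float)
    for component, duration in timings:
        totals[component] += duration
    return dict(totals)


totals = aggregate_timings([("fetch", 0.5), ("process", 1.0), ("fetch", 0.25)])
```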

has_errors property

has_errors: bool

Check if any errors were recorded.

Returns:

Type Description
bool

True if there are recorded errors

has_condition_errors property

has_condition_errors: bool

Check if any condition evaluation errors were recorded.

Returns:

Type Description
bool

True if there are recorded condition errors

total_duration property

total_duration: Optional[float]

Get total execution duration in seconds.

Returns:

Type Description
Optional[float]

Duration in seconds, or None if not completed

suspended class-attribute instance-attribute

suspended: bool = False

suspended_at_node class-attribute instance-attribute

suspended_at_node: Optional[str] = None

suspension_reason class-attribute instance-attribute

suspension_reason: Optional[str] = None

completed_nodes class-attribute instance-attribute

completed_nodes: list[str] = field(default_factory=list)

add_error

add_error(component: str, error: Exception) -> None

Record an error from a component.

Parameters:

Name Type Description Default
component str

Name of the component that errored

required
error Exception

The exception that was raised

required

add_condition_error

add_condition_error(component: str, error: Exception, condition: str) -> None

Record a condition evaluation error.

Parameters:

Name Type Description Default
component str

Name of the component with the failed condition

required
error Exception

The exception that was raised

required
condition str

The condition expression that failed

required

record_timing

record_timing(component: str, seconds: float, started_at: Optional[datetime] = None, step_index: Optional[int] = None) -> None

Record execution time for a step.

Parameters:

Name Type Description Default
component str

Name of the component

required
seconds float

Execution time in seconds

required
started_at Optional[datetime]

When the step started (defaults to now)

None
step_index Optional[int]

Position in the flow definition (0-based). If not provided, uses execution order as step_index for backward compatibility.

None

StepTiming

Timing information for a single step execution.

StepTiming dataclass

StepTiming(step_index: int, component: str, duration: float, started_at: datetime, execution_order: int = 0)

Timing information for a single step execution.

Attributes:

Name Type Description
step_index int

Index of the step in the flow definition (0-based, matches config order)

component str

Name of the component executed

duration float

Execution time in seconds

started_at datetime

When the step started executing

execution_order int

Order in which this step was executed (0-based; skipped steps are not counted)
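The difference between step_index and execution_order shows up as soon as a step is skipped. The sketch below uses a hypothetical `Timing` record and `record` helper, with (name, should_run) pairs standing in for steps and their evaluated conditions.

```python
from dataclasses import dataclass


@dataclass
class Timing:
    """Hypothetical subset of StepTiming."""

    step_index: int       # position in the flow definition
    component: str
    execution_order: int  # position among steps that actually ran


def record(steps: list[tuple[str, bool]]) -> list[Timing]:
    timings: list[Timing] = []
    for step_index, (name, should_run) in enumerate(steps):
        if not should_run:
            continue  # skipped steps get no timing entry
        timings.append(Timing(step_index, name, execution_order=len(timings)))
    return timings


timings = record([("fetch", True), ("enrich", False), ("save", True)])
```

Here "save" keeps step_index 2 from the definition but has execution_order 1, because "enrich" never ran.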

step_index instance-attribute

step_index: int

component instance-attribute

component: str

duration instance-attribute

duration: float

started_at instance-attribute

started_at: datetime

execution_order class-attribute instance-attribute

execution_order: int = 0

GraphExecutor

Executes graph-type flows using topological ordering with port-based routing.

GraphExecutor

GraphExecutor(nodes: list[GraphNodeConfig], edges: list[GraphEdgeConfig], components: dict[str, BaseComponent], settings: FlowSettings, hooks: list[Any] | None = None)

Executes a graph-type flow using topological ordering with port-based routing.

For DAG graphs, uses topological sort (Kahn's algorithm) for execution order. For cyclic graphs, uses a ready-queue executor with iteration limits.

Key semantics:
  - Root nodes (no incoming edges) execute first; these are triggers/entry points
  - Port routing: when a component sets active_port, only matching edges propagate
  - Unreachable nodes: if no incoming edge was activated, the node is skipped
  - Completed nodes: nodes in completed_nodes are skipped (for resume support)
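For reference, Kahn's algorithm on a plain node/edge list looks roughly like this. The sketch covers only the ordering step. Port routing, skipping, and the cyclic executor are left out, and `topological_order` is a hypothetical helper name.

```python
from collections import deque


def topological_order(nodes: list[str], edges: list[tuple[str, str]]) -> list[str]:
    """Return nodes in dependency order using Kahn's algorithm."""
    indegree = {node: 0 for node in nodes}
    successors: dict[str, list[str]] = {node: [] for node in nodes}
    for source, target in edges:
        successors[source].append(target)
        indegree[target] += 1

    # Root nodes (no incoming edges) are ready immediately.
    ready = deque(node for node in nodes if indegree[node] == 0)
    order: list[str] = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for target in successors[node]:
            indegree[target] -= 1
            if indegree[target] == 0:
                ready.append(target)

    if len(order) != len(nodes):
        raise ValueError("graph contains a cycle")
    return order


order = topological_order(
    ["a", "b", "c", "d"],
    [("a", "b"), ("a", "c"), ("b", "d"), ("c", "d")],
)
```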

execute

execute(context: FlowContext) -> FlowContext

Execute the graph flow.

Dispatches to the DAG executor for acyclic graphs, or the cyclic executor for graphs containing cycles.


ExecutionHook

Protocol for step lifecycle hooks. Implement any or all methods.

ExecutionHook

Bases: Protocol

Callback protocol for step lifecycle events.

on_node_start

on_node_start(node_id: str, component_name: str, context: FlowContext) -> None

on_node_complete

on_node_complete(node_id: str, component_name: str, context: FlowContext, duration: float) -> None

on_node_error

on_node_error(node_id: str, component_name: str, error: Exception, context: FlowContext) -> None

on_node_skipped

on_node_skipped(node_id: str, component_name: str, reason: str) -> None

on_flow_suspended

on_flow_suspended(node_id: str, reason: str, checkpoint_id: str | None) -> None
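Since any subset of the methods may be implemented, a hook can be a small plain class. The sketch below defines a hypothetical `TimingHook` with two of the callbacks and a hypothetical `notify` dispatcher that skips hooks lacking a given method. How the engine actually dispatches is not documented here, so treat `notify` as an assumption.

```python
from typing import Any


class TimingHook:
    """Hypothetical hook that implements only two protocol methods."""

    def __init__(self) -> None:
        self.durations: dict[str, float] = {}
        self.failed: list[str] = []

    def on_node_complete(
        self, node_id: str, component_name: str, context: Any, duration: float
    ) -> None:
        self.durations[node_id] = duration

    def on_node_error(
        self, node_id: str, component_name: str, error: Exception, context: Any
    ) -> None:
        self.failed.append(node_id)


def notify(hooks: list[Any], event: str, *args: Any) -> None:
    """Call `event` on each hook that defines it (assumed dispatch rule)."""
    for hook in hooks:
        handler = getattr(hook, event, None)
        if callable(handler):
            handler(*args)


hook = TimingHook()
notify([hook], "on_node_start", "n1", "fetch", {})          # silently ignored
notify([hook], "on_node_complete", "n1", "fetch", {}, 0.12)
notify([hook], "on_node_error", "n2", "save", RuntimeError("x"), {})
```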

Checkpoint

Serializable snapshot of flow execution state for suspend/resume.

Checkpoint dataclass

Checkpoint(flow_config: dict[str, Any], context: dict[str, Any], created_at: str = (lambda: datetime.now(timezone.utc).isoformat())(), checkpoint_id: str = (lambda: str(uuid.uuid4()))())

Serializable snapshot of a suspended flow execution.

checkpoint_id class-attribute instance-attribute

checkpoint_id: str = field(default_factory=lambda: str(uuid4()))

flow_config instance-attribute

flow_config: dict[str, Any]

context instance-attribute

context: dict[str, Any]

created_at class-attribute instance-attribute

created_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())

to_dict

to_dict() -> dict[str, Any]

from_dict classmethod

from_dict(data: dict[str, Any]) -> Checkpoint

to_json

to_json() -> str

from_json classmethod

from_json(json_str: str) -> Checkpoint

CheckpointStore

Abstract base class for checkpoint persistence.

CheckpointStore

Bases: ABC

Abstract checkpoint persistence.

save abstractmethod

save(checkpoint: Checkpoint) -> str

Save a checkpoint. Returns checkpoint_id.

load abstractmethod

load(checkpoint_id: str) -> Checkpoint | None

Load a checkpoint by ID. Returns None if not found.

delete abstractmethod

delete(checkpoint_id: str) -> None

Delete a checkpoint by ID.
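As an illustration of the three-method contract, here is a sketch of a store that writes one JSON file per checkpoint. It is self-contained and therefore uses a simplified stand-in `Checkpoint` dataclass and does not subclass the real CheckpointStore. `FileCheckpointStore` is a hypothetical name.

```python
from __future__ import annotations

import json
import tempfile
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any
from uuid import uuid4


@dataclass
class Checkpoint:
    """Simplified stand-in for the library's Checkpoint."""

    flow_config: dict[str, Any]
    context: dict[str, Any]
    checkpoint_id: str = field(default_factory=lambda: str(uuid4()))


class FileCheckpointStore:
    """Hypothetical store: one JSON file per checkpoint ID."""

    def __init__(self, directory: str | Path) -> None:
        self._dir = Path(directory)
        self._dir.mkdir(parents=True, exist_ok=True)

    def _path(self, checkpoint_id: str) -> Path:
        return self._dir / f"{checkpoint_id}.json"

    def save(self, checkpoint: Checkpoint) -> str:
        self._path(checkpoint.checkpoint_id).write_text(json.dumps(asdict(checkpoint)))
        return checkpoint.checkpoint_id

    def load(self, checkpoint_id: str) -> Checkpoint | None:
        path = self._path(checkpoint_id)
        if not path.exists():
            return None  # contract: None when not found
        return Checkpoint(**json.loads(path.read_text()))

    def delete(self, checkpoint_id: str) -> None:
        self._path(checkpoint_id).unlink(missing_ok=True)


with tempfile.TemporaryDirectory() as tmp:
    store = FileCheckpointStore(tmp)
    saved = Checkpoint(flow_config={"name": "demo"}, context={"data": {"x": 1}})
    checkpoint_id = store.save(saved)
    loaded = store.load(checkpoint_id)
    store.delete(checkpoint_id)
    after_delete = store.load(checkpoint_id)
```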


InMemoryCheckpointStore

In-memory implementation of CheckpointStore.

InMemoryCheckpointStore

InMemoryCheckpointStore()

Bases: CheckpointStore

Default in-memory store, intended for testing. Production deployments use a database-backed store.

save

save(checkpoint: Checkpoint) -> str

load

load(checkpoint_id: str) -> Checkpoint | None

delete

delete(checkpoint_id: str) -> None