Configuration Module
Classes for loading and validating flow configurations.
ConfigLoader
Loads and validates flow configurations.
Supports loading from:
- YAML files
- YAML strings
- Python dictionaries
Example
# From file
config = ConfigLoader.load("flow.yaml")
# From string
yaml_str = '''
name: "Test Flow"
components:
  - name: test
    type: myapp.TestComponent
flow:
  steps:
    - component: test
'''
config = ConfigLoader.loads(yaml_str)
# From dictionary
config = ConfigLoader.from_dict({
    "name": "Test Flow",
    "components": [...],
    "flow": {...}
})
load (staticmethod)
Load configuration from a YAML file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | Union[str, Path] | Path to the YAML configuration file | required |
Returns:
| Type | Description |
|---|---|
| FlowConfig | Validated FlowConfig object |
Raises:
| Type | Description |
|---|---|
| ConfigurationError | If file cannot be read or parsed |
loads (staticmethod)
Load configuration from a YAML string.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| yaml_string | str | YAML content as string | required |
Returns:
| Type | Description |
|---|---|
| FlowConfig | Validated FlowConfig object |
Raises:
| Type | Description |
|---|---|
| ConfigurationError | If YAML is invalid or validation fails |
from_dict (staticmethod)
Load configuration from a dictionary.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | dict[str, Any] | Configuration dictionary | required |
Returns:
| Type | Description |
|---|---|
| FlowConfig | Validated FlowConfig object |
Raises:
| Type | Description |
|---|---|
| ConfigurationError | If validation fails |
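
Before passing a dictionary to from_dict, it can be useful to confirm that every step refers to a declared component. The sketch below uses only the standard library and inspects the plain dictionary; `find_unknown_components` is a hypothetical helper, not part of the library, and whether ConfigLoader performs this cross-check itself is not stated in this reference.

```python
from typing import Any


def find_unknown_components(data: dict[str, Any]) -> list[str]:
    """Return step component names that are not declared under "components".

    Hypothetical helper for illustration; it never touches the library.
    """
    declared = {c["name"] for c in data.get("components", [])}
    steps = data.get("flow", {}).get("steps") or []
    return [s["component"] for s in steps if s["component"] not in declared]


config_dict = {
    "name": "Test Flow",
    "components": [{"name": "test", "type": "myapp.TestComponent"}],
    "flow": {"steps": [{"component": "test"}, {"component": "missing"}]},
}

unknown = find_unknown_components(config_dict)
print(unknown)  # ['missing']
```

An empty result means every step names a declared component; a non-empty result can be reported before any loading is attempted.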
FlowConfig
Bases: BaseModel
Complete flow configuration.
This is the root model for a flow configuration file.
Attributes:
| Name | Type | Description |
|---|---|---|
| name | str | Human-readable flow name |
| version | str | Configuration version string |
| description | Optional[str] | Optional flow description |
| components | list[ComponentConfig] | List of component definitions |
| flow | FlowDefinition | Flow definition with steps |
Example YAML
get_component_config
Get configuration for a named component.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Component name to find | required |
Returns:
| Type | Description |
|---|---|
| Optional[ComponentConfig] | ComponentConfig if found, None otherwise |
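
Because the return type is Optional, callers need to handle the None case before reading attributes. The following is a minimal sketch of that lookup pattern using a dataclass stand-in for ComponentConfig and a free function in place of the method; both are assumptions made so the example runs without the library.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class ComponentConfig:
    """Stand-in with the documented fields; not the library's model."""
    name: str
    type: str
    config: dict[str, Any] = field(default_factory=dict)


def get_component_config(
    components: list[ComponentConfig], name: str
) -> Optional[ComponentConfig]:
    """Return the first component whose name matches, or None."""
    return next((c for c in components if c.name == name), None)


components = [ComponentConfig(name="test", type="myapp.TestComponent")]

found = get_component_config(components, "test")
missing = get_component_config(components, "nope")

# Check for None before touching attributes such as .type.
print(found.type if found is not None else "not found")
print(missing.type if missing is not None else "not found")
```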
ComponentConfig
Bases: BaseModel
Configuration for a single component.
Attributes:
| Name | Type | Description |
|---|---|---|
| name | str | Unique component name within the flow |
| type | str | Component class path (e.g., "myapp.components.MyComponent") |
| config | dict[str, Any] | Component-specific configuration dictionary |
FlowDefinition
Bases: BaseModel
Flow structure and execution definition.
Attributes:
| Name | Type | Description |
|---|---|---|
| type | Literal['sequential', 'conditional', 'graph'] | Flow execution type that determines how steps are processed. Default "sequential". |
| settings | FlowSettings | Execution settings |
| steps | Optional[list[StepConfig]] | Ordered list of execution steps (for sequential/conditional) |
| nodes | Optional[list[GraphNodeConfig]] | List of graph nodes (for graph type) |
| edges | Optional[list[GraphEdgeConfig]] | List of graph edges (for graph type) |
Flow execution types:
- "sequential" (default): Runs all steps in order. Conditions guard individual steps: if a step's condition is False, the step is skipped and the next step runs. All matching steps execute.
- "conditional": First-match branching (like switch/case). Stops after the first step whose condition evaluates to True, so only one step executes. Defaults on_condition_error to "skip".
- "graph": DAG-based execution with topological ordering and port-based routing.
type
type: Literal['sequential', 'conditional', 'graph'] = Field(default='sequential', description="Flow execution type: 'sequential' runs all matching steps, 'conditional' stops after first match, 'graph' executes DAG with port-based routing")
steps
steps: Optional[list[StepConfig]] = Field(default=None, description='Ordered list of execution steps (sequential/conditional)')
nodes
nodes: Optional[list[GraphNodeConfig]] = Field(default=None, description='List of graph nodes (graph type)')
edges
edges: Optional[list[GraphEdgeConfig]] = Field(default=None, description='List of graph edges (graph type)')
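
The difference between the "sequential" and "conditional" flow types can be sketched as a small selection function. This is an illustration only, not the engine's implementation: `select_steps` is a hypothetical name, conditions are passed in already evaluated, and treating a step without a condition as a match in "conditional" mode is an assumption.

```python
from typing import Optional

# A step is (component_name, condition_result); None means "no condition".
Step = tuple[str, Optional[bool]]


def select_steps(flow_type: str, steps: list[Step]) -> list[str]:
    """Return the component names that would run for the given flow type."""
    selected: list[str] = []
    for component, condition in steps:
        if condition is False:
            continue  # guarded step whose condition is False: skip it
        selected.append(component)
        if flow_type == "conditional":
            break  # first match wins, like switch/case
    return selected


steps = [("validate", False), ("transform", True), ("publish", None)]

print(select_steps("sequential", steps))   # ['transform', 'publish']
print(select_steps("conditional", steps))  # ['transform']
```

The "graph" type is not covered here because it is driven by nodes and edges rather than an ordered step list.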
GraphNodeConfig
Bases: BaseModel
Configuration for a single node in a graph flow.
Attributes:
| Name | Type | Description |
|---|---|---|
| id | str | Unique node ID within the graph |
| component | str | References a component name |
| description | Optional[str] | Human-readable node description |
| on_error | Literal['fail', 'skip', 'continue'] | How to handle errors (fail/skip/continue) |
| max_visits | Optional[int] | Maximum times this node can execute in cyclic graphs. None means use the flow-level max_iterations setting. |
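
The max_visits fallback described above can be written as a one-line rule. The sketch below is a hedged illustration; `effective_visit_limit` and `may_visit` are hypothetical helper names, and the default of 10 mirrors the documented max_iterations default.

```python
from typing import Optional


def effective_visit_limit(max_visits: Optional[int], max_iterations: int = 10) -> int:
    """Per-node limit: the node's own max_visits, else the flow-level max_iterations."""
    return max_iterations if max_visits is None else max_visits


def may_visit(visits_so_far: int, max_visits: Optional[int], max_iterations: int = 10) -> bool:
    """True while the node has not yet used up its visit budget."""
    return visits_so_far < effective_visit_limit(max_visits, max_iterations)


print(effective_visit_limit(None))             # 10 (falls back to max_iterations)
print(effective_visit_limit(3))                # 3  (node-level override)
print(may_visit(visits_so_far=3, max_visits=3))  # False
```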
GraphEdgeConfig
Bases: BaseModel
Configuration for an edge connecting two nodes in a graph flow.
Attributes:
| Name | Type | Description |
|---|---|---|
| source | str | Source node ID |
| target | str | Target node ID |
| port | Optional[str] | Output port name (e.g. "true", "false"). None = unconditional edge. |
port
port: Optional[str] = Field(default=None, description="Output port name (e.g. 'true', 'false'). None = unconditional.")
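
Port-based routing can be pictured as filtering a node's outgoing edges by the port it activated. The following sketch is an assumption about how such routing might work, not the engine's code: `Edge` is a stand-in for GraphEdgeConfig, `next_targets` is a hypothetical helper, and following unconditional edges regardless of the active port is an assumed rule.

```python
from typing import NamedTuple, Optional


class Edge(NamedTuple):
    """Stand-in with the documented fields of GraphEdgeConfig."""
    source: str
    target: str
    port: Optional[str] = None  # None = unconditional edge


def next_targets(edges: list[Edge], source: str, active_port: Optional[str]) -> list[str]:
    """Targets reachable from `source` when it emits on `active_port`."""
    return [
        e.target
        for e in edges
        if e.source == source and (e.port is None or e.port == active_port)
    ]


edges = [
    Edge("check", "approve", port="true"),
    Edge("check", "reject", port="false"),
    Edge("check", "audit"),  # unconditional
]

print(next_targets(edges, "check", "true"))   # ['approve', 'audit']
print(next_targets(edges, "check", "false"))  # ['reject', 'audit']
```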
StepConfig
Bases: BaseModel
Configuration for a single execution step.
Attributes:
| Name | Type | Description |
|---|---|---|
| component | str | Name of component to execute |
| description | Optional[str] | Human-readable step description |
| condition | Optional[str] | Python expression for conditional execution |
| on_error | Literal['fail', 'skip', 'continue'] | How to handle errors (fail/skip/continue) |
condition
condition: Optional[str] = Field(default=None, description='Python expression for conditional execution')
on_error
on_error: Literal['fail', 'skip', 'continue'] = Field(default='fail', description='Error handling behavior')
FlowSettings
Bases: BaseModel
Flow execution settings.
Attributes:
| Name | Type | Description |
|---|---|---|
| fail_fast | bool | If True, stop on first error. Default True. |
| timeout_seconds | float | Maximum flow execution time in seconds. Default 300. |
| timeout_mode | Literal['cooperative', 'hard_async', 'hard_process'] | How to enforce timeouts: "cooperative" (default; components must call check_deadline()), "hard_async" (uses asyncio.wait_for for async component execution), or "hard_process" (runs steps in separate processes with a hard kill). |
| require_deadline_check | bool | If True, raise an error when long-running components don't call check_deadline() (applies to cooperative mode only). Default False (only warns). |
| on_condition_error | Literal['fail', 'skip', 'warn'] | How to handle condition evaluation errors: "fail" (default; raises ConditionEvaluationError), "skip" (skips the step and records the error), or "warn" (logs a warning and skips the step). |
| max_iterations | int | Maximum number of loop iterations for cyclic graphs. Default 10. Range: 1-1000. |
| on_max_iterations | Literal['fail', 'exit', 'warn'] | Policy when max_iterations is reached: "fail" (default; raises MaxIterationsError), "exit" (silently stops execution), or "warn" (logs a warning and stops execution). |
| convergence_check | bool | Reserved for v0.3.1. Enable convergence detection. |
| convergence_keys | list[str] | Reserved for v0.3.1. Context keys to check for convergence. |
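
The interaction between max_iterations and on_max_iterations can be sketched as a bounded loop with three exit policies. This is an illustration under stated assumptions: `run_cycle` is a hypothetical function, and the `MaxIterationsError` class defined here is a local stand-in that only shares its name with the library's error.

```python
import logging
from typing import Callable

logger = logging.getLogger("flow_sketch")


class MaxIterationsError(RuntimeError):
    """Local stand-in for the error named in the settings table."""


def run_cycle(
    step: Callable[[int], bool],
    max_iterations: int = 10,
    on_max_iterations: str = "fail",
) -> int:
    """Call step(i) until it returns True or the budget is spent.

    Returns the number of iterations that were executed.
    """
    if not 1 <= max_iterations <= 1000:
        raise ValueError("max_iterations must be between 1 and 1000")
    for i in range(max_iterations):
        if step(i):
            return i + 1  # loop finished on its own
    # Budget exhausted: apply the configured policy.
    if on_max_iterations == "fail":
        raise MaxIterationsError(f"stopped after {max_iterations} iterations")
    if on_max_iterations == "warn":
        logger.warning("max_iterations (%d) reached; stopping", max_iterations)
    return max_iterations  # "exit" and "warn" both stop without raising


finished = run_cycle(lambda i: i == 2)
exhausted = run_cycle(lambda i: False, max_iterations=5, on_max_iterations="exit")
print(finished, exhausted)  # 3 5
```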
timeout_mode
timeout_mode: Literal['cooperative', 'hard_async', 'hard_process'] = Field(default='cooperative', description="Timeout enforcement mode: 'cooperative' (components call check_deadline), 'hard_async' (asyncio.wait_for), 'hard_process' (process isolation)")
require_deadline_check
require_deadline_check: bool = Field(default=False, description="If True, raise error when long-running components don't call check_deadline() in cooperative mode. Default False (only warns).")
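
In cooperative mode the timeout only takes effect when a component checks it. The sketch below shows the general pattern with a standard-library clock; the `Deadline` class, its `checks` counter, and `DeadlineExceeded` are hypothetical stand-ins, since this reference names check_deadline() but does not say which object provides it.

```python
import time


class DeadlineExceeded(TimeoutError):
    """Hypothetical error raised when the time budget is spent."""


class Deadline:
    """Hypothetical holder for a flow-level time budget."""

    def __init__(self, timeout_seconds: float) -> None:
        self._expires_at = time.monotonic() + timeout_seconds
        self.checks = 0  # lets a caller detect components that never check

    def check_deadline(self) -> None:
        self.checks += 1
        if time.monotonic() >= self._expires_at:
            raise DeadlineExceeded("flow timeout exceeded")


def process_items(items: list[int], deadline: Deadline) -> list[int]:
    """A long-running component that cooperates by checking once per item."""
    results = []
    for item in items:
        deadline.check_deadline()  # cooperative check inside the loop
        results.append(item * 2)
    return results


deadline = Deadline(timeout_seconds=300)
doubled = process_items([1, 2, 3], deadline)
print(doubled, deadline.checks)
```

A counter like `checks` is one way a runner could notice that a component never called the check, which is the situation require_deadline_check is described as guarding against.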
on_condition_error
on_condition_error: Literal['fail', 'skip', 'warn'] = Field(default='fail', description='How to handle condition evaluation errors')
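
The three on_condition_error policies can be sketched as a wrapper around condition evaluation. Everything here is an assumption for illustration: `should_run` is a hypothetical helper, `ConditionEvaluationError` is a local stand-in sharing the documented name, the variable `context` being visible to the expression is assumed, and eval() is used only to keep the sketch short (how the engine actually evaluates expressions is not specified in this reference).

```python
import logging
from typing import Any, Optional

logger = logging.getLogger("flow_sketch")


class ConditionEvaluationError(Exception):
    """Local stand-in for the error named in the settings table."""


def should_run(
    condition: str,
    context: dict[str, Any],
    on_condition_error: str = "fail",
    errors: Optional[list[str]] = None,
) -> bool:
    """Evaluate a step condition and apply the configured error policy."""
    try:
        # Illustration only: never eval() untrusted input in real code.
        return bool(eval(condition, {"__builtins__": {}}, {"context": context}))
    except Exception as exc:
        if on_condition_error == "fail":
            raise ConditionEvaluationError(str(exc)) from exc
        if on_condition_error == "warn":
            logger.warning("condition %r failed: %s", condition, exc)
        elif errors is not None:
            errors.append(str(exc))  # "skip": record the error
        return False  # "skip" and "warn" both skip the step


recorded: list[str] = []
ok = should_run("context['n'] > 1", {"n": 2})
skipped = should_run("context['missing'] > 1", {}, "skip", recorded)
print(ok, skipped, len(recorded))
```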
Usage Examples
Loading from File
from flowengine import ConfigLoader
config = ConfigLoader.load("flow.yaml")
print(config.name)
print(config.version)
Loading from String
yaml_content = """
name: "Test Flow"
version: "1.0"
components:
  - name: test
    type: myapp.TestComponent
flow:
  steps:
    - component: test
"""
config = ConfigLoader.loads(yaml_content)
Loading from Dictionary
config = ConfigLoader.from_dict({
    "name": "Test Flow",
    "version": "1.0",
    "components": [
        {"name": "test", "type": "myapp.TestComponent"}
    ],
    "flow": {
        "steps": [{"component": "test"}]
    }
})
Accessing Configuration
# Get component config by name
component_config = config.get_component_config("test")
print(component_config.type)
# Access settings
print(config.settings.timeout_seconds)
print(config.settings.fail_fast)
# Iterate steps
for step in config.steps:
    print(f"{step.component}: {step.description}")
Loading Graph Flow Configuration
config = ConfigLoader.from_dict({
    "name": "Graph Flow",
    "version": "1.0",
    "components": [
        {"name": "fetch", "type": "myapp.FetchComponent"},
        {"name": "process", "type": "myapp.ProcessComponent"},
    ],
    "flow": {
        "type": "graph",
        "nodes": [
            {"id": "n1", "component": "fetch"},
            {"id": "n2", "component": "process"},
        ],
        "edges": [
            {"source": "n1", "target": "n2"},
        ]
    }
})
# Access graph nodes and edges
for node in config.flow.nodes:
    print(f"Node {node.id}: {node.component}")
for edge in config.flow.edges:
    print(f"Edge {edge.source} -> {edge.target} (port: {edge.port})")