Skip to content

Configuration Module

Classes for loading and validating flow configurations.


ConfigLoader

Loads and validates YAML flow configurations.

ConfigLoader

Loads and validates flow configurations.

Supports loading from: - YAML files - YAML strings - Python dictionaries

Example
# From file
config = ConfigLoader.load("flow.yaml")

# From string
yaml_str = '''
name: "Test Flow"
components:
  - name: test
    type: myapp.TestComponent
flow:
  steps:
    - component: test
'''
config = ConfigLoader.loads(yaml_str)

# From dictionary
config = ConfigLoader.from_dict({
    "name": "Test Flow",
    "components": [...],
    "flow": {...}
})

load staticmethod

load(path: Union[str, Path]) -> FlowConfig

Load configuration from a YAML file.

Parameters:

Name Type Description Default
path Union[str, Path]

Path to the YAML configuration file

required

Returns:

Type Description
FlowConfig

Validated FlowConfig object

Raises:

Type Description
ConfigurationError

If file cannot be read or parsed

loads staticmethod

loads(yaml_string: str) -> FlowConfig

Load configuration from a YAML string.

Parameters:

Name Type Description Default
yaml_string str

YAML content as string

required

Returns:

Type Description
FlowConfig

Validated FlowConfig object

Raises:

Type Description
ConfigurationError

If YAML is invalid or validation fails

from_dict staticmethod

from_dict(data: dict[str, Any]) -> FlowConfig

Load configuration from a dictionary.

Parameters:

Name Type Description Default
data dict[str, Any]

Configuration dictionary

required

Returns:

Type Description
FlowConfig

Validated FlowConfig object

Raises:

Type Description
ConfigurationError

If validation fails


FlowConfig

Complete flow configuration model.

FlowConfig

Bases: BaseModel

Complete flow configuration.

This is the root model for a flow configuration file.

Attributes:

Name Type Description
name str

Human-readable flow name

version str

Configuration version string

description Optional[str]

Optional flow description

components list[ComponentConfig]

List of component definitions

flow FlowDefinition

Flow definition with steps

Example YAML
name: "My Flow"
version: "1.0"
components:
  - name: fetcher
    type: myapp.FetchComponent
    config:
      url: "https://api.example.com"
flow:
  type: sequential
  steps:
    - component: fetcher

name class-attribute instance-attribute

name: str = Field(..., description='Flow name')

version class-attribute instance-attribute

version: str = Field(default='1.0', description='Configuration version')

description class-attribute instance-attribute

description: Optional[str] = Field(default=None, description='Flow description')

components class-attribute instance-attribute

components: list[ComponentConfig] = Field(..., description='Component definitions', min_length=1)

flow class-attribute instance-attribute

flow: FlowDefinition = Field(..., description='Flow definition')

settings property

settings: FlowSettings

Shortcut to flow settings.

steps property

steps: list[StepConfig]

Shortcut to flow steps.

get_component_config

get_component_config(name: str) -> Optional[ComponentConfig]

Get configuration for a named component.

Parameters:

Name Type Description Default
name str

Component name to find

required

Returns:

Type Description
Optional[ComponentConfig]

ComponentConfig if found, None otherwise


ComponentConfig

Configuration for a single component.

ComponentConfig

Bases: BaseModel

Configuration for a single component.

Attributes:

Name Type Description
name str

Unique component name within the flow

type str

Component class path (e.g., "myapp.components.MyComponent")

config dict[str, Any]

Component-specific configuration dictionary

name class-attribute instance-attribute

name: str = Field(..., description='Unique component name')

type class-attribute instance-attribute

type: str = Field(..., description='Component class path')

config class-attribute instance-attribute

config: dict[str, Any] = Field(default_factory=dict, description='Component-specific configuration')

FlowDefinition

Flow structure and execution definition.

FlowDefinition

Bases: BaseModel

Flow structure definition.

Attributes:

Name Type Description
type Literal['sequential', 'conditional', 'graph']

Flow execution type that determines how steps are processed: - "sequential": (default) Runs all steps in order. Conditions guard individual steps - if a step's condition is False, it's skipped and the next step runs. All matching steps execute. - "conditional": First-match branching (like switch/case). Stops after the first step whose condition evaluates to True. Only one step executes. Defaults on_condition_error to "skip". - "graph": DAG-based execution with topological ordering and port-based routing.

settings FlowSettings

Execution settings

steps Optional[list[StepConfig]]

Ordered list of execution steps (for sequential/conditional)

nodes Optional[list[GraphNodeConfig]]

List of graph nodes (for graph type)

edges Optional[list[GraphEdgeConfig]]

List of graph edges (for graph type)

type class-attribute instance-attribute

type: Literal['sequential', 'conditional', 'graph'] = Field(default='sequential', description="Flow execution type: 'sequential' runs all matching steps, 'conditional' stops after first match, 'graph' executes DAG with port-based routing")

settings class-attribute instance-attribute

settings: FlowSettings = Field(default_factory=FlowSettings, description='Execution settings')

steps class-attribute instance-attribute

steps: Optional[list[StepConfig]] = Field(default=None, description='Ordered list of execution steps (sequential/conditional)')

nodes class-attribute instance-attribute

nodes: Optional[list[GraphNodeConfig]] = Field(default=None, description='List of graph nodes (graph type)')

edges class-attribute instance-attribute

edges: Optional[list[GraphEdgeConfig]] = Field(default=None, description='List of graph edges (graph type)')

GraphNodeConfig

Configuration for a single node in a graph flow.

GraphNodeConfig

Bases: BaseModel

A node in a graph flow.

Attributes:

Name Type Description
id str

Unique node ID within the graph

component str

References a component name

description Optional[str]

Human-readable node description

on_error Literal['fail', 'skip', 'continue']

How to handle errors (fail/skip/continue)

max_visits Optional[int]

Maximum times this node can execute in cyclic graphs. None means use the flow-level max_iterations setting.

id class-attribute instance-attribute

id: str = Field(..., description='Unique node ID within the graph')

component class-attribute instance-attribute

component: str = Field(..., description='Component name to execute')

description class-attribute instance-attribute

description: Optional[str] = Field(default=None, description='Human-readable node description')

on_error class-attribute instance-attribute

on_error: Literal['fail', 'skip', 'continue'] = Field(default='fail', description='Error handling behavior')

GraphEdgeConfig

Configuration for an edge in a graph flow.

GraphEdgeConfig

Bases: BaseModel

An edge connecting two nodes in a graph flow.

Attributes:

Name Type Description
source str

Source node ID

target str

Target node ID

port Optional[str]

Output port name (e.g. "true", "false"). None = unconditional edge.

source class-attribute instance-attribute

source: str = Field(..., description='Source node ID')

target class-attribute instance-attribute

target: str = Field(..., description='Target node ID')

port class-attribute instance-attribute

port: Optional[str] = Field(default=None, description="Output port name (e.g. 'true', 'false'). None = unconditional.")

StepConfig

Configuration for a single execution step.

StepConfig

Bases: BaseModel

Configuration for a single execution step.

Attributes:

Name Type Description
component str

Name of component to execute

description Optional[str]

Human-readable step description

condition Optional[str]

Python expression for conditional execution

on_error Literal['fail', 'skip', 'continue']

How to handle errors (fail/skip/continue)

component class-attribute instance-attribute

component: str = Field(..., description='Component name to execute')

description class-attribute instance-attribute

description: Optional[str] = Field(default=None, description='Human-readable step description')

condition class-attribute instance-attribute

condition: Optional[str] = Field(default=None, description='Python expression for conditional execution')

on_error class-attribute instance-attribute

on_error: Literal['fail', 'skip', 'continue'] = Field(default='fail', description='Error handling behavior')

FlowSettings

Flow execution settings.

FlowSettings

Bases: BaseModel

Flow execution settings.

Attributes:

Name Type Description
fail_fast bool

If True, stop on first error. Default True.

timeout_seconds float

Maximum flow execution time. Default 300.

timeout_mode Literal['cooperative', 'hard_async', 'hard_process']

How to enforce timeouts: - "cooperative": Components must call check_deadline() (default) - "hard_async": Use asyncio.wait_for for async component execution - "hard_process": Run steps in separate processes with hard kill

require_deadline_check bool

If True, raise error when long-running components don't call check_deadline() (only applies to cooperative mode). Default False (only warns).

on_condition_error Literal['fail', 'skip', 'warn']

How to handle condition evaluation errors: - "fail": Raise ConditionEvaluationError (default) - "skip": Skip the step and record the error - "warn": Log a warning and skip the step

max_iterations int

Maximum number of loop iterations for cyclic graphs. Default 10. Range: 1-1000.

on_max_iterations Literal['fail', 'exit', 'warn']

Policy when max_iterations is reached: - "fail": Raise MaxIterationsError (default) - "exit": Silently stop execution - "warn": Log a warning and stop execution

convergence_check bool

Reserved for v0.3.1. Enable convergence detection.

convergence_keys list[str]

Reserved for v0.3.1. Context keys to check for convergence.

fail_fast class-attribute instance-attribute

fail_fast: bool = Field(default=True, description='Stop on first error')

timeout_seconds class-attribute instance-attribute

timeout_seconds: float = Field(default=300.0, description='Maximum flow execution time', gt=0)

timeout_mode class-attribute instance-attribute

timeout_mode: Literal['cooperative', 'hard_async', 'hard_process'] = Field(default='cooperative', description="Timeout enforcement mode: 'cooperative' (components call check_deadline), 'hard_async' (asyncio.wait_for), 'hard_process' (process isolation)")

require_deadline_check class-attribute instance-attribute

require_deadline_check: bool = Field(default=False, description="If True, raise error when long-running components don't call check_deadline() in cooperative mode. Default False (only warns).")

on_condition_error class-attribute instance-attribute

on_condition_error: Literal['fail', 'skip', 'warn'] = Field(default='fail', description='How to handle condition evaluation errors')

Usage Examples

Loading from File

from flowengine import ConfigLoader

config = ConfigLoader.load("flow.yaml")
print(config.name)
print(config.version)

Loading from String

yaml_content = """
name: "Test Flow"
version: "1.0"
components:
  - name: test
    type: myapp.TestComponent
flow:
  steps:
    - component: test
"""

config = ConfigLoader.loads(yaml_content)

Loading from Dictionary

config = ConfigLoader.from_dict({
    "name": "Test Flow",
    "version": "1.0",
    "components": [
        {"name": "test", "type": "myapp.TestComponent"}
    ],
    "flow": {
        "steps": [{"component": "test"}]
    }
})

Accessing Configuration

# Get component config by name
component_config = config.get_component_config("test")
print(component_config.type)

# Access settings
print(config.settings.timeout_seconds)
print(config.settings.fail_fast)

# Iterate steps
for step in config.steps:
    print(f"{step.component}: {step.description}")

Loading Graph Flow Configuration

config = ConfigLoader.from_dict({
    "name": "Graph Flow",
    "version": "1.0",
    "components": [
        {"name": "fetch", "type": "myapp.FetchComponent"},
        {"name": "process", "type": "myapp.ProcessComponent"},
    ],
    "flow": {
        "type": "graph",
        "nodes": [
            {"id": "n1", "component": "fetch"},
            {"id": "n2", "component": "process"},
        ],
        "edges": [
            {"source": "n1", "target": "n2"},
        ]
    }
})

# Access graph nodes and edges
for node in config.flow.nodes:
    print(f"Node {node.id}: {node.component}")
for edge in config.flow.edges:
    print(f"Edge {edge.source} -> {edge.target} (port: {edge.port})")