Flow Configuration¶
Flows define how components are orchestrated. FlowEngine supports three flow types (sequential, conditional, and graph), each with different execution semantics.
YAML Structure¶
A complete flow configuration has three main sections:
# Metadata
name: "My Flow"
version: "1.0"
description: "Optional description"
# Component definitions
components:
  - name: component_name
    type: module.path.ComponentClass
    config:
      key: value
# Flow definition
flow:
  type: sequential  # or "conditional" or "graph"
  settings:
    fail_fast: true
    timeout_seconds: 300
  steps:  # For sequential/conditional flows
    - component: component_name
      condition: "context.data.ready == True"
  # OR for graph flows:
  # nodes:
  #   - id: node_id
  #     component: component_name
  # edges:
  #   - source: node_a
  #     target: node_b
Flow Types¶
Sequential (Default)¶
Runs all steps in order. Each step's condition determines whether that individual step runs.
flow:
  type: sequential
  steps:
    - component: fetch_data  # Always runs
    - component: validate_data  # Runs if condition True
      condition: "context.data.fetch_status == 'success'"
    - component: transform_data  # Runs if condition True
      condition: "context.data.is_valid == True"
    - component: save_data  # Runs if condition True
      condition: "context.data.transformed is not None"
All four steps are evaluated. Multiple steps can execute if their conditions match.
Step 1: fetch_data → Runs (no condition)
Step 2: validate_data → Runs if fetch_status == 'success'
Step 3: transform_data → Runs if is_valid == True
Step 4: save_data → Runs if transformed is not None
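The sequential semantics above can be sketched in a few lines of Python. This is an illustration only, not FlowEngine's implementation: `run_sequential` and the lambda conditions are hypothetical stand-ins, and the context is held fixed here, whereas in a real flow each component may update it before the next condition is checked.

```python
from types import SimpleNamespace


def run_sequential(steps, context):
    """Check every step in order and collect the ones whose condition holds."""
    executed = []
    for step in steps:
        condition = step.get("condition")
        # A step without a condition always runs.
        if condition is None or condition(context):
            executed.append(step["component"])
    return executed


steps = [
    {"component": "fetch_data"},
    {"component": "validate_data", "condition": lambda c: c.data.fetch_status == "success"},
    {"component": "transform_data", "condition": lambda c: c.data.is_valid is True},
    {"component": "save_data", "condition": lambda c: c.data.transformed is not None},
]

# Hypothetical context: the fetch succeeded, but validation failed.
context = SimpleNamespace(
    data=SimpleNamespace(fetch_status="success", is_valid=False, transformed=None)
)
print(run_sequential(steps, context))  # ['fetch_data', 'validate_data']
```

Every step is checked, and a False condition only skips that one step.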
Conditional (First-Match)¶
First-match branching like a switch/case statement. Stops after the first step whose condition is True.
flow:
  type: conditional
  steps:
    - component: handle_user
      condition: "context.data.request_type == 'user'"
    - component: handle_order
      condition: "context.data.request_type == 'order'"
    - component: handle_admin
      condition: "context.data.request_type == 'admin'"
    - component: handle_unknown  # No condition = default case
Only one step executes. Once a condition matches, remaining steps are skipped.
Input: request_type = "order"
Step 1: handle_user → Skip (condition False)
Step 2: handle_order → Run (condition True) ← STOP HERE
Step 3: handle_admin → Skip (not evaluated)
Step 4: handle_unknown → Skip (not evaluated)
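For comparison, first-match dispatch can be sketched as a loop that returns as soon as one step matches. The function `run_conditional` and the simplified `request_type` field on each step are assumptions made for this example, not part of FlowEngine.

```python
def run_conditional(steps, request_type):
    """Return the first matching component; later steps are never evaluated."""
    for step in steps:
        expected = step.get("request_type")
        # A step without a condition acts as the default case.
        if expected is None or expected == request_type:
            return step["component"]
    return None  # no step matched and there was no default


steps = [
    {"component": "handle_user", "request_type": "user"},
    {"component": "handle_order", "request_type": "order"},
    {"component": "handle_admin", "request_type": "admin"},
    {"component": "handle_unknown"},
]

print(run_conditional(steps, "order"))   # handle_order
print(run_conditional(steps, "refund"))  # handle_unknown
```

Because the default step has no condition, it only makes sense as the last entry: anything placed after it could never run.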
Graph (DAG Execution)¶
Define flows as directed acyclic graphs where nodes are components and edges define execution flow. The graph executor uses topological ordering (Kahn's algorithm) to determine execution order.
flow:
  type: graph
  settings:
    timeout_seconds: 60
  nodes:
    - id: fetch
      component: fetch_data
    - id: validate
      component: validator
    - id: process
      component: processor
    - id: notify
      component: notifier
  edges:
    - source: fetch
      target: validate
    - source: validate
      target: process
    - source: process
      target: notify
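To make the ordering concrete, here is a minimal sketch of Kahn's algorithm applied to the nodes and edges above. It is a generic textbook version with a hypothetical function name (`topological_order`), not the graph executor's actual code.

```python
from collections import deque


def topological_order(nodes, edges):
    """Order nodes so that every edge's source comes before its target."""
    indegree = {node: 0 for node in nodes}
    successors = {node: [] for node in nodes}
    for source, target in edges:
        successors[source].append(target)
        indegree[target] += 1

    # Start with the nodes that have no incoming edges.
    ready = deque(node for node in nodes if indegree[node] == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for nxt in successors[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)

    # Nodes left unprocessed mean at least one cycle exists.
    if len(order) != len(nodes):
        raise ValueError("graph contains a cycle")
    return order


nodes = ["fetch", "validate", "process", "notify"]
edges = [("fetch", "validate"), ("validate", "process"), ("process", "notify")]
print(topological_order(nodes, edges))  # ['fetch', 'validate', 'process', 'notify']
```

The cycle check is what makes the "acyclic" requirement enforceable: a graph with a cycle never drains completely.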
Port-Based Routing¶
Components can signal which output port to activate, enabling conditional branching in graphs:
class ValidatorComponent(BaseComponent):
    def process(self, context: FlowContext) -> FlowContext:
        if is_valid(context.get("data")):
            self.set_output_port(context, "valid")
        else:
            self.set_output_port(context, "invalid")
        return context
flow:
  type: graph
  nodes:
    - id: fetch
      component: fetch_data
    - id: validate
      component: validator
    - id: process_valid
      component: processor
    - id: handle_invalid
      component: error_handler
  edges:
    - source: fetch
      target: validate
    - source: validate
      target: process_valid
      port: "valid"  # Only activates when port == "valid"
    - source: validate
      target: handle_invalid
      port: "invalid"  # Only activates when port == "invalid"
Edge routing rules:
| Edge Type | Behavior |
|---|---|
| No port (unconditional) | Always activates |
| port: "name" | Only activates when the component sets a matching active port |
| No active port set | Only unconditional edges activate |
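The routing rules in the table can be expressed as a small filter over a node's outgoing edges. This is a sketch under the assumption that edges are plain dictionaries shaped like the YAML above; `active_targets` is a hypothetical helper, not a FlowEngine function.

```python
def active_targets(edges, source, active_port=None):
    """Return the targets of the edges that activate after `source` has run."""
    targets = []
    for edge in edges:
        if edge["source"] != source:
            continue
        port = edge.get("port")
        # Unconditional edges always activate; port edges need a matching active port.
        if port is None or port == active_port:
            targets.append(edge["target"])
    return targets


edges = [
    {"source": "fetch", "target": "validate"},
    {"source": "validate", "target": "process_valid", "port": "valid"},
    {"source": "validate", "target": "handle_invalid", "port": "invalid"},
]

print(active_targets(edges, "fetch"))                # ['validate']
print(active_targets(edges, "validate", "valid"))    # ['process_valid']
print(active_targets(edges, "validate", "invalid"))  # ['handle_invalid']
print(active_targets(edges, "validate"))             # []
```

The last call shows the third rule: when no port is set, port-bound edges stay inactive, so a node with only port-bound edges leads nowhere.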
Node Configuration¶
Each node can have:
nodes:
  - id: my_node                  # Required: unique node ID
    component: my_component      # Required: component name
    description: "What it does"  # Optional
    on_error: fail               # Optional: fail (default), skip, or continue
Multiple Root Nodes¶
Nodes with no incoming edges are root nodes, and all of them execute first:
nodes:
  - id: source_a
    component: fetch_api
  - id: source_b
    component: fetch_db
  - id: merge
    component: merger
edges:
  - source: source_a
    target: merge
  - source: source_b
    target: merge
Both source_a and source_b execute before merge.
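Root nodes can be found by collecting every edge target and keeping the nodes that never appear as one. The helper below is a hypothetical illustration using the node IDs from this example.

```python
def root_nodes(node_ids, edges):
    """Return the nodes that have no incoming edges, in declaration order."""
    targets = {target for _, target in edges}
    return [node for node in node_ids if node not in targets]


node_ids = ["source_a", "source_b", "merge"]
edges = [("source_a", "merge"), ("source_b", "merge")]
print(root_nodes(node_ids, edges))  # ['source_a', 'source_b']
```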
Comparison¶
| Flow Type | Behavior | Use Case |
|---|---|---|
| sequential | All matching steps run | Data pipelines, multi-step processing |
| conditional | First match wins, then stop | Request routing, dispatch, switch/case |
| graph | DAG with port-based routing | Agent orchestration, approval flows, complex workflows |
Step Configuration (Sequential/Conditional)¶
Each step can have the following options:
steps:
  - component: my_component      # Required: component name
    description: "What it does"  # Optional: documentation
    condition: "expression"      # Optional: when to run
    on_error: fail               # Optional: error handling
Component (Required)¶
References a component defined in the components section:
components:
  - name: processor
    type: myapp.ProcessorComponent
flow:
  steps:
    - component: processor  # Must match component name
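The name-matching requirement can be checked before a flow ever runs. The sketch below assumes the configuration has already been parsed into a plain dictionary; `undefined_components` is a hypothetical helper, and FlowEngine may perform its own validation differently.

```python
def undefined_components(config):
    """List component names referenced by the flow but missing from `components`."""
    defined = {component["name"] for component in config.get("components", [])}
    flow = config.get("flow", {})
    # Sequential/conditional flows reference components in steps, graph flows in nodes.
    referenced = [step["component"] for step in flow.get("steps", [])]
    referenced += [node["component"] for node in flow.get("nodes", [])]
    return [name for name in referenced if name not in defined]


config = {
    "components": [{"name": "processor", "type": "myapp.ProcessorComponent"}],
    "flow": {"steps": [{"component": "processor"}, {"component": "procesor"}]},
}
print(undefined_components(config))  # ['procesor']
```

A check like this catches typos such as `procesor` at load time instead of partway through execution.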
Description (Optional)¶
Free-text documentation of what the step does.
Condition (Optional)¶
A Python expression evaluated at runtime. See Conditions for full details.
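As a rough mental model only, a condition string can be thought of as an expression evaluated with `context` in scope. The sketch below uses Python's built-in `eval` with builtins removed. This is an assumption made for illustration: the real evaluator is described on the Conditions page and may restrict expressions differently, and `eval` on untrusted input is unsafe.

```python
from types import SimpleNamespace


def evaluate_condition(expression, context):
    """Evaluate a condition string with only `context` available (illustrative)."""
    # Removing builtins limits what the expression can call; it is not a real sandbox.
    return bool(eval(expression, {"__builtins__": {}}, {"context": context}))


# Hypothetical context object exposing values as attributes of `data`.
context = SimpleNamespace(data=SimpleNamespace(ready=True, path="/users/42"))

print(evaluate_condition("context.data.ready == True", context))               # True
print(evaluate_condition("context.data.path.startswith('/orders')", context))  # False
```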
On Error (Optional)¶
How to handle errors in this step:
| Value | Behavior |
|---|---|
| fail | Stop execution, raise exception (default) |
| skip | Log error, mark step as skipped |
| continue | Log error, continue to next step |
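One way to picture the three policies is a loop that wraps each step in a try/except. Everything here is a hypothetical sketch: `run_steps`, the tuple layout, and the status labels ("ok", "skipped", "failed") are assumptions, and the interaction with the fail_fast setting is not modeled.

```python
import logging

logger = logging.getLogger("flow_sketch")


def run_steps(steps):
    """Run (name, func, on_error) tuples and return a status per step."""
    statuses = {}
    for name, func, on_error in steps:
        try:
            func()
            statuses[name] = "ok"
        except Exception as exc:
            if on_error == "fail":
                raise  # stop execution and surface the exception
            logger.error("step %s failed: %s", name, exc)
            # Assumed labels: "skipped" for skip, "failed" for continue.
            statuses[name] = "skipped" if on_error == "skip" else "failed"
    return statuses


def boom():
    raise RuntimeError("boom")


statuses = run_steps([
    ("extract", lambda: None, "fail"),
    ("transform", boom, "skip"),
    ("load", boom, "continue"),
])
print(statuses)  # {'extract': 'ok', 'transform': 'skipped', 'load': 'failed'}
```

In this sketch both skip and continue move on to the next step; they differ only in how the failed step is recorded.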
Flow Settings¶
Configure execution behavior:
flow:
  settings:
    fail_fast: true
    timeout_seconds: 300
    timeout_mode: cooperative
    require_deadline_check: false
    on_condition_error: fail
Settings Reference¶
| Setting | Default | Description |
|---|---|---|
| fail_fast | true | Stop on first component error |
| timeout_seconds | 300 | Maximum execution time in seconds |
| timeout_mode | cooperative | cooperative, hard_async, or hard_process |
| require_deadline_check | false | Require components to call check_deadline() |
| on_condition_error | fail | fail, skip, or warn |
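The defaults and allowed values in the table can be mirrored in a small dataclass, which is handy for reasoning about what a partial `settings` block resolves to. `FlowSettings` here is a hypothetical stand-in written for this example, not a class FlowEngine is known to export.

```python
from dataclasses import dataclass

TIMEOUT_MODES = {"cooperative", "hard_async", "hard_process"}
CONDITION_ERROR_MODES = {"fail", "skip", "warn"}


@dataclass(frozen=True)
class FlowSettings:
    """Defaults taken from the settings reference table."""
    fail_fast: bool = True
    timeout_seconds: float = 300
    timeout_mode: str = "cooperative"
    require_deadline_check: bool = False
    on_condition_error: str = "fail"

    def __post_init__(self):
        # Reject values outside the documented choices.
        if self.timeout_mode not in TIMEOUT_MODES:
            raise ValueError(f"unknown timeout_mode: {self.timeout_mode}")
        if self.on_condition_error not in CONDITION_ERROR_MODES:
            raise ValueError(f"unknown on_condition_error: {self.on_condition_error}")


# Only the overridden keys need to be supplied; the rest fall back to defaults.
settings = FlowSettings(**{"fail_fast": False, "timeout_seconds": 3600})
print(settings.timeout_mode)  # cooperative
```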
Complete Examples¶
Data Pipeline (Sequential)¶
name: "ETL Pipeline"
version: "2.0"
components:
  - name: extract
    type: etl.ExtractComponent
    config:
      source: "database"
  - name: transform
    type: etl.TransformComponent
    config:
      operations: ["clean", "normalize"]
  - name: validate
    type: etl.ValidateComponent
  - name: load
    type: etl.LoadComponent
    config:
      destination: "warehouse"
  - name: notify_success
    type: etl.NotifyComponent
    config:
      channel: "slack"
  - name: notify_failure
    type: etl.NotifyComponent
    config:
      channel: "pagerduty"
flow:
  type: sequential
  settings:
    fail_fast: false
    timeout_seconds: 3600
  steps:
    - component: extract
      description: "Extract data from source"
    - component: transform
      description: "Apply transformations"
      condition: "context.data.extract_status == 'success'"
      on_error: continue
    - component: validate
      description: "Validate transformed data"
      condition: "context.data.transformed_data is not None"
    - component: load
      description: "Load into data warehouse"
      condition: "context.data.validation_passed == True"
    - component: notify_success
      condition: "context.data.load_status == 'success'"
    - component: notify_failure
      condition: "context.metadata.has_errors == True"
Request Router (Conditional)¶
name: "API Request Router"
version: "1.0"
components:
  - name: auth_check
    type: api.AuthComponent
  - name: handle_users
    type: api.UserHandler
  - name: handle_orders
    type: api.OrderHandler
  - name: handle_products
    type: api.ProductHandler
  - name: handle_admin
    type: api.AdminHandler
  - name: handle_not_found
    type: api.NotFoundHandler
# Auth always runs first, in its own sequential flow. It is shown commented out
# because a YAML mapping cannot hold two top-level `flow` keys.
# flow:
#   type: sequential
#   steps:
#     - component: auth_check
#       description: "Verify authentication"
# Then route based on request
flow:
  type: conditional
  steps:
    - component: handle_admin
      condition: "context.data.user.is_admin == True and context.data.path.startswith('/admin')"
    - component: handle_users
      condition: "context.data.path.startswith('/users')"
    - component: handle_orders
      condition: "context.data.path.startswith('/orders')"
    - component: handle_products
      condition: "context.data.path.startswith('/products')"
    - component: handle_not_found
      # No condition = default case
Approval Workflow (Graph with Suspend/Resume)¶
name: "Approval Workflow"
version: "1.0"
components:
  - name: intake
    type: workflow.IntakeComponent
  - name: approval
    type: workflow.ApprovalComponent
  - name: process_approved
    type: workflow.ProcessComponent
  - name: process_rejected
    type: workflow.RejectionComponent
flow:
  type: graph
  settings:
    timeout_seconds: 3600
  nodes:
    - id: intake
      component: intake
    - id: approve
      component: approval
    - id: approved
      component: process_approved
    - id: rejected
      component: process_rejected
  edges:
    - source: intake
      target: approve
    - source: approve
      target: approved
      port: "approved"
    - source: approve
      target: rejected
      port: "rejected"
Dry Run¶
Preview which steps would execute without running them:
engine = FlowEngine(config, components)
context = FlowContext()
context.set("request_type", "order")
# Preview execution
steps_to_run = engine.dry_run(context)
print("Would execute:", steps_to_run)
# Output: ["handle_order"]
Next Steps¶
- Conditions - Master condition expressions
- Timeout Modes - Protect against runaway flows
- Error Handling - Handle failures gracefully