Components
Components are the building blocks of FlowEngine. Each component encapsulates a unit of business logic that can be configured, tested, and reused across different flows.
Component Lifecycle
Every component follows a defined lifecycle with five methods:
```text
┌─────────────────────────────────────────────────────────┐
│                   Component Lifecycle                   │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  1. __init__(name)       → Instance creation            │
│      ↓                                                  │
│  2. init(config)         → One-time configuration       │
│      ↓                                                  │
│  ┌────────────────── Per Execution ──────────────────┐  │
│  │ 3. setup(context)    → Pre-processing             │  │
│  │     ↓                                             │  │
│  │ 4. process(context)  → Main logic [REQUIRED]      │  │
│  │     ↓                                             │  │
│  │ 5. teardown(context) → Cleanup (always runs)      │  │
│  └───────────────────────────────────────────────────┘  │
│                                                         │
└─────────────────────────────────────────────────────────┘
```
| Method | Called | Purpose |
|---|---|---|
| `__init__(name)` | Once | Instance creation with a unique name |
| `init(config)` | Once | Process the configuration dictionary |
| `setup(context)` | Each execution | Pre-processing setup |
| `process(context)` | Each execution | Main logic (required) |
| `teardown(context)` | Each execution | Cleanup; always runs, even on error |
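The call order in the table can be illustrated with a small driver loop. This is a minimal sketch, not FlowEngine's executor. `ToyComponent`, `run_lifecycle`, and the plain-dict context are hypothetical stand-ins, so the code runs without FlowEngine installed. The `try`/`finally` is the usual way to guarantee that `teardown()` runs even when `process()` raises. Running `teardown()` after a failed `setup()` as well is an assumption of this sketch.

```python
class ToyComponent:
    """Hypothetical stand-in that records which lifecycle hooks ran."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.calls: list[str] = ["__init__"]

    def init(self, config: dict) -> None:
        self.calls.append("init")
        self.fail = config.get("fail", False)

    def setup(self, context: dict) -> None:
        self.calls.append("setup")

    def process(self, context: dict) -> dict:
        self.calls.append("process")
        if self.fail:
            raise RuntimeError("process failed")
        context["count"] = context.get("count", 0) + 1
        return context

    def teardown(self, context: dict) -> None:
        self.calls.append("teardown")


def run_lifecycle(component: ToyComponent, config: dict, executions: int) -> dict:
    """Call init() once, then setup/process/teardown once per execution."""
    component.init(config)  # one-time configuration
    context: dict = {}
    for _ in range(executions):
        try:
            component.setup(context)
            context = component.process(context)
        finally:
            # Runs whether setup()/process() returned normally or raised
            component.teardown(context)
    return context


demo = ToyComponent("demo")
print(run_lifecycle(demo, {}, executions=2))  # {'count': 2}
```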
Creating a Component
Basic Component
```python
from flowengine import BaseComponent, FlowContext


class SimpleComponent(BaseComponent):
    def process(self, context: FlowContext) -> FlowContext:
        context.set("result", "Hello from SimpleComponent!")
        return context
```
Configurable Component
```python
from flowengine import BaseComponent, FlowContext


class ConfigurableComponent(BaseComponent):
    def init(self, config: dict) -> None:
        super().init(config)  # Always call super().init()
        self.multiplier = config.get("multiplier", 1)
        self.prefix = config.get("prefix", "")

    def process(self, context: FlowContext) -> FlowContext:
        value = context.get("input_value", 0)
        result = value * self.multiplier
        context.set("output", f"{self.prefix}{result}")
        return context
```
YAML configuration:
```yaml
components:
  - name: calculator
    type: myapp.ConfigurableComponent
    config:
      multiplier: 10
      prefix: "Result: "
```
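The sketch below traces what the YAML above does. It passes the `config:` mapping to the component logic as a plain dict, assuming the engine hands that mapping to `init()` unchanged. `DictContext` is a hypothetical stand-in for `FlowContext` that provides only `get()` and `set()`. The component is copied without its `BaseComponent` base so the snippet runs on its own.

```python
class DictContext:
    """Hypothetical minimal stand-in for FlowContext: only get() and set()."""

    def __init__(self, **values) -> None:
        self._data = dict(values)

    def get(self, key, default=None):
        return self._data.get(key, default)

    def set(self, key, value) -> None:
        self._data[key] = value


class ConfigurableComponent:
    """Same logic as the example above, minus the BaseComponent dependency."""

    def init(self, config: dict) -> None:
        self.multiplier = config.get("multiplier", 1)
        self.prefix = config.get("prefix", "")

    def process(self, context):
        value = context.get("input_value", 0)
        context.set("output", f"{self.prefix}{value * self.multiplier}")
        return context


# The `config:` mapping from the YAML above, as the dict init() receives
calculator = ConfigurableComponent()
calculator.init({"multiplier": 10, "prefix": "Result: "})
ctx = calculator.process(DictContext(input_value=4))
print(ctx.get("output"))  # Result: 40
```

With an empty `config`, the defaults (`multiplier` 1, empty `prefix`) simply echo the input value as a string.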
Lifecycle Methods in Detail
init(config)
Called once when the engine initializes. Use it for:
- Parsing configuration options
- Initializing instance attributes
- Validating required configuration
```python
def init(self, config: dict) -> None:
    super().init(config)  # Required!

    # Parse configuration
    self.api_key = config.get("api_key")
    self.timeout = config.get("timeout", 30)
    self.retries = config.get("retries", 3)

    # Initialize resources (not connections - do that in setup)
    self._client = None
```
setup(context)
Called before each `process()` call. Use it for:
- Opening connections
- Acquiring resources
- Reading initial context state
```python
def setup(self, context: FlowContext) -> None:
    # Open database connection (DatabaseConnection is a placeholder for your
    # driver; self.connection_string is assumed to be set in init())
    self._conn = DatabaseConnection(self.connection_string)

    # Read any required context values
    self._batch_id = context.get("batch_id")
```
process(context)
The main processing logic. This is the only required method.
```python
def process(self, context: FlowContext) -> FlowContext:
    # Read input
    data = context.get("input_data")

    # Process
    result = self.transform(data)

    # Write output
    context.set("output_data", result)

    return context  # Always return context
```
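teardown(context)
Called after `process()` completes, even if it raised an exception. Use it for:
- Closing connections
- Releasing resources
- Final cleanup
```python
def teardown(self, context: FlowContext) -> None:
    # getattr() guards against setup() having failed before _conn was assigned
    conn = getattr(self, "_conn", None)
    if conn is not None:
        conn.close()
        self._conn = None
```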
Configuration Validation
Override `validate_config()` to check required settings. It returns a list of error messages; an empty list means the configuration is valid:
```python
from flowengine import BaseComponent, FlowContext


class DatabaseComponent(BaseComponent):
    def init(self, config: dict) -> None:
        super().init(config)
        self.connection_string = config.get("connection_string")
        self.table_name = config.get("table_name")

    def validate_config(self) -> list[str]:
        errors = []
        if not self.connection_string:
            errors.append("connection_string is required")
        if not self.table_name:
            errors.append("table_name is required")
        return errors

    def process(self, context: FlowContext) -> FlowContext:
        # Safe to use self.connection_string and self.table_name
        ...
```
Validation is called during engine initialization:
```python
engine = FlowEngine(config, components)
errors = engine.validate()
if errors:
    print("Configuration errors:", errors)
```
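Because `validate_config()` returns a list instead of raising, errors from several components can be reported in one pass. The sketch below shows one way to aggregate them. `collect_errors` and the `name: message` format are hypothetical and may not match what `engine.validate()` actually returns. The component is the example above without its `BaseComponent` dependency.

```python
class DatabaseComponent:
    """Stand-in mirroring the example above without BaseComponent."""

    def init(self, config: dict) -> None:
        self.connection_string = config.get("connection_string")
        self.table_name = config.get("table_name")

    def validate_config(self) -> list[str]:
        errors = []
        if not self.connection_string:
            errors.append("connection_string is required")
        if not self.table_name:
            errors.append("table_name is required")
        return errors


def collect_errors(components: dict) -> list[str]:
    """Hypothetical aggregator: prefix each error with its component name."""
    return [
        f"{name}: {error}"
        for name, component in components.items()
        for error in component.validate_config()
    ]


loader = DatabaseComponent()
loader.init({"connection_string": "sqlite:///app.db", "table_name": "orders"})
archiver = DatabaseComponent()
archiver.init({"table_name": "archive"})  # connection_string missing
print(collect_errors({"loader": loader, "archiver": archiver}))
# ['archiver: connection_string is required']
```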
Health Checks
Implement `health_check()` to verify component readiness:
```python
import requests

from flowengine import BaseComponent


class APIComponent(BaseComponent):
    def init(self, config: dict) -> None:
        super().init(config)
        self.base_url = config["base_url"]

    def health_check(self) -> bool:
        try:
            # A timeout keeps a hung endpoint from blocking the check forever
            response = requests.get(f"{self.base_url}/health", timeout=5)
            return response.status_code == 200
        except Exception:
            return False
```
Check health before execution:
```python
for name, component in components.items():
    if not component.health_check():
        print(f"Component {name} is not healthy!")
```
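The loop above only prints a warning. A stricter pre-flight gate could collect every failing component and refuse to start the flow. In this sketch, `unhealthy_components`, `StaticHealth`, and `Broken` are hypothetical. Treating an exception raised inside `health_check()` as unhealthy is a design choice of the sketch, not documented FlowEngine behavior.

```python
class StaticHealth:
    """Hypothetical component stand-in whose health is fixed up front."""

    def __init__(self, healthy: bool) -> None:
        self.healthy = healthy

    def health_check(self) -> bool:
        return self.healthy


class Broken:
    """Stand-in whose health check itself raises."""

    def health_check(self) -> bool:
        raise ConnectionError("no route to host")


def unhealthy_components(components: dict) -> list[str]:
    """Return names of components that fail (or crash during) health_check()."""
    failing = []
    for name, component in components.items():
        try:
            ok = component.health_check()
        except Exception:
            ok = False  # an exception counts as "not healthy"
        if not ok:
            failing.append(name)
    return failing


components = {"db": StaticHealth(True), "api": StaticHealth(False), "cache": Broken()}
failing = unhealthy_components(components)
if failing:
    print(f"Not starting flow; unhealthy: {', '.join(failing)}")
```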
Timeout Compliance
For long-running operations, call `check_deadline()` periodically:
```python
from flowengine import BaseComponent, FlowContext


class BatchProcessor(BaseComponent):
    def process(self, context: FlowContext) -> FlowContext:
        items = context.get("items", [])
        results = []

        for item in items:
            # Check if we've exceeded the deadline
            self.check_deadline(context)

            # Process item
            result = self.process_item(item)
            results.append(result)

        context.set("results", results)
        return context
```
Warning: If `require_deadline_check: true` is set in the flow settings and your component does not call `check_deadline()`, a `DeadlineCheckError` is raised.
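This page does not describe how `check_deadline()` works internally. One plausible design is assumed here: an absolute deadline is fixed when the flow starts, and each check compares it with a monotonic clock, raising once it has passed. `Deadline`, `DeadlineExceeded`, and `process_items` are hypothetical names. The exception raised on timeout is not named in this page; `DeadlineCheckError` above covers a different case.

```python
import time
from typing import Iterable


class DeadlineExceeded(Exception):
    """Hypothetical error raised once the deadline has passed."""


class Deadline:
    """Minimal deadline tracker based on a monotonic clock."""

    def __init__(self, timeout_seconds: float) -> None:
        # Fixed once, when the flow starts
        self.expires_at = time.monotonic() + timeout_seconds
        self.checks = 0

    def check(self) -> None:
        self.checks += 1
        if time.monotonic() > self.expires_at:
            raise DeadlineExceeded("flow deadline exceeded")


def process_items(items: Iterable[int], deadline: Deadline, check_every: int = 100) -> list[int]:
    """Double each item, checking the deadline every `check_every` items."""
    results = []
    for i, item in enumerate(items):
        if i % check_every == 0:
            deadline.check()
        results.append(item * 2)
    return results


ok = Deadline(timeout_seconds=60)
print(len(process_items(range(250), ok)), ok.checks)  # 250 3
```

Because the check is cooperative, it cannot interrupt a single long blocking call. It only fires between items, so `check_every` trades responsiveness against the small cost of each clock read.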
Complete Example
```python
from flowengine import BaseComponent, FlowContext


class ETLComponent(BaseComponent):
    """Complete ETL component with full lifecycle."""

    def init(self, config: dict) -> None:
        super().init(config)
        self.source_table = config.get("source_table")
        self.target_table = config.get("target_table")
        self.batch_size = config.get("batch_size", 1000)
        self._conn = None

    def validate_config(self) -> list[str]:
        errors = []
        if not self.source_table:
            errors.append("source_table is required")
        if not self.target_table:
            errors.append("target_table is required")
        return errors

    def health_check(self) -> bool:
        # Would check database connectivity
        return True

    def setup(self, context: FlowContext) -> None:
        # create_db_connection() is a placeholder for your own connection factory
        self._conn = create_db_connection()

    def process(self, context: FlowContext) -> FlowContext:
        # Extract (interpolating table names is only safe if they come from trusted config)
        data = self._conn.query(f"SELECT * FROM {self.source_table}")

        # Transform, checking the deadline every 100 records
        transformed = []
        for i, record in enumerate(data):
            if i % 100 == 0:
                self.check_deadline(context)
            transformed.append(self.transform_record(record))

        # Load in chunks of batch_size rows
        for start in range(0, len(transformed), self.batch_size):
            self._conn.bulk_insert(
                self.target_table, transformed[start:start + self.batch_size]
            )

        context.set("rows_processed", len(transformed))
        return context

    def teardown(self, context: FlowContext) -> None:
        if self._conn:
            self._conn.close()
            self._conn = None

    def transform_record(self, record):
        # Transform logic
        return record
```
Async Components
Components can implement native async processing by overriding `process_async()`:
```python
from flowengine import BaseComponent, FlowContext


# sync_fetch() and async_fetch() stand in for your own I/O functions
class AsyncAPIComponent(BaseComponent):
    def process(self, context: FlowContext) -> FlowContext:
        # Sync fallback (used when async is not available)
        context.set("result", sync_fetch())
        return context

    async def process_async(self, context: FlowContext) -> FlowContext:
        # Native async implementation
        result = await async_fetch()
        context.set("result", result)
        return context
```
is_async Property
Detect whether a component supports async:
```python
comp = AsyncAPIComponent("api")
print(comp.is_async)  # True: process_async() is overridden

sync_comp = SimpleComponent("sync")
print(sync_comp.is_async)  # False: only process() is defined
```
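This page does not document how `is_async` is computed. A common technique for detecting an override is to compare the method looked up on the instance's class with the base class's own function. The sketch below uses that approach; `Base`, `SyncOnly`, and `NativeAsync` are hypothetical stand-ins.

```python
class Base:
    """Hypothetical stand-in for BaseComponent's async plumbing."""

    def process(self, context: dict) -> dict:
        raise NotImplementedError

    async def process_async(self, context: dict) -> dict:
        return self.process(context)

    @property
    def is_async(self) -> bool:
        # True only if a subclass replaced the default process_async()
        return type(self).process_async is not Base.process_async


class SyncOnly(Base):
    def process(self, context):
        return context


class NativeAsync(Base):
    def process(self, context):
        return context

    async def process_async(self, context):
        return context


print(SyncOnly().is_async)     # False
print(NativeAsync().is_async)  # True
```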
Default Fallback
If a component does not override `process_async()`, calling it automatically runs the synchronous `process()` method. This means every component can be used in async contexts.
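In practice, the fallback lets a caller await every component the same way. In this sketch, the hypothetical `Base` implements the fallback by calling `process()` inline. `Upper`, `Delayed`, and `run_all` are illustrative only, and `asyncio.sleep()` stands in for real async I/O.

```python
import asyncio


class Base:
    """Hypothetical stand-in: default process_async() delegates to process()."""

    def process(self, context: dict) -> dict:
        raise NotImplementedError

    async def process_async(self, context: dict) -> dict:
        # Fallback: run the synchronous logic directly on the event loop
        return self.process(context)


class Upper(Base):  # sync-only component
    def process(self, context):
        return {**context, "text": context["text"].upper()}


class Delayed(Base):  # native async component
    def process(self, context):
        return {**context, "delayed": True}

    async def process_async(self, context):
        await asyncio.sleep(0.01)  # stands in for real async I/O
        return {**context, "delayed": True}


async def run_all(context: dict) -> list[dict]:
    # Both kinds of component are awaited the same way
    return await asyncio.gather(
        Upper().process_async(context),
        Delayed().process_async(context),
    )


results = asyncio.run(run_all({"text": "hi"}))
print(results)  # [{'text': 'HI'}, {'text': 'hi', 'delayed': True}]
```

This page does not say whether FlowEngine runs the fallback inline, as here, or offloads it to a worker thread (for example with `asyncio.to_thread()`). With an inline fallback, a `process()` that blocks on I/O also blocks the event loop. That is why overriding `process_async()` pays off for I/O-bound components.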
Port-Based Output Routing
In graph flows, components can direct execution to specific downstream branches using output ports:
```python
from flowengine import BaseComponent, FlowContext


class RouterComponent(BaseComponent):
    def process(self, context: FlowContext) -> FlowContext:
        request_type = context.get("request_type")

        if request_type == "urgent":
            self.set_output_port(context, "urgent")
        else:
            self.set_output_port(context, "normal")

        return context
```
The `set_output_port(context, port_name)` method sets the active port on the context. The graph executor then activates only the edges whose `port` field matches the active port, plus any unconditional edges that have no port.
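The edge-selection rule can be expressed as a filter over a node's outgoing edges. `Edge`, `active_edges`, and the target node names are hypothetical; the sketch encodes only the rule stated above. When no port is set, this sketch fires only the unconditional edges, which is an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Edge:
    source: str
    target: str
    port: Optional[str] = None  # None means unconditional


def active_edges(edges: list[Edge], source: str, active_port: Optional[str]) -> list[Edge]:
    """Edges leaving `source` that fire: port matches, or edge has no port."""
    return [
        e for e in edges
        if e.source == source and (e.port is None or e.port == active_port)
    ]


edges = [
    Edge("router", "fast_track", port="urgent"),
    Edge("router", "queue", port="normal"),
    Edge("router", "audit_log"),  # unconditional
]
print([e.target for e in active_edges(edges, "router", "urgent")])
# ['fast_track', 'audit_log']
```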
Low-Level Port API
You can also use the context's port methods directly:
```python
context.set_port("my_port")       # Set active port
port = context.get_active_port()  # Get active port (or None)
context.clear_port()              # Clear active port
```
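The three low-level methods can be modeled as a single slot on the context. The stand-in below does that and adds `set_output_port()` as an assumed thin wrapper over `set_port()`. `PortContext` is hypothetical, and the "last call wins" behavior is an assumption rather than documented FlowEngine semantics.

```python
from typing import Optional


class PortContext:
    """Hypothetical stand-in exposing only the three port methods above."""

    def __init__(self) -> None:
        self._active_port: Optional[str] = None  # single slot: last set wins

    def set_port(self, port: str) -> None:
        self._active_port = port

    def get_active_port(self) -> Optional[str]:
        return self._active_port

    def clear_port(self) -> None:
        self._active_port = None


def set_output_port(context: PortContext, port_name: str) -> None:
    """Assumed thin wrapper, mirroring the component's set_output_port()."""
    context.set_port(port_name)


ctx = PortContext()
set_output_port(ctx, "urgent")
print(ctx.get_active_port())  # urgent
ctx.clear_port()
print(ctx.get_active_port())  # None
```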
Best Practices
- Always call `super().init(config)` in your `init()` method
- Always return the context from `process()`
- Use `setup()` for connections, not `init()`
- Always close resources in `teardown()`
- Call `check_deadline()` in loops for long operations
- Validate configuration in `validate_config()`
- Keep components focused: one responsibility per component
- Use `set_output_port()` for graph branching instead of manipulating context flags
- Override `process_async()` for I/O-bound components that benefit from async