Contrib Module¶
Ready-to-use components for common tasks.
LoggingComponent¶
Logs context state for debugging and monitoring.
LoggingComponent
¶
Bases: BaseComponent
Component that logs context data.
Useful for debugging flows by logging context state at specific points in the execution.
Config
level: Log level (debug, info, warning, error). Default: info message: Log message template. Default: "Context state" log_data: Whether to log context data. Default: True log_metadata: Whether to log execution metadata. Default: False keys: Specific keys to log (if None, logs all). Default: None
Example YAML
Example usage
init
¶
Initialize with logging configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
dict[str, Any]
|
Configuration dictionary |
required |
validate_config
¶
Validate logging configuration.
Returns:
| Type | Description |
|---|---|
list[str]
|
List of validation errors |
process
¶
Log context data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
context
|
FlowContext
|
Current flow context |
required |
Returns:
| Type | Description |
|---|---|
FlowContext
|
Unchanged flow context |
Configuration Options¶
| Option | Type | Default | Description |
|---|---|---|---|
level |
str |
"info" |
Log level: debug, info, warning, error |
message |
str |
"Context state" |
Log message prefix |
log_data |
bool |
True |
Include context data in log |
log_metadata |
bool |
False |
Include execution metadata in log |
keys |
list[str] |
None |
Specific keys to log; None = all |
YAML Configuration¶
components:
- name: debug_logger
type: flowengine.contrib.logging.LoggingComponent
config:
level: debug
message: "After processing"
log_data: true
log_metadata: true
keys:
- user
- result
Usage Example¶
flow:
steps:
- component: fetch_data
- component: debug_logger
description: "Log state after fetch"
- component: process_data
- component: debug_logger
description: "Log state after process"
HTTPComponent¶
Makes HTTP requests to external APIs.
Installation
Requires the http extra: pip install flowengine[http]
HTTPComponent
¶
Bases: BaseComponent
Component that makes HTTP requests.
Fetches data from HTTP endpoints and stores responses in context. Requires the httpx package: pip install flowengine[http]
Config
base_url: Base URL for requests (required) timeout: Request timeout in seconds. Default: 30 headers: Optional headers dictionary. Default: {} method: HTTP method. Default: GET endpoint_key: Context key for endpoint path. Default: "endpoint" result_key: Context key for storing result. Default: component name
Example YAML
Example usage
fetcher = HTTPComponent("api_fetch")
fetcher.init({
"base_url": "https://api.example.com",
"timeout": 30
})
context = FlowContext()
context.set("endpoint", "/users/123")
context = fetcher.process(context)
print(context.data.api_fetch.status_code) # 200
print(context.data.api_fetch.data) # Response JSON
init
¶
Initialize with HTTP configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
dict[str, Any]
|
Configuration dictionary |
required |
validate_config
¶
Validate HTTP configuration.
Returns:
| Type | Description |
|---|---|
list[str]
|
List of validation errors |
setup
¶
Create HTTP client for this run.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
context
|
FlowContext
|
Current flow context |
required |
process
¶
Fetch data and store in context.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
context
|
FlowContext
|
Current flow context |
required |
Returns:
| Type | Description |
|---|---|
FlowContext
|
Updated flow context with response data |
Raises:
| Type | Description |
|---|---|
RuntimeError
|
If httpx is not available |
HTTPError
|
If request fails |
teardown
¶
Close HTTP client.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
context
|
FlowContext
|
Current flow context |
required |
Configuration Options¶
| Option | Type | Default | Description |
|---|---|---|---|
base_url |
str |
required | Base URL for requests |
timeout |
float |
30 |
Request timeout in seconds |
headers |
dict |
{} |
HTTP headers to include |
method |
str |
"GET" |
HTTP method |
endpoint_key |
str |
"endpoint" |
Context key for endpoint path |
result_key |
str |
component name | Context key for storing result |
Supported Methods¶
GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS
Response Format¶
The component stores the response in context:
{
"status": "success", # or "error"
"status_code": 200, # HTTP status code
"data": {...}, # Parsed JSON or text
"headers": {...} # Response headers
}
YAML Configuration¶
components:
- name: api_client
type: flowengine.contrib.http.HTTPComponent
config:
base_url: "https://api.example.com"
timeout: 30
headers:
Authorization: "Bearer ${API_TOKEN}"
Content-Type: "application/json"
method: GET
Usage Examples¶
Simple GET Request¶
components:
- name: fetch_users
type: flowengine.contrib.http.HTTPComponent
config:
base_url: "https://api.example.com"
method: GET
flow:
steps:
- component: fetch_users
context.set("endpoint", "/users")
result = engine.execute(context)
# Access response
users = result.data.fetch_users.data
status = result.data.fetch_users.status_code
POST Request with Body¶
# Set endpoint and body in context
context.set("endpoint", "/users")
context.set("request_body", {"name": "Alice", "email": "alice@example.com"})
# HTTPComponent reads request_body from context
result = engine.execute(context)
Dynamic Endpoint¶
steps:
- component: get_user_id
# Sets context.data.user_id
- component: fetch_user_details
# Uses context.data.endpoint set by previous step
class GetUserId(BaseComponent):
def process(self, context):
user_id = context.get("request_params", {}).get("id")
context.set("endpoint", f"/users/{user_id}")
return context
Error Handling¶
flow:
settings:
fail_fast: false
steps:
- component: api_call
on_error: continue
- component: handle_success
condition: "context.data.api_call.status == 'success'"
- component: handle_error
condition: "context.data.api_call.status == 'error'"
Creating Custom Contrib Components¶
Follow these patterns when creating reusable components:
from flowengine import BaseComponent, FlowContext
class CustomContribComponent(BaseComponent):
"""Custom component for reuse across projects."""
def init(self, config: dict) -> None:
super().init(config)
# Parse configuration with sensible defaults
self.option = config.get("option", "default")
def validate_config(self) -> list[str]:
errors = []
# Validate required options
if not self.config.get("required_option"):
errors.append("required_option is required")
return errors
def process(self, context: FlowContext) -> FlowContext:
# Implement main logic
result = self.do_something()
context.set(self.name, result)
return context