Skip to content

Contrib Module

Ready-to-use components for common tasks.


LoggingComponent

Logs context state for debugging and monitoring.

LoggingComponent

LoggingComponent(name: str)

Bases: BaseComponent

Component that logs context data.

Useful for debugging flows by logging context state at specific points in the execution.

Config

level: Log level (debug, info, warning, error). Default: info message: Log message template. Default: "Context state" log_data: Whether to log context data. Default: True log_metadata: Whether to log execution metadata. Default: False keys: Specific keys to log (if None, logs all). Default: None

Example YAML
- name: logger
  type: flowengine.contrib.logging.LoggingComponent
  config:
    level: debug
    message: "After processing"
    log_data: true
    keys:
      - user
      - result
Example usage
logger = LoggingComponent("debug_log")
logger.init({
    "level": "debug",
    "message": "Current state",
    "keys": ["user", "data"]
})

init

init(config: dict[str, Any]) -> None

Initialize with logging configuration.

Parameters:

Name Type Description Default
config dict[str, Any]

Configuration dictionary

required

validate_config

validate_config() -> list[str]

Validate logging configuration.

Returns:

Type Description
list[str]

List of validation errors

process

process(context: FlowContext) -> FlowContext

Log context data.

Parameters:

Name Type Description Default
context FlowContext

Current flow context

required

Returns:

Type Description
FlowContext

Unchanged flow context

Configuration Options

Option Type Default Description
level str "info" Log level: debug, info, warning, error
message str "Context state" Log message prefix
log_data bool True Include context data in log
log_metadata bool False Include execution metadata in log
keys list[str] None Specific keys to log; None = all

YAML Configuration

components:
  - name: debug_logger
    type: flowengine.contrib.logging.LoggingComponent
    config:
      level: debug
      message: "After processing"
      log_data: true
      log_metadata: true
      keys:
        - user
        - result

Usage Example

flow:
  steps:
    - component: fetch_data

    - component: debug_logger
      description: "Log state after fetch"

    - component: process_data

    - component: debug_logger
      description: "Log state after process"

HTTPComponent

Makes HTTP requests to external APIs.

Installation

Requires the http extra: pip install flowengine[http]

HTTPComponent

HTTPComponent(name: str)

Bases: BaseComponent

Component that makes HTTP requests.

Fetches data from HTTP endpoints and stores responses in context. Requires the httpx package: pip install flowengine[http]

Config

base_url: Base URL for requests (required) timeout: Request timeout in seconds. Default: 30 headers: Optional headers dictionary. Default: {} method: HTTP method. Default: GET endpoint_key: Context key for endpoint path. Default: "endpoint" result_key: Context key for storing result. Default: component name

Example YAML
- name: api_fetch
  type: flowengine.contrib.http.HTTPComponent
  config:
    base_url: "https://api.example.com"
    timeout: 30
    headers:
      Authorization: "Bearer ${API_TOKEN}"
Example usage
fetcher = HTTPComponent("api_fetch")
fetcher.init({
    "base_url": "https://api.example.com",
    "timeout": 30
})

context = FlowContext()
context.set("endpoint", "/users/123")
context = fetcher.process(context)

print(context.data.api_fetch.status_code)  # 200
print(context.data.api_fetch.data)  # Response JSON

init

init(config: dict[str, Any]) -> None

Initialize with HTTP configuration.

Parameters:

Name Type Description Default
config dict[str, Any]

Configuration dictionary

required

validate_config

validate_config() -> list[str]

Validate HTTP configuration.

Returns:

Type Description
list[str]

List of validation errors

setup

setup(context: FlowContext) -> None

Create HTTP client for this run.

Parameters:

Name Type Description Default
context FlowContext

Current flow context

required

process

process(context: FlowContext) -> FlowContext

Fetch data and store in context.

Parameters:

Name Type Description Default
context FlowContext

Current flow context

required

Returns:

Type Description
FlowContext

Updated flow context with response data

Raises:

Type Description
RuntimeError

If httpx is not available

HTTPError

If request fails

teardown

teardown(context: FlowContext) -> None

Close HTTP client.

Parameters:

Name Type Description Default
context FlowContext

Current flow context

required

Configuration Options

Option Type Default Description
base_url str required Base URL for requests
timeout float 30 Request timeout in seconds
headers dict {} HTTP headers to include
method str "GET" HTTP method
endpoint_key str "endpoint" Context key for endpoint path
result_key str component name Context key for storing result

Supported Methods

GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS

Response Format

The component stores the response in context:

{
    "status": "success",      # or "error"
    "status_code": 200,       # HTTP status code
    "data": {...},            # Parsed JSON or text
    "headers": {...}          # Response headers
}

YAML Configuration

components:
  - name: api_client
    type: flowengine.contrib.http.HTTPComponent
    config:
      base_url: "https://api.example.com"
      timeout: 30
      headers:
        Authorization: "Bearer ${API_TOKEN}"
        Content-Type: "application/json"
      method: GET

Usage Examples

Simple GET Request

components:
  - name: fetch_users
    type: flowengine.contrib.http.HTTPComponent
    config:
      base_url: "https://api.example.com"
      method: GET

flow:
  steps:
    - component: fetch_users
context.set("endpoint", "/users")
result = engine.execute(context)

# Access response
users = result.data.fetch_users.data
status = result.data.fetch_users.status_code

POST Request with Body

# Set endpoint and body in context
context.set("endpoint", "/users")
context.set("request_body", {"name": "Alice", "email": "alice@example.com"})

# HTTPComponent reads request_body from context
result = engine.execute(context)

Dynamic Endpoint

steps:
  - component: get_user_id
    # Sets context.data.user_id

  - component: fetch_user_details
    # Uses context.data.endpoint set by previous step
class GetUserId(BaseComponent):
    def process(self, context):
        user_id = context.get("request_params", {}).get("id")
        context.set("endpoint", f"/users/{user_id}")
        return context

Error Handling

flow:
  settings:
    fail_fast: false

  steps:
    - component: api_call
      on_error: continue

    - component: handle_success
      condition: "context.data.api_call.status == 'success'"

    - component: handle_error
      condition: "context.data.api_call.status == 'error'"

Creating Custom Contrib Components

Follow these patterns when creating reusable components:

from flowengine import BaseComponent, FlowContext

class CustomContribComponent(BaseComponent):
    """Custom component for reuse across projects."""

    def init(self, config: dict) -> None:
        super().init(config)
        # Parse configuration with sensible defaults
        self.option = config.get("option", "default")

    def validate_config(self) -> list[str]:
        errors = []
        # Validate required options
        if not self.config.get("required_option"):
            errors.append("required_option is required")
        return errors

    def process(self, context: FlowContext) -> FlowContext:
        # Implement main logic
        result = self.do_something()
        context.set(self.name, result)
        return context