Timeout Modes¶
FlowEngine provides three timeout enforcement modes to protect against runaway flows and ensure predictable execution times.
Overview¶
flow:
settings:
timeout_seconds: 60
timeout_mode: cooperative # cooperative, hard_async, or hard_process
| Mode | Enforcement | Best For |
|---|---|---|
cooperative |
Components call check_deadline() |
Default, safest for complex components |
hard_async |
Uses asyncio.wait_for |
I/O-bound operations |
hard_process |
Runs in separate process | CPU-bound, guaranteed termination |
Cooperative Mode (Default)¶
Components voluntarily check if the deadline has passed.
How It Works¶
- Engine sets a deadline before each step
- Engine checks deadline between steps
- Components call
check_deadline()during long operations - If deadline exceeded,
FlowTimeoutErroris raised
Component Implementation¶
class BatchProcessor(BaseComponent):
def process(self, context: FlowContext) -> FlowContext:
items = context.get("items", [])
results = []
for i, item in enumerate(items):
# Check every N iterations
if i % 100 == 0:
self.check_deadline(context)
results.append(self.process_item(item))
context.set("results", results)
return context
Strict Enforcement¶
With require_deadline_check: true, components must call check_deadline():
If a component runs longer than a threshold (default 1 second) without calling check_deadline(), a DeadlineCheckError is raised.
from flowengine import DeadlineCheckError
try:
result = engine.execute()
except DeadlineCheckError as e:
print(f"Component '{e.component}' didn't check deadline")
print(f"Ran for {e.duration:.2f}s")
Hard Async Mode¶
Uses asyncio.wait_for to enforce timeouts. Components run in a thread executor, allowing cancellation of blocking operations.
How It Works¶
- Each step runs in a thread via asyncio
asyncio.wait_forenforces the timeout- On timeout, the task is cancelled
- Teardown always runs in the main thread
When to Use¶
- I/O-bound operations (network, file system)
- Components that block on external resources
- When you can't modify components to call
check_deadline()
Limitations¶
- Cannot interrupt pure CPU-bound operations
- Thread may continue briefly after cancellation
- Not suitable for infinite loops
Hard Process Mode¶
Runs each step in a separate process with hard kill on timeout.
How It Works¶
- Each step runs in a separate process
- Process is killed if it exceeds timeout
- Context is serialized/deserialized across process boundary
- Teardown runs in main process after process terminates
When to Use¶
- CPU-bound operations that may hang
- Untrusted or third-party components
- When guaranteed termination is required
Requirements¶
- Components must be picklable
- Context data must be JSON-serializable
- Higher overhead than other modes
# Works - standard class
class SafeComponent(BaseComponent):
def process(self, context):
...
# Doesn't work - lambda not picklable
class UnsafeComponent(BaseComponent):
def init(self, config):
self.transform = lambda x: x * 2 # Can't pickle
Comparison¶
| Scenario | Cooperative | Hard Async | Hard Process |
|---|---|---|---|
| Between steps | ✅ Always | ✅ Always | ✅ Always |
Component calls check_deadline() |
✅ Yes | ✅ Yes | ✅ Yes |
| Component blocks without checking | ❌ Overruns | ✅ Cancelled | ✅ Killed |
| Teardown runs on timeout | ✅ Yes | ✅ Yes | ✅ Yes |
| Overhead | Low | Medium | High |
| CPU-bound protection | ❌ No | ❌ No | ✅ Yes |
Choosing a Mode¶
┌─────────────────────────────────────────────────────────┐
│ Choose Timeout Mode │
├─────────────────────────────────────────────────────────┤
│ │
│ Components call check_deadline()? │
│ └── YES → Use cooperative (default, safest) │
│ └── NO → Components do I/O operations? │
│ └── YES → Use hard_async │
│ └── NO → Components are CPU-bound? │
│ └── YES → Use hard_process │
│ └── NO → Use cooperative │
│ │
└─────────────────────────────────────────────────────────┘
Error Handling¶
from flowengine import FlowTimeoutError, DeadlineCheckError
try:
result = engine.execute()
except FlowTimeoutError as e:
print(f"Timeout after {e.elapsed:.2f}s (limit: {e.timeout}s)")
print(f"Message: {e.message}")
except DeadlineCheckError as e:
print(f"Component '{e.component}' didn't check deadline")
print(f"Duration: {e.duration:.2f}s")
Best Practices¶
Cooperative Mode¶
class CooperativeComponent(BaseComponent):
def process(self, context: FlowContext) -> FlowContext:
# Check before long operations
self.check_deadline(context)
data = fetch_large_dataset()
# Check in loops
for i, item in enumerate(data):
if i % 100 == 0:
self.check_deadline(context)
process(item)
# Check before I/O
self.check_deadline(context)
save_results()
return context
Hard Async Mode¶
class AsyncSafeComponent(BaseComponent):
def process(self, context: FlowContext) -> FlowContext:
# I/O operations are interruptible
response = requests.get(self.url, timeout=10)
context.set("data", response.json())
return context
Hard Process Mode¶
class ProcessSafeComponent(BaseComponent):
def init(self, config: dict) -> None:
super().init(config)
# Only store serializable data
self.url = config["url"]
self.timeout = config.get("timeout", 30)
def process(self, context: FlowContext) -> FlowContext:
# Will be killed if timeout exceeded
result = expensive_cpu_operation()
context.set("result", result)
return context
Configuration Examples¶
Development (Fast Fail)¶
Production (Resilient)¶
Untrusted Components¶
Next Steps¶
- Error Handling - Handle timeout errors
- Components - Implement
check_deadline() - API Reference - Exception details