# Observer Pattern

## Pattern Overview

The Observer Pattern defines a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically. In LLM applications, this pattern is crucial for building observable, monitorable AI systems.
## Core Structure

```python
# Subject (Observable)
class LLMSubject:
    def __init__(self):
        self.observers = []

    def attach(self, observer):
        self.observers.append(observer)

    def detach(self, observer):
        self.observers.remove(observer)

    def notify(self, event):
        for observer in self.observers:
            observer.update(event)

# Observer interface
class Observer:
    def update(self, event):
        pass

# Concrete observers
class CostMonitor(Observer):
    def update(self, event):
        pass  # track API costs

class PerformanceMonitor(Observer):
    def update(self, event):
        pass  # monitor response times
```
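Wiring these together is one line per observer. The plain-dict event below is an assumption of this sketch, not a required shape:

```python
# Attach the (stub) monitors and broadcast one event to all of them.
subject = LLMSubject()
subject.attach(CostMonitor())
subject.attach(PerformanceMonitor())

subject.notify({"type": "call_complete", "cost": 0.0042, "latency_ms": 812})
```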
## LLM-Specific Applications

### 1. Real-time API Monitoring

**Problem:** LLM APIs are expensive and can be slow. Teams need visibility into usage patterns, costs, and performance.

**Solution:** The Observer Pattern enables comprehensive monitoring without coupling monitoring logic to business logic.
```python
class LLMClient(LLMSubject):
    def call_api(self, prompt, model):
        # Announce the call before it happens (live dashboards, rate alerts).
        self.notify({"type": "call_start", "prompt": prompt, "model": model})
        response = self.actual_api_call(prompt, model)
        # Announce completion with the computed cost attached.
        self.notify({"type": "call_complete", "response": response,
                     "cost": self.calculate_cost(response)})
        return response
```
**Benefits:**

- Near-zero monitoring overhead (observers can be detached or disabled)
- Multiple monitoring systems can be added without changing client code
- Real-time dashboards and alerts
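To make that concrete, here is one way to flesh out the `CostMonitor` stub from the core structure; the event shape and the default budget are illustrative assumptions:

```python
class CostMonitor(Observer):
    """Accumulates spend from call_complete events (illustrative sketch)."""

    def __init__(self, budget_usd=100.0):
        self.budget_usd = budget_usd
        self.total_spend = 0.0

    def update(self, event):
        if event.get("type") != "call_complete":
            return
        self.total_spend += event.get("cost", 0.0)
        if self.total_spend > self.budget_usd:
            # Hook for alerting; printing stands in for a real notification channel.
            print(f"Budget exceeded: ${self.total_spend:.2f} / ${self.budget_usd:.2f}")
```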
### 2. Multi-Agent System Coordination

**Problem:** In complex AI systems with multiple agents, coordination and state synchronization are critical.

**Solution:** Agents observe each other's state changes to coordinate decision making.
```python
class AgentCoordinator(LLMSubject):
    def __init__(self):
        super().__init__()  # observers: other agents, loggers, dashboards
        self.agents = []

    def agent_completed_task(self, agent_id, task_result):
        # Other agents can react to this completion.
        self.notify({"type": "task_complete", "agent": agent_id, "result": task_result})
```
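An observing agent is just another observer. `ReviewAgent` below is a hypothetical example of a peer reacting to a completion:

```python
class ReviewAgent(Observer):
    """Hypothetical agent that picks up finished tasks for verification."""

    def update(self, event):
        if event.get("type") == "task_complete":
            print(f"Reviewing result from agent {event['agent']}")

coordinator = AgentCoordinator()
coordinator.attach(ReviewAgent())
coordinator.agent_completed_task("planner-1", {"status": "ok"})
```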
### 3. Training Pipeline Monitoring

**Problem:** Model training involves multiple stages, each requiring a different monitoring strategy.

**Solution:** The Observer Pattern enables flexible monitoring across training phases.
```python
class TrainingPipeline(LLMSubject):
    def train_epoch(self):
        self.notify({"type": "epoch_start", "epoch": self.current_epoch})
        loss = self.forward_backward_pass()
        self.notify({"type": "epoch_complete", "loss": loss, "metrics": self.metrics})
```
## Enterprise Use Cases

### Cost Optimization

- **Real-time budget tracking:** Monitor API costs against budgets
- **Provider switching:** Automatically switch providers when cost thresholds are crossed
- **Usage analytics:** Track which models and prompts are most expensive

### Performance Optimization

- **Response time monitoring:** Track API latency across providers
- **Load balancing:** Distribute requests based on real-time performance
- **Caching decisions:** Cache expensive calls based on usage patterns

### System Reliability

- **Error tracking:** Monitor failure rates across different providers
- **Automatic failover:** Switch to backup providers on errors (sketched below)
- **Health monitoring:** Track system health metrics
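As a sketch of the failover idea, the observer below counts per-provider errors and demotes a provider past a threshold. `ProviderPool`, its `demote` method, and the `call_error` event are assumptions for illustration:

```python
class FailoverObserver(Observer):
    """Demotes a provider after repeated errors (ProviderPool is hypothetical)."""

    def __init__(self, provider_pool, max_errors=3):
        self.pool = provider_pool
        self.error_counts = {}
        self.max_errors = max_errors

    def update(self, event):
        if event.get("type") != "call_error":
            return
        provider = event["provider"]
        self.error_counts[provider] = self.error_counts.get(provider, 0) + 1
        if self.error_counts[provider] >= self.max_errors:
            self.pool.demote(provider)  # reroute traffic to a backup provider
```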
## Implementation Patterns

### 1. Event-Driven Architecture

```python
from collections import defaultdict
from datetime import datetime

class LLMEvent:
    def __init__(self, event_type, **kwargs):
        self.type = event_type
        self.timestamp = datetime.now()
        self.data = kwargs

class EventBus:
    def __init__(self):
        # Map event type -> observers interested in that type.
        self.observers = defaultdict(list)

    def subscribe(self, event_type, observer):
        self.observers[event_type].append(observer)

    def publish(self, event):
        for observer in self.observers[event.type]:
            observer.handle(event)
```
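Because subscriptions are keyed by event type, each observer sees only the events it asked for:

```python
class CostHandler:
    def handle(self, event):
        print(f"[{event.timestamp:%H:%M:%S}] cost={event.data.get('cost')}")

bus = EventBus()
bus.subscribe("call_complete", CostHandler())
bus.publish(LLMEvent("call_complete", cost=0.0042))  # only call_complete subscribers fire
bus.publish(LLMEvent("call_start"))                  # no subscribers, nothing happens
```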
### 2. Decorator Integration

```python
import functools

def observed_llm_call(observers=None):
    def decorator(func):
        @functools.wraps(func)  # preserve the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            for observer in observers or []:
                observer.before_call(*args, **kwargs)
            result = func(*args, **kwargs)
            for observer in observers or []:
                observer.after_call(result)
            return result
        return wrapper
    return decorator
```
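Applied to a plain function, the decorator adds observation without touching the function body. `TimingObserver` is a hypothetical observer implementing the `before_call`/`after_call` hooks the decorator expects:

```python
import time

class TimingObserver:
    def before_call(self, *args, **kwargs):
        self.start = time.perf_counter()

    def after_call(self, result):
        print(f"call took {time.perf_counter() - self.start:.3f}s")

@observed_llm_call(observers=[TimingObserver()])
def ask(prompt):
    return f"echo: {prompt}"  # stand-in for a real API call

ask("hello")
```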
### 3. Async Observer Pattern

```python
import asyncio

class AsyncLLMSubject:
    def __init__(self):
        self.observers = []

    async def notify_async(self, event):
        # Fan out concurrently; return_exceptions=True keeps one failing
        # observer from cancelling the rest.
        tasks = [observer.update_async(event) for observer in self.observers]
        await asyncio.gather(*tasks, return_exceptions=True)
```
## Best Practices

### 1. Weak References

Prevent memory leaks by holding observers through weak references, so a forgotten detach call cannot keep dead observers alive:

```python
import weakref

class Subject:
    def __init__(self):
        # Observers disappear from the set once nothing else references them.
        # Note: store observer objects, not bound methods -- a bound method
        # created on the fly would be garbage-collected immediately.
        self._observers = weakref.WeakSet()
```
### 2. Exception Isolation

Ensure one observer's failure doesn't affect the others:

```python
def notify(self, event):
    for observer in self.observers:
        try:
            observer.update(event)
        except Exception as e:
            # Log and continue; a broken monitor must never break the pipeline.
            self.log_error(f"Observer {observer} failed: {e}")
```
### 3. Performance Considerations

For high-frequency events, consider:

- Batching notifications (see the sketch below)
- Async processing
- Observer prioritization
- Conditional notifications
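A minimal batching sketch, assuming observers expose an `update_batch` method that accepts a list of events (an assumption of this sketch):

```python
class BatchingSubject(LLMSubject):
    """Buffers events and notifies observers in chunks (illustrative)."""

    def __init__(self, batch_size=100):
        super().__init__()
        self.batch_size = batch_size
        self.buffer = []

    def notify(self, event):
        self.buffer.append(event)
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        for observer in self.observers:
            observer.update_batch(self.buffer)  # one call per observer, not per event
        self.buffer = []
```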
## Anti-Patterns to Avoid

### 1. Observer Explosion

Don't create too many fine-grained observers; group related functionality.

- **Bad:** `TokenCountObserver`, `CostCalculatorObserver`, `BillingObserver`
- **Good:** `CostMonitorObserver` (handles all cost-related logic)
### 2. Tight Coupling

Observers shouldn't depend on the internal structure of specific events.

**Bad:**

```python
def update(self, event):
    cost = event.response.metadata.billing.cost  # breaks whenever the event shape changes
```

**Good:**

```python
def update(self, event):
    cost = event.get('cost') or self.calculate_cost(event)  # tolerant of shape changes
```
### 3. Synchronous Heavy Processing

Don't block the main thread with heavy observer processing.

- **Bad:** Direct database writes inside observers
- **Good:** Queue events for asynchronous processing (see the sketch below)
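One standard-library sketch of the queue-and-worker approach:

```python
import queue
import threading

class QueuedObserver(Observer):
    """Hands events to a background worker instead of processing them inline."""

    def __init__(self):
        self.events = queue.Queue()
        threading.Thread(target=self._worker, daemon=True).start()

    def update(self, event):
        self.events.put(event)    # returns immediately; never blocks the caller

    def _worker(self):
        while True:
            event = self.events.get()
            self._persist(event)  # slow work (e.g. a database write) happens here

    def _persist(self, event):
        ...  # placeholder for the actual heavy processing
```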
## Integration with AI Frameworks

### LangChain Integration

```python
from langchain.callbacks.base import BaseCallbackHandler

class ObserverCallback(BaseCallbackHandler):
    def __init__(self, subject):
        self.subject = subject

    def on_llm_start(self, serialized, prompts, **kwargs):
        self.subject.notify({"type": "llm_start", "prompts": prompts})

    def on_llm_end(self, response, **kwargs):
        self.subject.notify({"type": "llm_end", "response": response})
```
OpenAI API Integration
class ObservedOpenAIClient:
def __init__(self, observers=None):
self.client = OpenAI()
self.observers = observers or []
def chat_completions_create(self, **kwargs):
self.notify("request_start", **kwargs)
response = self.client.chat.completions.create(**kwargs)
self.notify("request_end", response=response)
return response
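Usage then mirrors the raw client; the model name is illustrative, and the call assumes an `OPENAI_API_KEY` in the environment:

```python
client = ObservedOpenAIClient(observers=[CostMonitor(), PerformanceMonitor()])
response = client.chat_completions_create(
    model="gpt-4o-mini",  # illustrative model name
    messages=[{"role": "user", "content": "Hello"}],
)
```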
## Metrics and KPIs

The Observer Pattern enables tracking key metrics:

- **Cost Metrics:** Total spend, cost per request, budget utilization
- **Performance Metrics:** Response time, throughput, success rate
- **Usage Metrics:** Popular models, peak hours, user patterns
- **Quality Metrics:** Response quality scores, user satisfaction
## Conclusion

The Observer Pattern is fundamental to building production-ready LLM applications. It enables:

- **Observability:** Real-time monitoring and debugging
- **Flexibility:** Add or remove monitoring without code changes
- **Scalability:** Handle increasing complexity through loose coupling
- **Reliability:** Isolate monitoring from core functionality
## Interactive Implementation

Observer Pattern Notebook - Live implementation with real-time LLM monitoring, cost tracking, and performance analytics.
In LLM applications where costs, performance, and reliability are critical, the Observer Pattern provides the foundation for building observable, maintainable AI systems.