Observer Pattern

Pattern Overview

The Observer Pattern defines a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically. In LLM applications, this pattern is crucial for building observable, monitorable AI systems.

Core Structure

# Subject (Observable)
class LLMSubject:
    def __init__(self):
        self.observers = []
    
    def attach(self, observer):
        self.observers.append(observer)
    
    def notify(self, event):
        for observer in self.observers:
            observer.update(event)

# Observer Interface
class Observer:
    def update(self, event):
        pass

# Concrete Observers
class CostMonitor(Observer):
    def update(self, event):
        # Track API costs
        pass

class PerformanceMonitor(Observer):
    def update(self, event):
        # Monitor response times
        pass

LLM-Specific Applications

1. Real-time API Monitoring

Problem: LLM APIs are expensive and can be slow. Teams need visibility into usage patterns, costs, and performance.

Solution: Observer pattern enables comprehensive monitoring without coupling monitoring logic to business logic.

class LLMClient:
    def call_api(self, prompt, model):
        self.notify_observers("call_start", prompt=prompt, model=model)
        response = self.actual_api_call(prompt, model)
        self.notify_observers("call_complete", response=response, cost=self.calculate_cost())
        return response

Benefits:

  • Zero-overhead monitoring (observers can be disabled)

  • Multiple monitoring systems without code changes

  • Real-time dashboards and alerts

2. Multi-Agent System Coordination

Problem: In complex AI systems with multiple agents, coordination and state synchronization is critical.

Solution: Agents observe each other's state changes for coordinated decision making.

class AgentCoordinator:
    def __init__(self):
        self.agents = []
        self.observers = []  # Other agents, loggers, dashboards
    
    def agent_completed_task(self, agent_id, task_result):
        self.notify_observers("task_complete", agent=agent_id, result=task_result)
        # Other agents can react to this completion

3. Training Pipeline Monitoring

Problem: Model training involves multiple stages, each requiring different monitoring strategies.

Solution: Observer pattern enables flexible monitoring across training phases.

class TrainingPipeline:
    def train_epoch(self):
        self.notify_observers("epoch_start", epoch=self.current_epoch)
        loss = self.forward_backward_pass()
        self.notify_observers("epoch_complete", loss=loss, metrics=self.metrics)

Enterprise Use Cases

Cost Optimization

  • Real-time budget tracking: Monitor API costs against budgets

  • Provider switching: Automatically switch providers based on cost thresholds

  • Usage analytics: Track which models/prompts are most expensive

Performance Optimization

  • Response time monitoring: Track API latency across providers

  • Load balancing: Distribute requests based on real-time performance

  • Caching decisions: Cache expensive calls based on usage patterns

System Reliability

  • Error tracking: Monitor failure rates across different providers

  • Automatic failover: Switch to backup providers on errors

  • Health monitoring: Track system health metrics

Implementation Patterns

1. Event-Driven Architecture

class LLMEvent:
    def __init__(self, event_type, **kwargs):
        self.type = event_type
        self.timestamp = datetime.now()
        self.data = kwargs

class EventBus:
    def __init__(self):
        self.observers = defaultdict(list)
    
    def subscribe(self, event_type, observer):
        self.observers[event_type].append(observer)
    
    def publish(self, event):
        for observer in self.observers[event.type]:
            observer.handle(event)

2. Decorator Integration

def observed_llm_call(observers=None):
    def decorator(func):
        def wrapper(*args, **kwargs):
            for observer in observers or []:
                observer.before_call(*args, **kwargs)
            
            result = func(*args, **kwargs)
            
            for observer in observers or []:
                observer.after_call(result)
            
            return result
        return wrapper
    return decorator

3. Async Observer Pattern

import asyncio

class AsyncLLMSubject:
    async def notify_async(self, event):
        tasks = [observer.update_async(event) for observer in self.observers]
        await asyncio.gather(*tasks)

Best Practices

1. Weak References

Prevent memory leaks by using weak references for observers:

import weakref

class Subject:
    def __init__(self):
        self._observers = weakref.WeakSet()

2. Exception Isolation

Ensure one observer's failure doesn't affect others:

def notify(self, event):
    for observer in self.observers:
        try:
            observer.update(event)
        except Exception as e:
            self.log_error(f"Observer {observer} failed: {e}")

3. Performance Considerations

For high-frequency events, consider:

  • Batching notifications

  • Async processing

  • Observer prioritization

  • Conditional notifications

Anti-Patterns to Avoid

1. Observer Explosion

Don't create too many fine-grained observers. Group related functionality.

Bad: TokenCountObserver, CostCalculatorObserver, BillingObserver Good: CostMonitorObserver (handles all cost-related logic)

2. Tight Coupling

Observers shouldn't depend on specific event structures.

Bad:

def update(self, event):
    cost = event.response.metadata.billing.cost  # Tight coupling

Good:

def update(self, event):
    cost = event.get('cost') or self.calculate_cost(event)  # Flexible

3. Synchronous Heavy Processing

Don't block the main thread with heavy observer processing.

Bad: Direct database writes in observers Good: Queue events for async processing

Integration with AI Frameworks

LangChain Integration

from langchain.callbacks.base import BaseCallbackHandler

class ObserverCallback(BaseCallbackHandler):
    def __init__(self, subject):
        self.subject = subject
    
    def on_llm_start(self, serialized, prompts, **kwargs):
        self.subject.notify("llm_start", prompts=prompts)
    
    def on_llm_end(self, response, **kwargs):
        self.subject.notify("llm_end", response=response)

OpenAI API Integration

class ObservedOpenAIClient:
    def __init__(self, observers=None):
        self.client = OpenAI()
        self.observers = observers or []
    
    def chat_completions_create(self, **kwargs):
        self.notify("request_start", **kwargs)
        response = self.client.chat.completions.create(**kwargs)
        self.notify("request_end", response=response)
        return response

Metrics and KPIs

Observer pattern enables tracking key metrics:

  • Cost Metrics: Total spend, cost per request, budget utilization

  • Performance Metrics: Response time, throughput, success rate

  • Usage Metrics: Popular models, peak hours, user patterns

  • Quality Metrics: Response quality scores, user satisfaction

Conclusion

The Observer Pattern is fundamental to building production-ready LLM applications. It enables:

  1. Observability: Real-time monitoring and debugging

  2. Flexibility: Add/remove monitoring without code changes

  3. Scalability: Handle increasing complexity through loose coupling

  4. Reliability: Isolate monitoring from core functionality


πŸ”— Interactive Implementation

πŸ““ Observer Pattern Notebook - Live implementation with real-time LLM monitoring, cost tracking, and performance analytics.

In LLM applications where costs, performance, and reliability are critical, the Observer Pattern provides the foundation for building observable, maintainable AI systems.

Last updated