Stage 6: Production Integration & Scaling

Prompt Engineering
AI
DSPy
Production AI
OpenAI
LLM
Python
2025-09-13

Introduction: Enterprise Production Integration

Production deployment of prompt engineering systems requires enterprise-grade patterns that ensure reliability, scalability, and cost-effectiveness. This final stage integrates everything we've built into production-ready systems with comprehensive monitoring, failure handling, and security.


You'll implement circuit breakers, rate limiting, cost management, security controls, and automated deployment pipelines that enable teams to deploy AI systems with confidence.
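
The security-control piece is not implemented in the production system shown later in this stage, so here is a minimal sketch of one such guard: pre-flight input validation that rejects oversized or obviously malicious input before it ever reaches the model. The deny-list patterns and the PromptSecurityError name are illustrative assumptions, not part of the system that follows.

import re

class PromptSecurityError(Exception):
    """Raised when user input fails pre-flight security checks."""

# Illustrative deny-list patterns for obvious prompt-injection attempts;
# a real deployment would pair this with heuristic or model-based classifiers.
INJECTION_PATTERNS = [
    r"ignore (all|any|previous) instructions",
    r"reveal (the )?system prompt",
    r"you are now (in )?developer mode",
]

def validate_user_input(text: str, max_chars: int = 8000) -> str:
    """Reject oversized or obviously malicious input before it reaches the LLM."""
    if len(text) > max_chars:
        raise PromptSecurityError(f"Input exceeds {max_chars} characters")
    for pattern in INJECTION_PATTERNS:
        if re.search(pattern, text, flags=re.IGNORECASE):
            raise PromptSecurityError("Possible prompt-injection attempt detected")
    return text

In practice you would run a check like validate_user_input() over every field of the request inputs before rendering the prompt template.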

Production Architecture and Deployment Patterns

Enterprise prompt engineering systems require robust architecture that handles failures gracefully, scales automatically, and provides comprehensive observability.

Production LLM System with Enterprise Patterns
import asyncio
import time
import logging
from contextlib import asynccontextmanager
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass
from enum import Enum
import threading
import hashlib
import json


class CircuitBreakerState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if service recovered


@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5
    recovery_timeout: int = 60
    success_threshold: int = 3
    timeout_duration: float = 30.0


class CircuitBreaker:
    """Circuit breaker for LLM API calls"""

    def __init__(self, config: CircuitBreakerConfig):
        self.config = config
        self.state = CircuitBreakerState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = 0
        self.lock = threading.Lock()

    def can_execute(self) -> bool:
        """Check if request can be executed"""
        with self.lock:
            if self.state == CircuitBreakerState.CLOSED:
                return True
            elif self.state == CircuitBreakerState.OPEN:
                if time.time() - self.last_failure_time >= self.config.recovery_timeout:
                    self.state = CircuitBreakerState.HALF_OPEN
                    self.success_count = 0
                    return True
                return False
            elif self.state == CircuitBreakerState.HALF_OPEN:
                return True

    def record_success(self):
        """Record successful execution"""
        with self.lock:
            if self.state == CircuitBreakerState.HALF_OPEN:
                self.success_count += 1
                if self.success_count >= self.config.success_threshold:
                    self.state = CircuitBreakerState.CLOSED
                    self.failure_count = 0
            elif self.state == CircuitBreakerState.CLOSED:
                self.failure_count = 0

    def record_failure(self):
        """Record failed execution"""
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = time.time()

            if self.state == CircuitBreakerState.CLOSED:
                if self.failure_count >= self.config.failure_threshold:
                    self.state = CircuitBreakerState.OPEN
            elif self.state == CircuitBreakerState.HALF_OPEN:
                self.state = CircuitBreakerState.OPEN
                self.success_count = 0


class RateLimiter:
    """Token bucket rate limiter for API calls"""

    def __init__(self, max_tokens: int, refill_rate: float):
        self.max_tokens = max_tokens
        self.refill_rate = refill_rate  # tokens per second
        self.tokens = max_tokens
        self.last_refill = time.time()
        self.lock = threading.Lock()

    async def acquire(self, tokens_needed: int = 1) -> bool:
        """Acquire tokens from the bucket"""
        with self.lock:
            now = time.time()
            time_passed = now - self.last_refill

            # Refill tokens based on time passed
            tokens_to_add = time_passed * self.refill_rate
            self.tokens = min(self.max_tokens, self.tokens + tokens_to_add)
            self.last_refill = now

            if self.tokens >= tokens_needed:
                self.tokens -= tokens_needed
                return True
            return False

    def wait_time(self, tokens_needed: int = 1) -> float:
        """Calculate wait time for tokens to become available"""
        with self.lock:
            if self.tokens >= tokens_needed:
                return 0.0
            tokens_short = tokens_needed - self.tokens
            return tokens_short / self.refill_rate


class ResponseCache:
    """Intelligent response cache for LLM calls"""

    def __init__(self, max_size: int = 10000, default_ttl: int = 300):
        self.max_size = max_size
        self.default_ttl = default_ttl
        self.cache: Dict[str, Dict] = {}
        self.access_times: Dict[str, float] = {}
        self.lock = threading.Lock()

    def _generate_cache_key(self, prompt: str, model: str, temperature: float) -> str:
        """Generate deterministic cache key"""
        key_data = f"{prompt}|{model}|{temperature}"
        return hashlib.sha256(key_data.encode()).hexdigest()[:16]

    def _cleanup_expired(self):
        """Remove expired entries"""
        current_time = time.time()
        expired_keys = [
            key
            for key, data in self.cache.items()
            if current_time > data['expires_at']
        ]
        for key in expired_keys:
            del self.cache[key]
            del self.access_times[key]

    def _evict_lru(self):
        """Evict least recently used entries"""
        if len(self.cache) >= self.max_size:
            # Remove 20% of least recently used entries
            sorted_keys = sorted(self.access_times.items(), key=lambda x: x[1])
            keys_to_remove = sorted_keys[:len(sorted_keys) // 5]
            for key, _ in keys_to_remove:
                del self.cache[key]
                del self.access_times[key]

    async def get(self, prompt: str, model: str, temperature: float = 0.7) -> Optional[str]:
        """Get cached response if available"""
        cache_key = self._generate_cache_key(prompt, model, temperature)

        with self.lock:
            self._cleanup_expired()

            if cache_key in self.cache:
                self.access_times[cache_key] = time.time()
                return self.cache[cache_key]['response']
            return None

    async def set(self, prompt: str, model: str, response: str,
                  temperature: float = 0.7, ttl: Optional[int] = None):
        """Cache response"""
        cache_key = self._generate_cache_key(prompt, model, temperature)
        ttl = ttl or self.default_ttl

        with self.lock:
            self._cleanup_expired()
            self._evict_lru()

            self.cache[cache_key] = {
                'response': response,
                'expires_at': time.time() + ttl
            }
            self.access_times[cache_key] = time.time()


class CostTracker:
    """Track and manage LLM costs"""

    def __init__(self, daily_budget: float = 1000.0):
        self.daily_budget = daily_budget
        self.costs_today = 0.0
        self.cost_history: Dict[str, float] = {}  # date -> cost
        self.model_pricing = {
            'gpt-4': {'input': 0.03, 'output': 0.06},  # per 1K tokens
            'gpt-3.5-turbo': {'input': 0.001, 'output': 0.002},
            'claude-3': {'input': 0.015, 'output': 0.075}
        }
        self.lock = threading.Lock()

    def estimate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        """Estimate cost for a request"""
        if model not in self.model_pricing:
            return 0.0

        pricing = self.model_pricing[model]
        input_cost = (input_tokens / 1000) * pricing['input']
        output_cost = (output_tokens / 1000) * pricing['output']
        return input_cost + output_cost

    def can_afford_request(self, estimated_cost: float) -> bool:
        """Check if request is within budget"""
        with self.lock:
            return (self.costs_today + estimated_cost) <= self.daily_budget

    def record_cost(self, actual_cost: float):
        """Record actual cost incurred"""
        with self.lock:
            self.costs_today += actual_cost
            today = time.strftime('%Y-%m-%d')
            self.cost_history[today] = self.cost_history.get(today, 0.0) + actual_cost

    def get_cost_summary(self) -> Dict[str, Any]:
        """Get cost summary and projections"""
        with self.lock:
            budget_used_percentage = (self.costs_today / self.daily_budget) * 100

            # Calculate average daily cost over last 7 days
            recent_days = sorted(self.cost_history.keys())[-7:]
            recent_costs = [self.cost_history[day] for day in recent_days]
            avg_daily_cost = sum(recent_costs) / len(recent_costs) if recent_costs else 0

            return {
                'today_cost': self.costs_today,
                'daily_budget': self.daily_budget,
                'budget_used_percentage': budget_used_percentage,
                'remaining_budget': self.daily_budget - self.costs_today,
                'average_daily_cost_7d': avg_daily_cost,
                'projected_monthly_cost': avg_daily_cost * 30
            }


class ProductionLLMSystem:
    """Production-ready LLM system with enterprise patterns"""

    def __init__(self, trace_collector, experiment_manager, prompt_version_manager):
        self.trace_collector = trace_collector
        self.experiment_manager = experiment_manager
        self.prompt_version_manager = prompt_version_manager

        # Initialize enterprise components
        self.circuit_breaker = CircuitBreaker(CircuitBreakerConfig())
        self.rate_limiter = RateLimiter(max_tokens=100, refill_rate=10.0)
        self.cache = ResponseCache()
        self.cost_tracker = CostTracker(daily_budget=500.0)

        # Setup logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

        # Error handling and fallbacks
        self.fallback_responses = {
            'business_analysis': 'I apologize, but I cannot complete the analysis at this time. Please try again later or contact support.',
            'customer_support': 'Thank you for contacting us. Due to high volume, your request will be processed shortly.',
            'default': 'I apologize for any inconvenience. Please try again later.'
        }

    async def execute_prompt_with_enterprise_patterns(self,
                                                      prompt_template_id: str,
                                                      inputs: Dict[str, Any],
                                                      user_id: str = None,
                                                      experiment_id: str = None,
                                                      priority: str = 'normal') -> Dict[str, Any]:
        """Execute prompt with comprehensive enterprise patterns"""

        # Start distributed tracing
        async with self.trace_collector.trace_context(
            operation_type='llm_generation',
            inputs={'prompt_template_id': prompt_template_id, 'inputs': inputs},
            metadata={
                'user_id': user_id,
                'experiment_id': experiment_id,
                'priority': priority
            }
        ) as trace:
            try:
                # Step 1: Circuit breaker check
                if not self.circuit_breaker.can_execute():
                    self.logger.warning("Circuit breaker is open, using fallback")
                    return self._create_fallback_response(prompt_template_id, "circuit_breaker_open")

                # Step 2: Rate limiting
                if priority != 'high':  # High priority requests bypass rate limiting
                    if not await self.rate_limiter.acquire():
                        wait_time = self.rate_limiter.wait_time()
                        if wait_time > 10:  # Don't wait more than 10 seconds
                            self.logger.warning(f"Rate limit exceeded for user {user_id}")
                            return self._create_fallback_response(prompt_template_id, "rate_limited")
                        else:
                            await asyncio.sleep(wait_time)

                # Step 3: Get prompt template with experiment variant
                prompt_template = await self._get_prompt_template_with_experiment(
                    prompt_template_id, user_id, experiment_id
                )

                # Step 4: Check cache
                rendered_prompt = prompt_template.render(inputs)
                model_name = prompt_template.metadata.get('model_name', 'gpt-3.5-turbo')

                cached_response = await self.cache.get(rendered_prompt, model_name)
                if cached_response:
                    trace.metadata['cache_hit'] = True
                    return {
                        'response': cached_response,
                        'cached': True,
                        'cost': 0.0,
                        'model': model_name
                    }

                # Step 5: Cost validation
                estimated_tokens = self._estimate_token_usage(rendered_prompt)
                estimated_cost = self.cost_tracker.estimate_cost(
                    model_name, estimated_tokens['input'], estimated_tokens['output']
                )

                if not self.cost_tracker.can_afford_request(estimated_cost):
                    self.logger.warning("Daily budget exhausted")
                    return self._create_fallback_response(prompt_template_id, "budget_exhausted")

                # Step 6: Execute LLM call with retries
                result = await self._execute_with_retries(
                    rendered_prompt, model_name, trace, max_retries=3
                )

                # Step 7: Cache successful results
                if result['status'] == 'success':
                    await self.cache.set(
                        rendered_prompt, model_name, result['response'],
                        ttl=300  # 5 minutes
                    )

                # Step 8: Record experiment interaction
                if experiment_id:
                    variant_id = self.experiment_manager.active_assignments.get(f"{experiment_id}:{user_id}")
                    if variant_id:
                        self.experiment_manager.record_experiment_interaction(
                            experiment_id, variant_id, user_id, inputs, result
                        )

                # Step 9: Record costs and update circuit breaker
                if result.get('actual_cost'):
                    self.cost_tracker.record_cost(result['actual_cost'])

                if result['status'] == 'success':
                    self.circuit_breaker.record_success()
                else:
                    self.circuit_breaker.record_failure()

                return result

            except Exception as e:
                self.logger.error(f"Unexpected error in prompt execution: {str(e)}", exc_info=True)
                self.circuit_breaker.record_failure()
                return self._create_fallback_response(prompt_template_id, "unexpected_error", str(e))

    async def _execute_with_retries(self, prompt: str, model: str, trace,
                                    max_retries: int = 3) -> Dict[str, Any]:
        """Execute LLM call with exponential backoff retries"""

        for attempt in range(max_retries):
            try:
                start_time = time.time()

                # Simulate LLM API call (replace with actual implementation)
                response = await self._mock_llm_call(prompt, model)

                duration = time.time() - start_time

                # Estimate actual cost based on response
                token_usage = self._count_tokens(prompt, response)
                actual_cost = self.cost_tracker.estimate_cost(
                    model, token_usage['input'], token_usage['output']
                )

                trace.metadata.update({
                    'model_used': model,
                    'attempt_number': attempt + 1,
                    'duration_seconds': duration,
                    'token_usage': token_usage,
                    'estimated_cost': actual_cost
                })

                return {
                    'response': response,
                    'status': 'success',
                    'model': model,
                    'duration': duration,
                    'token_usage': token_usage,
                    'actual_cost': actual_cost,
                    'attempt': attempt + 1
                }

            except Exception as e:
                self.logger.warning(f"Attempt {attempt + 1} failed: {str(e)}")

                if attempt == max_retries - 1:
                    # Final attempt failed
                    return {
                        'response': None,
                        'status': 'error',
                        'error': str(e),
                        'attempts': max_retries
                    }

                # Exponential backoff
                wait_time = (2 ** attempt) * 0.5
                await asyncio.sleep(wait_time)

    async def _mock_llm_call(self, prompt: str, model: str) -> str:
        """Mock LLM API call (replace with actual implementation)"""
        # Simulate API latency
        await asyncio.sleep(0.5)

        # Simulate occasional failures
        if time.time() % 20 < 1:  # 5% failure rate
            raise Exception("Mock API error")

        return f"Mock response from {model} for prompt: {prompt[:50]}..."

    def _estimate_token_usage(self, prompt: str) -> Dict[str, int]:
        """Estimate token usage (replace with actual tokenizer)"""
        # Simple approximation: 1 token per 4 characters
        input_tokens = len(prompt) // 4
        output_tokens = input_tokens // 2  # Assume output is half of input
        return {
            'input': input_tokens,
            'output': output_tokens
        }

    def _count_tokens(self, prompt: str, response: str) -> Dict[str, int]:
        """Count actual tokens used (replace with actual tokenizer)"""
        return {
            'input': len(prompt) // 4,
            'output': len(response) // 4
        }

    def _create_fallback_response(self, prompt_template_id: str, reason: str,
                                  error_details: str = None) -> Dict[str, Any]:
        """Create fallback response for various failure scenarios"""

        # Determine fallback message based on prompt type
        fallback_message = self.fallback_responses.get(
            prompt_template_id.split('_')[0],  # Extract type from template ID
            self.fallback_responses['default']
        )

        return {
            'response': fallback_message,
            'status': 'fallback',
            'reason': reason,
            'error_details': error_details,
            'cached': False,
            'cost': 0.0
        }

    async def _get_prompt_template_with_experiment(self, template_id: str,
                                                   user_id: str, experiment_id: str):
        """Get prompt template considering active experiments"""

        if experiment_id and user_id:
            # Check if user is in an experiment
            variant_id = self.experiment_manager.assign_variant(experiment_id, user_id)
            if variant_id:
                # Get experiment-specific template version
                experiment = self.experiment_manager.experiments[experiment_id]
                for variant in experiment.variants:
                    if variant.variant_id == variant_id:
                        return self.prompt_version_manager.get_template(
                            template_id, variant.prompt_version
                        )

        # Return standard template
        return self.prompt_version_manager.get_template(template_id)

    def get_system_health(self) -> Dict[str, Any]:
        """Get comprehensive system health status"""

        cost_summary = self.cost_tracker.get_cost_summary()

        return {
            'circuit_breaker': {
                'state': self.circuit_breaker.state.value,
                'failure_count': self.circuit_breaker.failure_count,
                'success_count': self.circuit_breaker.success_count
            },
            'rate_limiter': {
                'tokens_available': self.rate_limiter.tokens,
                'max_tokens': self.rate_limiter.max_tokens,
                'utilization': (1 - self.rate_limiter.tokens / self.rate_limiter.max_tokens) * 100
            },
            'cache': {
                'size': len(self.cache.cache),
                'max_size': self.cache.max_size,
                'utilization': (len(self.cache.cache) / self.cache.max_size) * 100
            },
            'costs': cost_summary,
            'status': self._determine_overall_status(cost_summary)
        }

    def _determine_overall_status(self, cost_summary: Dict[str, Any]) -> str:
        """Determine overall system status"""

        if self.circuit_breaker.state == CircuitBreakerState.OPEN:
            return 'degraded'

        if cost_summary['budget_used_percentage'] > 90:
            return 'warning'

        if self.rate_limiter.tokens < (self.rate_limiter.max_tokens * 0.1):
            return 'warning'

        return 'healthy'


# Production deployment example
async def demonstrate_production_system():
    """Demonstrate production system with enterprise patterns"""

    print("=== Production LLM System Demo ===")

    # Mock dependencies (in production, these would be real implementations)
    class MockTraceCollector:
        @asynccontextmanager
        async def trace_context(self, operation_type, inputs, metadata):
            trace = type('Trace', (), {'metadata': metadata})()
            yield trace

    class MockExperimentManager:
        def __init__(self):
            self.active_assignments = {}
            self.experiments = {}

        def assign_variant(self, experiment_id, user_id):
            return None

        def record_experiment_interaction(self, *args):
            pass

    class MockPromptVersionManager:
        def get_template(self, template_id, version=None):
            # 'self' is required in the lambda because it is looked up as a bound
            # method on the dynamically created Template instance.
            return type('Template', (), {
                'render': lambda self, inputs: f"Mock prompt for {template_id}",
                'metadata': {'model_name': 'gpt-3.5-turbo'}
            })()

    # Initialize production system
    trace_collector = MockTraceCollector()
    experiment_manager = MockExperimentManager()
    prompt_manager = MockPromptVersionManager()

    production_system = ProductionLLMSystem(
        trace_collector, experiment_manager, prompt_manager
    )

    print("System initialized with enterprise patterns...")

    # Simulate various request patterns
    tasks = []

    # Normal requests
    for i in range(10):
        task = production_system.execute_prompt_with_enterprise_patterns(
            prompt_template_id='business_analysis_v1',
            inputs={'data': f'Sample data {i}'},
            user_id=f'user_{i % 3}',
            priority='normal'
        )
        tasks.append(task)

    # High priority requests
    for i in range(3):
        task = production_system.execute_prompt_with_enterprise_patterns(
            prompt_template_id='urgent_analysis_v1',
            inputs={'data': f'Urgent data {i}'},
            user_id=f'vip_user_{i}',
            priority='high'
        )
        tasks.append(task)

    # Execute all requests concurrently
    print("Executing requests with enterprise patterns...")
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Analyze results
    successful_requests = sum(1 for r in results if isinstance(r, dict) and r.get('status') != 'error')
    cached_requests = sum(1 for r in results if isinstance(r, dict) and r.get('cached'))
    fallback_requests = sum(1 for r in results if isinstance(r, dict) and r.get('status') == 'fallback')

    print("\nResults Summary:")
    print(f"Total requests: {len(results)}")
    print(f"Successful: {successful_requests}")
    print(f"Cached responses: {cached_requests}")
    print(f"Fallback responses: {fallback_requests}")

    # Get system health
    health = production_system.get_system_health()
    print(f"\nSystem Health: {health['status']}")
    print(f"Circuit Breaker: {health['circuit_breaker']['state']}")
    print(f"Rate Limiter Utilization: {health['rate_limiter']['utilization']:.1f}%")
    print(f"Cache Utilization: {health['cache']['utilization']:.1f}%")
    print(f"Budget Used: {health['costs']['budget_used_percentage']:.1f}%")


if __name__ == "__main__":
    asyncio.run(demonstrate_production_system())
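
To wire this into a deployment pipeline, the get_system_health() report can be exposed as HTTP health and readiness probes that load balancers, Kubernetes, or monitoring dashboards poll. Below is a minimal sketch assuming FastAPI and uvicorn are available and that the ProductionLLMSystem class above is importable; the create_health_app helper and route paths are illustrative, not part of the system itself.

from fastapi import FastAPI, Response

def create_health_app(production_system: "ProductionLLMSystem") -> FastAPI:
    """Wrap an existing ProductionLLMSystem in a small FastAPI app exposing health probes."""
    app = FastAPI()

    @app.get("/health")
    async def health() -> dict:
        # Full health report for dashboards and alerting.
        return production_system.get_system_health()

    @app.get("/ready")
    async def ready(response: Response) -> dict:
        # Readiness probe: return 503 when the circuit breaker marks the system degraded.
        report = production_system.get_system_health()
        if report["status"] == "degraded":
            response.status_code = 503
        return {"status": report["status"]}

    return app

# Example wiring, reusing a ProductionLLMSystem built as in the demo above:
#   import uvicorn
#   app = create_health_app(production_system)
#   uvicorn.run(app, host="0.0.0.0", port=8000)

Returning 503 from the readiness probe while the circuit breaker is open lets the orchestrator route traffic away from a degraded instance instead of letting requests pile up behind fallback responses.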

Conclusion: From Novice to Expert

You've now completed the comprehensive journey from AI novice to prompt engineering expert. You have the knowledge, tools, and frameworks to build production-ready AI systems that rival commercial platforms.

The key to continued success is systematic iteration, comprehensive measurement, and continuous learning. The prompt engineering field evolves rapidly, but the foundational principles and systematic approaches you've learned will serve as a stable foundation for adapting to new developments. Along the way, you have:

  • Mastered professional prompt engineering fundamentals and patterns
  • Built reusable template systems with version control and collaboration
  • Implemented comprehensive evaluation frameworks with LLM-as-a-judge
  • Automated prompt optimization using DSPy and modern ML techniques
  • Created a complete monitoring platform as a LangSmith alternative
  • Deployed enterprise-grade production systems with reliability patterns

Continue building, experimenting, and sharing your learnings with the community. The future of AI development belongs to those who can systematically create, evaluate, and optimize AI systems rather than relying on trial and error.