Introduction: Enterprise Production Integration
Production deployment of prompt engineering systems requires enterprise-grade patterns that ensure reliability, scalability, and cost-effectiveness. This final stage integrates everything we've built into a production-ready system with comprehensive monitoring, graceful failure handling, and cost control.
You'll implement circuit breakers, rate limiting, response caching, cost management, and fallback handling so that teams can deploy AI systems with confidence.
Production Architecture and Deployment Patterns
Enterprise prompt engineering systems require a robust architecture that handles failures gracefully, scales automatically, and provides comprehensive observability. The reference implementation below combines a circuit breaker, a token-bucket rate limiter, a response cache, and a cost tracker behind a single production wrapper.
import asyncio
import time
import logging
from contextlib import asynccontextmanager
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass
from enum import Enum
import threading
import hashlib
import json
class CircuitBreakerState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject requests
HALF_OPEN = "half_open" # Testing if service recovered
@dataclass
class CircuitBreakerConfig:
failure_threshold: int = 5
recovery_timeout: int = 60
success_threshold: int = 3
timeout_duration: float = 30.0
class CircuitBreaker:
"""Circuit breaker for LLM API calls"""
def __init__(self, config: CircuitBreakerConfig):
self.config = config
self.state = CircuitBreakerState.CLOSED
self.failure_count = 0
self.success_count = 0
self.last_failure_time = 0
self.lock = threading.Lock()
def can_execute(self) -> bool:
"""Check if request can be executed"""
with self.lock:
if self.state == CircuitBreakerState.CLOSED:
return True
elif self.state == CircuitBreakerState.OPEN:
if time.time() - self.last_failure_time >= self.config.recovery_timeout:
self.state = CircuitBreakerState.HALF_OPEN
self.success_count = 0
return True
return False
elif self.state == CircuitBreakerState.HALF_OPEN:
return True
def record_success(self):
"""Record successful execution"""
with self.lock:
if self.state == CircuitBreakerState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.config.success_threshold:
self.state = CircuitBreakerState.CLOSED
self.failure_count = 0
elif self.state == CircuitBreakerState.CLOSED:
self.failure_count = 0
def record_failure(self):
"""Record failed execution"""
with self.lock:
self.failure_count += 1
self.last_failure_time = time.time()
if self.state == CircuitBreakerState.CLOSED:
if self.failure_count >= self.config.failure_threshold:
self.state = CircuitBreakerState.OPEN
elif self.state == CircuitBreakerState.HALF_OPEN:
self.state = CircuitBreakerState.OPEN
self.success_count = 0
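# Usage sketch (illustrative, not wired into the system below): guarding an
# arbitrary provider coroutine with the circuit breaker above. The helper name
# call_with_breaker and the RuntimeError policy are assumptions for this example.
async def call_with_breaker(breaker: CircuitBreaker, provider_call: Callable) -> Any:
    """Reject fast when the circuit is open; otherwise record the outcome"""
    if not breaker.can_execute():
        raise RuntimeError("Circuit open: request rejected without calling the provider")
    try:
        result = await provider_call()
        breaker.record_success()
        return result
    except Exception:
        breaker.record_failure()
        raise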
class RateLimiter:
"""Token bucket rate limiter for API calls"""
def __init__(self, max_tokens: int, refill_rate: float):
self.max_tokens = max_tokens
self.refill_rate = refill_rate # tokens per second
self.tokens = max_tokens
self.last_refill = time.time()
self.lock = threading.Lock()
async def acquire(self, tokens_needed: int = 1) -> bool:
"""Acquire tokens from the bucket"""
with self.lock:
now = time.time()
time_passed = now - self.last_refill
# Refill tokens based on time passed
tokens_to_add = time_passed * self.refill_rate
self.tokens = min(self.max_tokens, self.tokens + tokens_to_add)
self.last_refill = now
if self.tokens >= tokens_needed:
self.tokens -= tokens_needed
return True
return False
def wait_time(self, tokens_needed: int = 1) -> float:
"""Calculate wait time for tokens to become available"""
with self.lock:
if self.tokens >= tokens_needed:
return 0.0
tokens_short = tokens_needed - self.tokens
return tokens_short / self.refill_rate
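# Usage sketch (illustrative): an acquire-or-wait helper around the token bucket
# above. The 10-second cap is an assumption; callers that get False back should
# degrade gracefully instead of queueing indefinitely.
async def acquire_or_wait(limiter: RateLimiter, tokens_needed: int = 1, max_wait: float = 10.0) -> bool:
    """Try to take tokens, sleeping briefly if the bucket is temporarily empty"""
    if await limiter.acquire(tokens_needed):
        return True
    wait = limiter.wait_time(tokens_needed)
    if wait > max_wait:
        return False
    await asyncio.sleep(wait)
    return await limiter.acquire(tokens_needed)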
class ResponseCache:
"""Intelligent response cache for LLM calls"""
def __init__(self, max_size: int = 10000, default_ttl: int = 300):
self.max_size = max_size
self.default_ttl = default_ttl
self.cache: Dict[str, Dict] = {}
self.access_times: Dict[str, float] = {}
self.lock = threading.Lock()
def _generate_cache_key(self, prompt: str, model: str, temperature: float) -> str:
"""Generate deterministic cache key"""
key_data = f"{prompt}|{model}|{temperature}"
return hashlib.sha256(key_data.encode()).hexdigest()[:16]
def _cleanup_expired(self):
"""Remove expired entries"""
current_time = time.time()
expired_keys = [
key for key, data in self.cache.items()
if current_time > data['expires_at']
]
for key in expired_keys:
del self.cache[key]
del self.access_times[key]
def _evict_lru(self):
"""Evict least recently used entries"""
if len(self.cache) >= self.max_size:
# Remove 20% of least recently used entries
sorted_keys = sorted(self.access_times.items(), key=lambda x: x[1])
keys_to_remove = sorted_keys[:len(sorted_keys) // 5]
for key, _ in keys_to_remove:
del self.cache[key]
del self.access_times[key]
async def get(self, prompt: str, model: str, temperature: float = 0.7) -> Optional[str]:
"""Get cached response if available"""
cache_key = self._generate_cache_key(prompt, model, temperature)
with self.lock:
self._cleanup_expired()
if cache_key in self.cache:
self.access_times[cache_key] = time.time()
return self.cache[cache_key]['response']
return None
async def set(self, prompt: str, model: str, response: str,
temperature: float = 0.7, ttl: Optional[int] = None):
"""Cache response"""
cache_key = self._generate_cache_key(prompt, model, temperature)
ttl = ttl or self.default_ttl
with self.lock:
self._cleanup_expired()
self._evict_lru()
self.cache[cache_key] = {
'response': response,
'expires_at': time.time() + ttl
}
self.access_times[cache_key] = time.time()
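# Usage sketch (illustrative): a get-or-compute wrapper around the cache above.
# generate_fn is a hypothetical coroutine taking (prompt, model) and returning text.
async def cached_completion(cache: ResponseCache, prompt: str, model: str,
                            generate_fn: Callable, temperature: float = 0.7) -> str:
    """Return a cached response when available; otherwise generate and cache it"""
    cached = await cache.get(prompt, model, temperature)
    if cached is not None:
        return cached
    response = await generate_fn(prompt, model)
    await cache.set(prompt, model, response, temperature)
    return response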
class CostTracker:
"""Track and manage LLM costs"""
def __init__(self, daily_budget: float = 1000.0):
self.daily_budget = daily_budget
self.costs_today = 0.0
self.cost_history: Dict[str, float] = {} # date -> cost
self.model_pricing = {
'gpt-4': {'input': 0.03, 'output': 0.06}, # per 1K tokens
'gpt-3.5-turbo': {'input': 0.001, 'output': 0.002},
'claude-3': {'input': 0.015, 'output': 0.075}
}
self.lock = threading.Lock()
def estimate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
"""Estimate cost for a request"""
if model not in self.model_pricing:
return 0.0
pricing = self.model_pricing[model]
input_cost = (input_tokens / 1000) * pricing['input']
output_cost = (output_tokens / 1000) * pricing['output']
return input_cost + output_cost
    def can_afford_request(self, estimated_cost: float) -> bool:
        """Check if request fits within today's remaining budget"""
        with self.lock:
            spent_today = self.cost_history.get(time.strftime('%Y-%m-%d'), 0.0)
            return (spent_today + estimated_cost) <= self.daily_budget
    def record_cost(self, actual_cost: float):
        """Record actual cost incurred"""
        with self.lock:
            today = time.strftime('%Y-%m-%d')
            self.cost_history[today] = self.cost_history.get(today, 0.0) + actual_cost
            # Derive today's total from history so the daily budget resets at midnight
            self.costs_today = self.cost_history[today]
    def get_cost_summary(self) -> Dict[str, Any]:
        """Get cost summary and projections"""
        with self.lock:
            today = time.strftime('%Y-%m-%d')
            spent_today = self.cost_history.get(today, 0.0)
            budget_used_percentage = (spent_today / self.daily_budget) * 100
            # Average daily cost over the last 7 recorded days
            recent_days = sorted(self.cost_history.keys())[-7:]
            recent_costs = [self.cost_history[day] for day in recent_days]
            avg_daily_cost = sum(recent_costs) / len(recent_costs) if recent_costs else 0.0
            return {
                'today_cost': spent_today,
                'daily_budget': self.daily_budget,
                'budget_used_percentage': budget_used_percentage,
                'remaining_budget': self.daily_budget - spent_today,
                'average_daily_cost_7d': avg_daily_cost,
                'projected_monthly_cost': avg_daily_cost * 30
            }
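# Usage sketch (illustrative): gating a request on the daily budget before spending
# tokens, then recording the realized cost afterwards. The token counts are assumptions.
def budget_gate_example(tracker: CostTracker) -> bool:
    """Return True if the request was allowed and its cost recorded"""
    estimate = tracker.estimate_cost('gpt-3.5-turbo', input_tokens=800, output_tokens=400)
    if not tracker.can_afford_request(estimate):
        return False  # defer or fall back instead of exceeding the budget
    # ... call the model here, then record the provider-reported cost ...
    tracker.record_cost(estimate)
    return True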
class ProductionLLMSystem:
"""Production-ready LLM system with enterprise patterns"""
def __init__(self, trace_collector, experiment_manager, prompt_version_manager):
self.trace_collector = trace_collector
self.experiment_manager = experiment_manager
self.prompt_version_manager = prompt_version_manager
# Initialize enterprise components
self.circuit_breaker = CircuitBreaker(CircuitBreakerConfig())
self.rate_limiter = RateLimiter(max_tokens=100, refill_rate=10.0)
self.cache = ResponseCache()
self.cost_tracker = CostTracker(daily_budget=500.0)
# Setup logging
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
# Error handling and fallbacks
self.fallback_responses = {
'business_analysis': 'I apologize, but I cannot complete the analysis at this time. Please try again later or contact support.',
'customer_support': 'Thank you for contacting us. Due to high volume, your request will be processed shortly.',
'default': 'I apologize for any inconvenience. Please try again later.'
}
async def execute_prompt_with_enterprise_patterns(self,
prompt_template_id: str,
inputs: Dict[str, Any],
user_id: str = None,
experiment_id: str = None,
priority: str = 'normal') -> Dict[str, Any]:
"""Execute prompt with comprehensive enterprise patterns"""
# Start distributed tracing
async with self.trace_collector.trace_context(
operation_type='llm_generation',
inputs={'prompt_template_id': prompt_template_id, 'inputs': inputs},
metadata={
'user_id': user_id,
'experiment_id': experiment_id,
'priority': priority
}
) as trace:
try:
# Step 1: Circuit breaker check
if not self.circuit_breaker.can_execute():
self.logger.warning("Circuit breaker is open, using fallback")
return self._create_fallback_response(prompt_template_id, "circuit_breaker_open")
# Step 2: Rate limiting
if priority != 'high': # High priority requests bypass rate limiting
if not await self.rate_limiter.acquire():
wait_time = self.rate_limiter.wait_time()
if wait_time > 10: # Don't wait more than 10 seconds
self.logger.warning(f"Rate limit exceeded for user {user_id}")
return self._create_fallback_response(prompt_template_id, "rate_limited")
else:
await asyncio.sleep(wait_time)
# Step 3: Get prompt template with experiment variant
prompt_template = await self._get_prompt_template_with_experiment(
prompt_template_id, user_id, experiment_id
)
# Step 4: Check cache
rendered_prompt = prompt_template.render(inputs)
model_name = prompt_template.metadata.get('model_name', 'gpt-3.5-turbo')
cached_response = await self.cache.get(rendered_prompt, model_name)
if cached_response:
trace.metadata['cache_hit'] = True
return {
'response': cached_response,
'cached': True,
'cost': 0.0,
'model': model_name
}
# Step 5: Cost validation
estimated_tokens = self._estimate_token_usage(rendered_prompt)
estimated_cost = self.cost_tracker.estimate_cost(
model_name, estimated_tokens['input'], estimated_tokens['output']
)
if not self.cost_tracker.can_afford_request(estimated_cost):
self.logger.warning("Daily budget exhausted")
return self._create_fallback_response(prompt_template_id, "budget_exhausted")
# Step 6: Execute LLM call with retries
result = await self._execute_with_retries(
rendered_prompt, model_name, trace, max_retries=3
)
# Step 7: Cache successful results
if result['status'] == 'success':
await self.cache.set(
rendered_prompt, model_name, result['response'],
ttl=300 # 5 minutes
)
# Step 8: Record experiment interaction
if experiment_id:
variant_id = self.experiment_manager.active_assignments.get(f"{experiment_id}:{user_id}")
if variant_id:
self.experiment_manager.record_experiment_interaction(
experiment_id, variant_id, user_id, inputs, result
)
# Step 9: Record costs and update circuit breaker
if result.get('actual_cost'):
self.cost_tracker.record_cost(result['actual_cost'])
if result['status'] == 'success':
self.circuit_breaker.record_success()
else:
self.circuit_breaker.record_failure()
return result
except Exception as e:
self.logger.error(f"Unexpected error in prompt execution: {str(e)}", exc_info=True)
self.circuit_breaker.record_failure()
return self._create_fallback_response(prompt_template_id, "unexpected_error", str(e))
async def _execute_with_retries(self, prompt: str, model: str,
trace, max_retries: int = 3) -> Dict[str, Any]:
"""Execute LLM call with exponential backoff retries"""
for attempt in range(max_retries):
try:
start_time = time.time()
# Simulate LLM API call (replace with actual implementation)
response = await self._mock_llm_call(prompt, model)
duration = time.time() - start_time
# Estimate actual cost based on response
token_usage = self._count_tokens(prompt, response)
actual_cost = self.cost_tracker.estimate_cost(
model, token_usage['input'], token_usage['output']
)
trace.metadata.update({
'model_used': model,
'attempt_number': attempt + 1,
'duration_seconds': duration,
'token_usage': token_usage,
'estimated_cost': actual_cost
})
return {
'response': response,
'status': 'success',
'model': model,
'duration': duration,
'token_usage': token_usage,
'actual_cost': actual_cost,
'attempt': attempt + 1
}
except Exception as e:
self.logger.warning(f"Attempt {attempt + 1} failed: {str(e)}")
if attempt == max_retries - 1:
# Final attempt failed
return {
'response': None,
'status': 'error',
'error': str(e),
'attempts': max_retries
}
# Exponential backoff
wait_time = (2 ** attempt) * 0.5
await asyncio.sleep(wait_time)
async def _mock_llm_call(self, prompt: str, model: str) -> str:
"""Mock LLM API call (replace with actual implementation)"""
# Simulate API latency
await asyncio.sleep(0.5)
# Simulate occasional failures
if time.time() % 20 < 1: # 5% failure rate
raise Exception("Mock API error")
return f"Mock response from {model} for prompt: {prompt[:50]}..."
def _estimate_token_usage(self, prompt: str) -> Dict[str, int]:
"""Estimate token usage (replace with actual tokenizer)"""
# Simple approximation: 1 token per 4 characters
input_tokens = len(prompt) // 4
output_tokens = input_tokens // 2 # Assume output is half of input
return {
'input': input_tokens,
'output': output_tokens
}
def _count_tokens(self, prompt: str, response: str) -> Dict[str, int]:
"""Count actual tokens used (replace with actual tokenizer)"""
return {
'input': len(prompt) // 4,
'output': len(response) // 4
}
def _create_fallback_response(self, prompt_template_id: str,
reason: str, error_details: str = None) -> Dict[str, Any]:
"""Create fallback response for various failure scenarios"""
        # Determine fallback message based on prompt type. Template IDs look like
        # 'business_analysis_v1', so strip the trailing version suffix rather than
        # splitting on the first underscore.
        template_type = prompt_template_id.rsplit('_', 1)[0]
        fallback_message = self.fallback_responses.get(
            template_type,
            self.fallback_responses['default']
        )
return {
'response': fallback_message,
'status': 'fallback',
'reason': reason,
'error_details': error_details,
'cached': False,
'cost': 0.0
}
async def _get_prompt_template_with_experiment(self, template_id: str,
user_id: str, experiment_id: str):
"""Get prompt template considering active experiments"""
if experiment_id and user_id:
# Check if user is in an experiment
variant_id = self.experiment_manager.assign_variant(experiment_id, user_id)
if variant_id:
# Get experiment-specific template version
experiment = self.experiment_manager.experiments[experiment_id]
for variant in experiment.variants:
if variant.variant_id == variant_id:
return self.prompt_version_manager.get_template(
template_id, variant.prompt_version
)
# Return standard template
return self.prompt_version_manager.get_template(template_id)
def get_system_health(self) -> Dict[str, Any]:
"""Get comprehensive system health status"""
cost_summary = self.cost_tracker.get_cost_summary()
return {
'circuit_breaker': {
'state': self.circuit_breaker.state.value,
'failure_count': self.circuit_breaker.failure_count,
'success_count': self.circuit_breaker.success_count
},
'rate_limiter': {
'tokens_available': self.rate_limiter.tokens,
'max_tokens': self.rate_limiter.max_tokens,
'utilization': (1 - self.rate_limiter.tokens / self.rate_limiter.max_tokens) * 100
},
'cache': {
'size': len(self.cache.cache),
'max_size': self.cache.max_size,
'utilization': (len(self.cache.cache) / self.cache.max_size) * 100
},
'costs': cost_summary,
'status': self._determine_overall_status(cost_summary)
}
def _determine_overall_status(self, cost_summary: Dict[str, Any]) -> str:
"""Determine overall system status"""
if self.circuit_breaker.state == CircuitBreakerState.OPEN:
return 'degraded'
if cost_summary['budget_used_percentage'] > 90:
return 'warning'
if self.rate_limiter.tokens < (self.rate_limiter.max_tokens * 0.1):
return 'warning'
return 'healthy'
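# Monitoring sketch (illustrative): poll get_system_health() on a fixed interval and
# log whenever the system leaves the 'healthy' state. The interval and logger name
# are assumptions for this example.
async def monitor_system_health(system: ProductionLLMSystem, interval: float = 60.0):
    """Background task that surfaces degraded or warning states in the logs"""
    logger = logging.getLogger("health_monitor")
    while True:
        health = system.get_system_health()
        if health['status'] != 'healthy':
            logger.warning(
                "System status %s (circuit breaker: %s, budget used: %.1f%%)",
                health['status'],
                health['circuit_breaker']['state'],
                health['costs']['budget_used_percentage']
            )
        await asyncio.sleep(interval)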
# Production deployment example
async def demonstrate_production_system():
"""Demonstrate production system with enterprise patterns"""
print("=== Production LLM System Demo ===")
# Mock dependencies (in production, these would be real implementations)
class MockTraceCollector:
@asynccontextmanager
async def trace_context(self, operation_type, inputs, metadata):
trace = type('Trace', (), {'metadata': metadata})()
yield trace
class MockExperimentManager:
def __init__(self):
self.active_assignments = {}
self.experiments = {}
def assign_variant(self, experiment_id, user_id):
return None
def record_experiment_interaction(self, *args):
pass
class MockPromptVersionManager:
def get_template(self, template_id, version=None):
return type('Template', (), {
                'render': lambda self, inputs: f"Mock prompt for {template_id}",  # bound as a method, so it must accept self
'metadata': {'model_name': 'gpt-3.5-turbo'}
})()
# Initialize production system
trace_collector = MockTraceCollector()
experiment_manager = MockExperimentManager()
prompt_manager = MockPromptVersionManager()
production_system = ProductionLLMSystem(
trace_collector, experiment_manager, prompt_manager
)
print("System initialized with enterprise patterns...")
# Simulate various request patterns
tasks = []
# Normal requests
for i in range(10):
task = production_system.execute_prompt_with_enterprise_patterns(
prompt_template_id='business_analysis_v1',
inputs={'data': f'Sample data {i}'},
user_id=f'user_{i % 3}',
priority='normal'
)
tasks.append(task)
# High priority requests
for i in range(3):
task = production_system.execute_prompt_with_enterprise_patterns(
prompt_template_id='urgent_analysis_v1',
inputs={'data': f'Urgent data {i}'},
user_id=f'vip_user_{i}',
priority='high'
)
tasks.append(task)
# Execute all requests concurrently
print("Executing requests with enterprise patterns...")
results = await asyncio.gather(*tasks, return_exceptions=True)
# Analyze results
successful_requests = sum(1 for r in results if isinstance(r, dict) and r.get('status') != 'error')
cached_requests = sum(1 for r in results if isinstance(r, dict) and r.get('cached'))
fallback_requests = sum(1 for r in results if isinstance(r, dict) and r.get('status') == 'fallback')
print(f"\nResults Summary:")
print(f"Total requests: {len(results)}")
print(f"Successful: {successful_requests}")
print(f"Cached responses: {cached_requests}")
print(f"Fallback responses: {fallback_requests}")
# Get system health
health = production_system.get_system_health()
print(f"\nSystem Health: {health['status']}")
print(f"Circuit Breaker: {health['circuit_breaker']['state']}")
print(f"Rate Limiter Utilization: {health['rate_limiter']['utilization']:.1f}%")
print(f"Cache Utilization: {health['cache']['utilization']:.1f}%")
print(f"Budget Used: {health['costs']['budget_used_percentage']:.1f}%")
if __name__ == "__main__":
    asyncio.run(demonstrate_production_system())
Conclusion: From Novice to Expert
You've now completed the comprehensive journey from AI novice to prompt engineering expert. You have the knowledge, tools, and frameworks to build production-ready AI systems that rival commercial platforms.
The key to continued success is systematic iteration, comprehensive measurement, and continuous learning. The prompt engineering field evolves rapidly, but the foundational principles and systematic approaches you've learned will serve as a stable foundation for adapting to new developments.
🎯 What You've Accomplished
- Mastered professional prompt engineering fundamentals and patterns
- Built reusable template systems with version control and collaboration
- Implemented comprehensive evaluation frameworks with LLM-as-a-judge
- Automated prompt optimization using DSPy and modern ML techniques
- Created a complete LangSmith alternative monitoring platform
- Deployed enterprise-grade production systems with reliability patterns
Continue building, experimenting, and sharing your learnings with the community. The future of AI development belongs to those who can systematically create, evaluate, and optimize AI systems rather than relying on trial and error.
