Introduction: From Ad-Hoc to Systematic
Stage 1 established the fundamentals of effective prompting. Now we evolve from creating individual prompts to building systematic, reusable template systems that scale across teams and applications. Professional prompt engineering requires treating prompts like code: versioned, tested, maintainable, and collaborative.
Template design transforms prompt engineering from an art into an engineering discipline. You'll learn to build prompt libraries that support parameter validation, inheritance patterns, version control, and team collaboration - the foundation for enterprise-grade AI applications.
This stage covers advanced template design patterns, version control systems, parameter management, and collaborative workflows that enable teams to build and maintain sophisticated prompt libraries.
Building Reusable Prompt Templates
Professional template systems provide consistency, maintainability, and scalability. They separate content from structure, enable parameter validation, and support complex composition patterns.
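Before diving into the full architecture below, here is a minimal sketch of the core idea: the structure lives in a reusable template, and the content arrives as validated parameters. The names (SUMMARY_TEMPLATE, render_summary) are illustrative only, not part of the library built in this stage.

# Minimal sketch: structure lives in the template, content arrives as parameters.
# SUMMARY_TEMPLATE and render_summary are illustrative names, not part of the library below.
SUMMARY_TEMPLATE = (
    "You are a {role}. Summarize the following text for {audience} "
    "in at most {max_sentences} sentences:\n\n{text}"
)

def render_summary(role: str, audience: str, text: str, max_sentences: int = 3) -> str:
    """Validate parameters, then render the shared structure with caller-supplied content."""
    if not text.strip():
        raise ValueError("Parameter 'text' must not be empty")
    if max_sentences < 1:
        raise ValueError("Parameter 'max_sentences' must be at least 1")
    return SUMMARY_TEMPLATE.format(
        role=role, audience=audience, text=text, max_sentences=max_sentences
    )

# The same structure now serves many call sites with different content:
prompt = render_summary("financial analyst", "C-level executives", "Q3 revenue grew 12%...")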
Template Architecture Design
Effective template systems use a layered architecture with base templates, specialized templates, and parameter management:
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional, Union
from dataclasses import dataclass, field
from enum import Enum
import json
import jsonschema
from datetime import datetime
import hashlib
class TemplateType(Enum):
BASE = "base"
SPECIALIZED = "specialized"
COMPOSITE = "composite"
class ValidationLevel(Enum):
STRICT = "strict"
LENIENT = "lenient"
DISABLED = "disabled"
@dataclass
class TemplateMetadata:
"""Metadata for tracking template properties and history"""
name: str
version: str
author: str
created_at: datetime
description: str
template_type: TemplateType
parent_template: Optional[str] = None
tags: List[str] = field(default_factory=list)
validation_level: ValidationLevel = ValidationLevel.STRICT
usage_count: int = 0
last_modified: Optional[datetime] = None
@dataclass
class ParameterDefinition:
"""Definition for template parameters with validation rules"""
name: str
parameter_type: str # string, number, array, object, boolean
description: str
required: bool = True
default_value: Any = None
validation_schema: Optional[Dict] = None
examples: List[Any] = field(default_factory=list)
sensitive: bool = False # For PII or secrets
class PromptTemplate(ABC):
"""Base class for all prompt templates"""
def __init__(self, metadata: TemplateMetadata):
self.metadata = metadata
self.parameters: Dict[str, ParameterDefinition] = {}
self.template_content: str = ""
self.validation_enabled = True
@abstractmethod
def define_parameters(self) -> Dict[str, ParameterDefinition]:
"""Define the parameters this template accepts"""
pass
@abstractmethod
def get_template_content(self) -> str:
"""Return the template content with placeholders"""
pass
def add_parameter(self, param: ParameterDefinition):
"""Add a parameter definition to this template"""
self.parameters[param.name] = param
def validate_parameters(self, provided_params: Dict[str, Any]) -> Dict[str, Any]:
"""Validate provided parameters against template requirements"""
if not self.validation_enabled:
return provided_params
validated_params = {}
errors = []
# Apply defaults for missing parameters, then flag missing required ones
for param_name, param_def in self.parameters.items():
    if param_name not in provided_params:
        if param_def.default_value is not None:
            validated_params[param_name] = param_def.default_value
        elif param_def.required:
            errors.append(f"Required parameter '{param_name}' is missing")
        continue
if param_name in provided_params:
value = provided_params[param_name]
# Type validation
if not self._validate_parameter_type(value, param_def):
errors.append(f"Parameter '{param_name}' has invalid type. Expected: {param_def.parameter_type}")
continue
# Schema validation if provided
if param_def.validation_schema:
try:
jsonschema.validate(value, param_def.validation_schema)
except jsonschema.ValidationError as e:
errors.append(f"Parameter '{param_name}' validation failed: {e.message}")
continue
validated_params[param_name] = value
# Check for unexpected parameters
unexpected_params = set(provided_params.keys()) - set(self.parameters.keys())
if unexpected_params and self.metadata.validation_level == ValidationLevel.STRICT:
errors.append(f"Unexpected parameters: {', '.join(unexpected_params)}")
if errors:
raise ValueError(f"Parameter validation failed: {'; '.join(errors)}")
return validated_params
def _validate_parameter_type(self, value: Any, param_def: ParameterDefinition) -> bool:
"""Validate parameter type matches expected type"""
type_mapping = {
'string': str,
'number': (int, float),
'boolean': bool,
'array': list,
'object': dict
}
expected_type = type_mapping.get(param_def.parameter_type)
if expected_type is None:
return True # Unknown type, skip validation
return isinstance(value, expected_type)
def render(self, parameters: Dict[str, Any]) -> str:
"""Render the template with provided parameters"""
validated_params = self.validate_parameters(parameters)
template_content = self.get_template_content()
try:
rendered = template_content.format(**validated_params)
self.metadata.usage_count += 1
return rendered
except KeyError as e:
raise ValueError(f"Template rendering failed: Missing placeholder {e}")
except Exception as e:
raise ValueError(f"Template rendering failed: {str(e)}")
def get_parameter_documentation(self) -> Dict[str, Any]:
"""Generate documentation for template parameters"""
docs = {
'template_name': self.metadata.name,
'version': self.metadata.version,
'description': self.metadata.description,
'parameters': {}
}
for param_name, param_def in self.parameters.items():
docs['parameters'][param_name] = {
'type': param_def.parameter_type,
'description': param_def.description,
'required': param_def.required,
'default': param_def.default_value,
'examples': param_def.examples,
'sensitive': param_def.sensitive
}
return docs
class BusinessAnalysisTemplate(PromptTemplate):
"""Specialized template for business analysis tasks"""
def __init__(self):
metadata = TemplateMetadata(
name="business_analysis_v2",
version="2.1.0",
author="prompt_engineering_team",
created_at=datetime.now(),
description="Professional business analysis template with configurable depth and focus",
template_type=TemplateType.SPECIALIZED,
tags=["business", "analysis", "executive", "strategic"]
)
super().__init__(metadata)
self.parameters = self.define_parameters()
def define_parameters(self) -> Dict[str, ParameterDefinition]:
return {
'analyst_role': ParameterDefinition(
name='analyst_role',
parameter_type='string',
description='The type of analyst conducting the analysis',
required=True,
examples=['senior business analyst', 'data scientist', 'strategy consultant'],
validation_schema={
'type': 'string',
'minLength': 3,
'maxLength': 100
}
),
'target_audience': ParameterDefinition(
name='target_audience',
parameter_type='string',
description='Who will receive this analysis',
required=True,
examples=['C-level executives', 'board of directors', 'department heads'],
validation_schema={
'type': 'string',
'minLength': 3,
'maxLength': 100
}
),
'analysis_type': ParameterDefinition(
name='analysis_type',
parameter_type='string',
description='Type of analysis to perform',
required=True,
examples=['competitive analysis', 'market research', 'performance review', 'strategic planning'],
validation_schema={
'type': 'string',
'enum': ['competitive', 'market', 'performance', 'strategic', 'financial', 'operational']
}
),
'data_context': ParameterDefinition(
name='data_context',
parameter_type='string',
description='Description of the data being analyzed',
required=True,
examples=['Q3 sales performance data', 'customer satisfaction survey results'],
validation_schema={'type': 'string', 'minLength': 10}
),
'key_questions': ParameterDefinition(
name='key_questions',
parameter_type='array',
description='Specific questions the analysis should address',
required=True,
examples=[["What are our top growth opportunities?", "Where are we losing market share?"]],
validation_schema={
'type': 'array',
'items': {'type': 'string', 'minLength': 5},
'minItems': 1,
'maxItems': 5
}
),
'output_length': ParameterDefinition(
name='output_length',
parameter_type='string',
description='Desired length of the analysis output',
required=False,
default_value='medium',
examples=['brief', 'medium', 'detailed'],
validation_schema={
'type': 'string',
'enum': ['brief', 'medium', 'detailed']
}
),
'include_recommendations': ParameterDefinition(
name='include_recommendations',
parameter_type='boolean',
description='Whether to include actionable recommendations',
required=False,
default_value=True
),
'urgency_level': ParameterDefinition(
name='urgency_level',
parameter_type='string',
description='How urgent the analysis is for decision-making',
required=False,
default_value='medium',
validation_schema={
'type': 'string',
'enum': ['low', 'medium', 'high', 'critical']
}
)
}
def get_template_content(self) -> str:
return '''CONTEXT: You are a {analyst_role} preparing a {analysis_type} analysis for {target_audience}. This analysis is {urgency_level} priority and will directly inform strategic decision-making.
DATA CONTEXT: {data_context}
KEY QUESTIONS TO ADDRESS:
{key_questions_formatted}
ANALYSIS REQUIREMENTS:
- Depth Level: {output_length}
- Include Actionable Recommendations: {include_recommendations}
- Focus on insights that require immediate executive attention
- Provide specific, quantified findings where possible
- Highlight any data limitations or assumptions
OUTPUT STRUCTURE:
1. Executive Summary (2-3 key findings)
2. Detailed Analysis (addressing each key question)
{recommendations_section}
4. Confidence Levels and Data Quality Assessment
5. Next Steps and Monitoring Recommendations
FORMATTING GUIDELINES:
- Use bullet points for key findings
- Include specific numbers and percentages where available
- Highlight critical insights with clear impact statements
- Maintain professional tone appropriate for {target_audience}
Begin your analysis:'''
def render(self, parameters: Dict[str, Any]) -> str:
"""Custom rendering with parameter preprocessing"""
validated_params = self.validate_parameters(parameters)
# Format key questions as numbered list
questions = validated_params['key_questions']
formatted_questions = '\n'.join([f"{i+1}. {q}" for i, q in enumerate(questions)])
validated_params['key_questions_formatted'] = formatted_questions
# Conditional recommendations section
if validated_params.get('include_recommendations', True):
validated_params['recommendations_section'] = "3. Strategic Recommendations with Implementation Timeline"
else:
validated_params['recommendations_section'] = "3. Additional Analysis and Context"
template_content = self.get_template_content()
rendered = template_content.format(**validated_params)
self.metadata.usage_count += 1
return rendered
# Usage example
def demonstrate_template_usage():
"""Example of using the business analysis template"""
template = BusinessAnalysisTemplate()
# Print template documentation
docs = template.get_parameter_documentation()
print("Template Documentation:")
print(json.dumps(docs, indent=2, default=str))
# Example parameters
params = {
'analyst_role': 'senior strategy consultant',
'target_audience': 'C-level executives',
'analysis_type': 'competitive',
'data_context': 'Q3 market share data and competitor pricing analysis',
'key_questions': [
'How has our market position changed compared to last quarter?',
'What competitive threats require immediate response?',
'Where can we gain market share most effectively?'
],
'output_length': 'detailed',
'urgency_level': 'high'
}
# Render the template
try:
rendered_prompt = template.render(params)
print("\nRendered Prompt:")
print(rendered_prompt)
print(f"\nTemplate Usage Count: {template.metadata.usage_count}")
except ValueError as e:
print(f"Template rendering failed: {e}")
if __name__ == "__main__":
demonstrate_template_usage()
Template Inheritance Patterns
Inheritance allows you to create template hierarchies that share common structures while enabling specialization:
class BaseAnalysisTemplate(PromptTemplate):
"""Base template for all analysis tasks"""
def __init__(self, metadata: TemplateMetadata):
super().__init__(metadata)
self.add_common_parameters()
def define_parameters(self) -> Dict[str, ParameterDefinition]:
    """Parameters are registered incrementally, so return the current set (satisfies the abstract interface)"""
    return self.parameters
def add_common_parameters(self):
"""Add parameters common to all analysis templates"""
common_params = {
'context_role': ParameterDefinition(
name='context_role',
parameter_type='string',
description='Professional role for the analysis context',
required=True,
examples=['business analyst', 'data scientist', 'consultant']
),
'output_format': ParameterDefinition(
name='output_format',
parameter_type='string',
description='Desired output format',
required=False,
default_value='structured',
validation_schema={'type': 'string', 'enum': ['structured', 'narrative', 'executive_summary']}
),
'confidence_reporting': ParameterDefinition(
name='confidence_reporting',
parameter_type='boolean',
description='Include confidence levels in analysis',
required=False,
default_value=True
)
}
for param_name, param_def in common_params.items():
self.parameters[param_name] = param_def
def get_base_template_structure(self) -> str:
"""Provide base structure that child templates can extend"""
return '''CONTEXT: You are a {context_role} conducting professional analysis.
{specific_context}
ANALYSIS REQUIREMENTS:
- Output Format: {output_format}
- Include Confidence Levels: {confidence_reporting}
{specific_requirements}
{analysis_body}
{output_formatting}'''
def get_template_content(self) -> str:
    """Assemble the base structure with child-specific sections"""
    # Use str.replace rather than str.format so runtime placeholders such as
    # {context_role} and {output_format} survive until render() fills them in
    content = self.get_base_template_structure()
    content = content.replace('{specific_context}', self.get_specific_context())
    content = content.replace('{specific_requirements}', self.get_specific_requirements())
    content = content.replace('{analysis_body}', self.get_analysis_body())
    content = content.replace('{output_formatting}', self.get_output_formatting())
    return content
@abstractmethod
def get_specific_context(self) -> str:
"""Child classes provide specific context"""
pass
@abstractmethod
def get_specific_requirements(self) -> str:
"""Child classes provide specific requirements"""
pass
@abstractmethod
def get_analysis_body(self) -> str:
"""Child classes provide main analysis structure"""
pass
def get_output_formatting(self) -> str:
"""Default output formatting - can be overridden"""
return '''OUTPUT FORMAT:
- Use clear section headers
- Include specific metrics where available
- Provide actionable insights
- Maintain professional tone'''
class CompetitiveAnalysisTemplate(BaseAnalysisTemplate):
"""Specialized template for competitive analysis"""
def __init__(self):
metadata = TemplateMetadata(
name="competitive_analysis",
version="1.0.0",
author="strategy_team",
created_at=datetime.now(),
description="Competitive landscape analysis template",
template_type=TemplateType.SPECIALIZED,
parent_template="base_analysis",
tags=["competitive", "strategy", "market"]
)
super().__init__(metadata)
self.add_competitive_parameters()
def add_competitive_parameters(self):
"""Add parameters specific to competitive analysis"""
competitive_params = {
'competitors': ParameterDefinition(
name='competitors',
parameter_type='array',
description='List of competitors to analyze',
required=True,
examples=[["Company A", "Company B", "Company C"]],
validation_schema={
'type': 'array',
'items': {'type': 'string', 'minLength': 1},
'minItems': 1,
'maxItems': 10
}
),
'analysis_dimensions': ParameterDefinition(
name='analysis_dimensions',
parameter_type='array',
description='Dimensions to analyze competitors on',
required=False,
default_value=['market_share', 'pricing', 'features', 'customer_satisfaction'],
examples=[["pricing", "features", "market_share"]],
validation_schema={
'type': 'array',
'items': {'type': 'string'},
'minItems': 1
}
),
'market_data': ParameterDefinition(
name='market_data',
parameter_type='string',
description='Description of available market data',
required=True,
examples=['Q3 market share reports and pricing analysis']
)
}
for param_name, param_def in competitive_params.items():
self.parameters[param_name] = param_def
def get_specific_context(self) -> str:
return '''COMPETITIVE ANALYSIS CONTEXT:
Market Data Available: {market_data}
Competitors to Analyze: {competitors_formatted}
Analysis Dimensions: {analysis_dimensions_formatted}'''
def get_specific_requirements(self) -> str:
return '''- Focus on actionable competitive intelligence
- Identify market gaps and opportunities
- Assess competitive threats and responses
- Provide strategic positioning recommendations'''
def get_analysis_body(self) -> str:
return '''COMPETITIVE ANALYSIS STRUCTURE:
1. MARKET LANDSCAPE OVERVIEW
- Current market dynamics
- Key trends affecting competition
2. COMPETITOR ANALYSIS
For each competitor, analyze:
{analysis_dimensions_formatted}
3. COMPETITIVE POSITIONING
- Our strengths vs. competitors
- Vulnerabilities and threats
- Market opportunities
4. STRATEGIC RECOMMENDATIONS
- Immediate competitive responses needed
- Medium-term positioning strategy
- Long-term competitive advantages to build
5. MONITORING AND METRICS
- Key indicators to track competitive changes
- Early warning signals for competitive threats'''
def render(self, parameters: Dict[str, Any]) -> str:
    """Custom rendering with competitive-specific formatting"""
    validated_params = self.validate_parameters(parameters)
    # Format competitors list
    competitors = validated_params['competitors']
    validated_params['competitors_formatted'] = ', '.join(competitors)
    # Format analysis dimensions as a bulleted list
    dimensions = validated_params['analysis_dimensions']
    validated_params['analysis_dimensions_formatted'] = '- ' + '\n   - '.join(dimensions)
    # Render directly: re-validating through super().render() would reject the
    # derived *_formatted keys under strict validation
    rendered = self.get_template_content().format(**validated_params)
    self.metadata.usage_count += 1
    return rendered
class TemplateRegistry:
"""Registry for managing template collections"""
def __init__(self):
self.templates: Dict[str, PromptTemplate] = {}
self.template_history: Dict[str, List[TemplateMetadata]] = {}
def register_template(self, template: PromptTemplate):
"""Register a template in the registry"""
template_key = f"{template.metadata.name}_{template.metadata.version}"
self.templates[template_key] = template
# Track version history
if template.metadata.name not in self.template_history:
self.template_history[template.metadata.name] = []
self.template_history[template.metadata.name].append(template.metadata)
def get_template(self, name: str, version: Optional[str] = None) -> PromptTemplate:
"""Retrieve a template by name and version"""
if version:
template_key = f"{name}_{version}"
else:
# Get latest version
versions = self.template_history.get(name, [])
if not versions:
raise ValueError(f"Template '{name}' not found")
latest_version = max(versions, key=lambda v: v.created_at)
template_key = f"{name}_{latest_version.version}"
if template_key not in self.templates:
raise ValueError(f"Template '{template_key}' not found")
return self.templates[template_key]
def list_templates(self) -> Dict[str, List[str]]:
"""List all templates and their versions"""
result = {}
for name, history in self.template_history.items():
result[name] = [meta.version for meta in sorted(history, key=lambda h: h.created_at)]
return result
def get_template_lineage(self, name: str) -> List[TemplateMetadata]:
"""Get the version history of a template"""
return self.template_history.get(name, [])
# Example usage of inheritance system
def demonstrate_inheritance():
"""Demonstrate template inheritance and registry usage"""
# Create registry
registry = TemplateRegistry()
# Register templates
competitive_template = CompetitiveAnalysisTemplate()
registry.register_template(competitive_template)
print("Available templates:")
print(json.dumps(registry.list_templates(), indent=2))
# Use template
params = {
'context_role': 'senior strategy consultant',
'competitors': ['Google', 'Microsoft', 'Amazon'],
'analysis_dimensions': ['market_share', 'pricing_strategy', 'product_features', 'customer_acquisition'],
'market_data': 'Q3 cloud services market data including revenue, growth rates, and customer segments',
'output_format': 'executive_summary',
'confidence_reporting': True
}
template = registry.get_template('competitive_analysis')
rendered_prompt = template.render(params)
print("\nRendered Competitive Analysis Template:")
print(rendered_prompt[:500] + "...")
if __name__ == "__main__":
demonstrate_inheritance()
Version Control for Prompts
Professional prompt development requires systematic version control that tracks changes, enables rollbacks, and supports collaborative development across teams.
Semantic Versioning for Prompts
Adopt semantic versioning (MAJOR.MINOR.PATCH), adapted for prompt engineering workflows; a minimal sketch of the bump rule follows the three change types below:
🔥 MAJOR Version
Breaking changes that require new evaluation
- Complete prompt restructure
- Changed parameter requirements
- Different output format
🔧 MINOR Version
Backward-compatible improvements
- Added optional parameters
- Enhanced instructions
- Improved examples
🩹 PATCH Version
Bug fixes and minor corrections
- Fixed typos or grammar
- Clarified ambiguous wording
- Updated examples
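The bump rule itself is small. A minimal sketch, assuming versions are stored as plain "MAJOR.MINOR.PATCH" strings (the full version manager below tracks far richer metadata):

# Minimal sketch of the semantic-versioning bump rule described above.
# Assumes plain "MAJOR.MINOR.PATCH" strings; the PromptVersionManager below adds metadata and history.
def bump_version(current: str, change_type: str) -> str:
    """Return the next version string for a 'major', 'minor', or 'patch' change."""
    major, minor, patch = (int(part) for part in current.split("."))
    if change_type == "major":    # breaking change: requires re-evaluation
        return f"{major + 1}.0.0"
    if change_type == "minor":    # backward-compatible improvement
        return f"{major}.{minor + 1}.0"
    if change_type == "patch":    # typo fix or clarification
        return f"{major}.{minor}.{patch + 1}"
    raise ValueError(f"Unknown change type: {change_type}")

assert bump_version("2.1.3", "minor") == "2.2.0"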
import hashlib
from datetime import datetime
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
from enum import Enum
import json
import git # GitPython library
class ChangeType(Enum):
MAJOR = "major"
MINOR = "minor"
PATCH = "patch"
class PromptVersionManager:
"""Comprehensive version management for prompt templates"""
def __init__(self, repository_path: str = "."):
self.repo_path = repository_path
self.versions: Dict[str, PromptVersion] = {}
self.deployments: Dict[str, str] = {} # environment -> version_id
self.change_log: List[ChangeLogEntry] = []
try:
self.git_repo = git.Repo(repository_path)
except git.InvalidGitRepositoryError:
print(f"Warning: {repository_path} is not a Git repository")
self.git_repo = None
def create_version(self, template: PromptTemplate, change_type: ChangeType,
change_summary: str, author: str,
parent_version: Optional[str] = None,
breaking_changes: Optional[List[str]] = None) -> str:
"""Create a new version with comprehensive tracking"""
# Determine version numbers
if parent_version and parent_version in self.versions:
parent = self.versions[parent_version]
major, minor, patch = parent.major, parent.minor, parent.patch
if change_type == ChangeType.MAJOR:
major += 1
minor = 0
patch = 0
elif change_type == ChangeType.MINOR:
minor += 1
patch = 0
else: # PATCH
patch += 1
else:
major, minor, patch = 1, 0, 0
# Generate content hash for integrity verification
template_content = template.get_template_content()
content_hash = hashlib.sha256(
f"{template_content}{json.dumps(template.get_parameter_documentation(), sort_keys=True)}".encode()
).hexdigest()
# Create version ID
version_string = f"{major}.{minor}.{patch}"
version_id = f"{template.metadata.name}_v{version_string}"
# Create version record
version = PromptVersion(
version_id=version_id,
template_name=template.metadata.name,
major=major,
minor=minor,
patch=patch,
content_hash=content_hash,
template_content=template_content,
parameter_schema=template.get_parameter_documentation(),
created_at=datetime.now(),
created_by=author,
change_type=change_type,
change_summary=change_summary,
parent_version=parent_version,
breaking_changes=breaking_changes or [],
git_commit_hash=self._get_current_git_commit() if self.git_repo else None
)
self.versions[version_id] = version
# Log the change
change_entry = ChangeLogEntry(
version_id=version_id,
change_type=change_type,
summary=change_summary,
author=author,
timestamp=datetime.now(),
breaking_changes=breaking_changes or []
)
self.change_log.append(change_entry)
# Update template metadata
template.metadata.version = version_string
template.metadata.last_modified = datetime.now()
return version_id
def deploy_version(self, version_id: str, environment: str,
deployment_notes: str = "", approver: str = "") -> bool:
"""Deploy a version to a specific environment"""
if version_id not in self.versions:
raise ValueError(f"Version {version_id} not found")
version = self.versions[version_id]
# Check for breaking changes if deploying to production
if environment == "production" and version.change_type == ChangeType.MAJOR:
if not version.breaking_changes:
raise ValueError("Major version requires breaking changes documentation")
# Record deployment
deployment = DeploymentRecord(
version_id=version_id,
environment=environment,
deployed_at=datetime.now(),
deployed_by=approver or "system",
deployment_notes=deployment_notes,
git_commit_hash=self._get_current_git_commit() if self.git_repo else None
)
# Store deployment record (in practice, this would go to a database)
self.deployments[environment] = version_id
print(f"Deployed {version_id} to {environment}")
return True
def get_deployed_version(self, environment: str = "production") -> Optional[PromptVersion]:
"""Get the currently deployed version for an environment"""
if environment not in self.deployments:
return None
version_id = self.deployments[environment]
return self.versions.get(version_id)
def compare_versions(self, version_id_1: str, version_id_2: str) -> Dict[str, Any]:
"""Compare two versions and show differences"""
if version_id_1 not in self.versions or version_id_2 not in self.versions:
raise ValueError("One or both versions not found")
v1 = self.versions[version_id_1]
v2 = self.versions[version_id_2]
# Simple text diff (in practice, use a proper diff library)
content_diff = self._simple_diff(v1.template_content, v2.template_content)
# Parameter changes
param_changes = self._compare_parameters(
v1.parameter_schema['parameters'],
v2.parameter_schema['parameters']
)
return {
'version_1': {'id': version_id_1, 'version': f"{v1.major}.{v1.minor}.{v1.patch}"},
'version_2': {'id': version_id_2, 'version': f"{v2.major}.{v2.minor}.{v2.patch}"},
'content_changes': content_diff,
'parameter_changes': param_changes,
'change_summary': v2.change_summary if v2.created_at > v1.created_at else v1.change_summary
}
def get_version_lineage(self, template_name: str) -> List[Dict[str, Any]]:
"""Get the complete version history for a template"""
template_versions = [
v for v in self.versions.values()
if v.template_name == template_name
]
# Sort by creation time
template_versions.sort(key=lambda v: v.created_at)
lineage = []
for version in template_versions:
lineage.append({
'version_id': version.version_id,
'version': f"{version.major}.{version.minor}.{version.patch}",
'change_type': version.change_type.value,
'summary': version.change_summary,
'author': version.created_by,
'created_at': version.created_at.isoformat(),
'parent_version': version.parent_version,
'breaking_changes': version.breaking_changes
})
return lineage
def rollback_deployment(self, environment: str, target_version_id: str,
rollback_reason: str, operator: str) -> bool:
"""Rollback to a previous version in an environment"""
if target_version_id not in self.versions:
raise ValueError(f"Target version {target_version_id} not found")
current_version_id = self.deployments.get(environment)
if not current_version_id:
raise ValueError(f"No deployment found for environment {environment}")
# Log rollback
rollback_entry = RollbackRecord(
from_version_id=current_version_id,
to_version_id=target_version_id,
environment=environment,
rollback_reason=rollback_reason,
performed_by=operator,
performed_at=datetime.now()
)
# Update deployment
self.deployments[environment] = target_version_id
print(f"Rolled back {environment} from {current_version_id} to {target_version_id}")
print(f"Reason: {rollback_reason}")
return True
def _get_current_git_commit(self) -> Optional[str]:
"""Get current Git commit hash"""
if not self.git_repo:
return None
try:
return self.git_repo.head.commit.hexsha
except Exception:
return None
def _simple_diff(self, text1: str, text2: str) -> Dict[str, Any]:
"""Simple diff implementation - use proper library in production"""
lines1 = text1.split('\n')
lines2 = text2.split('\n')
# Very basic line-by-line comparison
additions = []
deletions = []
for i, line in enumerate(lines2):
if i >= len(lines1) or line != lines1[i]:
additions.append(f"+ Line {i+1}: {line}")
for i, line in enumerate(lines1):
if i >= len(lines2) or line != lines2[i]:
deletions.append(f"- Line {i+1}: {line}")
return {
'additions': additions,
'deletions': deletions,
'total_changes': len(additions) + len(deletions)
}
def _compare_parameters(self, params1: Dict, params2: Dict) -> Dict[str, Any]:
"""Compare parameter schemas between versions"""
added_params = set(params2.keys()) - set(params1.keys())
removed_params = set(params1.keys()) - set(params2.keys())
modified_params = []
for param_name in set(params1.keys()) & set(params2.keys()):
if params1[param_name] != params2[param_name]:
modified_params.append({
'parameter': param_name,
'old': params1[param_name],
'new': params2[param_name]
})
return {
'added': list(added_params),
'removed': list(removed_params),
'modified': modified_params
}
@dataclass
class PromptVersion:
"""Complete version record for a prompt template"""
version_id: str
template_name: str
major: int
minor: int
patch: int
content_hash: str
template_content: str
parameter_schema: Dict[str, Any]
created_at: datetime
created_by: str
change_type: ChangeType
change_summary: str
parent_version: Optional[str] = None
breaking_changes: Optional[List[str]] = None
git_commit_hash: Optional[str] = None
@dataclass
class ChangeLogEntry:
"""Entry in the prompt change log"""
version_id: str
change_type: ChangeType
summary: str
author: str
timestamp: datetime
breaking_changes: List[str]
@dataclass
class DeploymentRecord:
"""Record of a version deployment"""
version_id: str
environment: str
deployed_at: datetime
deployed_by: str
deployment_notes: str
git_commit_hash: Optional[str] = None
@dataclass
class RollbackRecord:
"""Record of a deployment rollback"""
from_version_id: str
to_version_id: str
environment: str
rollback_reason: str
performed_by: str
performed_at: datetime
# Example usage
def demonstrate_version_management():
"""Demonstrate comprehensive version management"""
# Initialize version manager
version_manager = PromptVersionManager()
# Create initial template
template = BusinessAnalysisTemplate()
# Create initial version
v1_id = version_manager.create_version(
template=template,
change_type=ChangeType.MAJOR,
change_summary="Initial release of business analysis template",
author="prompt_engineering_team"
)
print(f"Created initial version: {v1_id}")
# Deploy to staging
version_manager.deploy_version(
version_id=v1_id,
environment="staging",
deployment_notes="Initial deployment for testing",
approver="team_lead"
)
# Simulate template improvement
# (In practice, you'd modify the template)
v2_id = version_manager.create_version(
template=template,
change_type=ChangeType.MINOR,
change_summary="Added support for urgency levels and enhanced output formatting",
author="prompt_engineer_alice",
parent_version=v1_id
)
# Deploy new version to production
version_manager.deploy_version(
version_id=v2_id,
environment="production",
deployment_notes="Production release with urgency level support",
approver="engineering_manager"
)
# Show version lineage
lineage = version_manager.get_version_lineage(template.metadata.name)
print("\nVersion History:")
for version in lineage:
print(f" {version['version']} ({version['change_type']}) - {version['summary']}")
# Compare versions
comparison = version_manager.compare_versions(v1_id, v2_id)
print(f"\nChanges from v1 to v2: {comparison['change_summary']}")
# Simulate rollback scenario
version_manager.rollback_deployment(
environment="production",
target_version_id=v1_id,
rollback_reason="Performance issue detected in v2",
operator="incident_responder"
)
if __name__ == "__main__":
demonstrate_version_management()
Collaborative Workflows
Professional prompt engineering requires team collaboration patterns that ensure quality, consistency, and knowledge sharing across team members.
Prompt Review and Approval Process
Implement a systematic review process similar to code reviews, ensuring prompt quality and team alignment:
from enum import Enum
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
import uuid
from datetime import datetime
class ReviewStatus(Enum):
PENDING = "pending"
APPROVED = "approved"
CHANGES_REQUESTED = "changes_requested"
REJECTED = "rejected"
class ReviewerRole(Enum):
PROMPT_ENGINEER = "prompt_engineer"
DOMAIN_EXPERT = "domain_expert"
TECH_LEAD = "tech_lead"
PRODUCT_MANAGER = "product_manager"
@dataclass
class ReviewComment:
"""Individual comment in a prompt review"""
comment_id: str
reviewer: str
reviewer_role: ReviewerRole
section: str # Which part of the prompt
comment_text: str
severity: str # critical, major, minor, suggestion
created_at: datetime
resolved: bool = False
resolution_comment: Optional[str] = None
@dataclass
class PromptReview:
"""Complete review record for a prompt version"""
review_id: str
prompt_version_id: str
requested_by: str
reviewers: List[str]
status: ReviewStatus
created_at: datetime
completed_at: Optional[datetime]
comments: List[ReviewComment]
approval_criteria: Dict[str, bool]
final_decision: Optional[str] = None
class PromptReviewSystem:
"""System for managing collaborative prompt reviews"""
def __init__(self):
self.reviews: Dict[str, PromptReview] = {}
self.reviewer_assignments: Dict[str, List[ReviewerRole]] = {}
self.review_templates: Dict[str, Dict] = self._initialize_review_templates()
def _initialize_review_templates(self) -> Dict[str, Dict]:
"""Initialize review criteria templates for different prompt types"""
return {
'business_analysis': {
'required_reviewers': [ReviewerRole.PROMPT_ENGINEER, ReviewerRole.DOMAIN_EXPERT],
'approval_criteria': {
'clarity_and_structure': 'Instructions are clear and well-structured',
'parameter_validation': 'Parameters are properly defined and validated',
'output_specification': 'Expected output format is clearly specified',
'domain_accuracy': 'Business context and terminology are accurate',
'edge_case_handling': 'Edge cases and error conditions are addressed',
'performance_considerations': 'Prompt is optimized for consistent performance'
},
'review_checklist': [
'Prompt follows CRAFT framework',
'All parameters have proper validation',
'Examples are relevant and accurate',
'Output format is production-ready',
'Error handling is comprehensive'
]
},
'customer_support': {
'required_reviewers': [ReviewerRole.PROMPT_ENGINEER, ReviewerRole.PRODUCT_MANAGER],
'approval_criteria': {
'customer_tone': 'Maintains appropriate customer-facing tone',
'escalation_handling': 'Proper escalation paths are defined',
'compliance': 'Meets customer service compliance requirements',
'consistency': 'Consistent with brand voice and guidelines'
}
}
}
def initiate_review(self, prompt_version_id: str, prompt_type: str,
requested_by: str, custom_reviewers: Optional[List[str]] = None) -> str:
"""Start a new prompt review process"""
review_id = str(uuid.uuid4())
# Determine reviewers
template = self.review_templates.get(prompt_type, self.review_templates['business_analysis'])
required_roles = template['required_reviewers']
if custom_reviewers:
reviewers = custom_reviewers
else:
reviewers = self._assign_reviewers(required_roles)
# Initialize approval criteria
approval_criteria = {
criterion: False for criterion in template['approval_criteria'].keys()
}
review = PromptReview(
review_id=review_id,
prompt_version_id=prompt_version_id,
requested_by=requested_by,
reviewers=reviewers,
status=ReviewStatus.PENDING,
created_at=datetime.now(),
completed_at=None,
comments=[],
approval_criteria=approval_criteria
)
self.reviews[review_id] = review
# Notify reviewers (in practice, this would send actual notifications)
self._notify_reviewers(review_id, reviewers)
return review_id
def add_review_comment(self, review_id: str, reviewer: str,
section: str, comment_text: str,
severity: str = "minor") -> str:
"""Add a comment to a prompt review"""
if review_id not in self.reviews:
raise ValueError(f"Review {review_id} not found")
review = self.reviews[review_id]
if reviewer not in review.reviewers:
raise ValueError(f"Reviewer {reviewer} not assigned to this review")
comment_id = str(uuid.uuid4())
comment = ReviewComment(
comment_id=comment_id,
reviewer=reviewer,
reviewer_role=self._get_reviewer_role(reviewer),
section=section,
comment_text=comment_text,
severity=severity,
created_at=datetime.now()
)
review.comments.append(comment)
return comment_id
def submit_review_decision(self, review_id: str, reviewer: str,
approved_criteria: Dict[str, bool],
overall_decision: ReviewStatus,
summary_comment: str = "") -> bool:
"""Submit a reviewer's decision on the prompt"""
if review_id not in self.reviews:
raise ValueError(f"Review {review_id} not found")
review = self.reviews[review_id]
if reviewer not in review.reviewers:
raise ValueError(f"Reviewer {reviewer} not assigned to this review")
# Update approval criteria (merge with existing)
for criterion, approved in approved_criteria.items():
if criterion in review.approval_criteria:
review.approval_criteria[criterion] = approved
# Add summary comment if provided
if summary_comment:
self.add_review_comment(
review_id=review_id,
reviewer=reviewer,
section="overall",
comment_text=f"Review Decision: {overall_decision.value}. {summary_comment}",
severity="major" if overall_decision != ReviewStatus.APPROVED else "minor"
)
# Re-evaluate the overall review status based on current criteria and unresolved comments
self._check_review_completion(review_id)
return True
def resolve_comment(self, review_id: str, comment_id: str,
resolution_comment: str, resolved_by: str) -> bool:
"""Mark a review comment as resolved"""
if review_id not in self.reviews:
raise ValueError(f"Review {review_id} not found")
review = self.reviews[review_id]
for comment in review.comments:
if comment.comment_id == comment_id:
comment.resolved = True
comment.resolution_comment = f"{resolved_by}: {resolution_comment}"
return True
raise ValueError(f"Comment {comment_id} not found in review {review_id}")
def get_review_status(self, review_id: str) -> Dict[str, Any]:
"""Get comprehensive status of a review"""
if review_id not in self.reviews:
raise ValueError(f"Review {review_id} not found")
review = self.reviews[review_id]
# Calculate progress
total_criteria = len(review.approval_criteria)
approved_criteria = sum(1 for approved in review.approval_criteria.values() if approved)
# Categorize comments
unresolved_critical = [
c for c in review.comments
if not c.resolved and c.severity == "critical"
]
unresolved_major = [
c for c in review.comments
if not c.resolved and c.severity == "major"
]
return {
'review_id': review_id,
'status': review.status.value,
'progress': {
'criteria_approved': f"{approved_criteria}/{total_criteria}",
'completion_percentage': (approved_criteria / total_criteria) * 100 if total_criteria > 0 else 0
},
'comments': {
'total': len(review.comments),
'unresolved_critical': len(unresolved_critical),
'unresolved_major': len(unresolved_major),
'resolved': len([c for c in review.comments if c.resolved])
},
'blockers': [c.comment_text for c in unresolved_critical],
'reviewers': review.reviewers,
'can_approve': self._can_approve_review(review_id)
}
def generate_review_report(self, review_id: str) -> Dict[str, Any]:
"""Generate a comprehensive review report"""
if review_id not in self.reviews:
raise ValueError(f"Review {review_id} not found")
review = self.reviews[review_id]
status = self.get_review_status(review_id)
# Group comments by section and severity
comments_by_section = {}
for comment in review.comments:
if comment.section not in comments_by_section:
comments_by_section[comment.section] = []
comments_by_section[comment.section].append({
'reviewer': comment.reviewer,
'severity': comment.severity,
'text': comment.comment_text,
'resolved': comment.resolved
})
# Approval criteria status
criteria_status = []
for criterion, approved in review.approval_criteria.items():
criteria_status.append({
'criterion': criterion,
'approved': approved,
'status': '✅' if approved else '❌'
})
return {
'review_summary': {
'review_id': review_id,
'prompt_version': review.prompt_version_id,
'status': review.status.value,
'requested_by': review.requested_by,
'created_at': review.created_at.isoformat(),
'completed_at': review.completed_at.isoformat() if review.completed_at else None
},
'progress': status['progress'],
'approval_criteria': criteria_status,
'comments_by_section': comments_by_section,
'next_steps': self._get_next_steps(review_id)
}
def _assign_reviewers(self, required_roles: List[ReviewerRole]) -> List[str]:
"""Assign reviewers based on required roles"""
# In practice, this would query a user database
role_assignments = {
ReviewerRole.PROMPT_ENGINEER: ["alice_engineer", "bob_engineer"],
ReviewerRole.DOMAIN_EXPERT: ["charlie_expert", "diana_expert"],
ReviewerRole.TECH_LEAD: ["eve_lead"],
ReviewerRole.PRODUCT_MANAGER: ["frank_pm"]
}
reviewers = []
for role in required_roles:
if role in role_assignments:
# Assign first available reviewer for each role
reviewers.append(role_assignments[role][0])
return reviewers
def _get_reviewer_role(self, reviewer: str) -> ReviewerRole:
"""Get the role of a reviewer"""
# In practice, this would query user roles from a database
role_mapping = {
"alice_engineer": ReviewerRole.PROMPT_ENGINEER,
"bob_engineer": ReviewerRole.PROMPT_ENGINEER,
"charlie_expert": ReviewerRole.DOMAIN_EXPERT,
"diana_expert": ReviewerRole.DOMAIN_EXPERT,
"eve_lead": ReviewerRole.TECH_LEAD,
"frank_pm": ReviewerRole.PRODUCT_MANAGER
}
return role_mapping.get(reviewer, ReviewerRole.PROMPT_ENGINEER)
def _check_review_completion(self, review_id: str):
"""Check if review is complete and update status"""
review = self.reviews[review_id]
# Check if all critical issues are resolved
unresolved_critical = [
c for c in review.comments
if not c.resolved and c.severity == "critical"
]
if unresolved_critical:
review.status = ReviewStatus.CHANGES_REQUESTED
return
# Check if majority of criteria are approved
approved_count = sum(1 for approved in review.approval_criteria.values() if approved)
total_count = len(review.approval_criteria)
if approved_count >= (total_count * 0.8): # 80% approval threshold
review.status = ReviewStatus.APPROVED
review.completed_at = datetime.now()
elif approved_count < (total_count * 0.5): # Less than 50% approval
review.status = ReviewStatus.CHANGES_REQUESTED
def _can_approve_review(self, review_id: str) -> bool:
"""Determine if review can be approved"""
review = self.reviews[review_id]
# No unresolved critical issues
unresolved_critical = [
c for c in review.comments
if not c.resolved and c.severity == "critical"
]
if unresolved_critical:
return False
# Majority criteria approved
approved_count = sum(1 for approved in review.approval_criteria.values() if approved)
total_count = len(review.approval_criteria)
return approved_count >= (total_count * 0.8)
def _get_next_steps(self, review_id: str) -> List[str]:
"""Get recommended next steps for the review"""
review = self.reviews[review_id]
next_steps = []
if review.status == ReviewStatus.PENDING:
next_steps.append("Waiting for reviewer feedback")
elif review.status == ReviewStatus.CHANGES_REQUESTED:
# Find unresolved issues
unresolved = [c for c in review.comments if not c.resolved and c.severity in ["critical", "major"]]
if unresolved:
next_steps.append(f"Address {len(unresolved)} unresolved issues")
next_steps.extend([f"- {c.comment_text[:100]}..." for c in unresolved[:3]])
elif review.status == ReviewStatus.APPROVED:
next_steps.append("Ready for deployment")
return next_steps
def _notify_reviewers(self, review_id: str, reviewers: List[str]):
"""Notify reviewers of new review assignment"""
# In practice, this would send actual notifications
print(f"Review {review_id} assigned to reviewers: {', '.join(reviewers)}")
# Example usage
def demonstrate_review_workflow():
"""Demonstrate collaborative review workflow"""
review_system = PromptReviewSystem()
# Initiate review
review_id = review_system.initiate_review(
prompt_version_id="business_analysis_v2.1.0",
prompt_type="business_analysis",
requested_by="prompt_author"
)
print(f"Started review: {review_id}")
# Reviewer 1: Prompt Engineer feedback
review_system.add_review_comment(
review_id=review_id,
reviewer="alice_engineer",
section="parameter_validation",
comment_text="Parameter validation schema needs improvement for the 'key_questions' array",
severity="major"
)
# Reviewer 2: Domain Expert feedback
review_system.add_review_comment(
review_id=review_id,
reviewer="charlie_expert",
section="business_context",
comment_text="The competitive analysis section should include more specific metrics",
severity="minor"
)
# Submit review decisions
review_system.submit_review_decision(
review_id=review_id,
reviewer="alice_engineer",
approved_criteria={
"clarity_and_structure": True,
"parameter_validation": False, # Major issue found
"output_specification": True
},
overall_decision=ReviewStatus.CHANGES_REQUESTED,
summary_comment="Overall structure is good, but parameter validation needs work"
)
# Get review status
status = review_system.get_review_status(review_id)
print(f"\nReview Status: {status['status']}")
print(f"Progress: {status['progress']['criteria_approved']}")
print(f"Unresolved Critical Issues: {status['comments']['unresolved_critical']}")
# Generate report
report = review_system.generate_review_report(review_id)
print(f"\nNext Steps: {report['next_steps']}")
if __name__ == "__main__":
demonstrate_review_workflow()
Next Steps: From Templates to Evaluation
You now have the tools to build professional template systems with version control, inheritance patterns, and collaborative workflows. This foundation enables systematic prompt development that scales across teams and applications.
Practice Implementation
Before advancing to Stage 3, implement these template design patterns:
- Build a Template Library: Create 3-5 templates for different use cases in your domain, implementing the inheritance patterns shown.
- Implement Version Control: Set up a version management system for your templates with semantic versioning.
- Create Review Process: Establish a review workflow for new templates or major changes.
- Parameter Validation: Add comprehensive parameter validation with JSON Schema for at least one complex template (a minimal standalone sketch follows this list).
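For the parameter-validation exercise, here is a minimal standalone sketch using the jsonschema library; the schema mirrors the key_questions definition from the BusinessAnalysisTemplate above.

# Standalone sketch for the parameter-validation exercise: validating one template parameter with JSON Schema.
import jsonschema

key_questions_schema = {
    "type": "array",
    "items": {"type": "string", "minLength": 5},
    "minItems": 1,
    "maxItems": 5,
}

try:
    jsonschema.validate(
        ["What are our top growth opportunities?", "Where are we losing share?"],
        key_questions_schema,
    )
    print("Parameters are valid")
except jsonschema.ValidationError as error:
    print(f"Validation failed: {error.message}")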
🎯 Stage 3 Preview: Systematic Evaluation
Next, we'll build comprehensive evaluation systems that measure template performance, implement LLM-as-a-judge patterns, and create automated testing pipelines.
- Multi-dimensional evaluation metrics
- LLM-as-a-judge implementation patterns
- Automated evaluation pipelines
- Performance benchmarking and regression testing
- A/B testing frameworks for prompts
Further Resources
Advanced resources for template design, version control, and collaborative development:
Template Design Resources
- Comprehensive framework for building and managing prompt templates
- Industry best practices for versioning and managing prompt templates
- Jinja2 template inheritance concepts applicable to prompt design
- Learn JSON Schema for validating prompt template parameters
- Git workflow patterns adaptable for prompt template management
- Twelve-factor app principles for managing template configurations
