From concept to production - everything you need to know
What you'll learn:
Prerequisites: Basic Python, API experience
An AI Agent is a digital assistant that can think, plan, and act autonomously to achieve goals.
| Traditional Chatbot | AI Agent |
|---|---|
| ❌ Only responds to questions | ✅ Works toward goals |
| ❌ No memory between chats | ✅ Remembers and learns |
| ❌ Can't use tools | ✅ Integrates with systems |
| ❌ No planning ability | ✅ Breaks down complex tasks |

**Agent loop:** [User Input] → PERCEIVE → THINK → ACT → LEARN → [Response]

| Stage | What it does | Draws on |
|---|---|---|
| PERCEIVE | Gathers information from the environment and the user | Context, history, facts, memory |
| THINK | Processes information and plans actions | Planning, LLM, analysis, decisions |
| ACT | Executes actions or provides responses | Tools, execution, APIs, calculation |
| LEARN | Updates knowledge for future use | Storage, extraction, learning, remembering |

**Architecture:** the User Interface passes requests to the Agent Controller, which orchestrates four components:

| Component | Examples |
|---|---|
| 🤖 LLM | GPT-4, Claude, Gemini |
| 💾 Memory | Working, long-term, vector |
| 🔧 Tools | Search, calculator, APIs |
| 📊 Monitor | Health, metrics, logs |

---
## 3. Your First Agent (Complete Implementation)
Let's build a working agent that demonstrates all core concepts:
### Core Agent Class
```python
import openai
from datetime import datetime
import re
import json
from typing import Dict, Any, List


class MyFirstAgent:
    """Complete AI agent with agent loop, tools, memory, and learning"""

    def __init__(self, api_key: str, name: str = "MyFirstAgent"):
        self.client = openai.OpenAI(api_key=api_key)
        self.name = name
        self.conversation_history = []
        self.learned_facts = {}
        # Available tools
        self.tools = {
            'calculator': self._calculator_tool,
            'remember_fact': self._remember_fact_tool,
            'recall_fact': self._recall_fact_tool,
            'get_time': self._time_tool
        }
        print(f"🤖 {self.name} initialized with {len(self.tools)} tools!")
```
### The Agent Loop Implementation
**Agent Loop Flow:** a request such as "What's 157 * 23?" passes through PERCEIVE, THINK, ACT, and LEARN in turn.
```python
    def chat(self, user_message: str) -> str:
        """Main chat method implementing PERCEIVE → THINK → ACT → LEARN"""
        print(f"🔍 Processing: {user_message}")
        try:
            # PERCEIVE: Gather context
            context = self._gather_context(user_message)
            # THINK: Plan response
            plan = self._create_plan(user_message, context)
            # ACT: Execute plan
            response = self._execute_plan(plan, user_message)
            # LEARN: Store interaction
            self._learn_from_interaction(user_message, response)
            return response
        except Exception as e:
            error_response = f"I encountered an issue: {str(e)}"
            self._learn_from_interaction(user_message, error_response)
            return error_response
```
### Context Gathering (PERCEIVE)
```python
    def _gather_context(self, user_message: str) -> Dict[str, Any]:
        """PERCEIVE: Collect relevant context"""
        # Recent conversation history
        recent_history = ""
        if self.conversation_history:
            recent = self.conversation_history[-3:]  # Last 3 exchanges
            history_parts = []
            for exchange in recent:
                history_parts.append(f"User: {exchange['user']}")
                history_parts.append(f"Assistant: {exchange['assistant']}")
            recent_history = "\n".join(history_parts)
        # Search for relevant facts. Tokenize on letters and digits so that
        # punctuation ("budget?") does not block a match, and skip very short
        # words, which would otherwise match almost every key as a substring.
        relevant_facts = []
        user_words = [
            word for word in re.findall(r"[a-z0-9]+", user_message.lower())
            if len(word) > 2
        ]
        for key, fact in self.learned_facts.items():
            if any(word in key for word in user_words):
                relevant_facts.append(f"{key}: {fact['value']}")
        return {
            'conversation_history': recent_history,
            'available_tools': list(self.tools.keys()),
            'relevant_facts': relevant_facts,
            'current_time': datetime.now().isoformat()
        }
```
### Planning System (THINK)
```python
    def _create_plan(self, user_message: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """THINK: Analyze request and plan response"""
        planning_prompt = f"""
        Analyze this user request and determine if tools are needed:
        User: "{user_message}"
        Context:
        - Recent conversation: {context['conversation_history']}
        - Available tools: {', '.join(context['available_tools'])}
        - Relevant facts: {'; '.join(context['relevant_facts'])}
        If tools needed, respond with:
        TOOLS_NEEDED: tool_name(parameter="value")
        If no tools needed:
        TOOLS_NEEDED: none
        Examples:
        - calculator(expression="15 * 23")
        - remember_fact(key="budget", value="1500")
        """
        response = self.client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": f"You are {self.name}, planning responses."},
                {"role": "user", "content": planning_prompt}
            ],
            temperature=0.2
        )
        # The content can be None, so fall back to an empty string
        plan_text = response.choices[0].message.content or ""
        # Parse plan
        plan = {'approach': 'direct_response', 'tools_needed': []}
        if "TOOLS_NEEDED:" in plan_text:
            tools_line = plan_text.split("TOOLS_NEEDED:")[1].strip()
            if tools_line.lower() != "none":
                # Extract tool calls as (name, argument_string) pairs.
                # Quoted arguments may contain parentheses, e.g.
                # calculator(expression="(2 + 3) * 4").
                tool_pattern = r'(\w+)\(((?:[^"()]|"[^"]*")*)\)'
                matches = re.findall(tool_pattern, tools_line)
                if matches:
                    plan['tools_needed'] = matches
                    plan['approach'] = 'tool_assisted'
        return plan
```
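
The `chat` method hands the plan to `self._execute_plan`, whose body is not shown here. A minimal sketch of that ACT step might look like the following, written as standalone functions so it runs on its own. The names `parse_tool_args` and `execute_plan`, the `remember` demo tool, and the choice to return a list of result strings are assumptions for illustration; the plan format (a list of `(tool_name, argument_string)` tuples under `tools_needed`) follows the parsing code above.

```python
import re
from typing import Callable, Dict, List


def parse_tool_args(arg_string: str) -> Dict[str, str]:
    """Turn 'key="value", other="x"' into {'key': 'value', 'other': 'x'}."""
    return dict(re.findall(r'(\w+)\s*=\s*"([^"]*)"', arg_string))


def execute_plan(plan: Dict, tools: Dict[str, Callable[..., str]]) -> List[str]:
    """Run each planned tool call and collect its result or an error message."""
    results = []
    for tool_name, arg_string in plan.get('tools_needed', []):
        tool = tools.get(tool_name)
        if tool is None:
            results.append(f"Error: unknown tool '{tool_name}'")
            continue
        try:
            results.append(tool(**parse_tool_args(arg_string)))
        except TypeError as exc:  # missing or unexpected arguments
            results.append(f"Error: bad arguments for {tool_name}: {exc}")
    return results


# Hypothetical stand-in tool for the demo
def remember(key: str, value: str) -> str:
    return f"Remembered: {key} = {value}"


plan = {
    'approach': 'tool_assisted',
    'tools_needed': [('remember', 'key="budget", value="1500"')],
}
print(execute_plan(plan, {'remember': remember}))
# ['Remembered: budget = 1500']
```

Inside the agent, the collected results would typically be handed back to the LLM to phrase the final reply; for a `direct_response` plan the list is simply empty.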
### Tool Implementations
```python
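    # Tool implementations
    def _calculator_tool(self, expression: str) -> str:
        """Evaluate a basic arithmetic expression"""
        try:
            # Allow only digits, whitespace, and the characters + - * / ( ) .
            if not re.match(r'^[0-9+\-*/().\s]+$', expression):
                return "Error: Invalid characters in expression"
            # "**" passes the character check but can keep eval() busy
            # for a very long time (e.g. 9**9**9), so reject it.
            if "**" in expression:
                return "Error: Exponentiation is not supported"
            # Evaluate with no builtins available
            result = eval(expression, {"__builtins__": {}}, {})
            return f"Result: {result}"
        except Exception as e:
            return f"Error: {str(e)}"

    def _remember_fact_tool(self, key: str, value: str) -> str:
        """Store a fact in memory"""
        self.learned_facts[key.lower()] = {
            'value': value,
            'timestamp': datetime.now(),
            'source': 'user_instruction'
        }
        return f"Remembered: {key} = {value}"

    def _recall_fact_tool(self, key: str) -> str:
        """Recall a stored fact"""
        fact = self.learned_facts.get(key.lower())
        if fact:
            return f"Recalled: {key} = {fact['value']}"
        else:
            return f"No fact found for: {key}"

    def _time_tool(self, timezone: str = "UTC") -> str:
        """Get current time in an IANA timezone such as "UTC" or "Europe/Paris" """
        from zoneinfo import ZoneInfo  # standard library, Python 3.9+
        try:
            now = datetime.now(ZoneInfo(timezone))
        except (KeyError, ValueError):
            return f"Error: Unknown timezone: {timezone}"
        return f"Current time ({timezone}): {now.strftime('%Y-%m-%d %H:%M:%S')}"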
```
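
A character whitelist in front of `eval` keeps the calculator short, but it still executes whatever passes the filter. One alternative, sketched here under the assumption that only `+`, `-`, `*`, `/`, and parentheses are needed, is to parse the expression with the standard `ast` module and evaluate only the node types you explicitly allow. The function name `safe_calculate` is hypothetical and not part of the agent class above.

```python
import ast
import operator

# Operators the calculator is willing to evaluate
_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}


def safe_calculate(expression: str):
    """Evaluate +, -, *, / and parentheses without calling eval()."""

    def _eval(node):
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        # Plain numbers only (bool is excluded by the exact type check)
        if isinstance(node, ast.Constant) and type(node.value) in (int, float):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPS:
            return _BINARY_OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](_eval(node.operand))
        # Names, calls, attribute access, "**", and everything else
        raise ValueError("Unsupported expression")

    return _eval(ast.parse(expression, mode="eval"))


print(safe_calculate("157 * 23"))  # 3611
```

Because anything outside the allowed node types raises `ValueError`, function calls and exponentiation are rejected before any evaluation happens.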
### Learning System (LEARN)
```python
    def _learn_from_interaction(self, user_message: str, agent_response: str):
        """LEARN: Store interaction and extract knowledge"""
        # Store conversation
        self.conversation_history.append({
            'user': user_message,
            'assistant': agent_response,
            'timestamp': datetime.now()
        })
        # Keep last 10 exchanges
        if len(self.conversation_history) > 10:
            self.conversation_history.pop(0)
        # Learn preferences
        if "i like" in user_message.lower() or "i prefer" in user_message.lower():
            self.learned_facts[f"preference_{len(self.learned_facts)}"] = {
                'value': user_message,
                'timestamp': datetime.now(),
                'source': 'conversation'
            }
        # Learn facts from statements
        if " is " in user_message and "?" not in user_message:
            parts = user_message.split(" is ")
            if len(parts) == 2:
                subject = parts[0].strip()
                info = parts[1].strip()
                self.learned_facts[subject.lower()] = {
                    'value': info,
                    'timestamp': datetime.now(),
                    'source': 'user_provided'
                }
```
### Testing Your Agent
```python
# Complete test example
def test_agent():
    """Test the agent with various scenarios"""
    # Initialize agent (replace with your API key)
    agent = MyFirstAgent(api_key="your-openai-api-key", name="TestBot")
    # Test conversations
    test_cases = [
        "Hello! What can you help me with?",
        "What's 157 * 23?",
        "Remember that my budget is $2000",
        "What's my budget?",
        "My name is Alice and I work at TechCorp",
        "What do you know about me?",
        "What time is it?"
    ]
    for i, message in enumerate(test_cases, 1):
        print(f"\n--- Test {i} ---")
        print(f"User: {message}")
        response = agent.chat(message)
        print(f"Agent: {response}")
    # Show final status
    print(f"\nFacts learned: {len(agent.learned_facts)}")
    print(f"Conversations: {len(agent.conversation_history)}")


# Run the test (uncomment when ready)
# test_agent()
```
**Key Features Demonstrated:**
- ✅ Complete Agent Loop (PERCEIVE → THINK → ACT → LEARN)
- ✅ Tool Integration (calculator, memory, time)
- ✅ Context-aware responses using conversation history
- ✅ Learning from interactions and storing facts
- ✅ Error handling and recovery
---
## 4. Professional Tool System
### Tool Architecture
```python
from abc import ABC, abstractmethod
from typing import Dict, Any
from enum import Enum


class ToolCategory(Enum):
    INFORMATION = "information"      # Web search, databases, APIs
    COMMUNICATION = "communication"  # Email, messaging, social
    COMPUTATION = "computation"      # Math, data analysis, code
    DATA = "data"                    # File operations, storage


class BaseTool(ABC):
    """Base class for all agent tools"""

    def __init__(self, name: str, description: str, category: ToolCategory):
        self.name = name
        self.description = description
        self.category = category
        self.usage_count = 0

    @abstractmethod
    async def execute(self, **kwargs) -> Dict[str, Any]:
        """Execute the tool"""
        pass

    @abstractmethod
    def get_schema(self) -> Dict[str, Any]:
        """Return parameter schema for validation"""
        pass
```
### Web Search Tool Example
```python
class WebSearchTool(BaseTool):
    """Web search tool (returns mock results until a real provider is wired in)"""

    def __init__(self, api_key: str, provider: str = "google"):
        super().__init__(
            name="web_search",
            description="Search the internet for current information",
            category=ToolCategory.INFORMATION
        )
        self.api_key = api_key
        self.provider = provider

    async def execute(self, query: str, num_results: int = 5) -> Dict[str, Any]:
        """Execute web search with error handling"""
        try:
            # Validate parameters
            if not query.strip():
                return {"success": False, "error": "Query cannot be empty"}
            # Clamp to the 1-10 range declared in the schema
            num_results = max(1, min(num_results, 10))
            # In production, integrate with:
            # - Google Custom Search API
            # - Bing Search API
            # - SerpAPI
            # Mock implementation for demo
            results = [
                {
                    "title": f"Result {i} for '{query}'",
                    "url": f"https://example{i}.com",
                    "snippet": f"Relevant information about {query}..."
                }
                for i in range(1, num_results + 1)
            ]
            self.usage_count += 1
            return {
                "success": True,
                "results": results,
                "query": query,
                "total_results": len(results)
            }
        except Exception as e:
            return {
                "success": False,
                "error": f"Search failed: {str(e)}"
            }

    def get_schema(self) -> Dict[str, Any]:
        """Parameter schema for validation"""
        return {
            "query": {
                "type": "string",
                "required": True,
                "description": "Search query text"
            },
            "num_results": {
                "type": "integer",
                "default": 5,
                "min": 1,
                "max": 10,
                "description": "Number of results to return"
            }
        }
```
### Tool Registry System
**Tool Execution Flow:** the agent calls `registry.execute("web_search", user_id="123", query="AI agents")`; the registry then checks permissions, validates the parameters, runs the tool, and records the outcome.
```python
import time  # used to time tool execution


class ToolRegistry:
    """Centralized tool management with security"""

    def __init__(self):
        self.tools: Dict[str, BaseTool] = {}
        self.usage_stats = {}
        self.security_log = []

    def register(self, tool: BaseTool):
        """Register a tool with validation"""
        if tool.name in self.tools:
            raise ValueError(f"Tool {tool.name} already registered")
        self.tools[tool.name] = tool
        self.usage_stats[tool.name] = {
            'calls': 0,
            'errors': 0,
            'avg_duration': 0
        }
        print(f"✅ Registered tool: {tool.name}")

    async def execute(self, tool_name: str, user_id: str, **kwargs) -> Dict[str, Any]:
        """Execute tool with security and monitoring"""
        if tool_name not in self.tools:
            return {"success": False, "error": f"Tool {tool_name} not found"}
        # Security check
        if not self._check_permissions(user_id, tool_name):
            self._log_security_event(user_id, tool_name, "permission_denied")
            return {"success": False, "error": "Insufficient permissions"}
        tool = self.tools[tool_name]
        start_time = time.time()
        try:
            # Validate parameters against schema
            validation_result = self._validate_parameters(tool, kwargs)
            if not validation_result['valid']:
                return {"success": False, "error": validation_result['error']}
            # Execute tool
            result = await tool.execute(**kwargs)
            # Update statistics
            duration = time.time() - start_time
            self._update_stats(tool_name, duration, True)
            # Log successful execution
            self._log_tool_usage(user_id, tool_name, kwargs, True)
            return result
        except Exception as e:
            duration = time.time() - start_time
            self._update_stats(tool_name, duration, False)
            self._log_tool_usage(user_id, tool_name, kwargs, False, str(e))
            return {"success": False, "error": "Tool execution failed"}

    def _validate_parameters(self, tool: BaseTool, params: Dict) -> Dict[str, Any]:
        """Validate parameters against tool schema"""
        schema = tool.get_schema()
        # Check required parameters
        for param_name, param_config in schema.items():
            if param_config.get('required', False) and param_name not in params:
                return {
                    'valid': False,
                    'error': f"Missing required parameter: {param_name}"
                }
        # Type validation (simplified)
        for param_name, value in params.items():
            if param_name in schema:
                expected_type = schema[param_name].get('type')
                if expected_type == 'string' and not isinstance(value, str):
                    return {
                        'valid': False,
                        'error': f"Parameter {param_name} must be string"
                    }
                # bool is a subclass of int, so exclude it explicitly
                if expected_type == 'integer' and (
                    isinstance(value, bool) or not isinstance(value, int)
                ):
                    return {
                        'valid': False,
                        'error': f"Parameter {param_name} must be integer"
                    }
        return {'valid': True}

    def _check_permissions(self, user_id: str, tool_name: str) -> bool:
        """Check if user has permission to use tool"""
        # Implement your permission logic here
        # For demo, allow all users
        return True

    # The three helpers below are minimal placeholders so the class runs
    # as written; replace them with your own stats and logging backends.
    def _update_stats(self, tool_name: str, duration: float, success: bool):
        """Update call count, error count, and running average duration"""
        stats = self.usage_stats[tool_name]
        stats['calls'] += 1
        if not success:
            stats['errors'] += 1
        # Incremental mean: new_avg = old_avg + (x - old_avg) / n
        stats['avg_duration'] += (duration - stats['avg_duration']) / stats['calls']

    def _log_tool_usage(self, user_id: str, tool_name: str, params: Dict,
                        success: bool, error: str = None):
        """Log one tool call"""
        print(f"[tool] user={user_id} tool={tool_name} success={success} error={error}")

    def _log_security_event(self, user_id: str, tool_name: str, event: str):
        """Record a security-relevant event"""
        self.security_log.append({
            'user_id': user_id,
            'tool': tool_name,
            'event': event,
            'time': time.time()
        })

    def get_tool_descriptions(self) -> str:
        """Get formatted tool descriptions for LLM context"""
        descriptions = []
        for tool in self.tools.values():
            schema = tool.get_schema()
            params = []
            for param, config in schema.items():
                param_desc = f"{param}: {config.get('type', 'any')}"
                if config.get('required', False):
                    param_desc += " (required)"
                else:
                    param_desc += " (optional)"
                params.append(param_desc)
            descriptions.append(
                f"- **{tool.name}**({', '.join(params)}): {tool.description}"
            )
        return "\n".join(descriptions)
```
### Security Best Practices
```python
import asyncio
import logging
import re
import time
from functools import wraps
from typing import Any, Dict


class SecureToolExecution:
    """Security wrapper for tool execution"""

    def __init__(self, max_calls_per_minute: int = 30):
        self.rate_limits = {}
        self.max_calls_per_minute = max_calls_per_minute

    def secure_execute(self, func):
        """Decorator for secure tool execution"""
        @wraps(func)
        async def wrapper(tool, user_id: str, **kwargs):
            # `tool` is the instance whose method is wrapped; `self` (from the
            # enclosing scope) is this SecureToolExecution object, which owns
            # the rate-limit state and the sanitizer.
            # Rate limiting
            if not self._check_rate_limit(user_id):
                return {"success": False, "error": "Rate limit exceeded"}
            # Input sanitization
            sanitized_kwargs = self._sanitize_inputs(kwargs)
            # Execute with timeout
            try:
                result = await asyncio.wait_for(
                    func(tool, **sanitized_kwargs),
                    timeout=30  # 30 second timeout
                )
                return result
            except asyncio.TimeoutError:
                return {"success": False, "error": "Tool execution timeout"}
            except Exception as e:
                logging.error(f"Tool execution error: {str(e)}")
                return {"success": False, "error": "Execution failed"}
        return wrapper

    def _sanitize_inputs(self, kwargs: Dict[str, Any]) -> Dict[str, Any]:
        """Sanitize input parameters"""
        sanitized = {}
        for key, value in kwargs.items():
            if isinstance(value, str):
                # Remove potentially dangerous characters
                sanitized_value = re.sub(r'[<>"\';]', '', value)
                # Limit string length
                sanitized[key] = sanitized_value[:1000]
            else:
                sanitized[key] = value
        return sanitized

    def _check_rate_limit(self, user_id: str) -> bool:
        """Simple sliding-window rate limiting"""
        now = time.time()
        if user_id not in self.rate_limits:
            self.rate_limits[user_id] = []
        # Remove old timestamps
        self.rate_limits[user_id] = [
            timestamp for timestamp in self.rate_limits[user_id]
            if now - timestamp < 60  # Last minute
        ]
        # Check if under limit
        if len(self.rate_limits[user_id]) < self.max_calls_per_minute:
            self.rate_limits[user_id].append(now)
            return True
        return False
```
---
## 5. Advanced Memory Systems
### Database-Backed Memory
```python
import sqlite3
import json
import re  # used by the fact and preference extraction patterns
from datetime import datetime
from typing import List, Dict, Any, Optional


class AgentMemory:
    """Production-ready memory system with persistence"""

    def __init__(self, db_path: str = "agent_memory.db"):
        self.db_path = db_path
        self.working_memory = {'conversation': [], 'context': {}}
        self._initialize_database()

    def _initialize_database(self):
        """Set up memory database with proper indexes"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        # Conversations table.
        # Note: the users table referenced by the foreign key is not created
        # here, and SQLite only enforces foreign keys when
        # PRAGMA foreign_keys = ON is set on the connection.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS conversations (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_id TEXT NOT NULL,
                user_id TEXT NOT NULL,
                user_message TEXT NOT NULL,
                agent_response TEXT NOT NULL,
                timestamp DATETIME NOT NULL,
                metadata TEXT,
                FOREIGN KEY (user_id) REFERENCES users (id)
            )
        ''')
        # Facts table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS facts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id TEXT,
                key TEXT NOT NULL,
                value TEXT NOT NULL,
                category TEXT DEFAULT 'general',
                confidence REAL DEFAULT 1.0,
                created_at DATETIME NOT NULL,
                last_updated DATETIME NOT NULL,
                source TEXT,
                UNIQUE(user_id, key)
            )
        ''')
        # User preferences table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS preferences (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id TEXT NOT NULL,
                preference_key TEXT NOT NULL,
                preference_value TEXT NOT NULL,
                created_at DATETIME NOT NULL,
                UNIQUE(user_id, preference_key)
            )
        ''')
        # Create indexes for performance
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_conversations_user_time ON conversations(user_id, timestamp)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_facts_user_key ON facts(user_id, key)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_preferences_user ON preferences(user_id)')
        conn.commit()
        conn.close()
        print("✅ Memory database initialized")
```
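
The `UNIQUE(user_id, key)` constraint on the `facts` table means a plain `INSERT` fails when the same fact is stored twice. One way a `store_fact` helper could handle this is with SQLite's `ON CONFLICT ... DO UPDATE` clause (available in SQLite 3.24 and later). The sketch below is a standalone function taking a connection, which is an assumption made so the example runs against an in-memory database; it is not the class's actual method.

```python
import sqlite3
from datetime import datetime, timezone


def store_fact(conn, user_id, key, value, category="general",
               confidence=1.0, source=None):
    """Insert a fact, or update it if (user_id, key) already exists."""
    now = datetime.now(timezone.utc).isoformat()
    conn.execute(
        '''
        INSERT INTO facts (user_id, key, value, category, confidence,
                           created_at, last_updated, source)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(user_id, key) DO UPDATE SET
            value = excluded.value,
            category = excluded.category,
            confidence = excluded.confidence,
            last_updated = excluded.last_updated,
            source = excluded.source
        ''',
        (user_id, key, value, category, confidence, now, now, source),
    )
    conn.commit()


# Demo against an in-memory database with the same facts schema
conn = sqlite3.connect(":memory:")
conn.execute('''
    CREATE TABLE facts (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id TEXT,
        key TEXT NOT NULL,
        value TEXT NOT NULL,
        category TEXT DEFAULT 'general',
        confidence REAL DEFAULT 1.0,
        created_at DATETIME NOT NULL,
        last_updated DATETIME NOT NULL,
        source TEXT,
        UNIQUE(user_id, key)
    )
''')
store_fact(conn, "alice", "budget", "1500")
store_fact(conn, "alice", "budget", "2000")  # updates the existing row
rows = conn.execute(
    "SELECT value FROM facts WHERE user_id = ?", ("alice",)
).fetchall()
print(rows)  # [('2000',)]
```

The update leaves `created_at` untouched, so the row keeps the time the fact was first learned while `last_updated` tracks the latest change.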
### Context-Aware Memory Retrieval
**Memory Context Assembly Process:** the agent calls `memory.get_relevant_context(user_id, "What's my budget?")`, which combines recent conversations, relevant facts, user preferences, and similar past conversations into one context dictionary.
```python
    def get_relevant_context(self, user_id: str, current_message: str,
                             max_conversations: int = 5) -> Dict[str, Any]:
        """Get contextually relevant information"""
        # Get recent conversation history
        recent_conversations = self._get_recent_conversations(user_id, max_conversations)
        # Get relevant facts based on current message
        relevant_facts = self._search_relevant_facts(user_id, current_message)
        # Get user preferences
        preferences = self.get_user_preferences(user_id)
        # Search conversation history for similar topics
        similar_conversations = self._search_conversation_history(user_id, current_message)
        return {
            'recent_conversations': recent_conversations,
            'relevant_facts': relevant_facts,
            'user_preferences': preferences,
            'similar_past_conversations': similar_conversations,
            'context_timestamp': datetime.now().isoformat()
        }

    def _search_relevant_facts(self, user_id: str, message: str) -> List[Dict]:
        """Find facts relevant to current message using keyword matching"""
        # Tokenize on letters and digits so punctuation ("budget?") and
        # underscores in keys ("user_budget") do not block a match
        message_words = set(re.findall(r"[a-z0-9]+", message.lower()))
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            SELECT key, value, category, confidence, created_at
            FROM facts
            WHERE user_id = ? OR user_id IS NULL
            ORDER BY confidence DESC, last_updated DESC
        ''', (user_id,))
        relevant_facts = []
        for row in cursor.fetchall():
            key, value, category, confidence, created_at = row
            # Simple relevance scoring based on keyword overlap
            fact_words = set(re.findall(r"[a-z0-9]+", (key + " " + value).lower()))
            overlap = len(message_words.intersection(fact_words))
            if overlap > 0:
                relevant_facts.append({
                    'key': key,
                    'value': value,
                    'category': category,
                    'confidence': confidence,
                    'relevance_score': overlap,
                    'created_at': created_at
                })
        conn.close()
        # Sort by relevance score and return top facts
        relevant_facts.sort(key=lambda x: x['relevance_score'], reverse=True)
        return relevant_facts[:5]

    def _search_conversation_history(self, user_id: str, query: str,
                                     limit: int = 3) -> List[Dict]:
        """Search past conversations for similar content"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        # Simple text search (in production, use full-text search or vector search).
        # The pattern requires every query term to appear in the same order,
        # and any % or _ in the query is treated as a LIKE wildcard.
        query_terms = query.lower().split()
        search_pattern = '%' + '%'.join(query_terms) + '%'
        cursor.execute('''
            SELECT user_message, agent_response, timestamp
            FROM conversations
            WHERE user_id = ?
            AND (LOWER(user_message) LIKE ? OR LOWER(agent_response) LIKE ?)
            ORDER BY timestamp DESC
            LIMIT ?
        ''', (user_id, search_pattern, search_pattern, limit))
        results = []
        for row in cursor.fetchall():
            results.append({
                'user_message': row[0],
                'agent_response': row[1],
                'timestamp': row[2]
            })
        conn.close()
        return results
```
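
The `LIKE` search above joins the query terms into a single ordered pattern, so a message only matches when the terms appear in that order, and a `%` or `_` typed by the user acts as a wildcard. A possible variant, sketched here with hypothetical names `escape_like` and `search_conversations`, matches each term independently and escapes the wildcard characters. It is a sketch against a simplified `conversations` table, not a replacement for full-text or vector search.

```python
import sqlite3


def escape_like(term: str) -> str:
    """Escape LIKE wildcards so user text is matched literally."""
    return term.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


def search_conversations(conn, user_id: str, query: str, limit: int = 3):
    """Return user messages that contain every query term, in any order."""
    terms = query.lower().split()
    if not terms:
        return []
    # One LIKE condition per term; backslash is declared as the escape character
    clause = " AND ".join(["LOWER(user_message) LIKE ? ESCAPE '\\'"] * len(terms))
    params = [user_id] + [f"%{escape_like(t)}%" for t in terms] + [limit]
    sql = (
        "SELECT user_message FROM conversations "
        f"WHERE user_id = ? AND {clause} "
        "ORDER BY timestamp DESC LIMIT ?"
    )
    return [row[0] for row in conn.execute(sql, params)]


# Demo with a simplified in-memory table
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE conversations (user_id TEXT, user_message TEXT, timestamp TEXT)"
)
conn.executemany(
    "INSERT INTO conversations VALUES (?, ?, ?)",
    [
        ("u1", "My budget is 2000", "2024-01-01T10:00:00"),
        ("u1", "I am 100% sure", "2024-01-02T10:00:00"),
        ("u1", "We sold 1000 items", "2024-01-03T10:00:00"),
    ],
)
print(search_conversations(conn, "u1", "budget my"))  # ['My budget is 2000']
print(search_conversations(conn, "u1", "100%"))       # ['I am 100% sure']
```

Without the escaping step, the second query would also match "We sold 1000 items", because the trailing `%` would be read as a wildcard.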
### Learning and Adaptation
**Fact Extraction and Storage Process:** when the user says "My name is Alice and I work at TechCorp", the exchange is stored and the message is then scanned for facts and preferences.
```python
    def learn_from_interaction(self, user_id: str, user_message: str,
                               agent_response: str, session_id: str):
        """Extract and store learnings from interaction"""
        # Store conversation
        self.store_conversation(session_id, user_id, user_message, agent_response)
        # Extract entities and facts
        extracted_facts = self._extract_facts_from_message(user_message)
        for fact in extracted_facts:
            self.store_fact(
                user_id=user_id,
                key=fact['key'],
                value=fact['value'],
                category=fact.get('category', 'extracted'),
                confidence=fact.get('confidence', 0.8),
                source='conversation_extraction'
            )
        # Learn preferences
        preferences = self._extract_preferences(user_message)
        for pref in preferences:
            self.store_preference(user_id, pref['key'], pref['value'])

    # In the patterns below, a value ends at " and ", a period, a comma, or
    # the end of the message. Matching is case-insensitive on the original
    # text, so stored values keep their capitalization ("Alice", "TechCorp").
    def _extract_facts_from_message(self, message: str) -> List[Dict]:
        """Extract factual information from user message"""
        facts = []
        # Pattern: "My X is Y"
        pattern = r'\bmy (\w+) is (.+?)(?:\s+and\s+|[.,]|$)'
        matches = re.findall(pattern, message, flags=re.IGNORECASE)
        for match in matches:
            facts.append({
                'key': f"user_{match[0].lower()}",
                'value': match[1].strip(),
                'category': 'personal',
                'confidence': 0.9
            })
        # Pattern: "I work at/for X"
        work_pattern = r'\bi work (?:at|for) (.+?)(?:\s+and\s+|[.,]|$)'
        work_matches = re.findall(work_pattern, message, flags=re.IGNORECASE)
        for match in work_matches:
            facts.append({
                'key': 'user_workplace',
                'value': match.strip(),
                'category': 'personal',
                'confidence': 0.9
            })
        # Pattern: "I live in X"
        location_pattern = r'\bi live in (.+?)(?:\s+and\s+|[.,]|$)'
        location_matches = re.findall(location_pattern, message, flags=re.IGNORECASE)
        for match in location_matches:
            facts.append({
                'key': 'user_location',
                'value': match.strip(),
                'category': 'personal',
                'confidence': 0.9
            })
        return facts

    def _extract_preferences(self, message: str) -> List[Dict]:
        """Extract user preferences from message"""
        preferences = []
        # Pattern: "I like/prefer X"
        like_pattern = r'\bi (?:like|prefer|enjoy) (.+?)(?:\s+and\s+|[.,]|$)'
        matches = re.findall(like_pattern, message, flags=re.IGNORECASE)
        for match in matches:
            preferences.append({
                'key': 'likes',
                'value': match.strip()
            })
        # Pattern: "I don't like/hate X"
        dislike_pattern = r'\bi (?:don\'t like|hate|dislike) (.+?)(?:\s+and\s+|[.,]|$)'
        matches = re.findall(dislike_pattern, message, flags=re.IGNORECASE)
        for match in matches:
            preferences.append({
                'key': 'dislikes',
                'value': match.strip()
            })
        return preferences
```
---
## 6. Production Deployment
### Production-Ready Agent Framework
```python
import logging
import time
import asyncio
from dataclasses import dataclass
from datetime import datetime  # used by _error_response
from typing import Optional, Dict, Any
import uuid


@dataclass
class AgentConfig:
    """Production configuration"""
    name: str
    max_requests_per_minute: int = 60
    timeout_seconds: int = 30
    retry_attempts: int = 3
    enable_caching: bool = True
    log_level: str = "INFO"


class ProductionAgent:
    """Production-ready agent with monitoring and reliability"""

    def __init__(self, config: AgentConfig):
        self.config = config
        self.logger = self._setup_logging()
        self.rate_limiter = RateLimiter(config.max_requests_per_minute)
        self.health_monitor = HealthMonitor()
        self.metrics = MetricsCollector()
        # Initialize core components
        self.memory = AgentMemory()
        self.tools = ToolRegistry()
        self.logger.info(f"🚀 {config.name} initialized")

    def _setup_logging(self) -> logging.Logger:
        """Create a logger at the configured level (minimal placeholder)"""
        logger = logging.getLogger(self.config.name)
        logger.setLevel(self.config.log_level)
        return logger

    async def process_request(self, request: str, user_id: str) -> Dict[str, Any]:
        """Process request with full production safeguards"""
        correlation_id = str(uuid.uuid4())
        start_time = time.time()
        self.logger.info("Processing request", extra={
            'correlation_id': correlation_id,
            'user_id': user_id,
            'request_length': len(request)
        })
        try:
            # Health check
            if not self.health_monitor.is_healthy():
                return self._error_response("Service unavailable", 503, correlation_id)
            # Rate limiting
            if not await self.rate_limiter.allow_request(user_id):
                self.metrics.record_rate_limit(user_id)
                return self._error_response("Rate limit exceeded", 429, correlation_id)
            # Process with timeout and retries
            response = await self._process_with_retries(request, user_id, correlation_id)
            # Record metrics
            duration = time.time() - start_time
            self.metrics.record_request(user_id, duration, "success")
            self.logger.info("Request completed", extra={
                'correlation_id': correlation_id,
                'duration': duration
            })
            return response
        except Exception as e:
            duration = time.time() - start_time
            self.metrics.record_request(user_id, duration, "error")
            self.logger.error(f"Request failed: {str(e)}", extra={
                'correlation_id': correlation_id,
                'duration': duration
            }, exc_info=True)
            return self._error_response("Internal error", 500, correlation_id)

    def _error_response(self, message: str, code: int, correlation_id: str) -> Dict[str, Any]:
        """Create standardized error response"""
        return {
            'success': False,
            'error': message,
            'code': code,
            'correlation_id': correlation_id,
            'timestamp': datetime.now().isoformat()
        }
```
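
`process_request` delegates to `_process_with_retries`, which is not shown in this section. A minimal sketch of the retry-with-timeout idea, assuming exponential backoff with jitter and retrying only on timeouts and connection errors, could look like this. The function name `call_with_retries` and its parameters are illustrative; they mirror `retry_attempts` and `timeout_seconds` from `AgentConfig` but are not the framework's actual method.

```python
import asyncio
import random


async def call_with_retries(operation, *, attempts: int = 3,
                            timeout_seconds: float = 30.0,
                            base_delay: float = 0.5):
    """Await operation() with a per-attempt timeout, retrying transient failures."""
    last_error = None
    for attempt in range(attempts):
        try:
            return await asyncio.wait_for(operation(), timeout=timeout_seconds)
        except (asyncio.TimeoutError, ConnectionError) as exc:
            last_error = exc
            if attempt < attempts - 1:
                # Exponential backoff (base, 2x base, 4x base, ...) plus jitter
                delay = base_delay * (2 ** attempt)
                await asyncio.sleep(delay + random.uniform(0, delay / 2))
    raise last_error


# Demo: an operation that fails twice, then succeeds
calls = {"count": 0}


async def flaky_operation():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return {"success": True, "response": "done"}


result = asyncio.run(call_with_retries(flaky_operation, base_delay=0.01))
print(result, calls["count"])  # {'success': True, 'response': 'done'} 3
```

Errors other than timeouts and connection failures propagate immediately, which fits the surrounding `try`/`except` in `process_request`: it turns them into a standardized 500 response.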
### Monitoring and Metrics
```python
class MetricsCollector:
    """Comprehensive metrics collection"""

    def __init__(self):
        self.metrics = {
            'requests_total': 0,
            'requests_success': 0,
            'requests_error': 0,
            'requests_timeout': 0,
            'rate_limits_hit': 0,
            'average_duration': 0.0,
            'active_users': set(),
            'tool_usage': {},
            'memory_operations': 0
        }
        self.durations = []
        self.start_time = time.time()

    def record_request(self, user_id: str, duration: float, status: str):
        """Record request metrics with user tracking"""
        self.metrics['requests_total'] += 1
        self.metrics['active_users'].add(user_id)
        if status == 'success':
            self.metrics['requests_success'] += 1
        elif status == 'error':
            self.metrics['requests_error'] += 1
        elif status == 'timeout':
            self.metrics['requests_timeout'] += 1
        # Track response times
        self.durations.append(duration)
        if len(self.durations) > 1000:  # Keep last 1000
            self.durations.pop(0)
        self.metrics['average_duration'] = sum(self.durations) / len(self.durations)

    def record_rate_limit(self, user_id: str):
        """Count a rejected request (called by ProductionAgent.process_request)"""
        self.metrics['rate_limits_hit'] += 1
        self.metrics['active_users'].add(user_id)

    def record_tool_usage(self, tool_name: str):
        """Track tool usage statistics"""
        if tool_name not in self.metrics['tool_usage']:
            self.metrics['tool_usage'][tool_name] = 0
        self.metrics['tool_usage'][tool_name] += 1

    def get_health_metrics(self) -> Dict[str, Any]:
        """Get metrics for health monitoring"""
        uptime = time.time() - self.start_time
        return {
            'uptime_seconds': uptime,
            'requests_per_minute': self.metrics['requests_total'] / (uptime / 60) if uptime > 0 else 0,
            'error_rate': self.metrics['requests_error'] / max(self.metrics['requests_total'], 1),
            'average_response_time': self.metrics['average_duration'],
            'active_users_count': len(self.metrics['active_users']),
            'most_used_tools': sorted(
                self.metrics['tool_usage'].items(),
                key=lambda x: x[1],
                reverse=True
            )[:5]
        }


class HealthMonitor:
    """System health monitoring"""

    def __init__(self):
        self.last_check = time.time()
        self.is_healthy_status = True
        self.health_history = []

    def is_healthy(self) -> bool:
        """Comprehensive health check"""
        now = time.time()
        # Run checks every 30 seconds
        if now - self.last_check > 30:
            self._run_health_checks()
            self.last_check = now
        return self.is_healthy_status

    def _run_health_checks(self):
        """Execute all health checks"""
        checks = {
            'database': self._check_database(),
            'memory': self._check_memory_usage(),
            'disk': self._check_disk_space(),
            'external_apis': self._check_external_apis()
        }
        # Store health history
        health_record = {
            'timestamp': datetime.now(),
            'checks': checks,
            'overall_healthy': all(checks.values())
        }
        self.health_history.append(health_record)
        if len(self.health_history) > 100:  # Keep last 100 checks
            self.health_history.pop(0)
        self.is_healthy_status = health_record['overall_healthy']
        if not self.is_healthy_status:
            logging.warning(f"Health check failed: {checks}")

    def _check_database(self) -> bool:
        """Check database connectivity"""
        try:
            # Test database connection
            conn = sqlite3.connect("agent_memory.db", timeout=5)
            cursor = conn.cursor()
            cursor.execute("SELECT 1")
            conn.close()
            return True
        except Exception as e:
            logging.error(f"Database health check failed: {e}")
            return False

    def _check_memory_usage(self) -> bool:
        """Check system memory usage"""
        try:
            import psutil
            memory_percent = psutil.virtual_memory().percent
            return memory_percent < 90  # Fail if over 90% memory usage
        except ImportError:
            return True  # Skip if psutil not available
        except Exception:
            return False

    def _check_disk_space(self) -> bool:
        """Check available disk space"""
        try:
            import psutil
            disk_percent = psutil.disk_usage('/').percent
            return disk_percent < 90  # Fail if over 90% disk usage
        except ImportError:
            return True  # Skip if psutil not available
        except Exception:
            return False

    def _check_external_apis(self) -> bool:
        """Check external API availability"""
        # Implement checks for your external dependencies
        # e.g., OpenAI API, database services, etc.
        return True


class RateLimiter:
    """Production rate limiter with per-user and global token buckets"""

    def __init__(self, max_requests_per_minute: int):
        self.max_requests = max_requests_per_minute
        self.user_buckets = {}
        self.global_bucket = {'tokens': max_requests_per_minute * 10, 'last_update': time.time()}

    async def allow_request(self, user_id: str) -> bool:
        """Check rate limits (per-user and global)"""
        now = time.time()
        # Check global rate limit first
        if not self._check_global_limit(now):
            return False
        # Check per-user rate limit
        return self._check_user_limit(user_id, now)

    def _check_global_limit(self, now: float) -> bool:
        """Check global system rate limit"""
        bucket = self.global_bucket
        # Refill global bucket
        time_passed = now - bucket['last_update']
        refill_rate = (self.max_requests * 10) / 60  # Per second
        bucket['tokens'] = min(
            self.max_requests * 10,
            bucket['tokens'] + (time_passed * refill_rate)
        )
        bucket['last_update'] = now
        if bucket['tokens'] >= 1:
            bucket['tokens'] -= 1
            return True
        return False

    def _check_user_limit(self, user_id: str, now: float) -> bool:
        """Check per-user rate limit"""
        if user_id not in self.user_buckets:
            # New users start with a full bucket; the request being checked
            # is then charged against it by the code below.
            self.user_buckets[user_id] = {
                'tokens': self.max_requests,
                'last_update': now
            }
        bucket = self.user_buckets[user_id]
        # Refill user bucket
        time_passed = now - bucket['last_update']
        refill_rate = self.max_requests / 60  # Per second
        bucket['tokens'] = min(
            self.max_requests,
            bucket['tokens'] + (time_passed * refill_rate)
        )
        bucket['last_update'] = now
        if bucket['tokens'] >= 1:
            bucket['tokens'] -= 1
            return True
        return False
```
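
The `RateLimiter` above uses token buckets: each bucket refills at a steady rate up to a fixed capacity, and every request spends one token. The arithmetic is easier to follow with an explicit clock. The sketch below uses a hypothetical `TokenBucket` class with a capacity of 3 and a refill rate of one token per second (the per-second rate that 60 requests per minute works out to); the timestamps are passed in by hand rather than read from `time.time()`.

```python
class TokenBucket:
    """Token bucket with an explicit clock, for illustration only."""

    def __init__(self, capacity: float, refill_per_second: float, now: float = 0.0):
        self.capacity = capacity
        self.refill_per_second = refill_per_second
        self.tokens = capacity  # start full
        self.last_update = now

    def allow(self, now: float) -> bool:
        # Refill in proportion to the time elapsed, capped at capacity
        elapsed = now - self.last_update
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_second)
        self.last_update = now
        # Spend one token if one is available
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


bucket = TokenBucket(capacity=3, refill_per_second=1.0)
burst = [bucket.allow(now=0.0) for _ in range(4)]
print(burst)                  # [True, True, True, False]
print(bucket.allow(now=0.5))  # False (only 0.5 tokens refilled)
print(bucket.allow(now=1.5))  # True  (0.5 + 1.0 = 1.5 tokens available)
```

The capacity bounds how large a burst can be, while the refill rate bounds the sustained request rate.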
### FastAPI Production Server
**Complete Request Lifecycle:**

- **Chat flow:** Client → `POST /chat` with JWT token → `verify_token` → `agent.process_request()` → `ChatResponse`
- **Health check flow:** Client → `GET /health` → `agent.health_monitor.is_healthy()` → `agent.metrics.get_health_metrics()` → status and metrics
- **Metrics flow:** Client → `GET /metrics` → `agent.metrics.get_health_metrics()` → comprehensive system metrics

```python
# production_server.py
import logging
import os
from datetime import datetime
from typing import Optional

import jwt
import uvicorn
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel

# AgentConfig and ProductionAgent are the classes defined earlier in this
# section; import them from the module where you saved them.


# Request/Response models
class ChatRequest(BaseModel):
    message: str
    user_id: str
    session_id: Optional[str] = None


class ChatResponse(BaseModel):
    success: bool
    response: Optional[str] = None
    error: Optional[str] = None
    correlation_id: str
    timestamp: str


# Initialize FastAPI app
app = FastAPI(
    title="AI Agent API",
    description="Production AI Agent Service",
    version="1.0.0"
)
# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure for your domains
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Security
security = HTTPBearer()
# Initialize agent
config = AgentConfig(
    name="production_agent",
    max_requests_per_minute=int(os.getenv("MAX_REQUESTS_PER_MINUTE", "60")),
    timeout_seconds=int(os.getenv("TIMEOUT_SECONDS", "30"))
)
agent = ProductionAgent(config)


async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Verify JWT token"""
    try:
        # Implement your JWT verification logic
        # For demo, we'll skip verification
        return {"user_id": "authenticated_user"}
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")


def log_user_interaction(user_id: str, message: str, result: dict):
    """Background task: record the interaction (minimal placeholder)"""
    logging.info("user=%s success=%s", user_id, result.get('success'))


@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(
    request: ChatRequest,
    background_tasks: BackgroundTasks,
    user_info: dict = Depends(verify_token)
):
    """Main chat endpoint with background processing"""
    try:
        # Process request
        result = await agent.process_request(
            request=request.message,
            user_id=request.user_id
        )
        # Schedule background tasks
        background_tasks.add_task(
            log_user_interaction,
            request.user_id,
            request.message,
            result
        )
        return ChatResponse(
            success=result['success'],
            response=result.get('response'),
            error=result.get('error'),
            correlation_id=result['correlation_id'],
            timestamp=result['timestamp']
        )
    except Exception as e:
        logging.error(f"Chat endpoint error: {str(e)}")
        raise HTTPException(status_code=500, detail="Internal server error")


@app.get("/health")
async def health_check():
    """Health check endpoint for load balancers"""
    health_metrics = agent.metrics.get_health_metrics()
    is_healthy = agent.health_monitor.is_healthy()
    if is_healthy:
        return {
            "status": "healthy",
            "timestamp": datetime.now().isoformat(),
            "metrics": health_metrics
        }
    else:
        raise HTTPException(
            status_code=503,
            detail="Service unhealthy"
        )


@app.get("/metrics")
async def metrics_endpoint():
"""Metrics endpoint for monitoring systems"""
return agent.metrics.get_health_metrics()
@app.get("/admin/users/{user_id}/context")
async def get_user_context(user_id: str, user_info: dict = Depends(verify_token)):
"""Admin endpoint to view user context"""
# Check admin permissions
if not user_info.get("is_admin"):
raise HTTPException(status_code=403, detail="Admin access required")
context = agent.memory.get_relevant_context(user_id, "")
return context
async def log_user_interaction(user_id: str, message: str, result: dict):
"""Background task to log interactions"""
# Implement your logging logic
pass
if __name__ == "__main__":
uvicorn.run(
"production_server:app",
host="0.0.0.0",
port=int(os.getenv("PORT", "8000")),
workers=int(os.getenv("WORKERS", "1")),
log_level=os.getenv("LOG_LEVEL", "info").lower()
)
```
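The `verify_token` stub above accepts every token, so it helps to see what real verification involves. In the server you would most likely call PyJWT's `jwt.decode(token, key, algorithms=["HS256"])`, which raises subclasses of `jwt.InvalidTokenError` on failure. The standard-library sketch below only illustrates the checks that happen for an HS256 token (signature, algorithm, expiry). `sign_hs256` and `verify_hs256` are hypothetical helpers for illustration, not part of this guide's agent code, and hand-rolled JWT handling is not recommended for production.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url_decode(segment: str) -> bytes:
    # JWT segments are base64url without padding; restore it before decoding.
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def _b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def sign_hs256(claims: dict, secret: str) -> str:
    """Create an HS256 token (used here only to produce sample input)."""
    header = _b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = _b64url_encode(json.dumps(claims).encode())
    signature = hmac.new(secret.encode(), f"{header}.{body}".encode(), hashlib.sha256).digest()
    return f"{header}.{body}.{_b64url_encode(signature)}"


def verify_hs256(token: str, secret: str, now: Optional[float] = None) -> dict:
    """Return the claims if signature and expiry are valid, else raise ValueError."""
    try:
        header_b64, body_b64, signature_b64 = token.split(".")
        header = json.loads(_b64url_decode(header_b64))
        claims = json.loads(_b64url_decode(body_b64))
        signature = _b64url_decode(signature_b64)
    except (ValueError, TypeError) as exc:
        raise ValueError("Malformed token") from exc
    if not isinstance(header, dict) or not isinstance(claims, dict):
        raise ValueError("Malformed token")
    # Pin the algorithm so a token cannot choose its own verification method.
    if header.get("alg") != "HS256":
        raise ValueError("Unexpected algorithm")
    expected = hmac.new(secret.encode(), f"{header_b64}.{body_b64}".encode(), hashlib.sha256).digest()
    # Constant-time comparison avoids leaking signature bytes through timing.
    if not hmac.compare_digest(signature, expected):
        raise ValueError("Invalid signature")
    current = time.time() if now is None else now
    if "exp" in claims and claims["exp"] < current:
        raise ValueError("Token expired")
    return claims
```

Inside `verify_token`, a failed check would translate into `HTTPException(status_code=401, ...)`, and the returned claims (for example `user_id` or `is_admin`) would replace the hard-coded dictionary.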
### Docker Configuration
```dockerfile
# Dockerfile
FROM python:3.11-slim
# Set working directory
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    curl \
    && rm -rf /var/lib/apt/lists/*
# Copy requirements first for better caching
COPY requirements.txt .
# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create non-root user for security
RUN useradd --create-home --shell /bin/bash --user-group agent
RUN chown -R agent:agent /app
USER agent
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1
# Expose port
EXPOSE 8000
# Command to run the application
CMD ["python", "production_server.py"]
```
### Docker Compose for Local Development
```yaml
# docker-compose.yml
version: '3.8'

services:
  ai-agent:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - DATABASE_URL=sqlite:///data/agent.db
      - LOG_LEVEL=INFO
      - MAX_REQUESTS_PER_MINUTE=60
    volumes:
      - ./data:/app/data
    depends_on:
      - redis
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes
    volumes:
      - redis_data:/data
    restart: unless-stopped

  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
    restart: unless-stopped

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin  # Local development only
    volumes:
      - grafana_data:/var/lib/grafana
    restart: unless-stopped

volumes:
  redis_data:
  grafana_data:
```
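The environment variables set in the Compose file have to be read somewhere in the application. One way to do that, sketched below, is a small settings loader that fails fast when the API key is missing. `Settings` and `load_settings` are hypothetical names for illustration; the variable names and default values are taken from the Compose file above.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    openai_api_key: str
    database_url: str
    log_level: str
    max_requests_per_minute: int


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Read configuration from environment variables, failing fast on a missing key."""
    api_key = env.get("OPENAI_API_KEY", "")
    if not api_key:
        # Refuse to start without credentials instead of failing on the first request.
        raise RuntimeError("OPENAI_API_KEY is not set")
    return Settings(
        openai_api_key=api_key,
        database_url=env.get("DATABASE_URL", "sqlite:///data/agent.db"),
        log_level=env.get("LOG_LEVEL", "INFO").upper(),
        max_requests_per_minute=int(env.get("MAX_REQUESTS_PER_MINUTE", "60")),
    )
```

Passing the environment as a parameter keeps the loader easy to test: a plain dictionary can stand in for `os.environ`.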
### Kubernetes Deployment
```yaml
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ai-agent
  labels:
    app: ai-agent
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ai-agent
  template:
    metadata:
      labels:
        app: ai-agent
    spec:
      containers:
        - name: ai-agent
          image: your-registry/ai-agent:latest
          ports:
            - containerPort: 8000
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: agent-secrets
                  key: openai-api-key
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: agent-secrets
                  key: database-url
            - name: LOG_LEVEL
              value: "INFO"
            - name: MAX_REQUESTS_PER_MINUTE
              value: "60"
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "1Gi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
            timeoutSeconds: 5
            failureThreshold: 3
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
            timeoutSeconds: 3
            failureThreshold: 3
---
apiVersion: v1
kind: Service
metadata:
  name: ai-agent-service
spec:
  selector:
    app: ai-agent
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: ai-agent-ingress
  annotations:
    kubernetes.io/ingress.class: nginx
    cert-manager.io/cluster-issuer: letsencrypt-prod
spec:
  tls:
    - hosts:
        - your-domain.com
      secretName: ai-agent-tls
  rules:
    - host: your-domain.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: ai-agent-service
                port:
                  number: 80
```
---
## 7. Next Steps & Best Practices
### Implementation Roadmap
**Phase 1: Foundation (Week 1-2)**
- ✅ Set up basic agent with OpenAI integration
- ✅ Implement core agent loop (PERCEIVE → THINK → ACT → LEARN)
- ✅ Add 2-3 basic tools (calculator, time, memory)
- ✅ Basic error handling and logging
**Phase 2: Enhancement (Week 3-4)**
- ✅ Add persistent memory with SQLite
- ✅ Implement user authentication
- ✅ Add rate limiting and basic security
- ✅ Create simple web interface
**Phase 3: Production (Week 5-6)**
- ✅ Deploy with Docker
- ✅ Add comprehensive monitoring
- ✅ Implement health checks
- ✅ Set up CI/CD pipeline
**Phase 4: Scale (Week 7-8)**
- ✅ Add more sophisticated tools
- ✅ Implement caching strategies
- ✅ Scale with Kubernetes
- ✅ Optimize costs and performance
### Key Success Metrics
**Technical Metrics:**
- Response time < 2 seconds (95th percentile)
- Uptime > 99.9%
- Error rate < 1%
- Cost per request tracked against a defined budget
**User Metrics:**
- User satisfaction scores
- Task completion rates
- Feature adoption
- Retention rates
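The technical thresholds above (95th-percentile response time under 2 seconds, error rate under 1%) are easy to check from raw request data. The sketch below is one way to do it, using the nearest-rank definition of a percentile. `percentile` and `meets_targets` are hypothetical helpers, and the sample numbers in the usage note are made up.

```python
import math
from typing import Dict, List


def percentile(values: List[float], pct: float) -> float:
    """Nearest-rank percentile: the smallest sample with at least pct% of samples at or below it."""
    if not values:
        raise ValueError("No samples")
    ordered = sorted(values)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def meets_targets(latencies_s: List[float], error_count: int, total_requests: int) -> Dict[str, object]:
    """Compare observed p95 latency and error rate with the targets listed above."""
    p95 = percentile(latencies_s, 95)
    error_rate = error_count / total_requests
    return {
        "p95_seconds": p95,
        "error_rate": error_rate,
        "latency_ok": p95 < 2.0,     # Response time < 2 seconds (95th percentile)
        "errors_ok": error_rate < 0.01,  # Error rate < 1%
    }
```

For example, `meets_targets([0.5] * 19 + [3.0], error_count=1, total_requests=200)` reports both targets as met: a single slow request out of twenty does not move the 95th percentile, and one error in 200 requests is 0.5%.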
### Common Pitfalls to Avoid
**❌ Don't Do:**
- Start with complex multi-agent systems
- Ignore rate limiting and costs
- Skip monitoring and logging
- Store secrets in code
- Deploy without health checks
**✅ Do Instead:**
- Start simple, add complexity gradually
- Monitor costs from day one
- Implement observability early
- Use proper secret management
- Test thoroughly before production
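"Monitor costs from day one" can start as something very small. The sketch below accumulates an estimated cost per user from token counts and flags when a daily budget is exceeded. `CostTracker` is a hypothetical class, and the prices and budget are placeholder values, not any provider's real rates.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class CostTracker:
    # Placeholder prices in USD per 1,000 tokens; replace with your provider's real rates.
    input_price_per_1k: float = 0.01
    output_price_per_1k: float = 0.03
    daily_budget_usd: float = 5.0
    spend_by_user: Dict[str, float] = field(default_factory=lambda: defaultdict(float))

    def record(self, user_id: str, input_tokens: int, output_tokens: int) -> float:
        """Add one request's estimated cost to the user's running total and return it."""
        cost = (
            input_tokens * self.input_price_per_1k
            + output_tokens * self.output_price_per_1k
        ) / 1000
        self.spend_by_user[user_id] += cost
        return cost

    @property
    def total_spend(self) -> float:
        return sum(self.spend_by_user.values())

    def over_budget(self) -> bool:
        """True once the accumulated spend exceeds the daily budget."""
        return self.total_spend > self.daily_budget_usd
```

Calling `record` once per LLM response and exporting `total_spend` through the existing metrics endpoint is usually enough to spot a runaway loop before the invoice does.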
### Recommended Tools & Services
**Development:**
- **LLM APIs**: OpenAI, Anthropic, Google AI
- **Frameworks**: FastAPI, Flask, Django
- **Databases**: PostgreSQL, SQLite, Redis
- **Testing**: pytest, unittest, locust
**Production:**
- **Containers**: Docker, Kubernetes
- **Monitoring**: Prometheus, Grafana, DataDog
- **Logging**: ELK Stack, Fluentd
- **Security**: Vault, AWS Secrets Manager
**AI/ML:**
- **Vector Databases**: Pinecone, Weaviate, Chroma
- **Embedding Models**: OpenAI, Sentence Transformers
- **Experiment tracking (for fine-tuning runs)**: Weights & Biases, MLflow
### Learning Resources
**Books:**
- "Designing Data-Intensive Applications" by Martin Kleppmann
- "Site Reliability Engineering" by Google
- "Building Microservices" by Sam Newman
**Documentation:**
- OpenAI API Documentation
- FastAPI Documentation
- Kubernetes Documentation
**Communities:**
- AI/ML Discord servers
- Reddit: r/MachineLearning, r/ArtificialIntelligence
- LangChain Community
---
## Conclusion
You now have everything needed to build production-ready AI agents:
- **🏗️ Architecture**: Understanding of agent loops and system design
- **💻 Implementation**: Complete, working code examples
- **🔧 Tools**: Professional tool system with security
- **🧠 Memory**: Persistent learning and context awareness
- **🚀 Production**: Deployment-ready infrastructure
- **📊 Monitoring**: Comprehensive observability
**Start building today!** Begin with the basic agent, add tools one by one, implement memory, then scale to production. Focus on solving real user problems rather than technical complexity.
Remember: The best agent is the one that provides real value to users. Start simple, iterate quickly, and scale based on actual usage patterns.
**Happy building! 🚀**
---
*This guide provides a complete foundation for AI agent development. Bookmark it, share it with your team, and reference it as you build amazing AI-powered applications.*