Content is user-generated and unverified.
# ============================================================================
# FASTAPI TRAVEL BACKEND WITH RAG & API INTEGRATION
# ============================================================================

import asyncio
import json
import logging
import os
import time
import uuid
from collections import Counter
from contextlib import asynccontextmanager
from datetime import date, datetime
from typing import Any, AsyncGenerator, Dict, List, Optional

import redis.asyncio as redis
import uvicorn
from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException, Query, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
from slowapi.util import get_remote_address

# Import our custom modules (from previous code).
# NOTE: the names below are used throughout this module -- including in
# endpoint parameter annotations, which Python evaluates when the module is
# imported -- so these imports must be enabled before the app can start.
# from rag_pipeline import TravelRAGPipeline, TravelQuery, TravelDocument
# from api_abstraction import TravelAPIService, AmadeusFlightProvider, BookingComHotelProvider

logger = logging.getLogger(__name__)

API_VERSION = "1.0.0"

# Each analytics write refreshes the list's 24h TTL, so under steady traffic the
# lists would never expire; trimming them keeps Redis memory bounded. The admin
# summary only reads the newest 100 entries, so the cap must stay >= 100.
ANALYTICS_TTL_SECONDS = 86400  # 24 hours
ANALYTICS_MAX_ENTRIES = 1000


# ============================================================================
# PYDANTIC MODELS FOR API
# ============================================================================


class TravelQueryRequest(BaseModel):
    """Request model for travel queries"""

    text: str = Field(..., description="The travel query text", min_length=1, max_length=500)
    location: Optional[str] = Field(None, description="Location for the query")
    budget: Optional[float] = Field(None, description="Budget in USD", gt=0)
    duration: Optional[int] = Field(None, description="Duration in days", gt=0, le=365)
    # default_factory builds a fresh list per request instead of reusing a literal.
    preferences: Optional[List[str]] = Field(
        default_factory=list, description="User preferences"
    )

    class Config:
        schema_extra = {
            "example": {
                "text": "Best family-friendly activities in Tokyo with kids under 10",
                "location": "Tokyo",
                "budget": 1000.0,
                "duration": 5,
                "preferences": ["family-friendly", "educational", "cultural"],
            }
        }


class TravelQueryResponse(BaseModel):
    """Response model for travel queries"""

    query_id: str
    answer: str
    confidence: float
    sources: List[Dict[str, Any]]
    processing_time_ms: int
    cached: bool = False


class FlightSearchRequest(BaseModel):
    """Request model for flight searches.

    The ordering of ``return_date`` relative to ``departure_date`` is checked
    by the ``/api/v1/flights/search`` endpoint (HTTP 422 when violated).
    """

    origin: str = Field(..., description="Origin airport code", min_length=3, max_length=3)
    destination: str = Field(
        ..., description="Destination airport code", min_length=3, max_length=3
    )
    departure_date: date = Field(..., description="Departure date")
    return_date: Optional[date] = Field(None, description="Return date for round trip")
    passengers: int = Field(default=1, description="Number of passengers", ge=1, le=9)

    class Config:
        schema_extra = {
            "example": {
                "origin": "NYC",
                "destination": "LAX",
                "departure_date": "2024-07-15",
                "return_date": "2024-07-22",
                "passengers": 2,
            }
        }


class HotelSearchRequest(BaseModel):
    """Request model for hotel searches.

    ``checkout_date`` must be after ``checkin_date``; the
    ``/api/v1/hotels/search`` endpoint rejects other inputs with HTTP 422.
    """

    location: str = Field(..., description="Hotel location", min_length=2)
    checkin_date: date = Field(..., description="Check-in date")
    checkout_date: date = Field(..., description="Check-out date")
    guests: int = Field(default=2, description="Number of guests", ge=1, le=10)
    rooms: int = Field(default=1, description="Number of rooms", ge=1, le=5)

    class Config:
        schema_extra = {
            "example": {
                "location": "Los Angeles",
                "checkin_date": "2024-07-15",
                "checkout_date": "2024-07-22",
                "guests": 2,
                "rooms": 1,
            }
        }


class DocumentUploadRequest(BaseModel):
    """Request model for uploading travel documents"""

    content: str = Field(..., description="Document content", min_length=10)
    location: str = Field(..., description="Location related to the document")
    category: str = Field(..., description="Document category")
    # default_factory builds a fresh dict per request instead of reusing a literal.
    metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata")

    class Config:
        schema_extra = {
            "example": {
                "content": "The Louvre Museum in Paris is the world's largest art museum...",
                "location": "Paris",
                "category": "attraction",
                "metadata": {
                    "duration": "half_day",
                    "cost": "moderate",
                    "age_group": "all_ages",
                },
            }
        }


class HealthResponse(BaseModel):
    """Health check response"""

    status: str
    timestamp: datetime
    version: str
    services: Dict[str, str]


# ============================================================================
# FASTAPI APPLICATION SETUP
# ============================================================================

# Rate limiting setup. slowapi's ``limiter.limit`` decorator requires every
# decorated endpoint to accept a Starlette ``request: Request`` argument.
limiter = Limiter(key_func=get_remote_address)

# Global services (initialized in lifespan)
rag_pipeline = None
api_service = None
redis_client = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the shared services on startup and release them on shutdown.

    Credentials and Redis connection settings come from environment variables
    (REDIS_HOST, REDIS_PORT, REDIS_DB, OPENAI_API_KEY, AMADEUS_API_KEY,
    AMADEUS_API_SECRET, BOOKING_API_KEY). The fallbacks are the placeholder
    values previously hard-coded here, so behavior is unchanged when they are
    unset.
    """
    global rag_pipeline, api_service, redis_client

    # Startup
    logger.info("Starting up Travel API...")

    redis_client = redis.Redis(
        host=os.getenv("REDIS_HOST", "localhost"),
        port=int(os.getenv("REDIS_PORT", "6379")),
        db=int(os.getenv("REDIS_DB", "0")),
    )
    try:
        rag_pipeline = TravelRAGPipeline(
            openai_api_key=os.getenv("OPENAI_API_KEY", "your-openai-api-key")
        )

        api_service = TravelAPIService()
        amadeus_provider = AmadeusFlightProvider(
            os.getenv("AMADEUS_API_KEY", "amadeus_key"),
            os.getenv("AMADEUS_API_SECRET", "amadeus_secret"),
        )
        booking_provider = BookingComHotelProvider(os.getenv("BOOKING_API_KEY", "booking_key"))
        api_service.add_flight_provider(amadeus_provider)
        api_service.add_hotel_provider(booking_provider)

        # Load initial documents (could be from database)
        await load_initial_documents()

        logger.info("Travel API startup complete")
        yield
    finally:
        # Shutdown: placed in ``finally`` so the Redis connection is released
        # even when startup fails part-way.
        logger.info("Shutting down Travel API...")
        # Newer redis-py releases deprecate close() in favour of aclose();
        # use whichever this client provides.
        await getattr(redis_client, "aclose", redis_client.close)()
        logger.info("Travel API shutdown complete")


# Create FastAPI app
app = FastAPI(
    title="TravelWise API",
    description="AI-powered travel planning API with RAG and multi-provider integration",
    version=API_VERSION,
    lifespan=lifespan,
)

# Add middleware
app.add_middleware(
    CORSMiddleware,
    # Configure for production: per the FastAPI CORS docs, credentialed
    # requests need an explicit origin list rather than "*".
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.add_middleware(GZipMiddleware, minimum_size=1000)
app.add_middleware(SlowAPIMiddleware)

# Rate limiting error handler
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


# ============================================================================
# DEPENDENCY INJECTION
# ============================================================================


async def get_rag_pipeline():
    """Dependency to get RAG pipeline"""
    if rag_pipeline is None:
        raise HTTPException(status_code=503, detail="RAG pipeline not initialized")
    return rag_pipeline


async def get_api_service():
    """Dependency to get API service"""
    if api_service is None:
        raise HTTPException(status_code=503, detail="API service not initialized")
    return api_service


async def get_redis():
    """Dependency to get Redis client"""
    if redis_client is None:
        raise HTTPException(status_code=503, detail="Redis not initialized")
    return redis_client


# ============================================================================
# HELPER FUNCTIONS
# ============================================================================


async def load_initial_documents():
    """Seed the RAG knowledge base with a few built-in travel documents."""
    initial_docs = [
        {
            "id": "paris_louvre",
            "content": "The Louvre Museum in Paris houses the Mona Lisa and Venus de Milo. Best visited early morning to avoid crowds. Allow 3-4 hours for a proper visit.",
            "location": "Paris",
            "category": "attraction",
            "metadata": {"duration": "half_day", "cost": "moderate", "age_group": "all_ages"},
        },
        {
            "id": "tokyo_tsukiji",
            "content": "Tsukiji Outer Market in Tokyo offers the best sushi breakfast experience. Visit early morning (5-8 AM) for fresh fish and authentic atmosphere.",
            "location": "Tokyo",
            "category": "dining",
            "metadata": {"duration": "2_hours", "cost": "budget", "cuisine": "japanese"},
        },
        {
            "id": "nyc_central_park",
            "content": "Central Park in NYC is perfect for families. Features playgrounds, boat rentals, and the Central Park Zoo. Great for picnics and outdoor activities.",
            "location": "New York",
            "category": "attraction",
            "metadata": {"duration": "full_day", "cost": "free", "age_group": "family"},
        },
    ]

    for doc_data in initial_docs:
        doc = TravelDocument(**doc_data)
        await rag_pipeline.add_document(doc)

    logger.info("Loaded %d initial documents", len(initial_docs))


def convert_to_travel_query(request: TravelQueryRequest) -> TravelQuery:
    """Convert request model to internal query model"""
    return TravelQuery(
        text=request.text,
        location=request.location,
        budget=request.budget,
        duration=request.duration,
        preferences=request.preferences or [],
    )


def _elapsed_ms(start: float) -> int:
    """Return whole milliseconds elapsed since a ``time.perf_counter()`` reading."""
    # perf_counter is monotonic, so durations cannot go negative when the wall
    # clock is adjusted (unlike differences of datetime.now()).
    return int((time.perf_counter() - start) * 1000)


# ============================================================================
# API ENDPOINTS
# ============================================================================


@app.get("/", response_model=HealthResponse)
async def root():
    """Root endpoint with basic health check"""
    return HealthResponse(
        status="healthy",
        timestamp=datetime.now(),
        version=API_VERSION,
        services={
            "rag_pipeline": "active" if rag_pipeline else "inactive",
            "api_service": "active" if api_service else "inactive",
            "redis": "active" if redis_client else "inactive",
        },
    )


@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Detailed health check endpoint.

    Reports per-service state. Redis is actively pinged. Overall ``status`` is
    ``"healthy"`` only when every service is healthy, otherwise
    ``"degraded"``. The HTTP status code is 200 in both cases.
    """
    services = {
        "rag_pipeline": "healthy" if rag_pipeline is not None else "not_initialized",
        "api_service": "healthy" if api_service is not None else "not_initialized",
    }

    if redis_client is None:
        services["redis"] = "not_initialized"
    else:
        try:
            await redis_client.ping()
            services["redis"] = "healthy"
        except Exception as e:
            services["redis"] = f"error: {str(e)}"

    all_healthy = all(state == "healthy" for state in services.values())
    return HealthResponse(
        status="healthy" if all_healthy else "degraded",
        timestamp=datetime.now(),
        version=API_VERSION,
        services=services,
    )


# ============================================================================
# RAG ENDPOINTS
# ============================================================================


@app.post("/api/v1/travel/ask", response_model=TravelQueryResponse)
@limiter.limit("30/minute")
async def ask_travel_question(
    request: Request,
    payload: TravelQueryRequest,
    background_tasks: BackgroundTasks,
    rag_service: TravelRAGPipeline = Depends(get_rag_pipeline),
):
    """Ask a travel-related question using RAG.

    ``request`` is the raw Starlette request required by slowapi's rate
    limiter. The JSON body is parsed into ``payload``; its wire format is
    unchanged.

    Raises:
        HTTPException: 500 if the RAG pipeline fails.
    """
    start = time.perf_counter()

    try:
        travel_query = convert_to_travel_query(payload)
        response = await rag_service.process_query(travel_query)
        processing_time_ms = _elapsed_ms(start)

        # Log query for analytics (runs after the response is sent)
        background_tasks.add_task(
            log_query_analytics,
            query_id=response.query_id,
            query_text=payload.text,
            location=payload.location,
            confidence=response.confidence,
            processing_time_ms=processing_time_ms,
        )

        return TravelQueryResponse(
            query_id=response.query_id,
            answer=response.answer,
            confidence=response.confidence,
            sources=response.sources,
            processing_time_ms=processing_time_ms,
            cached=False,
        )
    except Exception as e:
        logger.exception("Error processing travel query: %s", e)
        raise HTTPException(status_code=500, detail=f"Error processing query: {str(e)}") from e


@app.post("/api/v1/travel/ask/stream")
@limiter.limit("10/minute")
async def ask_travel_question_stream(
    request: Request,
    payload: TravelQueryRequest,
    rag_service: TravelRAGPipeline = Depends(get_rag_pipeline),
):
    """Stream a travel answer as Server-Sent Events.

    The stream is a series of ``data:`` events:
    - ``{"text": <up to 5 words>, "done": false}`` chunks;
    - a final ``{"sources": ..., "confidence": ..., "done": true}`` event.

    Failures are reported in-band as ``{"error": ..., "done": true}``,
    because the streaming response has already started when they occur.
    """

    async def generate_response() -> AsyncGenerator[str, None]:
        try:
            travel_query = convert_to_travel_query(payload)

            # The RAG pipeline does not stream yet, so streaming is simulated
            # by chunking the complete answer.
            response = await rag_service.process_query(travel_query)

            words = response.answer.split()
            chunk_size = 5
            for i in range(0, len(words), chunk_size):
                chunk = " ".join(words[i:i + chunk_size])
                yield f"data: {json.dumps({'text': chunk, 'done': False})}\n\n"
                await asyncio.sleep(0.1)  # Simulate processing delay

            # Send final metadata
            yield f"data: {json.dumps({'sources': response.sources, 'confidence': response.confidence, 'done': True})}\n\n"
        except Exception as e:
            logger.exception("Error streaming travel query: %s", e)
            yield f"data: {json.dumps({'error': str(e), 'done': True})}\n\n"

    return StreamingResponse(
        generate_response(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )


@app.post("/api/v1/travel/documents")
@limiter.limit("100/hour")
async def upload_travel_document(
    request: Request,
    payload: DocumentUploadRequest,
    background_tasks: BackgroundTasks,
    rag_service: TravelRAGPipeline = Depends(get_rag_pipeline),
):
    """Queue a new travel document for indexing into the knowledge base.

    Indexing runs as a background task after the response is sent, so an
    indexing failure is not reported to this caller.

    Raises:
        HTTPException: 500 if the document cannot be constructed.
    """
    try:
        # A random id avoids the collisions a timestamp-based id allows when
        # two uploads land within the clock's resolution.
        doc_id = f"doc_{uuid.uuid4().hex}"
        document = TravelDocument(
            id=doc_id,
            content=payload.content,
            location=payload.location,
            category=payload.category,
            metadata=payload.metadata,
        )

        background_tasks.add_task(rag_service.add_document, document)

        return {"message": "Document uploaded successfully", "document_id": doc_id}
    except Exception as e:
        logger.exception("Error uploading document: %s", e)
        raise HTTPException(status_code=500, detail=f"Error uploading document: {str(e)}") from e


# ============================================================================
# FLIGHT SEARCH ENDPOINTS
# ============================================================================


@app.post("/api/v1/flights/search")
@limiter.limit("60/hour")
async def search_flights(
    request: Request,
    payload: FlightSearchRequest,
    background_tasks: BackgroundTasks,
    api_service: TravelAPIService = Depends(get_api_service),
):
    """Search for flights across multiple providers.

    Raises:
        HTTPException: 422 if ``return_date`` precedes ``departure_date``;
            500 if the provider search fails.
    """
    # Validate before the try block so this 422 is not rewritten into a 500.
    if payload.return_date is not None and payload.return_date < payload.departure_date:
        raise HTTPException(
            status_code=422, detail="return_date must not be earlier than departure_date"
        )

    start = time.perf_counter()

    try:
        departure_str = payload.departure_date.isoformat()
        return_str = payload.return_date.isoformat() if payload.return_date else None

        results = await api_service.search_flights_aggregated(
            origin=payload.origin,
            destination=payload.destination,
            departure_date=departure_str,
            return_date=return_str,
            passengers=payload.passengers,
        )

        results["processing_time_ms"] = _elapsed_ms(start)

        # Log search for analytics (runs after the response is sent)
        background_tasks.add_task(
            log_flight_search,
            origin=payload.origin,
            destination=payload.destination,
            departure_date=departure_str,
            passengers=payload.passengers,
            results_count=results.get("total_results", 0),
        )

        return results
    except Exception as e:
        logger.exception("Error searching flights: %s", e)
        raise HTTPException(status_code=500, detail=f"Error searching flights: {str(e)}") from e


# Fallback list served when Redis has no "popular_routes" entry yet.
DEFAULT_POPULAR_ROUTES = [
    {"route": "NYC-LAX", "searches": 1250},
    {"route": "NYC-MIA", "searches": 890},
    {"route": "LAX-LAS", "searches": 675},
    {"route": "NYC-BOS", "searches": 540},
    {"route": "CHI-LAX", "searches": 435},
]


@app.get("/api/v1/flights/popular-routes")
@limiter.limit("100/hour")
async def get_popular_routes(
    request: Request,
    limit: int = Query(default=10, ge=1, le=50),
    redis_client: redis.Redis = Depends(get_redis),
):
    """Return up to ``limit`` popular routes.

    Routes are read from the JSON stored under the Redis key
    ``popular_routes``, with a static default list used when that key is
    absent.
    """
    try:
        # Populated by background analytics (not shown in this module)
        routes_data = await redis_client.get("popular_routes")
        routes = json.loads(routes_data) if routes_data else DEFAULT_POPULAR_ROUTES
        return {"routes": routes[:limit]}
    except Exception as e:
        logger.exception("Error getting popular routes: %s", e)
        raise HTTPException(status_code=500, detail="Error retrieving popular routes") from e


# ============================================================================
# HOTEL SEARCH ENDPOINTS
# ============================================================================


@app.post("/api/v1/hotels/search")
@limiter.limit("60/hour")
async def search_hotels(
    request: Request,
    payload: HotelSearchRequest,
    background_tasks: BackgroundTasks,
    api_service: TravelAPIService = Depends(get_api_service),
):
    """Search for hotels across multiple providers.

    Raises:
        HTTPException: 422 if ``checkout_date`` is not after ``checkin_date``;
            500 if the provider search fails.
    """
    # Validate before the try block so this 422 is not rewritten into a 500.
    if payload.checkout_date <= payload.checkin_date:
        raise HTTPException(status_code=422, detail="checkout_date must be after checkin_date")

    start = time.perf_counter()

    try:
        checkin_str = payload.checkin_date.isoformat()
        checkout_str = payload.checkout_date.isoformat()

        results = await api_service.search_hotels_aggregated(
            location=payload.location,
            checkin=checkin_str,
            checkout=checkout_str,
            guests=payload.guests,
            rooms=payload.rooms,
        )

        results["processing_time_ms"] = _elapsed_ms(start)

        # Log search for analytics (runs after the response is sent)
        background_tasks.add_task(
            log_hotel_search,
            location=payload.location,
            checkin_date=checkin_str,
            checkout_date=checkout_str,
            guests=payload.guests,
            results_count=results.get("total_results", 0),
        )

        return results
    except Exception as e:
        logger.exception("Error searching hotels: %s", e)
        raise HTTPException(status_code=500, detail=f"Error searching hotels: {str(e)}") from e


# ============================================================================
# ANALYTICS AND BACKGROUND TASKS
# ============================================================================


async def _push_analytics(key: str, analytics_data: Dict[str, Any]) -> None:
    """Prepend a JSON record to the Redis list ``key``, trimmed and expiring.

    This is a no-op when Redis has not been initialized.
    """
    if redis_client is None:
        return
    await redis_client.lpush(key, json.dumps(analytics_data))
    await redis_client.ltrim(key, 0, ANALYTICS_MAX_ENTRIES - 1)
    await redis_client.expire(key, ANALYTICS_TTL_SECONDS)


async def log_query_analytics(
    query_id: str,
    query_text: str,
    location: Optional[str],
    confidence: float,
    processing_time_ms: int,
):
    """Record a RAG query (best-effort: failures are logged, never raised)."""
    try:
        analytics_data = {
            "query_id": query_id,
            "query_text": query_text,
            "location": location,
            "confidence": confidence,
            "processing_time_ms": processing_time_ms,
            "timestamp": datetime.now().isoformat(),
        }

        # In production, you'd send this to your analytics service
        logger.info("Query analytics: %s", analytics_data)

        # Store in Redis for short-term analytics
        await _push_analytics("query_analytics", analytics_data)
    except Exception as e:
        logger.error("Error logging query analytics: %s", e)


async def log_flight_search(
    origin: str, destination: str, departure_date: str, passengers: int, results_count: int
):
    """Record a flight search (best-effort: failures are logged, never raised)."""
    try:
        analytics_data = {
            "type": "flight_search",
            "origin": origin,
            "destination": destination,
            "departure_date": departure_date,
            "passengers": passengers,
            "results_count": results_count,
            "timestamp": datetime.now().isoformat(),
        }

        logger.info("Flight search analytics: %s", analytics_data)
        await _push_analytics("flight_analytics", analytics_data)
    except Exception as e:
        logger.error("Error logging flight analytics: %s", e)


async def log_hotel_search(
    location: str, checkin_date: str, checkout_date: str, guests: int, results_count: int
):
    """Record a hotel search (best-effort: failures are logged, never raised)."""
    try:
        analytics_data = {
            "type": "hotel_search",
            "location": location,
            "checkin_date": checkin_date,
            "checkout_date": checkout_date,
            "guests": guests,
            "results_count": results_count,
            "timestamp": datetime.now().isoformat(),
        }

        logger.info("Hotel search analytics: %s", analytics_data)
        await _push_analytics("hotel_analytics", analytics_data)
    except Exception as e:
        logger.error("Error logging hotel analytics: %s", e)


# ============================================================================
# ADMIN ENDPOINTS
# ============================================================================


@app.get("/api/v1/admin/analytics")
@limiter.limit("10/minute")
async def get_analytics_summary(
    request: Request,
    redis_client: redis.Redis = Depends(get_redis),
):
    """Get analytics summary (admin only - add authentication in production).

    Only the newest 100 entries of each analytics list are read, so the
    ``total_*`` counts are capped at 100.
    """
    try:
        query_analytics = await redis_client.lrange("query_analytics", 0, 99)
        flight_analytics = await redis_client.lrange("flight_analytics", 0, 99)
        hotel_analytics = await redis_client.lrange("hotel_analytics", 0, 99)

        summary = {
            "total_queries": len(query_analytics),
            "total_flight_searches": len(flight_analytics),
            "total_hotel_searches": len(hotel_analytics),
            "avg_query_confidence": 0.0,
            "avg_processing_time": 0.0,
            "top_locations": {},
            "last_updated": datetime.now().isoformat(),
        }

        if query_analytics:
            records = [json.loads(item) for item in query_analytics]
            confidences = [record.get("confidence", 0) for record in records]
            processing_times = [record.get("processing_time_ms", 0) for record in records]
            location_counts = Counter(
                record.get("location") for record in records if record.get("location")
            )

            summary["avg_query_confidence"] = sum(confidences) / len(confidences)
            summary["avg_processing_time"] = sum(processing_times) / len(processing_times)
            summary["top_locations"] = dict(location_counts.most_common(10))

        return summary
    except Exception as e:
        logger.exception("Error getting analytics: %s", e)
        raise HTTPException(status_code=500, detail="Error retrieving analytics") from e


# Redis key pattern and response message for each selectively clearable cache.
CACHE_CLEAR_TARGETS = {
    "flights": ("flights:*", "Flight cache cleared"),
    "hotels": ("hotels:*", "Hotel cache cleared"),
    "rag": ("context:*", "RAG cache cleared"),
}


@app.post("/api/v1/admin/cache/clear")
@limiter.limit("5/minute")
async def clear_cache(
    request: Request,
    cache_type: str = Query(default="all", regex="^(all|flights|hotels|rag)$"),
    redis_client: redis.Redis = Depends(get_redis),
):
    """Clear cache (admin only - add authentication in production).

    ``cache_type="all"`` runs FLUSHDB. That wipes the entire Redis database,
    including the analytics lists and ``popular_routes``, not just cache
    keys.
    """
    try:
        if cache_type == "all":
            await redis_client.flushdb()
            return {"message": "All cache cleared"}

        pattern, message = CACHE_CLEAR_TARGETS[cache_type]
        # SCAN iterates incrementally; KEYS would block Redis on large datasets.
        keys = [key async for key in redis_client.scan_iter(match=pattern)]
        if keys:
            await redis_client.delete(*keys)
        return {"message": message}
    except Exception as e:
        logger.exception("Error clearing cache: %s", e)
        raise HTTPException(status_code=500, detail="Error clearing cache") from e


# ============================================================================
# MAIN APPLICATION RUNNER
# ============================================================================

if __name__ == "__main__":
    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    # Run the application
    uvicorn.run(
        "main:app",  # Replace with your filename
        host="0.0.0.0",
        port=8000,
        reload=True,  # Remove in production
        workers=1,  # Increase for production
        log_level="info",
    )

# ============================================================================
# EXAMPLE USAGE AND TESTING
# ============================================================================
#
# Example API calls using curl or httpx:
#
# 1. Ask a travel question:
#    curl -X POST "http://localhost:8000/api/v1/travel/ask" \
#      -H "Content-Type: application/json" \
#      -d '{"text": "Best restaurants in Paris for dinner", "location": "Paris",
#           "budget": 200, "preferences": ["romantic", "french_cuisine"]}'
#
# 2. Search flights:
#    curl -X POST "http://localhost:8000/api/v1/flights/search" \
#      -H "Content-Type: application/json" \
#      -d '{"origin": "NYC", "destination": "PAR", "departure_date": "2024-07-15",
#           "return_date": "2024-07-22", "passengers": 2}'
#
# 3. Search hotels:
#    curl -X POST "http://localhost:8000/api/v1/hotels/search" \
#      -H "Content-Type: application/json" \
#      -d '{"location": "Paris", "checkin_date": "2024-07-15",
#           "checkout_date": "2024-07-22", "guests": 2, "rooms": 1}'
#
# 4. Upload document:
#    curl -X POST "http://localhost:8000/api/v1/travel/documents" \
#      -H "Content-Type: application/json" \
#      -d '{"content": "The Eiffel Tower is best visited at sunset...",
#           "location": "Paris", "category": "attraction",
#           "metadata": {"duration": "2_hours", "cost": "moderate"}}'
#
# 5. Get analytics:
#    curl -X GET "http://localhost:8000/api/v1/admin/analytics"
#
# 6. Health check:
#    curl -X GET "http://localhost:8000/health"
Content is user-generated and unverified.
    FastAPI Travel Backend with RAG & API Integration | Claude