Content is user-generated and unverified.
#!/usr/bin/env python3
"""
CloudFront to Loki Log Processor

This script downloads new CloudFront logs from S3 and pushes them to Loki.
It tracks processed files to avoid duplicates and can optionally keep only
entries with selected HTTP status codes (for example, 404 responses).
"""

import argparse
import gzip
import json
import os
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path

import boto3
import requests

# Try to import python-dotenv, but don't fail if it's not available
try:
    from dotenv import load_dotenv
    DOTENV_AVAILABLE = True
except ImportError:
    DOTENV_AVAILABLE = False

# Environment-variable values that switch a boolean option on.
TRUTHY_ENV_VALUES = ('true', '1', 'yes', 'on')

# Fallback for --state-cleanup-days when CF_STATE_CLEANUP_DAYS is unset/invalid.
DEFAULT_STATE_CLEANUP_DAYS = 30

NANOSECONDS_PER_SECOND = 1_000_000_000


def load_env_file(env_file=None):
    """Load environment variables from a .env file if python-dotenv is available.

    Args:
        env_file: Path of a specific .env file. When omitted, the first
            existing file among ``.env``, ``.env.local`` and
            ``~/.cloudfront-loki.env`` is loaded.

    Variables that are already set in the environment are left untouched
    (``load_dotenv`` is called without ``override``).
    """
    if not DOTENV_AVAILABLE:
        if env_file:
            # An explicitly requested file must not be ignored silently.
            print(f"Warning: python-dotenv is not installed; ignoring .env file '{env_file}'")
        return

    if env_file:
        if os.path.exists(env_file):
            load_dotenv(env_file)
        else:
            print(f"Warning: Specified .env file '{env_file}' not found")
        return

    # Try common locations; only the first one that exists is loaded.
    candidates = ['.env', '.env.local', os.path.expanduser('~/.cloudfront-loki.env')]
    for candidate in candidates:
        if os.path.exists(candidate):
            load_dotenv(candidate)
            break


def get_env_default(key, default=None):
    """Get environment variable with fallback to default"""
    return os.getenv(key, default)


def _env_flag(key):
    """Return True when environment variable *key* holds a truthy value."""
    return get_env_default(key, '').lower() in TRUTHY_ENV_VALUES


def _env_int(key, default):
    """Return environment variable *key* as an int, or *default* if unset/invalid."""
    raw = get_env_default(key)
    if raw is None or not raw.strip():
        return default
    try:
        return int(raw)
    except ValueError:
        print(f"Warning: Invalid {key} value '{raw}', using {default}")
        return default


def _status_as_int(value):
    """Coerce a status code (int or numeric string) to int; None if impossible."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


class CloudFrontLogsProcessor:
    """Downloads CloudFront log files from S3 and pushes their entries to Loki.

    ``config`` is the namespace produced by ``parse_args``; the attributes
    read here are ``s3_bucket``, ``s3_prefix``, ``domain``, ``loki_url``,
    ``state_file``, ``state_cleanup_days``, ``filter_status_codes``,
    ``dry_run``, ``verbose``, ``debug`` and ``quiet``.
    """

    def __init__(self, config):
        self.config = config
        self.s3_client = boto3.client('s3')
        self.processed_files = self.load_processed_files()

    def load_processed_files(self):
        """Load the set of already processed S3 keys from the state file."""
        if os.path.exists(self.config.state_file):
            with open(self.config.state_file, 'r') as f:
                return set(line.strip() for line in f)
        return set()

    def cleanup_old_state_entries(self):
        """Remove state entries older than the configured number of days.

        The age of an entry is taken from the date embedded in the file
        name (``<id>.YYYY-MM-DD-HH.<hash>.gz``). Entries whose name cannot
        be parsed are kept. The state file is only rewritten when at least
        one line was dropped.

        NOTE(review): once an entry is dropped here, the matching S3 object
        will look "new" again on a later run if it still exists under the
        prefix - confirm that the bucket expires old logs.
        """
        if not os.path.exists(self.config.state_file):
            return

        verbose = self.config.verbose and not self.config.quiet
        debug = self.config.debug and not self.config.quiet

        cutoff_date = datetime.now() - timedelta(days=self.config.state_cleanup_days)
        if verbose:
            print(f"Cleaning up state file - keeping entries newer than {cutoff_date.strftime('%Y-%m-%d')}")

        # Read current entries
        with open(self.config.state_file, 'r') as f:
            lines = f.readlines()
        original_count = len(lines)

        kept_lines = []
        for line in lines:
            filename = line.strip()
            if not filename:
                continue
            # Extract date from a name like
            # "blog.lmorchard.com/E5YXU82LZHZCM.2025-06-04-05.d024d283.gz"
            try:
                basename = filename.split('/')[-1]    # works with or without prefix
                date_part = basename.split('.')[1]    # "2025-06-04-05"
                file_date = datetime.strptime(date_part[:10], '%Y-%m-%d')
            except (IndexError, ValueError) as e:
                # Keep lines we can't parse (safer) - might be a different format
                kept_lines.append(line)
                if debug:
                    print(f" Keeping unparseable state entry: {filename} ({e})")
                continue

            if file_date >= cutoff_date:
                kept_lines.append(line)
            elif debug:
                print(f" Removing old state entry: {filename} (date: {file_date.strftime('%Y-%m-%d')})")

        # Only rewrite if we actually removed something
        if len(kept_lines) < original_count:
            with open(self.config.state_file, 'w') as f:
                f.writelines(kept_lines)
            removed_count = original_count - len(kept_lines)
            if verbose:
                print(f"Cleaned up state file: removed {removed_count} old entries, kept {len(kept_lines)}")
        elif verbose:
            print(f"State file cleanup: no old entries to remove ({original_count} entries total)")

    def save_processed_file(self, filename):
        """Mark a file as processed, both in memory and in the state file."""
        self.processed_files.add(filename)
        with open(self.config.state_file, 'a') as f:
            f.write(f"{filename}\n")

    def get_new_log_files(self):
        """List unprocessed ``.gz`` objects under the configured S3 prefix.

        Returns:
            A list of ``{'key', 'last_modified', 'size'}`` dicts sorted by
            last-modified time, or ``[]`` when nothing is new or the
            listing fails.
        """
        try:
            # list_objects_v2 returns at most 1000 keys per call, so walk
            # every page instead of looking at the first response only.
            paginator = self.s3_client.get_paginator('list_objects_v2')
            pages = paginator.paginate(
                Bucket=self.config.s3_bucket,
                Prefix=self.config.s3_prefix
            )

            saw_any_object = False
            new_files = []
            for page in pages:
                for obj in page.get('Contents', []):
                    saw_any_object = True
                    filename = obj['Key']
                    if filename.endswith('.gz') and filename not in self.processed_files:
                        new_files.append({
                            'key': filename,
                            'last_modified': obj['LastModified'],
                            'size': obj['Size']
                        })

            if not saw_any_object:
                if not self.config.quiet:
                    print("No files found in S3 bucket")
                return []

            # Sort by last modified time
            new_files.sort(key=lambda x: x['last_modified'])
            return new_files
        except Exception as e:
            print(f"Error listing S3 objects: {e}")
            return []

    def _parse_log_text(self, text):
        """Parse newline-delimited JSON text into a list of dict entries.

        Lines that are blank, are not valid JSON, or do not decode to a
        JSON object are skipped (the latter two with a message).
        """
        debug = self.config.debug and not self.config.quiet
        log_entries = []
        # split('\n') rather than splitlines(): the latter also splits on
        # characters that may legally appear inside JSON strings.
        for line_num, line in enumerate(text.strip().split('\n'), 1):
            if not line.strip():
                continue
            try:
                entry = json.loads(line)
            except json.JSONDecodeError as e:
                print(f"Error parsing JSON line {line_num}: {e}")
                if debug:
                    print(f" Raw line: {line}")
                continue
            if not isinstance(entry, dict):
                # Later stages call entry.get(...), so only objects are usable.
                print(f"Skipping non-object JSON on line {line_num}")
                continue
            log_entries.append(entry)
            # Debug output - show each parsed entry
            if debug:
                print(f" Line {line_num}: {json.dumps(entry, separators=(',', ':'))}")
        return log_entries

    def _fetch_log_entries(self, s3_key):
        """Download, decompress and parse one log object.

        Unlike ``download_and_parse_log`` this lets download/decompress
        errors propagate, so the caller can tell "file is empty" apart from
        "file could not be read".
        """
        if not self.config.quiet:
            print(f"Downloading {s3_key}...")

        response = self.s3_client.get_object(Bucket=self.config.s3_bucket, Key=s3_key)
        compressed_data = response['Body'].read()
        text = gzip.decompress(compressed_data).decode('utf-8')

        log_entries = self._parse_log_text(text)
        if not self.config.quiet:
            print(f"Parsed {len(log_entries)} log entries from {s3_key}")
        return log_entries

    def download_and_parse_log(self, s3_key):
        """Download and parse a gzipped, newline-delimited JSON log file.

        Returns:
            The list of parsed entries, or ``[]`` if the object could not
            be downloaded, decompressed or decoded (the error is printed).
        """
        try:
            return self._fetch_log_entries(s3_key)
        except Exception as e:
            print(f"Error downloading/parsing {s3_key}: {e}")
            return []

    def filter_logs(self, log_entries):
        """Keep only entries whose status code is in ``filter_status_codes``.

        When ``filter_status_codes`` is ``None`` every entry is returned.
        Status codes are compared as integers, so an entry carrying the
        string ``"404"`` matches a configured ``404``.
        """
        debug = self.config.debug and not self.config.quiet

        if self.config.filter_status_codes is None:
            if debug:
                print(f" No status code filtering - keeping all {len(log_entries)} entries")
            return log_entries

        wanted = {_status_as_int(code) for code in self.config.filter_status_codes}
        wanted.discard(None)

        filtered = []
        for entry in log_entries:
            # The status code is read from 'sc-status', falling back to 'status'
            status_code = entry.get('sc-status', entry.get('status', 0))
            if _status_as_int(status_code) in wanted:
                filtered.append(entry)
                if debug:
                    print(f" Keeping entry with status {status_code}: {json.dumps(entry, separators=(',', ':'))}")
            elif debug:
                print(f" Filtering out entry with status {status_code}")
        return filtered

    @staticmethod
    def _parse_iso_utc(timestamp):
        """Parse an ISO-8601 string; naive results are interpreted as UTC."""
        text = timestamp.strip()
        if text.endswith('Z'):
            text = text[:-1] + '+00:00'
        elif text.endswith('UTC'):
            text = text[:-3].strip()
        dt = datetime.fromisoformat(text)
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return dt

    def _entry_timestamp_ns(self, entry):
        """Return the entry's time as integer nanoseconds since the epoch.

        Sources are tried in order: the ``timestamp`` field (ISO string or
        epoch seconds), the ``date`` + ``time`` fields (taken as UTC), and
        finally the current time.
        """
        debug = self.config.debug and not self.config.quiet
        parse_errors = (ValueError, OverflowError, OSError)

        timestamp = entry.get('timestamp')
        if timestamp:
            if isinstance(timestamp, str):
                try:
                    dt = self._parse_iso_utc(timestamp)
                    timestamp_ns = int(dt.timestamp() * NANOSECONDS_PER_SECOND)
                except parse_errors as e:
                    if debug:
                        print(f" Error parsing timestamp '{timestamp}': {e}")
                else:
                    if debug:
                        print(f" Parsed timestamp: {timestamp} -> {dt} -> {timestamp_ns}")
                    return timestamp_ns
            elif isinstance(timestamp, (int, float)) and not isinstance(timestamp, bool):
                try:
                    return int(timestamp * NANOSECONDS_PER_SECOND)
                except parse_errors as e:  # nan / inf
                    if debug:
                        print(f" Error parsing timestamp '{timestamp}': {e}")
            elif debug:
                print(f" Error parsing timestamp '{timestamp}': unsupported type")

        # Fallback to date + time fields if timestamp not available
        date_str = entry.get('date')
        time_str = entry.get('time')
        if date_str and time_str:
            # Combine date and time: "2025-06-04" + "05:38:49", treated as UTC
            datetime_str = f"{date_str} {time_str}"
            try:
                dt = datetime.strptime(datetime_str, '%Y-%m-%d %H:%M:%S')
                dt = dt.replace(tzinfo=timezone.utc)
                timestamp_ns = int(dt.timestamp() * NANOSECONDS_PER_SECOND)
            except parse_errors as e:
                if debug:
                    print(f" Error parsing date+time: {e}")
            else:
                if debug:
                    print(f" Using date+time fallback: {datetime_str} UTC -> {timestamp_ns}")
                return timestamp_ns

        # Final fallback to current time
        if debug:
            print(" Using current time fallback")
        return int(time.time() * NANOSECONDS_PER_SECOND)

    def format_for_loki(self, log_entries, source_file):
        """Build a Loki push payload from log entries.

        Entries are grouped into streams by their label set (``job``,
        ``domain``, ``status_code``, ``method``); each value is a
        ``[timestamp_ns_as_str, json_line]`` pair.

        Args:
            log_entries: Parsed log entries (dicts).
            source_file: S3 key the entries came from. Currently unused;
                kept so existing callers keep working.

        Returns:
            ``{'streams': [{'stream': {...labels...}, 'values': [...]}, ...]}``
        """
        # Keyed by the label items tuple so label values never have to be
        # serialised and re-parsed (which broke on values containing ',').
        grouped = {}
        for entry in log_entries:
            status_code = entry.get('sc-status', entry.get('status', 'unknown'))
            method = entry.get('cs-method', entry.get('method', 'unknown'))
            labels = (
                ('job', 'cloudfront'),
                ('domain', str(self.config.domain)),
                ('status_code', str(status_code)),
                ('method', str(method)),
            )

            timestamp_ns = self._entry_timestamp_ns(entry)
            # The log line is the full entry, re-serialised as JSON
            log_line = json.dumps(entry)
            grouped.setdefault(labels, []).append([str(timestamp_ns), log_line])

        streams = [
            {'stream': dict(labels), 'values': values}
            for labels, values in grouped.items()
        ]
        return {'streams': streams}

    def send_to_loki(self, loki_data):
        """POST a push payload to Loki.

        Returns:
            True when Loki answers 204, False on any other status or on a
            request error (both are printed).
        """
        try:
            response = requests.post(
                self.config.loki_url,
                json=loki_data,
                headers={'Content-Type': 'application/json'},
                timeout=30
            )
        except Exception as e:
            print(f"Error sending to Loki: {e}")
            return False

        if response.status_code == 204:
            if not self.config.quiet:
                print("Successfully sent logs to Loki")
            return True

        print(f"Loki responded with status {response.status_code}: {response.text}")
        return False

    def process_new_logs(self):
        """Main processing loop: list, download, filter, format and push.

        A file is recorded as processed when it was pushed successfully,
        when it is empty, or when no entry matches the filter. Files that
        fail to download or to push are left unrecorded so the next run
        retries them.
        """
        quiet = self.config.quiet
        if not quiet:
            print(f"Checking for new CloudFront logs at {datetime.now()}")

        # Clean up old state entries first
        self.cleanup_old_state_entries()

        new_files = self.get_new_log_files()
        if not new_files:
            if not quiet:
                print("No new log files found")
            return

        if not quiet:
            print(f"Found {len(new_files)} new log files")

        for file_info in new_files:
            s3_key = file_info['key']
            if not quiet:
                print(f"\nProcessing {s3_key} (size: {file_info['size']} bytes)")

            # Download and parse. A failure here must NOT mark the file as
            # processed, otherwise a transient S3 error loses it for good.
            try:
                log_entries = self._fetch_log_entries(s3_key)
            except Exception as e:
                print(f"Error downloading/parsing {s3_key}: {e} - will retry next run")
                continue

            if not log_entries:
                if not quiet:
                    print(f"No log entries found in {s3_key}")
                self.save_processed_file(s3_key)
                continue

            # Filter if configured
            filtered_entries = self.filter_logs(log_entries)
            if self.config.filter_status_codes and not quiet:
                print(f"Filtered to {len(filtered_entries)} entries with status codes {self.config.filter_status_codes}")

            if not filtered_entries:
                if not quiet:
                    print("No entries match filter criteria")
                self.save_processed_file(s3_key)
                continue

            # Format for Loki
            loki_data = self.format_for_loki(filtered_entries, s3_key)

            # Send to Loki
            if self.config.dry_run:
                if not quiet:
                    print(f"DRY RUN: Would send {len(loki_data['streams'])} streams to Loki")
                # NOTE(review): a dry run still records the file as
                # processed, so a later real run will skip it. Behaviour
                # kept as-is; confirm this is intended.
                self.save_processed_file(s3_key)
                if not quiet:
                    print(f"Successfully processed {s3_key} (dry run)")
            elif self.send_to_loki(loki_data):
                self.save_processed_file(s3_key)
                if not quiet:
                    print(f"Successfully processed {s3_key}")
            else:
                print(f"Failed to send {s3_key} to Loki - will retry next run")


def parse_args(argv=None):
    """Parse command line arguments, using environment variables as defaults.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        The argparse namespace, extended with ``s3_bucket``, ``s3_prefix``
        and ``filter_status_codes`` (``None`` = keep every status code).

    Status filter precedence, highest first: ``--all-status-codes``,
    ``--status-codes``, ``CF_ALL_STATUS_CODES``, ``CF_STATUS_CODES``. With
    none of them set, all status codes are processed.
    """
    parser = argparse.ArgumentParser(
        description='Process CloudFront logs and send them to Loki',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Environment Variables (.env file support):
  CF_S3_BUCKET           S3 bucket name
  CF_S3_PREFIX           S3 prefix/path
  CF_DOMAIN              Domain name for Loki labels
  CF_LOKI_URL            Loki push URL
  CF_STATE_FILE          File to track processed logs
  CF_STATUS_CODES        Comma-separated status codes (e.g., "404,500")
  CF_ALL_STATUS_CODES    Set to "true" to process all status codes
  CF_VERBOSE             Set to "true" for verbose output
  CF_DEBUG               Set to "true" for debug output (shows every log entry)
  CF_DRY_RUN             Set to "true" for dry run mode
  CF_STATE_CLEANUP_DAYS  Days to keep in state file (default: 30)
  CF_QUIET               Set to "true" for quiet mode (only fatal errors)

Examples:
  # Basic usage with defaults
  %(prog)s

  # Use custom .env file
  %(prog)s --env-file /path/to/my.env

  # Custom configuration
  %(prog)s --bucket my-logs --prefix mysite.com/ --domain mysite.com

  # Process all logs (not just 404s)
  %(prog)s --all-status-codes

  # Multiple status codes
  %(prog)s --status-codes 404 500 503

  # Custom Loki URL (e.g., via Tailscale)
  %(prog)s --loki-url http://100.x.x.x:3100/loki/api/v1/push
        """
    )

    parser.add_argument(
        '--env-file',
        help='Path to .env file (default: looks for .env, .env.local, ~/.cloudfront-loki.env)'
    )
    parser.add_argument(
        '--bucket', '-b',
        default=get_env_default('CF_S3_BUCKET', 'lmorchard-logs'),
        help='S3 bucket name (default: lmorchard-logs, env: CF_S3_BUCKET)'
    )
    parser.add_argument(
        '--prefix', '-p',
        default=get_env_default('CF_S3_PREFIX', 'blog.lmorchard.com/'),
        help='S3 prefix/path (default: blog.lmorchard.com/, env: CF_S3_PREFIX)'
    )
    parser.add_argument(
        '--domain', '-d',
        default=get_env_default('CF_DOMAIN', 'blog.lmorchard.com'),
        help='Domain name for Loki labels (default: blog.lmorchard.com, env: CF_DOMAIN)'
    )
    parser.add_argument(
        '--loki-url', '-l',
        default=get_env_default('CF_LOKI_URL', 'http://localhost:3100/loki/api/v1/push'),
        help='Loki push URL (default: http://localhost:3100/loki/api/v1/push, env: CF_LOKI_URL)'
    )
    parser.add_argument(
        '--state-file', '-s',
        default=get_env_default('CF_STATE_FILE', '/tmp/cloudfront_processed.txt'),
        help='File to track processed logs (default: /tmp/cloudfront_processed.txt, env: CF_STATE_FILE)'
    )
    parser.add_argument(
        '--state-cleanup-days',
        type=int,
        default=_env_int('CF_STATE_CLEANUP_DAYS', DEFAULT_STATE_CLEANUP_DAYS),
        help='Days to keep entries in state file (default: 30, env: CF_STATE_CLEANUP_DAYS)'
    )

    # Status-code settings coming from the environment
    env_all_status = _env_flag('CF_ALL_STATUS_CODES')
    env_status_codes = None
    raw_status_codes = get_env_default('CF_STATUS_CODES')
    if raw_status_codes:
        try:
            env_status_codes = [int(x.strip()) for x in raw_status_codes.split(',')]
        except ValueError:
            print(f"Warning: Invalid CF_STATUS_CODES format '{raw_status_codes}', using all status codes")

    # Both options default to "not given" so that an explicit command-line
    # value can be told apart from an environment default below.
    parser.add_argument(
        '--status-codes',
        type=int,
        nargs='+',
        default=None,
        help='HTTP status codes to filter for (env: CF_STATUS_CODES as comma-separated)'
    )
    parser.add_argument(
        '--all-status-codes',
        action='store_true',
        default=False,
        help='Process all status codes (overrides --status-codes, env: CF_ALL_STATUS_CODES)'
    )

    # Boolean flags with environment defaults
    parser.add_argument(
        '--verbose', '-v',
        action='store_true',
        default=_env_flag('CF_VERBOSE'),
        help='Enable verbose output (env: CF_VERBOSE)'
    )
    parser.add_argument(
        '--debug',
        action='store_true',
        default=_env_flag('CF_DEBUG'),
        help='Enable debug output - shows every parsed log entry (env: CF_DEBUG)'
    )
    parser.add_argument(
        '--quiet', '-q',
        action='store_true',
        default=_env_flag('CF_QUIET'),
        help='Quiet mode - only show fatal errors (env: CF_QUIET)'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        default=_env_flag('CF_DRY_RUN'),
        help='Download and parse logs but do not send to Loki (env: CF_DRY_RUN)'
    )

    args = parser.parse_args(argv)

    # Handle mutually exclusive options
    if args.quiet and (args.verbose or args.debug):
        parser.error("--quiet cannot be used with --verbose or --debug")

    # Resolve the effective filter: command line first, then environment.
    if args.all_status_codes:
        filter_status_codes = None
    elif args.status_codes is not None:
        filter_status_codes = args.status_codes
    elif env_all_status or env_status_codes is None:
        filter_status_codes = None
    else:
        filter_status_codes = env_status_codes
    args.filter_status_codes = filter_status_codes

    # Keep the option attributes populated the way they used to be.
    args.all_status_codes = filter_status_codes is None
    if args.status_codes is None:
        args.status_codes = env_status_codes if env_status_codes is not None else [404]

    # Add attribute aliases for the config object
    args.s3_bucket = args.bucket
    args.s3_prefix = args.prefix
    return args


def _find_env_file_arg(argv=None):
    """Return the value of ``--env-file`` without parsing any other option."""
    pre_parser = argparse.ArgumentParser(add_help=False)
    pre_parser.add_argument('--env-file')
    known, _unknown = pre_parser.parse_known_args(argv)
    return known.env_file


def main():
    """Load .env files, parse the configuration and process new logs."""
    # .env files must be loaded before parse_args() because option defaults
    # are read from the environment. load_dotenv() does not override
    # variables that are already set, so the explicitly requested file is
    # loaded first and therefore wins over the default locations.
    explicit_env_file = _find_env_file_arg()
    if explicit_env_file:
        load_env_file(explicit_env_file)
    load_env_file()

    args = parse_args()

    if args.verbose:
        print("Configuration:")
        print(f" S3 Bucket: {args.bucket}")
        print(f" S3 Prefix: {args.prefix}")
        print(f" Domain: {args.domain}")
        print(f" Loki URL: {args.loki_url}")
        print(f" State File: {args.state_file}")
        print(f" State Cleanup Days: {args.state_cleanup_days}")
        print(f" Status Filter: {args.filter_status_codes}")
        print(f" Dry Run: {args.dry_run}")
        print(f" Debug: {args.debug}")
        print(f" Quiet: {args.quiet}")
        if DOTENV_AVAILABLE:
            print(" .env support: Available")
        else:
            print(" .env support: Not available (install python-dotenv)")
        print()

    processor = CloudFrontLogsProcessor(args)
    processor.process_new_logs()


if __name__ == "__main__":
    main()
Content is user-generated and unverified.
    CloudFront to Loki Log Processor | Claude