Spaces:
Running
Running
| import logging | |
| from datetime import datetime | |
| import json | |
| from pathlib import Path | |
| import os | |
| from typing import Optional | |
| # Configure logging | |
| # Use /tmp for logs in production (e.g. Hugging Face Spaces) or local logs dir in development | |
| log_dir = Path("/tmp/schematic_ai_logs") if os.environ.get("SPACE_ID") else Path(__file__).parent.parent / "logs" | |
| log_dir.mkdir(exist_ok=True, parents=True) | |
| # Configure file handler for general logs | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.StreamHandler() # Always log to console | |
| ] | |
| ) | |
| # Only add file handler if we can write to the directory | |
| try: | |
| if os.access(log_dir, os.W_OK): | |
| file_handler = logging.FileHandler(log_dir / "app.log") | |
| file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) | |
| logging.getLogger().addHandler(file_handler) | |
| except Exception as e: | |
| logging.warning(f"Could not set up file logging: {e}") | |
| def log_category_usage(category: Optional[str] = None, endpoint: Optional[str] = None, success: bool = True): | |
| """Log the usage of a category with endpoint and success information.""" | |
| if not os.access(log_dir, os.W_OK): | |
| logging.warning("Log directory is not writable, skipping category usage logging") | |
| return | |
| stats_file = log_dir / "category_stats.json" | |
| timestamp = datetime.now().isoformat() | |
| try: | |
| if stats_file.exists(): | |
| with open(stats_file, 'r') as f: | |
| stats = json.load(f) | |
| else: | |
| stats = {} | |
| # Initialize category if not exists | |
| category = category or "default" | |
| if category not in stats: | |
| stats[category] = { | |
| "total_requests": 0, | |
| "successful_requests": 0, | |
| "failed_requests": 0, | |
| "endpoints": {}, | |
| "last_used": None | |
| } | |
| # Update category stats | |
| stats[category]["total_requests"] += 1 | |
| if success: | |
| stats[category]["successful_requests"] += 1 | |
| else: | |
| stats[category]["failed_requests"] += 1 | |
| # Update endpoint stats | |
| if endpoint: | |
| if "endpoints" not in stats[category]: | |
| stats[category]["endpoints"] = {} | |
| if endpoint not in stats[category]["endpoints"]: | |
| stats[category]["endpoints"][endpoint] = 0 | |
| stats[category]["endpoints"][endpoint] += 1 | |
| # Update timestamp | |
| stats[category]["last_used"] = timestamp | |
| # Save updated stats | |
| with open(stats_file, 'w') as f: | |
| json.dump(stats, f, indent=4) | |
| except Exception as e: | |
| logging.error(f"Error logging category usage: {e}") | |
| def get_category_statistics(): | |
| """Get the usage statistics for all categories.""" | |
| if not os.access(log_dir, os.W_OK): | |
| logging.warning("Log directory is not writable, cannot read category statistics") | |
| return {} | |
| stats_file = log_dir / "category_stats.json" | |
| try: | |
| if stats_file.exists(): | |
| with open(stats_file, 'r') as f: | |
| return json.load(f) | |
| return {} | |
| except Exception as e: | |
| logging.error(f"Error reading category statistics: {e}") | |
| return {} | |