Spaces:
Paused
Paused
| import datetime | |
| import json | |
| from typing import Dict, List, Optional, Any | |
| class MemoryStorage: | |
| """In-memory storage for tokens and usage data""" | |
| def __init__(self): | |
| self.tokens: Dict[str, Dict] = {} | |
| self.usage_logs: List[Dict] = [] | |
| self.admin_sessions: Dict[str, Dict] = {} | |
| def add_token(self, token_data: Dict) -> bool: | |
| """Add a new token to storage""" | |
| try: | |
| self.tokens[token_data['id']] = token_data | |
| return True | |
| except Exception: | |
| return False | |
| def get_token_by_api_key(self, api_key: str) -> Optional[Dict]: | |
| """Get token by API key""" | |
| for token_id, token_data in self.tokens.items(): | |
| if token_data.get('api_key') == api_key and token_data.get('is_active', True): | |
| return token_data | |
| return None | |
| def get_token_by_id(self, token_id: str) -> Optional[Dict]: | |
| """Get token by ID""" | |
| return self.tokens.get(token_id) | |
| def get_all_tokens(self) -> List[Dict]: | |
| """Get all tokens""" | |
| return list(self.tokens.values()) | |
| def update_token(self, token_id: str, update_data: Dict) -> Optional[Dict]: | |
| """Update a token""" | |
| if token_id in self.tokens: | |
| self.tokens[token_id].update(update_data) | |
| return self.tokens[token_id] | |
| return None | |
| def delete_token(self, token_id: str) -> bool: | |
| """Delete a token""" | |
| if token_id in self.tokens: | |
| del self.tokens[token_id] | |
| return True | |
| return False | |
| def update_token_usage(self, token_id: str, uses_consumed: int, model: str) -> bool: | |
| """Update token usage and log the usage""" | |
| try: | |
| if token_id in self.tokens: | |
| self.tokens[token_id]['current_uses'] += uses_consumed | |
| # Log the usage | |
| usage_log = { | |
| "token_id": token_id, | |
| "model": model, | |
| "uses_consumed": uses_consumed, | |
| "timestamp": datetime.datetime.utcnow().isoformat(), | |
| "request_id": f"req_{datetime.datetime.now().timestamp()}" | |
| } | |
| self.usage_logs.append(usage_log) | |
| # Keep only last 1000 logs to prevent memory bloat | |
| if len(self.usage_logs) > 1000: | |
| self.usage_logs = self.usage_logs[-1000:] | |
| return True | |
| except Exception: | |
| pass | |
| return False | |
| def get_usage_stats(self) -> Dict: | |
| """Get usage statistics""" | |
| total_tokens = len(self.tokens) | |
| active_tokens = sum(1 for token in self.tokens.values() if token.get('is_active', True)) | |
| total_usage = sum(token.get('current_uses', 0) for token in self.tokens.values()) | |
| # Model usage breakdown | |
| model_usage = {} | |
| for log in self.usage_logs: | |
| model = log['model'] | |
| model_usage[model] = model_usage.get(model, 0) + log['uses_consumed'] | |
| # Recent usage (last 24 hours) | |
| recent_cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours=24) | |
| recent_usage = 0 | |
| for log in self.usage_logs: | |
| try: | |
| log_time = datetime.datetime.fromisoformat(log['timestamp'].replace('Z', '+00:00')) | |
| if log_time > recent_cutoff: | |
| recent_usage += log['uses_consumed'] | |
| except: | |
| continue | |
| return { | |
| "total_tokens": total_tokens, | |
| "active_tokens": active_tokens, | |
| "total_usage": total_usage, | |
| "recent_24h_usage": recent_usage, | |
| "model_usage": model_usage, | |
| "total_requests": len(self.usage_logs) | |
| } | |
| def export_to_json(self) -> Dict: | |
| """Export all data to JSON format""" | |
| return { | |
| "export_info": { | |
| "exported_at": datetime.datetime.utcnow().isoformat(), | |
| "version": "1.0", | |
| "total_tokens": len(self.tokens), | |
| "total_usage_logs": len(self.usage_logs) | |
| }, | |
| "tokens": self.tokens, | |
| "usage_logs": self.usage_logs | |
| } | |
| def import_from_json(self, json_data: Dict) -> Dict: | |
| """Import data from JSON format""" | |
| try: | |
| imported_tokens = 0 | |
| imported_logs = 0 | |
| # Import tokens | |
| if "tokens" in json_data: | |
| for token_id, token_data in json_data["tokens"].items(): | |
| self.tokens[token_id] = token_data | |
| imported_tokens += 1 | |
| # Import usage logs | |
| if "usage_logs" in json_data: | |
| self.usage_logs.extend(json_data["usage_logs"]) | |
| imported_logs = len(json_data["usage_logs"]) | |
| # Keep only last 1000 logs | |
| if len(self.usage_logs) > 1000: | |
| self.usage_logs = self.usage_logs[-1000:] | |
| return { | |
| "tokens_imported": imported_tokens, | |
| "logs_imported": imported_logs, | |
| "import_timestamp": datetime.datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| raise Exception(f"Failed to import data: {str(e)}") | |
| def clear_all_data(self): | |
| """Clear all stored data""" | |
| self.tokens.clear() | |
| self.usage_logs.clear() | |
| self.admin_sessions.clear() | |
| # Global storage instance | |
| _storage_instance = None | |
| def get_storage() -> MemoryStorage: | |
| """Get the global storage instance""" | |
| global _storage_instance | |
| if _storage_instance is None: | |
| _storage_instance = MemoryStorage() | |
| return _storage_instance | |