Spaces:
Runtime error
Runtime error
| import os | |
| import logging | |
| import hashlib | |
| from typing import Dict, Any, Optional | |
| from datetime import datetime | |
| import json | |
def setup_logging():
    """Configure root logging to the console and, when possible, ``app.log``.

    Installs a stream handler and a UTF-8 file handler for ``app.log`` (in the
    current working directory) on the root logger at INFO level.

    The call is idempotent: ``logging.basicConfig`` does nothing once the root
    logger has handlers, so in that case this function returns before opening
    ``app.log`` for a handler that would never be attached.

    If ``app.log`` cannot be opened (read-only or permission-restricted
    filesystem), logging falls back to the console only and a warning is
    emitted instead of raising out of the module import.
    """
    if logging.getLogger().handlers:
        return

    handlers = [logging.StreamHandler()]
    file_error = None
    try:
        handlers.append(logging.FileHandler('app.log', encoding='utf-8'))
    except OSError as e:
        # Remember the failure; it can only be reported once logging is set up.
        file_error = e

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=handlers,
    )

    if file_error is not None:
        logging.warning(f"File logging disabled, could not open app.log: {file_error}")
def ensure_directory(path: str) -> bool:
    """Ensure that a directory exists, creating it (and parents) if needed.

    Args:
        path: Directory path. An empty string means "no directory component"
            (what ``os.path.dirname`` returns for a bare filename) and is
            treated as the current directory, which already exists.

    Returns:
        True if the directory exists after the call, False if it could not be
        created (the failure is logged).
    """
    if path == "":
        # os.makedirs("") raises FileNotFoundError; nothing needs creating.
        return True
    try:
        os.makedirs(path, exist_ok=True)
        return True
    except Exception as e:
        logging.error(f"Failed to create directory {path}: {str(e)}")
        return False
def generate_file_hash(content: str) -> str:
    """Return the hexadecimal SHA-256 digest of *content* encoded as UTF-8."""
    digest = hashlib.sha256()
    digest.update(content.encode("utf-8"))
    return digest.hexdigest()
def sanitize_filename(filename: str, max_length: int = 255) -> str:
    """Sanitize a filename for safe file system operations.

    Replaces each of the characters ``< > : " / \\ | ? *`` with an underscore
    and then shortens the result to at most ``max_length`` characters,
    keeping the extension when it fits.

    Args:
        filename: Proposed file name (not a full path; separators are replaced).
        max_length: Maximum number of characters in the result.

    Returns:
        The sanitized name, never longer than ``max_length`` characters.
    """
    import re

    # Remove invalid characters
    filename = re.sub(r'[<>:"/\\|?*]', '_', filename)

    # Limit length
    if len(filename) > max_length:
        name, ext = os.path.splitext(filename)
        # Clamp at zero: a negative slice bound would trim from the end of
        # the stem instead of dropping it when the extension alone is too long.
        keep = max(0, max_length - len(ext))
        # Final slice covers an extension that itself exceeds the limit.
        filename = (name[:keep] + ext)[:max_length]
    return filename
def format_file_size(size_bytes: int) -> str:
    """Format a byte count in human readable form using 1024-based units.

    Args:
        size_bytes: Non-negative size in bytes.

    Returns:
        ``"0B"`` for zero, otherwise the value rounded to two decimals
        followed by a space and the unit, e.g. ``"1.5 KB"``. Sizes beyond the
        largest known unit are expressed in TB (e.g. ``"1024.0 TB"``).

    Raises:
        ValueError: If ``size_bytes`` is negative.
    """
    if size_bytes == 0:
        return "0B"
    if size_bytes < 0:
        raise ValueError(f"size_bytes must be non-negative, got {size_bytes}")

    size_names = ["B", "KB", "MB", "GB", "TB"]
    value = float(size_bytes)
    unit_index = 0
    # Repeated division avoids floating-point log() error near unit
    # boundaries, and the index bound keeps huge sizes within the unit table.
    while value >= 1024 and unit_index < len(size_names) - 1:
        value /= 1024
        unit_index += 1
    return f"{round(value, 2)} {size_names[unit_index]}"
def validate_environment():
    """Check that every required environment variable is set and non-empty.

    Returns:
        True when all required variables are present; otherwise logs a
        warning naming the missing ones and returns False.
    """
    required = ('OPENAI_API_KEY',)
    absent = [name for name in required if not os.getenv(name)]
    if not absent:
        return True
    logging.warning(f"Missing environment variables: {', '.join(absent)}")
    return False
def save_json_file(data: Dict[Any, Any], filepath: str) -> bool:
    """Save data to a UTF-8 JSON file, creating the parent directory if needed.

    The payload is serialized in memory before the target is opened, so a
    non-serializable ``data`` cannot truncate a file that already exists.

    Args:
        data: JSON-serializable mapping.
        filepath: Destination path. A bare filename is written to the
            current directory.

    Returns:
        True on success, False on any failure (the failure is logged).
    """
    try:
        payload = json.dumps(data, indent=2, ensure_ascii=False)

        parent = os.path.dirname(filepath)
        # A bare filename has no parent component to create. When creation
        # fails, the helper has already logged why, so stop here.
        if parent and not ensure_directory(parent):
            return False

        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(payload)
        return True
    except Exception as e:
        logging.error(f"Failed to save JSON file {filepath}: {str(e)}")
        return False
def load_json_file(filepath: str) -> Optional[Dict[Any, Any]]:
    """Read and parse a UTF-8 JSON file.

    Returns:
        The parsed content, or None when the path does not exist or when
        reading/parsing fails (failures are logged; a missing path is not).
    """
    try:
        if os.path.exists(filepath):
            with open(filepath, 'r', encoding='utf-8') as handle:
                return json.loads(handle.read())
        return None
    except Exception as exc:
        logging.error(f"Failed to load JSON file {filepath}: {str(exc)}")
        return None
def cleanup_old_files(directory: str, max_age_hours: int = 24) -> int:
    """Delete regular files in *directory* older than *max_age_hours*.

    Only direct children are examined; subdirectories are left alone. Age is
    measured from the file's modification time.

    Args:
        directory: Directory to sweep. A missing directory is not an error.
        max_age_hours: Files modified more than this many hours ago are removed.

    Returns:
        Number of files actually removed. A failure on one file is logged and
        does not stop the sweep or discard the count of earlier removals.
    """
    import time

    try:
        if not os.path.exists(directory):
            return 0
        filenames = os.listdir(directory)
        # Compare epoch seconds rather than naive local datetimes, so a DST
        # change cannot shift a file's apparent age by an hour.
        cutoff = time.time() - max_age_hours * 3600
    except Exception as e:
        logging.error(f"Failed to cleanup directory {directory}: {str(e)}")
        return 0

    removed_count = 0
    for filename in filenames:
        filepath = os.path.join(directory, filename)
        try:
            # The stat calls sit inside the per-file handler: a file that
            # vanishes mid-sweep must not abort the whole cleanup.
            if not os.path.isfile(filepath):
                continue
            if os.path.getmtime(filepath) >= cutoff:
                continue
            os.remove(filepath)
        except Exception as e:
            logging.error(f"Failed to remove file {filepath}: {str(e)}")
            continue
        removed_count += 1
        logging.info(f"Removed old file: {filepath}")
    return removed_count
def get_system_info() -> Dict[str, Any]:
    """Get basic system information.

    Returns:
        A dict with ``platform``, ``python_version``, ``cpu_count``,
        ``memory_gb`` and ``disk_usage`` (``total_gb`` / ``free_gb`` for the
        ``/`` mount). If anything fails, including psutil not being
        installed, the failure is logged and ``{'error': <message>}`` is
        returned instead.
    """
    try:
        # Imported inside the try so a missing psutil yields the error dict
        # rather than an ImportError at the call site.
        import platform
        import psutil

        bytes_per_gb = 1024**3
        memory = psutil.virtual_memory()
        # Single snapshot, so total and free describe the same instant.
        disk = psutil.disk_usage('/')
        return {
            'platform': platform.system(),
            'python_version': platform.python_version(),
            'cpu_count': os.cpu_count(),
            'memory_gb': round(memory.total / bytes_per_gb, 2),
            'disk_usage': {
                'total_gb': round(disk.total / bytes_per_gb, 2),
                'free_gb': round(disk.free / bytes_per_gb, 2),
            },
        }
    except Exception as e:
        logging.error(f"Failed to get system info: {str(e)}")
        return {'error': str(e)}
def measure_execution_time(func):
    """Decorator that logs how long each call to *func* takes.

    The wrapper forwards all arguments and the return value unchanged, and
    logs the elapsed time at INFO level after a successful call. If *func*
    raises, the exception propagates and nothing is logged.

    Args:
        func: Callable to wrap.

    Returns:
        A wrapper that keeps ``func``'s name, docstring and other metadata.
    """
    import functools
    import time

    # Not every callable has __name__ (e.g. functools.partial objects).
    func_name = getattr(func, '__name__', repr(func))

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic; time.time() can jump on clock adjustment.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        execution_time = time.perf_counter() - start_time
        logging.info(f"{func_name} executed in {execution_time:.2f} seconds")
        return result

    return wrapper
# Import-time side effect: configure logging as soon as this module is
# imported, so the validation warning below goes through the configured
# handlers.
setup_logging()
# Import-time side effect: check the required environment variables. A
# failure is only reported as a warning; the import itself still succeeds.
if not validate_environment():
    logging.warning("Environment validation failed. Some features may not work properly.")