# Space status: Runtime error
| import os | |
| from pathlib import Path | |
| from typing import Dict, Any | |
| import shutil | |
| from config import Config, FILE_TYPE_CONFIG | |
def setup_directories(config: "Config" = None):
    """Create the working directories named by the configuration.

    Args:
        config: Object exposing ``UPLOAD_DIR``, ``VECTOR_STORE_DIR``,
            ``TEMP_DIR`` and ``HF_CACHE_DIR``. A default ``Config()`` is
            built when omitted.

    Each directory is created together with any missing parents, and an
    empty ``.gitkeep`` file is added to it if one is not already present.
    """
    if config is None:
        config = Config()

    directories = [
        config.UPLOAD_DIR,
        config.VECTOR_STORE_DIR,
        config.TEMP_DIR,
        config.HF_CACHE_DIR,
    ]

    for directory in directories:
        # Coerce so that plain-string settings work with the "/" operator.
        directory = Path(directory)
        directory.mkdir(parents=True, exist_ok=True)

        # Create .gitkeep for empty directories; an existing marker is left
        # untouched so its modification time does not change.
        gitkeep_path = directory / ".gitkeep"
        if not gitkeep_path.exists():
            gitkeep_path.touch()

    print("β Directory structure setup complete")
def get_file_icon(file_extension: str) -> str:
    """Return the icon registered for a file extension.

    The extension is lower-cased and looked up in ``FILE_TYPE_CONFIG``; the
    entry's ``'icon'`` value is returned, or a generic fallback icon when the
    extension or the key is absent.
    """
    type_entry = FILE_TYPE_CONFIG.get(file_extension.lower(), {})
    return type_entry.get('icon', 'π')
def get_file_description(file_extension: str) -> str:
    """Return the description registered for a file extension.

    The extension is lower-cased and looked up in ``FILE_TYPE_CONFIG``; the
    entry's ``'description'`` value is returned, or ``'Unknown file type'``
    when the extension or the key is absent.
    """
    type_entry = FILE_TYPE_CONFIG.get(file_extension.lower(), {})
    return type_entry.get('description', 'Unknown file type')
def format_file_size(size_bytes: int) -> str:
    """Format a byte count as a human-readable string.

    Args:
        size_bytes: Size in bytes.

    Returns:
        ``"<n> B"`` for values below 1024, otherwise the size with one
        decimal place in KB, MB or GB (1024-based). GB is the largest unit
        used, so very large sizes are reported as e.g. ``"2048.0 GB"``.
    """
    if size_bytes < 1024:
        return f"{size_bytes} B"

    units = ("KB", "MB", "GB")
    scaled = size_bytes / 1024
    unit_index = 0

    # Promote based on the *displayed* figure: a value such as 1023.99 KB
    # rounds to "1024.0" and should read "1.0 MB" instead.
    while unit_index < len(units) - 1 and float(f"{scaled:.1f}") >= 1024:
        scaled /= 1024
        unit_index += 1

    return f"{scaled:.1f} {units[unit_index]}"
def clean_filename(filename: str) -> str:
    """Clean a filename for safe storage.

    Every character other than word characters, ``-``, ``_`` and ``.`` is
    replaced with an underscore, runs of underscores are collapsed, and
    leading/trailing underscores are stripped.

    Args:
        filename: Raw filename, e.g. as supplied by an upload.

    Returns:
        The cleaned name, or ``"unnamed_file"`` when nothing usable is left
        (empty result, or a name made only of dots such as ``".."``).
    """
    import re

    # Remove or replace unsafe characters (path separators included)
    cleaned = re.sub(r'[^\w\-_\.]', '_', filename)
    # Remove multiple underscores
    cleaned = re.sub(r'_+', '_', cleaned)
    # Remove leading/trailing underscores
    cleaned = cleaned.strip('_')

    # "" would resolve to the target directory itself and "." / ".." to the
    # directory or its parent once joined to a path, so substitute a real name.
    if not cleaned.strip('.'):
        return "unnamed_file"
    return cleaned
def get_safe_filepath(directory: Path, filename: str) -> Path:
    """Return a path inside ``directory`` that does not exist yet.

    The filename is first passed through ``clean_filename``. If the
    resulting path already exists, ``_1``, ``_2``, ... is inserted between
    the stem and the suffix until an unused path is found.
    """
    candidate = directory / clean_filename(filename)
    stem, suffix = candidate.stem, candidate.suffix

    # Handle duplicates by probing numbered variants in order
    attempt = 0
    while candidate.exists():
        attempt += 1
        candidate = directory / f"{stem}_{attempt}{suffix}"

    return candidate
def validate_file_type(filename: str, allowed_extensions: set = None) -> bool:
    """Check whether a filename's extension is in the allowed set.

    Args:
        filename: Name or path of the file; only its final suffix is used.
        allowed_extensions: Extensions to accept, compared against the
            lower-cased suffix including its leading dot. When omitted or
            empty, ``Config().ALLOWED_EXTENSIONS`` is used instead.

    Returns:
        True if the lower-cased suffix is in the allowed collection.
    """
    # Short-circuit so Config is only instantiated when the caller did not
    # supply a usable set of extensions.
    allowed = allowed_extensions or Config().ALLOWED_EXTENSIONS
    return Path(filename).suffix.lower() in allowed
def estimate_processing_time(file_size: int, file_type: str) -> str:
    """Estimate processing time based on file size and type.

    Args:
        file_size: Size of the file in bytes.
        file_type: File extension, with or without the leading dot and in
            any letter case (``".pdf"``, ``"PDF"``).

    Returns:
        A coarse human-readable estimate such as ``"a few seconds"`` or
        ``"about 2 minutes"``.
    """
    # Simple heuristic estimates in seconds (per MB, minimum one MB)
    base_times = {
        '.txt': 0.1,
        '.csv': 0.2,
        '.pdf': 0.5,
        '.docx': 0.3,
        '.jpg': 2.0,  # OCR is slower
        '.jpeg': 2.0,
        '.png': 2.0,
        '.db': 0.5,
    }

    # Normalise so "pdf" and ".pdf" hit the same table entry.
    extension = file_type.lower()
    if extension and not extension.startswith('.'):
        extension = f".{extension}"
    base_time = base_times.get(extension, 1.0)

    # Scale by file size (MB); files under 1 MB count as 1 MB
    size_mb = file_size / (1024 * 1024)
    estimated_seconds = base_time * max(1, size_mb)

    if estimated_seconds < 5:
        return "a few seconds"
    if estimated_seconds < 30:
        return "less than 30 seconds"
    if estimated_seconds < 60:
        return "about a minute"

    minutes = int(estimated_seconds / 60)
    unit = "minute" if minutes == 1 else "minutes"
    return f"about {minutes} {unit}"
def cleanup_temp_files(temp_dir: Path, max_age_hours: int = 24):
    """Clean up temporary files older than the specified age.

    Only regular files directly inside ``temp_dir`` are considered;
    subdirectories are left alone. Age is measured from each file's
    modification time.

    Args:
        temp_dir: Directory to sweep (``Path`` or path string). Nothing
            happens if it is not an existing directory.
        max_age_hours: Files modified more than this many hours ago are
            deleted.
    """
    import time

    temp_dir = Path(temp_dir)
    if not temp_dir.is_dir():
        return

    current_time = time.time()
    max_age_seconds = max_age_hours * 3600
    cleaned_count = 0

    for file_path in temp_dir.iterdir():
        # stat() and unlink() share one guard: a file may disappear or
        # become inaccessible between listing and inspection, and that
        # should not abort the rest of the sweep.
        try:
            if not file_path.is_file():
                continue
            file_age = current_time - file_path.stat().st_mtime
            if file_age > max_age_seconds:
                file_path.unlink()
                cleaned_count += 1
        except OSError as e:
            print(f"Warning: Could not delete {file_path}: {e}")

    if cleaned_count > 0:
        print(f"π§Ή Cleaned up {cleaned_count} temporary files")
def get_system_info() -> Dict[str, Any]:
    """Get system information for debugging.

    ``psutil`` and ``torch`` are imported lazily and treated as optional so
    that this diagnostic helper still works when either one is missing.

    Returns:
        A dict with ``platform``, ``python_version``, ``cpu_count``,
        ``memory_gb`` (None without psutil), ``torch_version`` (None without
        torch) and ``cuda_available``. When CUDA is available the dict also
        contains ``cuda_version``, ``gpu_count`` and ``gpu_name``.
    """
    import platform

    try:
        import psutil
    except ImportError:
        psutil = None
    try:
        import torch
    except ImportError:
        torch = None

    memory_gb = None
    if psutil is not None:
        memory_gb = round(psutil.virtual_memory().total / (1024**3), 2)

    cuda_available = torch is not None and torch.cuda.is_available()

    info = {
        'platform': platform.platform(),
        'python_version': platform.python_version(),
        'cpu_count': os.cpu_count(),
        'memory_gb': memory_gb,
        'torch_version': torch.__version__ if torch is not None else None,
        'cuda_available': cuda_available,
    }

    if cuda_available:
        gpu_count = torch.cuda.device_count()
        info['cuda_version'] = torch.version.cuda
        info['gpu_count'] = gpu_count
        info['gpu_name'] = torch.cuda.get_device_name(0) if gpu_count > 0 else None

    return info
def create_sample_files(sample_dir: Path):
    """Create sample files for testing.

    Writes ``sample.txt`` (a short descriptive document) and ``sample.csv``
    (a five-row table with a header) into ``sample_dir``, overwriting any
    existing files with those names.

    Args:
        sample_dir: Target directory (``Path`` or path string). It is
            created, along with any missing parents, if necessary.
    """
    sample_dir = Path(sample_dir)
    sample_dir.mkdir(parents=True, exist_ok=True)

    # Create sample text file
    text_content = """
Smart RAG API - Sample Document
This is a sample text document for testing the Smart RAG API system.
Key Features:
- Multi-format document processing
- Vector-based search using FAISS
- Free Hugging Face models
- OCR support for images
- RESTful API interface
The system can process various file formats including PDF, Word documents,
plain text, images with OCR, CSV data, and SQLite databases.
Example Questions:
1. What are the key features of this system?
2. Which file formats are supported?
3. What models does it use?
This document serves as test data to verify that the document processing
and question-answering pipeline works correctly.
"""
    # Explicit encoding keeps the output independent of the platform locale.
    (sample_dir / "sample.txt").write_text(text_content, encoding="utf-8")

    # Create sample CSV
    csv_content = """Name,Age,City,Occupation
John Doe,30,New York,Engineer
Jane Smith,25,London,Designer
Bob Johnson,35,Tokyo,Manager
Alice Brown,28,Paris,Developer
Charlie Wilson,32,Berlin,Analyst
"""
    (sample_dir / "sample.csv").write_text(csv_content, encoding="utf-8")

    print(f"β Sample files created in {sample_dir}")
def log_performance(operation: str, duration: float, details: Dict[str, Any] = None):
    """Print a timing line for an operation, plus optional detail lines.

    The first line shows the operation name and the duration formatted to
    two decimals. If ``details`` is non-empty, each key/value pair is
    printed on its own line afterwards.
    """
    print(f"β±οΈ {operation}: {duration:.2f}s")

    # Nothing more to report when no details were supplied
    if not details:
        return

    for label, detail in details.items():
        print(f" {label}: {detail}")
def check_dependencies():
    """Report whether every required package can be imported.

    Each module in the table below is imported in turn; the display names of
    those raising ``ImportError`` are collected. A summary line is printed
    and the function returns True when nothing is missing, False otherwise.
    """
    required = {
        'torch': 'PyTorch',
        'transformers': 'Hugging Face Transformers',
        'sentence_transformers': 'Sentence Transformers',
        'faiss': 'FAISS',
        'gradio': 'Gradio',
        'pytesseract': 'Tesseract OCR',
        'PIL': 'Pillow',
        'pandas': 'Pandas',
        'docx': 'python-docx',
        'pdfplumber': 'pdfplumber'
    }

    def _cannot_import(module_name):
        # True when importing the module raises ImportError
        try:
            __import__(module_name)
        except ImportError:
            return True
        return False

    missing = [
        display_name
        for module_name, display_name in required.items()
        if _cannot_import(module_name)
    ]

    if not missing:
        print("β All dependencies are available")
        return True

    print(f"β Missing dependencies: {', '.join(missing)}")
    return False
def format_context_for_display(contexts: list, max_length: int = 200) -> list:
    """Prepare context chunks for display in the UI.

    Each chunk longer than ``max_length`` is cut to that length and suffixed
    with ``"..."``; every chunk is then prefixed with a bold, 1-based
    ``[Context N]`` header on its own line.
    """

    def _clip(chunk):
        # Shorten only chunks that exceed the limit
        if len(chunk) <= max_length:
            return chunk
        return chunk[:max_length] + "..."

    return [
        f"**[Context {position}]**\n{_clip(chunk)}"
        for position, chunk in enumerate(contexts, start=1)
    ]
def extract_keywords(text: str, max_keywords: int = 10) -> list:
    """Return the most frequent non-stop-word terms in ``text``.

    The text is lower-cased and split into alphabetic words of at least
    three letters; words in the stop list are dropped and the remaining
    words are ranked by frequency. At most ``max_keywords`` are returned.
    """
    import re
    from collections import Counter

    # Common stop words to filter out
    ignored = {
        'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with',
        'by', 'from', 'up', 'about', 'into', 'through', 'during', 'before',
        'after', 'above', 'below', 'between', 'among', 'is', 'are', 'was',
        'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does',
        'did', 'will', 'would', 'could', 'should', 'may', 'might', 'must',
        'shall', 'can', 'this', 'that', 'these', 'those', 'i', 'me', 'my',
        'myself', 'we', 'our', 'ours', 'ourselves', 'you', 'your', 'yours'
    }

    # Tokenise: purely alphabetic runs, three letters or longer
    tokens = re.findall(r'\b[a-zA-Z]{3,}\b', text.lower())

    # Tally everything that is not a stop word, in order of appearance
    frequencies = Counter(token for token in tokens if token not in ignored)

    return [token for token, _ in frequencies.most_common(max_keywords)]
def create_gradio_theme():
    """Return the theme settings used for the Gradio UI as a plain dict.

    The dict holds the primary, secondary and neutral hue names together
    with the spacing and radius size tokens.
    """
    return dict(
        primary_hue='blue',
        secondary_hue='gray',
        neutral_hue='gray',
        spacing_size='md',
        radius_size='md',
    )