File size: 9,375 Bytes
1e7be54
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
import os
from pathlib import Path
from typing import Dict, Any
import shutil

from config import Config, FILE_TYPE_CONFIG

def setup_directories(config: Config = None):
    """Setup required directories"""
    config = config or Config()
    
    directories = [
        config.UPLOAD_DIR,
        config.VECTOR_STORE_DIR,
        config.TEMP_DIR,
        config.HF_CACHE_DIR
    ]
    
    for directory in directories:
        os.makedirs(directory, exist_ok=True)
        
        # Create .gitkeep for empty directories
        gitkeep_path = directory / ".gitkeep"
        if not gitkeep_path.exists():
            gitkeep_path.touch()
    
    print("βœ… Directory structure setup complete")

def get_file_icon(file_extension: str) -> str:
    """Get icon for file type"""
    return FILE_TYPE_CONFIG.get(file_extension.lower(), {}).get('icon', 'πŸ“„')

def get_file_description(file_extension: str) -> str:
    """Get description for file type"""
    return FILE_TYPE_CONFIG.get(file_extension.lower(), {}).get('description', 'Unknown file type')

def format_file_size(size_bytes: int) -> str:
    """Format file size in human readable format"""
    if size_bytes < 1024:
        return f"{size_bytes} B"
    elif size_bytes < 1024 * 1024:
        return f"{size_bytes / 1024:.1f} KB"
    elif size_bytes < 1024 * 1024 * 1024:
        return f"{size_bytes / (1024 * 1024):.1f} MB"
    else:
        return f"{size_bytes / (1024 * 1024 * 1024):.1f} GB"

def clean_filename(filename: str) -> str:
    """Clean filename for safe storage"""
    import re
    # Remove or replace unsafe characters
    filename = re.sub(r'[^\w\-_\.]', '_', filename)
    # Remove multiple underscores
    filename = re.sub(r'_+', '_', filename)
    # Remove leading/trailing underscores
    filename = filename.strip('_')
    
    return filename

def get_safe_filepath(directory: Path, filename: str) -> Path:
    """Get safe filepath avoiding conflicts"""
    safe_filename = clean_filename(filename)
    filepath = directory / safe_filename
    
    # Handle duplicates
    counter = 1
    base_name = filepath.stem
    extension = filepath.suffix
    
    while filepath.exists():
        new_name = f"{base_name}_{counter}{extension}"
        filepath = directory / new_name
        counter += 1
    
    return filepath

def validate_file_type(filename: str, allowed_extensions: set = None) -> bool:
    """Validate if file type is supported"""
    config = Config()
    allowed = allowed_extensions or config.ALLOWED_EXTENSIONS
    
    extension = Path(filename).suffix.lower()
    return extension in allowed

def estimate_processing_time(file_size: int, file_type: str) -> str:
    """Estimate processing time based on file size and type"""
    # Simple heuristic estimates in seconds
    base_times = {
        '.txt': 0.1,
        '.csv': 0.2,
        '.pdf': 0.5,
        '.docx': 0.3,
        '.jpg': 2.0,  # OCR is slower
        '.jpeg': 2.0,
        '.png': 2.0,
        '.db': 0.5
    }
    
    base_time = base_times.get(file_type.lower(), 1.0)
    
    # Scale by file size (MB)
    size_mb = file_size / (1024 * 1024)
    estimated_seconds = base_time * max(1, size_mb)
    
    if estimated_seconds < 5:
        return "a few seconds"
    elif estimated_seconds < 30:
        return "less than 30 seconds"
    elif estimated_seconds < 60:
        return "about a minute"
    else:
        return f"about {int(estimated_seconds / 60)} minutes"

def cleanup_temp_files(temp_dir: Path, max_age_hours: int = 24):
    """Clean up temporary files older than specified age"""
    import time
    
    if not temp_dir.exists():
        return
    
    current_time = time.time()
    max_age_seconds = max_age_hours * 3600
    
    cleaned_count = 0
    for file_path in temp_dir.iterdir():
        if file_path.is_file():
            file_age = current_time - file_path.stat().st_mtime
            if file_age > max_age_seconds:
                try:
                    file_path.unlink()
                    cleaned_count += 1
                except Exception as e:
                    print(f"Warning: Could not delete {file_path}: {e}")
    
    if cleaned_count > 0:
        print(f"🧹 Cleaned up {cleaned_count} temporary files")

def get_system_info() -> Dict[str, Any]:
    """Get system information for debugging"""
    import platform
    import psutil
    import torch
    
    info = {
        'platform': platform.platform(),
        'python_version': platform.python_version(),
        'cpu_count': os.cpu_count(),
        'memory_gb': round(psutil.virtual_memory().total / (1024**3), 2),
        'torch_version': torch.__version__,
        'cuda_available': torch.cuda.is_available(),
    }
    
    if torch.cuda.is_available():
        info['cuda_version'] = torch.version.cuda
        info['gpu_count'] = torch.cuda.device_count()
        info['gpu_name'] = torch.cuda.get_device_name(0) if torch.cuda.device_count() > 0 else None
    
    return info

def create_sample_files(sample_dir: Path):
    """Create sample files for testing"""
    sample_dir.mkdir(exist_ok=True)
    
    # Create sample text file
    text_content = """
Smart RAG API - Sample Document

This is a sample text document for testing the Smart RAG API system.

Key Features:
- Multi-format document processing
- Vector-based search using FAISS
- Free Hugging Face models
- OCR support for images
- RESTful API interface

The system can process various file formats including PDF, Word documents,
plain text, images with OCR, CSV data, and SQLite databases.

Example Questions:
1. What are the key features of this system?
2. Which file formats are supported?
3. What models does it use?

This document serves as test data to verify that the document processing
and question-answering pipeline works correctly.
"""
    
    with open(sample_dir / "sample.txt", "w") as f:
        f.write(text_content)
    
    # Create sample CSV
    csv_content = """Name,Age,City,Occupation
John Doe,30,New York,Engineer
Jane Smith,25,London,Designer
Bob Johnson,35,Tokyo,Manager
Alice Brown,28,Paris,Developer
Charlie Wilson,32,Berlin,Analyst
"""
    
    with open(sample_dir / "sample.csv", "w") as f:
        f.write(csv_content)
    
    print(f"βœ… Sample files created in {sample_dir}")

def log_performance(operation: str, duration: float, details: Dict[str, Any] = None):
    """Log performance metrics"""
    print(f"⏱️ {operation}: {duration:.2f}s")
    if details:
        for key, value in details.items():
            print(f"   {key}: {value}")

def check_dependencies():
    """Check if all required dependencies are available"""
    dependencies = {
        'torch': 'PyTorch',
        'transformers': 'Hugging Face Transformers',
        'sentence_transformers': 'Sentence Transformers',
        'faiss': 'FAISS',
        'gradio': 'Gradio',
        'pytesseract': 'Tesseract OCR',
        'PIL': 'Pillow',
        'pandas': 'Pandas',
        'docx': 'python-docx',
        'pdfplumber': 'pdfplumber'
    }
    
    missing = []
    for module, name in dependencies.items():
        try:
            __import__(module)
        except ImportError:
            missing.append(name)
    
    if missing:
        print(f"❌ Missing dependencies: {', '.join(missing)}")
        return False
    else:
        print("βœ… All dependencies are available")
        return True

def format_context_for_display(contexts: list, max_length: int = 200) -> list:
    """Format context chunks for display in UI"""
    formatted_contexts = []
    
    for i, context in enumerate(contexts):
        # Truncate long contexts
        if len(context) > max_length:
            truncated = context[:max_length] + "..."
        else:
            truncated = context
        
        # Add context number
        formatted = f"**[Context {i+1}]**\n{truncated}"
        formatted_contexts.append(formatted)
    
    return formatted_contexts

def extract_keywords(text: str, max_keywords: int = 10) -> list:
    """Extract key terms from text (simple implementation)"""
    import re
    from collections import Counter
    
    # Simple keyword extraction
    # Remove punctuation and convert to lowercase
    words = re.findall(r'\b[a-zA-Z]{3,}\b', text.lower())
    
    # Common stop words to filter out
    stop_words = {
        'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with',
        'by', 'from', 'up', 'about', 'into', 'through', 'during', 'before',
        'after', 'above', 'below', 'between', 'among', 'is', 'are', 'was',
        'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does',
        'did', 'will', 'would', 'could', 'should', 'may', 'might', 'must',
        'shall', 'can', 'this', 'that', 'these', 'those', 'i', 'me', 'my',
        'myself', 'we', 'our', 'ours', 'ourselves', 'you', 'your', 'yours'
    }
    
    # Filter out stop words and count frequency
    filtered_words = [word for word in words if word not in stop_words]
    word_counts = Counter(filtered_words)
    
    # Return top keywords
    keywords = [word for word, count in word_counts.most_common(max_keywords)]
    return keywords

def create_gradio_theme():
    """Create custom Gradio theme"""
    return {
        'primary_hue': 'blue',
        'secondary_hue': 'gray',
        'neutral_hue': 'gray',
        'spacing_size': 'md',
        'radius_size': 'md'
    }