| | import os |
| | import sys |
| | import logging |
| | import shutil |
| | from datetime import datetime |
| | from typing import List, Dict, Any |
| |
|
| | |
# Configure the root logger at INFO level; this runs when the module is imported.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the helpers in this file, named after this module.
logger = logging.getLogger(__name__)
| |
|
def sanitize_filename(filename: str) -> str:
    """Return *filename* with unsafe characters replaced and its length capped.

    The characters ``<``, ``>``, ``:``, ``"``, ``/``, backslash, ``|``, ``?``
    and ``*``, plus every ASCII control character (code points 0-31), are each
    replaced with an underscore.

    If the result is longer than 200 characters it is shortened so that it
    never exceeds 200 characters. The extension (as reported by
    ``os.path.splitext``) is preserved whenever it is shorter than the limit;
    the base is trimmed to at most 195 characters, and further if a long
    extension requires it.

    Args:
        filename: The candidate file name (not a full path; separators are
            replaced like any other unsafe character).

    Returns:
        The sanitized file name, at most 200 characters long.
    """
    max_length = 200
    max_base_length = 195

    # One translate() pass instead of one replace() call per character.
    unsafe_chars = '<>:"/\\|?*' + ''.join(map(chr, range(32)))
    filename = filename.translate(
        str.maketrans(unsafe_chars, '_' * len(unsafe_chars))
    )

    if len(filename) > max_length:
        base, ext = os.path.splitext(filename)
        if len(ext) >= max_length:
            # The "extension" alone busts the limit, so it cannot be kept
            # intact; fall back to a plain prefix of the whole name.
            filename = filename[:max_length]
        else:
            # Keep the historical 195-character base for short extensions,
            # but shrink it further when the extension needs the room.
            keep = min(max_base_length, max_length - len(ext))
            filename = base[:keep] + ext
    return filename
| |
|
def get_document_path(filename: str) -> str:
    """Build a storage path for a document named *filename*.

    The preferred directory is ``data/documents`` under the directory reached
    by applying ``os.path.dirname`` three times to this module's absolute
    path. The directory is created if needed and probed with a small
    write/delete test. If creating or writing to it fails, a ``documents``
    directory under the system temp location is used instead (``/tmp`` on
    non-Windows, ``%TEMP%`` or ``C:\\Temp`` on Windows).

    The file name is sanitized with ``sanitize_filename`` and a timestamp with
    microsecond resolution is inserted before the extension, so two requests
    for the same name within the same second do not map to the same path.

    This function only computes the path (and creates directories); it does
    not create the document file itself.

    Args:
        filename: Original name of the document.

    Returns:
        The full path at which the document should be stored. If anything
        unexpected goes wrong, a generic ``doc_<timestamp>`` path directly in
        the temp location is returned instead.
    """
    temp_root = '/tmp' if os.name != 'nt' else os.environ.get('TEMP', 'C:\\Temp')
    try:
        module_path = os.path.abspath(__file__)
        project_root = os.path.dirname(os.path.dirname(os.path.dirname(module_path)))
        docs_dir = os.path.join(project_root, 'data', 'documents')

        try:
            # Creating the directory is part of the probe: if it cannot be
            # created we still want the temp "documents" fallback rather than
            # the extension-less last-resort path below.
            os.makedirs(docs_dir, exist_ok=True)
            test_file = os.path.join(docs_dir, '.test_write_access')
            with open(test_file, 'w') as f:
                f.write('test')
            os.remove(test_file)
        except OSError as e:
            logger.warning(f"Document directory may not be writable: {e}")
            docs_dir = os.path.join(temp_root, 'documents')
            os.makedirs(docs_dir, exist_ok=True)

        filename = sanitize_filename(filename)

        # %f adds microseconds so same-named uploads in one second stay distinct.
        timestamp = datetime.now().strftime('%Y%m%d%H%M%S%f')
        base, ext = os.path.splitext(filename)
        unique_filename = f"{base}_{timestamp}{ext}"

        filepath = os.path.join(docs_dir, unique_filename)
        logger.info(f"Document will be stored at: {filepath}")
        return filepath
    except Exception as e:
        # Deliberate catch-all: callers always get some usable path back.
        logger.error(f"Error getting document path: {e}")
        os.makedirs(temp_root, exist_ok=True)
        return os.path.join(
            temp_root, f"doc_{datetime.now().strftime('%Y%m%d%H%M%S%f')}"
        )
| |
|
def copy_uploaded_file(source_path: str, destination_path: str) -> bool:
    """Copy a file from *source_path* to *destination_path*.

    ``shutil.copy2`` is tried first. If it fails for any reason other than the
    two paths referring to the same file, a plain binary stream copy is
    attempted as a fallback.

    When source and destination are the same file there is nothing to copy.
    The fallback must not run in that case: opening the destination for
    writing would truncate the source and destroy its contents.

    Args:
        source_path: Path of the file to copy.
        destination_path: Path to copy the file to.

    Returns:
        True if the file is in place at the destination (copied, or already
        the same file); False if both copy methods failed.
    """
    try:
        shutil.copy2(source_path, destination_path)
        logger.info(f"File copied from {source_path} to {destination_path}")
        return True
    except shutil.SameFileError:
        logger.warning(
            f"Source and destination are the same file, nothing to copy: {source_path}"
        )
        return True
    except Exception as e:
        logger.error(f"Error copying file: {e}")

    try:
        # The source is opened first, so a missing source fails before the
        # destination is created or truncated.
        with open(source_path, 'rb') as src, open(destination_path, 'wb') as dst:
            # Stream in chunks instead of loading the whole file into memory.
            shutil.copyfileobj(src, dst)
        logger.info("File copied using alternate method")
        return True
    except Exception as e2:
        logger.error(f"All methods of copying file failed: {e2}")
        return False
| |
|
def format_sources(sources: List[Dict[str, Any]]) -> str:
    """Format source documents as a numbered, newline-separated list.

    Each entry is rendered as ``"<n>. <file_name> "`` (numbering starts at 1),
    followed by ``"(Page <page>) "`` when the entry has a page value. A page
    value of ``0`` is shown; only a missing, ``None`` or empty-string page is
    omitted. Entries without a ``file_name`` key are shown as
    ``Unknown Source``.

    Args:
        sources: Dictionaries that may contain ``file_name`` and ``page`` keys.

    Returns:
        The formatted list, ``"No sources found."`` when *sources* is empty or
        None, or ``"Error displaying sources."`` if formatting raises.
    """
    try:
        if not sources:
            return "No sources found."

        formatted = []
        for i, source in enumerate(sources, 1):
            source_str = f"{i}. {source.get('file_name', 'Unknown Source')} "
            page = source.get('page')
            # Explicit checks rather than truthiness, so page 0 is not dropped.
            if page is not None and page != '':
                source_str += f"(Page {page}) "
            formatted.append(source_str)

        return "\n".join(formatted)
    except Exception as e:
        logger.error(f"Error formatting sources: {e}")
        return "Error displaying sources."
| |
|
def save_conversation(question: str, answer: str, sources: List[Dict[str, Any]]) -> str:
    """Write a question/answer exchange and its sources to a text file.

    The preferred directory is ``data/conversations`` under the directory
    reached by applying ``os.path.dirname`` three times to this module's
    absolute path. If it cannot be created, a ``conversations`` directory
    under the system temp location is used instead (``/tmp`` on non-Windows,
    ``%TEMP%`` or ``C:\\Temp`` on Windows).

    The file is named ``<timestamp>_<slug>.txt``. The slug is the first five
    whitespace-separated words of the question, lower-cased, joined with
    underscores, passed through ``sanitize_filename`` and capped at 100
    characters. A missing, empty or whitespace-only question uses the slug
    ``empty_question``.

    The file is written as UTF-8 so that non-ASCII text is saved regardless of
    the platform's default encoding.

    Args:
        question: The user's question.
        answer: The answer text.
        sources: Source dictionaries, rendered with ``format_sources``.

    Returns:
        The path of the written file, or an empty string if saving failed.
    """
    try:
        module_path = os.path.abspath(__file__)
        project_root = os.path.dirname(os.path.dirname(os.path.dirname(module_path)))
        conv_dir = os.path.join(project_root, 'data', 'conversations')
        try:
            os.makedirs(conv_dir, exist_ok=True)
        except Exception as e:
            logger.warning(f"Could not create conversation directory: {e}")
            conv_dir = '/tmp/conversations' if os.name != 'nt' else os.path.join(os.environ.get('TEMP', 'C:\\Temp'), 'conversations')
            os.makedirs(conv_dir, exist_ok=True)

        timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
        words = (question or "").split()[:5]
        # Whitespace-only questions yield no words; use the placeholder slug
        # instead of producing a "<timestamp>_.txt" file name.
        question_slug = "_".join(words).lower() if words else "empty_question"
        # Cap the slug so the timestamp prefix and ".txt" suffix still fit
        # comfortably within common file-name length limits.
        question_slug = sanitize_filename(question_slug)[:100]
        filename = f"{timestamp}_{question_slug}.txt"

        formatted_sources = format_sources(sources)
        content = f"Question: {question}\n\nAnswer: {answer}\n\nSources:\n{formatted_sources}\n"

        filepath = os.path.join(conv_dir, filename)
        # Explicit encoding: the platform default may not be able to encode
        # the question or answer text.
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(content)

        logger.info(f"Conversation saved to {filepath}")
        return filepath
    except Exception as e:
        logger.error(f"Error saving conversation: {e}")
        return ""