# NOTE(review): the original paste began with "Spaces:" / "Build error"
# lines — build-log artifacts from the hosting platform, not source code.
| from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, Request, Query, status | |
| from fastapi.responses import StreamingResponse, JSONResponse | |
| import os | |
| import logging | |
| import uuid | |
| from datetime import datetime | |
| from pydantic import BaseModel, Field | |
| from typing import Optional, List, Any | |
| from urllib.parse import urlparse | |
| import shutil | |
| # from app.wrapper.llm_wrapper import * | |
| from app.crud.process_file import load_file_with_markitdown, process_uploaded_file | |
# Configure root logging once at import time so INFO-level records are
# emitted even when the hosting app has not configured logging itself.
logging.basicConfig(level=logging.INFO)
# Standard module-level logger (PEP 282 convention: name = module path).
logger = logging.getLogger(__name__)
def is_url(path: str) -> bool:
    """Report whether *path* looks like an absolute URL.

    A string counts as a URL only when :func:`urlparse` finds both a
    scheme (e.g. ``https``) and a network location (host).  Plain
    filesystem paths lack one or both, so they return ``False``.

    Args:
        path: Candidate path or URL.

    Returns:
        bool: ``True`` when *path* parses as a URL, ``False`` otherwise
        (including when parsing itself fails).
    """
    try:
        parsed = urlparse(path)
    except Exception:
        # urlparse can raise on pathological input; treat that as "not a URL".
        return False
    return bool(parsed.scheme) and bool(parsed.netloc)
file_router = APIRouter()

# Single shared log file recording every URL that gets indexed.
# (Previously this constant was defined twice with the same value.)
INDEXED_LINKS_LOG = "indexed_links.txt"

# Dedicated logger for indexed links so those records never mix with
# general application logs.  The handler attachment is guarded so that a
# re-import (e.g. a uvicorn auto-reload) does not attach a second
# FileHandler and double-write every entry.
indexed_links_logger = logging.getLogger('indexed_links_logger')
indexed_links_logger.setLevel(logging.INFO)
if not indexed_links_logger.handlers:
    indexed_links_handler = logging.FileHandler(INDEXED_LINKS_LOG)
    indexed_links_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    )
    indexed_links_logger.addHandler(indexed_links_handler)
# Do not propagate to the root logger; avoids duplicate log lines.
indexed_links_logger.propagate = False

# NOTE(review): these imports live mid-file in the original; moving them
# to the top-of-file import block would be preferable but is kept here
# to avoid reordering side effects (SentenceTransformer import is heavy).
from app.search.rag_pipeline import RAGSystem
from sentence_transformers import SentenceTransformer
async def get_indexed_links(
    limit: Optional[int] = Query(
        None,
        description="Maximum number of indexed links to return. If not specified, all links are returned."
    )
) -> dict:
    """Return the URLs recorded in the indexed-links log file.

    Args:
        limit: Maximum number of links to return; ``None`` returns all.
            ``0`` is allowed and yields an empty list.

    Returns:
        dict: ``{"indexed_links": [url, ...]}`` in log-file order.

    Raises:
        HTTPException: 404 if the log file does not exist, 400 for a
            negative ``limit``, 500 on unexpected errors.
    """
    try:
        if not os.path.exists(INDEXED_LINKS_LOG):
            raise HTTPException(status_code=404, detail="No indexed links found.")
        indexed_links = []
        with open(INDEXED_LINKS_LOG, "r", encoding="utf-8") as log_file:
            for line in log_file:
                # Each entry is 'timestamp - level - URL'.  Split at most
                # twice so URLs that themselves contain " - " survive intact
                # (an unbounded split would truncate them at parts[2]).
                parts = line.strip().split(" - ", 2)
                if len(parts) >= 3:
                    indexed_links.append(parts[2])
        if limit is not None:
            if limit < 0:
                # Message now matches the check: 0 is accepted (empty result).
                raise HTTPException(status_code=400, detail="Limit must be a non-negative integer.")
            indexed_links = indexed_links[:limit]
        return {"indexed_links": indexed_links}
    except HTTPException:
        raise  # Let FastAPI render deliberate HTTP errors unchanged.
    except Exception:
        # Log full traceback for debugging; return a generic 500 to clients.
        logger.exception("Error retrieving indexed links")
        raise HTTPException(status_code=500, detail="Internal server error while retrieving indexed links.")
async def load_file_with_markdown(request: Request, filepaths: List[str]):
    """Index file paths/URLs using the app-wide RAG system.

    Pulls the shared RAG system off ``request.app.state`` and delegates
    all per-path processing to ``load_file_with_markdown_function``,
    which previously duplicated this function's body almost line for line.

    Args:
        request: Incoming request; ``request.app.state.rag_system`` must
            hold an initialized RAG system.
        filepaths: Local paths or URLs to ingest.

    Returns:
        dict: Summary with message, total_tokens, document_count, pages,
        and per-path errors (see the delegate for details).

    Raises:
        HTTPException: 500 when the RAG system is missing from app state
            or an unexpected processing error occurs.
    """
    try:
        rag_system = request.app.state.rag_system
    except AttributeError:
        rag_system = None
    if rag_system is None:
        # Raise directly: the old code raised this inside a broad
        # `except Exception` block that caught and re-wrapped it,
        # mangling the error detail.
        logger.error("RAG system is not initialized in app state")
        raise HTTPException(status_code=500, detail="RAG system not initialized in app state")
    return await load_file_with_markdown_function(filepaths, rag_system)
async def _ingest_document(path: str, rag_system: Any, pages: List[dict]):
    """Index a single path or URL and append its content to *pages*.

    Returns:
        tuple: ``(doc_id, document)`` so the caller can emit
        source-specific log records.
    """
    # Every document gets a fresh UUID as its index key.
    doc_id = str(uuid.uuid4())
    document = await process_uploaded_file(id=doc_id, file_path=path, rag_system=rag_system)
    pages.append({
        "metadata": {"title": document.title},
        "page_content": document.text_content,
    })
    return doc_id, document


async def load_file_with_markdown_function(filepaths: List[str],
                                           rag_system: Any):
    """Index a batch of local file paths and/or URLs into the RAG system.

    Args:
        filepaths: Local paths or URLs to ingest.
        rag_system: Initialized RAG system; must not be ``None``.

    Returns:
        dict: Summary containing message, total_tokens, document_count,
        the extracted pages, and per-path errors.

    Raises:
        HTTPException: 500 when ``rag_system`` is ``None`` or an
            unexpected error occurs outside per-path handling.
    """
    # The original guard (`rag_system = rag_system` inside a
    # try/except AttributeError) could never fire; validate explicitly.
    if rag_system is None:
        logger.error("RAG system is not initialized in app state")
        raise HTTPException(status_code=500, detail="RAG system not initialized in app state")
    processed_files: List[dict] = []  # per-path failures, returned as "errors"
    pages: List[dict] = []
    try:
        for path in filepaths:
            if is_url(path):
                logger.info(f"Processing URL: {path}")
                try:
                    doc_id, _document = await _ingest_document(path, rag_system, pages)
                    logger.info(f"Successfully processed URL: {path} with ID: {doc_id}")
                    # Record the indexed URL in the dedicated links log.
                    indexed_links_logger.info(f"{doc_id}_{path}")
                except Exception as e:
                    logger.error(f"Error processing URL {path}: {str(e)}")
                    processed_files.append({"path": path, "status": "error", "message": str(e)})
            else:
                logger.info(f"Processing local file: {path}")
                if os.path.exists(path):
                    try:
                        doc_id, document = await _ingest_document(path, rag_system, pages)
                        logger.info(f"Successfully processed file: {path} with ID: {doc_id}")
                        # Log a short, newline-free snippet for traceability.
                        snippet = document.text_content[:100].replace('\n', ' ').replace('\r', ' ')
                        logger.info(f"ID: {doc_id}, Snippet: {snippet}")
                    except Exception as e:
                        logger.error(f"Error processing file {path}: {str(e)}")
                        processed_files.append({"path": path, "status": "error", "message": str(e)})
                else:
                    logger.error(f"File path does not exist: {path}")
                    processed_files.append({"path": path, "status": "not found"})
        # Token count is optional on the RAG implementation.
        total_tokens = rag_system.get_total_tokens() if hasattr(rag_system, "get_total_tokens") else 0
        return {
            "message": "File processing completed",
            "total_tokens": total_tokens,
            "document_count": len(filepaths),
            "pages": pages,
            "errors": processed_files,  # files/URLs that could not be processed
        }
    except Exception as e:
        logger.exception("Unexpected error during file processing")
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}")
async def document_exists(request: Request, doc_id: str):
    """Report whether *doc_id* is currently present in the RAG index.

    Args:
        request: Incoming request carrying the app-state RAG system.
        doc_id: Document identifier to look up.

    Returns:
        dict: ``{"document_id": doc_id, "exists": bool}``.

    Raises:
        HTTPException: 500 if the RAG system is missing from app state.
    """
    try:
        rag_system = request.app.state.rag_system
    except AttributeError:
        logger.error("RAG system is not initialized in app state")
        raise HTTPException(status_code=500, detail="RAG system not initialized in app state")
    # Membership test against the RAG system's known document IDs.
    return {"document_id": doc_id, "exists": doc_id in rag_system.doc_ids}
async def delete_document(request: Request, doc_id: str):
    """Remove a document from the RAG index by its ID.

    Args:
        request: Incoming request carrying the app-state RAG system.
        doc_id: Identifier of the document to delete.

    Returns:
        dict: Confirmation message on success.

    Raises:
        HTTPException: 500 if the RAG system is missing from app state
            or the deletion itself fails.
    """
    try:
        rag_system = request.app.state.rag_system
    except AttributeError:
        logger.error("RAG system is not initialized in app state")
        raise HTTPException(status_code=500, detail="RAG system not initialized in app state")
    try:
        # Delegate the actual removal to the RAG implementation.
        rag_system.delete_document(doc_id)
        logger.info(f"Deleted document with ID: {doc_id}")
        return {"message": f"Document with ID {doc_id} has been deleted."}
    except Exception as exc:
        logger.error(f"Error deleting document with ID {doc_id}: {str(exc)}")
        raise HTTPException(status_code=500, detail=f"Failed to delete document: {str(exc)}")