Spaces:
Running
Running
| import os | |
| import tempfile | |
| import shutil | |
| from fastapi import APIRouter, UploadFile, File, Form, HTTPException, Request, Response | |
| from typing import Dict, List | |
| from aimakerspace.text_utils import CharacterTextSplitter, TextFileLoader, PDFLoader | |
| from aimakerspace.openai_utils.embedding import EmbeddingModel | |
| from aimakerspace.openai_utils.chatmodel import ChatOpenAI | |
| from aimakerspace.qdrant_vectordb import QdrantVectorDatabase | |
| from api.config import QDRANT_HOST, QDRANT_PORT, QDRANT_GRPC_PORT, QDRANT_PREFER_GRPC, QDRANT_COLLECTION, QDRANT_IN_MEMORY | |
| from api.models.pydantic_models import DocumentSummaryRequest, DocumentSummaryResponse | |
| from api.services.pipeline import RetrievalAugmentedQAPipeline | |
| from api.utils.user import get_or_create_user_id | |
| from api.utils.prompts import get_user_prompts | |
# Router that the endpoints of this module are attached to
router = APIRouter()

# One splitter instance, built with default settings, reused for every upload
text_splitter = CharacterTextSplitter()

# In-memory registry mapping a session_id to the retrieval pipeline built for it;
# written by upload_file and read by get_document_summary
user_sessions: Dict[str, RetrievalAugmentedQAPipeline] = {}
async def upload_file(
    file: UploadFile = File(...),
    session_id: str = Form(...),
    request: Request = None,
    response: Response = None
):
    """
    Upload and process a document file

    The upload is written to a temporary file, loaded and split into chunks,
    indexed into a Qdrant collection named after the session, and wrapped in a
    retrieval pipeline that is stored in ``user_sessions[session_id]``. The chat
    model is then asked for a short description and three suggested questions
    based on the first five chunks.

    Args:
        file: Uploaded file
        session_id: Session ID for this document
        request: FastAPI request object
        response: FastAPI response object
    Returns:
        Dictionary with "status", "message", "session_id",
        "document_description" and "suggested_questions", plus "user_id"
        when a user ID could be resolved
    Raises:
        HTTPException: 400 when the content type is neither text/plain nor
            application/pdf
    """
    if file.content_type not in ["text/plain", "application/pdf"]:
        raise HTTPException(status_code=400, detail="Only text and PDF files are supported")

    # Get or create user ID
    user_id = get_or_create_user_id(request, response) if request and response else None

    # The filename can be absent on an upload; fall back to the (already
    # validated) content type instead of failing on None.split / None.lower.
    filename = file.filename or ""
    if filename:
        suffix = f".{filename.split('.')[-1]}"
        is_pdf = filename.lower().endswith('.pdf')
    else:
        is_pdf = file.content_type == "application/pdf"
        suffix = ".pdf" if is_pdf else ".txt"

    temp_path = None
    try:
        # delete=False: the loader reopens the file by name after it is closed,
        # and the finally clause below is responsible for removing it.
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
            temp_path = temp_file.name
            temp_file.write(await file.read())
        # Leaving the with-block closes (and thereby flushes) the file before
        # anything else reads or unlinks it.

        # Create appropriate loader
        loader = PDFLoader(temp_path) if is_pdf else TextFileLoader(temp_path)

        # Load and process the documents
        documents = loader.load_documents()
        texts = text_splitter.split_texts(documents)

        # Create vector database
        vector_db = QdrantVectorDatabase(
            collection_name=f"{QDRANT_COLLECTION}_{session_id}",
            host=QDRANT_HOST,
            port=QDRANT_PORT,
            grpc_port=QDRANT_GRPC_PORT,
            prefer_grpc=QDRANT_PREFER_GRPC,
            in_memory=QDRANT_IN_MEMORY
        )
        vector_db = await vector_db.abuild_from_list(texts)

        # Create chat model
        chat_openai = ChatOpenAI()

        # Get user prompts
        # NOTE(review): DEFAULT_SYSTEM_TEMPLATE / DEFAULT_USER_TEMPLATE are not
        # among the imports visible at the top of this file - confirm they are
        # in scope, otherwise this branch raises NameError when user_id is empty.
        user_prompt_templates = get_user_prompts(user_id) if user_id else {
            "system_template": DEFAULT_SYSTEM_TEMPLATE,
            "user_template": DEFAULT_USER_TEMPLATE
        }

        # Create the retrieval pipeline with user-specific prompts
        retrieval_pipeline = RetrievalAugmentedQAPipeline(
            vector_db_retriever=vector_db,
            llm=chat_openai,
            system_template=user_prompt_templates["system_template"],
            user_template=user_prompt_templates["user_template"]
        )

        # Store the retrieval pipeline in the user session
        user_sessions[session_id] = retrieval_pipeline

        # Generate document description and suggested questions
        doc_content = "\n".join(texts[:5])  # Use first few chunks for summary
        description_prompt = f"""
Please provide a brief description of this document in 2-3 sentences:
{doc_content}
"""
        questions_prompt = f"""
Based on this document content, please suggest 3 specific questions that would be informative to ask:
{doc_content}
Format your response as a JSON array with 3 question strings.
"""

        # Get document description
        description_response = await chat_openai.acreate_single_response(description_prompt)
        document_description = description_response.strip()

        # Get suggested questions
        questions_response = await chat_openai.acreate_single_response(questions_prompt)
        suggested_questions = _parse_suggested_questions(questions_response)

        result = {
            "status": "success",
            "message": f"Processed {file.filename}",
            "session_id": session_id,
            "document_description": document_description,
            "suggested_questions": suggested_questions
        }

        # Add user_id to result if available
        if user_id:
            result["user_id"] = user_id
        return result
    finally:
        # Clean up the temporary file; runs even when reading the upload or
        # any later processing step fails. Cleanup stays best-effort.
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError as e:
                print(f"Error cleaning up temporary file: {e}")


def _parse_suggested_questions(questions_response):
    """Turn the model's answer to the questions prompt into a list of question strings.

    A JSON array of strings is used as-is (blank entries dropped). A JSON object
    is searched for its first list value. Anything else falls back to pulling
    quoted strings, then lines containing "?", and finally three generic
    questions; the fallback paths return at most three entries.
    """
    import json
    import re

    try:
        parsed = json.loads(questions_response)
    except (ValueError, TypeError):
        # json.JSONDecodeError is a ValueError; TypeError covers non-string input
        parsed = None

    if isinstance(parsed, dict):
        # e.g. {"questions": [...]} - take the first list-valued entry
        parsed = next((value for value in parsed.values() if isinstance(value, list)), None)
    if isinstance(parsed, list):
        questions = [q.strip() for q in parsed if isinstance(q, str) and q.strip()]
        if questions:
            return questions

    # Extract questions with a fallback method
    raw_text = questions_response if isinstance(questions_response, str) else ""
    questions = re.findall(r'["\']([^"\']+)["\']', raw_text)
    if len(questions) < 3:
        questions = [q.strip() for q in raw_text.split("\n") if "?" in q]
    if len(questions) < 3:
        questions = ["What is the main topic of this document?",
                     "What are the key points discussed in the document?",
                     "How can I apply the information in this document?"]
    return questions[:3]
async def get_document_summary(request: DocumentSummaryRequest):
    """
    Get a summary of the document

    Looks up the retrieval pipeline stored for the session, refreshes its prompt
    templates when the user's system template differs, and asks the pipeline's
    LLM for a JSON summary of the first ten stored text chunks.

    Args:
        request: Request containing session_id and optional user_id
    Returns:
        Dictionary with "keyTopics", "entities", "wordCloudData" and
        "documentStructure". When the LLM call or JSON parsing fails, a
        placeholder summary with the same keys is returned instead of an error.
    Raises:
        HTTPException: 404 when no pipeline is stored for the session_id
    """
    session_id = request.session_id
    user_id = request.user_id

    # Check if session exists
    if session_id not in user_sessions:
        raise HTTPException(status_code=404, detail="Session not found. Please upload a document first.")

    # Get the retrieval pipeline from the session
    retrieval_pipeline = user_sessions[session_id]

    # Update prompts if user_id is provided and different from current
    # (templates are fetched once and reused for both the comparison and the update)
    if user_id:
        user_prompt_templates = get_user_prompts(user_id)
        if retrieval_pipeline.system_template != user_prompt_templates["system_template"]:
            retrieval_pipeline.update_templates(
                user_prompt_templates["system_template"],
                user_prompt_templates["user_template"]
            )

    # Get all text chunks from the vector store
    all_texts = retrieval_pipeline.vector_db_retriever.get_all_texts()

    # Combine a sample of the texts (to avoid hitting token limits);
    # slicing already copes with fewer than ten chunks
    doc_content = "\n".join(all_texts[:10])

    # Create the LLM summary prompt
    summary_prompt = f"""
Analyze the following document content and generate a structured summary in JSON format:
```
{doc_content}
```
Return ONLY a JSON object with the following structure:
{{
"keyTopics": [list of 5-7 key topics in the document],
"entities": [list of 5-8 important named entities such as organizations, technologies, or people],
"wordCloudData": [
{{ "text": "word1", "value": frequency_score }},
{{ "text": "word2", "value": frequency_score }},
...
],
"documentStructure": [
{{
"title": "Section title",
"subsections": ["Subsection1", "Subsection2", ...]
}},
...
]
}}
The wordCloudData should contain 15-20 important terms with their relative frequency scores (higher numbers = more important/frequent).
The documentStructure should reflect the hierarchical organization of the document with main sections and their subsections.
"""

    # Get LLM response
    try:
        raw_summary = await retrieval_pipeline.llm.acreate_single_response(summary_prompt)
        return _parse_summary_response(raw_summary)
    except Exception as e:
        # Deliberate best-effort: the caller always gets a well-formed summary.
        # The failure is reported instead of being dropped silently.
        print(f"Error generating document summary: {e}")
        return {
            "keyTopics": ["Error analyzing document"],
            "entities": ["Try refreshing the page"],
            "wordCloudData": [
                {"text": "Error", "value": 60},
                {"text": "Document", "value": 40},
                {"text": "Analysis", "value": 30}
            ],
            "documentStructure": [
                {"title": "Error in document analysis", "subsections": ["Please try again"]}
            ]
        }


def _parse_summary_response(raw_summary):
    """Extract the summary dictionary from the LLM's raw text answer.

    The span from the first "{" to the last "}" is parsed as JSON; invalid JSON
    raises ValueError to the caller. When no braces are found, an error
    placeholder structure is used. Any required field that is missing or is not
    a list is replaced by a placeholder list.
    """
    import json
    import re

    # Find JSON content (sometimes the LLM adds extra text)
    json_match = re.search(r'({[\s\S]*})', raw_summary)
    if json_match:
        summary_data = json.loads(json_match.group(1))
    else:
        # If no JSON found, create a basic structure with an error message
        summary_data = {
            "keyTopics": ["Error parsing document structure"],
            "entities": ["Please try again"],
            "wordCloudData": [{"text": "Error", "value": 50}],
            "documentStructure": [{"title": "Document structure unavailable", "subsections": []}]
        }

    # Ensure the response has all required fields, each holding a list
    placeholders = {
        "keyTopics": ["Topic extraction failed"],
        "entities": ["Entity extraction failed"],
        "wordCloudData": [{"text": "Data", "value": 50}],
        "documentStructure": [{"title": "Structure unavailable", "subsections": []}],
    }
    for field_name, placeholder in placeholders.items():
        if not isinstance(summary_data.get(field_name), list):
            summary_data[field_name] = placeholder
    return summary_data