# NOTE(review): the lines "Spaces: / Runtime error / Runtime error" above this
# module were a hosting-UI status banner pasted into the file during
# extraction; they are not part of the Python source.
| import json | |
| import logging | |
| from typing import Any | |
| from dotenv import load_dotenv | |
| from fastapi import HTTPException, UploadFile | |
| from fastapi.responses import JSONResponse | |
| from core.chat.engine import Engine | |
| from core.parser import clean_text | |
| from langfuse.llama_index import LlamaIndexCallbackHandler | |
| from script.document_uploader import Uploader | |
| from script.vector_db import IndexManager | |
| from service.aws_loader import Loader | |
| from service.dto import BotResponseStreaming | |
| from utils.error_handlers import handle_exception | |
# Load environment variables (e.g. service credentials) from a local .env file.
load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO)
async def data_ingestion(reference, file: UploadFile, lang: str = "en") -> Any:
    """Ingest an uploaded document: parse it, index it, and archive it to S3.

    Args:
        reference: Document metadata; ``reference["title"]`` is used as the
            S3 object name.
        file: The uploaded document to process.
        lang: Language code forwarded to the document parser.

    Returns:
        A JSON string on success, the uploader's ``JSONResponse`` when
        parsing fails, or the structured payload from ``handle_exception``
        on any other error.
    """
    try:
        # Assuming you have a Langfuse callback handler.
        # NOTE(review): the handler is created and configured but never
        # registered with a callback manager in this block — confirm that
        # tracing is wired up elsewhere.
        trace_handler = LlamaIndexCallbackHandler()
        trace_handler.set_trace_params(user_id="admin_book_uploaded")

        # Parse the upload; a JSONResponse from the uploader signals failure.
        parsed_nodes, stream = await Uploader(reference, file, lang).process_documents()
        if isinstance(parsed_nodes, JSONResponse):
            return parsed_nodes  # Return the error response directly

        # Build indexes using IndexManager.
        IndexManager().build_indexes(parsed_nodes)

        # Archive the original file to S3 under the document title.
        Loader().upload_to_s3(stream, f"{reference['title']}")

        return json.dumps(
            {"status": "success", "message": "Vector Index loaded successfully."}
        )
    except Exception as e:
        # Log the error
        logging.error("An error occurred in data ingestion: %s", e)
        # Use handle_exception for structured error handling
        return handle_exception(e)
async def generate_streaming_completion(user_request, session_id):
    """Stream a chat completion for *user_request* against the stored indexes.

    Args:
        user_request: The user's chat message.
        session_id: Conversation/session identifier passed to the chat engine.

    Yields:
        BotResponseStreaming: incremental tokens (``content`` plus the text
        accumulated so far), then one item per source node carrying the
        cleaned reference text, its metadata, and its relevance score.
        On failure, a ``{"error": str}`` dict is yielded instead.
    """
    try:
        engine = Engine()
        index_manager = IndexManager()

        # Load existing indexes
        index = index_manager.load_existing_indexes()

        # Retrieve the chat engine with the loaded index
        chat_engine = engine.get_chat_engine(index, session_id)

        # Generate completion response
        response = chat_engine.stream_chat(user_request)

        completed_response = ""
        for gen in response.response_gen:
            completed_response += gen  # Concatenate the new string
            yield BotResponseStreaming(
                content=gen, completed_content=completed_response
            )

        # After the token stream ends, emit one item per supporting source
        # node so the client can render citations.
        for node in response.source_nodes:
            yield BotResponseStreaming(
                completed_content=completed_response,
                reference=str(clean_text(node.node.get_text())),
                metadata=dict(node.node.metadata),
                score=float(node.score),
            )
    except Exception as e:
        # BUG FIX: the original try had a SECOND `except Exception` clause
        # that logged and raised HTTPException — it was unreachable because
        # this clause already catches Exception, and this clause never
        # logged. Keep the live behavior (yield an error payload to the
        # stream consumer) and add the logging the dead clause intended.
        logging.error("An error occurred in generate text: %s", e)
        yield {"error": str(e)}