Spaces:
Runtime error
Runtime error
| from script.vector_db import IndexManager | |
| from script.document_uploader import Uploader | |
| from db.get_data import GetDatabase | |
| from db.delete_data import DeleteDatabase | |
| from db.update_data import UpdateDatabase | |
| from typing import Any | |
| from fastapi import UploadFile | |
| from fastapi import HTTPException | |
| from fastapi.responses import JSONResponse | |
| from llama_index.core.llms import MessageRole | |
| from core.chat.engine import Engine | |
| from core.chat.chatstore import ChatStore | |
| from core.parser import clean_text, update_response, renumber_sources | |
| from service.dto import BotResponseStreaming, ChatMessage | |
| from service.aws_loader import Loader | |
| from pymongo.mongo_client import MongoClient | |
| from dotenv import load_dotenv | |
| from typing import List | |
| from datetime import datetime | |
| import redis | |
| import logging | |
| import re | |
| import json | |
| import os | |
| load_dotenv() | |
| # load_dotenv() above pulls settings from a .env file (presumably including MONGO_URI, read later via os.getenv); configure root logging at INFO level | |
| logging.basicConfig(level=logging.INFO) | |
async def data_ingestion(category_id, reference, file: UploadFile) -> Any:
    """Upload a document to S3 and add its nodes to the vector index.

    Args:
        category_id: Accepted but not used inside this function.
        reference: Mapping describing the document. Its ``"title"`` entry
            becomes the S3 object name, and the whole mapping is handed to
            ``Uploader``.
        file: The uploaded file; passed to the S3 loader and to ``Uploader``.

    Returns:
        A JSON-encoded success string, or a ``JSONResponse`` with status 500
        when any step raises (the exception is logged, not re-raised).
    """
    success_payload = {
        "status": "success",
        "message": "Vector Index loaded successfully.",
    }
    try:
        # Store the raw upload in S3 under the document title.
        object_name = "{}".format(reference["title"])
        Loader().upload_to_s3(file, object_name)

        # The Uploader produces the nodes that are indexed below.
        nodes_with_metadata = await Uploader(reference, file).process_documents()

        IndexManager().build_indexes(nodes_with_metadata)
        return json.dumps(success_payload)
    except Exception as e:
        # Log the failure and answer with a generic 500 response.
        logging.error("An error occurred in data ingestion: %s", e)
        return JSONResponse(
            status_code=500,
            content="An internal server error occurred in data ingestion.",
        )
async def get_data(db_conn, title=None, fetch_all_data=True):
    """Fetch all records, or the data for a single title, from the database.

    Args:
        db_conn: Database connection handed to ``GetDatabase``.
        title: Title to look up; only used when ``fetch_all_data`` is false.
        fetch_all_data: When true (the default) every record is returned and
            ``title`` is ignored.

    Returns:
        Whatever the ``GetDatabase`` query returns, or a ``JSONResponse``
        with status 500 if the query raises.
    """
    get_database = GetDatabase(db_conn)
    try:
        if fetch_all_data:
            results = await get_database.get_all_data()
            logging.info("Database fetched all data")
        else:
            results = await get_database.get_data(title)
            logging.info("Database fetched one data")
        # Results are deliberately not printed: they may be large and may
        # contain data that should not end up on stdout.
        return results
    except Exception as e:
        # logging.exception keeps the traceback; the client only sees a
        # generic 500 message.
        logging.exception("An error occurred in get data: %s", e)
        return JSONResponse(
            status_code=500, content="An internal server error occurred in get data."
        )
async def update_data(id: int, reference, db_conn):
    """Update the database record identified by ``id``.

    Args:
        id: Identifier of the record; stored under the ``"id"`` key of the
            payload sent to the database layer.
        reference: Object exposing ``model_dump()`` (e.g. a pydantic model)
            that holds the new field values.
        db_conn: Database connection handed to ``UpdateDatabase``.

    Returns:
        ``{"status": "Update Success"}`` on success, or a ``JSONResponse``
        with status 500 if dumping the model or the update raises.
    """
    update_database = UpdateDatabase(db_conn)
    try:
        # Work on a separate dict instead of rebinding the ``reference``
        # parameter to a different type.
        record = reference.model_dump()
        record["id"] = id
        await update_database.update_record(record)
        return {"status": "Update Success"}
    except Exception as e:
        # logging.exception keeps the traceback; the client only sees a
        # generic 500 message.
        logging.exception("An error occurred in update data: %s", e)
        return JSONResponse(
            status_code=500, content="An internal server error occurred in update data."
        )
async def delete_data(id: int, db_conn):
    """Delete the database record identified by ``id``.

    Args:
        id: Identifier of the record to delete.
        db_conn: Database connection handed to ``DeleteDatabase``.

    Returns:
        ``{"status": "Delete Success"}`` on success, or a ``JSONResponse``
        with status 500 if the deletion raises.
    """
    delete_database = DeleteDatabase(db_conn)
    try:
        await delete_database.delete_record({"id": id})
        return {"status": "Delete Success"}
    except Exception as e:
        # The log line names this operation (it previously said "get data",
        # which made failures here look like read errors).
        logging.exception("An error occurred in delete data: %s", e)
        return JSONResponse(
            status_code=500, content="An internal server error occurred in delete data."
        )
def _collect_cited_sources(response_text, sources):
    """Collect text, metadata and score for each ``[n]`` citation in the answer.

    Citation numbers are de-duplicated and ordered numerically (so ``[10]``
    comes after ``[2]``). Each number is treated as a 1-based position into
    the source nodes of the first entry in ``sources``.

    Returns:
        A ``(contents, metadata_collection, scores)`` tuple of parallel lists.
    """
    contents, metadata_collection, scores = [], [], []

    cited_numbers = sorted({int(n) for n in re.findall(r"\[(\d+)\]", response_text)})
    if not cited_numbers:
        logging.info("There are no references")
        return contents, metadata_collection, scores
    if not sources:
        logging.info("No sources available")
        return contents, metadata_collection, scores

    # The node list does not depend on the citation, so look it up once.
    source_nodes = dict(sources[0])["raw_output"].source_nodes
    for number in cited_numbers:
        position = number - 1
        if not 0 <= position < len(source_nodes):
            logging.warning("Invalid reference number: %s", number)
            continue
        cited = source_nodes[position]
        contents.append(clean_text(cited.node.get_text()))
        metadata_collection.append(dict(cited.node.metadata))
        scores.append(cited.score)
    return contents, metadata_collection, scores


def _persist_chat_history(
    client, session_id, user_request, response, metadata_collection, type_bot
):
    """Insert the request/answer pair into the session's MongoDB collection."""
    payment = "free" if type_bot == "general" else None
    chat_history_db = [
        # NOTE(review): the user's request is stored with the SYSTEM role,
        # exactly as before - confirm whether a user role was intended.
        ChatMessage(
            role=MessageRole.SYSTEM,
            content=user_request,
            timestamp=datetime.now(),
            payment=payment,
        ),
        ChatMessage(
            role=MessageRole.ASSISTANT,
            content=response,
            metadata=metadata_collection,
            timestamp=datetime.now(),
            payment=payment,
        ),
    ]
    chat_history_json = [message.model_dump() for message in chat_history_db]
    # One collection per chat session inside the "bot_database" database.
    collection = client["bot_database"][session_id]
    result = collection.insert_many(chat_history_json)
    logging.info("Data inserted with record ids %s", result.inserted_ids)


def generate_completion_non_streaming(
    session_id, user_request, titles: List = None, type_bot="general"
):
    """Answer ``user_request`` with the chat engine and persist the exchange.

    The answer's ``[n]`` citations are resolved to source text, metadata and
    scores. The assistant message replaces the last message in the chat
    store, and the request/answer pair is inserted into MongoDB.

    Args:
        session_id: Chat session identifier; also the MongoDB collection name.
        user_request: The text sent to the chat engine.
        titles: Passed to the engine only when ``type_bot`` is not "general".
        type_bot: Bot flavour; "general" uses the default engine and marks the
            stored messages with ``payment="free"``.

    Returns:
        ``(response_text, metadata_collection, scores)`` on success, or a
        ``JSONResponse`` with status 500 if the database ping, the generation
        step or the history insert fails.
    """
    uri = os.getenv("MONGO_URI")
    engine = Engine()
    index_manager = IndexManager()
    chatstore = ChatStore()
    client = MongoClient(uri)
    try:
        try:
            client.admin.command("ping")
            logging.info(
                "Pinged your deployment. You successfully connected to MongoDB!"
            )
        except Exception as e:
            return JSONResponse(status_code=500, content=f"Database Error as {e}")

        try:
            index = index_manager.load_existing_indexes()
            if type_bot == "general":
                chat_engine = engine.get_chat_engine(session_id, index)
            else:
                chat_engine = engine.get_chat_engine(
                    session_id, index, titles, type_bot
                )

            response = chat_engine.chat(user_request)
            contents, metadata_collection, scores = _collect_cited_sources(
                str(response), response.sources
            )

            response = update_response(str(response))
            contents = renumber_sources(contents)

            # Attach each cited passage (without its "source N:" prefix) to
            # the matching metadata entry; zip stops at the shorter list.
            for metadata, content in zip(metadata_collection, contents):
                metadata["content"] = re.sub(r"source \d+\:", "", content)

            message = ChatMessage(
                role=MessageRole.ASSISTANT,
                content=response,
                metadata=metadata_collection,
            )
            chatstore.delete_last_message(session_id)
            chatstore.add_message(session_id, message)
            chatstore.clean_message(session_id)
        except Exception as e:
            logging.exception("An error occurred in generate text: %s", e)
            return JSONResponse(
                status_code=500,
                content=f"An internal server error occurred in generate text as {e}.",
            )

        try:
            _persist_chat_history(
                client,
                session_id,
                user_request,
                response,
                metadata_collection,
                type_bot,
            )
            return str(response), metadata_collection, scores
        except Exception as e:
            logging.exception("An error occurred in generate text: %s", e)
            return JSONResponse(
                status_code=500,
                content=f"An internal server error occurred in generate text as {e}.",
            )
    finally:
        # A new client is created per call, so always release it; otherwise
        # every request leaks a MongoDB client.
        client.close()
async def generate_streaming_completion(user_request, session_id):
    """Stream the chat engine's answer, followed by its source references.

    Yields:
        One ``BotResponseStreaming`` per generated chunk (carrying the chunk
        and the text accumulated so far), then one ``BotResponseStreaming``
        per source node with its cleaned text, metadata and score. If
        anything raises, the error is logged and a single
        ``{"error": <message>}`` dict is yielded instead.
    """
    try:
        engine = Engine()
        index_manager = IndexManager()
        index = index_manager.load_existing_indexes()

        # NOTE(review): argument order aligned with the two calls in
        # generate_completion_non_streaming (session_id first); this call
        # previously passed (index, session_id) - confirm against
        # Engine.get_chat_engine.
        chat_engine = engine.get_chat_engine(session_id, index)

        response = chat_engine.stream_chat(user_request)
        completed_response = ""
        for gen in response.response_gen:
            # Each item carries both the new chunk and the running total.
            completed_response += gen
            yield BotResponseStreaming(
                content=gen, completed_content=completed_response
            )

        for node in response.source_nodes:
            yield BotResponseStreaming(
                completed_content=completed_response,
                reference=str(clean_text(node.node.get_text())),
                metadata=dict(node.node.metadata),
                score=float(node.score),
            )
    except Exception as e:
        # Single handler: the former second ``except Exception`` clause could
        # never run. Errors are reported in-band as before, and now logged.
        logging.exception("An error occurred in generate text: %s", e)
        yield {"error": str(e)}