Spaces:
Runtime error
Runtime error
| from typing import Annotated, List, Optional | |
| from datetime import datetime | |
| from fastapi import APIRouter, Depends | |
| from fastapi.responses import JSONResponse | |
| from sse_starlette.sse import EventSourceResponse | |
| from sqlalchemy import select | |
| from sqlalchemy.orm import Session | |
| from sqlalchemy.exc import SQLAlchemyError, NoResultFound | |
| from service.dto import UserPromptRequest, BotResponse, BotCreateRequest | |
| from core.chat.chatstore import ChatStore | |
| # from core.chat.bot_service import ChatCompletionService | |
| from core.chat.bot_service_multimodal import ChatCompletionService | |
| from db.database import get_db | |
| from db.models import Bot_Meta, Bot, Metadata | |
| from db.models import Session as SessionModel | |
| from langfuse.llama_index import LlamaIndexCallbackHandler | |
| from api.auth import check_user_authentication | |
| from api.router.user import user_dependency | |
| from api.function import generate_streaming_completion | |
| from utils.utils import generate_uuid | |
| from utils.error_handlers import handle_exception | |
# Router for the endpoints in this module; its routes are tagged "Bot_Specific".
| router = APIRouter(tags=["Bot_Specific"]) | |
# Annotated alias used as the type of the `db` parameter below: FastAPI resolves
# it through Depends(get_db) and passes the result in as a SQLAlchemy Session.
| db_dependency = Annotated[Session, Depends(get_db)] | |
def get_chat_store():
    """Build and return a new ``ChatStore`` instance."""
    store = ChatStore()
    return store
async def create_bot_id(
    user: user_dependency,
    db: db_dependency,
    bot_request: BotCreateRequest,
):
    """Persist a new ``Bot`` row for the calling user and report its id.

    Args:
        user: Mapping-like user object; its ``"id"`` entry is stored as the
            bot's ``user_id``.
        db: SQLAlchemy session used for the insert.
        bot_request: Request payload; its ``name`` becomes ``bot_name``.

    Returns:
        ``{"status": "success", "bot_id": ...}`` when the row is stored.
        If ``check_user_authentication(user)`` returns something truthy, that
        value is returned as-is. Failures are reported as a ``JSONResponse``
        with status 500.
    """
    auth_response = check_user_authentication(user)
    if auth_response:
        return auth_response

    try:
        owner_id = user.get("id")
        bot_record = Bot(user_id=owner_id, bot_name=bot_request.name)

        db.add(bot_record)
        db.commit()
        # Reload the instance so ``bot_record.id`` reflects the stored row.
        db.refresh(bot_record)

        payload = {"status": "success", "bot_id": bot_record.id}
        return payload
    except SQLAlchemyError as db_error:
        # Undo the failed transaction before reporting the error.
        db.rollback()
        message = f"Database error: {str(db_error)}"
        return JSONResponse(status_code=500, content=message)
    except Exception as error:
        message = f"An unexpected error occurred: {str(error)}"
        return JSONResponse(status_code=500, content=message)
async def create_bot_specific(
    user: user_dependency,
    db: db_dependency,
    bot_id: int,
    metadata_id: List[Optional[int]],
):
    """Attach metadata entries to a bot by inserting ``Bot_Meta`` rows.

    One ``Bot_Meta`` row is created per element of ``metadata_id`` and all of
    them are committed together.

    Args:
        user: Mapping-like user object passed to the authentication check.
        db: SQLAlchemy session used for the inserts.
        bot_id: Id stored in each new row's ``bot_id``.
        metadata_id: Ids stored in the new rows' ``metadata_id``.

    Returns:
        ``{"status": "success", "bot_meta": [<new row ids>]}`` on success.
        If ``check_user_authentication(user)`` returns something truthy, that
        value is returned as-is. Failures are reported as a ``JSONResponse``
        with status 500.
    """
    auth_response = check_user_authentication(user)
    if auth_response:
        return auth_response

    # NOTE(review): unlike create_new_session, nothing here checks that the
    # bot belongs to the calling user -- confirm whether that is intended.
    # NOTE(review): the annotation allows None elements, which would be stored
    # as a row with metadata_id=None -- confirm that is wanted.
    try:
        bot_meta_entries = [
            Bot_Meta(bot_id=bot_id, metadata_id=mid) for mid in metadata_id
        ]
        db.add_all(bot_meta_entries)
        db.commit()
        # Collect the ids inside the try block: depending on the session
        # configuration, reading them after commit may reload from the database.
        created_ids = [entry.id for entry in bot_meta_entries]
    except SQLAlchemyError as e:
        # A failed flush/commit leaves the session unusable until rolled back.
        db.rollback()
        return JSONResponse(status_code=500, content=f"Database error: {str(e)}")
    except Exception as e:
        db.rollback()
        return JSONResponse(
            status_code=500, content=f"An unexpected error occurred: {str(e)}"
        )

    return {"status": "success", "bot_meta": created_ids}
async def update_bot_specific(
    user: user_dependency,
    db: db_dependency,
    bot_id: int,
    metadata_id: List[Optional[int]],
):
    """Replace all ``Bot_Meta`` rows of a bot with a new set.

    The existing rows for ``bot_id`` are deleted and one new row per element
    of ``metadata_id`` is inserted. Both steps run in a single transaction, so
    a failure while inserting leaves the previous rows in place.

    Args:
        user: Mapping-like user object passed to the authentication check.
        db: SQLAlchemy session used for the replacement.
        bot_id: Bot whose ``Bot_Meta`` rows are replaced.
        metadata_id: Ids stored in the new rows' ``metadata_id``.

    Returns:
        ``{"status": "success", "bot_meta": [<new row ids>]}`` on success.
        If ``check_user_authentication(user)`` returns something truthy, that
        value is returned as-is. Database errors yield a ``JSONResponse`` with
        status 500; other errors are delegated to ``handle_exception``.
    """
    auth_response = check_user_authentication(user)
    if auth_response:
        return auth_response

    # NOTE(review): unlike create_new_session, nothing here checks that the
    # bot belongs to the calling user -- confirm whether that is intended.
    try:
        existing_entries = db.query(Bot_Meta).filter(Bot_Meta.bot_id == bot_id).all()
        for entry in existing_entries:
            db.delete(entry)
        # Send the DELETEs now so they precede the INSERTs, but do not commit:
        # keeping both steps in one transaction lets a failed insert roll the
        # deletions back instead of losing the old associations.
        db.flush()

        bot_meta_entries = [
            Bot_Meta(bot_id=bot_id, metadata_id=mid) for mid in metadata_id
        ]
        db.add_all(bot_meta_entries)
        db.commit()

        return {
            "status": "success",
            "bot_meta": [entry.id for entry in bot_meta_entries],
        }
    except SQLAlchemyError as e:
        db.rollback()
        return JSONResponse(status_code=500, content=f"Database error: {str(e)}")
    except Exception as e:
        # Discard any flushed-but-uncommitted changes before reporting.
        db.rollback()
        return handle_exception(e)
async def delete_bot_specific(
    user: user_dependency,
    db: db_dependency,
    bot_id: int,
    metadata_id: int,
):
    """Remove one ``Bot_Meta`` row identified by bot id and metadata id.

    Args:
        user: Mapping-like user object passed to the authentication check.
        db: SQLAlchemy session used for the lookup and delete.
        bot_id: ``bot_id`` of the row to remove.
        metadata_id: ``metadata_id`` of the row to remove.

    Returns:
        ``{"status": "success", "deleted_entry_id": ...}`` when a row was
        removed, or a 404 ``JSONResponse`` when no row matches. If
        ``check_user_authentication(user)`` returns something truthy, that
        value is returned as-is. Database errors yield a ``JSONResponse`` with
        status 500; other errors are delegated to ``handle_exception``.
    """
    auth_response = check_user_authentication(user)
    if auth_response:
        return auth_response

    # NOTE(review): nothing here checks that the bot belongs to the calling
    # user -- confirm whether that is intended.
    try:
        matches_bot = Bot_Meta.bot_id == bot_id
        matches_metadata = Bot_Meta.metadata_id == metadata_id
        # Only the first matching row is considered.
        link = db.query(Bot_Meta).filter(matches_bot, matches_metadata).first()

        if not link:
            return JSONResponse(status_code=404, content="No entry found to delete.")

        db.delete(link)
        db.commit()

        result = {"status": "success", "deleted_entry_id": link.id}
        return result
    except SQLAlchemyError as db_error:
        # Undo the failed transaction before reporting the error.
        db.rollback()
        return JSONResponse(
            status_code=500, content=f"Database error: {str(db_error)}"
        )
    except Exception as error:
        return handle_exception(error)
async def delete_bot_id(
    user: user_dependency,
    db: db_dependency,
    bot_id: int,
):
    """Delete a bot together with its sessions and ``Bot_Meta`` rows.

    The ``SessionModel`` rows and ``Bot_Meta`` rows whose ``bot_id`` matches
    are bulk-deleted, then the ``Bot`` row itself, all in one commit.

    Args:
        user: Mapping-like user object passed to the authentication check.
        db: SQLAlchemy session used for the deletions.
        bot_id: Primary key of the bot to delete.

    Returns:
        ``{"status": "success", "deleted_bot_id": bot_id}`` on success, or a
        404 ``JSONResponse`` when no bot has that id. If
        ``check_user_authentication(user)`` returns something truthy, that
        value is returned as-is. Database errors yield a ``JSONResponse`` with
        status 500; other errors are delegated to ``handle_exception``.
    """
    auth_response = check_user_authentication(user)
    if auth_response:
        return auth_response

    try:
        # NOTE(review): the lookup is by id only; unlike create_new_session it
        # does not filter on Bot.user_id, so ownership is not checked here --
        # confirm whether that is intended.
        bot_entry = db.query(Bot).filter(Bot.id == bot_id).first()
        if not bot_entry:
            return JSONResponse(
                status_code=404, content=f"Bot with id {bot_id} not found."
            )

        # Remove dependent rows first, then the bot itself.
        db.query(SessionModel).filter(SessionModel.bot_id == bot_id).delete(
            synchronize_session="fetch"
        )
        db.query(Bot_Meta).filter(Bot_Meta.bot_id == bot_id).delete(
            synchronize_session="fetch"
        )
        db.delete(bot_entry)
        db.commit()

        return {
            "status": "success",
            "deleted_bot_id": bot_id,
        }
    except SQLAlchemyError as e:
        db.rollback()
        return JSONResponse(status_code=500, content=f"Database error: {str(e)}")
    except Exception as e:
        # Discard any deletions that were issued but not committed.
        db.rollback()
        return handle_exception(e)
async def create_new_session(user: user_dependency, db: db_dependency, bot_id: int):
    """Create a chat session for one of the calling user's bots.

    Args:
        user: Mapping-like user object; its ``"id"`` entry must match the
            bot's ``user_id`` and is stored on the new session.
        db: SQLAlchemy session used for the lookup and insert.
        bot_id: Bot the new session is attached to.

    Returns:
        ``{"session_id": ...}`` with the id produced by ``generate_uuid()``,
        or a 404 ``JSONResponse`` when no bot with ``bot_id`` belongs to the
        user. If ``check_user_authentication(user)`` returns something truthy,
        that value is returned as-is. Errors while storing the session are
        delegated to ``handle_exception``.
    """
    auth_response = check_user_authentication(user)
    if auth_response:
        return auth_response

    user_id = user.get("id")

    # Ensure the bot exists and belongs to the user; scalar_one() raises
    # NoResultFound when no row matches. The row itself is not needed.
    bot_query = select(Bot).where(Bot.id == bot_id, Bot.user_id == user_id)
    try:
        db.execute(bot_query).scalar_one()
    except NoResultFound:
        return JSONResponse(
            status_code=404, content="Bot not found or unauthorized access."
        )

    try:
        session_id = generate_uuid()
        new_session = SessionModel(
            id=session_id,
            user_id=user_id,
            bot_id=bot_id,
        )
        db.add(new_session)
        db.commit()
        return {
            "session_id": session_id,
        }
    except Exception as e:
        # Database errors land here too; roll back so the session is not left
        # in a failed transaction.
        db.rollback()
        return handle_exception(e)
async def get_all_session_ids(user: user_dependency, db: db_dependency, bot_id: int):
    """List the calling user's sessions for one bot.

    Args:
        user: Mapping-like user object; its ``"id"`` entry filters the sessions.
        db: SQLAlchemy session used for the query.
        bot_id: Only sessions with this ``bot_id`` are returned.

    Returns:
        A list of ``{"id": ..., "updated_at": ...}`` dicts, one per matching
        session. If ``check_user_authentication(user)`` returns something
        truthy, that value is returned as-is. Errors are delegated to
        ``handle_exception``.
    """
    auth_response = check_user_authentication(user)
    if auth_response:
        return auth_response

    try:
        owned_by_user = SessionModel.user_id == user.get("id")
        for_this_bot = SessionModel.bot_id == bot_id
        stmt = select(SessionModel.id, SessionModel.updated_at).where(
            owned_by_user, for_this_bot
        )

        session_data = []
        for row in db.execute(stmt).all():
            session_data.append({"id": row.id, "updated_at": row.updated_at})
        return session_data
    except Exception as error:
        return handle_exception(error)
async def bot_generator_spesific(
    user: user_dependency,
    db: db_dependency,
    bot_id: int,
    session_id: str,
    user_prompt_request: UserPromptRequest,
):
    """Answer a user prompt within a bot-specific chat session.

    Looks up the metadata titles linked to the bot, then either streams a
    completion or generates one synchronously and stamps the session's
    ``updated_at``.

    Args:
        user: Mapping-like user object; ``"username"`` is used for tracing
            and ``"id"`` to scope the session lookup.
        db: SQLAlchemy session used for the queries and the timestamp update.
        bot_id: Bot whose linked metadata titles are passed to the service.
        session_id: Chat session the prompt belongs to.
        user_prompt_request: Payload providing ``prompt`` and ``streaming``.

    Returns:
        An ``EventSourceResponse`` when ``streaming`` is truthy, otherwise a
        ``BotResponse`` with the content, metadata and scores. If
        ``check_user_authentication(user)`` returns something truthy, that
        value is returned as-is. Database errors yield a ``JSONResponse`` with
        status 500; other errors are delegated to ``handle_exception``.
    """
    auth_response = check_user_authentication(user)
    if auth_response:
        return auth_response

    # NOTE(review): the handler is configured but never referenced again in
    # this function -- confirm it is picked up elsewhere.
    langfuse_callback_handler = LlamaIndexCallbackHandler()
    langfuse_callback_handler.set_trace_params(
        user_id=user.get("username"), session_id=session_id
    )

    try:
        # Join the session on its own bot_id so titles are only returned when
        # the session really belongs to the requested bot and to this user.
        query = (
            select(Metadata.title)
            .join(Bot_Meta, Metadata.id == Bot_Meta.metadata_id)
            .join(SessionModel, SessionModel.bot_id == Bot_Meta.bot_id)
            .where(
                Bot_Meta.bot_id == bot_id,
                SessionModel.user_id == user.get("id"),
                SessionModel.id == session_id,
            )
        )
        titles = db.execute(query).scalars().all()

        if user_prompt_request.streaming:
            # NOTE(review): the streaming path does not receive `titles` --
            # confirm that is intended.
            return EventSourceResponse(
                generate_streaming_completion(
                    user_prompt_request.prompt,
                    session_id,
                )
            )

        bot_service = ChatCompletionService(
            session_id, user_prompt_request.prompt, titles, type_bot="specific"
        )
        response, metadata, scores = bot_service.generate_completion()

        existing_session = (
            db.query(SessionModel).filter(SessionModel.id == session_id).first()
        )
        # The session row may be missing; skip the timestamp update in that
        # case so the generated answer is still returned.
        if existing_session is not None:
            existing_session.updated_at = datetime.now()
            db.commit()

        return BotResponse(
            content=response,
            metadata=metadata,
            scores=scores,
        )
    except SQLAlchemyError as e:
        # A failed query/commit leaves the session unusable until rolled back.
        db.rollback()
        return JSONResponse(status_code=500, content=f"Database error: {str(e)}")
    except Exception as e:
        return handle_exception(e)