Spaces:
Sleeping
Sleeping
| from modules.functions import call_llm | |
| from fastapi import FastAPI | |
| from pydantic import BaseModel, Field | |
| import os | |
| import sqlite3 | |
| import logging | |
| import asyncio | |
| import time | |
| from typing import List, Dict | |
| from typing_extensions import TypedDict | |
| app = FastAPI(debug=True) | |
| # Configure logging | |
| logging.basicConfig( | |
| level=logging.WARNING, | |
| format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", | |
| handlers=[logging.FileHandler("app.log"), logging.StreamHandler()], | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # SQLite setup | |
| DB_PATH = "app/data/conversations.db" | |
| # In-memory storage for conversations | |
| CONVERSATIONS: Dict[str, List[Dict[str, str]]] = {} | |
| LAST_ACTIVITY: Dict[str, float] = {} | |
| # initialize SQLite database | |
| def init_db(): | |
| logger.info("Initializing database") | |
| os.makedirs(os.path.dirname(DB_PATH), exist_ok=True) | |
| conn = sqlite3.connect(DB_PATH) | |
| c = conn.cursor() | |
| c.execute( | |
| """CREATE TABLE IF NOT EXISTS conversations | |
| (id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| conversation_id TEXT, | |
| messages TEXT | |
| timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)""" | |
| ) | |
| conn.commit() | |
| conn.close() | |
| logger.info("Database initialized successfully") | |
| init_db() | |
| def update_db(conversation_id, messages): | |
| logger.info(f"Updating database for conversation: {conversation_id}") | |
| conn = sqlite3.connect(DB_PATH) | |
| c = conn.cursor() | |
| c.execute( | |
| "SELECT COUNT(*) FROM conversations WHERE conversation_id = ?", | |
| (conversation_id,), | |
| ) | |
| row_exists = c.fetchone()[0] | |
| if row_exists: | |
| c.execute( | |
| """UPDATE conversations SET messages = ? WHERE conversation_id = ?""", | |
| (str(messages), conversation_id), | |
| ) | |
| else: | |
| c.execute( | |
| f"INSERT INTO conversations (conversation_id, messages) VALUES (?, ?)", | |
| (conversation_id, str(messages)), | |
| ) | |
| conn.commit() | |
| conn.close() | |
| logger.info("Database updated successfully") | |
| def get_conversation_from_db(conversation_id): | |
| conn = sqlite3.connect(DB_PATH) | |
| try: | |
| c = conn.cursor() | |
| c.execute( | |
| """SELECT messages FROM conversations WHERE conversation_id = ?""", | |
| (conversation_id,), | |
| ) | |
| conversation = c.fetchone() | |
| if conversation: | |
| return conversation[0] | |
| else: | |
| return None | |
| finally: | |
| conn.close() | |
| async def clear_inactive_conversations(): | |
| while True: | |
| logger.info("Clearing inactive conversations") | |
| current_time = time.time() | |
| inactive_convos = [ | |
| conv_id | |
| for conv_id, last_time in LAST_ACTIVITY.items() | |
| if current_time - last_time > 1800 | |
| ] # 30 minutes | |
| for conv_id in inactive_convos: | |
| if conv_id in CONVERSATIONS: | |
| del CONVERSATIONS[conv_id] | |
| if conv_id in LAST_ACTIVITY: | |
| del LAST_ACTIVITY[conv_id] | |
| logger.info(f"Cleared {len(inactive_convos)} inactive conversations") | |
| await asyncio.sleep(60) # Check every minutes | |
| class Output(TypedDict): | |
| type: str | |
| content: str | |
| class UserInput(BaseModel): | |
| ConversationID: str = Field(examples=["123e4567-e89b-12d3-a456-426614174000"]) | |
| Query: str = Field(examples=["Nifty 50 Annual return for past 10 years"]) | |
| class Response(BaseModel): | |
| response: List[Output] = Field( | |
| examples=[ | |
| [ | |
| { | |
| "type": "text", | |
| "content": "### Nifty 50 Annual Return for Past 10 Years...", | |
| }, | |
| { | |
| "type": "plotly", | |
| "content": '{"data":[{"x":[null,6.75517596225125.....}', | |
| }, | |
| ] | |
| ] | |
| ) | |
| executed_code: List[str] = Field( | |
| examples=[ | |
| [ | |
| """import folium | |
| m = folium.Map(location=[35, 100....""", | |
| """from IPython.display import Image | |
| urls = ["https://up""", | |
| ] | |
| ] | |
| ) | |
| async def get_response(user_query: UserInput) -> Response: | |
| conv_id = user_query.ConversationID | |
| query = user_query.Query | |
| if conv_id in CONVERSATIONS: | |
| history = CONVERSATIONS[conv_id] + [{"role": "user", "content": query}] | |
| else: | |
| db_response = get_conversation_from_db(conv_id) | |
| if db_response: | |
| history = eval(db_response) + [{"role": "user", "content": query}] | |
| else: | |
| CONVERSATIONS[conv_id] = [] | |
| history = [{"role": "user", "content": query}] | |
| print(history) | |
| results, llm_response, python_code = call_llm(history) | |
| history += [{"role": "assistant", "content": llm_response}] | |
| CONVERSATIONS[conv_id] = history | |
| update_db(conversation_id=conv_id, messages=history) | |
| return {"response": results, "executed_code": python_code} # type:ignore | |