import io
import os
import string
import tempfile
import base64
import json
from typing import Optional
from urllib.parse import urlparse
from collections import Counter, defaultdict
from datetime import datetime, timedelta

import jwt
import requests
import pandas as pd
from jwt import ExpiredSignatureError, InvalidTokenError
from dateutil.parser import isoparse
from starlette import status
from fastapi import FastAPI, File, UploadFile, HTTPException, Request, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

from src.api.speech_api import speech_translator_router
from functions import *
from functions import client as supabase
app = FastAPI(title="ConversAI", root_path="/api/v1")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.include_router(speech_translator_router, prefix="/speech")
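# Note: allow_origins=["*"] combined with allow_credentials=True is maximally
# permissive, and browsers reject credentialed requests against a wildcard
# origin anyway; pinning concrete origins is the safer production setting.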
async def sign_up(email, username, password):
    res = supabase.auth.sign_up(
        {"email": email, "password": password, "role": "user"}
    )
    user_id = res.user.id
    r_ = createUser(user_id=user_id, username=username, email=email)
    if r_.get('code') == 409:
        return r_
    elif r_.get('code') == 200:
        response = {
            "status": "success",
            "code": 200,
            "message": "Please check your email address for email verification",
        }
    else:
        response = {
            "status": "failed",
            "code": 400,
            "message": "Failed to sign up. Please try again later.",
        }
    return response
async def check_session(user_id: str):
    res = supabase.auth.get_session()
    if res is None:
        try:
            supabase.table("Stores").delete().eq(
                "StoreID", user_id
            ).execute()
            supabase.auth.sign_out()
            response = {"message": "success", "code": 200, "Session": res}
            return response
        except Exception as e:
            raise HTTPException(status_code=400, detail=str(e))
    return res
async def get_user(access_token):
    res = supabase.auth.get_user(jwt=access_token)
    return res


async def refresh_token(refresh_token):
    # Exchanges a refresh token for a new session (supabase-py v2 API).
    res = supabase.auth.refresh_session(refresh_token)
    return res
async def sign_in(email, password):
    try:
        res = supabase.auth.sign_in_with_password(
            {"email": email, "password": password}
        )
        user_id = res.user.id
        access_token = res.session.access_token
        refresh_token = res.session.refresh_token
        store_session_check = (
            supabase.table("Stores")
            .select("*")
            .filter("StoreID", "eq", user_id)
            .execute()
        )
        store_id = None
        if store_session_check and store_session_check.data:
            store_id = store_session_check.data[0].get("StoreID")
        userData = (
            supabase.table("ConversAI_UserInfo")
            .select("*")
            .filter("user_id", "eq", user_id)
            .execute()
            .data
        )
        username = userData[0]["username"]
        if not store_id:
            supabase.table("Stores").insert(
                {
                    "AccessToken": access_token,
                    "StoreID": user_id,
                    "RefreshToken": refresh_token,
                    "email": email,
                }
            ).execute()
            message = {
                "message": "Success",
                "code": status.HTTP_200_OK,
                "username": username,
                "user_id": user_id,
                "access_token": access_token,
                "refresh_token": refresh_token,
            }
            return message
        elif store_id == user_id:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="You are already signed in. Please sign out first to sign in again.",
            )
        else:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Failed to sign in. Please check your credentials.",
            )
    except HTTPException as http_exc:
        raise http_exc
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"An unexpected error occurred during sign-in: {str(e)}",
        )
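# The "Stores" table acts as a session registry: one row per signed-in user,
# keyed by StoreID (the Supabase user id) and holding the access/refresh token
# pair. check_session() above and sign_out() below delete that row to end a
# session.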
async def login_with_token(access_token: str, refresh_token: str):
    try:
        decoded_token = jwt.decode(access_token, options={"verify_signature": False})
        user_id_oauth = decoded_token.get("sub")
        try:
            user_info = (
                supabase.table("ConversAI_UserInfo")
                .select("*")
                .filter("user_id", "eq", user_id_oauth)
                .execute()
            )
            user_name = user_info.data[0]["username"]
        except Exception:
            user_name = ''
        payload = {
            "code": status.HTTP_200_OK,
            "user_id": decoded_token.get("sub"),
            "email": decoded_token.get("email"),
            "access_token": access_token,
            "refresh_token": refresh_token,
            "issued_at": decoded_token.get("iat"),
            "expires_at": decoded_token.get("exp"),
            "username": user_name,
        }
        return payload
    except (ExpiredSignatureError, InvalidTokenError) as e:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=str(e))
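# A minimal sketch of the unverified decode used above (token value hypothetical):
#
#     claims = jwt.decode(token, options={"verify_signature": False})
#     claims.get("sub"), claims.get("email"), claims.get("iat"), claims.get("exp")
#
# With verify_signature disabled, PyJWT also skips exp/iat validation by
# default, so ExpiredSignatureError is only raised if verify_exp is explicitly
# enabled; structurally malformed tokens still raise an InvalidTokenError
# subclass.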
async def user_name_(username: str, user_id: str, email: str):
    r_ = createUser(user_id=user_id, username=username, email=email)
    return r_
async def set_session_data(access_token, refresh_token, user_id):
    res = supabase.auth.set_session(access_token, refresh_token)
    store_session_check = (
        supabase.table("Stores")
        .select("*")
        .filter("StoreID", "eq", user_id)
        .execute()
    )
    store_id = None
    if store_session_check and store_session_check.data:
        store_id = store_session_check.data[0].get("StoreID")
    if not store_id:
        supabase.table("Stores").insert(
            {
                "AccessToken": access_token,
                "StoreID": user_id,
                "RefreshToken": refresh_token,
            }
        ).execute()
    res = {
        "message": "success",
        "code": 200,
        "session_data": res,
    }
    return res
async def sign_out(user_id):
    try:
        supabase.table("Stores").delete().eq(
            "StoreID", user_id
        ).execute()
        supabase.auth.sign_out()
        response = {"message": "success"}
        return response
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))


async def oauth():
    res = supabase.auth.sign_in_with_oauth(
        {"provider": "google", "options": {"redirect_to": "https://convers-ai-test.vercel.app/home"}}
    )
    return res
async def newChatbot(chatbotName: str, username: str):
    currentBotCount = len(listTables(username=username)["output"])
    limit = (
        supabase.table("ConversAI_UserConfig")
        .select("chatbotLimit")
        .eq("user_id", username)
        .execute()
        .data[0]["chatbotLimit"]
    )
    if currentBotCount >= int(limit):
        return {
            "output": "CHATBOT LIMIT EXCEEDED"
        }
    supabase.table("ConversAI_ChatbotInfo").insert({"user_id": username, "chatbotname": chatbotName}).execute()
    chatbotName = f"convai${username}${chatbotName}"
    return createTable(tablename=chatbotName)
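# Vectorstore names follow "convai$<username>$<chatbotName>", which is why the
# endpoints below recover the parts with vectorstore.split("$")[1] and [2].
# For example (hypothetical name): "convai$alice$faq-bot".split("$")
# == ["convai", "alice", "faq-bot"].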
async def loadPDF(vectorstore: str, pdf: UploadFile = File(...)):
    trackUsage(vectorstore=vectorstore, endpoint="/loadPDF")
    username, chatbotName = vectorstore.split("$")[1], vectorstore.split("$")[2]
    source = pdf.filename
    pdfBytes = await pdf.read()
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_file:
        temp_file.write(pdfBytes)
        temp_file_path = temp_file.name
    text = extractTextFromPdf(temp_file_path)
    os.remove(temp_file_path)
    dct = {
        "output": text,
        "source": source
    }
    numTokens = len(
        " ".join([text[x] for x in text]).translate(str.maketrans('', '', string.punctuation)).split(" ")
    )
    dct = json.dumps(dct, indent=1).encode("utf-8", errors="replace")
    fileName = createDataSourceName(sourceName=source)
    response = supabase.storage.from_("ConversAI").upload(file=dct, path=f"{fileName}_data.json")
    response = (
        supabase.table("ConversAI_ChatbotDataSources")
        .insert({
            "username": username,
            "chatbotName": chatbotName,
            "dataSourceName": fileName,
            "numTokens": numTokens,
            "sourceEndpoint": "/loadPDF",
            "sourceContentURL": os.path.join(os.environ["SUPABASE_PUBLIC_BASE_URL"], f"{fileName}_data.json"),
        })
        .execute()
    )
    return {
        "output": "SUCCESS"
    }
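# All load* endpoints share this persistence flow: the extracted text is
# wrapped in {"output": ..., "source": ...}, serialized with json.dumps,
# uploaded to the "ConversAI" storage bucket as <dataSourceName>_data.json, and
# registered in ConversAI_ChatbotDataSources with a public URL built from
# SUPABASE_PUBLIC_BASE_URL. trainChatbot() later fetches and parses these blobs.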
async def loadImagePDF(vectorstore: str, pdf: UploadFile = File(...)):
    trackUsage(vectorstore=vectorstore, endpoint="/loadImagePDF")
    username, chatbotName = vectorstore.split("$")[1], vectorstore.split("$")[2]
    source = pdf.filename
    pdfBytes = await pdf.read()
    text = getTextFromImagePDF(pdfBytes=pdfBytes)
    dct = {
        "output": text,
        "source": source
    }
    numTokens = len(
        " ".join([text[x] for x in text]).translate(str.maketrans('', '', string.punctuation)).split(" ")
    )
    dct = json.dumps(dct, indent=1).encode("utf-8", errors="replace")
    fileName = createDataSourceName(sourceName=source)
    response = supabase.storage.from_("ConversAI").upload(file=dct, path=f"{fileName}_data.json")
    response = (
        supabase.table("ConversAI_ChatbotDataSources")
        .insert({
            "username": username,
            "chatbotName": chatbotName,
            "dataSourceName": fileName,
            "numTokens": numTokens,
            "sourceEndpoint": "/loadImagePDF",
            "sourceContentURL": os.path.join(os.environ["SUPABASE_PUBLIC_BASE_URL"], f"{fileName}_data.json"),
        })
        .execute()
    )
    return {
        "output": "SUCCESS"
    }
class AddText(BaseModel):
    vectorstore: str
    text: str


async def loadText(addTextConfig: AddText):
    vectorstore, text = addTextConfig.vectorstore, addTextConfig.text
    trackUsage(vectorstore=vectorstore, endpoint="/loadText")
    username, chatbotName = vectorstore.split("$")[1], vectorstore.split("$")[2]
    text = cleanText(text=text)
    dct = {
        "output": text,
        "source": "Text"
    }
    numTokens = len(text.translate(str.maketrans('', '', string.punctuation)).split(" "))
    dct = json.dumps(dct, indent=1).encode("utf-8", errors="replace")
    fileName = createDataSourceName(sourceName="Text")
    response = supabase.storage.from_("ConversAI").upload(file=dct, path=f"{fileName}_data.json")
    response = (
        supabase.table("ConversAI_ChatbotDataSources")
        .insert({
            "username": username,
            "chatbotName": chatbotName,
            "dataSourceName": fileName,
            "numTokens": numTokens,
            "sourceEndpoint": "/loadText",
            "sourceContentURL": os.path.join(os.environ["SUPABASE_PUBLIC_BASE_URL"], f"{fileName}_data.json"),
        })
        .execute()
    )
    return {
        "output": "SUCCESS"
    }
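# "numTokens" is a whitespace word count with punctuation stripped, not a model
# token count. For example:
#     s = "Hello, world! Hi"
#     len(s.translate(str.maketrans('', '', string.punctuation)).split(" "))  # -> 3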
class AddQAPair(BaseModel):
    vectorstore: str
    question: str
    answer: str


async def addQAPairData(addQaPair: AddQAPair):
    username, chatbotname = addQaPair.vectorstore.split("$")[1], addQaPair.vectorstore.split("$")[2]
    df = pd.DataFrame(supabase.table("ConversAI_ChatbotInfo").select("*").execute().data)
    currentCount = int(
        df[(df["user_id"] == username) & (df["chatbotname"] == chatbotname)]["charactercount"].iloc[0]
    )
    qa = f"QUESTION: {addQaPair.question}\tANSWER: {addQaPair.answer}"
    newCount = currentCount + len(qa)
    limit = (
        supabase.table("ConversAI_UserConfig")
        .select("tokenLimit")
        .eq("user_id", username)
        .execute()
        .data[0]["tokenLimit"]
    )
    if newCount < int(limit):
        supabase.table("ConversAI_ChatbotInfo").update({"charactercount": str(newCount)}).eq(
            "user_id", username
        ).eq("chatbotname", chatbotname).execute()
        return addDocuments(text=qa, source="Q&A Pairs", vectorstore=addQaPair.vectorstore)
    else:
        return {
            "output": "LIMIT EXCEEDED, PLEASE TRY WITH A SMALLER INPUT."
        }
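# The character budget is cumulative per chatbot: charactercount is read from
# ConversAI_ChatbotInfo, increased by the length of the new Q&A string, and
# written back (as a string) only when it stays under the user's tokenLimit.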
class LoadWebsite(BaseModel):
    vectorstore: str
    urls: list[str]
    source: str


async def loadWebURLs(loadWebsite: LoadWebsite):
    vectorstore, urls, source = loadWebsite.vectorstore, loadWebsite.urls, loadWebsite.source
    trackUsage(vectorstore=vectorstore, endpoint="/loadWebURLs")
    username, chatbotName = vectorstore.split("$")[1], vectorstore.split("$")[2]
    text = extractTextFromUrlList(urls=urls)
    dct = {
        "output": text,
        "source": urlparse(source).netloc
    }
    numTokens = len(
        " ".join([text[x] for x in text]).translate(str.maketrans('', '', string.punctuation)).split(" ")
    )
    dct = json.dumps(dct, indent=1).encode("utf-8", errors="replace")
    fileName = createDataSourceName(sourceName=urlparse(source).netloc)
    response = supabase.storage.from_("ConversAI").upload(file=dct, path=f"{fileName}_data.json")
    response = (
        supabase.table("ConversAI_ChatbotDataSources")
        .insert({
            "username": username,
            "chatbotName": chatbotName,
            "dataSourceName": fileName,
            "numTokens": numTokens,
            "sourceEndpoint": "/loadWebURLs",
            "sourceContentURL": os.path.join(os.environ["SUPABASE_PUBLIC_BASE_URL"], f"{fileName}_data.json"),
        })
        .execute()
    )
    return {
        "output": "SUCCESS"
    }
async def answerQuestion(request: Request, query: str, vectorstore: str, llmModel: str = "llama3-70b-8192"):
    trackUsage(vectorstore=vectorstore, endpoint="/answerQuery")
    username, chatbotName = vectorstore.split("$")[1], vectorstore.split("$")[2]
    output = answerQuery(query=query, vectorstore=vectorstore, llmModel=llmModel)
    ip_address = request.client.host
    response_token_count = len(output["output"])
    city = get_ip_info(ip_address)
    response = (
        supabase.table("ConversAI_ChatHistory")
        .insert({
            "username": username,
            "chatbotName": chatbotName,
            "llmModel": llmModel,
            "question": query,
            "response": output["output"],
            "IpAddress": ip_address,
            "ResponseTokenCount": response_token_count,
            "vectorstore": vectorstore,
            "City": city,
        })
        .execute()
    )
    return output
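# Every answered query is logged with the caller's IP and an ipinfo.io-derived
# city. Note that ResponseTokenCount is len(output["output"]), i.e. a character
# count of the response text rather than a model token count.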
async def deleteChatbot(vectorstore: str):
    trackUsage(vectorstore=vectorstore, endpoint="/deleteChatbot")
    username, chatbotName = vectorstore.split("$")[1], vectorstore.split("$")[2]
    supabase.table("ConversAI_ChatbotInfo").delete().eq("user_id", username).eq("chatbotname", chatbotName).execute()
    return deleteTable(tableName=vectorstore)


async def listChatbots(username: str):
    return listTables(username=username)


async def crawlUrl(baseUrl: str):
    return {
        "urls": getLinks(url=baseUrl, timeout=30),
        "source": urlparse(baseUrl).netloc
    }


async def getCount(vectorstore: str):
    username, chatbotName = vectorstore.split("$")[1], vectorstore.split("$")[2]
    df = pd.DataFrame(supabase.table("ConversAI_ChatbotInfo").select("*").execute().data)
    return {
        "currentCount": df[(df["user_id"] == username) & (df["chatbotname"] == chatbotName)]["charactercount"].iloc[0]
    }
class YtTranscript(BaseModel):
    vectorstore: str
    urls: list[str]


async def loadYoutubeTranscript(ytTranscript: YtTranscript):
    vectorstore, urls = ytTranscript.vectorstore, ytTranscript.urls
    trackUsage(vectorstore=vectorstore, endpoint="/loadYoutubeTranscript")
    username, chatbotName = vectorstore.split("$")[1], vectorstore.split("$")[2]
    text = getTranscript(urls=urls)
    dct = {
        "output": text,
        "source": "www.youtube.com"
    }
    numTokens = len(
        " ".join([text[x] for x in text]).translate(str.maketrans('', '', string.punctuation)).split(" ")
    )
    dct = json.dumps(dct, indent=1).encode("utf-8", errors="replace")
    fileName = createDataSourceName(sourceName="youtube")
    response = supabase.storage.from_("ConversAI").upload(file=dct, path=f"{fileName}_data.json")
    response = (
        supabase.table("ConversAI_ChatbotDataSources")
        .insert({
            "username": username,
            "chatbotName": chatbotName,
            "dataSourceName": fileName,
            "numTokens": numTokens,
            # Recorded as "/loadYoutubeTranscript" so trainChatbot recognizes
            # this source type when it dispatches on sourceEndpoint.
            "sourceEndpoint": "/loadYoutubeTranscript",
            "sourceContentURL": os.path.join(os.environ["SUPABASE_PUBLIC_BASE_URL"], f"{fileName}_data.json"),
        })
        .execute()
    )
    return {
        "output": "SUCCESS"
    }
async def analyzeAndAnswer(query: str, file: UploadFile = File(...)):
    extension = file.filename.split(".")[-1]
    try:
        if extension in ["xls", "xlsx", "xlsm", "xlsb"]:
            df = pd.read_excel(io.BytesIO(await file.read()))
            response = analyzeData(query=query, dataframe=df)
        elif extension == "csv":
            df = pd.read_csv(io.BytesIO(await file.read()))
            response = analyzeData(query=query, dataframe=df)
        else:
            response = "INVALID FILE TYPE"
        return {
            "output": response
        }
    except Exception:
        return {
            "output": "UNABLE TO ANSWER QUERY"
        }
async def chatHistory(vectorstore: str):
    trackUsage(vectorstore=vectorstore, endpoint="/getChatHistory")
    username, chatbotName = vectorstore.split("$")[1], vectorstore.split("$")[2]
    response = (
        supabase.table("ConversAI_ChatHistory")
        .select("timestamp", "question", "response")
        .eq("username", username)
        .eq("chatbotName", chatbotName)
        .execute()
        .data
    )
    return response
async def listChatbotSources(vectorstore: str):
    trackUsage(vectorstore=vectorstore, endpoint="/listChatbotSources")
    username, chatbotName = vectorstore.split("$")[1], vectorstore.split("$")[2]
    result = (
        supabase.table("ConversAI_ChatbotDataSources")
        .select("*")
        .eq("username", username)
        .eq("chatbotName", chatbotName)
        .execute()
        .data
    )
    return result
async def getDataSource(vectorstore: str, sourceUrl: str):
    trackUsage(vectorstore=vectorstore, endpoint="/getDataSource")
    r = requests.get(sourceUrl)
    return encodeToBase64(json.loads(r.content.decode("utf-8", errors="replace")))
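# The fetched blob is the json.dumps payload written by the load* endpoints, so
# json.loads round-trips it; encodeToBase64 (a helper from functions, signature
# assumed) then encodes it for transport back to the client.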
async def deleteChatbotSource(vectorstore: str, dataSourceName: str):
    trackUsage(vectorstore=vectorstore, endpoint="/deleteChatbotSource")
    response = supabase.table("ConversAI_ChatbotDataSources").delete().eq("dataSourceName", dataSourceName).execute()
    response = supabase.storage.from_("ConversAI").remove([f"{dataSourceName}_data.json"])
    return {
        "output": "SUCCESS"
    }
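# Note both deletes are keyed only by dataSourceName: the metadata row is
# removed for any chatbot carrying that name, and the storage object
# <dataSourceName>_data.json is removed from the bucket.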
class LoadEditedJson(BaseModel):
    vectorstore: str
    dataSourceName: str
    sourceEndpoint: str
    jsonData: dict


async def loadEditedJson(loadEditedJsonConfig: LoadEditedJson):
    username, chatbotName = (
        loadEditedJsonConfig.vectorstore.split("$")[1],
        loadEditedJsonConfig.vectorstore.split("$")[2],
    )
    trackUsage(vectorstore=loadEditedJsonConfig.vectorstore, endpoint="/loadEditedJson")
    jsonData = decodeBase64(loadEditedJsonConfig.jsonData)
    jsonData = json.dumps(jsonData, indent=1).encode("utf-8", errors="replace")
    fileName = createDataSourceName(loadEditedJsonConfig.dataSourceName)
    response = supabase.storage.from_("ConversAI").upload(file=jsonData, path=f"{fileName}_data.json")
    response = (
        supabase.table("ConversAI_ChatbotDataSources")
        .insert({
            "username": username,
            "chatbotName": chatbotName,
            "dataSourceName": fileName,
            "sourceEndpoint": loadEditedJsonConfig.sourceEndpoint,
            "sourceContentURL": os.path.join(os.environ["SUPABASE_PUBLIC_BASE_URL"], f"{fileName}_data.json"),
        })
        .execute()
    )
    return {
        "output": "SUCCESS"
    }
async def activityLog(username: str):
    response = supabase.table("ConversAI_ActivityLog").select("*").eq("username", username).execute().data
    return response
async def publicOrPrivate(vectorstore: str, mode: str | None = None):
    trackUsage(vectorstore=vectorstore, endpoint="/publicOrPrivate")
    username, chatbotName = vectorstore.split("$")[1], vectorstore.split("$")[2]
    if mode is None:
        value = (
            supabase.table("ConversAI_ChatbotInfo")
            .select("isPrivate")
            .eq("user_id", username)
            .eq("chatbotname", chatbotName)
            .execute()
        )
        value = value.data[0]["isPrivate"]
        return {
            "output": value
        }
    else:
        response = (
            supabase.table("ConversAI_ChatbotInfo")
            .update({"isPrivate": mode})
            .eq("user_id", username)
            .eq("chatbotname", chatbotName)
            .execute()
        )
        return {
            "output": "SUCCESS"
        }
class TrainChatbot(BaseModel):
    vectorstore: str
    urls: list[str]


async def trainChatbot(trainChatbotConfig: TrainChatbot):
    vectorstore, urlSources = trainChatbotConfig.vectorstore, trainChatbotConfig.urls
    trackUsage(vectorstore=vectorstore, endpoint="/trainChatbot")
    texts = []
    sources = []
    fileTypes = [
        supabase.table("ConversAI_ChatbotDataSources")
        .select("sourceEndpoint")
        .eq("sourceContentURL", x)
        .execute()
        .data[0]["sourceEndpoint"]
        for x in urlSources
    ]
    for source, fileType in zip(urlSources, fileTypes):
        if fileType not in ("/loadPDF", "/loadImagePDF", "/loadText", "/loadWebURLs", "/loadYoutubeTranscript"):
            continue
        r = requests.get(source)
        file = json.loads(r.content.decode("utf-8", errors="replace"))
        content = file["output"]
        fileSource = file["source"]
        if fileType == "/loadText":
            # Plain text sources store a single string.
            texts.append(content.replace("\n", " "))
        else:
            # PDF, web, and YouTube sources store a mapping of section -> text.
            texts.append(".".join(content[key] for key in content).replace("\n", " "))
        sources.append(fileSource)
    texts = list(zip(texts, sources))
    return addDocuments(texts=texts, vectorstore=vectorstore)
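# A hypothetical request body for this endpoint, using the public URLs stored
# in ConversAI_ChatbotDataSources:
#
#     {
#         "vectorstore": "convai$alice$faq-bot",
#         "urls": ["https://<project>.supabase.co/.../xyz_data.json"]
#     }
#
# addDocuments (from functions) is assumed to accept the resulting list of
# (text, source) tuples.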
async def getPublicChatbots():
    return supabase.table("ConversAI_ChatbotInfo").select("*").eq("isPrivate", False).execute().data
def get_ip_info(ip: str):
    # ipinfo.io returns JSON such as {"ip": ..., "city": ..., "region": ...};
    # only the city is used here.
    try:
        response = requests.get(f"https://ipinfo.io/{ip}/json", timeout=5)
        data = response.json()
        return data.get("city", "Unknown")
    except Exception:
        return "Unknown"
async def daily_chat_count(
    start_date: Optional[str] = Query(None, description="Start date in ISO format (YYYY-MM-DD)"),
    end_date: Optional[str] = Query(None, description="End date in ISO format (YYYY-MM-DD)")
):
    if not start_date or not end_date:
        end_date = datetime.now().astimezone().date()
        start_date = end_date - timedelta(days=7)
    else:
        start_date = isoparse(start_date).date()
        end_date = isoparse(end_date).date()
    response = supabase.table("ConversAI_ChatHistory").select("*").execute().data
    dates = [
        isoparse(i["timestamp"]).date()
        for i in response
        if start_date <= isoparse(i["timestamp"]).date() <= end_date
    ]
    date_count = Counter(dates)
    data = [{"date": date.isoformat(), "count": count} for date, count in date_count.items()]
    return {"data": data}
async def daily_active_end_user(
    start_date: Optional[str] = Query(None, description="Start date in ISO format (YYYY-MM-DD)"),
    end_date: Optional[str] = Query(None, description="End date in ISO format (YYYY-MM-DD)")
):
    if not start_date or not end_date:
        end_date = datetime.now().astimezone().date()
        start_date = end_date - timedelta(days=7)
    else:
        start_date = isoparse(start_date).date()
        end_date = isoparse(end_date).date()
    response = supabase.table("ConversAI_ChatHistory").select("*").execute().data
    ip_by_date = defaultdict(set)
    for i in response:
        timestamp = isoparse(i["timestamp"])
        ip_address = i["IpAddress"]
        if start_date <= timestamp.date() <= end_date:
            date = timestamp.date()
            ip_by_date[date].add(ip_address)
    # Only dates with more than one distinct client IP are reported.
    data = [{"date": date.isoformat(), "terminal": len(ips)} for date, ips in ip_by_date.items() if len(ips) > 1]
    return {"data": data}
async def average_session_interaction(
    start_date: Optional[str] = Query(None, description="Start date in ISO format (YYYY-MM-DD)"),
    end_date: Optional[str] = Query(None, description="End date in ISO format (YYYY-MM-DD)")
):
    if not start_date or not end_date:
        end_date = datetime.now().astimezone().date()
        start_date = end_date - timedelta(days=7)
    else:
        start_date = isoparse(start_date).date()
        end_date = isoparse(end_date).date()
    response = supabase.table("ConversAI_ChatHistory").select("*").execute().data
    total_messages_by_date = defaultdict(int)
    unique_ips_by_date = defaultdict(set)
    for i in response:
        timestamp = isoparse(i["timestamp"])
        ip_address = i["IpAddress"]
        if start_date <= timestamp.date() <= end_date:
            date = timestamp.date()
            total_messages_by_date[date] += 1
            unique_ips_by_date[date].add(ip_address)
    data = []
    for date in sorted(total_messages_by_date.keys()):
        total_messages = total_messages_by_date[date]
        unique_ips = len(unique_ips_by_date[date])
        average_interactions = total_messages / unique_ips if unique_ips > 0 else 0
        data.append({"date": date.isoformat(), "interactions": average_interactions})
    return {"data": data}
async def token_usages(
    start_date: Optional[str] = Query(None, description="Start date in ISO format (YYYY-MM-DD)"),
    end_date: Optional[str] = Query(None, description="End date in ISO format (YYYY-MM-DD)")
):
    if not start_date or not end_date:
        end_date = datetime.now().astimezone().date()
        start_date = end_date - timedelta(days=7)
    else:
        start_date = isoparse(start_date).date()
        end_date = isoparse(end_date).date()
    response = supabase.table("ConversAI_ChatHistory").select("*").execute().data
    token_usage_by_date = defaultdict(int)
    for i in response:
        timestamp = isoparse(i["timestamp"])
        if start_date <= timestamp.date() <= end_date:
            date = timestamp.date()
            response_token_count = i.get("ResponseTokenCount")
            if response_token_count is not None:
                token_usage_by_date[date] += response_token_count
    data = [
        {"date": date.isoformat(), "total_tokens": total_tokens}
        for date, total_tokens in token_usage_by_date.items()
    ]
    return {"data": data}
async def add_feedback(request: Request, feedback: str, user_id: str):
    client_ip = request.client.host
    city = get_ip_info(client_ip)
    response = supabase.table("ConversAI_Feedback").insert(
        {"feedback": feedback, "user_id": user_id, "city": city, "ip": client_ip}
    ).execute()
    return {"message": "success"}
async def user_satisfaction_rate(
    start_date: Optional[str] = Query(None, description="Start date in ISO format (YYYY-MM-DD)"),
    end_date: Optional[str] = Query(None, description="End date in ISO format (YYYY-MM-DD)")
):
    if not start_date or not end_date:
        end_date = datetime.now().astimezone().date()
        start_date = end_date - timedelta(days=7)
    else:
        start_date = isoparse(start_date).date()
        end_date = isoparse(end_date).date()
    response = supabase.table("ConversAI_Feedback").select("*").execute().data
    feedback_counts = defaultdict(lambda: {"like": 0, "dislike": 0})
    for i in response:
        timestamp = isoparse(i["timestamp"])
        if start_date <= timestamp.date() <= end_date:
            date = timestamp.date()
            feedback = i.get("feedback")
            if feedback == "like":
                feedback_counts[date]["like"] += 1
            elif feedback == "dislike":
                feedback_counts[date]["dislike"] += 1
    data = []
    for date in sorted(feedback_counts.keys()):
        like_count = feedback_counts[date]["like"]
        dislike_count = feedback_counts[date]["dislike"]
        total_feedback = like_count + dislike_count
        satisfaction_rate = (like_count / total_feedback * 100) if total_feedback > 0 else 0
        data.append({"date": date.isoformat(), "rate": satisfaction_rate})
    return {"data": data}