| from fastapi import APIRouter, Depends, status, HTTPException, File, UploadFile, Form |
| from fastapi.responses import JSONResponse |
| from config.db import SessionLocal |
| from sqlalchemy.orm import Session |
| from sqlalchemy import func |
| from decimal import Decimal |
| from typing import Annotated, Dict, List |
| from routers.auth import get_current_user, get_current_user_with_token |
| from models.tables import Chatbot_stats, Company, Queries, QueryUsers,Users |
| from models.schemas import QueryUserResponse |
| from pydantic import HttpUrl, BaseModel |
| import os |
| import logging |
| import pymupdf4llm |
| import pathlib |
| from models.schemas import UpdatePromptRequest |
| from sqlalchemy.exc import NoResultFound |
| from langchain_openai import OpenAIEmbeddings |
| from langchain_qdrant import QdrantVectorStore |
| from qdrant_client.models import Distance, VectorParams |
| from qdrant_client import QdrantClient |
| from uuid import uuid4 |
| import asyncio |
| from langchain_text_splitters import RecursiveCharacterTextSplitter |
| import time |
| from langchain_core.documents import Document |
|
|
|
|
|
|
|
|
# Every route declared in this module is served under the /admin prefix and
# carries the "Admin" tag.
router = APIRouter(prefix='/admin', tags=['Admin'])

# Configure root logging at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Cost of a single token, derived from a price quoted per one million tokens.
# NOTE(review): presumably US dollars (the stats route reports
# "dollar_spend_*" fields) -- confirm against the actual pricing source.
INPUT_TOKEN_RATE = 0.35 / 1_000_000
OUTPUT_TOKEN_RATE = 0.40 / 1_000_000
|
|
def company_to_dict(company, total_queries) -> Dict:
    """Serialize a company record and its query count into a plain dict.

    Args:
        company: Object exposing ``id``, ``company_key``, ``company_name``,
            ``base_url``, ``email``, ``created_date``, ``input_tokens`` and
            ``output_tokens`` attributes.
        total_queries: Number of queries attributed to the company. ``None``
            is treated as ``0``.

    Returns:
        A dict with the company's identifying fields, its token counters,
        the cost of those tokens at ``INPUT_TOKEN_RATE`` /
        ``OUTPUT_TOKEN_RATE``, and ``total_queries`` as a float.
    """
    # A missing counter is treated as zero so a single incomplete row cannot
    # make the whole serialization raise.
    input_tokens = company.input_tokens or 0
    output_tokens = company.output_tokens or 0

    # Convert to float *before* multiplying: the rates are floats, and a
    # Decimal counter multiplied by a float raises TypeError.
    input_token_cost = float(input_tokens) * INPUT_TOKEN_RATE
    output_token_cost = float(output_tokens) * OUTPUT_TOKEN_RATE

    created_date = company.created_date

    return {
        "id": company.id,
        "company_key": company.company_key,
        "input_tokens": input_tokens,
        # Guarded like the other timestamp fields serialized in this module.
        "created_date": created_date.isoformat() if created_date else None,
        "company_name": company.company_name,
        "base_url": company.base_url,
        "email": company.email,
        "output_tokens": output_tokens,
        "input_token_cost": input_token_cost,
        "output_token_cost": output_token_cost,
        "total_queries": float(total_queries or 0),
    }
|
|
def chunk_text(text, chunk_size=600, chunk_overlap=60):
    """Split one document into overlapping chunks.

    Despite its name, ``text`` is passed to ``split_documents`` inside a
    one-element list, so it is expected to be a document object rather than
    a bare string.

    Args:
        text: The document to split.
        chunk_size: Forwarded to ``RecursiveCharacterTextSplitter``.
        chunk_overlap: Forwarded to ``RecursiveCharacterTextSplitter``.

    Returns:
        Whatever ``split_documents`` returns for the single input document.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    return splitter.split_documents([text])
|
|
def retry_upsert(client, collection_name, text_chunks, uuids, embeddings, retries=3):
    """Add documents to a Qdrant collection, retrying with exponential backoff.

    Args:
        client: Qdrant client handed to ``QdrantVectorStore``.
        collection_name: Name of the target collection.
        text_chunks: Documents to store.
        uuids: Ids for the documents, passed as ``ids`` to ``add_documents``.
        embeddings: Embedding object handed to ``QdrantVectorStore``.
        retries: Maximum number of attempts. Values below 1 are treated as 1
            so the upsert is always attempted at least once.

    Raises:
        Exception: Whatever the final attempt raised, once all attempts
            have failed.
    """
    # With retries <= 0 a bare range(retries) would skip the loop and return
    # as though the documents had been stored.
    attempts = max(1, retries)

    for attempt in range(attempts):
        try:
            logger.info("Upsert attempt %d", attempt + 1)
            # Built inside the try so a failure while constructing the store
            # is retried like any other error.
            vector_store = QdrantVectorStore(
                client=client,
                collection_name=collection_name,
                embedding=embeddings,
            )
            vector_store.add_documents(documents=text_chunks, ids=uuids)
            logger.info("Successfully upserted %d documents.", len(text_chunks))
            return
        except Exception as e:
            logger.error("Error during upsert attempt %d: %s", attempt + 1, e)
            if attempt >= attempts - 1:
                logger.error("Max retries reached. Failing the upsert operation.")
                raise
            # Back off 1s, 2s, 4s, ... before the next attempt.
            delay = 2 ** attempt
            logger.info("Retrying in %d seconds...", delay)
            time.sleep(delay)
|
|
def get_db():
    """Yield a session created by ``SessionLocal`` and close it afterwards.

    Written as a generator so the caller receives the session at the
    ``yield`` and the cleanup runs when the generator is resumed or closed.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        # Executed whether the consumer finished normally or raised.
        session.close()
|
|
# Reusable parameter annotations for the route handlers in this module.
# db_dependency: a SQLAlchemy Session produced by get_db.
db_dependency = Annotated[Session, Depends(get_db)]
# user_dependency: the dict produced by get_current_user_with_token.
user_dependency = Annotated[dict, Depends(get_current_user_with_token)]
|
|
@router.get("/total-stats")
async def get_total_stats(db: db_dependency, user: user_dependency):
    """Return overall token usage, query count and the resulting spend.

    Token totals are summed over all ``Company`` rows, the request count is
    the number of ``Queries`` rows, and spend is the token totals multiplied
    by ``INPUT_TOKEN_RATE`` / ``OUTPUT_TOKEN_RATE``.

    Args:
        db: Database session.
        user: Authenticated user; not read here, declared so the route
            requires the dependency to resolve.

    Returns:
        A ``JSONResponse`` with ``status`` and a ``data`` object holding the
        totals as JSON numbers.

    Raises:
        HTTPException: 500 if anything fails while computing the totals.
    """
    try:
        # Both sums read the same table, so fetch them in one round-trip.
        sum_input, sum_output = db.query(
            func.sum(Company.input_tokens),
            func.sum(Company.output_tokens),
        ).one()
        total_queries = db.query(func.count(Queries.id)).scalar() or 0

        # SUM over zero rows yields None; treat that as zero tokens.
        total_input_tokens = Decimal(sum_input or 0)
        total_output_tokens = Decimal(sum_output or 0)

        # Go through str() so each Decimal holds the rate's short repr
        # (e.g. 3.5E-7) rather than the float's full binary expansion.
        dollar_spend_input = total_input_tokens * Decimal(str(INPUT_TOKEN_RATE))
        dollar_spend_output = total_output_tokens * Decimal(str(OUTPUT_TOKEN_RATE))
        total_dollar_spend = dollar_spend_input + dollar_spend_output

        logger.info("Total stats retrieved successfully.")

        return JSONResponse(content={
            # NOTE(review): a string here, while the other routes in this
            # module send an integer status; kept as-is for existing clients.
            "status": "200",
            "data": {
                "input_tokens": float(total_input_tokens),
                "output_tokens": float(total_output_tokens),
                "requests": total_queries,
                "dollar_spend_input": float(dollar_spend_input),
                "dollar_spend_output": float(dollar_spend_output),
                "dollar_spend_total": float(total_dollar_spend),
            },
        })
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Error retrieving total stats: %s", str(e))
        raise HTTPException(
            status_code=500,
            detail="Something went wrong; not able to get the total stats",
        ) from e
|
|
@router.get("/companies", status_code=status.HTTP_200_OK)
async def all_companies(db: db_dependency, user: user_dependency):
    """List every company together with its summed chatbot query count.

    Args:
        db: Database session.
        user: Authenticated user; not read here, declared so the route
            requires the dependency to resolve.

    Returns:
        A ``JSONResponse`` whose ``data`` is a list of ``company_to_dict``
        results, one per ``Company`` row.

    Raises:
        HTTPException: 500 if anything fails while building the list.
    """
    try:
        company_rows = db.query(Company).all()

        # One aggregate row per company_id present in Chatbot_stats.
        per_company_totals = db.query(
            Chatbot_stats.company_id,
            func.sum(Chatbot_stats.total_queries).label("total_queries"),
        ).group_by(Chatbot_stats.company_id)

        query_totals = {}
        for company_id, total_queries in per_company_totals.all():
            query_totals[company_id] = total_queries

        companies_list = []
        for company in company_rows:
            # Companies with no Chatbot_stats rows fall back to 0 queries.
            company_total = query_totals.get(company.id, 0)
            companies_list.append(company_to_dict(company, company_total))
        logger.info("Fetched %d companies.", len(companies_list))

        payload = {
            "status": status.HTTP_200_OK,
            "data": companies_list,
        }
        return JSONResponse(content=payload)
    except Exception as e:
        logger.error("Error fetching companies: %s", str(e))
        raise HTTPException(status_code=500, detail="Unable to fetch all companies")
|
|
@router.get("/companies/{company_id}/chatbots", status_code=status.HTTP_200_OK)
async def get_chatbots(company_id: int, db: db_dependency, user: user_dependency):
    """List the chatbots of one company with their usage and token cost.

    Args:
        company_id: Primary key of the company.
        db: Database session.
        user: Authenticated user; not read here, declared so the route
            requires the dependency to resolve.

    Returns:
        A ``JSONResponse`` whose ``data`` is a list with one entry per
        ``Chatbot_stats`` row of the company.

    Raises:
        HTTPException: 404 if the company does not exist, 500 on any other
            failure.
    """
    try:
        company = db.query(Company).filter(Company.id == company_id).first()
        if not company:
            logger.warning("Company not found: %d", company_id)
            raise HTTPException(status_code=404, detail="Company not found")

        chatbots = db.query(Chatbot_stats).filter(Chatbot_stats.company_id == company_id).all()

        # Convert the rates once, via str() so the Decimals hold the short
        # repr of each rate rather than the float's binary expansion.
        input_rate = Decimal(str(INPUT_TOKEN_RATE))
        output_rate = Decimal(str(OUTPUT_TOKEN_RATE))

        chatbots_list = []
        for chatbot in chatbots:
            # A NULL counter is priced as zero; Decimal(None) would raise.
            input_token_cost = Decimal(chatbot.total_input_tokens or 0) * input_rate
            output_token_cost = Decimal(chatbot.total_output_tokens or 0) * output_rate
            last_query_time = chatbot.last_query_time

            chatbots_list.append({
                "chatbot_id": chatbot.chatbot_id,
                "chatbot_name": chatbot.chatbot_name,
                "origin_url": chatbot.origin_url,
                "total_input_tokens": chatbot.total_input_tokens,
                "total_output_tokens": chatbot.total_output_tokens,
                "total_queries": chatbot.total_queries,
                "last_query_time": last_query_time.isoformat() if last_query_time else None,
                "input_token_cost": float(input_token_cost),
                "output_token_cost": float(output_token_cost),
                "total_token_cost": float(input_token_cost + output_token_cost),
            })

        logger.info("Fetched %d chatbots for company ID: %d", len(chatbots_list), company_id)

        return JSONResponse(content={
            "status": status.HTTP_200_OK,
            "data": chatbots_list,
        })
    except HTTPException:
        # Let the deliberate 404 above reach the client; the generic handler
        # below would otherwise turn it into a 500.
        raise
    except Exception as e:
        logger.exception("Error fetching chatbots: %s", str(e))
        raise HTTPException(status_code=500, detail="Unable to fetch chatbots") from e
|
|
@router.get("/chatbots/{chatbot_id}/queries", status_code=status.HTTP_200_OK)
async def get_queries_by_chatbot(chatbot_id: str, db: db_dependency, user: user_dependency):
    """Return every stored query of one chatbot.

    Args:
        chatbot_id: Identifier matched against ``Queries.chatbot_id``.
        db: Database session.
        user: Authenticated user; not read here, declared so the route
            requires the dependency to resolve.

    Returns:
        A ``JSONResponse`` whose ``data`` is a list with one dict per query.

    Raises:
        HTTPException: 404 if the chatbot has no queries, 500 on any other
            failure.
    """
    try:
        queries = db.query(Queries).filter(Queries.chatbot_id == chatbot_id).all()

        if not queries:
            logger.warning("No queries found for chatbot ID: %s", chatbot_id)
            raise HTTPException(status_code=404, detail="No queries found for this chatbot")

        queries_list = [
            {
                "id": query.id,
                "session_id": query.session_id,
                "query_text_bot": query.query_text_bot,
                "query_text_user": query.query_text_user,
                "query_context": query.query_context,
                "input_tokens": query.input_tokens,
                "output_tokens": query.output_tokens,
                "query_time": query.query_time.isoformat() if query.query_time else None,
                "origin_url": query.origin_url,
            }
            for query in queries
        ]

        logger.info("Fetched %d queries for chatbot ID: %s", len(queries_list), chatbot_id)

        return JSONResponse(content={
            "status": status.HTTP_200_OK,
            "data": queries_list,
        })
    except HTTPException:
        # Let the deliberate 404 above reach the client; the generic handler
        # below would otherwise turn it into a 500.
        raise
    except Exception as e:
        logger.exception("Error fetching queries: %s", str(e))
        raise HTTPException(status_code=500, detail="Unable to fetch queries") from e
| |
|
|
|
|
def _safe_upload_name(filename):
    """Return the final path component of an uploaded file name, or None.

    The name is supplied by the client, so directory parts such as ``../``
    are dropped to keep the file inside the upload directory.
    """
    # Normalise backslashes first so basename() also strips Windows-style
    # directory parts when running on POSIX.
    name = os.path.basename((filename or "").replace("\\", "/"))
    return None if name in ("", ".", "..") else name


def _store_pdf_in_qdrant(pdf_path, client, collection_name, embeddings):
    """Extract markdown from one file, chunk it and upsert it into Qdrant."""
    logger.info("Extracting markdown from the PDF")
    # Pass the path itself rather than an open file handle.
    pdf_text = pymupdf4llm.to_markdown(pdf_path)

    document = Document(
        page_content=pdf_text,
        metadata={"source": "Documents"},
    )
    logger.info("Chunking data for processing")
    text_chunks = chunk_text(document)
    logger.info(f"Total text chunks generated: {len(text_chunks)}")

    logger.info("Storing text chunks in Qdrant")
    batch_size = 100
    # Ceiling division: floor-divide-plus-one over-counts by one whenever the
    # chunk count is an exact multiple of the batch size.
    total_batches = -(-len(text_chunks) // batch_size)
    starts = range(0, len(text_chunks), batch_size)
    for batch_number, start in enumerate(starts, start=1):
        batch_chunks = text_chunks[start:start + batch_size]
        batch_uuids = [str(uuid4()) for _ in batch_chunks]
        logger.info(f"Storing batch {batch_number}/{total_batches}")
        retry_upsert(client, collection_name, batch_chunks, batch_uuids, embeddings)


@router.post("/upload/")
async def upload_and_process_files(user: user_dependency, db: db_dependency, files: List[UploadFile] = File(...)):
    """Store the uploaded files in the caller's company Qdrant collection.

    The company is found through the authenticated user's e-mail address.
    Each upload is written to a temporary file under ``uploads/``, converted
    to markdown, chunked and upserted into the collection named after the
    company key; the temporary file is removed afterwards.
    NOTE(review): files are assumed to be PDFs -- nothing here checks that.

    Args:
        user: Authenticated user; ``user["username"]`` selects the ``Users`` row.
        db: Database session.
        files: The uploaded files.

    Returns:
        ``{"processed_files": [...]}`` with the names of the stored files.

    Raises:
        HTTPException: 404 if the user or company is not found, 400 if a
            file name is unusable, 500 if processing fails.
    """
    userdb = db.query(Users).filter(Users.username == user["username"]).first()
    if userdb is None:
        raise HTTPException(status_code=404, detail="User not found")
    company = db.query(Company).filter(Company.email == userdb.email).first()
    if company is None:
        raise HTTPException(status_code=404, detail="Company not found")

    # Validate every name before touching disk or Qdrant, so a bad name
    # cannot leave the request half-processed.
    uploads = []
    for file in files:
        safe_name = _safe_upload_name(file.filename)
        if safe_name is None:
            raise HTTPException(status_code=400, detail="Invalid file name")
        uploads.append((file, safe_name))

    folder_name = f"{company.company_name}-{company.company_key}"
    dir_path = os.path.join("uploads", folder_name)
    os.makedirs(dir_path, exist_ok=True)

    processed_files = []
    try:
        # These do not depend on the individual file, so build them once.
        embeddings = OpenAIEmbeddings()
        client = QdrantClient(url="http://localhost:6333", timeout=18000)
        logger.info("Client Initialized")

        collection_name = company.company_key
        logger.info(f"Collection Name: {collection_name}")
        if not client.collection_exists(collection_name):
            logger.info(f"Collection '{collection_name}' does not exist. Creating it.")
            client.create_collection(
                collection_name=collection_name,
                vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
            )

        for file, safe_name in uploads:
            temp_file_path = os.path.join(dir_path, safe_name)
            try:
                with open(temp_file_path, "wb") as buffer:
                    buffer.write(await file.read())

                logger.info(f"Processing file: {file.filename}")
                _store_pdf_in_qdrant(temp_file_path, client, collection_name, embeddings)
            finally:
                # Remove the temporary copy even when processing failed.
                if os.path.exists(temp_file_path):
                    os.remove(temp_file_path)

            logger.info(f"File {file.filename} processed successfully.")
            processed_files.append(file.filename)

        return {"processed_files": processed_files}

    except Exception as e:
        logger.exception(f"Error processing files: {str(e)}")
        raise HTTPException(status_code=500, detail="Error processing files") from e

    finally:
        # Drop the per-company folder once nothing is left in it.
        if os.path.exists(dir_path) and not os.listdir(dir_path):
            os.rmdir(dir_path)
|
|
| |
@router.put("/chatbot/{chatbot_id}/prompt")
async def update_chatbot_prompt(chatbot_id: str, prompt_request: UpdatePromptRequest, user: user_dependency, db: Session = Depends(get_db)):
    """Replace the stored prompt of one chatbot.

    Args:
        chatbot_id: Identifier matched against ``Chatbot_stats.chatbot_id``.
        prompt_request: Request body; its ``chatbot_prompt`` is stored.
        user: Authenticated user; not read here, declared so the route
            requires the dependency to resolve.
        db: Database session.

    Returns:
        A dict with a confirmation message, the chatbot id and the prompt as
        re-read from the database after the commit.

    Raises:
        HTTPException: 404 if the chatbot does not exist, 500 on any other
            failure (the transaction is rolled back first).
    """
    try:
        chatbot = db.query(Chatbot_stats).filter(Chatbot_stats.chatbot_id == chatbot_id).first()
        if chatbot is None:
            raise HTTPException(status_code=404, detail="Chatbot not found")

        chatbot.chatbot_prompt = prompt_request.chatbot_prompt
        db.commit()
        db.refresh(chatbot)

        return {
            "message": "Chatbot prompt updated successfully",
            "chatbot_id": chatbot.chatbot_id,
            "chatbot_prompt": chatbot.chatbot_prompt,
        }

    except HTTPException:
        # Let the deliberate 404 above reach the client; the generic handler
        # below would otherwise turn it into a 500.
        raise
    except NoResultFound:
        raise HTTPException(status_code=404, detail="Chatbot not found")
    except Exception as e:
        db.rollback()
        logger.exception("Error updating chatbot prompt: %s", str(e))
        # NOTE(review): the exception text is sent to the client here, unlike
        # the generic messages used by the other routes in this module.
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") from e
| |
|
|
@router.get(
    "/chatbots/{chatbot_id}/query-users",
    response_model=List[QueryUserResponse],
    status_code=status.HTTP_200_OK,
    # The other /admin routes all require get_current_user_with_token; this
    # one returns IP addresses, e-mails and phone numbers, so it is guarded
    # the same way. Declared here to keep the function signature unchanged.
    dependencies=[Depends(get_current_user_with_token)],
)
async def get_users_by_chatbot(chatbot_id: str, db: Session = Depends(get_db)):
    """Return the users recorded for one chatbot.

    Args:
        chatbot_id: Identifier matched against ``QueryUsers.chatbot_id``.
        db: Database session.

    Returns:
        A list with one dict per ``QueryUsers`` row.

    Raises:
        HTTPException: 404 if no users are recorded for the chatbot, 500 on
            any other failure.
    """
    try:
        query_users = db.query(QueryUsers).filter(QueryUsers.chatbot_id == chatbot_id).all()

        if not query_users:
            logger.warning("No users found for chatbot ID: %s", chatbot_id)
            raise HTTPException(status_code=404, detail="No users found for this chatbot")

        users_list = [
            {
                "id": query_user.id,
                "session_id": query_user.session_id,
                "ip_address": query_user.ip_address,
                "origin_url": query_user.origin_url,
                "timezone": query_user.timezone,
                "language": query_user.language,
                "is_mobile": query_user.is_mobile,
                "user_agent": query_user.user_agent,
                "platform": query_user.platform,
                "referrer": query_user.referrer,
                "location": query_user.location,
                "network_type": query_user.network_type,
                "email": query_user.email,
                "phone_number": query_user.phone_number,
            }
            for query_user in query_users
        ]

        logger.info("Fetched %d users for chatbot ID: %s", len(users_list), chatbot_id)

        return users_list
    except HTTPException:
        # Let the deliberate 404 above reach the client; the generic handler
        # below would otherwise turn it into a 500.
        raise
    except Exception as e:
        logger.exception("Error fetching users: %s", str(e))
        raise HTTPException(status_code=500, detail="Unable to fetch users") from e