"""FastAPI backend for an AI-workspace chatbot.

Builds per-user FAISS indexes over natural-language sentences derived from
MongoDB workspace data (teams, projects, modules, documents, schedules),
keeps them in sync via a MongoDB change stream, and answers user queries
with Gemini using FAISS-retrieved context.
"""

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import os
import google.generativeai as genai
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
from pymongo import MongoClient
import threading
import time
import uvicorn
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

# CORS middleware: allow any origin so the frontend can talk to this backend.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],   # Allow all origins (frontend-to-backend communication)
    allow_methods=["*"],   # Allow all HTTP methods (GET, POST, etc.)
    allow_headers=["*"],   # Allow all headers
)

# Configure Gemini API
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))

# Sentence-embedding model used for both indexing and querying.
model = SentenceTransformer("all-MiniLM-L6-v2")

# MongoDB connection
MONGO_URI = os.getenv("MONGO_URI")
client = MongoClient(MONGO_URI)
db = client["AiWork"]


class QueryRequest(BaseModel):
    """Payload for POST /chat: which user is asking, and what they asked."""
    email: str
    query: str


# Per-user FAISS state, keyed by user email.
user_indexes = {}           # {email: faiss.IndexFlatL2}
user_sentence_mapping = {}  # {email: [(idx, sentence), ...]}


def fetch_latest_data():
    """Fetch every relevant collection from MongoDB as plain lists."""
    return {
        "users": list(db.users.find()),
        "teams": list(db.teams.find()),
        "projects": list(db.projects.find()),
        "modules": list(db.modules.find()),
        "documents": list(db.documents.find()),
        "schedules": list(db.schedules.find()),
    }


def generate_sentences(data):
    """Build categorized, human-readable sentences describing each user's data.

    Args:
        data: dict with keys "users", "teams", "projects", "modules",
            "documents", "schedules", each a list of MongoDB documents.

    Returns:
        {email: {category: [sentence, ...]}} — one category dict per user.
    """
    users, teams, projects, modules, documents, schedules = (
        data["users"], data["teams"], data["projects"],
        data["modules"], data["documents"], data["schedules"],
    )

    user_sentences = {}  # Store categorized sentences per user

    for user in users:
        username = user.get("username", "Unknown User")
        email = user.get("email", "Unknown Email")

        categories = user_sentences.setdefault(email, {
            "Teams": [],
            "Projects": [],
            "Modules & Tasks": [],
            "Documents": [],
            "Schedules": [],
        })

        # Teams the user owns.
        owned_teams = [t for t in teams
                       if t.get("owner", {}).get("email") == email]
        if owned_teams:
            team_names = ", ".join(f'"{t["teamName"]}"' for t in owned_teams)
            categories["Teams"].append(
                f"User {username} owns the teams: {team_names}.")

        # Teams the user is a member of (use .get so a malformed member
        # document without an "email" key does not raise KeyError).
        member_teams = [t for t in teams
                        if any(m.get("email") == email
                               for m in t.get("members", []))]
        if member_teams:
            team_names = ", ".join(f'"{t["teamName"]}"' for t in member_teams)
            categories["Teams"].append(
                f"User {username} is a member of the teams: {team_names}.")

        # Projects owned by any team the user belongs to.
        relevant_teams = owned_teams + member_teams
        team_ids = [str(t["_id"]) for t in relevant_teams]
        user_projects = [p for p in projects
                         if str(p.get("owner", {}).get("teamId")) in team_ids]

        for project in user_projects:
            proj_name = project["projName"]
            team_creator = next(
                (t["teamName"] for t in teams
                 if str(t["_id"]) == str(project.get("owner", {}).get("teamId"))),
                "Unknown Team")
            categories["Projects"].append(
                f"User {username} is involved in project {proj_name}, "
                f"created by team {team_creator}.")

            # Modules under this project.
            proj_modules = [m for m in modules
                            if str(m.get("projId")) == str(project["_id"])]
            for module in proj_modules:
                module_name = module["moduleName"]
                categories["Modules & Tasks"].append(
                    f"In project {proj_name}, module {module_name} exists.")

                # Tasks in this module assigned to the user.
                assigned_tasks = [
                    task for task in module.get("tasks", [])
                    if any(a.get("email") == email
                           for a in task.get("assignedTo", []))
                ]
                if assigned_tasks:
                    task_details = ", ".join(
                        f'"{t["taskName"]}" (Status: '
                        f'{"Inactive" if t.get("status", False) else "Active"})'
                        for t in assigned_tasks)
                    categories["Modules & Tasks"].append(
                        f"Tasks assigned to {username} in {module_name}: "
                        f"{task_details}.")

            # Documents attached to this project.
            proj_docs = [d for d in documents
                         if str(d.get("owner", {}).get("projId"))
                         == str(project["_id"])]
            if proj_docs:
                doc_names = ", ".join(f'"{d["title"]}"' for d in proj_docs)
                categories["Documents"].append(
                    f"Documents related to project {proj_name}: {doc_names}.")

        # Meeting schedules for any of the user's teams.
        user_schedules = [s for s in schedules
                          if str(s.get("teamId")) in team_ids]
        for schedule in user_schedules:
            related_team = next(
                (t["teamName"] for t in teams
                 if str(t["_id"]) == str(schedule.get("teamId"))),
                "Unknown Team")
            related_project = next(
                (p["projName"] for p in projects
                 if str(p["_id"]) == str(schedule.get("projId"))),
                "Unknown Project")
            categories["Schedules"].append(
                f'{schedule["moto"]} scheduled on {schedule["date"]} at '
                f'{schedule["time"]} for team {related_team} in project '
                f'{related_project}.')

        # BUGFIX: this fallback originally ran immediately after the empty
        # category dict was created, so it was ALWAYS true and every user got
        # the "no assigned data" sentence. It must run after population.
        if not any(categories.values()):
            categories["General"] = [
                f"User {username} is registered but has no assigned data."]

    return user_sentences


def fetch_initial_data():
    """Build a FAISS index per user from the current database contents."""
    data = fetch_latest_data()
    user_sentences = generate_sentences(data)

    user_count = 0  # Track users added to FAISS
    for email, categories in user_sentences.items():
        sentences = sum(categories.values(), [])  # Flatten categorized sentences
        print(f"User: {email}, Sentences Count: {len(sentences)}")  # Debugging

        if sentences:
            user_count += 1
            embedding_dim = model.get_sentence_embedding_dimension()
            user_indexes[email] = faiss.IndexFlatL2(embedding_dim)
            embeddings = model.encode(sentences, convert_to_numpy=True)
            user_indexes[email].add(embeddings)
            user_sentence_mapping[email] = list(enumerate(sentences))

    print(f"Total Users Indexed in FAISS: {user_count} / {len(data['users'])}")


def update_user_embeddings(email):
    """Regenerate structured sentences for one user and rebuild their FAISS index."""
    data = {
        # Only this user's document is needed; the other collections are
        # fetched in full because relevance is resolved inside
        # generate_sentences().
        "users": list(db.users.find({"email": email})),
        "teams": list(db.teams.find()),
        "projects": list(db.projects.find()),
        "modules": list(db.modules.find()),
        "documents": list(db.documents.find()),
        "schedules": list(db.schedules.find()),
    }
    user_sentences = generate_sentences(data)

    if email in user_sentences:
        sentences = sum(user_sentences[email].values(), [])  # Flatten
        if sentences:
            embeddings = model.encode(sentences, convert_to_numpy=True)
            embedding_dim = model.get_sentence_embedding_dimension()
            # Rebuild the FAISS index for this user from scratch.
            user_indexes[email] = faiss.IndexFlatL2(embedding_dim)
            user_indexes[email].add(embeddings)
            user_sentence_mapping[email] = list(enumerate(sentences))
            print(f"Updated embeddings for {email}. "
                  f"Total sentences: {len(sentences)}")


def watch_changes():
    """Monitor MongoDB for changes, identify affected users, update embeddings.

    Runs forever; on change-stream failure it reconnects after 5 seconds.
    NOTE(review): for "delete" operations the referenced document no longer
    exists, so the find_one lookups below return None and no email is
    resolved — deletions currently do not trigger re-indexing.
    """
    print("Watching MongoDB for changes...")
    while True:
        try:
            with db.watch() as stream:  # Watch the entire database
                for change in stream:
                    print("Detected Change:", change)  # Debugging
                    operation = change["operationType"]
                    collection_name = change["ns"]["coll"]
                    doc_id = change["documentKey"]["_id"]

                    emails = set()  # Affected user emails

                    # Resolve the affected user email from the changed doc.
                    if collection_name == "users":
                        full_doc = change.get("fullDocument", {})
                        if full_doc and "email" in full_doc:
                            emails.add(full_doc["email"])
                    elif collection_name == "teams":
                        team_doc = db.teams.find_one({"_id": doc_id})
                        if team_doc and "owner" in team_doc:
                            emails.add(team_doc["owner"].get("email"))
                    elif collection_name == "projects":
                        project_doc = db.projects.find_one({"_id": doc_id})
                        if project_doc and "owner" in project_doc:
                            emails.add(project_doc["owner"].get("email"))
                    elif collection_name == "modules":
                        module_doc = db.modules.find_one({"_id": doc_id})
                        if module_doc:
                            # Users assigned to the module.
                            for user in module_doc.get("assignedTo", []):
                                if "email" in user:
                                    emails.add(user["email"])
                    elif collection_name == "documents":
                        doc = db.documents.find_one({"_id": doc_id})
                        if doc and "owner" in doc:
                            emails.add(doc["owner"].get("email"))
                    elif collection_name == "schedules":
                        schedule_doc = db.schedules.find_one({"_id": doc_id})
                        if schedule_doc:
                            team_id = schedule_doc.get("teamId")
                            team_doc = db.teams.find_one({"_id": team_id})
                            if team_doc and "owner" in team_doc:
                                emails.add(team_doc["owner"].get("email"))

                    # Owner docs may lack an "email" key, in which case the
                    # .get() calls above added None — never re-index None.
                    emails.discard(None)

                    if emails:
                        for email in emails:
                            print(f"Detected {operation} for user: {email}")
                            if operation in ["insert", "update", "delete"]:
                                update_user_embeddings(email)
                                print(f"Updated user {email} due to "
                                      f"{operation} operation.")
                    else:
                        print(f"Change detected in {collection_name}, "
                              f"but no associated email found.")
        except Exception as e:
            print(f"Error in watch_changes(): {e}")
            print("Reconnecting to MongoDB Change Stream in 5 seconds...")
            time.sleep(5)  # Prevent infinite error loops


def get_relevant_sentences(email, query):
    """Retrieve relevant sentences using FAISS for the given user and query."""
    if email not in user_indexes:
        return ["User not found or no data available."]

    index = user_indexes[email]                        # FAISS index for the user
    sentence_data = user_sentence_mapping.get(email, [])  # (idx, sentence) pairs
    if not sentence_data:
        return ["No stored sentences for this user."]

    # Compute the query embedding and search top-k nearest neighbours.
    query_embedding = model.encode([query], convert_to_numpy=True)
    k = min(100, len(sentence_data))  # Limit k to available sentences
    distances, indices = index.search(query_embedding, k)

    # L2-distance threshold to drop clearly irrelevant matches.
    threshold = 1.5
    relevant_sentences = [sentence_data[idx][1]
                          for dist, idx in zip(distances[0], indices[0])
                          if dist < threshold]

    return relevant_sentences if relevant_sentences else [
        "No relevant information found."]


def generate_response(email, query):
    """Generate a natural-language answer with Gemini from FAISS context."""
    relevant_sentences = get_relevant_sentences(email, query)

    # Construct the prompt from the retrieved context.
    prompt = (f"Query: {query}\nContext:\n"
              + "\n".join(relevant_sentences)
              + "\nAnswer in a natural way.")

    # BUGFIX: renamed from `model`, which shadowed the module-level
    # SentenceTransformer of the same name.
    gemini_model = genai.GenerativeModel("gemini-1.5-flash")
    response = gemini_model.generate_content(prompt)

    return response.text if response.text else (
        "I'm unable to find relevant information.")


@app.post("/chat")
async def chat(request: QueryRequest):
    """Chat endpoint: answer `request.query` for the user `request.email`."""
    try:
        response = generate_response(request.email, request.query)
        return {"response": response}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/")
def home():
    """Liveness endpoint."""
    return {"message": "AI Workspace Backend Running"}


if __name__ == "__main__":
    # Fetch initial data for FAISS indexing.
    fetch_initial_data()

    # Start watching for real-time changes in a background thread.
    threading.Thread(target=watch_changes, daemon=True).start()
    print(f"Active Threads: {threading.active_count()}")  # Debugging

    uvicorn.run(app, host="0.0.0.0", port=7860)