# AI Workspace backend: FastAPI service with per-user FAISS retrieval over
# MongoDB data, using Gemini to phrase the final answers.
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import os
import google.generativeai as genai
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
from pymongo import MongoClient
import threading
import time
import uvicorn
from fastapi.middleware.cors import CORSMiddleware
# FastAPI application with fully permissive CORS so the frontend can talk
# to this backend from any origin.
app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],   # accept requests from any origin
    allow_methods=["*"],   # accept every HTTP verb (GET, POST, ...)
    allow_headers=["*"],   # accept all request headers
)
# --- External services --------------------------------------------------
# Gemini (answer generation), sentence-transformers (embeddings), and
# MongoDB (application data).  Credentials come from the environment.
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))

model = SentenceTransformer("all-MiniLM-L6-v2")

MONGO_URI = os.getenv("MONGO_URI")
client = MongoClient(MONGO_URI)
db = client["AiWork"]
class QueryRequest(BaseModel):
    """Request body for /chat: which user is asking, and what they asked."""

    email: str
    query: str
# Per-user FAISS state, keyed by user email.
user_indexes = {}           # email -> FAISS index over that user's sentences
user_sentence_mapping = {}  # email -> list of (row_id, sentence) pairs
def fetch_latest_data():
    """Snapshot every collection used for sentence generation.

    Returns a dict mapping each collection name to the full list of its
    documents, fetched fresh from MongoDB.
    """
    collection_names = ("users", "teams", "projects", "modules", "documents", "schedules")
    return {name: list(db[name].find()) for name in collection_names}
def generate_sentences(data):
    """Build human-readable, categorized sentences describing each user's data.

    Args:
        data: Dict with "users", "teams", "projects", "modules", "documents"
            and "schedules" keys, each a list of raw MongoDB documents.

    Returns:
        Dict mapping each user's email to a dict of category name ->
        list of sentences.  A user with no related data at all gets a
        single "General" entry instead.
    """
    users = data["users"]
    teams = data["teams"]
    projects = data["projects"]
    modules = data["modules"]
    documents = data["documents"]
    schedules = data["schedules"]

    user_sentences = {}

    for user in users:
        username = user.get("username", "Unknown User")
        email = user.get("email", "Unknown Email")
        categories = user_sentences.setdefault(email, {
            "Teams": [],
            "Projects": [],
            "Modules & Tasks": [],
            "Documents": [],
            "Schedules": []
        })

        # Team ownership and membership.
        owned_teams = [t for t in teams if t.get("owner", {}).get("email") == email]
        if owned_teams:
            names = ", ".join(f'"{t["teamName"]}"' for t in owned_teams)
            categories["Teams"].append(f"User {username} owns the teams: {names}.")

        member_teams = [t for t in teams
                        if any(m["email"] == email for m in t.get("members", []))]
        if member_teams:
            names = ", ".join(f'"{t["teamName"]}"' for t in member_teams)
            categories["Teams"].append(f"User {username} is a member of the teams: {names}.")

        # Projects owned by any team the user belongs to (as owner or member).
        team_ids = [str(t["_id"]) for t in owned_teams + member_teams]
        user_projects = [p for p in projects
                         if str(p.get("owner", {}).get("teamId")) in team_ids]
        for project in user_projects:
            proj_name = project["projName"]
            team_creator = next(
                (t["teamName"] for t in teams
                 if str(t["_id"]) == str(project.get("owner", {}).get("teamId"))),
                "Unknown Team",
            )
            categories["Projects"].append(
                f"User {username} is involved in project {proj_name}, created by team {team_creator}."
            )

            # Modules under this project, plus the user's assigned tasks in each.
            for module in modules:
                if str(module.get("projId")) != str(project["_id"]):
                    continue
                module_name = module["moduleName"]
                categories["Modules & Tasks"].append(
                    f"In project {proj_name}, module {module_name} exists."
                )
                assigned_tasks = [
                    task for task in module.get("tasks", [])
                    if any(a["email"] == email for a in task.get("assignedTo", []))
                ]
                if assigned_tasks:
                    task_details = ", ".join(
                        f'"{t["taskName"]}" (Status: {"Inactive" if t.get("status", False) else "Active"})'
                        for t in assigned_tasks
                    )
                    categories["Modules & Tasks"].append(
                        f"Tasks assigned to {username} in {module_name}: {task_details}."
                    )

            # Documents attached to this project.
            proj_docs = [d for d in documents
                         if str(d.get("owner", {}).get("projId")) == str(project["_id"])]
            if proj_docs:
                doc_names = ", ".join(f'"{d["title"]}"' for d in proj_docs)
                categories["Documents"].append(
                    f"Documents related to project {proj_name}: {doc_names}."
                )

        # Meeting schedules for any of the user's teams.
        for schedule in schedules:
            if str(schedule.get("teamId")) not in team_ids:
                continue
            related_team = next(
                (t["teamName"] for t in teams
                 if str(t["_id"]) == str(schedule.get("teamId"))),
                "Unknown Team",
            )
            related_project = next(
                (p["projName"] for p in projects
                 if str(p["_id"]) == str(schedule.get("projId"))),
                "Unknown Project",
            )
            categories["Schedules"].append(
                f'{schedule["moto"]} scheduled on {schedule["date"]} at {schedule["time"]} '
                f'for team {related_team} in project {related_project}.'
            )

        # Bug fix: the original ran this emptiness check immediately after
        # initializing the (still empty) category dict, so EVERY user received
        # the "no assigned data" sentence.  Check only after population.
        if not any(categories.values()):
            categories["General"] = [
                f"User {username} is registered but has no assigned data."
            ]

    return user_sentences
def fetch_initial_data():
    """Build the per-user FAISS indexes from the current database contents.

    Populates the module-level `user_indexes` and `user_sentence_mapping`
    dicts: one `faiss.IndexFlatL2` per user that has at least one sentence.
    """
    # Reuse the shared snapshot helper instead of duplicating the six queries.
    data = fetch_latest_data()
    user_sentences = generate_sentences(data)

    # Loop-invariant: the embedding dimension never changes per user.
    embedding_dim = model.get_sentence_embedding_dimension()
    user_count = 0  # Track users added to FAISS

    for email, categories in user_sentences.items():
        # Flatten the categorized sentences into a single list.
        sentences = [s for group in categories.values() for s in group]
        print(f"User: {email}, Sentences Count: {len(sentences)}")  # Debugging Output
        if not sentences:
            continue
        user_count += 1
        index = faiss.IndexFlatL2(embedding_dim)
        index.add(model.encode(sentences, convert_to_numpy=True))
        user_indexes[email] = index
        user_sentence_mapping[email] = list(enumerate(sentences))

    print(f"Total Users Indexed in FAISS: {user_count} / {len(data['users'])}")
def update_user_embeddings(email):
    """Regenerate one user's sentences and rebuild their FAISS index in place."""
    snapshot = {
        "users": list(db.users.find({"email": email})),  # only the affected user
        "teams": list(db.teams.find()),
        "projects": list(db.projects.find()),
        "modules": list(db.modules.find()),
        "documents": list(db.documents.find()),
        "schedules": list(db.schedules.find())
    }
    per_user = generate_sentences(snapshot)
    if email not in per_user:
        return

    # Flatten the categorized sentences into a single list.
    sentences = sum(per_user[email].values(), [])
    if not sentences:
        return

    embeddings = model.encode(sentences, convert_to_numpy=True)
    fresh_index = faiss.IndexFlatL2(model.get_sentence_embedding_dimension())
    fresh_index.add(embeddings)

    # Swap in the rebuilt index and sentence mapping for this user.
    user_indexes[email] = fresh_index
    user_sentence_mapping[email] = [(i, s) for i, s in enumerate(sentences)]
    print(f"Updated embeddings for {email}. Total sentences: {len(sentences)}")
def _emails_for_change(collection_name, change):
    """Resolve which user emails are affected by one change-stream event.

    Looks up the changed document's owner/assignees depending on which
    collection it lives in.  Documents that were already deleted, or that
    carry no email, contribute nothing.

    Returns:
        A set of email strings (never containing None).
    """
    doc_id = change["documentKey"]["_id"]
    emails = set()

    if collection_name == "users":
        full_doc = change.get("fullDocument") or {}
        if "email" in full_doc:
            emails.add(full_doc["email"])
    elif collection_name == "teams":
        team_doc = db.teams.find_one({"_id": doc_id})
        if team_doc and "owner" in team_doc:
            emails.add(team_doc["owner"].get("email"))
    elif collection_name == "projects":
        project_doc = db.projects.find_one({"_id": doc_id})
        if project_doc and "owner" in project_doc:
            emails.add(project_doc["owner"].get("email"))
    elif collection_name == "modules":
        module_doc = db.modules.find_one({"_id": doc_id})
        if module_doc:
            # Every user assigned to the module is affected.
            for user in module_doc.get("assignedTo", []):
                if "email" in user:
                    emails.add(user["email"])
    elif collection_name == "documents":
        doc = db.documents.find_one({"_id": doc_id})
        if doc and "owner" in doc:
            emails.add(doc["owner"].get("email"))
    elif collection_name == "schedules":
        schedule_doc = db.schedules.find_one({"_id": doc_id})
        if schedule_doc:
            team_doc = db.teams.find_one({"_id": schedule_doc.get("teamId")})
            if team_doc and "owner" in team_doc:
                emails.add(team_doc["owner"].get("email"))

    # Bug fix: `.get("email")` can return None, and the original would then
    # call update_user_embeddings(None).  Drop missing emails up front.
    emails.discard(None)
    return emails


def watch_changes():
    """Monitor MongoDB for changes and refresh affected users' embeddings.

    Runs forever (intended for a daemon thread); on change-stream errors it
    logs, sleeps 5 seconds, and reconnects.
    """
    print("Watching MongoDB for changes...")
    while True:
        try:
            with db.watch() as stream:  # watch the entire database
                for change in stream:
                    print("Detected Change:", change)  # Debugging print
                    operation = change["operationType"]
                    collection_name = change["ns"]["coll"]

                    emails = _emails_for_change(collection_name, change)
                    if not emails:
                        print(f"Change detected in {collection_name}, but no associated email found.")
                        continue

                    for email in emails:
                        print(f"Detected {operation} for user: {email}")
                        if operation in ["insert", "update", "delete"]:
                            update_user_embeddings(email)
                            print(f"Updated user {email} due to {operation} operation.")
        except Exception as e:
            print(f"Error in watch_changes(): {e}")
            print("Reconnecting to MongoDB Change Stream in 5 seconds...")
            time.sleep(5)  # Prevent infinite error loops
def get_relevant_sentences(email, query):
    """Return the stored sentences most similar to `query` for this user."""
    if email not in user_indexes:
        return ["User not found or no data available."]

    index = user_indexes[email]
    sentence_data = user_sentence_mapping.get(email, [])
    if not sentence_data:
        return ["No stored sentences for this user."]

    # Embed the query and search the nearest neighbours, capped by how
    # many sentences this user actually has.
    query_embedding = model.encode([query], convert_to_numpy=True)
    top_k = min(100, len(sentence_data))
    distances, indices = index.search(query_embedding, top_k)

    # Keep only hits under the L2-distance threshold.
    threshold = 1.5
    hits = [
        sentence_data[i][1]
        for dist, i in zip(distances[0], indices[0])
        if dist < threshold
    ]
    return hits if hits else ["No relevant information found."]
def generate_response(email, query):
    """Answer `query` for `email` using FAISS context plus Gemini.

    Retrieves the user's most relevant stored sentences, packs them into a
    prompt, and asks Gemini to phrase a natural-language answer.
    """
    relevant_sentences = get_relevant_sentences(email, query)

    # Construct the prompt from the retrieved context.
    prompt = (
        f"Query: {query}\nContext:\n"
        + "\n".join(relevant_sentences)
        + "\nAnswer in a natural way."
    )

    # Renamed from `model`: the original local shadowed the module-level
    # SentenceTransformer `model`, making the two easy to confuse.
    llm = genai.GenerativeModel("gemini-1.5-flash")
    response = llm.generate_content(prompt)
    return response.text if response.text else "I'm unable to find relevant information."
@app.post("/chat")
async def chat(request: QueryRequest):
    """Chat endpoint: run retrieval + generation for the requesting user."""
    try:
        return {"response": generate_response(request.email, request.query)}
    except Exception as e:
        # Surface any failure to the client as a 500 with the error text.
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/")
def home():
    """Liveness check for the service root."""
    return {"message": "AI Workspace Backend Running"}
if __name__ == "__main__":
    # Build the per-user FAISS indexes before serving any requests.
    fetch_initial_data()

    # React to database changes in a background daemon thread.
    threading.Thread(target=watch_changes, daemon=True).start()
    print(f"Active Threads: {threading.active_count()}")  # Debugging thread count

    uvicorn.run(app, host="0.0.0.0", port=7860)