ADKU commited on
Commit
8c2204f
·
verified ·
1 Parent(s): a8eddfb

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +179 -48
app.py CHANGED
@@ -6,6 +6,8 @@ import faiss
6
  import numpy as np
7
  from sentence_transformers import SentenceTransformer
8
  from pymongo import MongoClient
 
 
9
  import uvicorn
10
  from fastapi.middleware.cors import CORSMiddleware
11
 
@@ -34,6 +36,10 @@ class QueryRequest(BaseModel):
34
  email: str
35
  query: str
36
 
 
 
 
 
37
  def fetch_latest_data():
38
  return {
39
  "users": list(db.users.find()),
@@ -44,18 +50,16 @@ def fetch_latest_data():
44
  "schedules": list(db.schedules.find())
45
  }
46
 
47
- def generate_sentences(db):
48
- # Your existing logic here...
49
-
50
  users, teams, projects, modules, documents, schedules = (
51
- db["users"], db["teams"], db["projects"], db["modules"], db["documents"], db["schedules"]
52
  )
53
  user_sentences = {} # Store categorized sentences per user
54
-
55
  for user in users:
56
  username = user.get("username", "Unknown User")
57
  email = user.get("email", "Unknown Email")
58
-
59
  if email not in user_sentences:
60
  user_sentences[email] = {
61
  "Teams": [],
@@ -64,105 +68,225 @@ def generate_sentences(db):
64
  "Documents": [],
65
  "Schedules": []
66
  }
67
-
 
 
 
68
  # User team ownership and membership
69
  owned_teams = [team for team in teams if team.get("owner", {}).get("email") == email]
70
  if owned_teams:
71
  team_names = ", ".join(f'"{team["teamName"]}"' for team in owned_teams)
72
  user_sentences[email]["Teams"].append(f"User {username} owns the teams: {team_names}.")
73
-
74
  member_teams = [team for team in teams if any(m["email"] == email for m in team.get("members", []))]
75
  if member_teams:
76
  team_names = ", ".join(f'"{team["teamName"]}"' for team in member_teams)
77
  user_sentences[email]["Teams"].append(f"User {username} is a member of the teams: {team_names}.")
78
-
79
  # Find projects in teams they own or are part of
80
  relevant_teams = owned_teams + member_teams
81
  team_ids = [str(team["_id"]) for team in relevant_teams]
82
  user_projects = [p for p in projects if str(p.get("owner", {}).get("teamId")) in team_ids]
83
-
84
  if user_projects:
85
  for project in user_projects:
86
  proj_name = project["projName"]
87
- user_sentences[email]["Projects"].append(f"User {username} is involved in project {proj_name}.")
88
-
 
89
  # Find modules under this project
90
  proj_modules = [m for m in modules if str(m.get("projId")) == str(project["_id"])]
91
  if proj_modules:
92
  for module in proj_modules:
93
  module_name = module["moduleName"]
94
  user_sentences[email]["Modules & Tasks"].append(f"In project {proj_name}, module {module_name} exists.")
95
-
96
  # Find tasks in this module assigned to the user
97
  assigned_tasks = [
98
  task for task in module.get("tasks", [])
99
  if any(a["email"] == email for a in task.get("assignedTo", []))
100
  ]
101
  if assigned_tasks:
102
- task_names = ", ".join(f'"{t["taskName"]}"' for t in assigned_tasks)
103
- user_sentences[email]["Modules & Tasks"].append(f"Tasks assigned to {username} in {module_name}: {task_names}.")
104
-
105
  # Find documents in this project
106
  proj_docs = [d for d in documents if str(d.get("owner", {}).get("projId")) == str(project["_id"])]
107
  if proj_docs:
108
  doc_names = ", ".join(f'"{d["title"]}"' for d in proj_docs)
109
  user_sentences[email]["Documents"].append(f"Documents related to project {proj_name}: {doc_names}.")
110
-
111
  # Find meeting schedules related to their teams
112
  user_schedules = [s for s in schedules if str(s.get("teamId")) in team_ids]
113
  if user_schedules:
114
  for schedule in user_schedules:
115
- schedule_detail = f'{schedule["moto"]} scheduled on {schedule["date"]} at {schedule["time"]}.'
 
 
116
  user_sentences[email]["Schedules"].append(schedule_detail)
117
-
118
  return user_sentences
119
 
120
- def update_faiss_index(user_sentences):
121
- # Your existing FAISS logic here...
122
- faiss_indices = {}
 
 
 
 
 
 
 
 
 
 
 
123
  for email, categories in user_sentences.items():
124
- sentences = sum(categories.values(), [])
125
-
126
- if not sentences:
127
- continue
128
 
129
- embeddings = model.encode(sentences, convert_to_numpy=True)
130
- embedding_dim = embeddings.shape[1]
131
-
132
- index = faiss.IndexFlatL2(embedding_dim)
133
- index.add(embeddings)
134
-
135
- faiss_indices[email] = {"index": index, "sentences": sentences}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
136
 
137
- return faiss_indices
 
 
 
 
138
 
139
- def get_relevant_sentences(email, query, faiss_indices):
140
- # Your FAISS query logic here...
141
- if email not in faiss_indices:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
142
  return ["User not found or no data available."]
143
-
144
- index_data = faiss_indices[email]
145
- index = index_data["index"]
146
- sentences = index_data["sentences"]
 
 
147
 
148
  # Compute query embedding
149
  query_embedding = model.encode([query], convert_to_numpy=True)
150
- k = 100
 
 
151
  distances, indices = index.search(query_embedding, k)
152
 
153
- # Filter sentences based on FAISS similarity threshold
154
  threshold = 1.5
155
- filtered_sentences = [sentences[idx] for dist, idx in zip(distances[0], indices[0]) if dist < threshold]
156
 
157
- return filtered_sentences if filtered_sentences else ["No relevant information found."]
158
 
159
 
160
  def generate_response(email, query):
161
- filtered_sentences = get_relevant_sentences(email, query, faiss_indices)
162
- prompt = f"Query: {query}\n\n" + "\n".join(filtered_sentences)
 
 
 
 
 
163
  model = genai.GenerativeModel("gemini-1.5-flash")
164
  response = model.generate_content(prompt)
165
- return response.text
 
166
 
167
  @app.post("/chat")
168
  async def chat(request: QueryRequest):
@@ -180,4 +304,11 @@ def home():
180
  faiss_indices = update_faiss_index(generate_sentences(fetch_latest_data()))
181
 
182
  if __name__ == "__main__":
 
 
 
 
 
 
 
183
  uvicorn.run(app, host="0.0.0.0", port=7860)
 
6
  import numpy as np
7
  from sentence_transformers import SentenceTransformer
8
  from pymongo import MongoClient
9
+ import threading
10
+ import time
11
  import uvicorn
12
  from fastapi.middleware.cors import CORSMiddleware
13
 
 
36
  email: str
37
  query: str
38
 
39
+ # FAISS Index (Per User)
40
+ user_indexes = {} # Stores FAISS index per user {email: FAISS index}
41
+ user_sentence_mapping = {} # Maps user emails to (id, sentence) pairs
42
+
43
  def fetch_latest_data():
44
  return {
45
  "users": list(db.users.find()),
 
50
  "schedules": list(db.schedules.find())
51
  }
52
 
53
+ def generate_sentences(data):
 
 
54
  users, teams, projects, modules, documents, schedules = (
55
+ data["users"], data["teams"], data["projects"], data["modules"], data["documents"], data["schedules"]
56
  )
57
  user_sentences = {} # Store categorized sentences per user
58
+
59
  for user in users:
60
  username = user.get("username", "Unknown User")
61
  email = user.get("email", "Unknown Email")
62
+
63
  if email not in user_sentences:
64
  user_sentences[email] = {
65
  "Teams": [],
 
68
  "Documents": [],
69
  "Schedules": []
70
  }
71
+
72
+ if not any(user_sentences[email].values()): # No data found for this user
73
+ user_sentences[email]["General"] = [f"User {username} is registered but has no assigned data."]
74
+
75
  # User team ownership and membership
76
  owned_teams = [team for team in teams if team.get("owner", {}).get("email") == email]
77
  if owned_teams:
78
  team_names = ", ".join(f'"{team["teamName"]}"' for team in owned_teams)
79
  user_sentences[email]["Teams"].append(f"User {username} owns the teams: {team_names}.")
80
+
81
  member_teams = [team for team in teams if any(m["email"] == email for m in team.get("members", []))]
82
  if member_teams:
83
  team_names = ", ".join(f'"{team["teamName"]}"' for team in member_teams)
84
  user_sentences[email]["Teams"].append(f"User {username} is a member of the teams: {team_names}.")
85
+
86
  # Find projects in teams they own or are part of
87
  relevant_teams = owned_teams + member_teams
88
  team_ids = [str(team["_id"]) for team in relevant_teams]
89
  user_projects = [p for p in projects if str(p.get("owner", {}).get("teamId")) in team_ids]
90
+
91
  if user_projects:
92
  for project in user_projects:
93
  proj_name = project["projName"]
94
+ team_creator = next((t["teamName"] for t in teams if str(t["_id"]) == str(project.get("owner", {}).get("teamId"))), "Unknown Team")
95
+ user_sentences[email]["Projects"].append(f"User {username} is involved in project {proj_name}, created by team {team_creator}.")
96
+
97
  # Find modules under this project
98
  proj_modules = [m for m in modules if str(m.get("projId")) == str(project["_id"])]
99
  if proj_modules:
100
  for module in proj_modules:
101
  module_name = module["moduleName"]
102
  user_sentences[email]["Modules & Tasks"].append(f"In project {proj_name}, module {module_name} exists.")
103
+
104
  # Find tasks in this module assigned to the user
105
  assigned_tasks = [
106
  task for task in module.get("tasks", [])
107
  if any(a["email"] == email for a in task.get("assignedTo", []))
108
  ]
109
  if assigned_tasks:
110
+ task_details = ", ".join(f'"{t["taskName"]}" (Status: {t.get("status", "Unknown")})' for t in assigned_tasks)
111
+ user_sentences[email]["Modules & Tasks"].append(f"Tasks assigned to {username} in {module_name}: {task_details}.")
112
+
113
  # Find documents in this project
114
  proj_docs = [d for d in documents if str(d.get("owner", {}).get("projId")) == str(project["_id"])]
115
  if proj_docs:
116
  doc_names = ", ".join(f'"{d["title"]}"' for d in proj_docs)
117
  user_sentences[email]["Documents"].append(f"Documents related to project {proj_name}: {doc_names}.")
118
+
119
  # Find meeting schedules related to their teams
120
  user_schedules = [s for s in schedules if str(s.get("teamId")) in team_ids]
121
  if user_schedules:
122
  for schedule in user_schedules:
123
+ related_team = next((t["teamName"] for t in teams if str(t["_id"]) == str(schedule.get("teamId"))), "Unknown Team")
124
+ related_project = next((p["projName"] for p in projects if str(p["_id"]) == str(schedule.get("projId"))), "Unknown Project")
125
+ schedule_detail = f'{schedule["moto"]} scheduled on {schedule["date"]} at {schedule["time"]} for team {related_team} in project {related_project}.'
126
  user_sentences[email]["Schedules"].append(schedule_detail)
127
+
128
  return user_sentences
129
 
130
+ def fetch_initial_data():
131
+ data = {
132
+ "users": list(db.users.find()),
133
+ "teams": list(db.teams.find()),
134
+ "projects": list(db.projects.find()),
135
+ "modules": list(db.modules.find()),
136
+ "documents": list(db.documents.find()),
137
+ "schedules": list(db.schedules.find())
138
+ }
139
+
140
+ user_sentences = generate_sentences(data)
141
+
142
+ user_count = 0 # Track users added to FAISS
143
+
144
  for email, categories in user_sentences.items():
145
+ sentences = sum(categories.values(), []) # Flatten categorized sentences
146
+ print(f"User: {email}, Sentences Count: {len(sentences)}") # Debugging Output
 
 
147
 
148
+ if sentences:
149
+ user_count += 1
150
+ embedding_dim = model.get_sentence_embedding_dimension()
151
+ user_indexes[email] = faiss.IndexFlatL2(embedding_dim)
152
+ embeddings = model.encode(sentences, convert_to_numpy=True)
153
+ user_indexes[email].add(embeddings)
154
+ user_sentence_mapping[email] = [(idx, s) for idx, s in enumerate(sentences)]
155
+
156
+ print(f"Total Users Indexed in FAISS: {user_count} / {len(data['users'])}")
157
+
158
+ def update_user_embeddings(email):
159
+ """
160
+ Regenerate structured sentences for the user, update FAISS index.
161
+ """
162
+ data = {
163
+ "users": list(db.users.find({"email": email})),
164
+ "teams": list(db.teams.find()),
165
+ "projects": list(db.projects.find()),
166
+ "modules": list(db.modules.find()),
167
+ "documents": list(db.documents.find()),
168
+ "schedules": list(db.schedules.find())
169
+ }
170
+
171
+ user_sentences = generate_sentences(data)
172
+
173
+ if email in user_sentences:
174
+ sentences = sum(user_sentences[email].values(), []) # Flatten structured sentences
175
+ if sentences:
176
+ embeddings = model.encode(sentences, convert_to_numpy=True)
177
+ embedding_dim = model.get_sentence_embedding_dimension()
178
+
179
+ # Rebuild FAISS index for this user
180
+ user_indexes[email] = faiss.IndexFlatL2(embedding_dim)
181
+ user_indexes[email].add(embeddings)
182
+ user_sentence_mapping[email] = [(idx, s) for idx, s in enumerate(sentences)]
183
+
184
+ print(f"Updated embeddings for {email}. Total sentences: {len(sentences)}")
185
+
186
+ def watch_changes():
187
+ """Monitor MongoDB for changes, identify affected users, and update embeddings dynamically."""
188
+ print("Watching MongoDB for changes...")
189
+
190
+ while True:
191
+ try:
192
+ with db.watch() as stream: # Watch the entire database
193
+ for change in stream:
194
+ print("Detected Change:", change) # Debugging print
195
+
196
+ operation = change["operationType"]
197
+ collection_name = change["ns"]["coll"] # Get the collection that changed
198
+ doc_id = change["documentKey"]["_id"]
199
+
200
+ emails = set() # Store affected user emails
201
 
202
+ # Fetch user email based on the collection that was updated
203
+ if collection_name == "users":
204
+ full_doc = change.get("fullDocument", {})
205
+ if full_doc and "email" in full_doc:
206
+ emails.add(full_doc["email"])
207
 
208
+ elif collection_name == "teams":
209
+ team_doc = db.teams.find_one({"_id": doc_id})
210
+ if team_doc and "owner" in team_doc:
211
+ emails.add(team_doc["owner"].get("email"))
212
+
213
+ elif collection_name == "projects":
214
+ project_doc = db.projects.find_one({"_id": doc_id})
215
+ if project_doc and "owner" in project_doc:
216
+ emails.add(project_doc["owner"].get("email"))
217
+
218
+ elif collection_name == "modules":
219
+ module_doc = db.modules.find_one({"_id": doc_id})
220
+ if module_doc:
221
+ # Fetch users assigned to the module
222
+ for user in module_doc.get("assignedTo", []):
223
+ if "email" in user:
224
+ emails.add(user["email"])
225
+
226
+ elif collection_name == "documents":
227
+ doc = db.documents.find_one({"_id": doc_id})
228
+ if doc and "owner" in doc:
229
+ emails.add(doc["owner"].get("email"))
230
+
231
+ elif collection_name == "schedules":
232
+ schedule_doc = db.schedules.find_one({"_id": doc_id})
233
+ if schedule_doc:
234
+ team_id = schedule_doc.get("teamId")
235
+ team_doc = db.teams.find_one({"_id": team_id})
236
+ if team_doc and "owner" in team_doc:
237
+ emails.add(team_doc["owner"].get("email"))
238
+
239
+ if emails:
240
+ for email in emails:
241
+ print(f"Detected {operation} for user: {email}")
242
+ if operation in ["insert", "update", "delete"]:
243
+ update_user_embeddings(email)
244
+ print(f"Updated user {email} due to {operation} operation.")
245
+ else:
246
+ print(f"Change detected in {collection_name}, but no associated email found.")
247
+
248
+ except Exception as e:
249
+ print(f"Error in watch_changes(): {e}")
250
+ print("Reconnecting to MongoDB Change Stream in 5 seconds...")
251
+ time.sleep(5) # Prevent infinite error loops
252
+
253
+ def get_relevant_sentences(email, query):
254
+ """Retrieve relevant sentences using FAISS for the given user and query."""
255
+ if email not in user_indexes:
256
  return ["User not found or no data available."]
257
+
258
+ index = user_indexes[email] # FAISS index for the user
259
+ sentence_data = user_sentence_mapping.get(email, []) # Sentence mapping
260
+
261
+ if not sentence_data:
262
+ return ["No stored sentences for this user."]
263
 
264
  # Compute query embedding
265
  query_embedding = model.encode([query], convert_to_numpy=True)
266
+
267
+ # Perform FAISS search (top-k nearest neighbors)
268
+ k = min(100, len(sentence_data)) # Limit k to available sentences
269
  distances, indices = index.search(query_embedding, k)
270
 
271
+ # Set a similarity threshold to filter results
272
  threshold = 1.5
273
+ relevant_sentences = [sentence_data[idx][1] for dist, idx in zip(distances[0], indices[0]) if dist < threshold]
274
 
275
+ return relevant_sentences if relevant_sentences else ["No relevant information found."]
276
 
277
 
278
  def generate_response(email, query):
279
+ """Generate a natural language response using Gemini based on FAISS search results."""
280
+ relevant_sentences = get_relevant_sentences(email, query)
281
+
282
+ # Construct prompt using relevant sentences
283
+ prompt = f"Query: {query}\nContext:\n" + "\n".join(relevant_sentences) + "\nAnswer in a natural way."
284
+
285
+ # Use Gemini API to generate response
286
  model = genai.GenerativeModel("gemini-1.5-flash")
287
  response = model.generate_content(prompt)
288
+
289
+ return response.text if response.text else "I'm unable to find relevant information."
290
 
291
  @app.post("/chat")
292
  async def chat(request: QueryRequest):
 
304
  faiss_indices = update_faiss_index(generate_sentences(fetch_latest_data()))
305
 
306
  if __name__ == "__main__":
307
+ # Fetch initial data for FAISS indexing
308
+ fetch_initial_data()
309
+
310
+ # Start watching for real-time changes in a separate thread
311
+ threading.Thread(target=watch_changes, daemon=True).start()
312
+
313
+ print(f"Active Threads: {threading.active_count()}") # Debugging thread count
314
  uvicorn.run(app, host="0.0.0.0", port=7860)