KeenWoo committed
Commit ef69b49 · verified · 1 Parent(s): acec07e

Delete alz_companion/agent.py

Files changed (1): alz_companion/agent.py +0 -450
alz_companion/agent.py DELETED
@@ -1,450 +0,0 @@
from __future__ import annotations

import os
import json
import base64
import tempfile

from typing import List, Dict, Any, Optional

# OpenAI for LLM (optional)
try:
    from openai import OpenAI
except Exception:  # pragma: no cover
    OpenAI = None  # type: ignore

# LangChain & RAG
from langchain.schema import Document
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings

# TTS (optional)
try:
    from gtts import gTTS
except Exception:  # pragma: no cover
    gTTS = None  # type: ignore

from .prompts import (
    SYSTEM_TEMPLATE, ANSWER_TEMPLATE_CALM, ANSWER_TEMPLATE_ADQ,
    SAFETY_GUARDRAILS, RISK_FOOTER, render_emotion_guidelines, CLASSIFICATION_PROMPT,
    ROUTER_PROMPT,
    ANSWER_TEMPLATE_FACTUAL,
    ANSWER_TEMPLATE_GENERAL_KNOWLEDGE,
    ANSWER_TEMPLATE_GENERAL,
    QUERY_EXPANSION_PROMPT,
)

# -----------------------------
# Multimodal Processing Functions
# -----------------------------

def _openai_client() -> Optional[OpenAI]:
    api_key = os.getenv("OPENAI_API_KEY", "").strip()
    return OpenAI(api_key=api_key) if api_key and OpenAI else None


def describe_image(image_path: str) -> str:
    """Uses a vision model to describe an image for context."""
    client = _openai_client()
    if not client:
        return "(Image description failed: OpenAI API key not configured.)"

    try:
        # Determine the MIME type from the file extension so the data URL
        # matches the actual image format.
        extension = os.path.splitext(image_path)[1].lower()
        if extension == ".png":
            mime_type = "image/png"
        elif extension in [".jpg", ".jpeg"]:
            mime_type = "image/jpeg"
        elif extension == ".gif":
            mime_type = "image/gif"
        elif extension == ".webp":
            mime_type = "image/webp"
        else:
            # Default to JPEG; this covers the most common cases.
            mime_type = "image/jpeg"

        with open(image_path, "rb") as image_file:
            base64_image = base64.b64encode(image_file.read()).decode("utf-8")

        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "Describe this image in a concise, factual way for a memory journal. Focus on people, places, and key objects. For example: 'A photo of John and Mary smiling on a bench at the park.'"},
                        {
                            "type": "image_url",
                            # Use the dynamically determined MIME type.
                            "image_url": {"url": f"data:{mime_type};base64,{base64_image}"},
                        },
                    ],
                }
            ],
            max_tokens=100,
        )
        return response.choices[0].message.content or "No description available."
    except Exception as e:
        return f"[Image description error: {e}]"

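# Illustrative usage sketch (editor's addition, not part of the original file;
# "family_photo.jpg" is a hypothetical path, and OPENAI_API_KEY must be set
# for a real call):
#
#     caption = describe_image("family_photo.jpg")
#     print(caption)  # e.g. "A photo of two people smiling on a park bench."
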
# -----------------------------
# NLU Classification Function
# -----------------------------
# The LLM's response contains both a <thinking> block and a JSON block, so
# the parser below isolates the JSON object before decoding it.

def detect_tags_from_query(query: str, behavior_options: list, emotion_options: list, topic_options: list, context_options: list, settings: Optional[dict] = None) -> Dict[str, Any]:
    """Uses a Chain-of-Thought prompt to classify the user's query."""
    behavior_str = ", ".join(f'"{opt}"' for opt in behavior_options if opt != "None")
    emotion_str = ", ".join(f'"{opt}"' for opt in emotion_options if opt != "None")
    topic_str = ", ".join(f'"{opt}"' for opt in topic_options if opt != "None")
    context_str = ", ".join(f'"{opt}"' for opt in context_options if opt != "None")

    prompt = CLASSIFICATION_PROMPT.format(
        behavior_options=behavior_str,
        emotion_options=emotion_str,
        topic_options=topic_str,
        context_options=context_str,
        query=query,
    )

    messages = [
        {"role": "system", "content": "You are a helpful NLU classification assistant. Follow the instructions precisely."},
        {"role": "user", "content": prompt},
    ]
    response_str = call_llm(messages, temperature=0.1)

    # Log the full response only in debug mode.
    if settings and settings.get("debug_mode"):
        print(f"\n--- NLU Full Response ---\n{response_str}\n-----------------------\n")

    result_dict = {
        "detected_behaviors": [], "detected_emotion": "None",
        "detected_topic": "None", "detected_contexts": []
    }

    try:
        # Robust parsing: find the first '{' and the last '}' to isolate the
        # JSON object from any surrounding <thinking> text.
        start_brace = response_str.find('{')
        end_brace = response_str.rfind('}')

        if start_brace != -1 and end_brace != -1 and end_brace > start_brace:
            json_str = response_str[start_brace : end_brace + 1]
            result = json.loads(json_str)

            # Validate each field against the allowed options; anything the
            # LLM invents outside the option lists is dropped.
            behaviors = result.get("detected_behaviors")
            result_dict["detected_behaviors"] = [b for b in behaviors if b in behavior_options] if behaviors else []

            emotion = result.get("detected_emotion")
            result_dict["detected_emotion"] = emotion if emotion in emotion_options else "None"

            topic = result.get("detected_topic")
            result_dict["detected_topic"] = topic if topic in topic_options else "None"

            contexts = result.get("detected_contexts")  # may be None
            result_dict["detected_contexts"] = [c for c in contexts if c in context_options] if contexts else []

        return result_dict
    except (json.JSONDecodeError, AttributeError) as e:
        print(f"ERROR parsing CoT JSON: {e}")
        return result_dict

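# Illustrative usage sketch (editor's addition; the option lists below are
# hypothetical examples, not the app's real vocabularies):
#
#     tags = detect_tags_from_query(
#         "Dad keeps trying to leave the house at night",
#         behavior_options=["wandering", "agitation", "None"],
#         emotion_options=["anxious", "calm", "None"],
#         topic_options=["safety", "None"],
#         context_options=["night-time", "None"],
#     )
#     # A plausible result, depending on the LLM:
#     # {"detected_behaviors": ["wandering"], "detected_emotion": "anxious",
#     #  "detected_topic": "safety", "detected_contexts": ["night-time"]}
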
# -----------------------------
# Embeddings & VectorStore
# -----------------------------

def _default_embeddings():
    """Lightweight, widely available model."""
    model_name = os.getenv("EMBEDDINGS_MODEL", "sentence-transformers/all-MiniLM-L6-v2")
    return HuggingFaceEmbeddings(model_name=model_name)

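# Illustrative note (editor's addition): the embedding model can be swapped
# via the environment without code changes, e.g. to a multilingual model:
#
#     export EMBEDDINGS_MODEL="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
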
def build_or_load_vectorstore(docs: List[Document], index_path: str, is_personal: bool = False) -> FAISS:
    # dirname may be empty if index_path is a bare name; fall back to ".".
    os.makedirs(os.path.dirname(index_path) or ".", exist_ok=True)
    if os.path.isdir(index_path) and os.path.exists(os.path.join(index_path, "index.faiss")):
        try:
            return FAISS.load_local(index_path, _default_embeddings(), allow_dangerous_deserialization=True)
        except Exception:
            pass

    if is_personal and not docs:
        docs = [Document(page_content="(This is the start of the personal memory journal.)", metadata={"source": "placeholder"})]

    vs = FAISS.from_documents(docs, _default_embeddings())
    vs.save_local(index_path)
    return vs

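# Illustrative usage sketch (editor's addition; the document and index path
# are hypothetical):
#
#     docs = [Document(page_content="Mary's birthday is in June.",
#                      metadata={"source": "journal"})]
#     vs = build_or_load_vectorstore(docs, index_path="data/personal_index",
#                                    is_personal=True)
#     vs.similarity_search("When is Mary's birthday?", k=1)
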
def texts_from_jsonl(path: str) -> List[Document]:
    out: List[Document] = []
    try:
        with open(path, "r", encoding="utf-8") as f:
            for i, line in enumerate(f):
                line = line.strip()
                if not line:
                    continue
                obj = json.loads(line)
                txt = obj.get("text") or ""
                if not isinstance(txt, str) or not txt.strip():
                    continue

                # Carry the classification tags (behavior, emotion, topic,
                # context) into the metadata so they can be used as filters.
                md = {"source": os.path.basename(path), "chunk": i}
                for k in ("behaviors", "emotion", "topic_tags", "context_tags"):
                    if k in obj and obj[k]:  # key exists and is non-empty
                        md[k] = obj[k]
                out.append(Document(page_content=txt, metadata=md))

    except Exception:
        return []
    return out

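# Illustrative record shape for the JSONL input (editor's addition; the field
# values are invented examples, the keys match the loop above):
#
#     {"text": "Redirect gently and offer a familiar activity.",
#      "behaviors": ["wandering"], "emotion": "anxious",
#      "topic_tags": ["safety"], "context_tags": ["night-time"]}
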
def bootstrap_vectorstore(sample_paths: List[str] | None = None, index_path: str = "data/faiss_index") -> FAISS:
    docs: List[Document] = []
    for p in (sample_paths or []):
        try:
            if p.lower().endswith(".jsonl"):
                docs.extend(texts_from_jsonl(p))
            else:
                with open(p, "r", encoding="utf-8", errors="ignore") as fh:
                    docs.append(Document(page_content=fh.read(), metadata={"source": os.path.basename(p)}))
        except Exception:
            continue
    if not docs:
        docs = [Document(page_content="(empty index)", metadata={"source": "placeholder"})]
    return build_or_load_vectorstore(docs, index_path=index_path)

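# Illustrative usage sketch (editor's addition; the file name is hypothetical):
#
#     vs_general = bootstrap_vectorstore(["data/care_guidance.jsonl"],
#                                        index_path="data/faiss_index")
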
# -----------------------------
# LLM Call
# -----------------------------

def call_llm(messages: List[Dict[str, str]], temperature: float = 0.6, stop: Optional[List[str]] = None) -> str:
    """Call OpenAI Chat Completions if available; else return a fallback.

    Accepts an optional `stop` list so callers (e.g. detect_tags_from_query)
    can pass stop sequences through to the API.
    """
    client = _openai_client()
    model = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
    if not client:
        return "(Offline Mode: OpenAI API key not configured.)"
    try:
        # Build the argument dict so the optional 'stop' parameter is only
        # sent when provided.
        api_args = {
            "model": model,
            "messages": messages,
            "temperature": float(temperature if temperature is not None else 0.6),
        }
        if stop:
            api_args["stop"] = stop

        resp = client.chat.completions.create(**api_args)
        return (resp.choices[0].message.content or "").strip()
    except Exception as e:
        return f"[LLM API Error: {e}]"

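# Illustrative smoke test (editor's addition; requires OPENAI_API_KEY,
# otherwise the offline fallback string is returned):
#
#     print(call_llm([{"role": "user", "content": "Say hello."}],
#                    temperature=0.0))
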
# -----------------------------
# Prompting & RAG Chain
# -----------------------------

def _format_sources(docs: List[Document]) -> List[str]:
    return list(set(d.metadata.get("source", "unknown") for d in docs))

# make_rag_chain routes each query to one of four paths (general knowledge,
# factual lookup with query expansion, general conversation, or caregiving
# scenario) and runs the matching retrieval and prompting strategy.

def make_rag_chain(
    vs_general: FAISS,
    vs_personal: FAISS,
    *,
    role: str = "patient",
    temperature: float = 0.6,
    language: str = "English",
    patient_name: str = "the patient",
    caregiver_name: str = "the caregiver",
    tone: str = "warm",
):
    """Returns a callable that performs the complete, intelligent RAG process."""

    def _format_docs(docs: List[Document], default_msg: str) -> str:
        if not docs:
            return default_msg
        unique_docs = {doc.page_content: doc for doc in docs}.values()
        return "\n".join([f"- {d.page_content.strip()}" for d in unique_docs])

    def _answer_fn(query: str, chat_history: List[Dict[str, str]], scenario_tag: Optional[str] = None, emotion_tag: Optional[str] = None, topic_tag: Optional[str] = None, context_tags: Optional[List[str]] = None) -> Dict[str, Any]:

        # Route the query to one of the four answer paths.
        router_messages = [{"role": "user", "content": ROUTER_PROMPT.format(query=query)}]
        query_type = call_llm(router_messages, temperature=0.0).strip().lower()
        print(f"Query classified as: {query_type}")

        system_message = SYSTEM_TEMPLATE.format(tone=tone, language=language, patient_name=patient_name or "the patient", caregiver_name=caregiver_name or "the caregiver", guardrails=SAFETY_GUARDRAILS)
        messages = [{"role": "system", "content": system_message}]
        messages.extend(chat_history)

        # Path 1: general knowledge, answered without retrieval.
        if "general_knowledge_question" in query_type:
            user_prompt = ANSWER_TEMPLATE_GENERAL_KNOWLEDGE.format(question=query, language=language)
            messages.append({"role": "user", "content": user_prompt})
            answer = call_llm(messages, temperature=temperature)
            return {"answer": answer, "sources": ["General Knowledge"]}

        # Path 2: factual question over the journal, with query expansion.
        elif "factual_question" in query_type:
            print(f"Performing query expansion for: '{query}'")
            expansion_prompt = QUERY_EXPANSION_PROMPT.format(question=query)
            expansion_response = call_llm([{"role": "user", "content": expansion_prompt}], temperature=0.1)

            try:
                clean_response = expansion_response.strip().replace("```json", "").replace("```", "")
                expanded_queries = json.loads(clean_response)
                search_queries = [query] + expanded_queries
            except json.JSONDecodeError:
                search_queries = [query]

            print(f"Searching with queries: {search_queries}")
            retriever_personal = vs_personal.as_retriever(search_kwargs={"k": 2})
            retriever_general = vs_general.as_retriever(search_kwargs={"k": 2})

            all_docs = []
            for q in search_queries:
                all_docs.extend(retriever_personal.invoke(q))
                all_docs.extend(retriever_general.invoke(q))

            context = _format_docs(all_docs, "(No relevant information found in the memory journal.)")

            user_prompt = ANSWER_TEMPLATE_FACTUAL.format(context=context, question=query, language=language)
            messages.append({"role": "user", "content": user_prompt})
            answer = call_llm(messages, temperature=temperature)
            return {"answer": answer, "sources": _format_sources(all_docs)}

        # Path 3: open-ended conversation.
        elif "general_conversation" in query_type:
            user_prompt = ANSWER_TEMPLATE_GENERAL.format(question=query, language=language)
            messages.append({"role": "user", "content": user_prompt})
            answer = call_llm(messages, temperature=temperature)
            return {"answer": answer, "sources": []}

        # Path 4 (default): the caregiving-scenario logic.
        else:
            # Build a metadata filter from whichever tags were detected.
            search_filter = {}
            if scenario_tag and scenario_tag != "None":
                search_filter["behaviors"] = scenario_tag.lower()
            if emotion_tag and emotion_tag != "None":
                search_filter["emotion"] = emotion_tag.lower()
            if topic_tag and topic_tag != "None":
                search_filter["topic_tags"] = topic_tag.lower()
            if context_tags:
                search_filter["context_tags"] = {"in": [tag.lower() for tag in context_tags]}

            # --- Robust Search Strategy ---
            # 1. Start with a general, unfiltered search to always get text-based matches.
            retriever_personal = vs_personal.as_retriever(search_kwargs={"k": 3})
            retriever_general = vs_general.as_retriever(search_kwargs={"k": 3})

            personal_docs = retriever_personal.invoke(query)
            general_docs = retriever_general.invoke(query)

            # 2. If filters exist, perform a second, more specific search and add the results.
            if search_filter:
                print(f"Performing additional search with filter: {search_filter}")
                personal_docs.extend(vs_personal.similarity_search(query, k=3, filter=search_filter))
                general_docs.extend(vs_general.similarity_search(query, k=3, filter=search_filter))

            # 3. Combine and de-duplicate the results to get the best of both searches.
            all_personal_docs = list({doc.page_content: doc for doc in personal_docs}.values())
            all_general_docs = list({doc.page_content: doc for doc in general_docs}.values())

            # 4. Define the context variables based on the combined results.
            personal_context = _format_docs(all_personal_docs, "(No relevant personal memories found.)")
            general_context = _format_docs(all_general_docs, "(No general guidance found.)")

            # Pick the first emotion tag found in the retrieved documents.
            first_emotion = None
            all_docs_care = all_personal_docs + all_general_docs

            for doc in all_docs_care:
                if "emotion" in doc.metadata and doc.metadata["emotion"]:
                    emotion_data = doc.metadata["emotion"]
                    first_emotion = emotion_data[0] if isinstance(emotion_data, list) else emotion_data
                    if first_emotion:
                        break

            emotions_context = render_emotion_guidelines(first_emotion or emotion_tag)
            is_tagged_scenario = (scenario_tag and scenario_tag != "None") or (emotion_tag and emotion_tag != "None") or (first_emotion is not None)
            template = ANSWER_TEMPLATE_ADQ if is_tagged_scenario else ANSWER_TEMPLATE_CALM

            if template == ANSWER_TEMPLATE_ADQ:
                user_prompt = template.format(general_context=general_context, personal_context=personal_context, question=query, scenario_tag=scenario_tag, emotions_context=emotions_context, role=role, language=language)
            else:
                combined_context = f"General Guidance:\n{general_context}\n\nPersonal Memories:\n{personal_context}"
                user_prompt = template.format(context=combined_context, question=query, language=language)

            messages.append({"role": "user", "content": user_prompt})
            answer = call_llm(messages, temperature=temperature)

            # Append the safety footer for high-risk scenarios.
            high_risk_scenarios = ["exit_seeking", "wandering", "elopement"]
            if scenario_tag and scenario_tag.lower() in high_risk_scenarios:
                answer += f"\n\n---\n{RISK_FOOTER}"

            return {"answer": answer, "sources": _format_sources(all_docs_care)}

    return _answer_fn

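# Illustrative wiring sketch (editor's addition; index paths and tag values
# are hypothetical):
#
#     vs_general = bootstrap_vectorstore(index_path="data/faiss_index")
#     vs_personal = bootstrap_vectorstore(index_path="data/personal_index")
#     chain = make_rag_chain(vs_general, vs_personal, role="caregiver")
#     result = chain("He keeps asking to go home.", chat_history=[],
#                    scenario_tag="exit_seeking", emotion_tag="anxious")
#     print(result["answer"], result["sources"])
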
def answer_query(chain, question: str, **kwargs) -> Dict[str, Any]:
    if not callable(chain):
        return {"answer": "[Error: RAG chain is not callable]", "sources": []}
    chat_history = kwargs.get("chat_history", [])
    scenario_tag = kwargs.get("scenario_tag")
    emotion_tag = kwargs.get("emotion_tag")
    topic_tag = kwargs.get("topic_tag")
    context_tags = kwargs.get("context_tags")
    try:
        # Forward all detected tags, including context_tags, to the chain.
        return chain(question, chat_history=chat_history, scenario_tag=scenario_tag, emotion_tag=emotion_tag, topic_tag=topic_tag, context_tags=context_tags)
    except Exception as e:
        print(f"ERROR in answer_query: {e}")
        return {"answer": f"[Error executing chain: {e}]", "sources": []}

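# Illustrative usage sketch (editor's addition; the tag value is hypothetical):
#
#     result = answer_query(chain, "Where did we go last summer?",
#                           chat_history=[], topic_tag="travel")
#     print(result["answer"])
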
# -----------------------------
# TTS & Transcription
# -----------------------------

def synthesize_tts(text: str, lang: str = "en"):
    if not text or gTTS is None:
        return None
    try:
        fd, path = tempfile.mkstemp(suffix=".mp3")
        os.close(fd)
        tts = gTTS(text=text, lang=(lang or "en"))
        tts.save(path)
        return path
    except Exception:
        return None

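# Illustrative usage sketch (editor's addition; returns None when gTTS is
# unavailable or synthesis fails):
#
#     mp3_path = synthesize_tts("Good morning, Mary.", lang="en")
#     if mp3_path:
#         print(f"Audio written to {mp3_path}")
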
def transcribe_audio(filepath: str, lang: str = "en"):
    client = _openai_client()
    if not client:
        return "[Transcription failed: API key not configured]"
    api_args = {"model": "whisper-1"}
    if lang and lang != "auto":
        api_args["language"] = lang
    try:
        with open(filepath, "rb") as audio_file:
            transcription = client.audio.transcriptions.create(file=audio_file, **api_args)
        return transcription.text
    except Exception as e:
        return f"[Transcription error: {e}]"
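
# Illustrative usage sketch (editor's addition; "note.wav" is a hypothetical
# recording and OPENAI_API_KEY must be set):
#
#     text = transcribe_audio("note.wav", lang="en")
#     print(text)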