Spaces:
Running
Running
| """ | |
| REALTIME GROQ SERVICE MODULE | |
| ============================= | |
| Extents GroqService to add Tavily web search before calling the LLM. Used by | |
| ChatService for POST /chat/realtime. Same session and history as general chat; | |
| the only difference is we run a Tavily search for the user's question and add | |
| the results to the system message, them call Groq. | |
| ROUND-ROBIN API KEYS: | |
| - Shares the same round-robin counter as GroqService (class-level _shared_key_index) | |
| - This means /chat and /chat/realtime requests use the same rotation sequence | |
| - Example: If /chat uses key 1, the next /chat/realtime request will use key 2 | |
| - All API key usage is logged wih masked keys for security and debugging | |
| FLOW: | |
| 1. search_tavily(question): call Tavily API, format results as text (or "" on failure). | |
| 2. get_response(question, chat_history): add search results to system message, | |
| then same as parent: retrieve context from vector store, build prompt , call Groq. | |
| If TAVILY_API_KEY is not set, tavily_clinet is None and search_tavily returns ""; | |
| the user still gets an answer from Groq with no search results. | |
| """ | |
| from typing import List, Optional, Iterator, Any | |
| from tavily import TavilyClient | |
| import logging | |
| import os | |
| import time | |
| from app.services.groq_service import GroqService, escape_curly_braces, AllGroqApisFailedError | |
| from app.services.vector_store import VectorStoreService | |
| from app.utils.retry import with_retry | |
| from config import REALTIME_CHAT_ADDENDUM, GROQ_API_KEYS, GROQ_MODEL | |
| logger = logging.getLogger("J.A.R.V.I.S") | |
| GROQ_REQUEST_TIMEOUT_FAST = 15 | |
| _QUERY_EXTRACTION_PROMPT = ( | |
| "You are a search query optimizer. Given the user's message and recent conversation, " | |
| "produce a single, focused web search query (max 12 word) that will find the " | |
| "information the user needs. Resolve any references (like 'that website ', 'him', 'it) " | |
| "using the conversation history. Output ONLY the search query, nothing else." | |
| ) | |
| # ============================================================================== | |
| # REALTIME GROQ SERVICE CLASS (extends GroqService) | |
| # ============================================================================== | |
| class RealtimeGroqService(GroqService): | |
| """ | |
| Same as GroqService but runs a Tavily web search first and adds the results | |
| to the system message. If Tavily is missing or fails, we still call Groq with | |
| no search results (user gets and answer without real-time data). | |
| """ | |
| def __init__(self, vector_store_service: VectorStoreService): | |
| """Call parent init (Groq LLM + vector store); then create Tavily client if key is set.""" | |
| super().__init__(vector_store_service) | |
| tavily_api_key = os.getenv("TAVILY_API_KEY", "") | |
| if tavily_api_key: | |
| self.tavily_client = TavilyClient(api_key=tavily_api_key) | |
| logger.info("Tavily search client initialized successfully") | |
| else: | |
| self.tavily_client = None | |
| logger.warning("TAVILY_API_KEY not set. Realtime search will be unavailable.") | |
| if GROQ_API_KEYS: | |
| from langchain_groq import ChatGroq | |
| self._fast_llm = ChatGroq( | |
| groq_api_key=GROQ_API_KEYS[0], | |
| model_name=GROQ_MODEL, | |
| temperature=0.0, | |
| request_timeout=GROQ_REQUEST_TIMEOUT_FAST, | |
| max_tokens=50, | |
| ) | |
| else: | |
| self._fast_llm = None | |
| def _extract_search_query( | |
| self, question:str, chat_history: Optional[List[tuple]] = None | |
| ) -> str: | |
| if not self._fast_llm: | |
| return question | |
| try: | |
| t0 = time.perf_counter() | |
| history_context = "" | |
| if chat_history: | |
| recent = chat_history[-3:] | |
| parts = [] | |
| for h, a in recent: | |
| parts.append(f"User: {h[:200]}") | |
| parts.append(f"Assistant: {a[:200]}") | |
| history_context = "\n".join(parts) | |
| if history_context: | |
| full_prompt = ( | |
| f"{_QUERY_EXTRACTION_PROMPT}\n\n" | |
| f"Recent conversation:\n{history_context}\n\n" | |
| f"User's latest message: {question}\n\n" | |
| f"Search query:" | |
| ) | |
| else: | |
| full_prompt = ( | |
| f"{_QUERY_EXTRACTION_PROMPT}\n\n" | |
| f"User's message: {question}\n\n" | |
| f"Search query:" | |
| ) | |
| response = self._fast_llm.invoke(full_prompt) | |
| extracted = response.content.strip().strip('"').strip("'") | |
| if extracted and 3<= len(extracted) <= 200: | |
| logger.info( | |
| "[REALTIME] Query extraction: '%s' -> '%s' (%.3fs)", | |
| question[:80], extracted[:80], time.perf_counter() - t0, | |
| ) | |
| return extracted | |
| logger.warning("[REALTIME] Query extraction returned unusable result, using raw question") | |
| return question | |
| except Exception as e: | |
| logger.error("[REALTIME] Error extracting search query: %s", e) | |
| return question | |
| def search_tavily(self, query: str, num_results: int = 7) -> str: | |
| """ | |
| Call Tavily API with the given query and return formatted result text for the prompt. | |
| On any failure (no key, rate limit, network) we return "" so the LLM still gets a reply. | |
| """ | |
| if not self.tavily_client: | |
| logger.warning("Tavily client not initialized. TAVILY_API_KEY not set.") | |
| return ("",None) | |
| t0 = time.perf_counter() | |
| try: | |
| # Perform Tavily search with retries for rate limits and transient errors. | |
| response = with_retry( | |
| lambda: self.tavily_client.search( | |
| query=query, | |
| search_depth="advanced", | |
| max_results=num_results, | |
| include_answer=False, | |
| include_raw_content=False, | |
| ), | |
| max_retries=3, | |
| initial_delay=1.0, | |
| ) | |
| results = response.get('results', []) | |
| ai_answer = response.get("answer", "") | |
| if not results and not ai_answer: | |
| logger.warning(f"No Tavily search results found for query: {query}") | |
| return ("",None) | |
| payload: Optional[dict] = { | |
| "query": query, | |
| "answer": ai_answer, | |
| "results": [ | |
| { | |
| "title": r.get("title", "No title"), | |
| "content": (r.get("content") or "")[:500], | |
| "url": r.get("url", ""), | |
| "score": round(float(r.get("score", 0)), 2), | |
| } | |
| for r in results[:num_results] | |
| ], | |
| } | |
| parts = [f"=== WEB SEARCH RESULTS FOR: {query} ===\n"] | |
| if ai_answer: | |
| parts.append(f"AI-SYNTHESIZED ANSWER (use this as your primary source):\n{ai_answer}\n") | |
| if results: | |
| parts.append("INDIVIDUAL SOURCES:") | |
| for i, results in enumerate(results[:num_results], 1): | |
| title = results.get("title", "No title") | |
| content = results.get("content", "") | |
| url = results.get("url", "") | |
| score = results.get("score", 0) | |
| parts.append(f"\n[Source {i}] relevance: {score:.2f}") | |
| parts.append(f"Title: {title}") | |
| if content: | |
| parts.append(f"Content: {content}") | |
| if url: | |
| parts.append(f"URL: {url}") | |
| parts.append("\n=== END SEARCH RESULTS ===") | |
| formatted = "\n".join(parts) | |
| logger.info( | |
| "[TAVILY] %d results, AI answer: %s, formatted: %d chars (%.3fs)", | |
| len(results), "yes" if ai_answer else "no", | |
| len(formatted), time.perf_counter() - t0, | |
| ) | |
| return (formatted, payload) | |
| except Exception as e: | |
| logger.error("Error performing Tavily search: %s", e) | |
| return ("", None) | |
| def get_response(self, question: str, chat_history: Optional[List[tuple]] = None) -> str: | |
| """ | |
| Run Tavily search for the question, add results to system message, then call the Groq | |
| via the parent's _invoke_llm (same multi-key round-robin and fallback as general chat). | |
| """ | |
| try: | |
| search_query = self._extract_search_query(question, chat_history) | |
| logger.info("[REALTIME] Searching Tavily for: %s", search_query) | |
| formatted_results, _ = self.search_tavily(search_query, num_results=7) | |
| if formatted_results: | |
| logger.info("[REALTIME] Tavily returned results (length: %d chars)", len(formatted_results)) | |
| else: | |
| logger.warning("[REALTIME] Tavily returned no results for: %s", search_query) | |
| extra_parts = [escape_curly_braces(formatted_results)] if formatted_results else None | |
| prompt, messages = self._build_prompt_and_messages( | |
| question, chat_history, | |
| extra_system_parts=extra_parts, | |
| mode_addendum=REALTIME_CHAT_ADDENDUM, | |
| ) | |
| t0 = time.perf_counter() | |
| response_content = self._invoke_llm(prompt, messages, question) | |
| logger.info("[TIMING] groq_api: %.3fs", time.perf_counter() - t0) | |
| logger.info( | |
| "[RESPONSE] Realtime chat | Length: %d chars | Preview: %.120s", | |
| len(response_content), response_content, | |
| ) | |
| return response_content | |
| except AllGroqApisFailedError: | |
| raise | |
| except Exception as e: | |
| logger.error("Error in realtime get_response: %s", e, exc_info=True) | |
| raise | |
| def stream_response(self, question: str, chat_history: Optional[List[tuple]] = None) -> Iterator[Any]: | |
| try: | |
| search_query = self._extract_search_query(question, chat_history) | |
| logger.info("[REALTIME] Searching Tavily for: %s", search_query) | |
| formatted_results, payload = self.search_tavily(search_query, num_results=7) | |
| if formatted_results: | |
| logger.info("[REALTIME] Tavily returned results (length: %d chars)", len(formatted_results)) | |
| else: | |
| logger.warning("[REALTIME] Tavily returned no results for: %s", search_query) | |
| if payload: | |
| yield {"_search_results": payload} | |
| extra_parts = [escape_curly_braces(formatted_results)] if formatted_results else None | |
| prompt, messages = self._build_prompt_and_messages( | |
| question, chat_history, | |
| extra_system_parts=extra_parts, | |
| mode_addendum=REALTIME_CHAT_ADDENDUM, | |
| ) | |
| yield from self._stream_llm(prompt, messages, question) | |
| logger.info("[REALTIME] stream completed for %s", search_query) | |
| except AllGroqApisFailedError: | |
| raise | |
| except Exception as e: | |
| logger.error("Error in realtime stream_response: %s", e, exc_info=True) | |
| raise | |