Spaces:
Running
Running
fix(rag-api): session ID merging + FOLLOW_UP streaming leak
Browse files
fix: session_id 'default_session' merging all chats into one
- rag_chat_use_case: generate UUID when session_id is None
- agent_router_use_case: generate UUID when session_id is None
- Eliminates shared 'default_session' / 'anonymous' fallback
- Each new chat now gets a unique UUID session
fix: FOLLOW_UP: tokens leaking into streamed response
- Buffer streaming tokens and stop yielding once FOLLOW_UP: detected
- Users no longer see raw FOLLOW_UP: text in the chat bubble
- Follow-up questions still parsed correctly from full_answer
src/core/use_cases/agent_router_use_case.py
CHANGED
|
@@ -25,7 +25,11 @@ class AgentRouterUseCase:
|
|
| 25 |
|
| 26 |
async def execute_chat(self, request: ChatRequest) -> Dict[str, Any]:
|
| 27 |
intent = self._classify_intent(request.query)
|
| 28 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
| 29 |
|
| 30 |
if intent == "OTHER":
|
| 31 |
print(f"DEBUG: Routing to OTHER (Direct LLM Response)")
|
|
@@ -42,7 +46,11 @@ class AgentRouterUseCase:
|
|
| 42 |
|
| 43 |
async def execute_stream(self, request: ChatRequest, is_guest: bool = False, user_id: int = None) -> AsyncGenerator[str, None]:
|
| 44 |
intent = self._classify_intent(request.query)
|
| 45 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
| 46 |
|
| 47 |
if intent == "OTHER":
|
| 48 |
full_answer = ""
|
|
|
|
| 25 |
|
| 26 |
async def execute_chat(self, request: ChatRequest) -> Dict[str, Any]:
|
| 27 |
intent = self._classify_intent(request.query)
|
| 28 |
+
# Generate a unique session ID if none provided — never use a shared fallback
|
| 29 |
+
if not request.session_id:
|
| 30 |
+
import uuid
|
| 31 |
+
request.session_id = str(uuid.uuid4())
|
| 32 |
+
session_id = request.session_id
|
| 33 |
|
| 34 |
if intent == "OTHER":
|
| 35 |
print(f"DEBUG: Routing to OTHER (Direct LLM Response)")
|
|
|
|
| 46 |
|
| 47 |
async def execute_stream(self, request: ChatRequest, is_guest: bool = False, user_id: int = None) -> AsyncGenerator[str, None]:
|
| 48 |
intent = self._classify_intent(request.query)
|
| 49 |
+
# Generate a unique session ID if none provided — never use a shared fallback
|
| 50 |
+
if not request.session_id:
|
| 51 |
+
import uuid
|
| 52 |
+
request.session_id = str(uuid.uuid4())
|
| 53 |
+
session_id = request.session_id
|
| 54 |
|
| 55 |
if intent == "OTHER":
|
| 56 |
full_answer = ""
|
src/core/use_cases/rag_chat_use_case.py
CHANGED
|
@@ -704,7 +704,12 @@ JSON:"""
|
|
| 704 |
|
| 705 |
async def execute_chat(self, request: ChatRequest) -> Dict[str, Any]:
|
| 706 |
print(f"DEBUG: execute_chat called with query: {request.query}")
|
| 707 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 708 |
|
| 709 |
# ── Layer 1: Full Response Cache (5 min TTL) ──────────────────────────
|
| 710 |
cache_keys = self._get_cache_keys(request.query)
|
|
@@ -869,7 +874,12 @@ Answer:"""
|
|
| 869 |
return result
|
| 870 |
|
| 871 |
async def execute_stream(self, request: ChatRequest, is_guest: bool = False, user_id: int = None) -> AsyncGenerator[str, None]:
|
| 872 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 873 |
history_text = "" if is_guest else self._get_history_text(session_id)
|
| 874 |
context_text, final_sources = await self._build_context(
|
| 875 |
request.query, request.top_k, request.source_filter, request.language_filter, getattr(request, 'days_back', None)
|
|
@@ -940,16 +950,38 @@ User Question: {request.query}
|
|
| 940 |
|
| 941 |
Answer:"""
|
| 942 |
full_answer = ""
|
|
|
|
|
|
|
|
|
|
|
|
|
| 943 |
async for chunk in self.llm.generate_stream(prompt_stream):
|
| 944 |
-
yield chunk
|
| 945 |
if chunk.startswith("data: "):
|
| 946 |
try:
|
| 947 |
-
import json
|
| 948 |
-
data =
|
| 949 |
if "token" in data:
|
| 950 |
-
|
| 951 |
-
|
| 952 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 953 |
|
| 954 |
# ── Parse follow-up questions out of the streamed answer ──────────────
|
| 955 |
follow_up_questions: List[str] = []
|
|
|
|
| 704 |
|
| 705 |
async def execute_chat(self, request: ChatRequest) -> Dict[str, Any]:
|
| 706 |
print(f"DEBUG: execute_chat called with query: {request.query}")
|
| 707 |
+
# Generate a unique session ID if none provided — never use a shared fallback
|
| 708 |
+
if not request.session_id:
|
| 709 |
+
import uuid
|
| 710 |
+
request.session_id = str(uuid.uuid4())
|
| 711 |
+
print(f"DEBUG: Generated new session_id: {request.session_id}")
|
| 712 |
+
session_id = request.session_id
|
| 713 |
|
| 714 |
# ── Layer 1: Full Response Cache (5 min TTL) ──────────────────────────
|
| 715 |
cache_keys = self._get_cache_keys(request.query)
|
|
|
|
| 874 |
return result
|
| 875 |
|
| 876 |
async def execute_stream(self, request: ChatRequest, is_guest: bool = False, user_id: int = None) -> AsyncGenerator[str, None]:
|
| 877 |
+
# Generate a unique session ID if none provided — never use a shared fallback
|
| 878 |
+
if not request.session_id:
|
| 879 |
+
import uuid
|
| 880 |
+
request.session_id = str(uuid.uuid4())
|
| 881 |
+
print(f"DEBUG: Generated new session_id: {request.session_id}")
|
| 882 |
+
session_id = request.session_id
|
| 883 |
history_text = "" if is_guest else self._get_history_text(session_id)
|
| 884 |
context_text, final_sources = await self._build_context(
|
| 885 |
request.query, request.top_k, request.source_filter, request.language_filter, getattr(request, 'days_back', None)
|
|
|
|
| 950 |
|
| 951 |
Answer:"""
|
| 952 |
full_answer = ""
|
| 953 |
+
# Buffer to detect and suppress FOLLOW_UP: block during streaming
|
| 954 |
+
_follow_up_buffer = ""
|
| 955 |
+
_follow_up_started = False
|
| 956 |
+
|
| 957 |
async for chunk in self.llm.generate_stream(prompt_stream):
|
|
|
|
| 958 |
if chunk.startswith("data: "):
|
| 959 |
try:
|
| 960 |
+
import json as _json
|
| 961 |
+
data = _json.loads(chunk[6:])
|
| 962 |
if "token" in data:
|
| 963 |
+
token = data["token"]
|
| 964 |
+
full_answer += token
|
| 965 |
+
|
| 966 |
+
# Once FOLLOW_UP: appears, stop yielding tokens to frontend
|
| 967 |
+
if "FOLLOW_UP:" in full_answer and not _follow_up_started:
|
| 968 |
+
_follow_up_started = True
|
| 969 |
+
# Yield everything before FOLLOW_UP: as a corrected chunk
|
| 970 |
+
clean_so_far = full_answer.split("FOLLOW_UP:")[0].strip()
|
| 971 |
+
# Don't yield individual tokens after this point
|
| 972 |
+
continue
|
| 973 |
+
|
| 974 |
+
if not _follow_up_started:
|
| 975 |
+
yield chunk
|
| 976 |
+
else:
|
| 977 |
+
if not _follow_up_started:
|
| 978 |
+
yield chunk
|
| 979 |
+
except Exception:
|
| 980 |
+
if not _follow_up_started:
|
| 981 |
+
yield chunk
|
| 982 |
+
else:
|
| 983 |
+
if not _follow_up_started:
|
| 984 |
+
yield chunk
|
| 985 |
|
| 986 |
# ── Parse follow-up questions out of the streamed answer ──────────────
|
| 987 |
follow_up_questions: List[str] = []
|