eeshanyaj committed on
Commit
fc73f93
·
1 Parent(s): c463ae7

added sse features

Browse files
app/api/v1/conversation_routes.py CHANGED
@@ -18,6 +18,9 @@ from fastapi import APIRouter, HTTPException, status, Depends
18
  from pydantic import BaseModel, Field
19
  from typing import List, Dict, Optional
20
  from datetime import datetime
 
 
 
21
 
22
  from app.services.chat_service import chat_service
23
  from app.services.conversation_service import conversation_service
@@ -637,6 +640,439 @@ async def remove_reaction(
637
  )
638
 
639
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
640
  # ============================================================================
641
  # HEALTH CHECK
642
  # ============================================================================
 
18
  from pydantic import BaseModel, Field
19
  from typing import List, Dict, Optional
20
  from datetime import datetime
21
+ from fastapi.responses import StreamingResponse
22
+ from app.services.streaming_service import streaming_service
23
+ import json
24
 
25
  from app.services.chat_service import chat_service
26
  from app.services.conversation_service import conversation_service
 
640
  )
641
 
642
 
643
+ # ========================================================================
644
+ # 🆕 STREAMING ENDPOINTS - Add after existing chat endpoint
645
+ # ========================================================================
646
+
647
@router.post("/stream")
async def chat_stream(
    request: ChatRequest,
    current_user: TokenData = Depends(get_current_user)
):
    """
    💬 Send message and get AI response via Server-Sent Events (SSE).

    **Streaming endpoint** - returns the response in real-time chunks.

    Events sent:
    - `conversation_id`: ID of the conversation the exchange is saved under
    - `status`: progress updates (retrieval, generation)
    - `content`: response text chunks
    - `metadata`: final statistics (policy action, docs retrieved, timing)
    - `done`: stream completion
    - `error`: emitted if an error occurs mid-stream

    The frontend should consume the stream with the EventSource / fetch API,
    appending `content` chunks to the UI and closing on `done`.

    Requires JWT authentication.

    Raises:
        HTTPException 404: conversation_id supplied but unknown.
        HTTPException 403: conversation belongs to another user.
        HTTPException 500: unexpected failure before streaming starts.
    """
    try:
        user_id = current_user.user_id

        # --------------------------------------------------------------
        # STEP 1: Get or create the conversation (same as non-streaming)
        # --------------------------------------------------------------
        conversation_id = request.conversation_id

        if conversation_id:
            conversation = await conversation_repository.get_conversation(conversation_id)

            if not conversation:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="Conversation not found"
                )

            if conversation["user_id"] != user_id:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="Access denied"
                )
        else:
            # No conversation supplied: start a new one seeded with this query.
            from app.models.conversation import CreateConversationRequest
            create_req = CreateConversationRequest(
                title=None,
                first_message=request.query
            )

            new_conversation = await conversation_service.create_conversation(
                user_id=user_id,
                request=create_req,
                llm_manager=None
            )

            conversation_id = str(new_conversation.id)

        # --------------------------------------------------------------
        # STEP 2: Recent history for the LLM context window
        # --------------------------------------------------------------
        history = await conversation_repository.get_conversation_history(
            conversation_id=conversation_id,
            max_messages=10
        )

        # --------------------------------------------------------------
        # STEP 3: Persist the user's message before streaming begins
        # --------------------------------------------------------------
        await conversation_repository.add_message(
            conversation_id=conversation_id,
            message={
                'role': 'user',
                'content': request.query,
                'timestamp': datetime.utcnow(),
                'metadata': None
            }
        )

        # --------------------------------------------------------------
        # STEP 4: Stream the response, mirroring it locally so the final
        # assistant message can be persisted after the stream completes.
        # --------------------------------------------------------------
        def _parse_sse_data(sse_event: str):
            """Return the JSON-decoded `data:` payload of an SSE frame, or None.

            FIX: the previous implementation scraped the payload with the regex
            '"text":\\s*"([^"]*)"', which truncated any chunk containing an
            escaped quote and never unescaped JSON sequences (\\n, \\"), so the
            saved transcript could differ from what was streamed.
            """
            for line in sse_event.splitlines():
                if line.startswith("data: "):
                    try:
                        return json.loads(line[len("data: "):])
                    except ValueError:
                        return None
            return None

        async def generate_stream():
            """Generator that adds conversation_id to the first event and
            saves the assistant response once streaming finishes."""

            # Send conversation_id first (so frontend knows where to save).
            yield f"event: conversation_id\ndata: {json.dumps({'conversation_id': conversation_id})}\n\n"

            # Collect the full response and final metadata for persistence.
            full_response = ""
            final_metadata = {}

            async for sse_event in streaming_service.stream_chat_response(
                query=request.query,
                conversation_history=history,
                user_id=user_id
            ):
                yield sse_event

                if "event: content" in sse_event:
                    payload = _parse_sse_data(sse_event)
                    if payload and isinstance(payload.get("text"), str):
                        full_response += payload["text"]
                elif "event: metadata" in sse_event:
                    payload = _parse_sse_data(sse_event)
                    if payload is not None:
                        final_metadata = payload

            # Save assistant response after streaming completes.
            await conversation_repository.add_message(
                conversation_id=conversation_id,
                message={
                    'role': 'assistant',
                    'content': full_response,
                    'timestamp': datetime.utcnow(),
                    'metadata': {
                        'policy_action': final_metadata.get('policy_action'),
                        'policy_confidence': final_metadata.get('policy_confidence'),
                        'documents_retrieved': final_metadata.get('documents_retrieved'),
                        'top_doc_score': final_metadata.get('top_doc_score'),
                        'retrieval_time_ms': final_metadata.get('retrieval_time_ms'),
                        'generation_time_ms': final_metadata.get('generation_time_ms')
                    }
                }
            )

            # Log retrieval analytics for this exchange.
            await conversation_repository.log_retrieval({
                'conversation_id': conversation_id,
                'user_id': user_id,
                'query': request.query,
                'policy_action': final_metadata.get('policy_action'),
                'policy_confidence': final_metadata.get('policy_confidence'),
                'should_retrieve': final_metadata.get('documents_retrieved', 0) > 0,
                'documents_retrieved': final_metadata.get('documents_retrieved', 0),
                'top_doc_score': final_metadata.get('top_doc_score'),
                'response': full_response,
                'retrieval_time_ms': final_metadata.get('retrieval_time_ms'),
                'generation_time_ms': final_metadata.get('generation_time_ms'),
                'total_time_ms': final_metadata.get('total_time_ms'),
                'retrieved_docs_metadata': final_metadata.get('retrieved_docs_metadata', []),
                'timestamp': datetime.utcnow()
            })

        # Return the SSE stream.
        return StreamingResponse(
            generate_stream(),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no"  # Disable nginx buffering
            }
        )

    except HTTPException:
        raise
    except Exception as e:
        print(f"❌ Streaming endpoint error: {e}")
        import traceback
        traceback.print_exc()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to stream response: {str(e)}"
        )
834
+
835
+
836
+ # ========================================================================
837
+ # 🆕 REGENERATE RESPONSE (with streaming)
838
+ # ========================================================================
839
+
840
@router.post("/conversation/{conversation_id}/regenerate")
async def regenerate_last_response(
    conversation_id: str,
    current_user: TokenData = Depends(get_current_user)
):
    """
    🔄 Regenerate the last assistant response.

    - Removes the last assistant message
    - Re-processes the last user query
    - Returns a streaming (SSE) response

    User must own the conversation.

    Raises:
        HTTPException 404: conversation not found (or not owned by user).
        HTTPException 400: fewer than 2 messages, or no user message found.
        HTTPException 500: unexpected failure before streaming starts.
    """
    try:
        # Get conversation (service scopes the lookup to this user).
        conversation = await conversation_service.get_conversation(
            conversation_id=conversation_id,
            user_id=current_user.user_id
        )

        if not conversation:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Conversation not found"
            )

        if len(conversation.messages) < 2:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Need at least 2 messages to regenerate"
            )

        # Most recent user message is the query to re-run.
        last_user_msg = next(
            (msg for msg in reversed(conversation.messages) if msg.role == 'user'),
            None
        )

        if not last_user_msg:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="No user message found to regenerate from"
            )

        # Drop the stale assistant message before regenerating.
        await conversation_repository.remove_last_assistant_message(conversation_id)

        # Get updated history.
        history = await conversation_repository.get_conversation_history(
            conversation_id=conversation_id,
            max_messages=10
        )

        def _parse_sse_data(sse_event: str):
            """Return the JSON-decoded `data:` payload of an SSE frame, or None.

            FIX: replaces regex scraping of the payload, which truncated chunks
            containing escaped quotes and left JSON escapes unexpanded.
            """
            for line in sse_event.splitlines():
                if line.startswith("data: "):
                    try:
                        return json.loads(line[len("data: "):])
                    except ValueError:
                        return None
            return None

        async def generate_stream():
            """Stream the regenerated response and persist it afterwards."""
            yield f"event: conversation_id\ndata: {json.dumps({'conversation_id': conversation_id})}\n\n"

            full_response = ""
            final_metadata = {}

            async for sse_event in streaming_service.stream_chat_response(
                query=last_user_msg.content,
                conversation_history=history,
                user_id=current_user.user_id
            ):
                yield sse_event

                if "event: content" in sse_event:
                    payload = _parse_sse_data(sse_event)
                    if payload and isinstance(payload.get("text"), str):
                        full_response += payload["text"]
                elif "event: metadata" in sse_event:
                    payload = _parse_sse_data(sse_event)
                    if payload is not None:
                        final_metadata = payload

            # Save the regenerated response.
            await conversation_repository.add_message(
                conversation_id=conversation_id,
                message={
                    'role': 'assistant',
                    'content': full_response,
                    'timestamp': datetime.utcnow(),
                    'metadata': {
                        'policy_action': final_metadata.get('policy_action'),
                        'policy_confidence': final_metadata.get('policy_confidence'),
                        'documents_retrieved': final_metadata.get('documents_retrieved'),
                        'regenerated': True  # Flag for analytics
                    }
                }
            )

        return StreamingResponse(
            generate_stream(),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive"
            }
        )

    except HTTPException:
        raise
    except Exception as e:
        print(f"❌ Regenerate error: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to regenerate: {str(e)}"
        )
954
+
955
+
956
+ # ========================================================================
957
+ # 🆕 EDIT LAST MESSAGE (then regenerate)
958
+ # ========================================================================
959
+
960
class EditMessageRequest(BaseModel):
    """Payload for editing the last user message in a conversation."""
    # 1–2000 characters; emptiness after stripping is handled by the endpoint.
    new_content: str = Field(..., min_length=1, max_length=2000)
963
+
964
+
965
@router.post("/conversation/{conversation_id}/edit")
async def edit_and_regenerate(
    conversation_id: str,
    request: EditMessageRequest,
    current_user: TokenData = Depends(get_current_user)
):
    """
    ✏️ Edit last user message and regenerate response.

    - Updates last user message content
    - Removes last assistant response
    - Regenerates with the new message
    - Returns a streaming (SSE) response

    User must own the conversation.

    Raises:
        HTTPException 404: conversation not found (or not owned by user).
        HTTPException 400: the message update failed.
        HTTPException 500: unexpected failure before streaming starts.
    """
    try:
        # Get conversation (service scopes the lookup to this user).
        conversation = await conversation_service.get_conversation(
            conversation_id=conversation_id,
            user_id=current_user.user_id
        )

        if not conversation:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Conversation not found"
            )

        # Update last user message (whitespace-trimmed).
        success = await conversation_repository.update_last_user_message(
            conversation_id=conversation_id,
            new_content=request.new_content.strip()
        )

        if not success:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Failed to update message"
            )

        # Remove last assistant message so it can be regenerated.
        await conversation_repository.remove_last_assistant_message(conversation_id)

        # Get updated history.
        history = await conversation_repository.get_conversation_history(
            conversation_id=conversation_id,
            max_messages=10
        )

        def _parse_sse_data(sse_event: str):
            """Return the JSON-decoded `data:` payload of an SSE frame, or None.

            FIX: replaces regex scraping of the payload, which truncated chunks
            containing escaped quotes and left JSON escapes unexpanded.
            """
            for line in sse_event.splitlines():
                if line.startswith("data: "):
                    try:
                        return json.loads(line[len("data: "):])
                    except ValueError:
                        return None
            return None

        async def generate_stream():
            """Stream the response to the edited query and persist it."""
            yield f"event: conversation_id\ndata: {json.dumps({'conversation_id': conversation_id})}\n\n"

            full_response = ""
            final_metadata = {}

            async for sse_event in streaming_service.stream_chat_response(
                query=request.new_content,
                conversation_history=history,
                user_id=current_user.user_id
            ):
                yield sse_event

                if "event: content" in sse_event:
                    payload = _parse_sse_data(sse_event)
                    if payload and isinstance(payload.get("text"), str):
                        full_response += payload["text"]
                elif "event: metadata" in sse_event:
                    payload = _parse_sse_data(sse_event)
                    if payload is not None:
                        final_metadata = payload

            # Save the new response.
            await conversation_repository.add_message(
                conversation_id=conversation_id,
                message={
                    'role': 'assistant',
                    'content': full_response,
                    'timestamp': datetime.utcnow(),
                    'metadata': {
                        'policy_action': final_metadata.get('policy_action'),
                        'policy_confidence': final_metadata.get('policy_confidence'),
                        'documents_retrieved': final_metadata.get('documents_retrieved'),
                        'edited': True  # Flag for analytics
                    }
                }
            )

        return StreamingResponse(
            generate_stream(),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive"
            }
        )

    except HTTPException:
        raise
    except Exception as e:
        print(f"❌ Edit error: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to edit message: {str(e)}"
        )
1074
+
1075
+
1076
  # ============================================================================
1077
  # HEALTH CHECK
1078
  # ============================================================================
app/core/llm_manager.py CHANGED
@@ -17,11 +17,12 @@ Fallback Logic:
17
  """
18
 
19
  import time
20
- from typing import List, Dict, Optional, Literal
21
  from langchain_groq import ChatGroq
22
  from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
23
  from huggingface_hub import InferenceClient
24
  from app.config import settings
 
25
 
26
  # ============================================================================
27
  # GROQ MANAGER WITH FALLBACK
@@ -299,7 +300,7 @@ class LLMManager:
299
  messages: List[Dict[str, str]],
300
  system_prompt: Optional[str] = None,
301
  task: Literal["chat", "evaluation"] = "chat"
302
- ) -> str:
303
  """
304
  Generate response with cascading fallback logic.
305
 
@@ -344,6 +345,140 @@ class LLMManager:
344
 
345
  raise ValueError("No LLM provider available")
346
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
347
  async def generate_chat_response(
348
  self,
349
  query: str,
 
17
  """
18
 
19
  import time
20
+ from typing import AsyncGenerator, List, Dict, Optional, Literal
21
  from langchain_groq import ChatGroq
22
  from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
23
  from huggingface_hub import InferenceClient
24
  from app.config import settings
25
+ import asyncio
26
 
27
  # ============================================================================
28
  # GROQ MANAGER WITH FALLBACK
 
300
  messages: List[Dict[str, str]],
301
  system_prompt: Optional[str] = None,
302
  task: Literal["chat", "evaluation"] = "chat"
303
+ ) -> str:
304
  """
305
  Generate response with cascading fallback logic.
306
 
 
345
 
346
  raise ValueError("No LLM provider available")
347
 
348
+ # ============================================================================
349
+ # ADD TO: backend/app/core/llm_manager.py
350
+ # Add this method to LLMManager class
351
+ # ============================================================================
352
+
353
async def stream_chat_response(
    self,
    query: str,
    context: str = "",
    history: Optional[List[Dict[str, str]]] = None,
    max_tokens: int = 1000,
    temperature: float = 0.7
) -> AsyncGenerator[str, None]:
    """
    Stream a chat response, yielding text chunks as they are generated.

    Tries Groq first (native streaming), falls back to HuggingFace
    (full generation with simulated word-by-word streaming). If both
    providers fail, yields a single apology message instead of raising.

    FIX: `history` was annotated `List[Dict[str, str]]` while defaulting to
    None — the annotation now reflects that (`Optional[...]`).

    Args:
        query: User query
        context: Retrieved context
        history: Conversation history (role/content dicts); None means empty
        max_tokens: Max response length
        temperature: Sampling temperature

    Yields:
        str: Response chunks
    """
    if history is None:
        history = []

    # System prompt shared by both providers.
    system_prompt = """You are an expert banking assistant specialized in Indian financial regulations and banking practices.

Instructions:
- Answer accurately using provided context when available
- If context is insufficient, still respond helpfully
- Keep responses clear and concise
- Never fabricate specific policies or rates
- Maintain a professional tone"""

    # Wrap the query with retrieved context when any is available.
    user_message = query
    if context:
        user_message = f"""Context from knowledge base:
{context}

User Query: {query}

Please answer the query using the context above when relevant."""

    # ====================================================================
    # TRY GROQ (STREAMING SUPPORTED)
    # ====================================================================
    # NOTE(review): the module imports LangChain's ChatGroq, but this path
    # uses the OpenAI-style `chat.completions.create` API — confirm that
    # `self.groq` is the raw Groq SDK client, not the LangChain wrapper.
    if self.groq:
        try:
            # OpenAI-style message list: system + recent history + query.
            messages = [{"role": "system", "content": system_prompt}]
            messages.extend(
                {"role": msg['role'], "content": msg['content']}
                for msg in history[-10:]  # last 10 messages
            )
            messages.append({"role": "user", "content": user_message})

            stream = self.groq.chat.completions.create(
                model=self.groq_model,
                messages=messages,
                max_tokens=max_tokens,
                temperature=temperature,
                stream=True  # Enable streaming
            )

            # NOTE(review): this iterates a *synchronous* stream inside an
            # async generator, blocking the event loop between chunks —
            # consider the async Groq client if one is available.
            for chunk in stream:
                if chunk.choices[0].delta.content:
                    yield chunk.choices[0].delta.content

            return  # Success, exit

        except Exception as e:
            print(f"⚠️ Groq streaming failed: {e}")
            # Fall through to HuggingFace

    # ====================================================================
    # FALLBACK: HUGGINGFACE (NO STREAMING - SIMULATE)
    # ====================================================================
    if self.huggingface:
        try:
            print("⚠️ Using HuggingFace (simulated streaming)")

            # Flat Human/Assistant transcript prompt for HF text-generation.
            prompt = f"{system_prompt}\n\n"
            for msg in history[-5:]:
                role = "Human" if msg['role'] == 'user' else "Assistant"
                prompt += f"{role}: {msg['content']}\n"
            prompt += f"Human: {user_message}\nAssistant:"

            # Generate the full response in one call (no native streaming).
            response = self.huggingface(
                prompt,
                max_new_tokens=max_tokens,
                temperature=temperature,
                do_sample=True,
                return_full_text=False
            )[0]['generated_text']

            # Simulate streaming by emitting one word at a time.
            words = response.split()
            for i, word in enumerate(words):
                # Add a leading space except for the first word.
                yield word if i == 0 else f" {word}"

                # Small delay so the client perceives a stream.
                await asyncio.sleep(0.05)  # 50ms per word

            return

        except Exception as e:
            print(f"❌ HuggingFace streaming failed: {e}")

    # ====================================================================
    # BOTH FAILED - RETURN ERROR
    # ====================================================================
    yield "I apologize, but I'm unable to generate a response at the moment. Please try again."
481
+
482
  async def generate_chat_response(
483
  self,
484
  query: str,
app/db/repositories/conversation_repository.py CHANGED
@@ -626,6 +626,145 @@ class ConversationRepository:
626
  except Exception as e:
627
  print(f"❌ Update reaction error: {e}")
628
  return False
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
629
 
630
 
631
  # ============================================================================
 
626
  except Exception as e:
627
  print(f"❌ Update reaction error: {e}")
628
  return False
629
+
630
+
631
+
632
+ # ============================================================================
633
+ # ADD TO: backend/app/db/repositories/conversation_repository.py
634
+ # Add these methods to ConversationRepository class
635
+ # ============================================================================
636
+
637
async def remove_last_assistant_message(
    self,
    conversation_id: str
) -> bool:
    """
    Delete the most recent assistant message from a conversation.

    Supports the "regenerate response" feature: the stale assistant turn
    is dropped before a fresh one is streamed.

    Args:
        conversation_id: Conversation ID

    Returns:
        bool: True if a message was removed
    """
    try:
        from bson import ObjectId

        doc = await self.collection.find_one(
            {"_id": ObjectId(conversation_id)}
        )
        if not doc:
            return False

        messages = doc.get('messages', [])

        # Walk backwards to locate the newest assistant turn.
        target = next(
            (idx for idx in range(len(messages) - 1, -1, -1)
             if messages[idx].get('role') == 'assistant'),
            None
        )

        if target is None:
            print("⚠️ No assistant message to remove")
            return False

        del messages[target]

        # Write the trimmed message list back in one update.
        result = await self.collection.update_one(
            {"_id": ObjectId(conversation_id)},
            {
                "$set": {
                    "messages": messages,
                    "message_count": len(messages),
                    "updated_at": datetime.utcnow()
                }
            }
        )

        if result.modified_count > 0:
            print(f"✅ Removed last assistant message from conversation {conversation_id}")
            return True
        return False

    except Exception as e:
        print(f"❌ Remove last assistant message error: {e}")
        return False
700
+
701
+
702
async def update_last_user_message(
    self,
    conversation_id: str,
    new_content: str
) -> bool:
    """
    Replace the content of the most recent user message.

    Supports the "edit message" feature; the edited message is flagged
    and re-timestamped.

    Args:
        conversation_id: Conversation ID
        new_content: New message content

    Returns:
        bool: True if a message was updated
    """
    try:
        from bson import ObjectId

        doc = await self.collection.find_one(
            {"_id": ObjectId(conversation_id)}
        )
        if not doc:
            return False

        messages = doc.get('messages', [])

        # Walk backwards to locate the newest user turn.
        target = next(
            (idx for idx in range(len(messages) - 1, -1, -1)
             if messages[idx].get('role') == 'user'),
            None
        )

        if target is None:
            print("⚠️ No user message to update")
            return False

        # Rewrite the message in place and mark it as edited.
        edited = messages[target]
        edited['content'] = new_content
        edited['timestamp'] = datetime.utcnow()
        edited['edited'] = True  # Flag as edited

        result = await self.collection.update_one(
            {"_id": ObjectId(conversation_id)},
            {
                "$set": {
                    "messages": messages,
                    "updated_at": datetime.utcnow()
                }
            }
        )

        if result.modified_count > 0:
            print(f"✅ Updated last user message in conversation {conversation_id}")
            return True
        return False

    except Exception as e:
        print(f"❌ Update last user message error: {e}")
        return False
768
 
769
 
770
  # ============================================================================
app/services/streaming_service.py ADDED
@@ -0,0 +1,222 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # ============================================================================
2
+ # backend/app/services/streaming_service.py - NEW FILE
3
+ # ============================================================================
4
+
5
+ """
6
+ Streaming Service - Server-Sent Events (SSE)
7
+
8
+ Handles real-time streaming of AI responses.
9
+ Integrates with chat_service.py RAG pipeline.
10
+ """
11
+
12
+ import asyncio
13
+ import json
14
+ from typing import AsyncGenerator, Dict, Any, List, Optional
15
+ from datetime import datetime
16
+
17
+ from app.config import settings
18
+ from app.ml.policy_network import predict_policy_action
19
+ from app.ml.retriever import retrieve_documents, format_context
20
+ from app.core.llm_manager import llm_manager
21
+
22
+
23
+ # ============================================================================
24
+ # STREAMING SERVICE
25
+ # ============================================================================
26
+
27
class StreamingService:
    """
    Handles SSE streaming for real-time chat responses.

    Events sent:
    - status: Progress updates (retrieval, generation stages)
    - content: Response chunks (word by word)
    - metadata: Final stats (policy action, docs retrieved, etc.)
    - done: Stream completion signal
    - error: Error occurred
    """

    def __init__(self):
        print("🌊 StreamingService initialized")

    async def stream_chat_response(
        self,
        query: str,
        conversation_history: Optional[List[Dict[str, str]]] = None,
        user_id: Optional[str] = None
    ) -> AsyncGenerator[str, None]:
        """
        Stream a chat response with progress updates.

        Pipeline: policy decision -> (optional) retrieval -> streamed
        generation -> metadata -> done. Any exception is converted into a
        single `error` event rather than propagating to the caller.

        FIX: `conversation_history` was annotated `List[Dict[str, str]]`
        while defaulting to None — now `Optional[...]`.

        Args:
            query: User query
            conversation_history: Previous messages (role/content dicts)
            user_id: User ID (currently unused here; kept for API symmetry)

        Yields:
            str: SSE-formatted events (`event: ...\\ndata: ...\\n\\n`)
        """
        import time
        start_time = time.time()

        if conversation_history is None:
            conversation_history = []

        try:
            # ================================================================
            # STAGE 1: Policy Decision
            # ================================================================
            yield self._format_sse_event(
                event="status",
                data={"stage": "policy", "message": "Analyzing query..."}
            )

            await asyncio.sleep(0.1)  # Small delay for UX

            policy_result = predict_policy_action(
                query=query,
                history=conversation_history,
                return_probs=True
            )

            # ================================================================
            # STAGE 2: Retrieval (only when the policy asks for it)
            # ================================================================
            retrieved_docs = []
            context = ""
            retrieval_time = 0

            if policy_result['should_retrieve']:
                yield self._format_sse_event(
                    event="status",
                    data={"stage": "retrieval", "message": "Searching knowledge base..."}
                )

                retrieval_start = time.time()

                try:
                    retrieved_docs = retrieve_documents(
                        query=query,
                        top_k=settings.TOP_K,
                        min_similarity=settings.SIMILARITY_THRESHOLD
                    )

                    retrieval_time = (time.time() - retrieval_start) * 1000

                    if retrieved_docs:
                        context = format_context(
                            retrieved_docs,
                            max_context_length=settings.MAX_CONTEXT_LENGTH
                        )

                        yield self._format_sse_event(
                            event="status",
                            data={
                                "stage": "retrieval",
                                "message": f"Found {len(retrieved_docs)} relevant documents"
                            }
                        )

                except Exception as e:
                    # Retrieval is best-effort: degrade to no-context generation.
                    print(f"⚠️ Retrieval error during streaming: {e}")

            # ================================================================
            # STAGE 3: Stream Generation
            # ================================================================
            yield self._format_sse_event(
                event="status",
                data={"stage": "generation", "message": "Generating response..."}
            )

            generation_start = time.time()
            full_response = ""

            # Stream chunks straight from the LLM manager.
            async for chunk in llm_manager.stream_chat_response(
                query=query,
                context=context,
                history=conversation_history
            ):
                full_response += chunk

                yield self._format_sse_event(
                    event="content",
                    data={"text": chunk}
                )

            generation_time = (time.time() - generation_start) * 1000
            total_time = (time.time() - start_time) * 1000

            # ================================================================
            # STAGE 4: Send Metadata
            # ================================================================
            metadata = {
                "policy_action": policy_result['action'],
                "policy_confidence": policy_result['confidence'],
                "documents_retrieved": len(retrieved_docs),
                "top_doc_score": retrieved_docs[0]['score'] if retrieved_docs else None,
                "retrieval_time_ms": round(retrieval_time, 2),
                "generation_time_ms": round(generation_time, 2),
                "total_time_ms": round(total_time, 2),
                "timestamp": datetime.now().isoformat()
            }

            # Per-document provenance, only when something was retrieved.
            if retrieved_docs:
                metadata['retrieved_docs_metadata'] = [
                    {
                        'faq_id': doc['faq_id'],
                        'score': doc['score'],
                        'category': doc['category'],
                        'rank': doc['rank']
                    }
                    for doc in retrieved_docs
                ]

            yield self._format_sse_event(
                event="metadata",
                data=metadata
            )

            # ================================================================
            # STAGE 5: Done
            # ================================================================
            yield self._format_sse_event(
                event="done",
                data={"message": "Stream completed"}
            )

        except Exception as e:
            print(f"❌ Streaming error: {e}")
            import traceback
            traceback.print_exc()

            yield self._format_sse_event(
                event="error",
                data={"error": str(e), "message": "An error occurred during streaming"}
            )

    def _format_sse_event(self, event: str, data: Dict[str, Any]) -> str:
        """
        Format data as a single SSE frame.

        SSE format:
            event: <event_name>
            data: <json_data>

        followed by a blank line separating frames. `ensure_ascii=False`
        keeps non-ASCII text readable in the payload.
        """
        json_data = json.dumps(data, ensure_ascii=False)
        return f"event: {event}\ndata: {json_data}\n\n"


# ============================================================================
# GLOBAL INSTANCE
# ============================================================================

streaming_service = StreamingService()
frontend_integration_example.js ADDED
@@ -0,0 +1,300 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ // ============================================================================
2
+ // Frontend Integration - How to Consume SSE Streams
3
+ // ============================================================================
4
+
5
+ // ============================================================================
6
+ // VANILLA JAVASCRIPT EXAMPLE
7
+ // ============================================================================
8
+
9
/**
 * Send a chat message and consume the server's SSE response stream.
 *
 * Fixes over the naive reader loop:
 *  - decodes with { stream: true } so a multi-byte UTF-8 character split
 *    across network chunks is not mangled into replacement characters;
 *  - buffers across reads and only parses up to the last complete event
 *    terminator ("\n\n"), so an SSE event split across two chunks is not
 *    silently dropped.
 *
 * @param {string} query - The user's message text.
 * @param {string|null} conversationId - Existing conversation id, or null to start a new conversation.
 * @returns {Promise<{response: string, conversationId: string|null}>} Full assistant text and the (possibly new) conversation id.
 * @throws {Error} On non-2xx HTTP responses or network failures.
 */
async function sendStreamingMessage(query, conversationId = null) {
  const token = localStorage.getItem('jwt_token');

  // Prepare request
  const requestData = {
    query: query,
    conversation_id: conversationId
  };

  try {
    // Make POST request to get stream
    const response = await fetch('http://localhost:8000/api/v1/chat/stream', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${token}`
      },
      body: JSON.stringify(requestData)
    });

    if (!response.ok) {
      throw new Error(`HTTP error! status: ${response.status}`);
    }

    // Read stream
    const reader = response.body.getReader();
    const decoder = new TextDecoder();

    let fullResponse = '';
    let currentConversationId = conversationId;

    // Holds bytes-decoded text that has not yet formed a complete event.
    let buffer = '';

    while (true) {
      const { done, value } = await reader.read();

      if (done) break;

      // { stream: true } keeps incomplete multi-byte sequences pending
      // until the next chunk instead of emitting U+FFFD.
      buffer += decoder.decode(value, { stream: true });

      // Parse only up to the last complete event separator; the trailing
      // partial event (if any) stays buffered for the next read.
      const boundary = buffer.lastIndexOf('\n\n');
      if (boundary === -1) continue; // no complete event buffered yet

      const completeEvents = buffer.slice(0, boundary + 2);
      buffer = buffer.slice(boundary + 2);

      // Parse SSE events
      const events = parseSSE(completeEvents);

      for (const event of events) {
        switch (event.type) {
          case 'conversation_id':
            currentConversationId = event.data.conversation_id;
            console.log('Conversation ID:', currentConversationId);
            break;

          case 'status':
            console.log('Status:', event.data.message);
            // Update UI with status
            updateStatusIndicator(event.data.message);
            break;

          case 'content':
            fullResponse += event.data.text;
            // Append to UI in real-time
            appendToMessageBubble(event.data.text);
            break;

          case 'metadata':
            console.log('Metadata:', event.data);
            // Save metadata if needed
            break;

          case 'done':
            console.log('Stream completed');
            hideStatusIndicator();
            break;

          case 'error':
            console.error('Stream error:', event.data.error);
            showError(event.data.message);
            break;
        }
      }
    }

    return {
      response: fullResponse,
      conversationId: currentConversationId
    };

  } catch (error) {
    console.error('Streaming error:', error);
    throw error;
  }
}
98
+
99
+ // Parse SSE format
100
/**
 * Parse a string containing zero or more complete SSE events.
 *
 * Each event looks like:
 *
 *   event: <name>
 *   data: <json>
 *   (blank line)
 *
 * @param {string} text - Raw SSE text made of complete events.
 * @returns {Array<{type: string, data: Object}>} One entry per event whose
 *   data parses as JSON; malformed events are warned about and skipped.
 */
function parseSSE(text) {
  const events = [];
  // Events are separated by a blank line.
  const blocks = text.split('\n\n');

  for (const block of blocks) {
    if (!block.trim()) continue;

    // Anchor to line starts (multiline flag): an "event:" or "data:"
    // substring inside the JSON payload must not be mistaken for a field.
    const eventMatch = block.match(/^event: (.+)$/m);
    const dataMatch = block.match(/^data: (.+)$/m);

    if (eventMatch && dataMatch) {
      try {
        events.push({
          type: eventMatch[1],
          data: JSON.parse(dataMatch[1])
        });
      } catch (e) {
        console.warn('Failed to parse SSE event:', e);
      }
    }
  }

  return events;
}
124
+
125
+
126
+ // ============================================================================
127
+ // REACT EXAMPLE (with hooks)
128
+ // ============================================================================
129
+
130
+ import { useState, useRef } from 'react';
131
+
132
/**
 * Chat UI that streams assistant replies over SSE.
 *
 * State:
 *  - messages: ordered list of { role, content } (assistant entries also
 *    carry an `id` so streamed chunks can be appended to the right bubble);
 *  - isStreaming: true while a stream is in flight (shows the Stop button);
 *  - statusMessage: transient server status text shown below the messages.
 *
 * NOTE(review): relies on a `parseSSE` helper being in scope; chunks are
 * decoded and parsed per read, so an event split across network chunks may
 * be dropped — confirm against the backend's chunking behavior.
 */
function ChatComponent() {
  const [messages, setMessages] = useState([]);
  const [isStreaming, setIsStreaming] = useState(false);
  const [statusMessage, setStatusMessage] = useState('');
  // Lets stopStreaming() cancel the in-flight fetch from outside the closure.
  const abortControllerRef = useRef(null);

  /**
   * POST the query and stream the assistant's reply into the message list.
   * Returns the conversation id (new one if the server created a conversation).
   */
  const sendStreamingMessage = async (query, conversationId) => {
    setIsStreaming(true);
    setStatusMessage('');

    // Add user message immediately
    const userMessage = { role: 'user', content: query };
    setMessages(prev => [...prev, userMessage]);

    // Create assistant message placeholder; streamed chunks append to it.
    // Date.now() as id assumes at most one message created per millisecond.
    const assistantMessageId = Date.now();
    setMessages(prev => [...prev, {
      id: assistantMessageId,
      role: 'assistant',
      content: ''
    }]);

    try {
      const token = localStorage.getItem('jwt_token');

      // Create abort controller for cancellation
      abortControllerRef.current = new AbortController();

      const response = await fetch('http://localhost:8000/api/v1/chat/stream', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${token}`
        },
        body: JSON.stringify({ query, conversation_id: conversationId }),
        signal: abortControllerRef.current.signal
      });

      const reader = response.body.getReader();
      const decoder = new TextDecoder();

      let newConversationId = conversationId;

      // Read until the server closes the stream (or the fetch is aborted).
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        const chunk = decoder.decode(value);
        const events = parseSSE(chunk);

        for (const event of events) {
          switch (event.type) {
            case 'conversation_id':
              // Server assigned/confirmed the conversation id.
              newConversationId = event.data.conversation_id;
              break;

            case 'status':
              setStatusMessage(event.data.message);
              break;

            case 'content':
              // Append this text chunk to the assistant placeholder bubble.
              setMessages(prev => prev.map(msg =>
                msg.id === assistantMessageId
                  ? { ...msg, content: msg.content + event.data.text }
                  : msg
              ));
              break;

            case 'done':
              setStatusMessage('');
              break;

            case 'error':
              console.error('Error:', event.data);
              setStatusMessage('');
              break;
          }
        }
      }

      return newConversationId;

    } catch (error) {
      // AbortError means the user clicked "Stop Generating" — not a failure.
      if (error.name === 'AbortError') {
        console.log('Stream cancelled by user');
      } else {
        console.error('Streaming error:', error);
      }
    } finally {
      // Always reset streaming UI state, even on error/cancel.
      setIsStreaming(false);
      setStatusMessage('');
      abortControllerRef.current = null;
    }
  };

  // Cancel the in-flight stream, if any.
  const stopStreaming = () => {
    if (abortControllerRef.current) {
      abortControllerRef.current.abort();
    }
  };

  return (
    <div className="chat-container">
      <div className="messages">
        {messages.map((msg, idx) => (
          <div key={idx} className={`message ${msg.role}`}>
            {msg.content}
          </div>
        ))}

        {statusMessage && (
          <div className="status-indicator">
            {statusMessage}
          </div>
        )}
      </div>

      {isStreaming && (
        <button onClick={stopStreaming}>
          Stop Generating
        </button>
      )}
    </div>
  );
}
258
+
259
+
260
+ // ============================================================================
261
+ // REGENERATE & EDIT EXAMPLES
262
+ // ============================================================================
263
+
264
+ // Regenerate last response
265
async function regenerateResponse(conversationId) {
  // Ask the server to regenerate the assistant's last reply for this
  // conversation; the endpoint answers with the same SSE stream shape
  // as /chat/stream.
  const token = localStorage.getItem('jwt_token');

  const url = `http://localhost:8000/api/v1/chat/conversation/${conversationId}/regenerate`;
  const options = {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${token}`
    }
  };

  const response = await fetch(url, options);

  // Process stream same as sendStreamingMessage
  // ...
}
281
+
282
+ // Edit last message
283
async function editLastMessage(conversationId, newContent) {
  // Replace the user's last message and let the server regenerate the
  // reply; the endpoint answers with the same SSE stream as /chat/stream.
  const token = localStorage.getItem('jwt_token');

  const url = `http://localhost:8000/api/v1/chat/conversation/${conversationId}/edit`;
  const payload = { new_content: newContent };
  const options = {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify(payload)
  };

  const response = await fetch(url, options);

  // Process stream
  // ...
}