junaid17 commited on
Commit
9554ad4
·
verified ·
1 Parent(s): 21ca625

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +112 -19
app.py CHANGED
@@ -79,17 +79,16 @@ async def upload_document(
79
 
80
  # ... (keep existing imports) ...
81
 
 
82
  @app.post("/chat/stream")
83
  async def chat_stream_endpoint(request: ChatRequest):
84
  """
85
  Streaming Chat Endpoint.
86
- Streams tokens/chunks as they are generated by LangGraph.
87
  """
88
-
89
  async def event_generator():
90
  try:
91
  config = {"configurable": {"thread_id": request.thread_id}}
92
-
93
  inputs = {
94
  "query": request.query,
95
  "RAG": request.use_rag,
@@ -99,31 +98,125 @@ async def chat_stream_endpoint(request: ChatRequest):
99
  "metadata": [],
100
  "web_context": "",
101
  }
102
-
103
- async for event in rag_app.astream(inputs, config=config, stream_mode="values"):
 
 
 
104
  if "response" in event:
105
- msg = event["response"][-1]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
106
 
107
- if hasattr(msg, "content") and msg.content:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
108
  chunk = {
109
- "type": "chunk",
110
- "content": msg.content
 
111
  }
112
- yield json.dumps(chunk) + "\n"
113
-
114
- # signal end of stream
115
- yield json.dumps({"type": "done"}) + "\n"
116
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
117
  except Exception as e:
118
- error_chunk = {"type": "error", "message": str(e)}
119
- yield json.dumps(error_chunk) + "\n"
120
-
 
 
 
 
121
  return StreamingResponse(
122
  event_generator(),
123
- media_type="text/plain"
 
 
 
 
 
124
  )
125
 
126
-
127
  # ---------------- STT ---------------- #
128
  @app.post("/stt")
129
  async def transcribe_audio(file: UploadFile = File(...)):
 
79
 
80
  # ... (keep existing imports) ...
81
 
82
# NEW: Streaming endpoint
@app.post("/chat/stream")
async def chat_stream_endpoint(request: ChatRequest):
    """
    Streaming Chat Endpoint.

    Streams the LLM response as Server-Sent Events (SSE). Each SSE frame is
    `data: <json>\n\n` where the JSON payload carries:
      - "type": "content" (a completed assistant message), "token"
        (a raw streamed chunk, if the graph emits one under "chunk"),
        "done" (end-of-stream signal), or "error";
      - "data" / "error": the payload;
      - "thread_id": echoed from the request so clients can demultiplex.
    """
    async def event_generator():
        try:
            # Thread id scopes the LangGraph checkpointer to this conversation.
            config = {"configurable": {"thread_id": request.thread_id}}
            inputs = {
                "query": request.query,
                "RAG": request.use_rag,
                "web_search": request.use_web,
                "model_name": request.model_name,
                "context": [],
                "metadata": [],
                "web_context": "",
            }

            # stream_mode="values" makes astream yield the FULL graph state
            # after each step, which is what the `"response" in event` check
            # below expects. (The default "updates" mode yields
            # {node_name: delta} dicts, so "response" would never be a
            # top-level key and nothing would be streamed.)
            last_sent = None
            async for event in rag_app.astream(
                inputs, config=config, stream_mode="values"
            ):
                if "response" in event:
                    messages = event["response"]
                    if messages:
                        last_msg = messages[-1]
                        content = getattr(last_msg, "content", None)
                        # In values mode the same message list is re-emitted
                        # with every state snapshot; skip empty and duplicate
                        # payloads so the client sees each message once.
                        if content and content != last_sent:
                            last_sent = content
                            chunk = {
                                "type": "content",
                                "data": content,
                                "thread_id": request.thread_id
                            }
                            yield f"data: {json.dumps(chunk)}\n\n"

                # If your graph streams token by token, handle it here
                elif "chunk" in event:
                    chunk = {
                        "type": "token",
                        "data": event["chunk"],
                        "thread_id": request.thread_id
                    }
                    yield f"data: {json.dumps(chunk)}\n\n"

            # Send completion signal
            yield f"data: {json.dumps({'type': 'done', 'thread_id': request.thread_id})}\n\n"

        except Exception as e:
            # Surface the failure to the client as a terminal SSE event
            # instead of silently dropping the connection mid-stream.
            error_data = {
                "type": "error",
                "error": str(e),
                "thread_id": request.thread_id
            }
            yield f"data: {json.dumps(error_data)}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"  # Disable nginx buffering
        }
    )
148
 
149
# ALTERNATIVE: If you need more granular streaming with astream_events
@app.post("/chat/stream/events")
async def chat_stream_events_endpoint(request: ChatRequest):
    """
    Streaming Chat Endpoint using astream_events.

    Provides more granular control over streaming events: emits SSE frames
    with "type" of "token" (per-token LLM output), "complete" (a finished
    response message), "done" (end-of-stream), or "error".  Every payload
    echoes the request's thread_id so clients can demultiplex streams.
    """
    async def event_generator():
        try:
            # Thread id scopes the LangGraph checkpointer to this conversation.
            config = {"configurable": {"thread_id": request.thread_id}}
            inputs = {
                "query": request.query,
                "RAG": request.use_rag,
                "web_search": request.use_web,
                "model_name": request.model_name,
                "context": [],
                "metadata": [],
                "web_context": "",
            }

            # Stream events from the graph
            # (version="v2" selects LangChain's v2 event schema).
            async for event in rag_app.astream_events(inputs, config=config, version="v2"):
                event_type = event.get("event")

                # Handle different event types
                if event_type == "on_chat_model_stream":
                    # This captures token-by-token streaming from the LLM.
                    # `chunk` is expected to be a message chunk object with a
                    # `.content` attribute; the `{}` default safely fails the
                    # hasattr check when the payload is absent.
                    content = event.get("data", {}).get("chunk", {})
                    if hasattr(content, 'content') and content.content:
                        chunk = {
                            "type": "token",
                            "data": content.content,
                            "thread_id": request.thread_id
                        }
                        yield f"data: {json.dumps(chunk)}\n\n"

                elif event_type == "on_chain_end":
                    # Final result.
                    # NOTE(review): in the v2 event stream "on_chain_end"
                    # fires for every runnable/node, not only the top-level
                    # graph, so a "complete" frame may be emitted more than
                    # once if several outputs contain "response" — confirm
                    # against the actual graph and filter by event name if
                    # needed.
                    output = event.get("data", {}).get("output", {})
                    if "response" in output:
                        messages = output["response"]
                        if messages and len(messages) > 0:
                            last_msg = messages[-1]
                            chunk = {
                                "type": "complete",
                                # Fall back to str() for message types without
                                # a .content attribute.
                                "data": last_msg.content if hasattr(last_msg, 'content') else str(last_msg),
                                "thread_id": request.thread_id
                            }
                            yield f"data: {json.dumps(chunk)}\n\n"

            # Send completion signal
            yield f"data: {json.dumps({'type': 'done', 'thread_id': request.thread_id})}\n\n"

        except Exception as e:
            # Report the failure to the client as a terminal SSE event.
            error_data = {
                "type": "error",
                "error": str(e),
                "thread_id": request.thread_id
            }
            yield f"data: {json.dumps(error_data)}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )
 
220
  # ---------------- STT ---------------- #
221
  @app.post("/stt")
222
  async def transcribe_audio(file: UploadFile = File(...)):