Spaces:
Sleeping
Sleeping
| """ | |
| ChatUI Adapters for LangGraph Workflow Streaming | |
| """ | |
import asyncio
import json
import logging
from typing import Any, AsyncGenerator, Dict, Optional

from components.utils import build_conversation_context
logger = logging.getLogger(__name__)
async def process_query_streaming(
    compiled_graph,
    query: str,
    file_upload=None,
    reports_filter: str = "",
    sources_filter: str = "",
    subtype_filter: str = "",
    year_filter: str = "",
    conversation_context: Optional[str] = None,
    file_content: Optional[bytes] = None,
    filename: Optional[str] = None,
):
    """
    Process a query through the LangGraph workflow with streaming.

    COPIED FROM ORIGINAL ORCHESTRATOR. TO BE REPLACED WITH AGENTIC WORFLOW

    Args:
        compiled_graph: Compiled LangGraph object exposing ``astream``.
        query: User query text.
        file_upload: Unused; kept for caller compatibility.
        reports_filter: Optional metadata filter value ("" means no filter).
        sources_filter: Optional metadata filter value ("" means no filter).
        subtype_filter: Optional metadata filter value ("" means no filter).
        year_filter: Optional metadata filter value ("" means no filter).
        conversation_context: Pre-built conversation history string, or None.
        file_content: Raw bytes of an uploaded file, if any.
        filename: Name of the uploaded file, if any.

    Yields:
        Dicts of shape ``{"type": ..., "content": ...}`` where type is one
        of "data", "sources", "error", or "end".
    """
    # Build metadata filters only when at least one filter is non-empty so
    # downstream code sees None (not {}) when no filtering was requested.
    metadata_filters: Optional[Dict[str, str]] = None
    if any([reports_filter, sources_filter, subtype_filter, year_filter]):
        metadata_filters = {}
        if reports_filter:
            metadata_filters["reports"] = reports_filter
        if sources_filter:
            metadata_filters["sources"] = sources_filter
        if subtype_filter:
            metadata_filters["subtype"] = subtype_filter
        if year_filter:
            metadata_filters["year"] = year_filter

    initial_state: Dict[str, Any] = {
        "query": query,
        "metadata": {"session_type": "chatui"},
        "raw_documents": [],
        "conversation_context": conversation_context,
        "metadata_filters": metadata_filters,
    }
    # Add file content if present
    if file_content and filename:
        initial_state["file_content"] = file_content
        initial_state["filename"] = filename

    try:
        async for output in compiled_graph.astream(initial_state, stream_mode="custom"):
            # Hoisted: the original called output.get("event") per branch.
            event = output.get("event")
            if event == "data":
                yield {"type": "data", "content": output["data"]}
            elif event == "final_answer":
                # final_answer carries the collected web sources, if any
                sources = output["data"].get("webSources", [])
                if sources:
                    yield {"type": "sources", "content": sources}
            elif event == "error":
                yield {"type": "error", "content": output["data"].get("error", "Unknown error")}
        # Signal end-of-stream after a clean run.
        yield {"type": "end", "content": ""}
    except Exception as e:
        logger.error(f"Pipeline error: {e}", exc_info=True)
        yield {"type": "error", "content": str(e)}
async def chatui_adapter(data, compiled_graph, max_turns: int = 3, max_chars: int = 8000):
    """Text-only adapter for ChatUI with structured message support.

    Args:
        data: Incoming ChatUI payload — a dict or attribute-style object
            carrying 'text' and optionally 'messages'.
        compiled_graph: Compiled LangGraph workflow forwarded to
            process_query_streaming.
        max_turns: Max conversation turns kept in the generation context.
        max_chars: Max characters of conversation context.

    Yields:
        String chunks of the streamed answer, then a markdown sources
        section (doc:// URLs) when sources are available.
    """
    logger.debug(f"ChatUI adapter called with data type: {type(data)}")
    try:
        # Handle both dict and object access patterns
        if isinstance(data, dict):
            text_value = data.get('text', '')
            messages_value = data.get('messages', None)
        else:
            text_value = getattr(data, 'text', '')
            messages_value = getattr(data, 'messages', None)

        # Convert dict messages to lightweight objects so downstream code
        # can use attribute access uniformly
        messages = []
        if messages_value:
            for msg in messages_value:
                if isinstance(msg, dict):
                    messages.append(type('Message', (), {
                        'role': msg.get('role', 'unknown'),
                        'content': msg.get('content', '')
                    })())
                else:
                    messages.append(msg)

        # Latest user message wins; fall back to the raw text field
        user_messages = [msg for msg in messages if msg.role == 'user']
        query = user_messages[-1].content if user_messages else text_value

        # Conversation metadata (troubleshooting purposes)
        msg_metadata = {
            'total': len(messages),
            'user': len(user_messages),
            'assistant': len([m for m in messages if m.role == 'assistant']),
            'msg_lengths': [len(m.content) for m in messages]
        }
        logger.info(f"Processing query: {query[:20]}... | Conversation: {msg_metadata}")

        # Build conversation context for generation (last N turns)
        conversation_context = build_conversation_context(messages, max_turns=max_turns, max_chars=max_chars)

        sources_collected = None
        async for result in process_query_streaming(
            compiled_graph=compiled_graph,
            query=query,
            file_upload=None,
            reports_filter="",
            sources_filter="",
            subtype_filter="",
            year_filter="",
            conversation_context=conversation_context
        ):
            if isinstance(result, dict):
                result_type = result.get("type", "data")
                content = result.get("content", "")
                if result_type == "data":
                    yield content
                elif result_type == "sources":
                    sources_collected = content
                elif result_type == "end":
                    if sources_collected:
                        # Send sources as markdown with doc:// URLs for ChatUI to parse
                        sources_text = "\n\n**Sources:**\n"
                        for i, source in enumerate(sources_collected, 1):
                            # Guard non-dict entries — keeps this adapter
                            # consistent with chatui_file_adapter.
                            if isinstance(source, dict):
                                title = source.get('title', 'Unknown')
                                uri = source.get('uri') or 'doc://#'
                                sources_text += f"{i}. [{title}]({uri})\n"
                            else:
                                sources_text += f"{i}. {str(source)}\n"
                        logger.info("Sending markdown sources with doc:// scheme")
                        yield sources_text
                elif result_type == "error":
                    yield f"Error: {content}"
            else:
                yield str(result)
            # Yield control so other tasks can run between chunks.
            await asyncio.sleep(0)
    except Exception as e:
        logger.error(f"ChatUI error: {str(e)}")
        logger.error("Full traceback:", exc_info=True)
        yield f"Error: {str(e)}"
async def chatui_file_adapter(data, compiled_graph, max_turns: int = 3, max_chars: int = 8000):
    """File upload adapter for ChatUI with structured message support"""
    try:
        # The payload arrives either as a mapping or as an attribute-style
        # object; pick one accessor and use it for every field.
        if isinstance(data, dict):
            fetch = data.get
        else:
            fetch = lambda key, default=None: getattr(data, key, default)
        text_value = fetch('text', '')
        messages_value = fetch('messages', None)
        files_value = fetch('files', None)
        preprompt_value = fetch('preprompt', None)

        # Prefer structured messages over the raw text field.
        conversation_context = None
        if messages_value and len(messages_value) > 0:
            # Normalize dict messages into attribute-access objects.
            messages = [
                msg if not isinstance(msg, dict) else type('Message', (), {
                    'role': msg.get('role', 'unknown'),
                    'content': msg.get('content', '')
                })()
                for msg in messages_value
            ]
            user_messages = [m for m in messages if m.role == 'user']
            query = user_messages[-1].content if user_messages else text_value
            # Conversation metadata (troubleshooting purposes)
            msg_metadata = {
                'total': len(messages),
                'user': len(user_messages),
                'assistant': sum(1 for m in messages if m.role == 'assistant'),
                'msg_lengths': [len(m.content) for m in messages]
            }
            logger.info(f"Processing query with file: {query[:20]}... | Conversation: {msg_metadata}")
            conversation_context = build_conversation_context(messages, max_turns=max_turns, max_chars=max_chars)
        else:
            query = text_value

        # Decode the first attached file when it is base64-encoded.
        file_content = None
        filename = None
        if files_value and len(files_value) > 0:
            file_info = files_value[0]
            logger.info(f"Processing file: {file_info.get('name', 'unknown')}")
            if file_info.get('type') == 'base64' and file_info.get('content'):
                try:
                    import base64
                    file_content = base64.b64decode(file_info['content'])
                    filename = file_info.get('name', 'uploaded_file')
                except Exception as e:
                    logger.error(f"Error decoding base64 file: {str(e)}")
                    yield f"Error: Failed to decode uploaded file - {str(e)}"
                    return

        sources_collected = None
        stream = process_query_streaming(
            compiled_graph=compiled_graph,
            query=query,
            file_upload=None,
            reports_filter="",
            sources_filter="",
            subtype_filter="",
            year_filter="",
            conversation_context=conversation_context,
            file_content=file_content,
            filename=filename
        )
        async for result in stream:
            if not isinstance(result, dict):
                yield str(result)
            else:
                kind = result.get("type", "data")
                payload = result.get("content", "")
                if kind == "data":
                    yield payload
                elif kind == "sources":
                    sources_collected = payload
                elif kind == "end":
                    if sources_collected:
                        # Send sources as markdown with doc:// URLs for ChatUI to parse
                        parts = ["\n\n**Sources:**\n"]
                        for idx, src in enumerate(sources_collected, 1):
                            if isinstance(src, dict):
                                label = src.get('title', 'Unknown')
                                link = src.get('uri') or 'doc://#'
                                parts.append(f"{idx}. [{label}]({link})\n")
                            else:
                                parts.append(f"{idx}. {str(src)}\n")
                        logger.info(f"Sending markdown sources with doc:// scheme (file)")
                        yield "".join(parts)
                elif kind == "error":
                    yield f"Error: {payload}"
            # Give the event loop a chance to schedule other tasks.
            await asyncio.sleep(0)
    except Exception as e:
        logger.error(f"ChatUI file adapter error: {str(e)}")
        yield f"Error: {str(e)}"