""" ChatUI Adapters for LangGraph Workflow Streaming """ import logging import asyncio import json from typing import AsyncGenerator, Dict, Any from components.utils import build_conversation_context logger = logging.getLogger(__name__) async def process_query_streaming( compiled_graph, query: str, file_upload=None, reports_filter: str = "", sources_filter: str = "", subtype_filter: str = "", year_filter: str = "", conversation_context: str = None, file_content: bytes = None, filename: str = None ): """ Process a query through the LangGraph workflow with streaming. COPIED FROM ORIGINAL ORCHESTRATOR. TO BE REPLACED WITH AGENTIC WORFLOW """ # Build metadata filters from filter parameters metadata_filters = None if any([reports_filter, sources_filter, subtype_filter, year_filter]): metadata_filters = {} if reports_filter: metadata_filters["reports"] = reports_filter if sources_filter: metadata_filters["sources"] = sources_filter if subtype_filter: metadata_filters["subtype"] = subtype_filter if year_filter: metadata_filters["year"] = year_filter initial_state = { "query": query, "metadata": {"session_type": "chatui"}, "raw_documents": [], "conversation_context": conversation_context, "metadata_filters": metadata_filters } # Add file content if present if file_content and filename: initial_state["file_content"] = file_content initial_state["filename"] = filename try: async for output in compiled_graph.astream(initial_state, stream_mode="custom"): if output.get("event") == "data": yield {"type": "data", "content": output["data"]} elif output.get("event") == "final_answer": # Handle final_answer event with webSources sources = output["data"].get("webSources", []) if sources: yield {"type": "sources", "content": sources} elif output.get("event") == "error": yield {"type": "error", "content": output["data"].get("error", "Unknown error")} yield {"type": "end", "content": ""} except Exception as e: logger.error(f"Pipeline error: {e}", exc_info=True) yield {"type": "error", "content": str(e)} async def chatui_adapter(data, compiled_graph, max_turns: int = 3, max_chars: int = 8000): """Text-only adapter for ChatUI with structured message support""" logger.debug(f"ChatUI adapter called with data type: {type(data)}") try: # Handle both dict and object access patterns if isinstance(data, dict): text_value = data.get('text', '') messages_value = data.get('messages', None) preprompt_value = data.get('preprompt', None) else: text_value = getattr(data, 'text', '') messages_value = getattr(data, 'messages', None) preprompt_value = getattr(data, 'preprompt', None) # Convert dict messages to objects if needed messages = [] if messages_value: for msg in messages_value: if isinstance(msg, dict): messages.append(type('Message', (), { 'role': msg.get('role', 'unknown'), 'content': msg.get('content', '') })()) else: messages.append(msg) # Extract latest user query user_messages = [msg for msg in messages if msg.role == 'user'] query = user_messages[-1].content if user_messages else text_value # Conversation metadata (troubleshooting purposes) msg_metadata = { 'total': len(messages), 'user': len(user_messages), 'assistant': len([m for m in messages if m.role == 'assistant']), 'msg_lengths': [len(m.content) for m in messages] } logger.info(f"Processing query: {query[:20]}... | Conversation: {msg_metadata}") # Build conversation context for generation (last N turns) conversation_context = build_conversation_context(messages, max_turns=max_turns, max_chars=max_chars) full_response = "" sources_collected = None async for result in process_query_streaming( compiled_graph=compiled_graph, query=query, file_upload=None, reports_filter="", sources_filter="", subtype_filter="", year_filter="", conversation_context=conversation_context ): if isinstance(result, dict): result_type = result.get("type", "data") content = result.get("content", "") if result_type == "data": full_response += content yield content elif result_type == "sources": sources_collected = content elif result_type == "end": if sources_collected: # Send sources as markdown with doc:// URLs for ChatUI to parse sources_text = "\n\n**Sources:**\n" for i, source in enumerate(sources_collected, 1): title = source.get('title', 'Unknown') uri = source.get('uri') or 'doc://#' sources_text += f"{i}. [{title}]({uri})\n" logger.info(f"Sending markdown sources with doc:// scheme") yield sources_text elif result_type == "error": yield f"Error: {content}" else: yield str(result) await asyncio.sleep(0) except Exception as e: logger.error(f"ChatUI error: {str(e)}") logger.error(f"Full traceback:", exc_info=True) yield f"Error: {str(e)}" async def chatui_file_adapter(data, compiled_graph, max_turns: int = 3, max_chars: int = 8000): """File upload adapter for ChatUI with structured message support""" try: # Handle both dict and object access patterns if isinstance(data, dict): text_value = data.get('text', '') messages_value = data.get('messages', None) files_value = data.get('files', None) preprompt_value = data.get('preprompt', None) else: text_value = getattr(data, 'text', '') messages_value = getattr(data, 'messages', None) files_value = getattr(data, 'files', None) preprompt_value = getattr(data, 'preprompt', None) # Extract query - prefer structured messages conversation_context = None if messages_value and len(messages_value) > 0: # Convert dict messages to objects messages = [] for msg in messages_value: if isinstance(msg, dict): messages.append(type('Message', (), { 'role': msg.get('role', 'unknown'), 'content': msg.get('content', '') })()) else: messages.append(msg) user_messages = [msg for msg in messages if msg.role == 'user'] query = user_messages[-1].content if user_messages else text_value # Conversation metadata (troubleshooting purposes) msg_metadata = { 'total': len(messages), 'user': len(user_messages), 'assistant': len([m for m in messages if m.role == 'assistant']), 'msg_lengths': [len(m.content) for m in messages] } logger.info(f"Processing query with file: {query[:20]}... | Conversation: {msg_metadata}") conversation_context = build_conversation_context(messages, max_turns=max_turns, max_chars=max_chars) else: query = text_value file_content = None filename = None if files_value and len(files_value) > 0: file_info = files_value[0] logger.info(f"Processing file: {file_info.get('name', 'unknown')}") if file_info.get('type') == 'base64' and file_info.get('content'): try: import base64 file_content = base64.b64decode(file_info['content']) filename = file_info.get('name', 'uploaded_file') except Exception as e: logger.error(f"Error decoding base64 file: {str(e)}") yield f"Error: Failed to decode uploaded file - {str(e)}" return sources_collected = None async for result in process_query_streaming( compiled_graph=compiled_graph, query=query, file_upload=None, reports_filter="", sources_filter="", subtype_filter="", year_filter="", conversation_context=conversation_context, file_content=file_content, filename=filename ): if isinstance(result, dict): result_type = result.get("type", "data") content = result.get("content", "") if result_type == "data": yield content elif result_type == "sources": sources_collected = content elif result_type == "end": if sources_collected: # Send sources as markdown with doc:// URLs for ChatUI to parse sources_text = "\n\n**Sources:**\n" for i, source in enumerate(sources_collected, 1): if isinstance(source, dict): title = source.get('title', 'Unknown') uri = source.get('uri') or 'doc://#' sources_text += f"{i}. [{title}]({uri})\n" else: sources_text += f"{i}. {str(source)}\n" logger.info(f"Sending markdown sources with doc:// scheme (file)") yield sources_text elif result_type == "error": yield f"Error: {content}" else: yield str(result) await asyncio.sleep(0) except Exception as e: logger.error(f"ChatUI file adapter error: {str(e)}") yield f"Error: {str(e)}"