Spaces:
Sleeping
Sleeping
| from logger_config import setup_logger | |
| from typing import Dict, Any, Optional, List, Union | |
| from dataclasses import dataclass, asdict | |
| from enum import Enum | |
| import json | |
| from dify_client_python.dify_client.models.stream import ( | |
| StreamEvent, | |
| StreamResponse, | |
| build_chat_stream_response | |
| ) | |
| import re | |
| logger = setup_logger() | |
class EventType(Enum):
    """Event type tags for streamed chat responses.

    Values are the wire-format strings — presumably the Dify SSE ``event``
    field values, given the dify_client imports above; TODO confirm.
    """

    AGENT_THOUGHT = "agent_thought"  # agent reasoning / tool-use step
    AGENT_MESSAGE = "agent_message"  # incremental answer text
    MESSAGE_END = "message_end"      # terminal event for a message
    PING = "ping"                    # keep-alive
@dataclass
class ToolCall:
    """A single tool invocation observed in an agent response.

    Fix: the original declared only bare class-level annotations with no
    ``@dataclass`` decorator, so instances could not be constructed with
    field values; the decorator (already imported at the top of the file)
    restores the intended value-object behavior.
    """

    tool_name: str                           # registered name of the invoked tool
    tool_input: Dict[str, Any]               # arguments the tool was called with
    tool_output: Optional[str]               # raw tool output; None if not produced
    tool_labels: Dict[str, Dict[str, str]]   # nested label metadata — TODO confirm exact shape
@dataclass
class Citation:
    """A retrieval citation attached to an answer.

    Fix: the original declared only bare class-level annotations with no
    ``@dataclass`` decorator, so instances could not be constructed with
    field values; the decorator (already imported at the top of the file)
    restores the intended value-object behavior.
    """

    dataset_id: str     # id of the knowledge dataset the hit came from
    dataset_name: str   # human-readable dataset name
    document_id: str    # id of the source document
    document_name: str  # human-readable document name
    segment_id: str     # id of the matched segment within the document
    score: float        # retrieval relevance score
    content: str        # cited text content
@dataclass
class ProcessedResponse:
    """Normalized view of one streamed response event.

    Fix: the original declared only bare class-level annotations with no
    ``@dataclass`` decorator, so instances could not be constructed with
    field values; the decorator (already imported at the top of the file)
    restores the intended value-object behavior.
    """

    event_type: EventType        # which stream event this row represents
    task_id: str                 # task id from the stream payload
    message_id: str              # message id from the stream payload
    conversation_id: str         # conversation id from the stream payload
    content: str                 # accumulated/partial answer text
    tool_calls: List[ToolCall]   # tool invocations seen so far
    citations: List[Citation]    # retrieval citations seen so far
    metadata: Dict[str, Any]     # passthrough metadata from the stream
    created_at: int              # creation timestamp — presumably epoch seconds; TODO confirm
class EnumEncoder(json.JSONEncoder):
    """JSON encoder that also serializes Enum members and .dict()-style objects.

    Enum members collapse to their underlying ``value``; objects exposing a
    ``dict()`` method (e.g. pydantic-style models) serialize via that hook.
    Everything else defers to the base encoder.
    """

    def default(self, obj):
        # Enum members serialize as their raw value.
        if isinstance(obj, Enum):
            return obj.value
        # Duck-typed hook: anything with a dict() method serializes via it.
        if hasattr(obj, "dict"):
            return obj.dict()
        # Fall back to the stock behavior (raises TypeError for the rest).
        return super().default(obj)
class SSEParser:
    """Parses Server-Sent-Event payloads from the chat stream.

    Fix: the inner observation-cleaning step used a bare ``except:`` which
    swallows *everything*, including KeyboardInterrupt/SystemExit; it now
    catches only the exceptions that JSON parsing / dict access can raise,
    preserving the deliberate best-effort fallback. The step is also
    extracted into a private helper for readability.
    """

    def __init__(self):
        self.logger = setup_logger("sse_parser")

    def parse_sse_event(self, data: str) -> Optional[Dict]:
        """Parse SSE event data and return cleaned dictionary.

        Strips a leading ``data:`` framing prefix if present, JSON-decodes
        the payload, and — best-effort — replaces a nested tool
        ``observation`` string with its cleaned ``llm_result`` text.

        Returns None when the payload cannot be decoded.
        """
        self.logger.debug("Parsing SSE event")
        try:
            # Extract the data portion of the SSE frame.
            if "data:" in data:
                data = data.split("data:", 1)[1].strip()
            parsed_data = json.loads(data)
            # Clean tool outputs if present (best-effort; never fails the parse).
            if "observation" in parsed_data:
                self._clean_observation(parsed_data)
            return parsed_data
        except json.JSONDecodeError as e:
            self.logger.error(f"JSON decode error: {str(e)}")
            return None
        except Exception as e:
            # Top-level boundary: log and signal failure to the caller.
            self.logger.error(f"Parse error: {str(e)}")
            return None

    def _clean_observation(self, parsed_data: Dict) -> None:
        """Replace a raw tool observation with its cleaned llm_result, in place.

        The observation is expected to be a JSON string whose values may
        themselves be JSON strings containing an ``llm_result`` key —
        TODO confirm against the producer. If anything about that shape
        does not hold, the original observation is kept unchanged.
        """
        try:
            observation = parsed_data["observation"]
            if observation and isinstance(observation, str):
                tool_data = json.loads(observation)
                # Last matching value wins, mirroring the original scan order.
                for key, value in tool_data.items():
                    if isinstance(value, str) and "llm_result" in value:
                        tool_result = json.loads(value)["llm_result"]
                        parsed_data["observation"] = self.clean_tool_output(tool_result)
        except (json.JSONDecodeError, TypeError, KeyError, AttributeError):
            pass  # Keep original observation if parsing fails (best-effort).

    def clean_tool_output(self, output: str) -> str:
        """Clean tool output by removing markdown and other formatting"""
        # Remove fenced markdown code blocks entirely.
        output = re.sub(r'```.*?```', '', output, flags=re.DOTALL)
        # Strip inline markdown markers (bold/italic/code/heading chars).
        output = re.sub(r'[*_`#]', '', output)
        # Collapse 3+ consecutive newlines down to a blank line.
        output = re.sub(r'\n{3,}', '\n\n', output.strip())
        return output