""" Dify Workflow API Client with Streaming Support. This module replaces the DeepSeek direct API calls with Dify Chatflow API, supporting streaming responses for long-running R1 reasoning processes. """ import os import json import httpx from typing import Optional, AsyncGenerator, Dict, Any from dataclasses import dataclass from dotenv import load_dotenv from app.models import LLMAnalysis load_dotenv() @dataclass class DifyStreamEvent: """Represents a single event from Dify's streaming response.""" event: str data: Dict[str, Any] thought: Optional[str] = None answer: Optional[str] = None outputs: Optional[Dict[str, Any]] = None @dataclass class TechnicalMapping: """Technical mapping analysis from Dify workflow.""" token_vs_patch: str = "" temporal_logic: str = "" frequency_domain: str = "" @dataclass class DifyAnalysisResult: """Complete analysis result from Dify workflow.""" summary_zh: str relevance_score: float relevance_reason: str technical_mapping: TechnicalMapping heuristic_idea: str thought_process: Optional[str] = None # R1 thinking process class DifyClientError(Exception): """Base exception for Dify client errors.""" pass class DifyEntityTooLargeError(DifyClientError): """Raised when request payload exceeds Dify's limit (413).""" pass class DifyTimeoutError(DifyClientError): """Raised when request times out.""" pass class DifyRateLimitError(DifyClientError): """Raised when rate limit is exceeded (429).""" pass class DifyClient: """Dify Chatflow API client with streaming support.""" def __init__(self): self.api_key = os.getenv("DIFY_API_KEY") if not self.api_key: raise ValueError("DIFY_API_KEY environment variable is not set") self.base_url = os.getenv("DIFY_API_BASE", "http://82.157.209.193:8080/v1") self.endpoint = f"{self.base_url}/chat-messages" self.timeout = httpx.Timeout(120.0, connect=10.0) # 2 min for R1 reasoning def _format_query( self, topic: str, background: str, method: str, contribution: str, ) -> str: """Format input according to Dify workflow variable specification.""" return f"""研究主题:{topic} 技术背景:{background} 核心方法:{method} 预期贡献:{contribution}""" def _build_request_body( self, query: str, user_id: str = "paper-insight-user", conversation_id: Optional[str] = None, ) -> Dict[str, Any]: """Build the request body for Dify API.""" body = { "inputs": { "query": query, }, "query": query, # Also send as direct query for compatibility "response_mode": "streaming", "user": user_id, } if conversation_id: body["conversation_id"] = conversation_id return body def _get_headers(self) -> Dict[str, str]: """Get request headers with authentication.""" return { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json", } async def analyze_paper_stream( self, title: str, abstract: str, user_id: str = "paper-insight-user", ) -> AsyncGenerator[DifyStreamEvent, None]: """ Analyze a paper using Dify workflow with streaming. Yields DifyStreamEvent objects for each SSE event received. """ # Format the query using paper information query = self._format_query( topic=title, background="arXiv论文,需要分析其与DiT/KV Cache研究的相关性", method=abstract[:500] if len(abstract) > 500 else abstract, contribution="待分析", ) body = self._build_request_body(query, user_id) headers = self._get_headers() async with httpx.AsyncClient(timeout=self.timeout) as client: try: async with client.stream( "POST", self.endpoint, json=body, headers=headers, ) as response: # Handle error responses if response.status_code == 413: raise DifyEntityTooLargeError( "Request payload too large. Consider shortening the abstract." ) elif response.status_code == 429: raise DifyRateLimitError( "Rate limit exceeded. Please try again later." ) elif response.status_code >= 400: error_text = await response.aread() raise DifyClientError( f"Dify API error {response.status_code}: {error_text.decode()}" ) # Parse SSE stream buffer = "" async for chunk in response.aiter_text(): buffer += chunk # Process complete SSE events while "\n\n" in buffer: event_str, buffer = buffer.split("\n\n", 1) event = self._parse_sse_event(event_str) if event: yield event except httpx.TimeoutException as e: raise DifyTimeoutError(f"Request timed out: {e}") except httpx.RequestError as e: raise DifyClientError(f"Request failed: {e}") def _parse_sse_event(self, event_str: str) -> Optional[DifyStreamEvent]: """Parse a single SSE event string into DifyStreamEvent.""" lines = event_str.strip().split("\n") event_type = "" data_str = "" for line in lines: if line.startswith("event:"): event_type = line[6:].strip() elif line.startswith("data:"): data_str = line[5:].strip() if not data_str: return None try: data = json.loads(data_str) except json.JSONDecodeError: return None event = DifyStreamEvent( event=event_type or data.get("event", ""), data=data, ) # Extract common fields if "thought" in data: event.thought = data["thought"] if "answer" in data: event.answer = data["answer"] if "outputs" in data: event.outputs = data["outputs"] return event async def analyze_paper( self, title: str, abstract: str, user_id: str = "paper-insight-user", ) -> Optional[DifyAnalysisResult]: """ Analyze a paper and return the complete result. This method consumes the entire stream and returns the final result. Use analyze_paper_stream() for real-time streaming updates. """ thought_parts = [] answer_parts = [] final_outputs = None try: async for event in self.analyze_paper_stream(title, abstract, user_id): if event.thought: thought_parts.append(event.thought) if event.answer: answer_parts.append(event.answer) if event.outputs: final_outputs = event.outputs # Check for workflow completion if event.event == "workflow_finished" and event.outputs: final_outputs = event.outputs # Parse the final outputs if final_outputs: return self._parse_outputs(final_outputs, "".join(thought_parts)) # Try to parse from answer if outputs not available full_answer = "".join(answer_parts) if full_answer: return self._parse_answer(full_answer, "".join(thought_parts)) return None except DifyClientError as e: print(f"Dify analysis error: {e}") return None def _parse_outputs( self, outputs: Dict[str, Any], thought_process: str = "", ) -> DifyAnalysisResult: """Parse Dify workflow outputs into DifyAnalysisResult.""" technical_mapping = TechnicalMapping() if "technical_mapping" in outputs: tm = outputs["technical_mapping"] if isinstance(tm, dict): technical_mapping = TechnicalMapping( token_vs_patch=tm.get("token_vs_patch", ""), temporal_logic=tm.get("temporal_logic", ""), frequency_domain=tm.get("frequency_domain", ""), ) return DifyAnalysisResult( summary_zh=outputs.get("summary_zh", ""), relevance_score=float(outputs.get("relevance_score", 0)), relevance_reason=outputs.get("relevance_reason", ""), technical_mapping=technical_mapping, heuristic_idea=outputs.get("heuristic_idea", ""), thought_process=thought_process if thought_process else None, ) def _parse_answer( self, answer: str, thought_process: str = "", ) -> Optional[DifyAnalysisResult]: """Parse answer string (JSON) into DifyAnalysisResult.""" try: data = json.loads(answer) return self._parse_outputs(data, thought_process) except json.JSONDecodeError: # If not JSON, try to extract fields manually return DifyAnalysisResult( summary_zh=answer[:200] if answer else "", relevance_score=0, relevance_reason="无法解析结构化输出", technical_mapping=TechnicalMapping(), heuristic_idea="", thought_process=thought_process if thought_process else None, ) def to_llm_analysis(self, result: DifyAnalysisResult) -> LLMAnalysis: """Convert DifyAnalysisResult to legacy LLMAnalysis model.""" # Combine technical mapping into heuristic_idea for backward compatibility tech_mapping_str = "" if result.technical_mapping: parts = [] if result.technical_mapping.token_vs_patch: parts.append(f"Token/Patch映射: {result.technical_mapping.token_vs_patch}") if result.technical_mapping.temporal_logic: parts.append(f"时序逻辑: {result.technical_mapping.temporal_logic}") if result.technical_mapping.frequency_domain: parts.append(f"频域分析: {result.technical_mapping.frequency_domain}") if parts: tech_mapping_str = "\n\n【技术映射】\n" + "\n".join(parts) heuristic_with_mapping = result.heuristic_idea + tech_mapping_str return LLMAnalysis( summary_zh=result.summary_zh, relevance_score=result.relevance_score, relevance_reason=result.relevance_reason, heuristic_idea=heuristic_with_mapping, ) # Singleton instance _dify_client: Optional[DifyClient] = None def get_dify_client() -> DifyClient: """Get or create DifyClient singleton.""" global _dify_client if _dify_client is None: _dify_client = DifyClient() return _dify_client