""" Board XML to JSON processor. Updated: uses pooled connections, no threading. """ import json import re from typing import Optional, Dict, Any, List from http_pool import board_processor_session class BoardProcessor: def __init__(self, api_url: str = "https://corvo-ai-claude-4-6-opus.hf.space/chat"): self.api_url = api_url self.system_prompt = self._load_system_prompt() def _load_system_prompt(self) -> str: """Load system prompt from system.txt file""" try: with open('system.txt', 'r', encoding='utf-8') as f: return f.read() except FileNotFoundError: return """You are a converter that transforms XML board data into JSON format. Convert the given XML to JSON following these rules: 1. Extract all elements from the XML 2. Convert them to proper JSON structure 3. Wrap your JSON output in tags Example: Input: Hello Output: {"board": {"notes": [{"color": "yellow", "x": 100, "y": 200, "text": "Hello"}]}} Always wrap your final JSON output in tags.""" def _call_ai_api( self, user_input: Optional[str] = None, chat_history: Optional[List[Dict[str, Any]]] = None, temperature: float = 0.9, top_p: float = 0.95, max_tokens: Optional[int] = None ) -> str: """Call the AI API using pooled connection.""" payload = { "user_input": user_input, "chat_history": chat_history or [], "temperature": temperature, "top_p": top_p, "max_tokens": max_tokens } try: response = board_processor_session.post( self.api_url, json=payload, timeout=120 ) response.raise_for_status() result = response.json() return result.get("assistant_response", "") except Exception as e: raise Exception(f"API request failed: {str(e)}") def _extract_json_from_response(self, response: str) -> str: """Extract JSON from tags in the AI response.""" print("\nšŸ“‹ DEBUG - Raw AI Response:") print("=" * 80) print(response[:500] + ("..." if len(response) > 500 else "")) print("=" * 80) patterns = [ r'(.*?)', r'```json\s*(.*?)\s*```', r'```\s*(.*?)\s*```', ] for pattern in patterns: json_match = re.search(pattern, response, re.DOTALL) if json_match: json_str = json_match.group(1).strip() try: json.loads(json_str) return json_str except json.JSONDecodeError: continue try: json_match = re.search(r'\{.*\}', response, re.DOTALL) if json_match: json_str = json_match.group(0).strip() json.loads(json_str) return json_str except json.JSONDecodeError: pass # Try array try: json_match = re.search(r'\[.*\]', response, re.DOTALL) if json_match: json_str = json_match.group(0).strip() json.loads(json_str) return json_str except json.JSONDecodeError: pass raise ValueError("No valid JSON found in AI response.") def convert_xml_to_json( self, xml_text: str, temperature: float = 0.9, top_p: float = 0.95, max_tokens: Optional[int] = None ) -> str: """Convert XML text to Board JSON text using pooled connection.""" chat_history = [ {"role": "system", "content": self.system_prompt} ] ai_response = self._call_ai_api( user_input=xml_text, chat_history=chat_history, temperature=temperature, top_p=top_p, max_tokens=max_tokens ) print(xml_text[:200] + ("..." if len(xml_text) > 200 else "")) json_text = self._extract_json_from_response(ai_response) return json_text