"""
Board XML to JSON processor.
Updated: uses pooled connections, no threading.
"""
import json
import re
from typing import Optional, Dict, Any, List
from http_pool import board_processor_session
class BoardProcessor:
def __init__(self, api_url: str = "https://corvo-ai-claude-4-6-opus.hf.space/chat"):
self.api_url = api_url
self.system_prompt = self._load_system_prompt()
def _load_system_prompt(self) -> str:
"""Load system prompt from system.txt file"""
try:
with open('system.txt', 'r', encoding='utf-8') as f:
return f.read()
except FileNotFoundError:
return """You are a converter that transforms XML board data into JSON format.
Convert the given XML to JSON following these rules:
1. Extract all elements from the XML
2. Convert them to proper JSON structure
3. Wrap your JSON output in tags
Example:
Input: Hello
Output: {"board": {"notes": [{"color": "yellow", "x": 100, "y": 200, "text": "Hello"}]}}
Always wrap your final JSON output in tags."""
def _call_ai_api(
self,
user_input: Optional[str] = None,
chat_history: Optional[List[Dict[str, Any]]] = None,
temperature: float = 0.9,
top_p: float = 0.95,
max_tokens: Optional[int] = None
) -> str:
"""Call the AI API using pooled connection."""
payload = {
"user_input": user_input,
"chat_history": chat_history or [],
"temperature": temperature,
"top_p": top_p,
"max_tokens": max_tokens
}
try:
response = board_processor_session.post(
self.api_url, json=payload, timeout=120
)
response.raise_for_status()
result = response.json()
return result.get("assistant_response", "")
except Exception as e:
raise Exception(f"API request failed: {str(e)}")
def _extract_json_from_response(self, response: str) -> str:
"""Extract JSON from tags in the AI response."""
print("\nš DEBUG - Raw AI Response:")
print("=" * 80)
print(response[:500] + ("..." if len(response) > 500 else ""))
print("=" * 80)
patterns = [
r'(.*?)',
r'```json\s*(.*?)\s*```',
r'```\s*(.*?)\s*```',
]
for pattern in patterns:
json_match = re.search(pattern, response, re.DOTALL)
if json_match:
json_str = json_match.group(1).strip()
try:
json.loads(json_str)
return json_str
except json.JSONDecodeError:
continue
try:
json_match = re.search(r'\{.*\}', response, re.DOTALL)
if json_match:
json_str = json_match.group(0).strip()
json.loads(json_str)
return json_str
except json.JSONDecodeError:
pass
# Try array
try:
json_match = re.search(r'\[.*\]', response, re.DOTALL)
if json_match:
json_str = json_match.group(0).strip()
json.loads(json_str)
return json_str
except json.JSONDecodeError:
pass
raise ValueError("No valid JSON found in AI response.")
def convert_xml_to_json(
self,
xml_text: str,
temperature: float = 0.9,
top_p: float = 0.95,
max_tokens: Optional[int] = None
) -> str:
"""Convert XML text to Board JSON text using pooled connection."""
chat_history = [
{"role": "system", "content": self.system_prompt}
]
ai_response = self._call_ai_api(
user_input=xml_text,
chat_history=chat_history,
temperature=temperature,
top_p=top_p,
max_tokens=max_tokens
)
print(xml_text[:200] + ("..." if len(xml_text) > 200 else ""))
json_text = self._extract_json_from_response(ai_response)
return json_text