"""
Backend API client for communication with Go API server.

Feature: 012-profile-contact-ui
"""

import os
import time
from contextlib import contextmanager
from typing import Any, Dict, Iterator, List, Optional

import requests
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

from ..models import Message


class BackendAPIError(Exception):
    """Raised when backend API returns an error."""


class BackendAPIClient:
    """
    Client for communicating with the backend API.

    Configuration is read from the environment when the client is constructed:
    BACKEND_API_URL (base URL), BACKEND_API_TIMEOUT (seconds, parsed with
    int()) and API_BEARER_TOKEN (optional bearer token).

    Every public method wraps its HTTP call in an OpenTelemetry span, adds the
    elapsed time to the Flask request context (when one is active) and
    converts transport failures into BackendAPIError.
    """

    def __init__(self):
        self.base_url = os.getenv("BACKEND_API_URL", "http://localhost:8080/v1")
        self.timeout = int(os.getenv("BACKEND_API_TIMEOUT", "5"))
        self.bearer_token = os.getenv("API_BEARER_TOKEN", "")

    def _get_headers(self, user_id: Optional[str] = None) -> Dict[str, str]:
        """
        Get HTTP headers including Authorization bearer token, user ID, and trace context.

        Args:
            user_id: Optional user ID to include in X-User-ID header

        Returns:
            Header dict; always contains Content-Type, plus Authorization and
            X-User-ID when the corresponding value is non-empty

        Automatically injects OpenTelemetry trace context from current span.
        """
        headers = {"Content-Type": "application/json"}
        if self.bearer_token:
            headers["Authorization"] = f"Bearer {self.bearer_token}"
        if user_id:
            headers["X-User-ID"] = user_id

        # Inject trace context into headers automatically (only if tracing enabled).
        # The import is kept local so that a missing/broken tracing module is
        # handled by the same best-effort guard as the injection itself.
        try:
            from ..utils.tracing import is_tracing_enabled

            if is_tracing_enabled():
                TraceContextTextMapPropagator().inject(headers)
        except Exception:
            # Don't fail request if tracing injection fails
            pass

        return headers

    def _track_latency(self, start_time: float):
        """
        Track backend API latency in Flask request context.

        Args:
            start_time: time.time() value captured just before the backend call
        """
        try:
            from flask import g

            latency_ms = (time.time() - start_time) * 1000
            # Accumulate latency if multiple backend calls in one request
            g.backend_latency_ms = getattr(g, "backend_latency_ms", 0) + latency_ms
        except (ImportError, RuntimeError):
            # Not in Flask request context - ignore
            pass

    @staticmethod
    def _error_detail(exc: requests.exceptions.HTTPError) -> Any:
        """Return the "error" field of an HTTP error response body, else str(exc)."""
        try:
            return exc.response.json().get("error", str(exc))
        except Exception:
            # Best effort: no response, non-JSON body, or body that is not an object
            return str(exc)

    @staticmethod
    def _json_or_raise(span: Any, response: requests.Response) -> Any:
        """Raise for non-2xx status, record the status code on the span, return the JSON body."""
        response.raise_for_status()
        span.set_attribute("http.status_code", response.status_code)
        return response.json()

    @contextmanager
    def _backend_call(
        self,
        span_name: str,
        http_method: str,
        url: str,
        attributes: Dict[str, Any],
        failure_prefix: str,
        timeout_label: Optional[str] = "Request",
        parse_error_body: bool = False,
    ) -> Iterator[Any]:
        """
        Shared span / latency / error-translation scaffold for one backend call.

        Opens a span, sets http.method, http.url and the given attributes,
        then yields the span so the caller can issue the request inside it.
        Latency is tracked from just before the yield until the body exits.

        Args:
            span_name: Name of the span to create
            http_method: HTTP verb recorded as the http.method attribute
            url: Request URL recorded as the http.url attribute
            attributes: Extra span attributes, set in insertion order
            failure_prefix: Prefix of the BackendAPIError message ("<prefix>: <detail>")
            timeout_label: When not None, timeouts raise
                "<label> timed out after <timeout>s"; when None, timeouts are
                reported like any other request failure
            parse_error_body: When True, HTTP error responses are inspected for
                an "error" field to use as the failure detail

        Raises:
            BackendAPIError: If the body raises a requests exception or a
                ValueError (e.g. a response body that is not valid JSON)
        """
        tracer = trace.get_tracer(__name__)
        with tracer.start_as_current_span(span_name) as span:
            span.set_attribute("http.method", http_method)
            span.set_attribute("http.url", url)
            for key, value in attributes.items():
                span.set_attribute(key, value)

            start_time = time.time()
            try:
                yield span
            except (requests.exceptions.RequestException, ValueError) as exc:
                # ValueError covers JSON decode errors on requests versions
                # where they are not RequestException subclasses.
                span.set_attribute("error", True)
                if timeout_label is not None and isinstance(exc, requests.exceptions.Timeout):
                    span.set_status(Status(StatusCode.ERROR, "timeout"))
                    span.set_attribute("timeout", self.timeout)
                    span.add_event("timeout", {"timeout": self.timeout})
                    raise BackendAPIError(
                        f"{timeout_label} timed out after {self.timeout}s"
                    ) from exc

                detail = str(exc)
                span.set_status(Status(StatusCode.ERROR, detail))
                span.add_event("error", {"message": detail})
                if parse_error_body and isinstance(exc, requests.exceptions.HTTPError):
                    detail = self._error_detail(exc)
                raise BackendAPIError(f"{failure_prefix}: {detail}") from exc
            finally:
                self._track_latency(start_time)

    def get_session(
        self, session_id: str, user_id: str, producer_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Get full session including all messages (facts + chat messages).

        Args:
            session_id: Profile or contact session ID
            user_id: User ID (required for API authentication)
            producer_id: Optional producer ID for contact sessions (e.g., "testuser_jane_1")

        Returns:
            Session dict with messages array

        Raises:
            BackendAPIError: If API request fails
        """
        url = f"{self.base_url}/sessions/{session_id}"

        # producer_id is sent as a query parameter (and traced) only if provided
        params = {}
        attributes = {"session_id": session_id, "user_id": user_id}
        if producer_id:
            params["producer_id"] = producer_id
            attributes["producer_id"] = producer_id

        with self._backend_call(
            "backend.get_session", "GET", url, attributes, "Failed to fetch session"
        ) as span:
            response = requests.get(
                url, params=params, headers=self._get_headers(user_id), timeout=self.timeout
            )
            return self._json_or_raise(span, response)

    def send_message(
        self,
        session_id: str,
        content: str,
        user_id: str,
        mode: str = "chat",
        sender: Optional[str] = "user",
        reference_session_ids: Optional[List[str]] = None,
        producer: Optional[str] = None,
        produced_for: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Send a message or fact to a session.

        Args:
            session_id: Profile or contact session ID
            content: Message or fact content
            user_id: User ID (required for API authentication)
            mode: 'chat' for messages, 'memorize' for facts
            sender: 'user' or 'assistant' (only for mode='chat')
            reference_session_ids: Optional list of session IDs to use as context
                (e.g., user's profile session)
            producer: Optional explicit producer for memory attribution
                (e.g., "testuser_janedoe_1")
            produced_for: Optional explicit produced_for for memory attribution
                (e.g., "testuser")

        Returns:
            Response dict with created message(s)

        Raises:
            BackendAPIError: If API request fails
        """
        url = f"{self.base_url}/sessions/{session_id}/messages"

        params = {}
        payload = {"mode": mode, "message": content, "save_to_memory": True}
        attributes = {
            "session_id": session_id,
            "user_id": user_id,
            "mode": mode,
            "content_length": len(content),
        }

        # Add sender only for chat mode
        if mode == "chat" and sender:
            payload["sender"] = sender

        if reference_session_ids:
            # First reference goes in the query string (and the trace); the
            # full list also goes in the body for backward compatibility.
            params["reference_id"] = reference_session_ids[0]
            attributes["reference_id"] = reference_session_ids[0]
            payload["reference_session_ids"] = reference_session_ids

        # Add producer/produced_for if provided (Feature: 001-refine-memory-producer-logic)
        if producer:
            payload["producer"] = producer
            attributes["producer"] = producer
        if produced_for:
            payload["produced_for"] = produced_for
            attributes["produced_for"] = produced_for

        with self._backend_call(
            "backend.send_message", "POST", url, attributes, "Failed to send message"
        ) as span:
            response = requests.post(
                url,
                params=params,
                json=payload,
                headers=self._get_headers(user_id),
                timeout=self.timeout,
            )
            return self._json_or_raise(span, response)

    def create_session(
        self, user_id: str, contact_id: str, description: str = "", is_reference: bool = False
    ) -> Dict[str, Any]:
        """
        Create a new session in the backend API (v2 format).

        Args:
            user_id: User ID (required for API authentication)
            contact_id: Contact identifier (maps to project_id in v2)
            description: Session description (optional)
            is_reference: Whether this is a reference session

        Returns:
            Response dict with session_id, contact_id, description, org_id, project_id

        Raises:
            BackendAPIError: If API request fails
        """
        url = f"{self.base_url}/sessions"
        payload = {
            "contact_id": contact_id,
            "description": description,
            "is_reference": is_reference,
        }
        attributes = {
            "user_id": user_id,
            "contact_id": contact_id,
            "description": description,
            "is_reference": is_reference,
        }

        with self._backend_call(
            "backend.create_session", "POST", url, attributes, "Failed to create session"
        ) as span:
            # Headers are built inside the span so the injected trace context
            # refers to this span, as in the other methods. X-User-ID is
            # required by the API and is therefore set unconditionally.
            headers = self._get_headers()
            headers["X-User-ID"] = user_id
            response = requests.post(url, json=payload, headers=headers, timeout=self.timeout)
            return self._json_or_raise(span, response)

    def delete_session(self, session_id: str, user_id: str) -> bool:
        """
        Delete a session (rollback mechanism for two-phase commit).

        Feature: 001-contact-session-fixes

        Args:
            session_id: Session ID to delete
            user_id: User ID who owns the session

        Returns:
            True if deleted or already gone (204/404), False for any other status

        Raises:
            BackendAPIError: If the request times out or fails at transport level
        """
        url = f"{self.base_url}/sessions/{session_id}"
        attributes = {"session_id": session_id, "user_id": user_id}

        with self._backend_call(
            "backend.delete_session",
            "DELETE",
            url,
            attributes,
            "Failed to delete session",
            timeout_label="Delete session",
        ) as span:
            # Built inside the span so the injected trace context refers to it;
            # X-User-ID is required by the API and is set unconditionally.
            headers = self._get_headers()
            headers["X-User-ID"] = user_id
            response = requests.delete(url, headers=headers, timeout=self.timeout)

            # 204 = successfully deleted, 404 = already gone (both acceptable for rollback)
            rolled_back = response.status_code in (204, 404)
            span.set_attribute("http.status_code", response.status_code)
            span.set_attribute("rollback_success", rolled_back)
            if not rolled_back:
                span.add_event("unexpected_status", {"status_code": response.status_code})
            return rolled_back

    def list_sessions(self, user_id: str) -> List[Dict[str, Any]]:
        """
        List all sessions for a user.

        Args:
            user_id: User ID to list sessions for

        Returns:
            List of session dicts with fields:
            - id: Session ID
            - contact_id: Contact identifier (project_id in v2)
            - title: Session title (legacy, same as description)
            - description: Session description
            - user_id: User ID
            - created_at: Unix timestamp
            - last_interaction: Unix timestamp
            - message_count: Number of messages
            - is_reference: Reference flag

        Raises:
            BackendAPIError: If API request fails
        """
        url = f"{self.base_url}/sessions"
        params = {"user_id": user_id}

        with self._backend_call(
            "backend.list_sessions", "GET", url, {"user_id": user_id}, "Failed to list sessions"
        ) as span:
            response = requests.get(
                url, params=params, headers=self._get_headers(user_id), timeout=self.timeout
            )
            return self._json_or_raise(span, response)

    def get_messages(
        self, session_id: str, user_id: str, mode: Optional[str] = None
    ) -> List[Message]:
        """
        Get messages from a session, optionally filtered by mode.

        Args:
            session_id: Session ID
            user_id: User ID (required for API authentication)
            mode: Optional filter - 'chat' or 'memorize'

        Returns:
            List of Message objects (empty when the session has no messages)

        Raises:
            BackendAPIError: If API request fails
        """
        session = self.get_session(session_id, user_id)
        # "or []" also covers an explicit JSON null for "messages"
        messages_data = session.get("messages") or []

        return [
            Message(
                message_id=msg_data.get("message_id", msg_data.get("id", "")),
                session_id=msg_data.get("session_id", session_id),
                mode=msg_data.get("mode", "chat"),
                content=msg_data.get("content", ""),
                created_at=msg_data.get("created_at", msg_data.get("timestamp", "")),
                sender=msg_data.get("sender"),
                metadata=msg_data.get("metadata"),
            )
            for msg_data in messages_data
            # Filter by mode if specified
            if not mode or msg_data.get("mode") == mode
        ]

    def get_user_settings(self, user_id: str) -> Dict[str, Any]:
        """
        Get user settings (AI model preferences).

        Args:
            user_id: User ID (required for API authentication)

        Returns:
            Settings dict with fact_processing_model, reply_generation_model,
            valid_models, updated_at

        Raises:
            BackendAPIError: If API request fails
        """
        url = f"{self.base_url}/api/settings"

        with self._backend_call(
            "backend.get_user_settings",
            "GET",
            url,
            {"user_id": user_id},
            "Failed to fetch user settings",
        ) as span:
            response = requests.get(
                url, headers=self._get_headers(user_id), timeout=self.timeout
            )
            return self._json_or_raise(span, response)

    def update_user_settings(
        self, user_id: str, fact_processing_model: str, reply_generation_model: str
    ) -> Dict[str, Any]:
        """
        Update user settings (AI model preferences).

        Args:
            user_id: User ID (required for API authentication)
            fact_processing_model: Model to use for fact processing/memorization
            reply_generation_model: Model to use for generating chat responses

        Returns:
            Updated settings dict

        Raises:
            BackendAPIError: If API request fails or validation fails; for HTTP
                error responses the message carries the body's "error" field
                when one is available
        """
        url = f"{self.base_url}/api/settings"
        payload = {
            "fact_processing_model": fact_processing_model,
            "reply_generation_model": reply_generation_model,
        }
        attributes = {
            "user_id": user_id,
            "fact_model": fact_processing_model,
            "reply_model": reply_generation_model,
        }

        with self._backend_call(
            "backend.update_user_settings",
            "PUT",
            url,
            attributes,
            "Failed to update settings",
            parse_error_body=True,
        ) as span:
            response = requests.put(
                url,
                json=payload,
                headers=self._get_headers(user_id),
                timeout=self.timeout,
            )
            return self._json_or_raise(span, response)

    def get_active_template(self, user_id: str) -> Dict[str, Any]:
        """
        Get user's active response template.

        Args:
            user_id: User ID (required for API authentication)

        Returns:
            Template dict with version_id, template_content, is_active, created_at

        Raises:
            BackendAPIError: If API request fails
        """
        url = f"{self.base_url}/api/templates/active"

        with self._backend_call(
            "backend.get_active_template",
            "GET",
            url,
            {"user_id": user_id},
            "Failed to fetch active template",
            timeout_label=None,  # timeouts are reported with the generic failure message
            parse_error_body=True,
        ) as span:
            response = requests.get(
                url, headers=self._get_headers(user_id), timeout=self.timeout
            )
            return self._json_or_raise(span, response)

    def list_templates(self, user_id: str) -> List[Dict[str, Any]]:
        """
        List all template versions for a user.

        Args:
            user_id: User ID (required for API authentication)

        Returns:
            List of template dicts with version_id, template_content, is_active,
            created_at (empty when the response has no templates)

        Raises:
            BackendAPIError: If API request fails
        """
        url = f"{self.base_url}/api/templates"

        with self._backend_call(
            "backend.list_templates",
            "GET",
            url,
            {"user_id": user_id},
            "Failed to list templates",
            timeout_label=None,  # timeouts are reported with the generic failure message
            parse_error_body=True,
        ) as span:
            response = requests.get(
                url, headers=self._get_headers(user_id), timeout=self.timeout
            )
            data = self._json_or_raise(span, response)
            # "or []" also covers an explicit JSON null for "templates"
            return data.get("templates") or []

    def create_template(self, user_id: str, template_content: str) -> Dict[str, Any]:
        """
        Create a new template version.

        Args:
            user_id: User ID (required for API authentication)
            template_content: Template content (max 10000 characters)

        Returns:
            Created template dict with version_id, template_content, is_active, created_at

        Raises:
            BackendAPIError: If API request fails
        """
        url = f"{self.base_url}/api/templates"
        payload = {"template_content": template_content}
        attributes = {"user_id": user_id, "content_length": len(template_content)}

        with self._backend_call(
            "backend.create_template",
            "POST",
            url,
            attributes,
            "Failed to create template",
            timeout_label=None,  # timeouts are reported with the generic failure message
            parse_error_body=True,
        ) as span:
            response = requests.post(
                url,
                headers=self._get_headers(user_id),
                json=payload,
                timeout=self.timeout,
            )
            return self._json_or_raise(span, response)

    def activate_template(self, user_id: str, version_id: str) -> Dict[str, Any]:
        """
        Activate a specific template version.

        Args:
            user_id: User ID (required for API authentication)
            version_id: Template version ID to activate

        Returns:
            Success response dict

        Raises:
            BackendAPIError: If API request fails
        """
        url = f"{self.base_url}/api/templates/{version_id}/activate"
        attributes = {"user_id": user_id, "version_id": version_id}

        with self._backend_call(
            "backend.activate_template",
            "PUT",
            url,
            attributes,
            "Failed to activate template",
            timeout_label=None,  # timeouts are reported with the generic failure message
            parse_error_body=True,
        ) as span:
            response = requests.put(
                url, headers=self._get_headers(user_id), timeout=self.timeout
            )
            return self._json_or_raise(span, response)


# Global client instance
backend_client = BackendAPIClient()