"""CVAT API client base class.

This module provides the main CvatApiClient class that handles authentication
and delegates to specialized method groups.
"""

import json
import logging
import time
from typing import Any

import requests

from .annotations import AnnotationsMethods
from .auth import AuthMethods
from .downloads import DownloadsMethods
from .jobs import JobsMethods
from .labels import LabelsMethods
from .projects import ProjectsMethods
from .tasks import TasksMethods


class CvatApiClient:
    """CVAT API client for interacting with CVAT endpoints.

    This client provides organized methods for working with CVAT API endpoints,
    including authentication, projects, tasks, jobs, annotations, and downloads.

    The client is organized into method groups:

    **Authentication Methods (client.auth.xxx)**
        - get_auth_token() -> str
            Authenticate and get authentication token

    **Project Methods (client.projects.xxx)**
        - get_project_details(project_id, token=None) -> CvatApiProjectDetails
            Get project information and metadata
        - get_project_labels(project_id, token=None) -> list[CvatApiLabelDefinition]
            Get all label definitions for a project
        - get_project_job_ids(project_id, token=None) -> list[int]
            Get all job IDs associated with a project
        - get_project_tasks(project_id, token=None) -> list[int]
            Get all task IDs for a project

    **Task Methods (client.tasks.xxx)**
        - get_task_details(task_id, token=None) -> CvatApiTaskDetails
            Get task information and metadata
        - get_task_media_metainformation(task_id, token=None) -> CvatApiTaskMediasMetainformation
            Get media metadata (frame info, dimensions) for a task
        - get_task_job_ids(task_id, token=None) -> list[int]
            Get all job IDs for a task
        - update_task(task_id, task_data, token=None) -> CvatApiTaskDetails
            Update task details (name, assignee, labels, etc.)

    **Job Methods (client.jobs.xxx)**
        - list_jobs(request, token=None) -> CvatApiJobsListResponse
            List jobs with filtering (by project, task, stage, state, assignee, etc.)
        - get_job_details(job_id, token=None) -> CvatApiJobDetails
            Get job information and metadata
        - get_job_media_metainformation(job_id, token=None) -> CvatApiJobMediasMetainformation
            Get media metadata (frame info, dimensions) for a job
        - update_job(job_id, job_data, token=None) -> CvatApiJobDetails
            Update job details (status, assignee, stage, etc.)

    **Annotation Methods (client.annotations.xxx)**
        - get_job_annotations(job_id, token=None) -> CvatApiJobList
            Get all annotations (shapes, tracks, tags) for a job
        - get_task_annotations(task_id, token=None) -> CvatApiTaskList
            Get all annotations (shapes, tracks, tags) for a task
        - put_job_annotations(job_id, annotations, token=None) -> CvatApiJobList
            Update annotations for a job
        - put_task_annotations(task_id, annotations, token=None) -> CvatApiTaskList
            Update annotations for a task

    **Download Methods (client.downloads.xxx)**
        - download_job_image(job_id, frame_number, output_path, token=None) -> None
            Download a single frame image from a job
        - download_task_image(task_id, frame_number, output_path, token=None) -> None
            Download a single frame image from a task
        - download_job_images(job_id, output_dir, token=None) -> list[Path]
            Download all images from a job
        - download_task_images(task_id, output_dir, token=None) -> list[Path]
            Download all images from a task
        - download_job_data_chunk(job_id, chunk_number, output_path, token=None) -> None
            Download a specific data chunk from a job
        - download_task_data_chunk(task_id, chunk_number, output_path, token=None) -> None
            Download a specific data chunk from a task
        - download_all_job_chunks(job_id, output_dir, token=None) -> list[Path]
            Download all data chunks from a job
        - download_all_task_chunks(task_id, output_dir, token=None) -> list[Path]
            Download all data chunks from a task

    **Label Methods (client.labels.xxx)**
        - get_label_details(label_id, token=None) -> CvatApiLabelDefinition
            Get label definition details

    Attributes:
        cvat_host: Base URL of the CVAT server
        cvat_username: Username for authentication
        cvat_password: Password for authentication
        cvat_organization: Organization name
        cvat_auth_timeout: Timeout for authentication requests in seconds
        cvat_api_timeout: Timeout for API requests in seconds
        max_retries: Maximum number of retry attempts for transient errors
        initial_retry_delay: Initial delay in seconds before first retry
        max_retry_delay: Maximum delay in seconds between retries
        cvat_token: Authentication token (set after initialization)

    Example:
        >>> client = CvatApiClient(
        ...     cvat_host="https://cvat.example.com",
        ...     cvat_username="user",
        ...     cvat_password="pass",
        ...     cvat_organization="my-org"
        ... )
        >>> # Get project information
        >>> project = client.projects.get_project_details(12345)
        >>> # Get all jobs in a project
        >>> job_ids = client.projects.get_project_job_ids(12345)
        >>> # Download task annotations
        >>> annotations = client.annotations.get_task_annotations(67890)
        >>> # Download all images from a job
        >>> paths = client.downloads.download_job_images(111, Path("./output"))
    """

    # HTTP status codes treated as transient, i.e. worth retrying:
    # 502 (Bad Gateway), 503 (Service Unavailable), 504 (Gateway Timeout).
    _TRANSIENT_STATUS_CODES = frozenset({502, 503, 504})

    # Verbs whose request carries a JSON body (and no query parameters).
    _JSON_BODY_METHODS = frozenset({"POST", "PUT", "PATCH"})

    def __init__(
        self,
        cvat_host: str,
        cvat_username: str,
        cvat_password: str,
        cvat_organization: str,
        cvat_auth_timeout: float = 30.0,
        cvat_api_timeout: float = 60.0,
        max_retries: int = 5,
        initial_retry_delay: float = 1.0,
        max_retry_delay: float = 60.0,
    ):
        """Initialize the CVAT API client.

        Construction stores the configuration, builds the method groups and
        then immediately authenticates via ``self.auth.get_auth_token()``.

        Args:
            cvat_host: Base URL of the CVAT server
            cvat_username: Username for authentication
            cvat_password: Password for authentication
            cvat_organization: Organization name
            cvat_auth_timeout: Timeout for authentication requests in seconds
            cvat_api_timeout: Timeout for API requests in seconds
            max_retries: Maximum number of retry attempts for transient errors
            initial_retry_delay: Initial delay in seconds before first retry
            max_retry_delay: Maximum delay in seconds between retries
        """
        self.cvat_host = cvat_host
        self.cvat_username = cvat_username
        self.cvat_password = cvat_password
        self.cvat_organization = cvat_organization
        self.cvat_auth_timeout = cvat_auth_timeout
        self.cvat_api_timeout = cvat_api_timeout
        self.max_retries = max_retries
        self.initial_retry_delay = initial_retry_delay
        self.max_retry_delay = max_retry_delay

        self.logger = logging.getLogger(__name__)

        # Initialize method groups
        self.auth = AuthMethods(self)
        self.projects = ProjectsMethods(self)
        self.tasks = TasksMethods(self)
        self.jobs = JobsMethods(self)
        self.annotations = AnnotationsMethods(self)
        self.downloads = DownloadsMethods(self)
        self.labels = LabelsMethods(self)

        # Authenticate (must come after self.auth has been created)
        self.cvat_token = self.auth.get_auth_token()
        self.logger.info("🔑 CVAT API initialized.")

    # 🔹 Helper methods for common patterns

    def _get_headers(
        self, token: str | None = None, with_organization: bool = True
    ) -> dict[str, str]:
        """Create standard headers for API requests.

        Args:
            token: Authentication token (uses self.cvat_token if None or empty)
            with_organization: Whether to include X-Organization header

        Returns:
            Dictionary of HTTP headers
        """
        headers = {"Authorization": f"Token {token or self.cvat_token}"}
        if with_organization:
            headers["X-Organization"] = self.cvat_organization
        return headers

    @staticmethod
    def _format_response_body(response: requests.Response) -> str:
        """Render a response body for logging, pretty-printing it when it is JSON."""
        body = response.text
        if not body:
            return "No response text"
        try:
            return json.dumps(json.loads(body), indent=2, ensure_ascii=False)
        except ValueError:
            # Not JSON (e.g. an HTML error page): log the body verbatim.
            return body

    def _handle_response_errors(
        self, response: requests.Response | None, error_prefix: str
    ) -> None:
        """Handle common response errors with consistent logging.

        Logs the status code and the response body. Nothing is logged when
        ``response`` is not a ``requests.Response`` (e.g. ``None`` for
        timeouts and connection errors).

        Args:
            response: HTTP response object (optional)
            error_prefix: Error message prefix for logging
        """
        # isinstance rather than truthiness: a Response is falsy for 4xx/5xx.
        if not isinstance(response, requests.Response):
            return
        self.logger.error("%s: %d", error_prefix, response.status_code)
        self.logger.error(
            "❌ Response text:\n%s", self._format_response_body(response)
        )

    def _make_request(
        self,
        method: str,
        url: str,
        headers: dict[str, str],
        resource_name: str,
        resource_id: int | None = None,
        params: dict[str, Any] | None = None,
        json_data: dict[str, Any] | None = None,
        timeout: float | None = None,
        response_model: type | None = None,
    ) -> Any:
        """Make HTTP request with standard error handling and exponential backoff retry.

        Automatically retries on transient errors with exponential backoff:
        - Network timeouts
        - Connection errors
        - HTTP 502 (Bad Gateway), 503 (Service Unavailable), 504 (Gateway Timeout)

        ``params`` is only sent for GET; ``json_data`` is only sent for
        POST, PUT and PATCH; DELETE sends neither.

        Args:
            method: HTTP method (GET, POST, PUT, PATCH, DELETE), case-insensitive
            url: Request URL
            headers: HTTP headers
            resource_name: Resource name for logging
            resource_id: Resource ID for logging (optional)
            params: Query parameters (optional)
            json_data: JSON body data (optional)
            timeout: Request timeout in seconds (uses self.cvat_api_timeout if
                None or 0)
            response_model: Pydantic model for response validation (optional)

        Returns:
            The validated ``response_model`` instance when a model is given and
            the response has content; otherwise the raw response object.

        Raises:
            ValueError: If ``method`` is not a supported HTTP method
            requests.RequestException: If the request fails with a
                non-transient error, or after all retries are exhausted
            RuntimeError: If no attempt was made at all (negative max_retries)
        """
        verb = method.upper()
        timeout = timeout or self.cvat_api_timeout
        resource_desc = (
            f"{resource_name} {resource_id}"
            if resource_id is not None
            else resource_name
        )
        total_attempts = self.max_retries + 1
        failure_prefix = f"Failed to {verb} {resource_desc}"

        # Validate the verb and assemble its keyword arguments once, instead
        # of re-deciding on every retry.
        request_kwargs: dict[str, Any] = {"headers": headers, "timeout": timeout}
        if verb == "GET":
            request_kwargs["params"] = params
        elif verb in self._JSON_BODY_METHODS:
            request_kwargs["json"] = json_data
        elif verb != "DELETE":
            raise ValueError(f"Unsupported HTTP method: {method}")
        send = getattr(requests, verb.lower())

        last_exception: requests.RequestException | None = None

        for attempt in range(total_attempts):
            if attempt > 0:
                # Exponential backoff: initial * 2^(attempt-1), capped.
                delay = min(
                    self.initial_retry_delay * (2 ** (attempt - 1)),
                    self.max_retry_delay,
                )
                self.logger.warning(
                    "⏳ Retry %d/%d for %s after %.1fs delay...",
                    attempt, self.max_retries, resource_desc, delay
                )
                time.sleep(delay)

            # No emoji for routine API requests (only for milestones)
            self.logger.debug(
                "%s request for %s (attempt %d)", verb, resource_desc, attempt + 1
            )

            try:
                response = send(url, **request_kwargs)
                response.raise_for_status()

                # Parse and validate response if model provided
                if response_model and response.content:
                    return response_model.model_validate(response.json())
                return response

            except requests.Timeout as e:
                last_exception = e
                self.logger.warning(
                    "⚠️ Request timeout for %s (attempt %d/%d)",
                    resource_desc, attempt + 1, total_attempts
                )
                # Continue to retry

            except requests.ConnectionError as e:
                last_exception = e
                self.logger.warning(
                    "⚠️ Connection error for %s (attempt %d/%d)",
                    resource_desc, attempt + 1, total_attempts
                )
                # Continue to retry

            except requests.HTTPError as e:
                last_exception = e
                # The attribute always exists on RequestException but may be
                # None, so test the value rather than using hasattr().
                failed_response = getattr(e, "response", None)
                status_code = (
                    failed_response.status_code
                    if failed_response is not None
                    else None
                )
                if status_code in self._TRANSIENT_STATUS_CODES:
                    self.logger.warning(
                        "⚠️ Transient HTTP %d error for %s (attempt %d/%d)",
                        status_code, resource_desc, attempt + 1, total_attempts
                    )
                    # Continue to retry
                else:
                    # Non-transient error, don't retry
                    self._handle_response_errors(failed_response, failure_prefix)
                    raise

            except requests.RequestException as e:
                # Other request exceptions - don't retry
                last_exception = e
                self._handle_response_errors(
                    getattr(e, "response", None), failure_prefix
                )
                raise

        # All retries exhausted
        self.logger.error(
            "❌ All %d retry attempts exhausted for %s",
            total_attempts, resource_desc
        )
        if last_exception is not None:
            self._handle_response_errors(
                getattr(last_exception, "response", None),
                f"{failure_prefix} after {total_attempts} attempts",
            )
            raise last_exception

        # Only reachable when the loop body never ran (max_retries < 0).
        raise RuntimeError(f"Request failed after {total_attempts} attempts")