Spaces:
Running
Running
| """ | |
| API client for Open Notebook API. | |
| This module provides a client interface to interact with the Open Notebook API. | |
| """ | |
| import os | |
| from typing import Any, Dict, List, Optional, Union | |
| import httpx | |
| from loguru import logger | |
| class APIClient: | |
| """Client for Open Notebook API.""" | |
| def __init__(self, base_url: Optional[str] = None): | |
| self.base_url = base_url or os.getenv("API_BASE_URL", "http://127.0.0.1:5055") | |
| # Timeout increased to 5 minutes (300s) to accommodate slow LLM operations | |
| # (transformations, insights) on slower hardware (Ollama, LM Studio, remote APIs) | |
| # Configurable via API_CLIENT_TIMEOUT environment variable (in seconds) | |
| timeout_str = os.getenv("API_CLIENT_TIMEOUT", "300.0") | |
| try: | |
| timeout_value = float(timeout_str) | |
| # Validate timeout is within reasonable bounds (30s - 3600s / 1 hour) | |
| if timeout_value < 30: | |
| logger.warning(f"API_CLIENT_TIMEOUT={timeout_value}s is too low, using minimum of 30s") | |
| timeout_value = 30.0 | |
| elif timeout_value > 3600: | |
| logger.warning(f"API_CLIENT_TIMEOUT={timeout_value}s is too high, using maximum of 3600s") | |
| timeout_value = 3600.0 | |
| self.timeout = timeout_value | |
| except ValueError: | |
| logger.error(f"Invalid API_CLIENT_TIMEOUT value '{timeout_str}', using default 300s") | |
| self.timeout = 300.0 | |
| # Add authentication header if password is set | |
| self.headers = {} | |
| password = os.getenv("OPEN_NOTEBOOK_PASSWORD") | |
| if password: | |
| self.headers["Authorization"] = f"Bearer {password}" | |
| def _make_request( | |
| self, method: str, endpoint: str, timeout: Optional[float] = None, **kwargs | |
| ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Make HTTP request to the API.""" | |
| url = f"{self.base_url}{endpoint}" | |
| request_timeout = timeout if timeout is not None else self.timeout | |
| # Merge headers | |
| headers = kwargs.get("headers", {}) | |
| headers.update(self.headers) | |
| kwargs["headers"] = headers | |
| try: | |
| with httpx.Client(timeout=request_timeout) as client: | |
| response = client.request(method, url, **kwargs) | |
| response.raise_for_status() | |
| return response.json() | |
| except httpx.RequestError as e: | |
| logger.error(f"Request error for {method} {url}: {str(e)}") | |
| raise ConnectionError(f"Failed to connect to API: {str(e)}") | |
| except httpx.HTTPStatusError as e: | |
| logger.error( | |
| f"HTTP error {e.response.status_code} for {method} {url}: {e.response.text}" | |
| ) | |
| raise RuntimeError( | |
| f"API request failed: {e.response.status_code} - {e.response.text}" | |
| ) | |
| except Exception as e: | |
| logger.error(f"Unexpected error for {method} {url}: {str(e)}") | |
| raise | |
| # Notebooks API methods | |
| def get_notebooks( | |
| self, archived: Optional[bool] = None, order_by: str = "updated desc" | |
| ) -> List[Dict[Any, Any]]: | |
| """Get all notebooks.""" | |
| params: Dict[str, Any] = {"order_by": order_by} | |
| if archived is not None: | |
| params["archived"] = str(archived).lower() | |
| result = self._make_request("GET", "/api/notebooks", params=params) | |
| return result if isinstance(result, list) else [result] | |
| def create_notebook(self, name: str, description: str = "") -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Create a new notebook.""" | |
| data = {"name": name, "description": description} | |
| return self._make_request("POST", "/api/notebooks", json=data) | |
| def get_notebook(self, notebook_id: str) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Get a specific notebook.""" | |
| return self._make_request("GET", f"/api/notebooks/{notebook_id}") | |
| def update_notebook(self, notebook_id: str, **updates) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Update a notebook.""" | |
| return self._make_request("PUT", f"/api/notebooks/{notebook_id}", json=updates) | |
| def delete_notebook(self, notebook_id: str) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Delete a notebook.""" | |
| return self._make_request("DELETE", f"/api/notebooks/{notebook_id}") | |
| # Search API methods | |
| def search( | |
| self, | |
| query: str, | |
| search_type: str = "text", | |
| limit: int = 100, | |
| search_sources: bool = True, | |
| search_notes: bool = True, | |
| minimum_score: float = 0.2, | |
| ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Search the knowledge base.""" | |
| data = { | |
| "query": query, | |
| "type": search_type, | |
| "limit": limit, | |
| "search_sources": search_sources, | |
| "search_notes": search_notes, | |
| "minimum_score": minimum_score, | |
| } | |
| return self._make_request("POST", "/api/search", json=data) | |
| def ask_simple( | |
| self, | |
| question: str, | |
| strategy_model: str, | |
| answer_model: str, | |
| final_answer_model: str, | |
| ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Ask the knowledge base a question (simple, non-streaming).""" | |
| data = { | |
| "question": question, | |
| "strategy_model": strategy_model, | |
| "answer_model": answer_model, | |
| "final_answer_model": final_answer_model, | |
| } | |
| # Use configured timeout for long-running ask operations | |
| return self._make_request( | |
| "POST", "/api/search/ask/simple", json=data, timeout=self.timeout | |
| ) | |
| # Models API methods | |
| def get_models(self, model_type: Optional[str] = None) -> List[Dict[Any, Any]]: | |
| """Get all models with optional type filtering.""" | |
| params = {} | |
| if model_type: | |
| params["type"] = model_type | |
| result = self._make_request("GET", "/api/models", params=params) | |
| return result if isinstance(result, list) else [result] | |
| def create_model(self, name: str, provider: str, model_type: str) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Create a new model.""" | |
| data = { | |
| "name": name, | |
| "provider": provider, | |
| "type": model_type, | |
| } | |
| return self._make_request("POST", "/api/models", json=data) | |
| def delete_model(self, model_id: str) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Delete a model.""" | |
| return self._make_request("DELETE", f"/api/models/{model_id}") | |
| def get_default_models(self) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Get default model assignments.""" | |
| return self._make_request("GET", "/api/models/defaults") | |
| def update_default_models(self, **defaults) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Update default model assignments.""" | |
| return self._make_request("PUT", "/api/models/defaults", json=defaults) | |
| # Transformations API methods | |
| def get_transformations(self) -> List[Dict[Any, Any]]: | |
| """Get all transformations.""" | |
| result = self._make_request("GET", "/api/transformations") | |
| return result if isinstance(result, list) else [result] | |
| def create_transformation( | |
| self, | |
| name: str, | |
| title: str, | |
| description: str, | |
| prompt: str, | |
| apply_default: bool = False, | |
| ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Create a new transformation.""" | |
| data = { | |
| "name": name, | |
| "title": title, | |
| "description": description, | |
| "prompt": prompt, | |
| "apply_default": apply_default, | |
| } | |
| return self._make_request("POST", "/api/transformations", json=data) | |
| def get_transformation(self, transformation_id: str) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Get a specific transformation.""" | |
| return self._make_request("GET", f"/api/transformations/{transformation_id}") | |
| def update_transformation(self, transformation_id: str, **updates) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Update a transformation.""" | |
| return self._make_request( | |
| "PUT", f"/api/transformations/{transformation_id}", json=updates | |
| ) | |
| def delete_transformation(self, transformation_id: str) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Delete a transformation.""" | |
| return self._make_request("DELETE", f"/api/transformations/{transformation_id}") | |
| def execute_transformation( | |
| self, transformation_id: str, input_text: str, model_id: str | |
| ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Execute a transformation on input text.""" | |
| data = { | |
| "transformation_id": transformation_id, | |
| "input_text": input_text, | |
| "model_id": model_id, | |
| } | |
| # Use configured timeout for transformation operations | |
| return self._make_request( | |
| "POST", "/api/transformations/execute", json=data, timeout=self.timeout | |
| ) | |
| # Notes API methods | |
| def get_notes(self, notebook_id: Optional[str] = None) -> List[Dict[Any, Any]]: | |
| """Get all notes with optional notebook filtering.""" | |
| params = {} | |
| if notebook_id: | |
| params["notebook_id"] = notebook_id | |
| result = self._make_request("GET", "/api/notes", params=params) | |
| return result if isinstance(result, list) else [result] | |
| def create_note( | |
| self, | |
| content: str, | |
| title: Optional[str] = None, | |
| note_type: str = "human", | |
| notebook_id: Optional[str] = None, | |
| ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Create a new note.""" | |
| data = { | |
| "content": content, | |
| "note_type": note_type, | |
| } | |
| if title: | |
| data["title"] = title | |
| if notebook_id: | |
| data["notebook_id"] = notebook_id | |
| return self._make_request("POST", "/api/notes", json=data) | |
| def get_note(self, note_id: str) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Get a specific note.""" | |
| return self._make_request("GET", f"/api/notes/{note_id}") | |
| def update_note(self, note_id: str, **updates) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Update a note.""" | |
| return self._make_request("PUT", f"/api/notes/{note_id}", json=updates) | |
| def delete_note(self, note_id: str) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Delete a note.""" | |
| return self._make_request("DELETE", f"/api/notes/{note_id}") | |
| # Embedding API methods | |
| def embed_content(self, item_id: str, item_type: str, async_processing: bool = False) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Embed content for vector search.""" | |
| data = { | |
| "item_id": item_id, | |
| "item_type": item_type, | |
| "async_processing": async_processing, | |
| } | |
| # Use configured timeout for embedding operations | |
| return self._make_request("POST", "/api/embed", json=data, timeout=self.timeout) | |
| def rebuild_embeddings( | |
| self, | |
| mode: str = "existing", | |
| include_sources: bool = True, | |
| include_notes: bool = True, | |
| include_insights: bool = True | |
| ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Rebuild embeddings in bulk. | |
| Note: This operation can take a long time for large databases. | |
| Consider increasing API_CLIENT_TIMEOUT to 600-900s for bulk rebuilds. | |
| """ | |
| data = { | |
| "mode": mode, | |
| "include_sources": include_sources, | |
| "include_notes": include_notes, | |
| "include_insights": include_insights, | |
| } | |
| # Use double the configured timeout for bulk rebuild operations (or configured value if already high) | |
| rebuild_timeout = max(self.timeout, min(self.timeout * 2, 3600.0)) | |
| return self._make_request("POST", "/api/embeddings/rebuild", json=data, timeout=rebuild_timeout) | |
| def get_rebuild_status(self, command_id: str) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Get status of a rebuild operation.""" | |
| return self._make_request("GET", f"/api/embeddings/rebuild/{command_id}/status") | |
| # Settings API methods | |
| def get_settings(self) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Get all application settings.""" | |
| return self._make_request("GET", "/api/settings") | |
| def update_settings(self, **settings) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Update application settings.""" | |
| return self._make_request("PUT", "/api/settings", json=settings) | |
| # Context API methods | |
| def get_notebook_context( | |
| self, notebook_id: str, context_config: Optional[Dict] = None | |
| ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Get context for a notebook.""" | |
| data: Dict[str, Any] = {"notebook_id": notebook_id} | |
| if context_config: | |
| data["context_config"] = context_config | |
| result = self._make_request( | |
| "POST", f"/api/notebooks/{notebook_id}/context", json=data | |
| ) | |
| return result if isinstance(result, dict) else {} | |
| # Sources API methods | |
| def get_sources(self, notebook_id: Optional[str] = None) -> List[Dict[Any, Any]]: | |
| """Get all sources with optional notebook filtering.""" | |
| params = {} | |
| if notebook_id: | |
| params["notebook_id"] = notebook_id | |
| result = self._make_request("GET", "/api/sources", params=params) | |
| return result if isinstance(result, list) else [result] | |
| def create_source( | |
| self, | |
| notebook_id: Optional[str] = None, | |
| notebooks: Optional[List[str]] = None, | |
| source_type: str = "text", | |
| url: Optional[str] = None, | |
| file_path: Optional[str] = None, | |
| content: Optional[str] = None, | |
| title: Optional[str] = None, | |
| transformations: Optional[List[str]] = None, | |
| embed: bool = False, | |
| delete_source: bool = False, | |
| async_processing: bool = False, | |
| ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Create a new source.""" | |
| data = { | |
| "type": source_type, | |
| "embed": embed, | |
| "delete_source": delete_source, | |
| "async_processing": async_processing, | |
| } | |
| # Handle backward compatibility for notebook_id vs notebooks | |
| if notebooks: | |
| data["notebooks"] = notebooks | |
| elif notebook_id: | |
| data["notebook_id"] = notebook_id | |
| else: | |
| raise ValueError("Either notebook_id or notebooks must be provided") | |
| if url: | |
| data["url"] = url | |
| if file_path: | |
| data["file_path"] = file_path | |
| if content: | |
| data["content"] = content | |
| if title: | |
| data["title"] = title | |
| if transformations: | |
| data["transformations"] = transformations | |
| # Use configured timeout for source creation (especially PDF processing with OCR) | |
| return self._make_request("POST", "/api/sources/json", json=data, timeout=self.timeout) | |
| def get_source(self, source_id: str) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Get a specific source.""" | |
| return self._make_request("GET", f"/api/sources/{source_id}") | |
| def get_source_status(self, source_id: str) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Get processing status for a source.""" | |
| return self._make_request("GET", f"/api/sources/{source_id}/status") | |
| def update_source(self, source_id: str, **updates) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Update a source.""" | |
| return self._make_request("PUT", f"/api/sources/{source_id}", json=updates) | |
| def delete_source(self, source_id: str) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Delete a source.""" | |
| return self._make_request("DELETE", f"/api/sources/{source_id}") | |
| # Insights API methods | |
| def get_source_insights(self, source_id: str) -> List[Dict[Any, Any]]: | |
| """Get all insights for a specific source.""" | |
| result = self._make_request("GET", f"/api/sources/{source_id}/insights") | |
| return result if isinstance(result, list) else [result] | |
| def get_insight(self, insight_id: str) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Get a specific insight.""" | |
| return self._make_request("GET", f"/api/insights/{insight_id}") | |
| def delete_insight(self, insight_id: str) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Delete a specific insight.""" | |
| return self._make_request("DELETE", f"/api/insights/{insight_id}") | |
| def save_insight_as_note( | |
| self, insight_id: str, notebook_id: Optional[str] = None | |
| ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Convert an insight to a note.""" | |
| data = {} | |
| if notebook_id: | |
| data["notebook_id"] = notebook_id | |
| return self._make_request( | |
| "POST", f"/api/insights/{insight_id}/save-as-note", json=data | |
| ) | |
| def create_source_insight( | |
| self, source_id: str, transformation_id: str, model_id: Optional[str] = None | |
| ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Create a new insight for a source by running a transformation.""" | |
| data = {"transformation_id": transformation_id} | |
| if model_id: | |
| data["model_id"] = model_id | |
| return self._make_request( | |
| "POST", f"/api/sources/{source_id}/insights", json=data | |
| ) | |
| # Episode Profiles API methods | |
| def get_episode_profiles(self) -> List[Dict[Any, Any]]: | |
| """Get all episode profiles.""" | |
| result = self._make_request("GET", "/api/episode-profiles") | |
| return result if isinstance(result, list) else [result] | |
| def get_episode_profile(self, profile_name: str) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Get a specific episode profile by name.""" | |
| return self._make_request("GET", f"/api/episode-profiles/{profile_name}") | |
| def create_episode_profile( | |
| self, | |
| name: str, | |
| description: str = "", | |
| speaker_config: str = "", | |
| outline_provider: str = "", | |
| outline_model: str = "", | |
| transcript_provider: str = "", | |
| transcript_model: str = "", | |
| default_briefing: str = "", | |
| num_segments: int = 5, | |
| ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Create a new episode profile.""" | |
| data = { | |
| "name": name, | |
| "description": description, | |
| "speaker_config": speaker_config, | |
| "outline_provider": outline_provider, | |
| "outline_model": outline_model, | |
| "transcript_provider": transcript_provider, | |
| "transcript_model": transcript_model, | |
| "default_briefing": default_briefing, | |
| "num_segments": num_segments, | |
| } | |
| return self._make_request("POST", "/api/episode-profiles", json=data) | |
| def update_episode_profile(self, profile_id: str, **updates) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Update an episode profile.""" | |
| return self._make_request("PUT", f"/api/episode-profiles/{profile_id}", json=updates) | |
| def delete_episode_profile(self, profile_id: str) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]: | |
| """Delete an episode profile.""" | |
| return self._make_request("DELETE", f"/api/episode-profiles/{profile_id}") | |
| # Global client instance | |
| api_client = APIClient() | |