Spaces:
Runtime error
Runtime error
GitHub Copilot
Workflows: New Video Ingestion Protocol - Dolphin Cognitive Analysis -> RJ-1 Encoding
644be9f | """ | |
| connectors.py - External API/Service Adapters | |
| Protocol 4: Autonomous Resource Integration | |
| This module isolates all external dependencies so the core engine remains pure. | |
| Each connector wraps an external API/library with a standardized interface. | |
| """ | |
| import os | |
| from typing import Optional, Dict, Any, List | |
| from dataclasses import dataclass | |
# ==========================================
# CONFIGURATION
# ==========================================
@dataclass
class ConnectorConfig:
    """Configuration for external connectors.

    Attributes:
        hf_token: Hugging Face API token (read from HF_TOKEN).
        hf_space_id: Hugging Face Space identifier (read from HF_SPACE_ID).
    """
    # Bug fix: the class was a plain class with bare attributes, so
    # `cls(hf_token=..., hf_space_id=...)` in from_env() failed — it needs the
    # @dataclass-generated __init__ (dataclass was already imported but unused).
    hf_token: Optional[str] = None
    hf_space_id: Optional[str] = None

    @classmethod
    def from_env(cls) -> 'ConnectorConfig':
        """Load configuration from environment variables.

        Returns:
            ConnectorConfig populated from HF_TOKEN / HF_SPACE_ID
            (fields are None when the variables are unset).
        """
        # Bug fix: from_env was missing @classmethod, so `cls` was actually
        # the first positional argument at call time.
        return cls(
            hf_token=os.environ.get('HF_TOKEN'),
            hf_space_id=os.environ.get('HF_SPACE_ID')
        )
# ==========================================
# HUGGING FACE CONNECTOR
# ==========================================
class HuggingFaceConnector:
    """
    Adapter for Hugging Face Hub and Inference API.
    Wraps huggingface_hub for model loading and inference.
    """

    def __init__(self, config: Optional[ConnectorConfig] = None):
        # Typing fix: default is None, so the parameter is Optional.
        # Fall back to environment-derived config when none is supplied.
        self.config = config or ConnectorConfig.from_env()
        self._client = None  # lazily-created InferenceClient

    def _ensure_client(self):
        """Lazy initialization of HF client.

        Raises:
            ImportError: If huggingface_hub is not installed.
        """
        if self._client is None:
            try:
                from huggingface_hub import InferenceClient
                # Requests are routed through the HF inference router endpoint.
                self._client = InferenceClient(token=self.config.hf_token, base_url="https://router.huggingface.co")
            except ImportError:
                raise ImportError("huggingface_hub not installed. Run: pip install huggingface_hub")
        return self._client

    def image_to_text(self, image_path: str, model: str = "Salesforce/blip-image-captioning-base") -> str:
        """
        Generate text description from image using HF Inference API.

        Args:
            image_path: Path to image file
            model: HF model ID for image captioning

        Returns:
            Generated text description
        """
        client = self._ensure_client()
        with open(image_path, 'rb') as f:
            result = client.image_to_text(f.read(), model=model)
        return result

    def text_generation(self, prompt: str, model: str = "gpt2", max_length: int = 100) -> str:
        """
        Generate text from prompt using HF Inference API.

        Args:
            prompt: Input text prompt
            model: HF model ID for text generation
            max_length: Maximum output length (forwarded as max_new_tokens)

        Returns:
            Generated text
        """
        client = self._ensure_client()
        result = client.text_generation(prompt, model=model, max_new_tokens=max_length)
        return result
# ==========================================
# OCR CONNECTOR
# ==========================================
class OCRConnector:
    """
    Adapter for Optical Character Recognition via Local Vision Model.
    Uses 'google/gemma-3-4b' (or configured local model) to transcribe text from images.
    """

    def __init__(self, languages: List[str] = None, gpu: bool = False):
        # 'languages' and 'gpu' are accepted for interface compatibility only;
        # the local LLM runtime (LM Studio) owns device selection.
        # Hardcoded to Gemma as requested by user ("gemma is your vision model")
        self.client = get_connector('local', model="google/gemma-3-4b")

    def extract_text(self, image_path: str) -> Dict[str, Any]:
        """
        Extract text from image using Vision Model.

        Returns a dict with keys 'text_blocks' (always empty — the VLM gives
        no bounding boxes), 'full_text', and 'word_count'. On any failure the
        error is folded into 'full_text' and word_count is 0.
        """
        try:
            prompt = "Extract and transcribe all visible text from this image exactly as it appears. Return only the text."
            transcription, _ = self.client.chat(message=prompt, image_path=image_path)
            words = len(transcription.split())  # whitespace-split heuristic
        except Exception as exc:
            return {
                "text_blocks": [],
                "full_text": f"[OCR ERROR] Vision Model Failed: {exc}",
                "word_count": 0
            }
        return {
            "text_blocks": [],  # VLM doesn't give bounding boxes easily
            "full_text": transcription,
            "word_count": words
        }
# ==========================================
# VISION CONNECTOR (Future: Multi-modal)
# ==========================================
class VisionConnector:
    """
    Adapter for computer vision operations.
    Wraps OpenCV and scikit-image.
    """

    @staticmethod
    def calculate_ssim(image1_path: str, image2_path: str) -> float:
        """
        Calculate Structural Similarity Index between two images.
        Uses scikit-image for accurate SSIM calculation.

        Bug fix: this was declared without `self` and without @staticmethod,
        so instance calls passed the instance itself as image1_path.

        Args:
            image1_path: Path to first image
            image2_path: Path to second image

        Returns:
            SSIM score (0-1, higher is better)

        Raises:
            ImportError: If opencv-python or scikit-image is missing.
        """
        try:
            import cv2
            from skimage.metrics import structural_similarity as ssim
            img1 = cv2.imread(image1_path)
            img2 = cv2.imread(image2_path)
            # Resize second image to match the first so SSIM shapes agree
            if img1.shape != img2.shape:
                img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))
            # Convert to grayscale for SSIM
            gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
            gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)
            return ssim(gray1, gray2)
        except ImportError as e:
            raise ImportError(f"Required library not installed: {e}")

    @staticmethod
    def analyze_entropy(image_path: str) -> Dict[str, float]:
        """
        Analyze image entropy (information density).

        Bug fix: declared without `self`/@staticmethod, same as above.

        Args:
            image_path: Path to image file

        Returns:
            Dict with entropy metrics (shannon_entropy, histogram_entropy,
            mean_intensity, std_intensity)

        Raises:
            ImportError: If opencv-python / scikit-image / numpy is missing.
        """
        try:
            import cv2
            import numpy as np
            from skimage.measure import shannon_entropy
            img = cv2.imread(image_path)
            gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            # Calculate entropy over raw grayscale pixels
            entropy = shannon_entropy(gray)
            # Calculate histogram entropy from the normalized 256-bin histogram
            hist = cv2.calcHist([gray], [0], None, [256], [0, 256])
            hist = hist.flatten() / hist.sum()
            hist_entropy = -np.sum(hist[hist > 0] * np.log2(hist[hist > 0]))
            return {
                "shannon_entropy": entropy,
                "histogram_entropy": hist_entropy,
                "mean_intensity": float(np.mean(gray)),
                "std_intensity": float(np.std(gray))
            }
        except ImportError as e:
            raise ImportError(f"Required library not installed: {e}")
# ==========================================
# DOLPHIN AGENT CONNECTOR (HF Inference)
# ==========================================
class DolphinAgentConnector:
    """
    Adapter for Dolphin AI (via Hugging Face Inference).
    Replaces NeMo/OpenAI dependency with open weights.
    """

    def __init__(self, model: str = "cognitivecomputations/dolphin-2.9-llama3-8b"):
        self.model = model
        self.config = ConnectorConfig.from_env()
        self._client = None  # lazily-created InferenceClient

    def _ensure_client(self):
        """Lazy initialization of HF Inference Client.

        Raises:
            ImportError: If huggingface_hub is not installed.
        """
        if self._client is None:
            try:
                from huggingface_hub import InferenceClient
                self._client = InferenceClient(token=self.config.hf_token, base_url="https://router.huggingface.co")
            except ImportError:
                raise ImportError("huggingface_hub not installed.")
        return self._client

    def chat(self, message: str, system_prompt: str = None) -> tuple:
        """
        Chat with Dolphin agent.

        Returns:
            (content, logprobs) tuple; on failure ("[Dolphin Error] ...", None).
            (Annotation fixed: this method has always returned a tuple, not str.)
        """
        try:
            client = self._ensure_client()
            messages = []
            if system_prompt:
                messages.append({"role": "system", "content": system_prompt})
            messages.append({"role": "user", "content": message})
            # Many HF models support the chat_completion API via InferenceClient;
            # fall back to raw text generation if that path fails.
            try:
                response = client.chat_completion(
                    messages=messages,
                    model=self.model,
                    max_tokens=500
                )
                choice = response.choices[0]
                # Bug fix: choices[0] is an object, not a dict — the old
                # `.get('logprobs')` raised AttributeError and always forced
                # the fallback path. Use getattr instead.
                return choice.message.content, getattr(choice, 'logprobs', None)
            except Exception:
                # Fallback to text generation with a ChatML-style prompt
                prompt = f"<|im_start|>user\n{message}<|im_end|>\n<|im_start|>assistant\n"
                res = client.text_generation(prompt, model=self.model, max_new_tokens=500)
                return res, None
        except Exception as e:
            return f"[Dolphin Error] {e}", None

    def analyze_diagram(self, image_path: str, prompt: str = "Describe this architectural diagram.") -> str:
        """
        Analyze diagram using visual model (fallback to simple captioning if Dolphin is text-only).
        """
        try:
            # Dolphin is text-only usually. Route to a Vision model via the
            # module-level factory. (Bug fix: the old relative self-import
            # `from .connectors import get_connector` broke when this module
            # was run outside a package context; the name is already in scope.)
            hf = get_connector('hf')
            return hf.image_to_text(image_path)
        except Exception as e:
            return f"[Vision Error] {e}"
# ==========================================
# LOCAL LLM CONNECTOR (Ollama/LM Studio)
# ==========================================
class LocalLLMConnector:
    """
    Adapter for Local Inference (Ollama / LM Studio).
    Uses OpenAI-compatible endpoint structure.
    Optimization: Direct localhost access (no Docker bridge lag).

    Both chat() and chat_async() return a ``(content, logprobs)`` tuple;
    on failure ``content`` is an error string and ``logprobs`` is None.
    """

    def __init__(self, base_url: str = None, model: str = "dolphin-x1-8b"):
        # Precedence is argument -> LOGOS_LLM_ENDPOINT env var -> default
        # (the explicit base_url argument wins over the environment).
        env_url = os.environ.get("LOGOS_LLM_ENDPOINT")
        self.base_url = base_url or env_url or "http://localhost:1234/v1"
        self.model = model

    async def chat_async(self, message: str, system_prompt: str = None, model: str = None, **kwargs) -> tuple:
        """
        Asynchronous chat with local model via aiohttp.
        Supports extra params via kwargs (e.g., max_tokens, temperature).

        Args:
            message: User message text.
            system_prompt: Optional system message prepended to the chat.
            model: Overrides the instance default model when given.
            **kwargs: Merged into the request payload last, so they can
                override temperature/logprobs defaults set below.

        Returns:
            (content, logprobs) on success, (error_string, None) on failure.
        """
        import aiohttp
        import json  # NOTE(review): unused here — aiohttp serializes JSON itself
        target_model = model or self.model
        payload = {
            "model": target_model,
            "messages": [],
            "temperature": 0.7,
            "stream": False
        }
        if system_prompt:
            payload["messages"].append({"role": "system", "content": system_prompt})
        payload["messages"].append({"role": "user", "content": message})
        # Request per-token logprobs so callers can gauge model confidence.
        payload["logprobs"] = True
        payload["top_logprobs"] = 1
        # Merge extra args (e.g. max_tokens)
        payload.update(kwargs)
        endpoint = f"{self.base_url}/chat/completions"
        try:
            async with aiohttp.ClientSession() as session:
                # NOTE(review): a plain int timeout is deprecated in newer
                # aiohttp; consider aiohttp.ClientTimeout(total=30).
                async with session.post(endpoint, json=payload, timeout=30) as response:
                    if response.status == 200:
                        data = await response.json()
                        content = data['choices'][0]['message'].get('content', "")
                        logprobs = data['choices'][0].get('logprobs')
                        return content, logprobs
                    else:
                        return f"[Error] Local LLM returned status {response.status}", None
        except Exception as e:
            return f"[Async Local LLM Error] {e}", None

    def chat(self, message: str, system_prompt: str = None, model: str = None, image_path: str = None) -> tuple:
        """
        Chat with local model via requests. Supports Vision if image_path is provided.
        Auto-detects Docker host.

        Args:
            message: User message text.
            system_prompt: Optional system message prepended to the chat.
            model: Overrides the instance default model when given.
            image_path: When set and the file exists, the message is sent as
                an OpenAI-style vision payload with the image inlined base64.

        Returns:
            (content, logprobs) on success, (error_string, None) on failure.
            (Annotation fixed: this method has always returned a tuple, not str.)
        """
        import requests
        import json  # NOTE(review): unused here — requests serializes JSON itself
        import base64
        import os
        # Helper to encode image for the data-URL vision payload
        def encode_image(path):
            with open(path, "rb") as image_file:
                return base64.b64encode(image_file.read()).decode('utf-8')
        # Potential endpoints to try: direct localhost first, then the Docker
        # host alias in case this process runs inside a container.
        endpoints = [self.base_url]
        if "localhost" in self.base_url:
            endpoints.append(self.base_url.replace("localhost", "host.docker.internal"))
        # Use instance default if no specific model requested
        target_model = model or self.model
        payload = {
            "model": target_model,
            "messages": [],
            "temperature": 0.7,
            "stream": False,
            "logprobs": True,
            "top_logprobs": 1
        }
        if system_prompt:
            payload["messages"].append({"role": "system", "content": system_prompt})
        if image_path and os.path.exists(image_path):
            # Format message for Vision API (OpenAI compatible)
            base64_image = encode_image(image_path)
            user_content = [
                {"type": "text", "text": message},
                {
                    "type": "image_url",
                    "image_url": {
                        # NOTE(review): hardcodes image/jpeg regardless of the
                        # actual file type — confirm the server tolerates this.
                        "url": f"data:image/jpeg;base64,{base64_image}"
                    }
                }
            ]
            payload["messages"].append({"role": "user", "content": user_content})
        else:
            # Standard Text Chat
            payload["messages"].append({"role": "user", "content": message})
        last_error = ""
        for base in endpoints:
            endpoint = f"{base}/chat/completions"
            try:
                # Increased timeout for complex/vision tasks
                response = requests.post(endpoint, json=payload, timeout=30)
                response.raise_for_status()
                if response.status_code == 200:
                    data = response.json()
                    content = data['choices'][0]['message'].get('content', "")
                    logprobs = data['choices'][0].get('logprobs')
                    return content, logprobs
                # NOTE(review): a non-200 2xx status falls through here and the
                # next endpoint is tried; confirm that is intended.
            except Exception as e:
                last_error = str(e)
                continue
        return f"[Local LLM Error] Could not connect to Local Swarm on {endpoints}. Is LM Studio running? ({last_error})", None
# ==========================================
# BROWSER AUTOMATION CONNECTOR (smolagents)
# ==========================================
class BrowserAutomationConnector:
    """
    Adapter for HuggingFace smolagents + helium for browser automation.
    Enables autonomous web navigation, form filling, and data extraction.
    """

    def __init__(self, headless: bool = True):
        self.headless = headless
        self._driver = None  # selenium WebDriver, created lazily
        self._agent = None   # reserved for a smolagents agent instance

    def _ensure_browser(self):
        """Lazy, idempotent initialization of the helium-driven Chrome browser.

        Raises:
            ImportError: If helium/selenium are not installed.
        """
        if self._driver is None:
            try:
                from helium import start_chrome, get_driver
                from selenium.webdriver.chrome.options import Options
                options = Options()
                if self.headless:
                    options.add_argument('--headless')
                options.add_argument('--no-sandbox')
                options.add_argument('--disable-dev-shm-usage')
                # Bug fix: the Options object was previously built but never
                # passed to helium, so the Chrome flags had no effect.
                start_chrome(headless=self.headless, options=options)
                self._driver = get_driver()
            except ImportError:
                raise ImportError("helium/selenium not installed. Run: pip install helium selenium")
        return self._driver

    def navigate(self, url: str) -> str:
        """Navigate to a URL and return page title (or an error string)."""
        try:
            from helium import go_to, get_driver
            self._ensure_browser()
            go_to(url)
            return f"Navigated to: {get_driver().title}"
        except Exception as e:
            return f"[Navigation Error] {e}"

    def search_page(self, query: str) -> str:
        """Case-insensitive substring search over the current page source."""
        try:
            driver = self._ensure_browser()
            page_source = driver.page_source.lower()
            if query.lower() in page_source:
                return f"Found '{query}' on page"
            else:
                return f"'{query}' not found on page"
        except Exception as e:
            return f"[Search Error] {e}"

    def get_page_text(self) -> str:
        """Extract visible text from current page (truncated to 5000 chars)."""
        try:
            driver = self._ensure_browser()
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(driver.page_source, 'html.parser')
            text = soup.get_text(separator=' ', strip=True)
            return text[:5000]  # Limit to 5k chars
        except Exception as e:
            return f"[Extraction Error] {e}"

    def click_element(self, text: str) -> str:
        """Click an element by its text."""
        try:
            from helium import click
            self._ensure_browser()
            click(text)
            return f"Clicked: {text}"
        except Exception as e:
            return f"[Click Error] {e}"

    def type_text(self, field: str, text: str) -> str:
        """Type text into a field."""
        try:
            from helium import write
            self._ensure_browser()
            write(text, into=field)
            return f"Typed into {field}"
        except Exception as e:
            return f"[Type Error] {e}"

    def close(self):
        """Close the browser (best-effort; never raises)."""
        try:
            from helium import kill_browser
            kill_browser()
        except Exception:
            # Deliberate best-effort cleanup: missing helium or an already
            # dead browser should not break shutdown. (Bug fix: was a bare
            # `except:` which also swallowed KeyboardInterrupt/SystemExit.)
            pass
        finally:
            # Bug fix: previously _driver was only reset when kill_browser
            # succeeded, leaving a dangling reference on failure.
            self._driver = None

    def run_agent_task(self, task: str, model: str = "Qwen/Qwen2.5-Coder-32B-Instruct") -> str:
        """
        Run a browser automation task using smolagents Code Agent.

        Args:
            task: Natural language task description
            model: HF model for the code agent

        Returns:
            Task result (stringified), or an error message string.
        """
        try:
            from smolagents import CodeAgent, HfApiModel
            # NOTE(review): no browser tools are registered yet — the agent
            # runs with an empty toolset (unused `Tool` import removed).
            model_instance = HfApiModel(model)
            agent = CodeAgent(tools=[], model=model_instance)
            # Run the task
            result = agent.run(task)
            return str(result)
        except ImportError:
            return "[Error] smolagents not installed. Run: pip install smolagents"
        except Exception as e:
            return f"[Agent Error] {e}"
# ==========================================
# FACTORY
# ==========================================
def get_connector(connector_type: str, **kwargs) -> Any:
    """
    Factory function for connectors.

    Args:
        connector_type: One of 'hf', 'ocr', 'vision', 'dolphin', 'browser', 'local'
            (docstring fix: 'nemo' no longer exists; 'dolphin'/'local' were missing)
        **kwargs: Connector-specific arguments, forwarded to the constructor

    Returns:
        Initialized connector instance

    Raises:
        ValueError: If connector_type is not a known connector.
    """
    connectors = {
        'hf': HuggingFaceConnector,
        'ocr': OCRConnector,
        'vision': VisionConnector,
        'dolphin': DolphinAgentConnector,
        'browser': BrowserAutomationConnector,
        'local': LocalLLMConnector
    }
    if connector_type not in connectors:
        raise ValueError(f"Unknown connector type: {connector_type}. Available: {list(connectors.keys())}")
    return connectors[connector_type](**kwargs)
# ==========================================
# REGISTRY (For Protocol 4 Discovery)
# ==========================================
# Static capability metadata used for Protocol 4 discovery. Keep in sync with
# the connector classes above and with get_connector().
AVAILABLE_CONNECTORS = {
    'hf': {
        'name': 'Hugging Face',
        'capabilities': ['image_to_text', 'text_generation'],
        'requires': ['huggingface_hub'],
        'env_vars': ['HF_TOKEN']
    },
    'ocr': {
        # Consistency fix: OCRConnector no longer uses EasyOCR — it delegates
        # to the local vision model via LocalLLMConnector (requests-based).
        'name': 'OCR (Local Vision Model)',
        'capabilities': ['extract_text'],
        'requires': ['requests'],
        'env_vars': []
    },
    'vision': {
        'name': 'Vision (OpenCV/scikit-image)',
        'capabilities': ['calculate_ssim', 'analyze_entropy'],
        'requires': ['opencv-python-headless', 'scikit-image'],
        'env_vars': []
    },
    'dolphin': {
        'name': 'Dolphin AI (HF Inference)',
        'capabilities': ['chat', 'analyze_diagram'],
        'requires': ['huggingface_hub'],
        'env_vars': ['HF_TOKEN']
    },
    'browser': {
        'name': 'Browser Automation (smolagents)',
        'capabilities': ['navigate', 'click_element', 'type_text', 'get_page_text', 'run_agent_task'],
        'requires': ['smolagents', 'helium', 'selenium', 'beautifulsoup4'],
        'env_vars': []
    },
    'local': {
        'name': 'Local LLM (Ollama/LM Studio)',
        'capabilities': ['chat'],
        'requires': ['requests'],
        'env_vars': []
    }
}
# ==========================================
# CLI / TESTING
# ==========================================
def _run_local_test(args):
    """Exercise the Local LLM connector (chat, or vision when --image given)."""
    model = args.model or "local-model"
    print(f"Targeting Model: {model}")
    connector = get_connector('local', model=model)
    if args.image:
        print(f"Sending Vision Request with {args.image}...")
        reply = connector.chat(args.prompt, image_path=args.image)
    else:
        print(f"Sending Chat Request: '{args.prompt}'...")
        reply = connector.chat(args.prompt)
    print(f"\n[RESPONSE]\n{reply}")


def _run_ocr_test(args):
    """Exercise the OCR connector; requires --image."""
    if not args.image:
        print("Error: --image argument required for OCR test.")
        return
    connector = get_connector('ocr')
    print(f"Extracting text from {args.image}...")
    result = connector.extract_text(args.image)
    print(f"\n[RESULT]\n{result['full_text']}")


def _run_hf_test(args):
    """Exercise the Hugging Face connector (captioning with --image, else text-gen)."""
    connector = get_connector('hf')
    if args.image:
        reply = connector.image_to_text(args.image)
    else:
        reply = connector.text_generation(args.prompt)
    print(f"\n[RESPONSE]\n{reply}")


def main():
    """
    CLI for testing connectors directly.
    Usage: python -m logos.connectors --test local --model google/gemma-3-4b
    """
    import argparse
    import sys

    parser = argparse.ArgumentParser(description="LOGOS Connectors Utilities")
    parser.add_argument("--test", choices=list(AVAILABLE_CONNECTORS.keys()), help="Connector to test")
    parser.add_argument("--model", help="Model name for HF or Local connector")
    parser.add_argument("--prompt", default="Hello, are you online?", help="Prompt to send")
    parser.add_argument("--image", help="Path to image for OCR or Vision test")
    args = parser.parse_args()

    if not args.test:
        # No connector selected: list what is available and exit.
        print("Available Connectors:")
        for key, meta in AVAILABLE_CONNECTORS.items():
            print(f" - {key:<10} : {meta['name']}")
        print("\nRun with --test <name> to verify a connection.")
        return

    print(f"--- Testing Connector: {args.test.upper()} ---")
    try:
        if args.test == 'local':
            _run_local_test(args)
        elif args.test == 'ocr':
            _run_ocr_test(args)
        elif args.test == 'hf':
            _run_hf_test(args)
        else:
            print(f"Test CLI not yet implemented for {args.test}. Import and use in Python.")
    except Exception as e:
        print(f"\n[FAIL] {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()