""" connectors.py - External API/Service Adapters Protocol 4: Autonomous Resource Integration This module isolates all external dependencies so the core engine remains pure. Each connector wraps an external API/library with a standardized interface. """ import os from typing import Optional, Dict, Any, List from dataclasses import dataclass # ========================================== # CONFIGURATION # ========================================== @dataclass class ConnectorConfig: """Configuration for external connectors.""" hf_token: Optional[str] = None hf_space_id: Optional[str] = None @classmethod def from_env(cls) -> 'ConnectorConfig': """Load configuration from environment variables.""" return cls( hf_token=os.environ.get('HF_TOKEN'), hf_space_id=os.environ.get('HF_SPACE_ID') ) # ========================================== # HUGGING FACE CONNECTOR # ========================================== class HuggingFaceConnector: """ Adapter for Hugging Face Hub and Inference API. Wraps huggingface_hub for model loading and inference. """ def __init__(self, config: ConnectorConfig = None): self.config = config or ConnectorConfig.from_env() self._client = None def _ensure_client(self): """Lazy initialization of HF client.""" if self._client is None: try: from huggingface_hub import InferenceClient self._client = InferenceClient(token=self.config.hf_token, base_url="https://router.huggingface.co") except ImportError: raise ImportError("huggingface_hub not installed. Run: pip install huggingface_hub") return self._client def image_to_text(self, image_path: str, model: str = "Salesforce/blip-image-captioning-base") -> str: """ Generate text description from image using HF Inference API. Args: image_path: Path to image file model: HF model ID for image captioning Returns: Generated text description """ client = self._ensure_client() with open(image_path, 'rb') as f: result = client.image_to_text(f.read(), model=model) return result def text_generation(self, prompt: str, model: str = "gpt2", max_length: int = 100) -> str: """ Generate text from prompt using HF Inference API. Args: prompt: Input text prompt model: HF model ID for text generation max_length: Maximum output length Returns: Generated text """ client = self._ensure_client() result = client.text_generation(prompt, model=model, max_new_tokens=max_length) return result # ========================================== # OCR CONNECTOR # ========================================== class OCRConnector: """ Adapter for Optical Character Recognition via Local Vision Model. Uses 'google/gemma-3-4b' (or configured local model) to transcribe text from images. """ def __init__(self, languages: List[str] = None, gpu: bool = False): # We rely on the local LLM connector, 'gpu' arg is ignored as it's handled by LM Studio # Hardcoded to Gemma as requested by user ("gemma is your vision model") self.client = get_connector('local', model="google/gemma-3-4b") def extract_text(self, image_path: str) -> Dict[str, Any]: """ Extract text from image using Vision Model. """ try: prompt = "Extract and transcribe all visible text from this image exactly as it appears. Return only the text." full_text, _ = self.client.chat(message=prompt, image_path=image_path) # Simple heuristic for word count word_count = len(full_text.split()) return { "text_blocks": [], # VLM doesn't give bounding boxes easily "full_text": full_text, "word_count": word_count } except Exception as e: return { "text_blocks": [], "full_text": f"[OCR ERROR] Vision Model Failed: {e}", "word_count": 0 } # ========================================== # VISION CONNECTOR (Future: Multi-modal) # ========================================== class VisionConnector: """ Adapter for computer vision operations. Wraps OpenCV and scikit-image. """ @staticmethod def calculate_ssim(image1_path: str, image2_path: str) -> float: """ Calculate Structural Similarity Index between two images. Uses scikit-image for accurate SSIM calculation. Args: image1_path: Path to first image image2_path: Path to second image Returns: SSIM score (0-1, higher is better) """ try: import cv2 from skimage.metrics import structural_similarity as ssim img1 = cv2.imread(image1_path) img2 = cv2.imread(image2_path) # Resize if needed if img1.shape != img2.shape: img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0])) # Convert to grayscale for SSIM gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY) gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY) return ssim(gray1, gray2) except ImportError as e: raise ImportError(f"Required library not installed: {e}") @staticmethod def analyze_entropy(image_path: str) -> Dict[str, float]: """ Analyze image entropy (information density). Args: image_path: Path to image file Returns: Dict with entropy metrics """ try: import cv2 import numpy as np from skimage.measure import shannon_entropy img = cv2.imread(image_path) gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) # Calculate entropy entropy = shannon_entropy(gray) # Calculate histogram entropy hist = cv2.calcHist([gray], [0], None, [256], [0, 256]) hist = hist.flatten() / hist.sum() hist_entropy = -np.sum(hist[hist > 0] * np.log2(hist[hist > 0])) return { "shannon_entropy": entropy, "histogram_entropy": hist_entropy, "mean_intensity": float(np.mean(gray)), "std_intensity": float(np.std(gray)) } except ImportError as e: raise ImportError(f"Required library not installed: {e}") # ========================================== # DOLPHIN AGENT CONNECTOR (HF Inference) # ========================================== class DolphinAgentConnector: """ Adapter for Dolphin AI (via Hugging Face Inference). Replaces NeMo/OpenAI dependency with open weights. """ def __init__(self, model: str = "cognitivecomputations/dolphin-2.9-llama3-8b"): self.model = model self.config = ConnectorConfig.from_env() self._client = None def _ensure_client(self): """Lazy initialization of HF Inference Client.""" if self._client is None: try: from huggingface_hub import InferenceClient self._client = InferenceClient(token=self.config.hf_token, base_url="https://router.huggingface.co") except ImportError: raise ImportError("huggingface_hub not installed.") return self._client def chat(self, message: str, system_prompt: str = None) -> str: """ Chat with Dolphin agent. """ try: client = self._ensure_client() messages = [] if system_prompt: messages.append({"role": "system", "content": system_prompt}) messages.append({"role": "user", "content": message}) # Using basic text generation if chat template fails, but try chat first # Many HF models support chat_completion API via InferenceClient try: response = client.chat_completion( messages=messages, model=self.model, max_tokens=500 ) return response.choices[0].message.content, response.choices[0].get('logprobs') except Exception: # Fallback to text generation prompt = f"<|im_start|>user\n{message}<|im_end|>\n<|im_start|>assistant\n" res = client.text_generation(prompt, model=self.model, max_new_tokens=500) return res, None except Exception as e: return f"[Dolphin Error] {e}", None def analyze_diagram(self, image_path: str, prompt: str = "Describe this architectural diagram.") -> str: """ Analyze diagram using visual model (fallback to simple captioning if Dolphin is text-only). """ try: # Dolphin is text-only usually. Route to a Vision model. from .connectors import get_connector # lazy import hf = get_connector('hf') return hf.image_to_text(image_path) except Exception as e: return f"[Vision Error] {e}" # ========================================== # LOCAL LLM CONNECTOR (Ollama/LM Studio) # ========================================== class LocalLLMConnector: """ Adapter for Local Inference (Ollama / LM Studio). Uses OpenAI-compatible endpoint structure. Optimization: Direct localhost access (no Docker bridge lag). """ def __init__(self, base_url: str = None, model: str = "dolphin-x1-8b"): # Prioritize Environment -> Argument -> Default env_url = os.environ.get("LOGOS_LLM_ENDPOINT") self.base_url = base_url or env_url or "http://localhost:1234/v1" self.model = model async def chat_async(self, message: str, system_prompt: str = None, model: str = None, **kwargs): """ Asynchronous chat with local model via aiohttp. Supports extra params via kwargs (e.g., max_tokens, temperature). """ import aiohttp import json target_model = model or self.model payload = { "model": target_model, "messages": [], "temperature": 0.7, "stream": False } if system_prompt: payload["messages"].append({"role": "system", "content": system_prompt}) payload["messages"].append({"role": "user", "content": message}) payload["logprobs"] = True payload["top_logprobs"] = 1 # Merge extra args (e.g. max_tokens) payload.update(kwargs) endpoint = f"{self.base_url}/chat/completions" try: async with aiohttp.ClientSession() as session: async with session.post(endpoint, json=payload, timeout=30) as response: if response.status == 200: data = await response.json() content = data['choices'][0]['message'].get('content', "") logprobs = data['choices'][0].get('logprobs') return content, logprobs else: return f"[Error] Local LLM returned status {response.status}", None except Exception as e: return f"[Async Local LLM Error] {e}", None def chat(self, message: str, system_prompt: str = None, model: str = None, image_path: str = None) -> str: """ Chat with local model via requests. Supports Vision if image_path is provided. Auto-detects Docker host. """ import requests import json import base64 import os # Helper to encode image def encode_image(path): with open(path, "rb") as image_file: return base64.b64encode(image_file.read()).decode('utf-8') # Potential endpoints to try endpoints = [self.base_url] if "localhost" in self.base_url: endpoints.append(self.base_url.replace("localhost", "host.docker.internal")) # Use instance default if no specific model requested target_model = model or self.model payload = { "model": target_model, "messages": [], "temperature": 0.7, "stream": False, "logprobs": True, "top_logprobs": 1 } if system_prompt: payload["messages"].append({"role": "system", "content": system_prompt}) if image_path and os.path.exists(image_path): # Format message for Vision API (OpenAI compatible) base64_image = encode_image(image_path) user_content = [ {"type": "text", "text": message}, { "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{base64_image}" } } ] payload["messages"].append({"role": "user", "content": user_content}) else: # Standard Text Chat payload["messages"].append({"role": "user", "content": message}) last_error = "" for base in endpoints: endpoint = f"{base}/chat/completions" try: # Increased timeout for complex/vision tasks response = requests.post(endpoint, json=payload, timeout=30) response.raise_for_status() if response.status_code == 200: data = response.json() content = data['choices'][0]['message'].get('content', "") logprobs = data['choices'][0].get('logprobs') return content, logprobs except Exception as e: last_error = str(e) continue return f"[Local LLM Error] Could not connect to Local Swarm on {endpoints}. Is LM Studio running? ({last_error})", None # ========================================== # BROWSER AUTOMATION CONNECTOR (smolagents) # ========================================== class BrowserAutomationConnector: """ Adapter for HuggingFace smolagents + helium for browser automation. Enables autonomous web navigation, form filling, and data extraction. """ def __init__(self, headless: bool = True): self.headless = headless self._driver = None self._agent = None def _ensure_browser(self): """Initialize browser with helium.""" if self._driver is None: try: from helium import start_chrome, go_to, click, write, get_driver from selenium.webdriver.chrome.options import Options options = Options() if self.headless: options.add_argument('--headless') options.add_argument('--no-sandbox') options.add_argument('--disable-dev-shm-usage') start_chrome(headless=self.headless) self._driver = get_driver() except ImportError: raise ImportError("helium/selenium not installed. Run: pip install helium selenium") return self._driver def navigate(self, url: str) -> str: """Navigate to a URL and return page title.""" try: from helium import go_to, get_driver self._ensure_browser() go_to(url) return f"Navigated to: {get_driver().title}" except Exception as e: return f"[Navigation Error] {e}" def search_page(self, query: str) -> str: """Search for text on current page.""" try: driver = self._ensure_browser() page_source = driver.page_source.lower() if query.lower() in page_source: return f"Found '{query}' on page" else: return f"'{query}' not found on page" except Exception as e: return f"[Search Error] {e}" def get_page_text(self) -> str: """Extract visible text from current page.""" try: driver = self._ensure_browser() from bs4 import BeautifulSoup soup = BeautifulSoup(driver.page_source, 'html.parser') text = soup.get_text(separator=' ', strip=True) return text[:5000] # Limit to 5k chars except Exception as e: return f"[Extraction Error] {e}" def click_element(self, text: str) -> str: """Click an element by its text.""" try: from helium import click self._ensure_browser() click(text) return f"Clicked: {text}" except Exception as e: return f"[Click Error] {e}" def type_text(self, field: str, text: str) -> str: """Type text into a field.""" try: from helium import write self._ensure_browser() write(text, into=field) return f"Typed into {field}" except Exception as e: return f"[Type Error] {e}" def close(self): """Close the browser.""" try: from helium import kill_browser kill_browser() self._driver = None except: pass def run_agent_task(self, task: str, model: str = "Qwen/Qwen2.5-Coder-32B-Instruct") -> str: """ Run a browser automation task using smolagents Code Agent. Args: task: Natural language task description model: HF model for the code agent Returns: Task result """ try: from smolagents import CodeAgent, HfApiModel from smolagents.tools import Tool # Create agent with browser tools model_instance = HfApiModel(model) agent = CodeAgent(tools=[], model=model_instance) # Run the task result = agent.run(task) return str(result) except ImportError: return "[Error] smolagents not installed. Run: pip install smolagents" except Exception as e: return f"[Agent Error] {e}" # ========================================== # FACTORY # ========================================== def get_connector(connector_type: str, **kwargs) -> Any: """ Factory function for connectors. Args: connector_type: One of 'hf', 'ocr', 'vision', 'nemo', 'browser' **kwargs: Connector-specific arguments Returns: Initialized connector instance """ connectors = { 'hf': HuggingFaceConnector, 'ocr': OCRConnector, 'vision': VisionConnector, 'dolphin': DolphinAgentConnector, 'browser': BrowserAutomationConnector, 'local': LocalLLMConnector } if connector_type not in connectors: raise ValueError(f"Unknown connector type: {connector_type}. Available: {list(connectors.keys())}") return connectors[connector_type](**kwargs) # ========================================== # REGISTRY (For Protocol 4 Discovery) # ========================================== AVAILABLE_CONNECTORS = { 'hf': { 'name': 'Hugging Face', 'capabilities': ['image_to_text', 'text_generation'], 'requires': ['huggingface_hub'], 'env_vars': ['HF_TOKEN'] }, 'ocr': { 'name': 'EasyOCR', 'capabilities': ['extract_text'], 'requires': ['easyocr'], 'env_vars': [] }, 'vision': { 'name': 'Vision (OpenCV/scikit-image)', 'capabilities': ['calculate_ssim', 'analyze_entropy'], 'requires': ['opencv-python-headless', 'scikit-image'], 'env_vars': [] }, 'dolphin': { 'name': 'Dolphin AI (HF Inference)', 'capabilities': ['chat', 'analyze_diagram'], 'requires': ['huggingface_hub'], 'env_vars': ['HF_TOKEN'] }, 'browser': { 'name': 'Browser Automation (smolagents)', 'capabilities': ['navigate', 'click_element', 'type_text', 'get_page_text', 'run_agent_task'], 'requires': ['smolagents', 'helium', 'selenium', 'beautifulsoup4'], 'env_vars': [] }, 'local': { 'name': 'Local LLM (Ollama/LM Studio)', 'capabilities': ['chat'], 'requires': ['requests'], 'env_vars': [] } } # ========================================== # CLI / TESTING # ========================================== def main(): """ CLI for testing connectors directly. Usage: python -m logos.connectors --test local --model google/gemma-3-4b """ import argparse import sys parser = argparse.ArgumentParser(description="LOGOS Connectors Utilities") parser.add_argument("--test", choices=list(AVAILABLE_CONNECTORS.keys()), help="Connector to test") parser.add_argument("--model", help="Model name for HF or Local connector") parser.add_argument("--prompt", default="Hello, are you online?", help="Prompt to send") parser.add_argument("--image", help="Path to image for OCR or Vision test") args = parser.parse_args() if not args.test: print("Available Connectors:") for k, v in AVAILABLE_CONNECTORS.items(): print(f" - {k:<10} : {v['name']}") print("\nRun with --test to verify a connection.") return print(f"--- Testing Connector: {args.test.upper()} ---") try: if args.test == 'local': # Local LLM / Vision Test model = args.model or "local-model" print(f"Targeting Model: {model}") client = get_connector('local', model=model) if args.image: print(f"Sending Vision Request with {args.image}...") resp = client.chat(args.prompt, image_path=args.image) else: print(f"Sending Chat Request: '{args.prompt}'...") resp = client.chat(args.prompt) print(f"\n[RESPONSE]\n{resp}") elif args.test == 'ocr': # OCR Test (via Vision) if not args.image: print("Error: --image argument required for OCR test.") return client = get_connector('ocr') print(f"Extracting text from {args.image}...") res = client.extract_text(args.image) print(f"\n[RESULT]\n{res['full_text']}") elif args.test == 'hf': # Hugging Face Test client = get_connector('hf') if args.image: # Image Captioning resp = client.image_to_text(args.image) else: # Text Gen resp = client.text_generation(args.prompt) print(f"\n[RESPONSE]\n{resp}") else: print(f"Test CLI not yet implemented for {args.test}. Import and use in Python.") except Exception as e: print(f"\n[FAIL] {e}") import traceback traceback.print_exc() if __name__ == "__main__": main()