# LOGOS-SPCW-Matroska / logos/connectors.py
# Source: GitHub Copilot
# Workflows: New Video Ingestion Protocol - Dolphin Cognitive Analysis -> RJ-1 Encoding
# Revision: 644be9f
"""
connectors.py - External API/Service Adapters
Protocol 4: Autonomous Resource Integration
This module isolates all external dependencies so the core engine remains pure.
Each connector wraps an external API/library with a standardized interface.
"""
import os
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
# ==========================================
# CONFIGURATION
# ==========================================
@dataclass
class ConnectorConfig:
    """Runtime settings shared by the external-service connectors."""
    # Hugging Face access token; None means anonymous access.
    hf_token: Optional[str] = None
    # Target Hugging Face Space identifier, if any.
    hf_space_id: Optional[str] = None

    @classmethod
    def from_env(cls) -> 'ConnectorConfig':
        """Build a config by reading HF_TOKEN / HF_SPACE_ID from the environment."""
        env = os.environ
        return cls(
            hf_token=env.get('HF_TOKEN'),
            hf_space_id=env.get('HF_SPACE_ID'),
        )
# ==========================================
# HUGGING FACE CONNECTOR
# ==========================================
class HuggingFaceConnector:
    """
    Adapter for Hugging Face Hub and Inference API.
    Wraps huggingface_hub for model loading and inference.
    """
    def __init__(self, config: ConnectorConfig = None):
        # Fall back to environment-derived settings when no config is given.
        self.config = config or ConnectorConfig.from_env()
        self._client = None

    def _ensure_client(self):
        """Create and cache the HF InferenceClient on first use."""
        if self._client is None:
            try:
                from huggingface_hub import InferenceClient
            except ImportError:
                raise ImportError("huggingface_hub not installed. Run: pip install huggingface_hub")
            self._client = InferenceClient(token=self.config.hf_token, base_url="https://router.huggingface.co")
        return self._client

    def image_to_text(self, image_path: str, model: str = "Salesforce/blip-image-captioning-base") -> str:
        """
        Generate text description from image using HF Inference API.

        Args:
            image_path: Path to image file
            model: HF model ID for image captioning
        Returns:
            Generated text description
        """
        client = self._ensure_client()
        with open(image_path, 'rb') as handle:
            return client.image_to_text(handle.read(), model=model)

    def text_generation(self, prompt: str, model: str = "gpt2", max_length: int = 100) -> str:
        """
        Generate text from prompt using HF Inference API.

        Args:
            prompt: Input text prompt
            model: HF model ID for text generation
            max_length: Maximum output length (mapped to max_new_tokens)
        Returns:
            Generated text
        """
        return self._ensure_client().text_generation(prompt, model=model, max_new_tokens=max_length)
# ==========================================
# OCR CONNECTOR
# ==========================================
class OCRConnector:
    """
    Adapter for Optical Character Recognition via Local Vision Model.
    Uses 'google/gemma-3-4b' (or configured local model) to transcribe text from images.
    """
    def __init__(self, languages: List[str] = None, gpu: bool = False):
        # We rely on the local LLM connector, 'gpu' arg is ignored as it's handled by LM Studio
        # Hardcoded to Gemma as requested by user ("gemma is your vision model")
        self.client = get_connector('local', model="google/gemma-3-4b")

    def extract_text(self, image_path: str) -> Dict[str, Any]:
        """
        Extract text from image using Vision Model.

        Returns a dict with 'text_blocks' (always empty — the VLM gives no
        bounding boxes), 'full_text', and a whitespace-split 'word_count'.
        On any failure the full_text carries an "[OCR ERROR]" marker instead.
        """
        prompt = "Extract and transcribe all visible text from this image exactly as it appears. Return only the text."
        try:
            transcription, _ = self.client.chat(message=prompt, image_path=image_path)
        except Exception as exc:
            return {
                "text_blocks": [],
                "full_text": f"[OCR ERROR] Vision Model Failed: {exc}",
                "word_count": 0
            }
        return {
            "text_blocks": [],  # VLM doesn't give bounding boxes easily
            "full_text": transcription,
            "word_count": len(transcription.split())
        }
# ==========================================
# VISION CONNECTOR (Future: Multi-modal)
# ==========================================
class VisionConnector:
    """
    Adapter for computer vision operations.
    Wraps OpenCV and scikit-image.
    """
    @staticmethod
    def calculate_ssim(image1_path: str, image2_path: str) -> float:
        """
        Calculate Structural Similarity Index between two images.
        Uses scikit-image for accurate SSIM calculation.

        Args:
            image1_path: Path to first image
            image2_path: Path to second image
        Returns:
            SSIM score (0-1, higher is better)
        Raises:
            ImportError: If OpenCV or scikit-image is not installed.
            ValueError: If either image cannot be read from disk.
        """
        try:
            import cv2
            from skimage.metrics import structural_similarity as ssim
        except ImportError as e:
            raise ImportError(f"Required library not installed: {e}")
        img1 = cv2.imread(image1_path)
        img2 = cv2.imread(image2_path)
        # cv2.imread returns None (not an exception) for unreadable paths;
        # fail loudly here instead of crashing on `.shape` below.
        if img1 is None:
            raise ValueError(f"Could not read image: {image1_path}")
        if img2 is None:
            raise ValueError(f"Could not read image: {image2_path}")
        # Resize so shapes match before comparing
        if img1.shape != img2.shape:
            img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))
        # Convert to grayscale for SSIM
        gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
        gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)
        return float(ssim(gray1, gray2))

    @staticmethod
    def analyze_entropy(image_path: str) -> Dict[str, float]:
        """
        Analyze image entropy (information density).

        Args:
            image_path: Path to image file
        Returns:
            Dict with entropy metrics: shannon_entropy, histogram_entropy,
            mean_intensity, std_intensity — all plain Python floats.
        Raises:
            ImportError: If a required library is not installed.
            ValueError: If the image cannot be read from disk.
        """
        try:
            import cv2
            import numpy as np
            from skimage.measure import shannon_entropy
        except ImportError as e:
            raise ImportError(f"Required library not installed: {e}")
        img = cv2.imread(image_path)
        # Guard against the silent-None return of cv2.imread.
        if img is None:
            raise ValueError(f"Could not read image: {image_path}")
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        # Shannon entropy over the raw grayscale pixels
        entropy = shannon_entropy(gray)
        # Entropy of the 256-bin intensity histogram (normalized to a distribution)
        hist = cv2.calcHist([gray], [0], None, [256], [0, 256])
        hist = hist.flatten() / hist.sum()
        hist_entropy = -np.sum(hist[hist > 0] * np.log2(hist[hist > 0]))
        return {
            "shannon_entropy": float(entropy),
            "histogram_entropy": float(hist_entropy),
            "mean_intensity": float(np.mean(gray)),
            "std_intensity": float(np.std(gray))
        }
# ==========================================
# DOLPHIN AGENT CONNECTOR (HF Inference)
# ==========================================
class DolphinAgentConnector:
    """
    Adapter for Dolphin AI (via Hugging Face Inference).
    Replaces NeMo/OpenAI dependency with open weights.
    """
    def __init__(self, model: str = "cognitivecomputations/dolphin-2.9-llama3-8b"):
        self.model = model
        self.config = ConnectorConfig.from_env()
        self._client = None

    def _ensure_client(self):
        """Lazy initialization of HF Inference Client."""
        if self._client is None:
            try:
                from huggingface_hub import InferenceClient
                self._client = InferenceClient(token=self.config.hf_token, base_url="https://router.huggingface.co")
            except ImportError:
                raise ImportError("huggingface_hub not installed.")
        return self._client

    def chat(self, message: str, system_prompt: str = None) -> tuple:
        """
        Chat with Dolphin agent.

        Returns:
            (content, logprobs) tuple; logprobs is None when the backend does
            not supply them. On failure, content is a "[Dolphin Error] ..."
            string and logprobs is None.
        """
        try:
            client = self._ensure_client()
            messages = []
            if system_prompt:
                messages.append({"role": "system", "content": system_prompt})
            messages.append({"role": "user", "content": message})
            # Many HF models support the chat_completion API via InferenceClient;
            # fall back to raw text generation if it fails.
            try:
                response = client.chat_completion(
                    messages=messages,
                    model=self.model,
                    max_tokens=500
                )
                choice = response.choices[0]
                # choices[0] is a dataclass-like object, not a dict — .get()
                # raised AttributeError and silently forced the fallback path.
                return choice.message.content, getattr(choice, 'logprobs', None)
            except Exception:
                # Fallback to text generation with a ChatML-style prompt
                prompt = f"<|im_start|>user\n{message}<|im_end|>\n<|im_start|>assistant\n"
                res = client.text_generation(prompt, model=self.model, max_new_tokens=500)
                return res, None
        except Exception as e:
            return f"[Dolphin Error] {e}", None

    def analyze_diagram(self, image_path: str, prompt: str = "Describe this architectural diagram.") -> str:
        """
        Analyze diagram using visual model (fallback to simple captioning if Dolphin is text-only).
        """
        try:
            # Dolphin is text-only usually. Route to a Vision model.
            # get_connector is defined later in this module; it resolves at call time.
            hf = get_connector('hf')
            return hf.image_to_text(image_path)
        except Exception as e:
            return f"[Vision Error] {e}"
# ==========================================
# LOCAL LLM CONNECTOR (Ollama/LM Studio)
# ==========================================
class LocalLLMConnector:
    """
    Adapter for Local Inference (Ollama / LM Studio).
    Uses OpenAI-compatible endpoint structure.
    Optimization: Direct localhost access (no Docker bridge lag).
    """
    def __init__(self, base_url: str = None, model: str = "dolphin-x1-8b"):
        """
        Args:
            base_url: OpenAI-compatible base URL (".../v1"). The explicit
                argument wins, then the LOGOS_LLM_ENDPOINT environment
                variable, then the LM Studio default.
            model: Default model id for requests that don't override it.
        """
        # Priority: argument -> environment -> default.
        # (An earlier comment claimed environment-first; the code has always
        # preferred the explicit argument, so the docs now match the code.)
        env_url = os.environ.get("LOGOS_LLM_ENDPOINT")
        self.base_url = base_url or env_url or "http://localhost:1234/v1"
        self.model = model

    async def chat_async(self, message: str, system_prompt: str = None, model: str = None, **kwargs):
        """
        Asynchronous chat with local model via aiohttp.
        Supports extra params via kwargs (e.g., max_tokens, temperature).

        Returns:
            (content, logprobs) tuple; on failure content is an error string
            and logprobs is None.
        """
        import aiohttp
        target_model = model or self.model
        payload = {
            "model": target_model,
            "messages": [],
            "temperature": 0.7,
            "stream": False
        }
        if system_prompt:
            payload["messages"].append({"role": "system", "content": system_prompt})
        payload["messages"].append({"role": "user", "content": message})
        # Request token logprobs so callers can estimate model confidence.
        payload["logprobs"] = True
        payload["top_logprobs"] = 1
        # Merge extra args (e.g. max_tokens)
        payload.update(kwargs)
        endpoint = f"{self.base_url}/chat/completions"
        try:
            # aiohttp expects a ClientTimeout; passing a bare int is deprecated.
            timeout = aiohttp.ClientTimeout(total=30)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.post(endpoint, json=payload) as response:
                    if response.status == 200:
                        data = await response.json()
                        content = data['choices'][0]['message'].get('content', "")
                        logprobs = data['choices'][0].get('logprobs')
                        return content, logprobs
                    else:
                        return f"[Error] Local LLM returned status {response.status}", None
        except Exception as e:
            return f"[Async Local LLM Error] {e}", None

    def chat(self, message: str, system_prompt: str = None, model: str = None, image_path: str = None) -> tuple:
        """
        Chat with local model via requests. Supports Vision if image_path is provided.
        Auto-detects Docker host.

        Returns:
            (content, logprobs) tuple; on failure content is an error string
            and logprobs is None.
        """
        import requests
        import base64

        def _encode_image(path: str) -> str:
            # Base64-encode the image for an OpenAI-style data URL.
            with open(path, "rb") as image_file:
                return base64.b64encode(image_file.read()).decode('utf-8')

        # Potential endpoints to try: from inside Docker, the host's LM Studio
        # is reachable via host.docker.internal rather than localhost.
        endpoints = [self.base_url]
        if "localhost" in self.base_url:
            endpoints.append(self.base_url.replace("localhost", "host.docker.internal"))
        # Use instance default if no specific model requested
        target_model = model or self.model
        payload = {
            "model": target_model,
            "messages": [],
            "temperature": 0.7,
            "stream": False,
            "logprobs": True,
            "top_logprobs": 1
        }
        if system_prompt:
            payload["messages"].append({"role": "system", "content": system_prompt})
        if image_path and os.path.exists(image_path):
            # Format message for Vision API (OpenAI compatible)
            base64_image = _encode_image(image_path)
            user_content = [
                {"type": "text", "text": message},
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/jpeg;base64,{base64_image}"
                    }
                }
            ]
            payload["messages"].append({"role": "user", "content": user_content})
        else:
            # Standard Text Chat
            payload["messages"].append({"role": "user", "content": message})
        last_error = ""
        for base in endpoints:
            endpoint = f"{base}/chat/completions"
            try:
                # Increased timeout for complex/vision tasks
                response = requests.post(endpoint, json=payload, timeout=30)
                response.raise_for_status()
                data = response.json()
                choice = data['choices'][0]
                return choice['message'].get('content', ""), choice.get('logprobs')
            except Exception as e:
                last_error = str(e)
                continue
        return f"[Local LLM Error] Could not connect to Local Swarm on {endpoints}. Is LM Studio running? ({last_error})", None
# ==========================================
# BROWSER AUTOMATION CONNECTOR (smolagents)
# ==========================================
class BrowserAutomationConnector:
    """
    Adapter for HuggingFace smolagents + helium for browser automation.
    Enables autonomous web navigation, form filling, and data extraction.
    """
    def __init__(self, headless: bool = True):
        # headless: run Chrome without a visible window (needed in containers).
        self.headless = headless
        self._driver = None
        self._agent = None

    def _ensure_browser(self):
        """Initialize browser with helium (lazy; reuses an existing driver)."""
        if self._driver is None:
            try:
                from helium import start_chrome, get_driver
                from selenium.webdriver.chrome.options import Options
                options = Options()
                if self.headless:
                    options.add_argument('--headless')
                    options.add_argument('--no-sandbox')
                    options.add_argument('--disable-dev-shm-usage')
                # Pass the configured options through — previously they were
                # built but never handed to start_chrome, so the sandbox/shm
                # flags had no effect.
                start_chrome(headless=self.headless, options=options)
                self._driver = get_driver()
            except ImportError:
                raise ImportError("helium/selenium not installed. Run: pip install helium selenium")
        return self._driver

    def navigate(self, url: str) -> str:
        """Navigate to a URL and return page title."""
        try:
            from helium import go_to, get_driver
            self._ensure_browser()
            go_to(url)
            return f"Navigated to: {get_driver().title}"
        except Exception as e:
            return f"[Navigation Error] {e}"

    def search_page(self, query: str) -> str:
        """Search for text on current page (case-insensitive substring match)."""
        try:
            driver = self._ensure_browser()
            page_source = driver.page_source.lower()
            if query.lower() in page_source:
                return f"Found '{query}' on page"
            else:
                return f"'{query}' not found on page"
        except Exception as e:
            return f"[Search Error] {e}"

    def get_page_text(self) -> str:
        """Extract visible text from current page (truncated to 5000 chars)."""
        try:
            driver = self._ensure_browser()
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(driver.page_source, 'html.parser')
            text = soup.get_text(separator=' ', strip=True)
            return text[:5000]  # Limit to 5k chars
        except Exception as e:
            return f"[Extraction Error] {e}"

    def click_element(self, text: str) -> str:
        """Click an element by its text."""
        try:
            from helium import click
            self._ensure_browser()
            click(text)
            return f"Clicked: {text}"
        except Exception as e:
            return f"[Click Error] {e}"

    def type_text(self, field: str, text: str) -> str:
        """Type text into a field."""
        try:
            from helium import write
            self._ensure_browser()
            write(text, into=field)
            return f"Typed into {field}"
        except Exception as e:
            return f"[Type Error] {e}"

    def close(self):
        """Close the browser (best-effort; safe to call when never opened)."""
        try:
            from helium import kill_browser
            kill_browser()
            self._driver = None
        except Exception:
            # Best-effort teardown; narrow from a bare except so that
            # KeyboardInterrupt/SystemExit still propagate.
            pass

    def run_agent_task(self, task: str, model: str = "Qwen/Qwen2.5-Coder-32B-Instruct") -> str:
        """
        Run a browser automation task using smolagents Code Agent.

        Args:
            task: Natural language task description
            model: HF model for the code agent
        Returns:
            Task result
        """
        try:
            from smolagents import CodeAgent, HfApiModel
            # Create agent with browser tools
            model_instance = HfApiModel(model)
            agent = CodeAgent(tools=[], model=model_instance)
            # Run the task
            result = agent.run(task)
            return str(result)
        except ImportError:
            return "[Error] smolagents not installed. Run: pip install smolagents"
        except Exception as e:
            return f"[Agent Error] {e}"
# ==========================================
# FACTORY
# ==========================================
def get_connector(connector_type: str, **kwargs) -> Any:
    """
    Factory function for connectors.

    Args:
        connector_type: One of 'hf', 'ocr', 'vision', 'dolphin', 'browser', 'local'
        **kwargs: Connector-specific arguments forwarded to the constructor
    Returns:
        Initialized connector instance
    Raises:
        ValueError: If connector_type is not one of the registered keys.
    """
    connectors = {
        'hf': HuggingFaceConnector,
        'ocr': OCRConnector,
        'vision': VisionConnector,
        'dolphin': DolphinAgentConnector,
        'browser': BrowserAutomationConnector,
        'local': LocalLLMConnector
    }
    if connector_type not in connectors:
        raise ValueError(f"Unknown connector type: {connector_type}. Available: {list(connectors.keys())}")
    return connectors[connector_type](**kwargs)
# ==========================================
# REGISTRY (For Protocol 4 Discovery)
# ==========================================
# Registry of connector capabilities for Protocol 4 discovery.
# Keys mirror the get_connector() factory keys.
AVAILABLE_CONNECTORS = {
    'hf': {
        'name': 'Hugging Face',
        'capabilities': ['image_to_text', 'text_generation'],
        'requires': ['huggingface_hub'],
        'env_vars': ['HF_TOKEN']
    },
    # OCRConnector routes through the local vision model (see OCRConnector);
    # this entry previously advertised EasyOCR, which is no longer used.
    'ocr': {
        'name': 'OCR (Local Vision Model)',
        'capabilities': ['extract_text'],
        'requires': ['requests'],
        'env_vars': []
    },
    'vision': {
        'name': 'Vision (OpenCV/scikit-image)',
        'capabilities': ['calculate_ssim', 'analyze_entropy'],
        'requires': ['opencv-python-headless', 'scikit-image'],
        'env_vars': []
    },
    'dolphin': {
        'name': 'Dolphin AI (HF Inference)',
        'capabilities': ['chat', 'analyze_diagram'],
        'requires': ['huggingface_hub'],
        'env_vars': ['HF_TOKEN']
    },
    'browser': {
        'name': 'Browser Automation (smolagents)',
        'capabilities': ['navigate', 'click_element', 'type_text', 'get_page_text', 'run_agent_task'],
        'requires': ['smolagents', 'helium', 'selenium', 'beautifulsoup4'],
        'env_vars': []
    },
    'local': {
        'name': 'Local LLM (Ollama/LM Studio)',
        'capabilities': ['chat', 'chat_async'],
        'requires': ['requests'],
        'env_vars': ['LOGOS_LLM_ENDPOINT']
    }
}
# ==========================================
# CLI / TESTING
# ==========================================
def main():
    """
    CLI for testing connectors directly.
    Usage: python -m logos.connectors --test local --model google/gemma-3-4b
    """
    import argparse
    parser = argparse.ArgumentParser(description="LOGOS Connectors Utilities")
    parser.add_argument("--test", choices=list(AVAILABLE_CONNECTORS.keys()), help="Connector to test")
    parser.add_argument("--model", help="Model name for HF or Local connector")
    parser.add_argument("--prompt", default="Hello, are you online?", help="Prompt to send")
    parser.add_argument("--image", help="Path to image for OCR or Vision test")
    args = parser.parse_args()
    # No --test given: list the registry and exit.
    if not args.test:
        print("Available Connectors:")
        for k, v in AVAILABLE_CONNECTORS.items():
            print(f" - {k:<10} : {v['name']}")
        print("\nRun with --test <name> to verify a connection.")
        return
    print(f"--- Testing Connector: {args.test.upper()} ---")
    try:
        if args.test == 'local':
            # Local LLM / Vision Test
            model = args.model or "local-model"
            print(f"Targeting Model: {model}")
            client = get_connector('local', model=model)
            # LocalLLMConnector.chat returns (content, logprobs); show only the text.
            if args.image:
                print(f"Sending Vision Request with {args.image}...")
                resp, _ = client.chat(args.prompt, image_path=args.image)
            else:
                print(f"Sending Chat Request: '{args.prompt}'...")
                resp, _ = client.chat(args.prompt)
            print(f"\n[RESPONSE]\n{resp}")
        elif args.test == 'ocr':
            # OCR Test (via Vision)
            if not args.image:
                print("Error: --image argument required for OCR test.")
                return
            client = get_connector('ocr')
            print(f"Extracting text from {args.image}...")
            res = client.extract_text(args.image)
            print(f"\n[RESULT]\n{res['full_text']}")
        elif args.test == 'hf':
            # Hugging Face Test
            client = get_connector('hf')
            if args.image:
                # Image Captioning
                resp = client.image_to_text(args.image)
            else:
                # Text Gen
                resp = client.text_generation(args.prompt)
            print(f"\n[RESPONSE]\n{resp}")
        else:
            print(f"Test CLI not yet implemented for {args.test}. Import and use in Python.")
    except Exception as e:
        print(f"\n[FAIL] {e}")
        import traceback
        traceback.print_exc()
if __name__ == "__main__":
    main()