""" logos/ingest_knowledge.py - The "Token Consumption Plan" Protocol 4: Autonomous Resource Integration -> Knowledge Synthesis This script executes the "Ingestion" workflow: 1. Scans 'LOGOS Notes' for diagrammatic knowledge. 2. Uses the Nano Swarm (Gemma Vision) to "consume" the visual tokens. 3. Transmutes visual logic into textual knowledge (Markdown). Target: 'LOGOS Notes/*.png' -> 'knowledge_base/diagram_analysis.md' """ import sys import os import glob import time from typing import List sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from logos.connectors import get_connector # Configuration SOURCE_DIR = "LOGOS Notes" OUTPUT_FILE = "knowledge_base/diagram_analysis.md" VISION_MODEL = "google/gemma-3-4b" TEXT_MODEL = "dolphin-x1-8b" def ingest_diagrams(): print(f"--- LOGOS Knowledge Ingestion [Protocol 4: Context-Primed Vision] ---") print(f"Targeting: {SOURCE_DIR}") print(f"Agents: {TEXT_MODEL} (Context) -> {VISION_MODEL} (Analysis)") # 1. Scout Resources images = glob.glob(os.path.join(SOURCE_DIR, "*.png")) + glob.glob(os.path.join(SOURCE_DIR, "*.jpg")) images.sort() if not images: print(f"[WARN] No assets found in {SOURCE_DIR}") return # 2. Initialize Agents try: # We reuse the connector but switch 'model' param per call to stay hardware aligned (one active stream) connector = get_connector('local') except Exception as e: print(f"[FAIL] Could not spawn connector: {e}") return # 3. Execution Loop with open(OUTPUT_FILE, "w", encoding="utf-8") as f: f.write("# LOGOS Diagram Analysis (Context-Primed)\n") f.write(f"**Generated:** {time.strftime('%Y-%m-%d %H:%M:%S')}\n") f.write(f"**Pipeline:** Context({TEXT_MODEL}) -> Vision({VISION_MODEL})\n\n") for i, img_path in enumerate(images): filename = os.path.basename(img_path) clean_name = os.path.splitext(filename)[0].replace("_", " ").replace("-", " ") print(f"[{i+1}/{len(images)}] Processing: {filename}...") try: # Force Hardware Cool-down between items time.sleep(2.0) # STEP A: Context Priming (Dolphin) print(f" > [Dolphin] Extracting Context (timeout=30s)...") try: context_prompt = f"Analyze filename '{clean_name}'. Return JSON {{'title': '...', 'description': '...'}}." # Debug: Print ensuring valid string # print(f"DEBUG: Prompting Dolphin with: {context_prompt}") context_resp, _ = connector.chat(context_prompt, model=TEXT_MODEL) # Robust JSON extraction import json try: # Find first '{' and last '}' s = context_resp.find('{') e = context_resp.rfind('}') + 1 if s != -1 and e != -1: meta = json.loads(context_resp[s:e]) refined_context = f"Context: {meta.get('title', clean_name)}. {meta.get('description', '')}" else: refined_context = f"Context: {clean_name}" except: refined_context = f"Context: {context_resp[:200].replace(chr(10), ' ')}" except Exception as e: print(f" > [Dolphin] Bypass (Error: {e})") refined_context = f"Context: {clean_name}" # Hardware Switch Delay print(f" > [System] Swapping to Vision Model...") time.sleep(1.0) # STEP B: Vision Analysis (Gemma) print(f" > [Gemma] performing Context-Aware Analysis...") vision_prompt = f""" Role: Senior Hardware Architect. Context: {refined_context} Task: Analyze the diagram. Validate if the visual data matches the context. Extract: 1. Hardware components (Bus, ALU, Cache). 2. Data flow direction. 3. Mathematical formulas. Output: Actionable Markdown. """ start_ts = time.time() analysis, _ = connector.chat(vision_prompt, image_path=img_path, model=VISION_MODEL) duration = time.time() - start_ts # Anneal into Knowledge Base f.write(f"## {filename}\n") f.write(f"**Context ({TEXT_MODEL}):** {refined_context}\n\n") f.write(f"![{filename}](../{img_path.replace(os.sep, '/')})\n\n") f.write(f"{analysis}\n\n") f.write(f"*Analysis time: {duration:.2f}s*\n") f.write("---\n\n") f.flush() print(f" > Ingested ({duration:.2f}s)") except Exception as e: print(f" > [FAIL] Error: {e}") f.write(f"## {filename}\n") f.write(f"**Error:** {e}\n\n---\n\n") print(f"\n[SUCCESS] Knowledge synthesis complete.") print(f"Artifact: {OUTPUT_FILE}") # --- INGESTION REGISTRY (Deduplication) --- class IngestionRegistry: def __init__(self, registry_path="logos/ingestion_registry.json"): self.registry_path = registry_path self.data = self._load() def _load(self): if os.path.exists(self.registry_path): try: import json with open(self.registry_path, 'r') as f: return json.load(f) except: pass return {} def save(self): import json with open(self.registry_path, 'w') as f: json.dump(self.data, f, indent=2) def is_processed(self, filepath): """Checks if file is already ingested based on mtime.""" stat = os.stat(filepath) key = os.path.abspath(filepath) last_mtime = self.data.get(key, {}).get("mtime", 0) return stat.st_mtime <= last_mtime def mark_processed(self, filepath, meta=None): """Tags data as ingested.""" key = os.path.abspath(filepath) self.data[key] = { "mtime": os.stat(filepath).st_mtime, "timestamp": time.time(), "meta": meta or {} } self.save() def ingest_diagrams(): print(f"--- LOGOS Knowledge Ingestion [Protocol 4: Context-Primed Vision] ---") print(f"Targeting: {SOURCE_DIR}") print(f"Agents: {TEXT_MODEL} (Context) -> {VISION_MODEL} (Analysis)") registry = IngestionRegistry() # 1. Scout Resources images = glob.glob(os.path.join(SOURCE_DIR, "*.png")) + glob.glob(os.path.join(SOURCE_DIR, "*.jpg")) images.sort() if not images: print(f"[WARN] No assets found in {SOURCE_DIR}") return # 2. Initialize Agents try: # We reuse the connector but switch 'model' param per call to stay hardware aligned (one active stream) connector = get_connector('local') except Exception as e: print(f"[FAIL] Could not spawn connector: {e}") return # 3. Execution Loop # Write mode 'a' (append) to preserve history if we are skipping? # Or 'w' but reading old content? # For now, let's just append new findings or overwrite if it's a full run. # User asked to "tag data that we have already ingested", usually implies skipping. # Check if output file exists, if so append, else write header mode = 'a' if os.path.exists(OUTPUT_FILE) else 'w' with open(OUTPUT_FILE, mode, encoding="utf-8") as f: if mode == 'w': f.write("# LOGOS Diagram Analysis (Context-Primed)\n") f.write(f"**Generated:** {time.strftime('%Y-%m-%d %H:%M:%S')}\n") f.write(f"**Pipeline:** Context({TEXT_MODEL}) -> Vision({VISION_MODEL})\n\n") for i, img_path in enumerate(images): filename = os.path.basename(img_path) # [DEDUPLICATION CHECK] if registry.is_processed(img_path): print(f"[{i+1}/{len(images)}] Skipping {filename} (Already Ingested) [Duplicate Path Atom]") continue clean_name = os.path.splitext(filename)[0].replace("_", " ").replace("-", " ") print(f"[{i+1}/{len(images)}] Processing: {filename}...") try: # Force Hardware Cool-down between items time.sleep(1.0) # STEP A: Context Priming (Dolphin) print(f" > [Dolphin] Extracting Context (timeout=30s)...") try: context_prompt = f"Analyze filename '{clean_name}'. Return JSON {{'title': '...', 'description': '...'}}." context_resp, _ = connector.chat(context_prompt, model=TEXT_MODEL) # Robust JSON extraction import json try: s = context_resp.find('{') e = context_resp.rfind('}') + 1 if s != -1 and e != -1: meta = json.loads(context_resp[s:e]) refined_context = f"Context: {meta.get('title', clean_name)}. {meta.get('description', '')}" else: refined_context = f"Context: {clean_name}" except: refined_context = f"Context: {context_resp[:200].replace(chr(10), ' ')}" except Exception as e: print(f" > [Dolphin] Bypass (Error: {e})") refined_context = f"Context: {clean_name}" # Hardware Switch Delay print(f" > [System] Swapping to Vision Model...") time.sleep(1.0) # STEP B: Vision Analysis (Gemma) print(f" > [Gemma] performing Context-Aware Analysis...") # [CACHE OPTIMIZATION] # We move the Static Instructions to the FRONT and the Dynamic Context to the BACK. # Dynamic part user_content = f"Context: {refined_context}\nTarget: {filename}" # Static System/Instruction Block (Cached) vision_system = ( "Role: Senior Hardware Architect.\n" "Task: Analyze the diagram. Validate if the visual data matches the context.\n" "Extract:\n" "1. Hardware components (Bus, ALU, Cache).\n" "2. Data flow direction.\n" "3. Mathematical formulas.\n" "Output: Actionable Markdown." ) full_prompt = f"{vision_system}\n\n[DATA]: {user_content}" start_ts = time.time() analysis, _ = connector.chat(full_prompt, image_path=img_path, model=VISION_MODEL) duration = time.time() - start_ts # Anneal into Knowledge Base f.write(f"## {filename}\n") f.write(f"**Context ({TEXT_MODEL}):** {refined_context}\n\n") f.write(f"![{filename}](../{img_path.replace(os.sep, '/')})\n\n") f.write(f"{analysis}\n\n") f.write(f"*Analysis time: {duration:.2f}s*\n") f.write("---\n\n") f.flush() print(f" > Ingested ({duration:.2f}s)") # [PROTOCOL 26: ANALOG TO DIGITAL INDEXING] # We send the Vision Analysis to the Server to be converted into a Gödel Number try: import requests payload = { "filepath": img_path, "content": f"{refined_context}\n{analysis}" } # Assuming default port res = requests.post("http://localhost:5000/index-module", json=payload, timeout=5) if res.status_code == 200: data = res.json() mid = data.get('manifold_id') primes = data.get('prime_coordinates') print(f" > [GÖDEL] Indexing Complete. Manifold ID: {mid}") print(f" > [PRIME SPACE] Active Domains: {primes}") f.write(f"**Gödel ID:** `{mid}`\n**Prime Vectors:** `{primes}`\n\n") else: print(f" > [GÖDEL] Indexing Failed: {res.status_code}") except Exception as e: print(f" > [GÖDEL] Indexing Error: {e}") # [REGISTRY UPDATE] registry.mark_processed(img_path, meta={"duration": duration, "context": refined_context}) except Exception as e: print(f" > [FAIL] Error: {e}") f.write(f"## {filename}\n") f.write(f"**Error:** {e}\n\n---\n\n") print(f"\n[SUCCESS] Knowledge synthesis complete.") if __name__ == "__main__": ingest_diagrams() ingest_documents()