"""
logos/ingest_knowledge.py - The "Token Consumption Plan"

Protocol 4: Autonomous Resource Integration -> Knowledge Synthesis

This script executes the "Ingestion" workflow:
1. Scans 'LOGOS Notes' for diagrammatic knowledge.
2. Uses the Nano Swarm (Gemma Vision) to "consume" the visual tokens.
3. Transmutes visual logic into textual knowledge (Markdown).

Target: 'LOGOS Notes/*.png' -> 'knowledge_base/diagram_analysis.md'
"""
import glob
import json
import os
import sys
import time
from typing import List

# Must run before the 'logos' import below: make the repo root importable
# when this file is executed as a script.
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from logos.connectors import get_connector
# --- Configuration ---
SOURCE_DIR = "LOGOS Notes"                          # directory scanned for diagram assets
OUTPUT_FILE = "knowledge_base/diagram_analysis.md"  # synthesized Markdown artifact
VISION_MODEL = "google/gemma-3-4b"                  # multimodal model for diagram analysis
TEXT_MODEL = "dolphin-x1-8b"                        # text model used for context priming
def ingest_diagrams():
    """Run the context-primed vision ingestion pipeline over SOURCE_DIR images.

    NOTE(review): this definition is shadowed by the later ``ingest_diagrams``
    below (which adds registry-based deduplication) and is therefore dead code
    at import time — consider deleting one of the two.

    Pipeline per image:
      1. Prime context from the filename via TEXT_MODEL (JSON title/description).
      2. Analyze the image with VISION_MODEL using that context.
      3. Append the analysis to OUTPUT_FILE as Markdown.

    NOTE(review): progress/heading labels were redacted to "(unknown)" in the
    recovered source; reconstructed here as {filename}/{clean_name} — confirm
    against the original.
    """
    print("--- LOGOS Knowledge Ingestion [Protocol 4: Context-Primed Vision] ---")
    print(f"Targeting: {SOURCE_DIR}")
    print(f"Agents: {TEXT_MODEL} (Context) -> {VISION_MODEL} (Analysis)")

    # 1. Scout resources
    images = sorted(
        glob.glob(os.path.join(SOURCE_DIR, "*.png"))
        + glob.glob(os.path.join(SOURCE_DIR, "*.jpg"))
    )
    if not images:
        print(f"[WARN] No assets found in {SOURCE_DIR}")
        return

    # 2. Initialize agents.  One connector is reused; the 'model' param is
    # switched per call so only one model stream is active at a time.
    try:
        connector = get_connector('local')
    except Exception as e:
        print(f"[FAIL] Could not spawn connector: {e}")
        return

    # BUGFIX: create the output directory up front; open(..., "w") does not.
    os.makedirs(os.path.dirname(OUTPUT_FILE), exist_ok=True)

    # 3. Execution loop
    with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
        f.write("# LOGOS Diagram Analysis (Context-Primed)\n")
        f.write(f"**Generated:** {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"**Pipeline:** Context({TEXT_MODEL}) -> Vision({VISION_MODEL})\n\n")

        for i, img_path in enumerate(images):
            filename = os.path.basename(img_path)
            clean_name = os.path.splitext(filename)[0].replace("_", " ").replace("-", " ")
            print(f"[{i+1}/{len(images)}] Processing: {filename}...")
            try:
                # Force hardware cool-down between items.
                time.sleep(2.0)

                # STEP A: context priming (Dolphin)
                print(f" > [Dolphin] Extracting Context (timeout=30s)...")
                try:
                    context_prompt = f"Analyze filename '{clean_name}'. Return JSON {{'title': '...', 'description': '...'}}."
                    context_resp, _ = connector.chat(context_prompt, model=TEXT_MODEL)
                    # Robust JSON extraction: take the outermost {...} span.
                    s = context_resp.find('{')
                    e = context_resp.rfind('}') + 1
                    try:
                        # BUGFIX: rfind() failure yields e == 0, which the old
                        # `e != -1` test let through; require a non-empty span.
                        if s != -1 and e > s:
                            meta = json.loads(context_resp[s:e])
                            refined_context = f"Context: {meta.get('title', clean_name)}. {meta.get('description', '')}"
                        else:
                            refined_context = f"Context: {clean_name}"
                    except (json.JSONDecodeError, TypeError, ValueError):
                        # Non-JSON reply: fall back to a flattened response prefix.
                        refined_context = f"Context: {context_resp[:200].replace(chr(10), ' ')}"
                except Exception as e:
                    print(f" > [Dolphin] Bypass (Error: {e})")
                    refined_context = f"Context: {clean_name}"

                # Hardware switch delay before loading the vision model.
                print(f" > [System] Swapping to Vision Model...")
                time.sleep(1.0)

                # STEP B: vision analysis (Gemma)
                print(f" > [Gemma] performing Context-Aware Analysis...")
                vision_prompt = f"""
Role: Senior Hardware Architect.
Context: {refined_context}
Task: Analyze the diagram. Validate if the visual data matches the context.
Extract:
1. Hardware components (Bus, ALU, Cache).
2. Data flow direction.
3. Mathematical formulas.
Output: Actionable Markdown.
"""
                start_ts = time.time()
                analysis, _ = connector.chat(vision_prompt, image_path=img_path, model=VISION_MODEL)
                duration = time.time() - start_ts

                # Anneal into the knowledge base.
                f.write(f"## {clean_name}\n")
                f.write(f"**Context ({TEXT_MODEL}):** {refined_context}\n\n")
                # BUGFIX: original emitted a stray "})" literal here; embed the
                # source image reference instead.
                f.write(f"![{clean_name}]({img_path})\n\n")
                f.write(f"{analysis}\n\n")
                f.write(f"*Analysis time: {duration:.2f}s*\n")
                f.write("---\n\n")
                f.flush()  # keep the artifact readable mid-run
                print(f" > Ingested ({duration:.2f}s)")
            except Exception as e:
                # Per-item failures are logged into the artifact; the run continues.
                print(f" > [FAIL] Error: {e}")
                f.write(f"## {filename} (FAILED)\n")
                f.write(f"**Error:** {e}\n\n---\n\n")

    print(f"\n[SUCCESS] Knowledge synthesis complete.")
    print(f"Artifact: {OUTPUT_FILE}")
| # --- INGESTION REGISTRY (Deduplication) --- | |
# --- INGESTION REGISTRY (Deduplication) ---
class IngestionRegistry:
    """Persistent registry of ingested files, keyed by absolute path.

    Stores each file's mtime so re-runs can skip assets that have not changed
    since they were last ingested (mtime-based deduplication).  State is
    persisted as pretty-printed JSON at ``registry_path``.
    """

    def __init__(self, registry_path="logos/ingestion_registry.json"):
        # Path where the JSON registry lives between runs.
        self.registry_path = registry_path
        # Mapping: abspath -> {"mtime": float, "timestamp": float, "meta": dict}
        self.data = self._load()

    def _load(self):
        """Load the registry from disk; return {} when missing or unreadable."""
        if os.path.exists(self.registry_path):
            try:
                with open(self.registry_path, 'r') as f:
                    return json.load(f)
            except (OSError, ValueError):
                # BUGFIX: was a bare `except:`; narrowed to read/parse failures.
                # A corrupt registry means we start fresh rather than crash.
                pass
        return {}

    def save(self):
        """Persist the registry to disk as indented JSON."""
        with open(self.registry_path, 'w') as f:
            json.dump(self.data, f, indent=2)

    def is_processed(self, filepath):
        """Return True if *filepath* is unchanged since it was last ingested.

        Compares the file's current mtime against the recorded one; a file
        never seen before (recorded mtime 0) is reported as unprocessed.
        Raises OSError if *filepath* does not exist.
        """
        stat = os.stat(filepath)
        key = os.path.abspath(filepath)
        last_mtime = self.data.get(key, {}).get("mtime", 0)
        return stat.st_mtime <= last_mtime

    def mark_processed(self, filepath, meta=None):
        """Record *filepath* as ingested (current mtime) and persist at once."""
        key = os.path.abspath(filepath)
        self.data[key] = {
            "mtime": os.stat(filepath).st_mtime,
            "timestamp": time.time(),
            "meta": meta or {},
        }
        self.save()
def ingest_diagrams():
    """Context-primed vision ingestion with mtime-based deduplication.

    Per image in SOURCE_DIR:
      1. Skip it when IngestionRegistry reports it already ingested (mtime
         unchanged) — the user asked for already-ingested data to be tagged.
      2. Prime context from the filename via TEXT_MODEL (JSON title/description).
      3. Analyze the image with VISION_MODEL; static instructions lead the
         prompt so the prefix is prompt-cache friendly.
      4. Append Markdown to OUTPUT_FILE, POST the analysis to the local
         indexing server (best-effort), and mark the file processed.

    NOTE(review): progress/heading labels were redacted to "(unknown)" in the
    recovered source; reconstructed here as {filename}/{clean_name} — confirm
    against the original.
    """
    print("--- LOGOS Knowledge Ingestion [Protocol 4: Context-Primed Vision] ---")
    print(f"Targeting: {SOURCE_DIR}")
    print(f"Agents: {TEXT_MODEL} (Context) -> {VISION_MODEL} (Analysis)")

    registry = IngestionRegistry()

    # 1. Scout resources
    images = sorted(
        glob.glob(os.path.join(SOURCE_DIR, "*.png"))
        + glob.glob(os.path.join(SOURCE_DIR, "*.jpg"))
    )
    if not images:
        print(f"[WARN] No assets found in {SOURCE_DIR}")
        return

    # 2. Initialize agents.  One connector is reused; the 'model' param is
    # switched per call so only one model stream is active at a time.
    try:
        connector = get_connector('local')
    except Exception as e:
        print(f"[FAIL] Could not spawn connector: {e}")
        return

    # 3. Execution loop.  Append when the artifact exists so deduplicated runs
    # preserve prior findings; write the header only for a fresh file.
    # BUGFIX: create the output directory up front; open() does not.
    os.makedirs(os.path.dirname(OUTPUT_FILE), exist_ok=True)
    mode = 'a' if os.path.exists(OUTPUT_FILE) else 'w'
    with open(OUTPUT_FILE, mode, encoding="utf-8") as f:
        if mode == 'w':
            f.write("# LOGOS Diagram Analysis (Context-Primed)\n")
            f.write(f"**Generated:** {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"**Pipeline:** Context({TEXT_MODEL}) -> Vision({VISION_MODEL})\n\n")

        for i, img_path in enumerate(images):
            filename = os.path.basename(img_path)

            # [DEDUPLICATION CHECK] skip unchanged, already-ingested assets.
            if registry.is_processed(img_path):
                print(f"[{i+1}/{len(images)}] Skipping {filename} (Already Ingested) [Duplicate Path Atom]")
                continue

            clean_name = os.path.splitext(filename)[0].replace("_", " ").replace("-", " ")
            print(f"[{i+1}/{len(images)}] Processing: {filename}...")
            try:
                # Force hardware cool-down between items.
                time.sleep(1.0)

                # STEP A: context priming (Dolphin)
                print(f" > [Dolphin] Extracting Context (timeout=30s)...")
                try:
                    context_prompt = f"Analyze filename '{clean_name}'. Return JSON {{'title': '...', 'description': '...'}}."
                    context_resp, _ = connector.chat(context_prompt, model=TEXT_MODEL)
                    # Robust JSON extraction: take the outermost {...} span.
                    s = context_resp.find('{')
                    e = context_resp.rfind('}') + 1
                    try:
                        # BUGFIX: rfind() failure yields e == 0, which the old
                        # `e != -1` test let through; require a non-empty span.
                        if s != -1 and e > s:
                            meta = json.loads(context_resp[s:e])
                            refined_context = f"Context: {meta.get('title', clean_name)}. {meta.get('description', '')}"
                        else:
                            refined_context = f"Context: {clean_name}"
                    except (json.JSONDecodeError, TypeError, ValueError):
                        # Non-JSON reply: fall back to a flattened response prefix.
                        refined_context = f"Context: {context_resp[:200].replace(chr(10), ' ')}"
                except Exception as e:
                    print(f" > [Dolphin] Bypass (Error: {e})")
                    refined_context = f"Context: {clean_name}"

                # Hardware switch delay before loading the vision model.
                print(f" > [System] Swapping to Vision Model...")
                time.sleep(1.0)

                # STEP B: vision analysis (Gemma)
                print(f" > [Gemma] performing Context-Aware Analysis...")
                # [CACHE OPTIMIZATION] static instruction block first (cacheable
                # prompt prefix), dynamic per-image context last.
                user_content = f"Context: {refined_context}\nTarget: {filename}"
                vision_system = (
                    "Role: Senior Hardware Architect.\n"
                    "Task: Analyze the diagram. Validate if the visual data matches the context.\n"
                    "Extract:\n"
                    "1. Hardware components (Bus, ALU, Cache).\n"
                    "2. Data flow direction.\n"
                    "3. Mathematical formulas.\n"
                    "Output: Actionable Markdown."
                )
                full_prompt = f"{vision_system}\n\n[DATA]: {user_content}"
                start_ts = time.time()
                analysis, _ = connector.chat(full_prompt, image_path=img_path, model=VISION_MODEL)
                duration = time.time() - start_ts

                # Anneal into the knowledge base.
                f.write(f"## {clean_name}\n")
                f.write(f"**Context ({TEXT_MODEL}):** {refined_context}\n\n")
                # BUGFIX: original emitted a stray "})" literal here; embed the
                # source image reference instead.
                f.write(f"![{clean_name}]({img_path})\n\n")
                f.write(f"{analysis}\n\n")
                f.write(f"*Analysis time: {duration:.2f}s*\n")
                f.write("---\n\n")
                f.flush()  # keep the artifact readable mid-run
                print(f" > Ingested ({duration:.2f}s)")

                # [PROTOCOL 26: ANALOG TO DIGITAL INDEXING]
                # Best-effort POST of the analysis to the local indexing server,
                # which converts it into a Gödel number.  Failures are logged
                # and do not abort the ingest.
                try:
                    # Third-party dep kept lazy so the pipeline runs without it.
                    import requests
                    payload = {
                        "filepath": img_path,
                        "content": f"{refined_context}\n{analysis}"
                    }
                    # Assuming the server's default port — TODO confirm.
                    res = requests.post("http://localhost:5000/index-module", json=payload, timeout=5)
                    if res.status_code == 200:
                        data = res.json()
                        mid = data.get('manifold_id')
                        primes = data.get('prime_coordinates')
                        print(f" > [GÖDEL] Indexing Complete. Manifold ID: {mid}")
                        print(f" > [PRIME SPACE] Active Domains: {primes}")
                        f.write(f"**Gödel ID:** `{mid}`\n**Prime Vectors:** `{primes}`\n\n")
                    else:
                        print(f" > [GÖDEL] Indexing Failed: {res.status_code}")
                except Exception as e:
                    print(f" > [GÖDEL] Indexing Error: {e}")

                # [REGISTRY UPDATE] only reached after a successful ingest, so
                # failed items are retried on the next run.
                registry.mark_processed(img_path, meta={"duration": duration, "context": refined_context})
            except Exception as e:
                # Per-item failures are logged into the artifact; the run continues.
                print(f" > [FAIL] Error: {e}")
                f.write(f"## {filename} (FAILED)\n")
                f.write(f"**Error:** {e}\n\n---\n\n")

    print("\n[SUCCESS] Knowledge synthesis complete.")
    # Consistency with the earlier pipeline variant: report the artifact path.
    print(f"Artifact: {OUTPUT_FILE}")
if __name__ == "__main__":
    ingest_diagrams()
    # BUGFIX: the original also called ingest_documents(), which is not defined
    # anywhere in this module and raised NameError after the (expensive)
    # ingestion run completed.  Restore the call once a document-ingestion
    # pipeline actually exists.