# logos/ingest_knowledge.py (commit b460317)
# NOTE: header reconstructed from a web-UI paste (repo path / author / commit
# message: "Fix visualization: Persist Physics Geometry to Global State").
"""
logos/ingest_knowledge.py - The "Token Consumption Plan"
Protocol 4: Autonomous Resource Integration -> Knowledge Synthesis
This script executes the "Ingestion" workflow:
1. Scans 'LOGOS Notes' for diagrammatic knowledge.
2. Uses the Nano Swarm (Gemma Vision) to "consume" the visual tokens.
3. Transmutes visual logic into textual knowledge (Markdown).
Target: 'LOGOS Notes/*.png' -> 'knowledge_base/diagram_analysis.md'
"""
import glob
import json
import os
import sys
import time
from typing import List

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from logos.connectors import get_connector
# Configuration
SOURCE_DIR = "LOGOS Notes"  # directory scanned for *.png / *.jpg diagram assets
OUTPUT_FILE = "knowledge_base/diagram_analysis.md"  # synthesized Markdown artifact
VISION_MODEL = "google/gemma-3-4b"  # image-analysis model (STEP B)
TEXT_MODEL = "dolphin-x1-8b"  # filename -> context model (STEP A)
def ingest_diagrams():
    """Scan SOURCE_DIR for diagram images and write a Markdown analysis file.

    Per image: (A) TEXT_MODEL turns the filename into a context blurb, then
    (B) VISION_MODEL analyzes the image primed with that context; the result
    is written to OUTPUT_FILE and flushed immediately.

    NOTE(review): this definition is shadowed by the registry-aware
    `ingest_diagrams` defined later in this module — consider removing it.
    """
    print("--- LOGOS Knowledge Ingestion [Protocol 4: Context-Primed Vision] ---")
    print(f"Targeting: {SOURCE_DIR}")
    print(f"Agents: {TEXT_MODEL} (Context) -> {VISION_MODEL} (Analysis)")

    # 1. Scout resources.
    images = sorted(
        glob.glob(os.path.join(SOURCE_DIR, "*.png"))
        + glob.glob(os.path.join(SOURCE_DIR, "*.jpg"))
    )
    if not images:
        print(f"[WARN] No assets found in {SOURCE_DIR}")
        return

    # 2. Initialize agents. One connector is reused; the 'model' param is
    # switched per call so only one model stream is active on the hardware.
    try:
        connector = get_connector('local')
    except Exception as e:
        print(f"[FAIL] Could not spawn connector: {e}")
        return

    # 3. Execution loop. Ensure the output directory exists before writing.
    os.makedirs(os.path.dirname(os.path.abspath(OUTPUT_FILE)), exist_ok=True)
    with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
        f.write("# LOGOS Diagram Analysis (Context-Primed)\n")
        f.write(f"**Generated:** {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"**Pipeline:** Context({TEXT_MODEL}) -> Vision({VISION_MODEL})\n\n")

        for i, img_path in enumerate(images):
            filename = os.path.basename(img_path)
            clean_name = os.path.splitext(filename)[0].replace("_", " ").replace("-", " ")
            # Placeholder "(unknown)" in the pasted source presumably was
            # {filename} — restored here; confirm against upstream history.
            print(f"[{i+1}/{len(images)}] Processing: {filename}...")
            try:
                # Force hardware cool-down between items.
                time.sleep(2.0)

                # STEP A: context priming (text model).
                print(" > [Dolphin] Extracting Context (timeout=30s)...")
                try:
                    context_prompt = (
                        f"Analyze filename '{clean_name}'. "
                        "Return JSON {'title': '...', 'description': '...'}."
                    )
                    context_resp, _ = connector.chat(context_prompt, model=TEXT_MODEL)
                    # Robust JSON extraction: take the outermost {...} span.
                    s = context_resp.find('{')
                    e = context_resp.rfind('}') + 1
                    try:
                        # BUGFIX: when rfind() misses, e == 0, so the old
                        # check `e != -1` was always true. Require a real span.
                        if s != -1 and e > s:
                            meta = json.loads(context_resp[s:e])
                            refined_context = (
                                f"Context: {meta.get('title', clean_name)}. "
                                f"{meta.get('description', '')}"
                            )
                        else:
                            refined_context = f"Context: {clean_name}"
                    except ValueError:
                        # Unparseable JSON: fall back to the raw reply, flattened.
                        refined_context = f"Context: {context_resp[:200].replace(chr(10), ' ')}"
                except Exception as e:
                    print(f" > [Dolphin] Bypass (Error: {e})")
                    refined_context = f"Context: {clean_name}"

                # Hardware switch delay before loading the vision model.
                print(" > [System] Swapping to Vision Model...")
                time.sleep(1.0)

                # STEP B: vision analysis.
                print(" > [Gemma] performing Context-Aware Analysis...")
                vision_prompt = f"""
Role: Senior Hardware Architect.
Context: {refined_context}
Task: Analyze the diagram. Validate if the visual data matches the context.
Extract:
1. Hardware components (Bus, ALU, Cache).
2. Data flow direction.
3. Mathematical formulas.
Output: Actionable Markdown.
"""
                start_ts = time.time()
                analysis, _ = connector.chat(vision_prompt, image_path=img_path, model=VISION_MODEL)
                duration = time.time() - start_ts

                # Anneal into the knowledge base; flush so progress survives crashes.
                f.write(f"## {clean_name}\n")
                f.write(f"**Context ({TEXT_MODEL}):** {refined_context}\n\n")
                f.write(f"![{clean_name}](../{img_path.replace(os.sep, '/')})\n\n")
                f.write(f"{analysis}\n\n")
                f.write(f"*Analysis time: {duration:.2f}s*\n")
                f.write("---\n\n")
                f.flush()
                print(f" > Ingested ({duration:.2f}s)")
            except Exception as e:
                print(f" > [FAIL] Error: {e}")
                f.write(f"## {clean_name}\n")
                f.write(f"**Error:** {e}\n\n---\n\n")

    print("\n[SUCCESS] Knowledge synthesis complete.")
    print(f"Artifact: {OUTPUT_FILE}")
# --- INGESTION REGISTRY (Deduplication) ---
class IngestionRegistry:
    """Tracks which source files have already been ingested.

    The registry is a JSON file mapping absolute file path ->
    {"mtime": float, "timestamp": float, "meta": dict}. A file counts as
    processed while its on-disk mtime is <= the recorded mtime, so editing
    a file makes it eligible for re-ingestion.
    """

    def __init__(self, registry_path="logos/ingestion_registry.json"):
        # registry_path: where the JSON registry is persisted.
        self.registry_path = registry_path
        # data: in-memory registry dict (abspath -> record).
        self.data = self._load()

    def _load(self):
        """Return the persisted registry dict, or {} if missing or corrupt."""
        if os.path.exists(self.registry_path):
            try:
                with open(self.registry_path, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except (OSError, ValueError):
                # Unreadable or corrupt registry: start fresh rather than crash.
                # (json.JSONDecodeError subclasses ValueError.)
                pass
        return {}

    def save(self):
        """Persist the registry to disk as pretty-printed JSON."""
        # Create the parent directory on first save so a fresh checkout works.
        os.makedirs(os.path.dirname(os.path.abspath(self.registry_path)), exist_ok=True)
        with open(self.registry_path, 'w', encoding='utf-8') as f:
            json.dump(self.data, f, indent=2)

    def is_processed(self, filepath):
        """Checks if file is already ingested based on mtime."""
        key = os.path.abspath(filepath)
        last_mtime = self.data.get(key, {}).get("mtime", 0)
        return os.stat(filepath).st_mtime <= last_mtime

    def mark_processed(self, filepath, meta=None):
        """Tags data as ingested (records current mtime) and saves immediately."""
        key = os.path.abspath(filepath)
        self.data[key] = {
            "mtime": os.stat(filepath).st_mtime,
            "timestamp": time.time(),
            "meta": meta or {},
        }
        self.save()
def ingest_diagrams():
    """Registry-aware ingestion: skip already-processed images, append new
    analyses to OUTPUT_FILE, and push each analysis to the local indexing
    service ("Gödel" endpoint) on a best-effort basis.

    Per image: (A) TEXT_MODEL derives a context blurb from the filename,
    (B) VISION_MODEL analyzes the image primed with that context. Static
    instructions are placed before the dynamic context so a prompt-prefix
    cache can reuse the head.
    """
    print("--- LOGOS Knowledge Ingestion [Protocol 4: Context-Primed Vision] ---")
    print(f"Targeting: {SOURCE_DIR}")
    print(f"Agents: {TEXT_MODEL} (Context) -> {VISION_MODEL} (Analysis)")
    registry = IngestionRegistry()

    # 1. Scout resources.
    images = sorted(
        glob.glob(os.path.join(SOURCE_DIR, "*.png"))
        + glob.glob(os.path.join(SOURCE_DIR, "*.jpg"))
    )
    if not images:
        print(f"[WARN] No assets found in {SOURCE_DIR}")
        return

    # 2. Initialize agents. One connector is reused; the 'model' param is
    # switched per call so only one model stream is active on the hardware.
    try:
        connector = get_connector('local')
    except Exception as e:
        print(f"[FAIL] Could not spawn connector: {e}")
        return

    # 3. Execution loop. Dedup runs only add new findings, so append when the
    # artifact already exists; otherwise write the header first.
    os.makedirs(os.path.dirname(os.path.abspath(OUTPUT_FILE)), exist_ok=True)
    mode = 'a' if os.path.exists(OUTPUT_FILE) else 'w'
    with open(OUTPUT_FILE, mode, encoding="utf-8") as f:
        if mode == 'w':
            f.write("# LOGOS Diagram Analysis (Context-Primed)\n")
            f.write(f"**Generated:** {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"**Pipeline:** Context({TEXT_MODEL}) -> Vision({VISION_MODEL})\n\n")

        for i, img_path in enumerate(images):
            filename = os.path.basename(img_path)

            # [DEDUPLICATION CHECK] Skip files whose mtime is already registered.
            # Placeholder "(unknown)" in the pasted source presumably was
            # {filename} — restored here; confirm against upstream history.
            if registry.is_processed(img_path):
                print(f"[{i+1}/{len(images)}] Skipping {filename} (Already Ingested) [Duplicate Path Atom]")
                continue

            clean_name = os.path.splitext(filename)[0].replace("_", " ").replace("-", " ")
            print(f"[{i+1}/{len(images)}] Processing: {filename}...")
            try:
                # Force hardware cool-down between items.
                time.sleep(1.0)

                # STEP A: context priming (text model).
                print(" > [Dolphin] Extracting Context (timeout=30s)...")
                try:
                    context_prompt = (
                        f"Analyze filename '{clean_name}'. "
                        "Return JSON {'title': '...', 'description': '...'}."
                    )
                    context_resp, _ = connector.chat(context_prompt, model=TEXT_MODEL)
                    # Robust JSON extraction: take the outermost {...} span.
                    s = context_resp.find('{')
                    e = context_resp.rfind('}') + 1
                    try:
                        # BUGFIX: when rfind() misses, e == 0, so the old
                        # check `e != -1` was always true. Require a real span.
                        if s != -1 and e > s:
                            meta = json.loads(context_resp[s:e])
                            refined_context = (
                                f"Context: {meta.get('title', clean_name)}. "
                                f"{meta.get('description', '')}"
                            )
                        else:
                            refined_context = f"Context: {clean_name}"
                    except ValueError:
                        # Unparseable JSON: fall back to the raw reply, flattened.
                        refined_context = f"Context: {context_resp[:200].replace(chr(10), ' ')}"
                except Exception as e:
                    print(f" > [Dolphin] Bypass (Error: {e})")
                    refined_context = f"Context: {clean_name}"

                # Hardware switch delay before loading the vision model.
                print(" > [System] Swapping to Vision Model...")
                time.sleep(1.0)

                # STEP B: vision analysis.
                print(" > [Gemma] performing Context-Aware Analysis...")
                # [CACHE OPTIMIZATION] Static instructions FIRST, dynamic
                # context LAST, so a prompt-prefix cache can reuse the head.
                user_content = f"Context: {refined_context}\nTarget: {filename}"
                vision_system = (
                    "Role: Senior Hardware Architect.\n"
                    "Task: Analyze the diagram. Validate if the visual data matches the context.\n"
                    "Extract:\n"
                    "1. Hardware components (Bus, ALU, Cache).\n"
                    "2. Data flow direction.\n"
                    "3. Mathematical formulas.\n"
                    "Output: Actionable Markdown."
                )
                full_prompt = f"{vision_system}\n\n[DATA]: {user_content}"
                start_ts = time.time()
                analysis, _ = connector.chat(full_prompt, image_path=img_path, model=VISION_MODEL)
                duration = time.time() - start_ts

                # Anneal into the knowledge base; flush so progress survives crashes.
                f.write(f"## {clean_name}\n")
                f.write(f"**Context ({TEXT_MODEL}):** {refined_context}\n\n")
                f.write(f"![{clean_name}](../{img_path.replace(os.sep, '/')})\n\n")
                f.write(f"{analysis}\n\n")
                f.write(f"*Analysis time: {duration:.2f}s*\n")
                f.write("---\n\n")
                f.flush()
                print(f" > Ingested ({duration:.2f}s)")

                # [PROTOCOL 26: ANALOG TO DIGITAL INDEXING]
                # Best-effort push of the analysis to the local indexing server.
                try:
                    import requests  # local import keeps the dependency optional
                    payload = {
                        "filepath": img_path,
                        "content": f"{refined_context}\n{analysis}",
                    }
                    # Assuming default port — TODO confirm against server config.
                    res = requests.post("http://localhost:5000/index-module", json=payload, timeout=5)
                    if res.status_code == 200:
                        data = res.json()
                        mid = data.get('manifold_id')
                        primes = data.get('prime_coordinates')
                        print(f" > [GÖDEL] Indexing Complete. Manifold ID: {mid}")
                        print(f" > [PRIME SPACE] Active Domains: {primes}")
                        f.write(f"**Gödel ID:** `{mid}`\n**Prime Vectors:** `{primes}`\n\n")
                    else:
                        print(f" > [GÖDEL] Indexing Failed: {res.status_code}")
                except Exception as e:
                    print(f" > [GÖDEL] Indexing Error: {e}")

                # [REGISTRY UPDATE] Mark only after a successful ingest so
                # failed items are retried on the next run.
                registry.mark_processed(img_path, meta={"duration": duration, "context": refined_context})
            except Exception as e:
                print(f" > [FAIL] Error: {e}")
                f.write(f"## {clean_name}\n")
                f.write(f"**Error:** {e}\n\n---\n\n")

    print("\n[SUCCESS] Knowledge synthesis complete.")
if __name__ == "__main__":
    # NOTE(review): the pasted source also called ingest_documents(), which is
    # neither defined nor imported anywhere in this module and would raise
    # NameError at runtime — dropped here; restore once it actually exists.
    ingest_diagrams()