| """
|
| Autonomous Gap-Filling via LLM Integration (Phase 4.0)
|
| ======================================================
|
| Bridges the GapDetector with the existing HAIMLLMIntegrator to autonomously
|
| fill detected knowledge gaps by generating and storing synthetic memories.
|
|
|
| Pipeline:
|
| 1. GapFiller polls GapDetector for high-priority open gaps.
|
| 2. For each gap, it constructs a prompt asking the LLM to fill it.
|
| 3. The LLM response is parsed into discrete factual statements.
|
| 4. Each statement is stored in the engine as a new memory node, tagged
|
| with metadata: {"source": "llm_gap_fill", "gap_id": ..., "query": ...}.
|
| 5. The gap record is marked as filled in the detector registry.
|
|
|
| Safety controls:
|
| - Rate-limiting: max N gap-fill calls per hour (configurable).
|
| - Confidence gate: only fill gaps that stay unresolved after min_reqs queries.
|
| - Dry-run mode: generate responses but don't store them.
|
| - Minimum priority threshold before triggering LLM calls.
|
|
|
| Usage:
|
| filler = GapFiller(engine, integrator, detector)
|
| await filler.start() # background task
|
| results = await filler.fill_now(n=5) # immediate fill of top-5 gaps
|
| await filler.stop()
|
| """
|
|
|
| from __future__ import annotations
|
|
|
| import asyncio
|
| import time
|
| from dataclasses import dataclass, field
|
| from datetime import datetime, timezone
|
| from typing import Dict, List, Optional
|
| from loguru import logger
|
|
|
| from .gap_detector import GapDetector, GapRecord
|
|
|
|
|
|
|
|
|
|
|
|
|
| @dataclass
|
| class GapFillerConfig:
|
| """Controls how/when the gap filler triggers LLM calls."""
|
| poll_interval_seconds: float = 600.0
|
| max_fills_per_hour: int = 20
|
| min_priority_to_fill: float = 0.3
|
| min_seen_before_fill: int = 2
|
| max_statements_per_gap: int = 5
|
| dry_run: bool = False
|
| store_tag: str = "llm_gap_fill"
|
| enabled: bool = True
|
|
|
|
|
|
|
|
|
|
|
|
|
| _FILL_PROMPT_TEMPLATE = """You are an expert knowledge assistant integrated into a cognitive memory system.
|
| A user recently queried for information that the system could not adequately answer.
|
|
|
| Query topic: "{query}"
|
|
|
| Please provide a concise, factual response about this topic. Structure your answer as
|
| {max_statements} distinct, standalone factual statements (one per line, no numbering needed).
|
| Each statement should be directly useful for answering future questions about this topic.
|
| Keep each statement under 150 words. Be objective and accurate.
|
|
|
| Statements:"""
|
|
|
| _REFINE_PROMPT_TEMPLATE = """You are helping fill a knowledge gap in a memory system.
|
|
|
| The topic "{query}" was queried {seen} times without a satisfactory answer.
|
|
|
| Provide {max_statements} concise factual statements that would help answer this topic.
|
| One statement per line. Be specific, factual, and succinct (max 120 words each).
|
|
|
| Statements:"""
|
|
|
|
|
|
|
|
|
|
|
|
|
| class GapFiller:
|
| """
|
| Autonomous LLM-driven knowledge gap filler.
|
|
|
| Integrates with GapDetector (finds gaps) and HAIMLLMIntegrator (fills them).
|
| """
|
|
|
| def __init__(
|
| self,
|
| engine,
|
| llm_integrator,
|
| gap_detector: GapDetector,
|
| config: Optional[GapFillerConfig] = None,
|
| ):
|
| self.engine = engine
|
| self.llm = llm_integrator
|
| self.detector = gap_detector
|
| self.cfg = config or GapFillerConfig()
|
| self._task: Optional[asyncio.Task] = None
|
| self._running = False
|
| self._fill_timestamps: List[float] = []
|
| self.stats: Dict = {
|
| "gaps_filled": 0,
|
| "statements_stored": 0,
|
| "llm_calls": 0,
|
| "errors": 0,
|
| }
|
|
|
|
|
|
|
| async def start(self) -> None:
|
| if not self.cfg.enabled:
|
| logger.info("GapFiller disabled by config.")
|
| return
|
| self._running = True
|
| self._task = asyncio.create_task(self._poll_loop(), name="gap_filler")
|
| logger.info(
|
| f"GapFiller started — polling every {self.cfg.poll_interval_seconds}s"
|
| )
|
|
|
| async def stop(self) -> None:
|
| self._running = False
|
| if self._task and not self._task.done():
|
| self._task.cancel()
|
| try:
|
| await self._task
|
| except asyncio.CancelledError:
|
| pass
|
| logger.info("GapFiller stopped.")
|
|
|
|
|
|
|
| async def _poll_loop(self) -> None:
|
| while self._running:
|
| try:
|
| await asyncio.sleep(self.cfg.poll_interval_seconds)
|
| if self._running:
|
| await self.fill_now(n=5)
|
| except asyncio.CancelledError:
|
| break
|
| except Exception as exc:
|
| logger.error(f"GapFiller poll error: {exc}", exc_info=True)
|
| self.stats["errors"] += 1
|
| await asyncio.sleep(60)
|
|
|
|
|
|
|
| async def fill_now(self, n: int = 5) -> List[Dict]:
|
| """
|
| Immediately fill the top-n open gaps.
|
|
|
| Returns:
|
| List of fill result dicts.
|
| """
|
| if not self._rate_check():
|
| logger.warning("GapFiller rate limit reached — skipping fill cycle.")
|
| return []
|
|
|
| open_gaps = self.detector.get_open_gaps(top_n=n * 3)
|
| eligible = [
|
| g for g in open_gaps
|
| if g.priority_score >= self.cfg.min_priority_to_fill
|
| and g.seen_count >= self.cfg.min_seen_before_fill
|
| ][:n]
|
|
|
| results = []
|
| for gap in eligible:
|
| if not self._rate_check():
|
| break
|
| result = await self._fill_gap(gap)
|
| results.append(result)
|
|
|
| return results
|
|
|
|
|
|
|
| async def _fill_gap(self, gap: GapRecord) -> Dict:
|
| """Generate and store knowledge for a single gap."""
|
| logger.info(
|
| f"Filling gap '{gap.query_text[:60]}' "
|
| f"(priority={gap.priority_score:.3f} seen={gap.seen_count})"
|
| )
|
|
|
|
|
| prompt = _REFINE_PROMPT_TEMPLATE.format(
|
| query=gap.query_text,
|
| seen=gap.seen_count,
|
| max_statements=self.cfg.max_statements_per_gap,
|
| )
|
|
|
|
|
| try:
|
| loop = asyncio.get_running_loop()
|
| raw_response = await loop.run_in_executor(
|
| None, self.llm._call_llm, prompt, 512
|
| )
|
| self._record_call()
|
| self.stats["llm_calls"] += 1
|
| except Exception as exc:
|
| logger.error(f"LLM call failed for gap {gap.gap_id}: {exc}")
|
| self.stats["errors"] += 1
|
| return {"gap_id": gap.gap_id, "status": "error", "error": str(exc)}
|
|
|
|
|
| statements = self._parse_statements(raw_response)
|
| if not statements:
|
| logger.warning(f"LLM returned no parseable statements for gap {gap.gap_id}")
|
| return {"gap_id": gap.gap_id, "status": "empty_response"}
|
|
|
|
|
| stored_ids = []
|
| if not self.cfg.dry_run:
|
| for stmt in statements:
|
| if not stmt.strip():
|
| continue
|
| meta = {
|
| "source": self.cfg.store_tag,
|
| "gap_id": gap.gap_id,
|
| "gap_query": gap.query_text,
|
| "gap_signal": gap.signal,
|
| "generated_at": datetime.now(timezone.utc).isoformat(),
|
| "tags": ["gap_fill", "llm_generated"],
|
| }
|
| try:
|
| node_id = await self.engine.store(stmt.strip(), metadata=meta)
|
| stored_ids.append(node_id)
|
| self.stats["statements_stored"] += 1
|
| except Exception as exc:
|
| logger.error(f"Failed to store gap-fill statement: {exc}")
|
|
|
|
|
| self.detector.mark_filled(gap.gap_id)
|
| self.stats["gaps_filled"] += 1
|
|
|
| result = {
|
| "gap_id": gap.gap_id,
|
| "query": gap.query_text,
|
| "status": "filled" if not self.cfg.dry_run else "dry_run",
|
| "statements": statements,
|
| "stored_node_ids": stored_ids,
|
| }
|
|
|
| logger.info(
|
| f"Gap filled: '{gap.query_text[:50]}' "
|
| f"→ {len(stored_ids)} statements stored"
|
| )
|
| return result
|
|
|
|
|
|
|
| def _parse_statements(self, raw: str) -> List[str]:
|
| """
|
| Split LLM response into individual factual statements.
|
| Handles bullet points, numbered lists, and plain line-breaks.
|
| """
|
| import re
|
| lines = raw.strip().split("\n")
|
| statements = []
|
| for line in lines:
|
|
|
| clean = re.sub(r"^[\s\-\*\d\.\)]+", "", line).strip()
|
| if len(clean) > 20:
|
| statements.append(clean)
|
| return statements[: self.cfg.max_statements_per_gap]
|
|
|
| def _rate_check(self) -> bool:
|
| """True if under the hourly rate limit."""
|
| now = time.time()
|
|
|
| self._fill_timestamps = [t for t in self._fill_timestamps if now - t < 3600]
|
| return len(self._fill_timestamps) < self.cfg.max_fills_per_hour
|
|
|
| def _record_call(self) -> None:
|
| """Record a fill call timestamp for rate limiting."""
|
| self._fill_timestamps.append(time.time())
|
|
|