# Janus-backend/backend/app/memory.py
# DevodG's picture
# feat: stable janus intelligence with kaggle distillation
# 5f91e0b
from __future__ import annotations

import glob
import json
import logging
import time
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

from app.config import MEMORY_DIR, DATA_DIR
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Make sure the case-memory directory exists as soon as this module is imported.
Path(MEMORY_DIR).mkdir(parents=True, exist_ok=True)
# Directory that KnowledgeStore scans for (and writes) knowledge *.json files;
# also created at import time.
KNOWLEDGE_DIR = DATA_DIR / "knowledge"
KNOWLEDGE_DIR.mkdir(parents=True, exist_ok=True)
# JSON file holding the log of KnowledgeStore.search queries. Its parent
# directory is not created here; KnowledgeStore creates it on first write.
KNOWLEDGE_QUERY_LOG = DATA_DIR / "adaptive" / "knowledge_query_log.json"
def save_case(case_id: str, payload: dict) -> str:
    """Persist a case payload as ``<MEMORY_DIR>/<case_id>.json``.

    The payload is stamped in place with a ``saved_at`` key (naive UTC,
    ISO-8601), so the caller's dict is modified as well as the file.

    Args:
        case_id: Identifier used as the file name (without extension).
        payload: JSON-serialisable dict to store.

    Returns:
        The path of the written file, as a string.

    Raises:
        TypeError / ValueError: If ``payload`` cannot be serialised to JSON.
            In that case any existing file for ``case_id`` is left untouched.
        OSError: If the file cannot be written.
    """
    path = Path(MEMORY_DIR) / f"{case_id}.json"
    # Same naive-UTC format the deprecated datetime.utcnow() produced.
    payload["saved_at"] = (
        datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    )
    # Serialise before opening the file: dumping straight into a file opened
    # with "w" would truncate a previously saved case and leave partial JSON
    # behind if the payload turned out not to be serialisable.
    serialized = json.dumps(payload, indent=2, ensure_ascii=False)
    path.write_text(serialized, encoding="utf-8")
    return str(path)
class KnowledgeStore:
    """
    File-backed knowledge store with simple keyword matching.

    Knowledge lives in ``KNOWLEDGE_DIR`` as ``*.json`` files; each file holds
    either a single dict or a list of dicts. Searching is a case-insensitive
    substring match of the query words against each item's ``text``,
    ``content``, ``summary``, ``title`` and ``topic`` fields. Every search is
    appended to a bounded log stored in ``KNOWLEDGE_QUERY_LOG``.

    Upgrade to embedding-based retrieval when ready.
    """

    # Only this many of the newest query-log entries are kept on disk.
    _MAX_LOGGED_QUERIES = 100

    # Item fields concatenated into the text that search() matches against.
    _SEARCH_FIELDS = ("text", "content", "summary", "title", "topic")

    @staticmethod
    def _utc_now_iso() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Produces the same format as the deprecated
        ``datetime.utcnow().isoformat()`` so stored ``saved_at`` values keep
        their shape.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    @staticmethod
    def _recency_key(item: dict) -> float:
        """Return a sortable epoch timestamp for ``item`` (0.0 if unknown).

        Reads ``saved_at`` first, then ``timestamp``. Numbers are used as-is;
        strings are parsed as ISO-8601 (a trailing ``Z`` is accepted).
        """
        marker = item.get("saved_at") or item.get("timestamp")
        if isinstance(marker, (int, float)):
            return float(marker)
        if isinstance(marker, str) and marker:
            try:
                parsed = datetime.fromisoformat(marker.replace("Z", "+00:00"))
            except ValueError:
                return 0.0
            if parsed.tzinfo is None:
                # This module writes naive strings in UTC; without an explicit
                # zone, timestamp() would interpret them as local time and
                # skew them against epoch/offset-aware markers.
                parsed = parsed.replace(tzinfo=timezone.utc)
            return parsed.timestamp()
        return 0.0

    def _load_query_log(self) -> list[dict]:
        """Load the query log; return ``[]`` if it is missing or unusable."""
        if not KNOWLEDGE_QUERY_LOG.exists():
            return []
        try:
            entries = json.loads(KNOWLEDGE_QUERY_LOG.read_text(encoding="utf-8"))
        except (OSError, ValueError) as exc:
            logger.debug("KnowledgeStore query log load failed: %s", exc)
            return []
        # A corrupted log holding something other than a list would otherwise
        # make _record_query (and therefore search) fail on .append().
        if not isinstance(entries, list):
            return []
        return [entry for entry in entries if isinstance(entry, dict)]

    def _save_query_log(self, entries: list[dict]) -> None:
        """Write the newest log entries to disk; failures are only logged."""
        try:
            KNOWLEDGE_QUERY_LOG.parent.mkdir(parents=True, exist_ok=True)
            KNOWLEDGE_QUERY_LOG.write_text(
                json.dumps(entries[-self._MAX_LOGGED_QUERIES:], indent=2),
                encoding="utf-8",
            )
        except (OSError, TypeError, ValueError) as exc:
            logger.debug("KnowledgeStore query log save failed: %s", exc)

    def _record_query(self, query: str, domain: str, result_count: int) -> None:
        """Append one search (query, domain, hit count, time) to the log."""
        log = self._load_query_log()
        log.append(
            {
                "query": query,
                "domain": domain,
                "result_count": result_count,
                "timestamp": time.time(),
            }
        )
        self._save_query_log(log)

    def _iter_items(self) -> list[dict]:
        """Return every dict found in the ``*.json`` files of KNOWLEDGE_DIR.

        A file may hold one dict or a list (non-dict list members are
        dropped). Unreadable or malformed files are skipped.
        """
        items: list[dict] = []
        pattern = str(KNOWLEDGE_DIR / "*.json")
        # glob.glob returns matches in arbitrary order; sorting keeps the
        # item order (and so which hits survive search truncation) stable.
        for path in sorted(glob.glob(pattern)):
            try:
                data = json.loads(Path(path).read_text(encoding="utf-8"))
            except (OSError, ValueError) as exc:
                logger.debug("KnowledgeStore skipping %s: %s", path, exc)
                continue
            if isinstance(data, list):
                items.extend(entry for entry in data if isinstance(entry, dict))
            elif isinstance(data, dict):
                items.append(data)
        return items

    def save_knowledge(self, item: dict) -> str:
        """Store ``item`` as ``<KNOWLEDGE_DIR>/<id>.json`` and return its id.

        A copy of ``item`` is written, so the caller's dict is not modified.
        A random UUID is used when ``item`` has no (truthy) ``id``, and
        ``saved_at`` is filled in only when absent.

        Raises:
            TypeError / ValueError: If the item is not JSON-serialisable.
            OSError: If the file cannot be written.
        """
        item_id = item.get("id") or str(uuid.uuid4())
        payload = dict(item)
        payload["id"] = item_id
        payload.setdefault("saved_at", self._utc_now_iso())
        # NOTE(review): the id is used verbatim as a file name; if ids can
        # come from untrusted input they should be validated by the caller.
        path = KNOWLEDGE_DIR / f"{item_id}.json"
        path.write_text(
            json.dumps(payload, indent=2, ensure_ascii=False), encoding="utf-8"
        )
        return item_id

    def list_all(self, limit: Optional[int] = None) -> list[dict]:
        """Return all items, newest first.

        Args:
            limit: Maximum number of items; ``None`` or ``0`` returns all.
        """
        items = sorted(self._iter_items(), key=self._recency_key, reverse=True)
        return items[:limit] if limit else items

    def search(
        self,
        query: str,
        domain: str = "general",
        top_k: int = 5,
        limit: Optional[int] = None,
        **kwargs,
    ) -> list[dict]:
        """Return items whose text contains at least one word of ``query``.

        Args:
            query: Whitespace-separated words; matching is case-insensitive
                substring matching.
            domain: Restrict to items whose ``domain`` (or ``topic``) contains
                this value, case-insensitively. ``""``, ``None`` and
                ``"general"`` disable the filter.
            top_k: Maximum number of results when ``limit`` is not given.
            limit: Overrides ``top_k`` when truthy.
            **kwargs: Accepted for call compatibility and ignored.

        Returns:
            Up to ``limit or top_k`` matching items (none if that is <= 0).
            The query is recorded in the query log either way.
        """
        if kwargs:
            logger.debug("KnowledgeStore.search ignoring: %s", kwargs)

        requested = limit or top_k
        # Split once up front instead of once per item.
        words = str(query or "").lower().split()
        # Item domains are lowercased before comparison, so the requested
        # domain has to be lowercased too or mixed-case values never match.
        domain_filter = str(domain or "").strip().lower()
        filter_by_domain = domain_filter not in ("", "general")

        results: list[dict] = []
        if requested > 0 and words:
            for item in self._iter_items():
                if filter_by_domain:
                    item_domain = (
                        item.get("domain") or item.get("topic") or "general"
                    )
                    if domain_filter not in str(item_domain).lower():
                        continue
                # Explicit None values are treated as empty rather than being
                # stringified to "None" and matching a query for "none".
                values = (item.get(field) for field in self._SEARCH_FIELDS)
                haystack = " ".join(
                    "" if value is None else str(value) for value in values
                ).lower()
                if any(word in haystack for word in words):
                    results.append(item)
                    if len(results) >= requested:
                        break

        self._record_query(query, domain, len(results))
        return results

    def get_recent_queries(self, limit: int = 20) -> list[dict]:
        """Return up to ``limit`` logged queries, most recent first."""
        return list(reversed(self._load_query_log()))[:limit]

    def get_stats(self) -> dict:
        """Return summary counts for the store.

        ``domain_counts`` is keyed by each item's ``domain`` (falling back to
        ``topic``, then ``"general"``). ``total_entities`` and ``total_links``
        are always reported as 0 by this implementation.
        """
        items = self._iter_items()
        domain_counts: dict[str, int] = {}
        for item in items:
            domain = str(item.get("domain") or item.get("topic") or "general")
            domain_counts[domain] = domain_counts.get(domain, 0) + 1
        return {
            "total_queries": len(self._load_query_log()),
            "total_entities": 0,
            "total_links": 0,
            "domain_counts": domain_counts,
            "knowledge_items": len(items),
        }
# Module-level KnowledgeStore instance, constructed once at import time.
knowledge_store = KnowledgeStore()