File size: 5,495 Bytes
24f95f0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5f91e0b
24f95f0
5f91e0b
 
24f95f0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
from __future__ import annotations
import json
from datetime import datetime
from pathlib import Path
import glob
import logging
import time
import uuid
from typing import Optional

from app.config import MEMORY_DIR, DATA_DIR

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Import-time side effect: make sure the case-memory directory exists
# (missing parents are created; an already-existing directory is fine).
Path(MEMORY_DIR).mkdir(parents=True, exist_ok=True)

# Directory scanned for knowledge ``*.json`` files; also created at import time.
# NOTE(review): the ``/`` operator and ``.mkdir`` imply DATA_DIR is a
# pathlib.Path (or compatible) -- confirm in app.config.
KNOWLEDGE_DIR = DATA_DIR / "knowledge"
KNOWLEDGE_DIR.mkdir(parents=True, exist_ok=True)
# JSON file holding the search-query log. Its parent directory is NOT created
# here; KnowledgeStore._save_query_log creates it on first write.
KNOWLEDGE_QUERY_LOG = DATA_DIR / "adaptive" / "knowledge_query_log.json"


def save_case(case_id: str, payload: dict) -> str:
    """Persist *payload* as ``<MEMORY_DIR>/<case_id>.json``.

    The dict passed in is stamped **in place** with a ``saved_at`` key holding
    the current UTC time as a naive ISO-8601 string, then written as indented
    UTF-8 JSON (non-ASCII characters are kept verbatim). An existing file for
    the same ``case_id`` is overwritten.

    Args:
        case_id: Used as the file stem.
        payload: JSON-serialisable mapping; mutated to carry ``saved_at``.

    Returns:
        The path of the written file, as a string.
    """
    target = Path(MEMORY_DIR).joinpath(f"{case_id}.json")
    payload["saved_at"] = datetime.utcnow().isoformat()
    with target.open("w", encoding="utf-8") as handle:
        json.dump(payload, handle, ensure_ascii=False, indent=2)
    return str(target)


class KnowledgeStore:
    """
    Simple keyword match over knowledge JSON files.

    Every ``*.json`` file in ``KNOWLEDGE_DIR`` is expected to hold either a
    dict or a list of dicts. Searching looks at the ``text``, ``content``,
    ``summary``, ``title`` and ``topic`` fields of each item.
    Upgrade to embedding-based retrieval when ready.

    The last 100 searches are kept in ``KNOWLEDGE_QUERY_LOG``; reading and
    writing that log is best-effort and never raises.
    """

    # Item fields concatenated into the text that queries are matched against.
    _SEARCH_FIELDS = ("text", "content", "summary", "title", "topic")

    def _load_query_log(self) -> list[dict]:
        """Return the persisted query log, or ``[]`` if missing or unusable."""
        if not KNOWLEDGE_QUERY_LOG.exists():
            return []
        try:
            data = json.loads(KNOWLEDGE_QUERY_LOG.read_text(encoding="utf-8"))
        except Exception as exc:
            # Best-effort: a corrupt or unreadable log must not break searching.
            logger.debug("KnowledgeStore query log load failed: %s", exc)
            return []
        # A valid-JSON but non-list file would otherwise blow up on ``.append``
        # in ``_record_query``.
        if not isinstance(data, list):
            logger.debug("KnowledgeStore query log is not a list; ignoring it")
            return []
        return data

    def _save_query_log(self, entries: list[dict]) -> None:
        """Write the newest 100 *entries* to the query log; failures are logged."""
        try:
            KNOWLEDGE_QUERY_LOG.parent.mkdir(parents=True, exist_ok=True)
            KNOWLEDGE_QUERY_LOG.write_text(
                json.dumps(entries[-100:], indent=2), encoding="utf-8"
            )
        except Exception as exc:
            logger.debug("KnowledgeStore query log save failed: %s", exc)

    def _record_query(self, query: str, domain: str, result_count: int) -> None:
        """Append one search record (with an epoch timestamp) to the query log."""
        log = self._load_query_log()
        log.append(
            {
                "query": query,
                "domain": domain,
                "result_count": result_count,
                "timestamp": time.time(),
            }
        )
        self._save_query_log(log)

    def _iter_items(self) -> list[dict]:
        """Load every knowledge item found in ``KNOWLEDGE_DIR``.

        Files are visited in sorted path order so results are deterministic.
        Unreadable or invalid files are skipped; non-dict list entries and
        top-level values that are neither dict nor list are ignored.
        """
        items: list[dict] = []
        pattern = str(KNOWLEDGE_DIR / "*.json")
        for path in sorted(glob.glob(pattern)):
            try:
                data = json.loads(Path(path).read_text(encoding="utf-8"))
            except Exception as exc:
                # One bad file should not hide the rest of the knowledge base.
                logger.debug("KnowledgeStore skipping %s: %s", path, exc)
                continue
            if isinstance(data, list):
                items.extend(item for item in data if isinstance(item, dict))
            elif isinstance(data, dict):
                items.append(data)
        return items

    def save_knowledge(self, item: dict) -> str:
        """Store a copy of *item* as ``<KNOWLEDGE_DIR>/<id>.json``.

        The caller's dict is not modified. A missing or falsy ``id`` is
        replaced by a fresh UUID4 string, and ``saved_at`` (naive UTC
        ISO-8601) is added unless already present.

        Returns:
            The id under which the item was stored.
        """
        item_id = item.get("id") or str(uuid.uuid4())
        payload = dict(item)
        payload["id"] = item_id
        payload.setdefault("saved_at", datetime.utcnow().isoformat())
        # NOTE(review): the id is used verbatim as a file name; if ids can come
        # from untrusted input, path separators should be rejected upstream.
        path = KNOWLEDGE_DIR / f"{item_id}.json"
        path.write_text(json.dumps(payload, indent=2, ensure_ascii=False), encoding="utf-8")
        return item_id

    def list_all(self, limit: Optional[int] = None) -> list[dict]:
        """Return all knowledge items, newest first.

        Items are ordered by ``saved_at`` (falling back to ``timestamp``),
        which may be an epoch number or an ISO-8601 string. Items without a
        usable marker sort last.

        Args:
            limit: Maximum number of items; ``None`` or ``0`` returns everything.
        """
        items = self._iter_items()

        def _sort_key(item: dict) -> float:
            marker = item.get("saved_at") or item.get("timestamp")
            if isinstance(marker, (int, float)):
                return float(marker)
            if isinstance(marker, str) and marker:
                try:
                    parsed = datetime.fromisoformat(marker.replace("Z", "+00:00"))
                except ValueError:
                    return 0.0
                if parsed.tzinfo is None:
                    # ``saved_at`` is written with ``utcnow()``, i.e. naive UTC.
                    # ``.timestamp()`` would read it as local time and skew it
                    # against epoch-based markers, so convert explicitly.
                    return (parsed - datetime(1970, 1, 1)).total_seconds()
                return parsed.timestamp()
            return 0.0

        items.sort(key=_sort_key, reverse=True)
        return items[:limit] if limit else items

    def search(
        self,
        query: str,
        domain: str = "general",
        top_k: int = 5,
        limit: Optional[int] = None,
        **kwargs,
    ) -> list[dict]:
        """Return items containing at least one whitespace-separated query word.

        Matching is a case-insensitive substring test against the item's
        searchable fields. Every call is recorded in the query log.

        Args:
            query: Free-text query; an empty query matches nothing.
            domain: Case-insensitive substring the item's ``domain`` (or
                ``topic``) must contain. ``""``, ``"general"`` or ``None``
                disables the filter.
            top_k: Maximum number of results when *limit* is not given.
            limit: Overrides *top_k* when truthy.
            **kwargs: Accepted for call-site compatibility and ignored.

        Returns:
            Matching items in scan order, at most ``limit or top_k`` of them
            (none when that cap is zero or negative).
        """
        if kwargs:
            logger.debug("KnowledgeStore.search ignoring: %s", kwargs)

        requested = limit or top_k
        words = (query or "").lower().split()
        # Lower-case the requested domain too: item domains are lower-cased
        # before comparison, so a mixed-case argument could never match.
        domain_filter = (domain or "").lower()
        filter_by_domain = domain_filter not in ("", "general")

        results: list[dict] = []
        if words and requested > 0:
            for item in self._iter_items():
                if filter_by_domain:
                    item_domain = item.get("domain") or item.get("topic") or "general"
                    if domain_filter not in str(item_domain).lower():
                        continue
                haystack = " ".join(
                    str(item.get(field, "")) for field in self._SEARCH_FIELDS
                ).lower()
                if any(word in haystack for word in words):
                    results.append(item)
                    if len(results) >= requested:
                        break

        self._record_query(query, domain, len(results))
        return results

    def get_recent_queries(self, limit: int = 20) -> list[dict]:
        """Return up to *limit* logged queries, most recent first."""
        return self._load_query_log()[::-1][:limit]

    def get_stats(self) -> dict:
        """Summarise the store: item count, per-domain counts and query count.

        ``total_entities`` and ``total_links`` are always reported as ``0``.
        """
        items = self._iter_items()
        domain_counts: dict[str, int] = {}
        for item in items:
            domain = str(item.get("domain") or item.get("topic") or "general")
            domain_counts[domain] = domain_counts.get(domain, 0) + 1

        return {
            "total_queries": len(self._load_query_log()),
            "total_entities": 0,
            "total_links": 0,
            "domain_counts": domain_counts,
            "knowledge_items": len(items),
        }


# Module-level KnowledgeStore instance, created once when this module is imported.
knowledge_store = KnowledgeStore()