# NLP-intelligence / nlp_core / knowledge_base.py
# (repository page header — Nomio4640, submodule @ 59f9987)
"""
knowledge_base.py — SQLite-backed storage for admin knowledge tree
AND persistent analysis result history.
Changes from original:
1. seed_stopwords(words) — call this once at startup to populate the
stopwords table with all the hardcoded MONGOLIAN_STOPWORDS so the
admin can see and edit them in the UI.
2. Two new tables:
analysis_sessions — one row per upload/analysis run (summary)
analysis_documents — one row per document in that run
This means analysis results survive server restarts and you can
browse history from the admin panel.
3. save_analysis() / get_analysis() / list_analyses() — public API
for the new persistence layer.
4. db_stats() — returns table row counts so the /api/admin/db-stats
endpoint can show a quick health check.
"""
import sqlite3
import json
import os
from datetime import datetime
from typing import List, Optional, Dict, Any
from .models import KnowledgeEntry
class KnowledgeBase:
    """SQLite-backed knowledge base + persistent analysis history store.

    Four concerns share one database file:
      * knowledge_entries / custom_labels       — admin knowledge tree
      * stopwords                               — editable stopword list (seeded once)
      * analysis_sessions / analysis_documents  — analysis runs that survive restarts

    Every public method opens a short-lived connection and closes it in a
    ``finally`` block, so one instance can be shared across request handlers
    without holding a connection open.
    """

    def __init__(self, db_path: str = "knowledge.db"):
        self.db_path = db_path
        self._ensure_tables()

    # ------------------------------------------------------------------
    # Connection helper
    # ------------------------------------------------------------------
    def _get_conn(self) -> sqlite3.Connection:
        """Open a fresh connection with name-based row access.

        WAL journal mode gives better concurrent-read behaviour (FastAPI
        async handlers).  Foreign-key enforcement must be enabled per
        connection — SQLite ships with it OFF by default, and without it the
        ``ON DELETE CASCADE`` on analysis_documents never fires, so
        delete_analysis() would leave orphaned document rows behind.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        # BUGFIX: required for ON DELETE CASCADE to actually cascade.
        conn.execute("PRAGMA foreign_keys=ON")
        return conn

    # ------------------------------------------------------------------
    # Schema
    # ------------------------------------------------------------------
    def _ensure_tables(self) -> None:
        """Create all tables on first run. Safe to call repeatedly (IF NOT EXISTS)."""
        conn = self._get_conn()
        try:
            conn.executescript("""
                -- Admin knowledge tree
                CREATE TABLE IF NOT EXISTS knowledge_entries (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    word TEXT NOT NULL,
                    category TEXT DEFAULT '',
                    entity_type TEXT DEFAULT '',
                    synonyms TEXT DEFAULT '[]',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
                -- Admin custom label mappings (e.g. PER -> "Улс төрч")
                CREATE TABLE IF NOT EXISTS custom_labels (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    original_label TEXT NOT NULL,
                    custom_label TEXT NOT NULL,
                    label_type TEXT DEFAULT 'entity',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE(original_label, label_type)
                );
                -- Stopwords — ALL stopwords live here (seeded from hardcoded list
                -- on first startup, then admin can add/remove freely)
                CREATE TABLE IF NOT EXISTS stopwords (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    word TEXT UNIQUE NOT NULL,
                    is_default INTEGER DEFAULT 0, -- 1 = seeded from code, 0 = added by admin
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
                -- Analysis history — one row per upload/analysis run
                CREATE TABLE IF NOT EXISTS analysis_sessions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    source_filename TEXT DEFAULT '',
                    total_documents INTEGER DEFAULT 0,
                    sentiment_summary TEXT DEFAULT '{}',
                    entity_summary TEXT DEFAULT '{}',
                    topic_summary TEXT DEFAULT '[]'
                );
                -- Per-document results linked to a session
                CREATE TABLE IF NOT EXISTS analysis_documents (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    session_id INTEGER NOT NULL REFERENCES analysis_sessions(id) ON DELETE CASCADE,
                    doc_index INTEGER NOT NULL,
                    raw_text TEXT DEFAULT '',
                    nlp_text TEXT DEFAULT '',
                    source TEXT DEFAULT '',
                    sentiment_label TEXT DEFAULT '',
                    sentiment_score REAL DEFAULT 0.0,
                    entities TEXT DEFAULT '[]',
                    topic_id INTEGER DEFAULT -1,
                    topic_label TEXT DEFAULT '',
                    topic_keywords TEXT DEFAULT '[]'
                );
                -- Indexes
                CREATE INDEX IF NOT EXISTS idx_knowledge_word ON knowledge_entries(word);
                CREATE INDEX IF NOT EXISTS idx_knowledge_category ON knowledge_entries(category);
                CREATE INDEX IF NOT EXISTS idx_docs_session ON analysis_documents(session_id);
                CREATE INDEX IF NOT EXISTS idx_sessions_created ON analysis_sessions(created_at);
            """)
            conn.commit()
        finally:
            conn.close()

    # ------------------------------------------------------------------
    # Stopword seeding
    # ------------------------------------------------------------------
    def seed_stopwords(self, words: List[str]) -> int:
        """
        Populate the stopwords table from the hardcoded MONGOLIAN_STOPWORDS set.

        Call this once at server startup (services.py). Uses INSERT OR IGNORE
        so it's safe to call every restart — won't duplicate existing words.
        Returns the count of newly inserted words.

        is_default=1 marks these as system defaults. The admin UI can
        optionally show them differently (e.g. greyed out, not deletable).
        """
        conn = self._get_conn()
        try:
            before = conn.execute("SELECT COUNT(*) FROM stopwords").fetchone()[0]
            conn.executemany(
                "INSERT OR IGNORE INTO stopwords (word, is_default) VALUES (?, 1)",
                # normalise to lowercase/trimmed; skip whitespace-only entries
                [(w.lower().strip(),) for w in words if w.strip()],
            )
            conn.commit()
            after = conn.execute("SELECT COUNT(*) FROM stopwords").fetchone()[0]
            return after - before
        finally:
            conn.close()

    def get_stopwords(self) -> List[str]:
        """Return all stopwords (default + admin-added) sorted alphabetically."""
        conn = self._get_conn()
        try:
            rows = conn.execute("SELECT word FROM stopwords ORDER BY word").fetchall()
            return [r["word"] for r in rows]
        finally:
            conn.close()

    def add_stopword(self, word: str) -> bool:
        """Add an admin stopword (lowercased/trimmed).

        Returns True if the word was newly inserted, False if it already
        existed (INSERT OR IGNORE inserted nothing).
        """
        conn = self._get_conn()
        try:
            cur = conn.execute(
                "INSERT OR IGNORE INTO stopwords (word, is_default) VALUES (?, 0)",
                (word.lower().strip(),),
            )
            conn.commit()
            # BUGFIX: report whether anything was actually inserted instead
            # of unconditionally returning True.
            return cur.rowcount > 0
        finally:
            conn.close()

    def delete_stopword(self, word: str) -> bool:
        """Delete a stopword. Default (seeded) stopwords can also be deleted.

        Returns True if a row was removed, False if the word wasn't present.
        """
        conn = self._get_conn()
        try:
            cur = conn.execute(
                "DELETE FROM stopwords WHERE word = ?", (word.lower().strip(),)
            )
            conn.commit()
            return cur.rowcount > 0
        finally:
            conn.close()

    def get_stopwords_with_meta(self) -> List[Dict]:
        """Return stopwords with is_default flag — useful for admin UI display."""
        conn = self._get_conn()
        try:
            rows = conn.execute(
                "SELECT id, word, is_default, created_at FROM stopwords ORDER BY word"
            ).fetchall()
            return [dict(r) for r in rows]
        finally:
            conn.close()

    # ------------------------------------------------------------------
    # Analysis persistence
    # ------------------------------------------------------------------
    def save_analysis(
        self,
        documents: List[Dict],
        sentiment_summary: Dict,
        entity_summary: Dict,
        topic_summary: List,
        source_filename: str = "",
    ) -> tuple:
        """
        Persist a full analysis run to the DB.

        Args:
            documents: list of dicts with keys: raw_text, nlp_text, source,
                       sentiment_label, sentiment_score, entities (list),
                       topic_id, topic_label, topic_keywords (list)
            sentiment_summary: {"positive": N, "neutral": N, "negative": N}
            entity_summary: {"PER": [...], "LOC": [...], ...}
            topic_summary: list of topic dicts from BERTopic
            source_filename: original CSV filename if applicable

        Returns:
            (session_id, doc_ids) — session_id for the session, doc_ids list
            of DB ids for each inserted document (in order).

        All inserts share one connection; an exception before commit() closes
        the connection uncommitted, so a partial run is rolled back.
        """
        conn = self._get_conn()
        try:
            cursor = conn.execute(
                """INSERT INTO analysis_sessions
                   (source_filename, total_documents, sentiment_summary,
                    entity_summary, topic_summary)
                   VALUES (?, ?, ?, ?, ?)""",
                (
                    source_filename,
                    len(documents),
                    # ensure_ascii=False keeps Mongolian Cyrillic readable in the DB
                    json.dumps(sentiment_summary, ensure_ascii=False),
                    json.dumps(entity_summary, ensure_ascii=False),
                    json.dumps(topic_summary, ensure_ascii=False),
                ),
            )
            session_id = cursor.lastrowid

            doc_ids = []
            for i, d in enumerate(documents):
                c = conn.execute(
                    """INSERT INTO analysis_documents
                       (session_id, doc_index, raw_text, nlp_text, source,
                        sentiment_label, sentiment_score, entities,
                        topic_id, topic_label, topic_keywords)
                       VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                    (
                        session_id,
                        i,
                        d.get("raw_text", ""),
                        d.get("nlp_text", ""),
                        d.get("source", ""),
                        d.get("sentiment_label", ""),
                        float(d.get("sentiment_score", 0.0)),
                        json.dumps(d.get("entities", []), ensure_ascii=False),
                        int(d.get("topic_id", -1)),
                        d.get("topic_label", ""),
                        json.dumps(d.get("topic_keywords", []), ensure_ascii=False),
                    ),
                )
                doc_ids.append(c.lastrowid)
            conn.commit()
            return session_id, doc_ids
        finally:
            conn.close()

    def list_analyses(self, limit: int = 20) -> List[Dict]:
        """Return the most recent analysis sessions (summary only, no documents)."""
        conn = self._get_conn()
        try:
            rows = conn.execute(
                """SELECT id, created_at, source_filename, total_documents,
                          sentiment_summary, topic_summary
                   FROM analysis_sessions
                   ORDER BY created_at DESC
                   LIMIT ?""",
                (limit,),
            ).fetchall()
            return [
                {
                    "id": r["id"],
                    "created_at": r["created_at"],
                    "source_filename": r["source_filename"],
                    "total_documents": r["total_documents"],
                    "sentiment_summary": json.loads(r["sentiment_summary"]),
                    "topic_summary": json.loads(r["topic_summary"]),
                }
                for r in rows
            ]
        finally:
            conn.close()

    def get_analysis(self, session_id: int) -> Optional[Dict]:
        """Return a full analysis session including all documents, or None."""
        conn = self._get_conn()
        try:
            session = conn.execute(
                "SELECT * FROM analysis_sessions WHERE id = ?", (session_id,)
            ).fetchone()
            if not session:
                return None
            docs = conn.execute(
                """SELECT * FROM analysis_documents
                   WHERE session_id = ? ORDER BY doc_index""",
                (session_id,),
            ).fetchall()
            return {
                "id": session["id"],
                "created_at": session["created_at"],
                "source_filename": session["source_filename"],
                "total_documents": session["total_documents"],
                "sentiment_summary": json.loads(session["sentiment_summary"]),
                "entity_summary": json.loads(session["entity_summary"]),
                "topic_summary": json.loads(session["topic_summary"]),
                "documents": [
                    {
                        "id": d["id"],
                        "doc_index": d["doc_index"],
                        "raw_text": d["raw_text"],
                        "nlp_text": d["nlp_text"],
                        "source": d["source"],
                        "sentiment": {
                            "label": d["sentiment_label"],
                            "score": d["sentiment_score"],
                        },
                        "entities": json.loads(d["entities"]),
                        "topic": {
                            "topic_id": d["topic_id"],
                            "topic_label": d["topic_label"],
                            "keywords": json.loads(d["topic_keywords"]),
                        },
                    }
                    for d in docs
                ],
            }
        finally:
            conn.close()

    def delete_analysis(self, session_id: int) -> bool:
        """Delete a session; its documents cascade via the FK (see _get_conn).

        Returns True if the session existed and was deleted.
        """
        conn = self._get_conn()
        try:
            cur = conn.execute(
                "DELETE FROM analysis_sessions WHERE id = ?", (session_id,)
            )
            conn.commit()
            return cur.rowcount > 0
        finally:
            conn.close()

    def update_document_annotations(
        self,
        doc_id: int,
        entities: list,
        sentiment_label: str,
        sentiment_score: float,
    ) -> bool:
        """Update a single document's entities and sentiment in the DB.

        Returns True when a row with doc_id existed and was updated.
        """
        conn = self._get_conn()
        try:
            cur = conn.execute(
                """UPDATE analysis_documents
                   SET entities=?, sentiment_label=?, sentiment_score=?
                   WHERE id=?""",
                (
                    json.dumps(entities, ensure_ascii=False),
                    sentiment_label,
                    float(sentiment_score),
                    doc_id,
                ),
            )
            conn.commit()
            # BUGFIX: read the UPDATE cursor's rowcount directly instead of a
            # follow-up "SELECT changes()" after commit.
            return cur.rowcount > 0
        finally:
            conn.close()

    def get_all_documents(self) -> List[Dict]:
        """Return all documents across all sessions for global re-analysis."""
        conn = self._get_conn()
        try:
            rows = conn.execute(
                """SELECT id, session_id, doc_index, raw_text, nlp_text,
                          source, sentiment_label, sentiment_score,
                          entities, topic_id, topic_label, topic_keywords
                   FROM analysis_documents
                   ORDER BY session_id, doc_index"""
            ).fetchall()
            return [
                {
                    "id": d["id"],
                    "session_id": d["session_id"],
                    "doc_index": d["doc_index"],
                    "raw_text": d["raw_text"],
                    "nlp_text": d["nlp_text"],
                    "source": d["source"],
                    "sentiment_label": d["sentiment_label"],
                    "sentiment_score": d["sentiment_score"],
                    "entities": json.loads(d["entities"]),
                }
                for d in rows
            ]
        finally:
            conn.close()

    # ------------------------------------------------------------------
    # Knowledge entries (unchanged from original)
    # ------------------------------------------------------------------
    def add_entry(self, entry: KnowledgeEntry) -> int:
        """Insert a knowledge entry; returns its new row id."""
        conn = self._get_conn()
        try:
            cursor = conn.execute(
                "INSERT INTO knowledge_entries (word, category, entity_type, synonyms) VALUES (?, ?, ?, ?)",
                (entry.word, entry.category, entry.entity_type, json.dumps(entry.synonyms)),
            )
            conn.commit()
            return cursor.lastrowid
        finally:
            conn.close()

    def get_entries(self, category: Optional[str] = None) -> List[KnowledgeEntry]:
        """Return entries, optionally filtered to one category."""
        conn = self._get_conn()
        try:
            if category:
                rows = conn.execute(
                    "SELECT * FROM knowledge_entries WHERE category = ? ORDER BY word",
                    (category,),
                ).fetchall()
            else:
                rows = conn.execute(
                    "SELECT * FROM knowledge_entries ORDER BY category, word"
                ).fetchall()
            return [self._row_to_entry(r) for r in rows]
        finally:
            conn.close()

    def update_entry(self, entry_id: int, entry: KnowledgeEntry) -> bool:
        """Overwrite an entry's fields; returns True if entry_id existed."""
        conn = self._get_conn()
        try:
            cur = conn.execute(
                """UPDATE knowledge_entries
                   SET word=?, category=?, entity_type=?, synonyms=?,
                       updated_at=CURRENT_TIMESTAMP
                   WHERE id=?""",
                (entry.word, entry.category, entry.entity_type,
                 json.dumps(entry.synonyms), entry_id),
            )
            conn.commit()
            return cur.rowcount > 0
        finally:
            conn.close()

    def delete_entry(self, entry_id: int) -> bool:
        """Delete one entry; returns True if a row was removed."""
        conn = self._get_conn()
        try:
            cur = conn.execute(
                "DELETE FROM knowledge_entries WHERE id = ?", (entry_id,)
            )
            conn.commit()
            return cur.rowcount > 0
        finally:
            conn.close()

    def get_categories(self) -> List[str]:
        """Return the distinct non-empty categories, sorted."""
        conn = self._get_conn()
        try:
            rows = conn.execute(
                "SELECT DISTINCT category FROM knowledge_entries WHERE category != '' ORDER BY category"
            ).fetchall()
            return [r["category"] for r in rows]
        finally:
            conn.close()

    # ------------------------------------------------------------------
    # Custom labels (unchanged from original)
    # ------------------------------------------------------------------
    def set_label(self, original: str, custom: str, label_type: str = "entity") -> None:
        """Upsert a custom display label for (original_label, label_type)."""
        conn = self._get_conn()
        try:
            conn.execute(
                """INSERT INTO custom_labels (original_label, custom_label, label_type)
                   VALUES (?, ?, ?)
                   ON CONFLICT(original_label, label_type) DO UPDATE SET custom_label=?""",
                (original, custom, label_type, custom),
            )
            conn.commit()
        finally:
            conn.close()

    def get_labels(self, label_type: str = "entity") -> Dict[str, str]:
        """Return {original_label: custom_label} for one label_type."""
        conn = self._get_conn()
        try:
            rows = conn.execute(
                "SELECT original_label, custom_label FROM custom_labels WHERE label_type = ?",
                (label_type,),
            ).fetchall()
            return {r["original_label"]: r["custom_label"] for r in rows}
        finally:
            conn.close()

    def delete_label(self, label_id: int) -> bool:
        """Delete one custom label by row id; returns True if removed."""
        conn = self._get_conn()
        try:
            cur = conn.execute(
                "DELETE FROM custom_labels WHERE id = ?", (label_id,)
            )
            conn.commit()
            return cur.rowcount > 0
        finally:
            conn.close()

    # ------------------------------------------------------------------
    # DB stats — for admin health check endpoint
    # ------------------------------------------------------------------
    def db_stats(self) -> Dict[str, Any]:
        """Return row counts for all tables plus the DB file size."""
        conn = self._get_conn()
        try:
            stats = {}
            # Table names come from this fixed tuple, never user input, so the
            # f-string interpolation is not an injection risk.
            for table in (
                "knowledge_entries", "custom_labels", "stopwords",
                "analysis_sessions", "analysis_documents",
            ):
                stats[table] = conn.execute(
                    f"SELECT COUNT(*) FROM {table}"
                ).fetchone()[0]
            stats["db_path"] = self.db_path
            stats["db_size_kb"] = (
                round(os.path.getsize(self.db_path) / 1024, 1)
                if os.path.exists(self.db_path) else 0
            )
            return stats
        finally:
            conn.close()

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _row_to_entry(row) -> KnowledgeEntry:
        """Convert a knowledge_entries row into a KnowledgeEntry model."""
        return KnowledgeEntry(
            id=row["id"],
            word=row["word"],
            category=row["category"],
            entity_type=row["entity_type"],
            synonyms=json.loads(row["synonyms"]) if row["synonyms"] else [],
        )