# added test for NER (commit 7726529)
"""
Analysis router β€” handles CSV upload, text analysis, and network generation.
Changes from previous version:
1. Every analysis result is now saved to the DB via kb.save_analysis()
so results persist across server restarts.
2. MIN_TOPICS_DOCS lowered to 3 to match topic_modeler.py's KMeans fallback.
3. New endpoints:
GET /history β€” list past analysis sessions
GET /history/{id} β€” retrieve a full past session with documents
DELETE /history/{id} β€” delete a session
GET /db-stats β€” show table row counts + DB file size
"""
import csv
import io
import json
import logging
import time
import uuid
from typing import List
logger = logging.getLogger(__name__)
from fastapi import APIRouter, UploadFile, File, HTTPException
from adapters.api.schemas import (
TextAnalysisRequest, BatchAnalysisRequest,
AnalysisResponse, DocumentResponse, EntityResponse,
SentimentResponse, TopicResponse,
NetworkResponse, NetworkNodeResponse, NetworkEdgeResponse,
DocumentUpdateRequest,
)
from adapters.api import services
from nlp_core.models import EntityResult
router = APIRouter()
# Minimum docs to attempt topic modeling.
# Matches topic_modeler.py MIN_TINY_DOCS — KMeans fallback handles 3-9 docs,
# standard HDBSCAN BERTopic handles 10+.
MIN_TOPICS_DOCS = 3
# ---------------------------------------------------------------------------
# POST /upload
# ---------------------------------------------------------------------------
@router.post("/upload", response_model=AnalysisResponse)
async def upload_csv(
file: UploadFile = File(...),
run_ner: bool = True,
run_sentiment: bool = True,
run_topics: bool = True,
):
"""Upload a CSV file for analysis. Must have a 'text' or 'Text' column."""
if not file.filename.lower().endswith(".csv"):
raise HTTPException(status_code=400, detail="Only CSV files are supported")
content = await file.read()
rows = list(csv.DictReader(io.StringIO(content.decode("utf-8-sig", errors="replace"))))
if not rows:
raise HTTPException(status_code=400, detail="CSV file is empty")
text_col = _find_text_column(rows[0])
if text_col is None:
raise HTTPException(
status_code=400,
detail=f"No text column found. Got columns: {list(rows[0].keys())}",
)
try:
result = _run_analysis(rows, text_col, run_ner, run_sentiment, run_topics)
except Exception as exc:
logger.exception(f"Analysis pipeline failed for file '{file.filename}'")
raise HTTPException(status_code=500, detail=f"Analysis failed: {exc}")
services.set_last_analysis(result)
result = _save_and_attach_doc_ids(result, source_filename=file.filename)
return result
# ---------------------------------------------------------------------------
# POST /analyze (single text)
# ---------------------------------------------------------------------------
@router.post("/analyze", response_model=AnalysisResponse)
async def analyze_text(request: TextAnalysisRequest):
"""Analyze a single text string."""
rows = [{"ID": str(uuid.uuid4())[:8], "Text": request.text, "Source": "direct"}]
try:
result = _run_analysis(rows, "Text", run_ner=True, run_sentiment=True, run_topics=False)
except Exception as exc:
logger.exception("Analysis pipeline failed for single text")
raise HTTPException(status_code=500, detail=f"Analysis failed: {exc}")
services.set_last_analysis(result)
result = _save_and_attach_doc_ids(result, source_filename="single-text")
return result
# ---------------------------------------------------------------------------
# POST /analyze/batch
# ---------------------------------------------------------------------------
@router.post("/analyze/batch", response_model=AnalysisResponse)
async def analyze_batch(request: BatchAnalysisRequest):
"""Analyze a list of texts. Topics auto-enabled when >= 3 texts."""
rows = [
{"ID": str(uuid.uuid4())[:8], "Text": t, "Source": "batch"}
for t in request.texts
]
result = _run_analysis(
rows, "Text",
request.run_ner, request.run_sentiment, request.run_topics,
)
services.set_last_analysis(result)
result = _save_and_attach_doc_ids(result, source_filename="batch")
return result
# ---------------------------------------------------------------------------
# POST /network
# ---------------------------------------------------------------------------
@router.post("/network", response_model=NetworkResponse)
async def get_network():
last = services.get_last_analysis()
if last is None:
raise HTTPException(status_code=404, detail="No analysis has been run yet.")
if last.network is None:
raise HTTPException(status_code=404, detail="No network data available.")
return last.network
# ---------------------------------------------------------------------------
# POST /reload
# ---------------------------------------------------------------------------
@router.post("/reload")
async def reload():
"""Reload custom stopwords and labels from DB without restarting."""
services.reload_preprocessor()
return {
"status": "reloaded",
"custom_stopword_count": len(services.kb.get_stopwords()),
}
# ---------------------------------------------------------------------------
# GET /history — list past analysis sessions
# ---------------------------------------------------------------------------
@router.get("/history")
async def list_history(limit: int = 20):
"""
List the most recent analysis sessions stored in the DB.
Returns summary rows (no per-document detail).
"""
return services.kb.list_analyses(limit=limit)
# ---------------------------------------------------------------------------
# GET /history/{session_id} — retrieve a full past session
# ---------------------------------------------------------------------------
@router.get("/history/{session_id}")
async def get_history(session_id: int):
"""
Retrieve a full analysis session by ID, including all documents.
Use GET /history to find session IDs.
"""
session = services.kb.get_analysis(session_id)
if session is None:
raise HTTPException(status_code=404, detail=f"Session {session_id} not found.")
return session
# ---------------------------------------------------------------------------
# DELETE /history/{session_id}
# ---------------------------------------------------------------------------
@router.delete("/history/{session_id}")
async def delete_history(session_id: int):
"""Delete a stored analysis session and all its documents."""
services.kb.delete_analysis(session_id)
return {"status": "deleted", "session_id": session_id}
# ---------------------------------------------------------------------------
# GET /db-stats — health check showing DB table sizes
# ---------------------------------------------------------------------------
@router.get("/db-stats")
async def db_stats():
"""
Show row counts for every table in knowledge.db plus file size.
Example response:
{
"knowledge_entries": 12,
"custom_labels": 3,
"stopwords": 87, ← should be 80+ after seeding
"analysis_sessions": 5,
"analysis_documents": 423,
"db_path": "/home/.../webapp/knowledge.db",
"db_size_kb": 128.4
}
This is the quickest way to confirm the DB is initialised and
that stopword seeding worked.
"""
return services.kb.db_stats()
# ---------------------------------------------------------------------------
# PATCH /documents/{doc_id} — update annotations for a single document
# ---------------------------------------------------------------------------
@router.patch("/documents/{doc_id}")
async def update_document(doc_id: int, body: DocumentUpdateRequest):
"""
Save edited entities and/or sentiment for a single stored document.
Called by the inline annotation editor in the frontend.
"""
entities = [
{
"word": e.word,
"entity_group": e.entity_group,
"score": e.score,
"start": e.start,
"end": e.end,
}
for e in body.entities
]
ok = services.kb.update_document_annotations(
doc_id=doc_id,
entities=entities,
sentiment_label=body.sentiment_label,
sentiment_score=body.sentiment_score,
)
if not ok:
raise HTTPException(status_code=404, detail=f"Document {doc_id} not found.")
return {"ok": True, "doc_id": doc_id}
# ---------------------------------------------------------------------------
# GET /global-analysis — run topic modeling + network on ALL stored documents
# ---------------------------------------------------------------------------
@router.get("/global-analysis")
async def global_analysis():
"""
Run topic modeling and build a co-occurrence network using every document
stored in the DB across all sessions. NER and sentiment are NOT re-run β€”
the stored results are reused so this is fast.
"""
docs = services.kb.get_all_documents()
if len(docs) < MIN_TOPICS_DOCS:
raise HTTPException(
status_code=400,
detail=(
f"Global analysis needs at least {MIN_TOPICS_DOCS} stored documents. "
f"Currently have {len(docs)}."
),
)
nlp_texts = [d["nlp_text"] for d in docs]
# Rebuild EntityResult objects from stored JSON for the network analyzer
entity_results: List[List[EntityResult]] = []
for d in docs:
ents = d["entities"] if isinstance(d["entities"], list) else json.loads(d["entities"])
entity_results.append([
EntityResult(
word=e.get("word", ""),
entity_group=e.get("entity_group", "MISC"),
score=float(e.get("score", 0.0)),
start=int(e.get("start") or 0),
end=int(e.get("end") or 0),
)
for e in ents
])
# Topic modeling across all stored documents
topic_summary: list = []
try:
_, topic_summary = services.topic.fit_transform(nlp_texts)
except Exception as exc:
topic_summary = [{"error": f"Topic modeling failed: {exc}"}]
# Network co-occurrence graph across all stored documents
nd = services.network.build_network(entity_results)
network = NetworkResponse(
nodes=[
NetworkNodeResponse(
id=n.id, label=n.label,
entity_type=n.entity_type, frequency=n.frequency,
)
for n in nd.nodes
],
edges=[
NetworkEdgeResponse(source=e.source, target=e.target, weight=e.weight)
for e in nd.edges
],
)
return {
"total_documents": len(docs),
"topic_summary": topic_summary,
"network": network,
}
# ---------------------------------------------------------------------------
# Core pipeline
# ---------------------------------------------------------------------------
def _run_analysis(
    rows: List[dict],
    text_col: str,
    run_ner: bool,
    run_sentiment: bool,
    run_topics: bool,
) -> AnalysisResponse:
    """Execute the full NLP pipeline over CSV-style rows.

    Steps (each optional stage is skipped when its flag is False):
      1. Dual preprocessing — one pass yields an NLP text and a topic-modeling
         (TM) text per row.
      2. NER over the NLP texts, then admin custom labels applied.
      3. Sentiment over the NLP texts.
      4. Topic modeling over the TM texts (needs MIN_TOPICS_DOCS non-empty docs).
      5. Per-document assembly into DocumentResponse objects.
      6. Entity co-occurrence network + entity stats (only when NER ran).

    Raises whatever the underlying services raise — callers wrap this in
    try/except and convert failures to HTTP 500.
    """
    t0 = time.time()
    preprocessor = services.preprocessor
    kb = services.kb
    raw_texts = [row.get(text_col, "") for row in rows]
    ids = [row.get("ID", str(i)) for i, row in enumerate(rows)]
    sources = [row.get("Source", "") for row in rows]
    logger.info(f"[Pipeline] Starting analysis: {len(raw_texts)} rows, NER={run_ner}, Sentiment={run_sentiment}, Topics={run_topics}")
    # Dual preprocessing — one pass, two outputs
    nlp_texts: List[str] = []
    tm_texts: List[str] = []
    for raw in raw_texts:
        nlp, tm = preprocessor.preprocess_dual(raw)
        nlp_texts.append(nlp)
        tm_texts.append(tm)
    logger.info(f"[Pipeline] Preprocessing done in {(time.time()-t0)*1000:.0f}ms")
    # NER
    ner_results = []
    if run_ner:
        t1 = time.time()
        ner_results = services.ner.recognize_batch(nlp_texts)
        total_ents = sum(len(r) for r in ner_results)
        logger.info(f"[Pipeline] NER done in {(time.time()-t1)*1000:.0f}ms — found {total_ents} entities total")
    # Entity relabeling from admin custom labels
    custom_labels = kb.get_labels(label_type="entity") if run_ner else {}
    # Sentiment
    sentiment_results = []
    if run_sentiment:
        t1 = time.time()
        sentiment_results = services.sentiment.analyze_batch(nlp_texts)
        pos = sum(1 for s in sentiment_results if s.label == "positive")
        neg = sum(1 for s in sentiment_results if s.label == "negative")
        neu = sum(1 for s in sentiment_results if s.label == "neutral")
        logger.info(f"[Pipeline] Sentiment done in {(time.time()-t1)*1000:.0f}ms — pos={pos} neu={neu} neg={neg}")
    # Topic modeling — works from MIN_TOPICS_DOCS documents via KMeans fallback
    topic_results = []
    topic_summary = []
    if run_topics:
        non_empty_tm = [t for t in tm_texts if t.strip()]
        logger.info(f"[Pipeline] Topic modeling: {len(non_empty_tm)} non-empty TM texts (need >={MIN_TOPICS_DOCS})")
        # BUGFIX: gate on the NON-EMPTY count (which is what the log line above
        # reports) instead of len(tm_texts) — previously blank documents were
        # counted toward the minimum. fit_transform still receives the full
        # tm_texts list so topic_results stays index-aligned with documents.
        if len(non_empty_tm) >= MIN_TOPICS_DOCS:
            try:
                t1 = time.time()
                topic_results, topic_summary = services.topic.fit_transform(tm_texts)
                real_topics = [t for t in topic_summary if isinstance(t, dict) and t.get("topic_id", -1) >= 0]
                logger.info(f"[Pipeline] Topics done in {(time.time()-t1)*1000:.0f}ms — {len(real_topics)} real topics, summary={topic_summary}")
            except Exception as exc:
                logger.error(f"[Pipeline] Topic modeling FAILED: {exc}", exc_info=True)
                topic_summary = [{"error": f"Topic modeling failed: {exc}"}]
        else:
            logger.info(f"[Pipeline] Skipping topics — only {len(non_empty_tm)} non-empty docs (need {MIN_TOPICS_DOCS}+)")
            topic_summary = [{
                "info": (
                    f"Topic modeling needs at least {MIN_TOPICS_DOCS} documents. "
                    f"Got {len(non_empty_tm)}."
                )
            }]
    # Assemble per-document results
    sentiment_counts = {"positive": 0, "neutral": 0, "negative": 0}
    documents: List[DocumentResponse] = []
    for i in range(len(raw_texts)):
        entities: List[EntityResponse] = []
        if i < len(ner_results):
            for e in ner_results[i]:
                # Apply admin-defined relabeling (model group -> domain label)
                label = custom_labels.get(e.entity_group, e.entity_group)
                entities.append(EntityResponse(
                    word=e.word, entity_group=label, score=e.score,
                    start=e.start, end=e.end,
                ))
        sentiment = None
        if i < len(sentiment_results):
            sr = sentiment_results[i]
            sentiment = SentimentResponse(label=sr.label, score=sr.score)
            # .get guards against labels outside the three seeded keys
            sentiment_counts[sr.label] = sentiment_counts.get(sr.label, 0) + 1
        topic = None
        if i < len(topic_results):
            tr = topic_results[i]
            topic = TopicResponse(
                topic_id=tr.topic_id,
                topic_label=tr.topic_label,
                probability=tr.probability,
                keywords=tr.keywords,
            )
        documents.append(DocumentResponse(
            id=str(ids[i]),
            text=raw_texts[i],
            clean_text=nlp_texts[i],
            source=sources[i],
            entities=entities,
            topic=topic,
            sentiment=sentiment,
        ))
    # Network / co-occurrence graph
    network = None
    entity_stats: dict = {}
    if run_ner and ner_results:
        nd = services.network.build_network(ner_results)
        entity_stats = services.network.get_entity_stats(ner_results)
        network = NetworkResponse(
            nodes=[
                NetworkNodeResponse(
                    id=n.id, label=n.label,
                    entity_type=n.entity_type, frequency=n.frequency,
                )
                for n in nd.nodes
            ],
            edges=[
                NetworkEdgeResponse(source=e.source, target=e.target, weight=e.weight)
                for e in nd.edges
            ],
        )
    return AnalysisResponse(
        documents=documents,
        network=network,
        topic_summary=topic_summary,
        sentiment_summary=sentiment_counts,
        entity_summary=entity_stats,
        total_documents=len(documents),
    )
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _find_text_column(first_row: dict):
for name in ("text", "Text", "clean_text", "cleaned_text",
"content", "Content", "Message", "body", "Body"):
if name in first_row:
return name
return None
def _save_and_attach_doc_ids(
    result: AnalysisResponse,
    source_filename: str = "",
) -> AnalysisResponse:
    """
    Persist the analysis to the DB and return a new AnalysisResponse where
    every DocumentResponse has its doc_id (DB row id) filled in.

    DB write failures are logged and swallowed — the caller always gets a
    usable response, just without doc_ids attached.
    """
    docs = []
    for doc in result.documents:
        docs.append({
            "raw_text": doc.text,
            "nlp_text": doc.clean_text,
            "source": doc.source,
            # Flatten optional sub-objects into plain columns for storage
            "sentiment_label": doc.sentiment.label if doc.sentiment else "",
            "sentiment_score": doc.sentiment.score if doc.sentiment else 0.0,
            "entities": [
                {
                    "word": e.word,
                    "entity_group": e.entity_group,
                    "score": e.score,
                    "start": e.start,
                    "end": e.end,
                }
                for e in (doc.entities or [])
            ],
            "topic_id": doc.topic.topic_id if doc.topic else -1,
            "topic_label": doc.topic.topic_label if doc.topic else "",
            "topic_keywords": doc.topic.keywords if doc.topic else [],
        })
    try:
        _session_id, doc_ids = services.kb.save_analysis(
            documents=docs,
            sentiment_summary=result.sentiment_summary,
            entity_summary=result.entity_summary,
            topic_summary=result.topic_summary,
            source_filename=source_filename,
        )
        # Attach the DB ids to the response documents
        new_docs = [
            doc.model_copy(update={"doc_id": did})
            for doc, did in zip(result.documents, doc_ids)
        ]
        return result.model_copy(update={"documents": new_docs})
    except Exception as exc:
        # Never let a DB write failure break the analysis response.
        # Use the module logger (was a bare print) so the failure and its
        # traceback land in the application logs.
        logger.exception(f"[analysis] Warning: could not save analysis to DB: {exc}")
        return result