DBMS / ui /server.py
vkhoa2110
Use real bge-m3 embedding snapshot on Space
31ccea5
from __future__ import annotations
import csv
import hashlib
import json
import math
import os
import re
import subprocess
import sys
import unicodedata
import urllib.parse
import urllib.error
import urllib.request
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
ROOT = Path(__file__).resolve().parent
STATIC_ROOT = ROOT / "static"

# SQL Server connection used by run_sql (through the sqlcmd CLI). Every setting
# can be overridden with a HELPDESK_* environment variable.
SQL_SERVER = os.environ.get("HELPDESK_SQL_SERVER", r".\SQLEXPRESS")
SQL_DATABASE = os.environ.get("HELPDESK_SQL_DATABASE", "CustomerAIDemo2022")
SQLCMD = os.environ.get("HELPDESK_SQLCMD", "sqlcmd")
SQL_USER = os.environ.get("HELPDESK_SQL_USER")  # unset -> integrated auth (-E) in run_sql
SQL_PASSWORD = os.environ.get("HELPDESK_SQL_PASSWORD")
SQL_TRUST_CERTIFICATE = os.environ.get("HELPDESK_SQL_TRUST_CERTIFICATE", "").lower() in ("1", "true", "yes")

# Backend selection: "csv", "sql" or "auto" (see csv_backend_enabled).
DEMO_BACKEND = os.environ.get("HELPDESK_DEMO_BACKEND", "auto").lower()
CSV_DATA_PATH = Path(
    os.environ.get("HELPDESK_CSV_PATH")
    or ROOT.parent.joinpath("data", "customer_feedback.csv")
)
REAL_FEEDBACK_EMBEDDINGS_PATH = Path(
    os.environ.get("HELPDESK_REAL_FEEDBACK_EMBEDDINGS_PATH")
    or ROOT.parent.joinpath("data", "real_feedback_embeddings.json")
)
REAL_QUERY_EMBEDDINGS_PATH = Path(
    os.environ.get("HELPDESK_REAL_QUERY_EMBEDDINGS_PATH")
    or ROOT.parent.joinpath("data", "real_query_embeddings.json")
)

# Embedding mode: "auto", "real" or "fallback" (see real_embeddings_available).
EMBEDDING_MODE = os.environ.get("HELPDESK_EMBEDDING_MODE", "auto").lower()
OLLAMA_URL = os.environ.get("HELPDESK_OLLAMA_URL", "http://127.0.0.1:11434/api/embed")
OLLAMA_MODEL = os.environ.get("HELPDESK_OLLAMA_MODEL", "bge-m3")

# Lazily populated module-level caches; None means "not loaded/computed yet".
_REAL_EMBEDDINGS_AVAILABLE: bool | None = None
_CSV_ROWS: list[dict[str, Any]] | None = None
_REAL_FEEDBACK_SNAPSHOT: dict[str, Any] | None = None
_REAL_QUERY_SNAPSHOT: dict[str, Any] | None = None

# Tokens ignored by tokenize(): common English filler words plus very frequent
# (accent-stripped) Vietnamese words from the feedback domain.
STOP_WORDS = set(
    "the and for that this with from have has was were are not but "
    "you your their there khach hang giao dich".split()
)

# Stored pseudo query vectors the SQL semantic/triage queries may reference.
QUERY_PROFILES = set(
    (
        "debited_failed_transaction",
        "vip_serious_payment_issue",
        "suspicious_money_movement",
    )
)
def sql_literal(value: str | None) -> str:
    """Render *value* as a T-SQL Unicode string literal (``N'...'``).

    None and the empty string both become the bare keyword ``NULL``. Embedded
    single quotes are doubled, which is T-SQL's escape for quotes in a literal.
    """
    if value in (None, ""):
        return "NULL"
    escaped = value.replace("'", "''")
    return f"N'{escaped}'"
def sql_int(value: Any, default: int, minimum: int, maximum: int) -> int:
    """Coerce *value* to an int clamped to ``[minimum, maximum]``.

    Used to sanitise numeric request fields (``top``, ``daysBack``) before they
    are interpolated into SQL text or used as slice bounds.

    Args:
        value: Raw value from the request payload (string, number, None, ...).
        default: Returned, unclamped, when *value* cannot be converted.
        minimum: Lower bound applied to a successfully converted value.
        maximum: Upper bound applied to a successfully converted value.

    Returns:
        The clamped integer, or *default* when conversion fails.
    """
    try:
        parsed = int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(float("inf")). json.loads accepts the non-standard
        # ``Infinity`` literal, so a request body can deliver an infinite float.
        return default
    return max(minimum, min(parsed, maximum))
def choose_query_profile(question: str) -> str:
    """Map a free-text question to one of the QUERY_PROFILES names.

    NOTE(review): this module defines ``choose_query_profile`` a second time
    further down. That later definition replaces this one at import time, so this
    copy is effectively unused. Keep the two in sync, or delete one of them.

    Phrases are matched as whole words on the NFC-normalised, lower-cased text.
    The single word "lạ" (strange) is only accepted with its diacritic and as a
    standalone word. Its unaccented form "la" also occurs inside many unrelated
    words ("lan", "balance", "delay"), which previously misrouted questions to
    the fraud profile.

    Returns:
        "suspicious_money_movement", "vip_serious_payment_issue" or, by default,
        "debited_failed_transaction".
    """
    lowered = unicodedata.normalize("NFC", (question or "").lower())
    suspicious_phrase = re.search(
        r"\b(?:gian lận|gian lan|đáng ngờ|dang ngo|fraud\w*)\b", lowered
    )
    strange_word = re.search(r"(?<!\w)l\u1ea1(?!\w)", lowered)  # the word "lạ"
    if suspicious_phrase or strange_word:
        return "suspicious_money_movement"
    if "vip" in lowered or "ưu tiên" in lowered or "uu tien" in lowered:
        return "vip_serious_payment_issue"
    return "debited_failed_transaction"
def csv_backend_enabled() -> bool:
    """Decide whether requests are served from the CSV demo data instead of SQL Server.

    An explicit HELPDESK_DEMO_BACKEND of "csv" or "sql" wins. In any other mode,
    the CSV backend is used when a SPACE_ID environment variable is set and no
    SQL user is configured.
    """
    explicit_choice = {"csv": True, "sql": False}
    if DEMO_BACKEND in explicit_choice:
        return explicit_choice[DEMO_BACKEND]
    on_space = bool(os.environ.get("SPACE_ID"))
    return on_space and not SQL_USER
def parse_created_at(value: str | None) -> datetime:
    """Parse an ISO-8601 ``CreatedAt`` value into a naive UTC datetime.

    A trailing "Z" is accepted (rewritten to "+00:00" for ``fromisoformat``).
    Values with an explicit UTC offset are converted to UTC before the tzinfo is
    dropped, so they are comparable with the naive-UTC cutoff used by
    csv_filtered_rows. Naive values are returned unchanged.

    Returns:
        The parsed datetime, or ``datetime.min`` when *value* is empty or cannot
        be parsed.
    """
    if not value:
        return datetime.min
    try:
        parsed = datetime.fromisoformat(value.strip().replace("Z", "+00:00"))
        if parsed.tzinfo is not None:
            # Previously the offset was simply discarded, shifting e.g. "+07:00"
            # timestamps by seven hours relative to UTC.
            parsed = parsed.astimezone(timezone.utc)
    except (ValueError, OverflowError):
        # OverflowError: converting an aware datetime at the edge of the range.
        return datetime.min
    return parsed.replace(tzinfo=None)
def load_csv_rows() -> list[dict[str, Any]]:
    """Return the demo feedback rows from CSV_DATA_PATH, reading the file only once.

    Each row is the raw CSV record plus two derived keys:
      * ``FeedbackId``: the 1-based row position. It overwrites any
        ``FeedbackId`` column already present in the file.
      * ``_created_at``: ``CreatedAt`` parsed by parse_created_at.

    The list is cached in the module-level ``_CSV_ROWS``.

    Raises:
        RuntimeError: if the CSV file does not exist.
    """
    global _CSV_ROWS
    if _CSV_ROWS is None:
        if not CSV_DATA_PATH.exists():
            raise RuntimeError(f"CSV demo data not found: {CSV_DATA_PATH}")
        with CSV_DATA_PATH.open("r", encoding="utf-8-sig", newline="") as handle:
            _CSV_ROWS = [
                {
                    **record,
                    "FeedbackId": position,
                    "_created_at": parse_created_at(record.get("CreatedAt")),
                }
                for position, record in enumerate(csv.DictReader(handle), start=1)
            ]
    return _CSV_ROWS
def normalize_search_text(text: str | None) -> str:
    """Lower-case *text* and strip diacritics for accent-insensitive matching.

    "đ" is mapped to "d" explicitly because NFKD does not decompose it. All other
    accents are removed by dropping combining marks after NFKD decomposition.
    None is treated as the empty string.

    NOTE(review): this function is defined again, identically, further down in
    the module. That later definition is the one in effect.
    """
    lowered = (text or "").lower().replace("\u0111", "d")
    return "".join(
        ch for ch in unicodedata.normalize("NFKD", lowered) if not unicodedata.combining(ch)
    )
def choose_query_profile(question: str) -> str:
    """Map a free-text helpdesk question to one of the QUERY_PROFILES names.

    Profile phrases are matched as whole words on the accent-stripped text from
    normalize_search_text. Plain substring tests were too loose: the bare token
    "la" also occurs in "lan" (lần), "balance", "delay" and "là" ("is"), and
    "dang ngo" occurs in "dang ngoi" (đang ngồi). Both misrouted ordinary
    questions to the fraud profile.

    The word "lạ" (strange) only keeps its meaning with the diacritic, so it is
    matched as a standalone word on the accented, lower-cased question.

    Returns:
        "suspicious_money_movement", "vip_serious_payment_issue" or, by default,
        "debited_failed_transaction".
    """
    normalized = normalize_search_text(question)
    if re.search(r"\b(?:gian lan|dang ngo|bat thuong|fraud\w*)\b", normalized):
        return "suspicious_money_movement"
    accented = unicodedata.normalize("NFC", (question or "").lower())
    if re.search(r"(?<!\w)l\u1ea1(?!\w)", accented):  # the word "lạ"
        return "suspicious_money_movement"
    if "vip" in normalized or "uu tien" in normalized:
        return "vip_serious_payment_issue"
    return "debited_failed_transaction"


def normalize_search_text(text: str | None) -> str:
    """Lower-case *text* and strip diacritics for accent-insensitive matching.

    "đ" (U+0111) is mapped to "d" explicitly because NFKD does not decompose it.
    All other accents are removed by dropping combining marks after NFKD
    decomposition. None is treated as the empty string.
    """
    normalized = (text or "").lower().replace("\u0111", "d")
    decomposed = unicodedata.normalize("NFKD", normalized)
    return "".join(char for char in decomposed if not unicodedata.combining(char))
def tokenize(text: str | None) -> set[str]:
    """Split *text* into a set of lower-case ASCII alphanumeric tokens.

    The text is accent-stripped first (normalize_search_text). Tokens shorter
    than three characters, and tokens in STOP_WORDS, are discarded.
    """
    candidates = re.findall(r"[a-z0-9]+", normalize_search_text(text))
    return {word for word in candidates if len(word) >= 3 and word not in STOP_WORDS}
def load_real_feedback_snapshot() -> dict[str, Any] | None:
    """Load (once) the precomputed real feedback embeddings from REAL_FEEDBACK_EMBEDDINGS_PATH.

    Returns None while the file does not exist. Otherwise returns, and caches in
    ``_REAL_FEEDBACK_SNAPSHOT``, a dict with:
      * ``modelName``: the file's model name, defaulting to OLLAMA_MODEL.
      * ``dimensionCount``: the file's dimension count, or 0.
      * ``items``: maps int FeedbackId to ``values`` (floats), the precomputed L2
        ``norm``, and ``dimensionCount`` (defaults to ``len(values)``).
    """
    global _REAL_FEEDBACK_SNAPSHOT
    if _REAL_FEEDBACK_SNAPSHOT is None and REAL_FEEDBACK_EMBEDDINGS_PATH.exists():
        document = json.loads(REAL_FEEDBACK_EMBEDDINGS_PATH.read_text(encoding="utf-8"))
        vectors: dict[int, dict[str, Any]] = {}
        for entry in document.get("items", []):
            components = [float(component) for component in entry.get("values") or []]
            magnitude = math.sqrt(sum(component * component for component in components))
            vectors[int(entry["FeedbackId"])] = {
                "values": components,
                "norm": magnitude,
                "dimensionCount": int(entry.get("dimensionCount") or len(components)),
            }
        _REAL_FEEDBACK_SNAPSHOT = {
            "modelName": document.get("modelName") or OLLAMA_MODEL,
            "dimensionCount": int(document.get("dimensionCount") or 0),
            "items": vectors,
        }
    return _REAL_FEEDBACK_SNAPSHOT
def load_real_query_snapshot() -> dict[str, Any] | None:
    """Load (once) the precomputed real query embeddings from REAL_QUERY_EMBEDDINGS_PATH.

    Returns None while the file does not exist. Otherwise returns, and caches in
    ``_REAL_QUERY_SNAPSHOT``, a dict with:
      * ``modelName``: the file's model name, defaulting to OLLAMA_MODEL.
      * ``byName``: entries keyed by their ``name``.
      * ``byText``: the same entries keyed by ``normalize_search_text(text)``,
        used by real_query_for_question for exact-question lookups.

    Each entry holds ``name``, ``text``, ``values`` (floats) and the precomputed
    L2 ``norm``.
    """
    global _REAL_QUERY_SNAPSHOT
    if _REAL_QUERY_SNAPSHOT is not None:
        return _REAL_QUERY_SNAPSHOT
    if not REAL_QUERY_EMBEDDINGS_PATH.exists():
        return None
    raw = json.loads(REAL_QUERY_EMBEDDINGS_PATH.read_text(encoding="utf-8"))
    by_name: dict[str, dict[str, Any]] = {}
    by_text: dict[str, dict[str, Any]] = {}
    for item in raw.get("items", []):
        values = [float(value) for value in item.get("values") or []]
        entry = {
            "name": item.get("name"),
            "text": item.get("text"),
            "values": values,
            "norm": math.sqrt(sum(value * value for value in values)),
        }
        # Items without a name or text are not indexed under a placeholder key
        # ("None" / ""). Otherwise an empty question would "exactly match"
        # whichever text-less item happened to come last.
        name_key = str(item.get("name") or "")
        if name_key:
            by_name[name_key] = entry
        text_key = normalize_search_text(str(item.get("text") or ""))
        if text_key:
            by_text[text_key] = entry
    _REAL_QUERY_SNAPSHOT = {
        "modelName": raw.get("modelName") or OLLAMA_MODEL,
        "byName": by_name,
        "byText": by_text,
    }
    return _REAL_QUERY_SNAPSHOT
def real_snapshot_available() -> bool:
    """Report whether both real-embedding snapshots are loaded and non-empty.

    Requires at least one feedback vector and at least one named query vector.
    Both snapshot loaders are always invoked, so both files get loaded and cached.
    """
    feedback = load_real_feedback_snapshot()
    queries = load_real_query_snapshot()
    has_feedback = bool(feedback) and bool(feedback.get("items"))
    has_queries = bool(queries) and bool(queries.get("byName"))
    return has_feedback and has_queries
def real_query_for_question(question: str) -> dict[str, Any] | None:
    """Choose the precomputed real query embedding to use for *question*.

    Lookup order:
      1. An entry whose normalised text equals the normalised question.
      2. The entry named after ``choose_query_profile(question)``. The
         "debited_failed_transaction" profile, and any profile missing from the
         snapshot, map to the "default_debited" entry.

    Returns None when no query snapshot exists or nothing matches.
    """
    snapshot = load_real_query_snapshot()
    if not snapshot:
        return None
    exact_match = snapshot["byText"].get(normalize_search_text(question))
    if exact_match is not None:
        return exact_match
    named = snapshot["byName"]
    profile = choose_query_profile(question)
    fallback = named.get("default_debited")
    if profile != "debited_failed_transaction":
        return named.get(profile) or fallback
    return fallback
def cosine(values_a: list[float], norm_a: float, values_b: list[float], norm_b: float) -> float:
    """Cosine similarity of two vectors, given their precomputed L2 norms.

    Returns 0.0 when either norm is zero. Components are paired with ``zip``, so
    surplus components of the longer vector are ignored.
    """
    if 0 in (norm_a, norm_b):
        return 0.0
    dot = 0
    for component_a, component_b in zip(values_a, values_b):
        dot += component_a * component_b
    return dot / (norm_a * norm_b)
def text_similarity(query_tokens: set[str], row: dict[str, Any]) -> float:
    """Token-overlap similarity between a query token set and a CSV row.

    The row's text, issue group, product, risk level and segment are tokenised
    together. The score is ``|shared| / sqrt(|query| * |row|)``, or 0.0 when
    either token set is empty.
    """
    searchable_fields = ("FeedbackText", "SourceIssueGroup", "Product", "RiskLevel", "CustomerSegment")
    row_tokens = tokenize(" ".join(str(row.get(field) or "") for field in searchable_fields))
    if not (query_tokens and row_tokens):
        return 0.0
    shared = len(query_tokens.intersection(row_tokens))
    return shared / math.sqrt(len(query_tokens) * len(row_tokens))
def csv_row_payload(row: dict[str, Any], similarity: float | None = None) -> dict[str, Any]:
    """Project a CSV row onto the public feedback fields returned by the API.

    Missing fields become None, and internal keys such as ``_created_at`` are
    dropped. When *similarity* is given, it is added rounded to 4 decimals.
    """
    public_fields = (
        "FeedbackId",
        "Product",
        "CustomerSegment",
        "RiskLevel",
        "Channel",
        "Region",
        "CreatedAt",
        "FeedbackText",
    )
    result = {field: row.get(field) for field in public_fields}
    if similarity is not None:
        result["similarity"] = round(similarity, 4)
    return result
def csv_filtered_rows(segment: str | None, risk: str | None, days_back: int) -> list[dict[str, Any]]:
    """Return CSV rows that pass the segment, risk and age filters.

    This parallels the WHERE clause of the SQL semantic queries:
      * A falsy *segment* or *risk* disables that filter.
      * Only rows whose ``_created_at`` is at or after "now (UTC) minus
        *days_back* days" are kept.

    Rows whose CreatedAt could not be parsed carry ``datetime.min`` and are
    therefore always excluded.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Derive the same naive UTC
    # value from an aware "now" so it compares with the naive ``_created_at`` values.
    cutoff = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(days=days_back)
    rows = []
    for row in load_csv_rows():
        if segment and row.get("CustomerSegment") != segment:
            continue
        if risk and row.get("RiskLevel") != risk:
            continue
        if row.get("_created_at", datetime.min) < cutoff:
            continue
        rows.append(row)
    return rows
def score_rows_with_real_snapshot(question: str, rows: list[dict[str, Any]]) -> list[tuple[float, dict[str, Any]]]:
    """Score *rows* against the real query embedding chosen for *question*.

    Rows without a vector in the feedback snapshot are skipped. The result is a
    list of ``(cosine, row)`` pairs ordered by score, then ``_created_at``, both
    descending. It is empty when either snapshot or the query is unavailable.
    """
    snapshot = load_real_feedback_snapshot()
    query = real_query_for_question(question)
    if not (snapshot and query):
        return []
    vectors = snapshot["items"]
    query_values, query_norm = query["values"], query["norm"]
    ranked = []
    for row in rows:
        vector = vectors.get(int(row.get("FeedbackId") or 0))
        if vector:
            ranked.append((cosine(query_values, query_norm, vector["values"], vector["norm"]), row))
    ranked.sort(key=lambda pair: (pair[0], pair[1].get("_created_at", datetime.min)), reverse=True)
    return ranked
def real_snapshot_search_payload(payload: dict[str, Any]) -> dict[str, Any]:
    """Semantic /api/search response computed from the real embedding snapshot.

    Applies the optional ``segment``/``risk``/``daysBack`` filters (``daysBack``
    clamped to 1..3650, default 365). It then returns the ``top`` (1..50, default
    10) best-scoring rows. Assumes the feedback snapshot is loaded; the callers
    check real_snapshot_available first.
    """
    limit = sql_int(payload.get("top"), 10, 1, 50)
    question = str(payload.get("question") or "")
    candidates = csv_filtered_rows(
        segment=payload.get("segment") or None,
        risk=payload.get("risk") or None,
        days_back=sql_int(payload.get("daysBack"), 365, 1, 3650),
    )
    ranked = score_rows_with_real_snapshot(question, candidates)
    model_name = load_real_feedback_snapshot()["modelName"]
    return {
        "mode": "semantic",
        "queryProfile": f"real_ollama:{model_name}",
        "rows": [csv_row_payload(row, score) for score, row in ranked[:limit]],
    }
def real_snapshot_similar_payload(feedback_id: int, top: int) -> dict[str, Any]:
    """Find the *top* feedback rows most similar to *feedback_id* using real embeddings.

    Only ids that have both a vector and a CSV row are considered, and the seed
    itself is excluded. The profile is "real_ollama:missing" when no snapshot is
    loaded. Rows are empty when the seed has no vector.
    """
    snapshot = load_real_feedback_snapshot()
    if not snapshot:
        return {"feedbackId": feedback_id, "queryProfile": "real_ollama:missing", "rows": []}
    profile = f"real_ollama:{snapshot['modelName']}"
    vectors = snapshot["items"]
    seed = vectors.get(feedback_id)
    if not seed:
        return {"feedbackId": feedback_id, "queryProfile": profile, "rows": []}
    rows_by_id = {int(row["FeedbackId"]): row for row in load_csv_rows()}
    ranked = [
        (cosine(seed["values"], seed["norm"], vector["values"], vector["norm"]), rows_by_id[other_id])
        for other_id, vector in vectors.items()
        if other_id != feedback_id and other_id in rows_by_id
    ]
    ranked.sort(key=lambda pair: (pair[0], pair[1].get("_created_at", datetime.min)), reverse=True)
    return {
        "feedbackId": feedback_id,
        "queryProfile": profile,
        "rows": [csv_row_payload(row, score) for score, row in ranked[:top]],
    }
def real_snapshot_embedding_payload(feedback_id: int) -> tuple[dict[str, Any], int]:
    """Return ``(body, status)`` describing the real embedding of *feedback_id*.

    The status is 404 when the id has no CSV row or no vector in the snapshot.
    Otherwise it is 200, and the body carries the model, dimension count, rounded
    norm, feedback text and raw vector values.
    """
    snapshot = load_real_feedback_snapshot()
    row = None
    for candidate in load_csv_rows():
        if int(candidate.get("FeedbackId") or 0) == feedback_id:
            row = candidate
            break
    vector = snapshot["items"].get(feedback_id) if snapshot else None
    if row is None or vector is None:
        return {"error": f"Feedback #{feedback_id} not found"}, 404
    model = snapshot["modelName"]
    body = {
        "feedbackId": feedback_id,
        "source": f"real_ollama:{model}",
        "modelName": model,
        "dimensionCount": vector["dimensionCount"],
        "norm": round(vector["norm"], 6),
        "feedbackText": row.get("FeedbackText"),
        "values": vector["values"],
    }
    return body, 200
def real_snapshot_triage_payload(question: str) -> dict[str, Any]:
    """Summarise the 40 best real-snapshot matches for *question* per (Product, RiskLevel).

    Each group reports its hit count, best similarity and average similarity
    (both rounded to 4 decimals). Groups are ordered by best similarity, then hit
    count, both descending.
    """
    snapshot = load_real_feedback_snapshot()
    top_hits = score_rows_with_real_snapshot(question, load_csv_rows())[:40]
    grouped: dict[tuple[str, str], list[float]] = defaultdict(list)
    for score, row in top_hits:
        group_key = (str(row.get("Product") or "Unknown"), str(row.get("RiskLevel") or "Unknown"))
        grouped[group_key].append(score)
    summary = [
        {
            "Product": product,
            "RiskLevel": risk,
            "hit_count": len(scores),
            "best_similarity": round(max(scores), 4),
            "avg_similarity": round(sum(scores) / len(scores), 4),
        }
        for (product, risk), scores in grouped.items()
    ]
    summary.sort(key=lambda entry: (entry["best_similarity"], entry["hit_count"]), reverse=True)
    model_name = OLLAMA_MODEL if not snapshot else snapshot["modelName"]
    return {"queryProfile": f"real_ollama:{model_name}", "rows": summary}
def csv_overview_payload() -> dict[str, Any]:
    """Build the /api/overview response for the CSV backend.

    When a real feedback snapshot is loaded, its count, model and dimension
    count are reported. Otherwise every CSV row is counted as embedded, with the
    64-dimension pseudo vectors. Issue groups are listed by descending count;
    ties keep first-seen order.
    """
    rows = load_csv_rows()
    snapshot = load_real_feedback_snapshot()
    if snapshot:
        real_count = len(snapshot["items"])
        model_name = snapshot["modelName"]
        dimension_count = snapshot["dimensionCount"]
    else:
        real_count, model_name, dimension_count = 0, None, 64
    embedded_count = real_count or len(rows)

    issue_counts: dict[str, int] = {}
    for row in rows:
        issue = str(row.get("SourceIssueGroup") or "Unknown")
        issue_counts[issue] = issue_counts.get(issue, 0) + 1
    ranked_issues = sorted(issue_counts.items(), key=lambda pair: pair[1], reverse=True)

    return {
        "overview": {
            "product_version": "csv-space-demo",
            "edition": "Hugging Face Space",
            "database_name": CSV_DATA_PATH.name,
            "feedback_count": len(rows),
            "embedding_rows": embedded_count * dimension_count,
            "embedded_feedback_count": embedded_count,
            "critical_count": sum(1 for row in rows if row.get("RiskLevel") == "Critical"),
            "vip_count": sum(1 for row in rows if row.get("CustomerSegment") == "VIP"),
        },
        "issues": [{"SourceIssueGroup": issue, "count": count} for issue, count in ranked_issues],
        "realEmbedding": {
            "embedded_feedback_count": real_count,
            "model_name": model_name,
            "dimension_count": dimension_count,
        },
        "embeddingMode": "real" if real_count else "csv",
        "ollamaModel": model_name or "csv-pseudo",
    }
def csv_search_payload(payload: dict[str, Any]) -> dict[str, Any]:
    """Handle /api/search for the CSV backend.

    ``mode == "keyword"``:
        Accent-insensitive substring match of ``keyword`` (falling back to
        ``question``) against FeedbackText, newest first.
    Any other mode:
        Semantic ranking. Uses the real embedding snapshot when it is available,
        otherwise token-overlap similarity ("csv-pseudo").

    The ``segment``/``risk``/``daysBack`` filters apply in both modes, and
    ``top`` is clamped to 1..50. Missing keyword/question values fall back to the
    same defaults the SQL backend's handler uses ("trừ tiền" and "khách hàng bị
    trừ tiền dù giao dịch thất bại"). Previously an empty keyword matched every
    row and an empty question scored every row 0.
    """
    top = sql_int(payload.get("top"), 10, 1, 50)
    mode = str(payload.get("mode") or "semantic")
    if mode == "keyword":
        rows = csv_filtered_rows(
            segment=payload.get("segment") or None,
            risk=payload.get("risk") or None,
            days_back=sql_int(payload.get("daysBack"), 365, 1, 3650),
        )
        raw_keyword = payload.get("keyword") or payload.get("question") or "trừ tiền"
        keyword = normalize_search_text(str(raw_keyword))
        matches = [row for row in rows if keyword in normalize_search_text(str(row.get("FeedbackText") or ""))]
        matches.sort(key=lambda row: row.get("_created_at", datetime.min), reverse=True)
        return {"mode": "keyword", "rows": [csv_row_payload(row) for row in matches[:top]]}

    question = str(payload.get("question") or "khách hàng bị trừ tiền dù giao dịch thất bại")
    if real_snapshot_available():
        # real_snapshot_search_payload re-applies the filters itself.
        return real_snapshot_search_payload({**payload, "question": question})
    rows = csv_filtered_rows(
        segment=payload.get("segment") or None,
        risk=payload.get("risk") or None,
        days_back=sql_int(payload.get("daysBack"), 365, 1, 3650),
    )
    query_tokens = tokenize(question)
    scored = [(text_similarity(query_tokens, row), row) for row in rows]
    scored.sort(key=lambda item: (item[0], item[1].get("_created_at", datetime.min)), reverse=True)
    return {
        "mode": "semantic",
        "queryProfile": "csv-pseudo",
        "rows": [csv_row_payload(row, score) for score, row in scored[:top]],
    }
def csv_similar_payload(feedback_id: int, top: int) -> dict[str, Any]:
    """Find the *top* CSV rows most similar to *feedback_id*.

    Delegates to the real snapshot when it is available. Otherwise it tokenises
    the seed's text, issue group and product, and ranks every other row with
    text_similarity. Ties are broken by newest ``_created_at``.
    """
    if real_snapshot_available():
        return real_snapshot_similar_payload(feedback_id, top)
    all_rows = load_csv_rows()
    seed = next((candidate for candidate in all_rows if candidate.get("FeedbackId") == feedback_id), None)
    if seed is None:
        return {"feedbackId": feedback_id, "queryProfile": "csv-pseudo", "rows": []}
    seed_fields = ("FeedbackText", "SourceIssueGroup", "Product")
    seed_tokens = tokenize(" ".join(str(seed.get(field) or "") for field in seed_fields))
    ranked = [
        (text_similarity(seed_tokens, candidate), candidate)
        for candidate in all_rows
        if candidate.get("FeedbackId") != feedback_id
    ]
    ranked.sort(key=lambda pair: (pair[0], pair[1].get("_created_at", datetime.min)), reverse=True)
    return {
        "feedbackId": feedback_id,
        "queryProfile": "csv-pseudo",
        "rows": [csv_row_payload(candidate, score) for score, candidate in ranked[:top]],
    }
def pseudo_vector(text: str, dimensions: int = 64) -> list[float]:
    """Hash the tokens of *text* into a deterministic pseudo embedding.

    Each token (see tokenize) is added to one of *dimensions* buckets. The bucket
    comes from the first 4 bytes of its 8-byte BLAKE2b digest, and the sign from
    the parity of the 5th byte. The magnitude is ``(1 + len(token) % 5) / 5``.
    The result is L2-normalised and rounded to 8 decimals. An all-zero vector is
    returned unnormalised, e.g. for text without tokens.
    """
    buckets = [0.0] * dimensions
    for token in tokenize(text):
        digest = hashlib.blake2b(token.encode("utf-8"), digest_size=8).digest()
        bucket = int.from_bytes(digest[:4], "big") % dimensions
        magnitude = (1.0 + (len(token) % 5)) / 5.0
        buckets[bucket] += magnitude if digest[4] % 2 == 0 else -magnitude
    length = math.sqrt(sum(component * component for component in buckets))
    if length == 0:
        return buckets
    return [round(component / length, 8) for component in buckets]
def csv_embedding_payload(feedback_id: int) -> tuple[dict[str, Any], int]:
    """Return ``(body, status)`` describing the embedding of *feedback_id* for the CSV backend.

    Delegates to the real snapshot when it is available. Otherwise it derives a
    64-dimension pseudo vector from the feedback text. The status is 404 when no
    CSV row has this id.
    """
    if real_snapshot_available():
        return real_snapshot_embedding_payload(feedback_id)
    for candidate in load_csv_rows():
        if candidate.get("FeedbackId") == feedback_id:
            break
    else:
        return {"error": f"Feedback #{feedback_id} not found"}, 404
    feedback_text = str(candidate.get("FeedbackText") or "")
    vector = pseudo_vector(feedback_text)
    magnitude = math.sqrt(sum(component * component for component in vector))
    body = {
        "feedbackId": feedback_id,
        "source": "csv-pseudo",
        "modelName": "hashed-token-vector",
        "dimensionCount": len(vector),
        "norm": round(magnitude, 6),
        "feedbackText": feedback_text,
        "values": vector,
    }
    return body, 200
def csv_triage_payload(question: str) -> dict[str, Any]:
    """Summarise the 40 best CSV matches for *question* per (Product, RiskLevel).

    Delegates to the real snapshot when it is available. Otherwise rows are
    ranked by token-overlap similarity only; ties keep CSV order. Groups are
    ordered by best similarity, then hit count, both descending.
    """
    if real_snapshot_available():
        return real_snapshot_triage_payload(question)
    question_tokens = tokenize(question)
    ranked = sorted(
        ((text_similarity(question_tokens, row), row) for row in load_csv_rows()),
        key=lambda pair: pair[0],
        reverse=True,
    )
    grouped: dict[tuple[str, str], list[float]] = defaultdict(list)
    for score, row in ranked[:40]:
        group_key = (str(row.get("Product") or "Unknown"), str(row.get("RiskLevel") or "Unknown"))
        grouped[group_key].append(score)
    summary = [
        {
            "Product": product,
            "RiskLevel": risk,
            "hit_count": len(scores),
            "best_similarity": round(max(scores), 4),
            "avg_similarity": round(sum(scores) / len(scores), 4),
        }
        for (product, risk), scores in grouped.items()
    ]
    summary.sort(key=lambda entry: (entry["best_similarity"], entry["hit_count"]), reverse=True)
    return {"queryProfile": "csv-pseudo", "rows": summary}
def ollama_embed(text: str) -> list[float]:
    """Embed *text* with the configured Ollama model and return the vector.

    Sends ``{"model": OLLAMA_MODEL, "input": text}`` to OLLAMA_URL, with a
    120-second timeout. Both response shapes are accepted: ``{"embeddings":
    [[...]]}``, where the first vector is used, and ``{"embedding": [...]}``.

    Raises:
        RuntimeError: when Ollama is unreachable, answers with an HTTP error,
            times out or drops the connection, or returns an unrecognised body.
    """
    payload = json.dumps({"model": OLLAMA_MODEL, "input": text}).encode("utf-8")
    request = urllib.request.Request(
        OLLAMA_URL,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=120) as response:
            body = json.loads(response.read().decode("utf-8"))
    except urllib.error.HTTPError as exc:
        # HTTPError subclasses URLError, but it means Ollama *was* reached and
        # rejected the request (e.g. unknown model). Report status and body
        # instead of a misleading "cannot connect".
        try:
            detail = exc.read().decode("utf-8", errors="replace").strip()
        except Exception:  # best effort: the status code alone is still useful
            detail = ""
        raise RuntimeError(
            f"Ollama at {OLLAMA_URL} returned HTTP {exc.code}: {detail or exc.reason}. "
            f"Make sure the model is available: ollama pull {OLLAMA_MODEL}"
        ) from exc
    except urllib.error.URLError as exc:
        raise RuntimeError(
            f"Cannot connect to Ollama at {OLLAMA_URL}. Start Ollama and run: ollama pull {OLLAMA_MODEL}"
        ) from exc
    except OSError as exc:
        # A timeout or reset while reading the response is raised as a raw
        # OSError (socket timeout), not wrapped in URLError.
        raise RuntimeError(f"Ollama at {OLLAMA_URL} did not return a complete response: {exc}") from exc
    if "embeddings" in body:
        embeddings = body["embeddings"]
        if embeddings and isinstance(embeddings[0], list):
            return [float(value) for value in embeddings[0]]
    if "embedding" in body:
        return [float(value) for value in body["embedding"]]
    raise RuntimeError(f"Unexpected Ollama embedding response: {body}")
def run_sql(query: str) -> Any:
    """Run *query* through the sqlcmd CLI and return its FOR JSON output, parsed.

    ``SET NOCOUNT ON;`` is prepended to the query. SQL authentication is used
    when SQL_USER is set, otherwise integrated authentication (``-E``). Empty
    output yields ``[]``. Any text sqlcmd prints before the first ``[`` or ``{``
    is discarded before parsing.

    Raises:
        RuntimeError: when sqlcmd exits non-zero. The message is its stderr, or
            its stdout when stderr is empty.
        json.JSONDecodeError: when the output is not valid JSON.
    """
    command = [SQLCMD, "-S", SQL_SERVER, "-d", SQL_DATABASE]
    env = None
    if SQL_USER:
        command.extend(["-U", SQL_USER])
        if SQL_PASSWORD:
            # Pass the password through SQLCMDPASSWORD instead of -P. That keeps it
            # out of the process list (ps / Task Manager) visible to other users.
            env = {**os.environ, "SQLCMDPASSWORD": SQL_PASSWORD}
        else:
            command.extend(["-P", ""])
    else:
        command.append("-E")
    if SQL_TRUST_CERTIFICATE:
        command.append("-C")
    command.extend(
        [
            # -x disables sqlcmd scripting-variable substitution, so "$(...)" in
            # user text embedded in the query reaches SQL Server verbatim instead of
            # being expanded (e.g. from environment variables).
            "-x",
            "-b",
            "-f",
            "i:65001,o:65001",
            "-w",
            "65535",
            "-y",
            "0",
            "-Q",
            "SET NOCOUNT ON;\n" + query,
        ]
    )
    completed = subprocess.run(
        command,
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
        check=False,
        env=env,
        # Never let sqlcmd block a request thread by prompting on the server's stdin.
        stdin=subprocess.DEVNULL,
    )
    if completed.returncode != 0:
        raise RuntimeError((completed.stderr or completed.stdout).strip())
    output = completed.stdout.strip()
    if not output:
        return []
    json_start = min((idx for idx in (output.find("["), output.find("{")) if idx != -1), default=-1)
    if json_start > 0:
        output = output[json_start:]
    # sqlcmd can insert physical line breaks into long FOR JSON output. Those
    # breaks are not part of SQL Server's JSON payload and can land inside JSON
    # strings, so remove them before parsing.
    output = output.replace("\r", "").replace("\n", "")
    return json.loads(output)
def real_embedding_status_query() -> str:
    """Return a T-SQL batch reporting the state of the real (Ollama) embedding tables.

    The batch emits a single JSON object (``WITHOUT_ARRAY_WRAPPER``) with the keys
    ``embedded_feedback_count``, ``model_name`` and ``dimension_count``:
      * When ``dbo.RealFeedbackEmbedding`` does not exist, the count is 0 and the
        other two are NULL.
      * Otherwise the count is the number of distinct FeedbackIds embedded, and
        the model name and dimension count come from the most recent
        ``dbo.RealEmbeddingMetadata`` row (by CreatedAt).
    """
    # NOTE(review): the ELSE branch assumes dbo.RealEmbeddingMetadata exists
    # whenever dbo.RealFeedbackEmbedding does; confirm against the build script.
    return """
IF OBJECT_ID(N'dbo.RealFeedbackEmbedding', N'U') IS NULL
BEGIN
SELECT
CAST(0 AS INT) AS embedded_feedback_count,
CAST(NULL AS NVARCHAR(200)) AS model_name,
CAST(NULL AS INT) AS dimension_count
FOR JSON PATH, WITHOUT_ARRAY_WRAPPER, INCLUDE_NULL_VALUES;
END
ELSE
BEGIN
SELECT
COUNT(DISTINCT e.FeedbackId) AS embedded_feedback_count,
MAX(m.ModelName) AS model_name,
MAX(m.DimensionCount) AS dimension_count
FROM dbo.RealFeedbackEmbedding AS e
OUTER APPLY
(
SELECT TOP (1)
ModelName,
DimensionCount
FROM dbo.RealEmbeddingMetadata
ORDER BY CreatedAt DESC
) AS m
FOR JSON PATH, WITHOUT_ARRAY_WRAPPER, INCLUDE_NULL_VALUES;
END
"""
def real_embeddings_available() -> bool:
    """Report whether the SQL database holds real (Ollama) feedback embeddings.

    Behaviour depends on EMBEDDING_MODE:
      * ``"fallback"``: always False, and the database is not queried.
      * Otherwise: the status query is run and a successful answer is cached in
        ``_REAL_EMBEDDINGS_AVAILABLE``. A failed query (e.g. SQL Server not
        reachable yet) yields False for this call but is not cached, so a later
        call retries.

    Raises:
        RuntimeError: in ``"real"`` mode whenever real embeddings are missing.
            This is checked on every call, not only the first one. Previously
            the first call raised and later calls silently returned False.
    """
    global _REAL_EMBEDDINGS_AVAILABLE
    if EMBEDDING_MODE == "fallback":
        return False
    if _REAL_EMBEDDINGS_AVAILABLE is not None:
        available = _REAL_EMBEDDINGS_AVAILABLE
    else:
        try:
            status = run_sql(real_embedding_status_query())
            available = int(status.get("embedded_feedback_count") or 0) > 0
        except Exception:
            # Best effort: treat as unavailable for now, but leave the cache empty.
            available = False
        else:
            _REAL_EMBEDDINGS_AVAILABLE = available
    if EMBEDDING_MODE == "real" and not available:
        raise RuntimeError(
            "Real embeddings are not built yet. Run scripts/build_real_embeddings_ollama.ps1 first."
        )
    return available
def query_embedding_json(question: str) -> str:
    """Embed *question* with Ollama and serialise the vector as compact JSON.

    The result is a JSON array without spaces, suitable for an NVARCHAR(MAX)
    parameter consumed by OPENJSON in the real_* SQL queries.
    """
    vector = ollama_embed(question)
    compact_separators = (",", ":")
    return json.dumps(vector, separators=compact_separators)
def keyword_query(keyword: str, top: int) -> str:
    """Build T-SQL returning the newest *top* feedback rows whose text contains *keyword*.

    The keyword is matched literally. LIKE metacharacters (``%``, ``_``, ``[``)
    are escaped with T-SQL bracket syntax, so "100%" or "a_b" are not treated as
    patterns. This matches the CSV backend's plain substring search. As before,
    the keyword is capped at 100 characters. The cap is now applied before
    escaping, so an escape sequence is never cut in half. An empty keyword
    becomes NULL and matches nothing.

    Returns:
        A T-SQL batch producing a JSON array (``FOR JSON PATH``).
    """
    # "[" must be escaped first: the other two replacements introduce brackets.
    literal = keyword[:100].replace("[", "[[]").replace("%", "[%]").replace("_", "[_]")
    # top is interpolated unquoted; force it to a plain int.
    top = int(top)
    return f"""
DECLARE @Keyword NVARCHAR(400) = {sql_literal(literal)};
SELECT TOP ({top})
FeedbackId,
Product,
CustomerSegment,
RiskLevel,
Channel,
Region,
CreatedAt,
FeedbackText
FROM dbo.CustomerFeedback
WHERE FeedbackText LIKE N'%' + @Keyword + N'%'
ORDER BY CreatedAt DESC
FOR JSON PATH, INCLUDE_NULL_VALUES;
"""
def semantic_query(query_name: str, segment: str | None, risk: str | None, days_back: int, top: int) -> str:
    """Build T-SQL ranking feedback by cosine similarity to a stored pseudo query vector.

    The query vector is the ``dbo.QueryEmbedding`` rows for *query_name*. Names not
    in QUERY_PROFILES fall back to "debited_failed_transaction". Feedback vectors
    come from ``dbo.FeedbackEmbedding`` and are joined to the query on
    ``DimensionName``. Similarity is dot / (feedback norm * query norm), cast to
    DECIMAL(10, 4).

    Args:
        query_name: Stored query profile to compare against.
        segment: Optional CustomerSegment filter. None or "" disables it, because
            sql_literal renders both as NULL.
        risk: Optional RiskLevel filter, with the same NULL semantics as *segment*.
        days_back: Keep rows with CreatedAt within this many days of SYSUTCDATETIME().
        top: Maximum number of rows returned.

    Returns:
        A T-SQL batch producing a JSON array ordered by similarity, then CreatedAt,
        both descending.
    """
    if query_name not in QUERY_PROFILES:
        query_name = "debited_failed_transaction"
    # days_back and top are interpolated unquoted; callers must pass real ints
    # (presumably clamped with sql_int, as do_POST does for top).
    return f"""
DECLARE @QueryName SYSNAME = {sql_literal(query_name)};
DECLARE @CustomerSegment NVARCHAR(50) = {sql_literal(segment)};
DECLARE @RiskLevel NVARCHAR(20) = {sql_literal(risk)};
DECLARE @DaysBack INT = {days_back};
;WITH QueryNorm AS
(
SELECT SQRT(SUM(Value * Value)) AS Norm
FROM dbo.QueryEmbedding
WHERE QueryName = @QueryName
),
FeedbackNorm AS
(
SELECT FeedbackId, SQRT(SUM(Value * Value)) AS Norm
FROM dbo.FeedbackEmbedding
GROUP BY FeedbackId
),
DotProduct AS
(
SELECT fe.FeedbackId, SUM(fe.Value * qe.Value) AS DotValue
FROM dbo.FeedbackEmbedding AS fe
INNER JOIN dbo.QueryEmbedding AS qe
ON qe.DimensionName = fe.DimensionName
AND qe.QueryName = @QueryName
GROUP BY fe.FeedbackId
),
Scored AS
(
SELECT
f.FeedbackId,
f.Product,
f.CustomerSegment,
f.RiskLevel,
f.Channel,
f.Region,
f.CreatedAt,
f.FeedbackText,
CAST(dp.DotValue / NULLIF(fn.Norm * qn.Norm, 0) AS DECIMAL(10, 4)) AS similarity
FROM DotProduct AS dp
INNER JOIN FeedbackNorm AS fn
ON fn.FeedbackId = dp.FeedbackId
CROSS JOIN QueryNorm AS qn
INNER JOIN dbo.CustomerFeedback AS f
ON f.FeedbackId = dp.FeedbackId
WHERE (@CustomerSegment IS NULL OR f.CustomerSegment = @CustomerSegment)
AND (@RiskLevel IS NULL OR f.RiskLevel = @RiskLevel)
AND (@DaysBack IS NULL OR f.CreatedAt >= DATEADD(DAY, -@DaysBack, SYSUTCDATETIME()))
)
SELECT TOP ({top})
FeedbackId,
Product,
CustomerSegment,
RiskLevel,
Channel,
Region,
CreatedAt,
FeedbackText,
similarity
FROM Scored
ORDER BY similarity DESC, CreatedAt DESC
FOR JSON PATH, INCLUDE_NULL_VALUES;
"""
def real_semantic_query(
    embedding_json: str,
    segment: str | None,
    risk: str | None,
    days_back: int,
    top: int,
) -> str:
    """Build T-SQL ranking feedback by cosine similarity to an ad-hoc real query vector.

    *embedding_json* is a JSON array (e.g. from query_embedding_json). OPENJSON
    turns it into (DimensionIndex = array position, Value) rows, which are joined
    to ``dbo.RealFeedbackEmbedding`` on ``DimensionIndex``. Similarity is
    dot / (feedback norm * query norm), cast to DECIMAL(10, 4).

    Args:
        embedding_json: Query vector as a JSON array of numbers.
        segment: Optional CustomerSegment filter. None or "" disables it.
        risk: Optional RiskLevel filter. None or "" disables it.
        days_back: Keep rows with CreatedAt within this many days of SYSUTCDATETIME().
        top: Maximum number of rows returned.

    Returns:
        A T-SQL batch producing a JSON array ordered by similarity, then CreatedAt,
        both descending.
    """
    # days_back and top are interpolated unquoted; callers must pass real ints.
    return f"""
DECLARE @QueryEmbedding NVARCHAR(MAX) = {sql_literal(embedding_json)};
DECLARE @CustomerSegment NVARCHAR(50) = {sql_literal(segment)};
DECLARE @RiskLevel NVARCHAR(20) = {sql_literal(risk)};
DECLARE @DaysBack INT = {days_back};
;WITH QueryVector AS
(
SELECT
CAST([key] AS INT) AS DimensionIndex,
CAST([value] AS FLOAT) AS Value
FROM OPENJSON(@QueryEmbedding)
),
QueryNorm AS
(
SELECT SQRT(SUM(Value * Value)) AS Norm
FROM QueryVector
),
FeedbackNorm AS
(
SELECT
FeedbackId,
SQRT(SUM(Value * Value)) AS Norm
FROM dbo.RealFeedbackEmbedding
GROUP BY FeedbackId
),
DotProduct AS
(
SELECT
fe.FeedbackId,
SUM(fe.Value * q.Value) AS DotValue
FROM dbo.RealFeedbackEmbedding AS fe
INNER JOIN QueryVector AS q
ON q.DimensionIndex = fe.DimensionIndex
GROUP BY fe.FeedbackId
),
Scored AS
(
SELECT
f.FeedbackId,
f.Product,
f.CustomerSegment,
f.RiskLevel,
f.Channel,
f.Region,
f.CreatedAt,
f.FeedbackText,
CAST(dp.DotValue / NULLIF(fn.Norm * qn.Norm, 0) AS DECIMAL(10, 4)) AS similarity
FROM DotProduct AS dp
INNER JOIN FeedbackNorm AS fn
ON fn.FeedbackId = dp.FeedbackId
CROSS JOIN QueryNorm AS qn
INNER JOIN dbo.CustomerFeedback AS f
ON f.FeedbackId = dp.FeedbackId
WHERE (@CustomerSegment IS NULL OR f.CustomerSegment = @CustomerSegment)
AND (@RiskLevel IS NULL OR f.RiskLevel = @RiskLevel)
AND (@DaysBack IS NULL OR f.CreatedAt >= DATEADD(DAY, -@DaysBack, SYSUTCDATETIME()))
)
SELECT TOP ({top})
FeedbackId,
Product,
CustomerSegment,
RiskLevel,
Channel,
Region,
CreatedAt,
FeedbackText,
similarity
FROM Scored
ORDER BY similarity DESC, CreatedAt DESC
FOR JSON PATH, INCLUDE_NULL_VALUES;
"""
def similar_query(feedback_id: int, top: int) -> str:
    """Build T-SQL returning the *top* rows most similar to *feedback_id* (pseudo embeddings).

    The seed's ``dbo.FeedbackEmbedding`` vector is compared with every other
    feedback vector, joined on ``DimensionName``. The seed itself is excluded.
    Similarity is cosine, cast to DECIMAL(10, 4). Rows are ordered by similarity,
    then CreatedAt, both descending, and returned as a JSON array.
    """
    # feedback_id and top are interpolated unquoted; callers must pass real ints.
    return f"""
DECLARE @FeedbackId INT = {feedback_id};
;WITH SeedNorm AS
(
SELECT SQRT(SUM(Value * Value)) AS Norm
FROM dbo.FeedbackEmbedding
WHERE FeedbackId = @FeedbackId
),
FeedbackNorm AS
(
SELECT FeedbackId, SQRT(SUM(Value * Value)) AS Norm
FROM dbo.FeedbackEmbedding
GROUP BY FeedbackId
),
DotProduct AS
(
SELECT fe.FeedbackId, SUM(fe.Value * seed.Value) AS DotValue
FROM dbo.FeedbackEmbedding AS fe
INNER JOIN dbo.FeedbackEmbedding AS seed
ON seed.DimensionName = fe.DimensionName
AND seed.FeedbackId = @FeedbackId
WHERE fe.FeedbackId <> @FeedbackId
GROUP BY fe.FeedbackId
)
SELECT TOP ({top})
f.FeedbackId,
f.Product,
f.CustomerSegment,
f.RiskLevel,
f.Channel,
f.Region,
f.CreatedAt,
f.FeedbackText,
CAST(dp.DotValue / NULLIF(fn.Norm * sn.Norm, 0) AS DECIMAL(10, 4)) AS similarity
FROM DotProduct AS dp
INNER JOIN FeedbackNorm AS fn
ON fn.FeedbackId = dp.FeedbackId
CROSS JOIN SeedNorm AS sn
INNER JOIN dbo.CustomerFeedback AS f
ON f.FeedbackId = dp.FeedbackId
ORDER BY similarity DESC, f.CreatedAt DESC
FOR JSON PATH, INCLUDE_NULL_VALUES;
"""
def real_similar_query(feedback_id: int, top: int) -> str:
    """Build T-SQL returning the *top* rows most similar to *feedback_id* (real embeddings).

    This is the same computation as similar_query, but over
    ``dbo.RealFeedbackEmbedding`` with vectors joined on ``DimensionIndex``.
    The seed itself is excluded. Rows are ordered by similarity, then CreatedAt,
    both descending, and returned as a JSON array.
    """
    # feedback_id and top are interpolated unquoted; callers must pass real ints.
    return f"""
DECLARE @FeedbackId INT = {feedback_id};
;WITH SeedNorm AS
(
SELECT SQRT(SUM(Value * Value)) AS Norm
FROM dbo.RealFeedbackEmbedding
WHERE FeedbackId = @FeedbackId
),
FeedbackNorm AS
(
SELECT
FeedbackId,
SQRT(SUM(Value * Value)) AS Norm
FROM dbo.RealFeedbackEmbedding
GROUP BY FeedbackId
),
DotProduct AS
(
SELECT
fe.FeedbackId,
SUM(fe.Value * seed.Value) AS DotValue
FROM dbo.RealFeedbackEmbedding AS fe
INNER JOIN dbo.RealFeedbackEmbedding AS seed
ON seed.DimensionIndex = fe.DimensionIndex
AND seed.FeedbackId = @FeedbackId
WHERE fe.FeedbackId <> @FeedbackId
GROUP BY fe.FeedbackId
)
SELECT TOP ({top})
f.FeedbackId,
f.Product,
f.CustomerSegment,
f.RiskLevel,
f.Channel,
f.Region,
f.CreatedAt,
f.FeedbackText,
CAST(dp.DotValue / NULLIF(fn.Norm * sn.Norm, 0) AS DECIMAL(10, 4)) AS similarity
FROM DotProduct AS dp
INNER JOIN FeedbackNorm AS fn
ON fn.FeedbackId = dp.FeedbackId
CROSS JOIN SeedNorm AS sn
INNER JOIN dbo.CustomerFeedback AS f
ON f.FeedbackId = dp.FeedbackId
ORDER BY similarity DESC, f.CreatedAt DESC
FOR JSON PATH, INCLUDE_NULL_VALUES;
"""
def triage_query(query_name: str) -> str:
    """Build T-SQL summarising the 40 best pseudo-embedding hits per (Product, RiskLevel).

    The top 40 feedback rows by cosine similarity to the stored query
    *query_name* are taken first. Names not in QUERY_PROFILES fall back to
    "debited_failed_transaction". Each group then reports ``hit_count``,
    ``best_similarity`` and ``avg_similarity``. Groups are ordered by best
    similarity, then hit count, both descending, and returned as a JSON array.
    """
    if query_name not in QUERY_PROFILES:
        query_name = "debited_failed_transaction"
    return f"""
DECLARE @QueryName SYSNAME = {sql_literal(query_name)};
;WITH TopHits AS
(
SELECT TOP (40)
f.FeedbackId,
f.Product,
f.RiskLevel,
CAST(dp.DotValue / NULLIF(fn.Norm * qn.Norm, 0) AS DECIMAL(10, 4)) AS similarity
FROM
(
SELECT fe.FeedbackId, SUM(fe.Value * qe.Value) AS DotValue
FROM dbo.FeedbackEmbedding AS fe
INNER JOIN dbo.QueryEmbedding AS qe
ON qe.DimensionName = fe.DimensionName
AND qe.QueryName = @QueryName
GROUP BY fe.FeedbackId
) AS dp
INNER JOIN
(
SELECT FeedbackId, SQRT(SUM(Value * Value)) AS Norm
FROM dbo.FeedbackEmbedding
GROUP BY FeedbackId
) AS fn
ON fn.FeedbackId = dp.FeedbackId
CROSS JOIN
(
SELECT SQRT(SUM(Value * Value)) AS Norm
FROM dbo.QueryEmbedding
WHERE QueryName = @QueryName
) AS qn
INNER JOIN dbo.CustomerFeedback AS f
ON f.FeedbackId = dp.FeedbackId
ORDER BY similarity DESC
)
SELECT
Product,
RiskLevel,
COUNT(*) AS hit_count,
MAX(similarity) AS best_similarity,
CAST(AVG(CAST(similarity AS FLOAT)) AS DECIMAL(10, 4)) AS avg_similarity
FROM TopHits
GROUP BY Product, RiskLevel
ORDER BY best_similarity DESC, hit_count DESC
FOR JSON PATH, INCLUDE_NULL_VALUES;
"""
def real_triage_query(embedding_json: str) -> str:
    """Build T-SQL summarising the 40 best real-embedding hits per (Product, RiskLevel).

    *embedding_json* is a JSON array query vector. It is unpacked with OPENJSON
    and joined to ``dbo.RealFeedbackEmbedding`` on ``DimensionIndex``. The top 40
    rows by cosine similarity are grouped into ``hit_count``,
    ``best_similarity`` and ``avg_similarity``. Groups are ordered by best
    similarity, then hit count, both descending, and returned as a JSON array.
    """
    return f"""
DECLARE @QueryEmbedding NVARCHAR(MAX) = {sql_literal(embedding_json)};
;WITH QueryVector AS
(
SELECT
CAST([key] AS INT) AS DimensionIndex,
CAST([value] AS FLOAT) AS Value
FROM OPENJSON(@QueryEmbedding)
),
QueryNorm AS
(
SELECT SQRT(SUM(Value * Value)) AS Norm
FROM QueryVector
),
TopHits AS
(
SELECT TOP (40)
f.FeedbackId,
f.Product,
f.RiskLevel,
CAST(dp.DotValue / NULLIF(fn.Norm * qn.Norm, 0) AS DECIMAL(10, 4)) AS similarity
FROM
(
SELECT
fe.FeedbackId,
SUM(fe.Value * q.Value) AS DotValue
FROM dbo.RealFeedbackEmbedding AS fe
INNER JOIN QueryVector AS q
ON q.DimensionIndex = fe.DimensionIndex
GROUP BY fe.FeedbackId
) AS dp
INNER JOIN
(
SELECT FeedbackId, SQRT(SUM(Value * Value)) AS Norm
FROM dbo.RealFeedbackEmbedding
GROUP BY FeedbackId
) AS fn
ON fn.FeedbackId = dp.FeedbackId
CROSS JOIN QueryNorm AS qn
INNER JOIN dbo.CustomerFeedback AS f
ON f.FeedbackId = dp.FeedbackId
ORDER BY similarity DESC
)
SELECT
Product,
RiskLevel,
COUNT(*) AS hit_count,
MAX(similarity) AS best_similarity,
CAST(AVG(CAST(similarity AS FLOAT)) AS DECIMAL(10, 4)) AS avg_similarity
FROM TopHits
GROUP BY Product, RiskLevel
ORDER BY best_similarity DESC, hit_count DESC
FOR JSON PATH, INCLUDE_NULL_VALUES;
"""
def real_feedback_embedding_query(feedback_id: int) -> str:
    """Build T-SQL describing the real embedding stored for *feedback_id*.

    The batch emits one JSON object with these keys:
      * ``feedback_id`` and ``feedback_text``.
      * ``model_name``: from the newest ``dbo.RealEmbeddingMetadata`` row.
      * ``dimension_count``: the number of stored components.
      * ``norm``: the L2 norm, as DECIMAL(18, 6).
      * ``vector_json``: a nested JSON array of ``{"v": value}`` objects ordered
        by DimensionIndex.
    """
    # feedback_id is interpolated unquoted; callers must pass a real int.
    return f"""
DECLARE @FeedbackId INT = {feedback_id};
DECLARE @FeedbackText NVARCHAR(MAX) =
(SELECT FeedbackText FROM dbo.CustomerFeedback WHERE FeedbackId = @FeedbackId);
DECLARE @ModelName NVARCHAR(200) =
(SELECT TOP (1) ModelName FROM dbo.RealEmbeddingMetadata ORDER BY CreatedAt DESC);
DECLARE @DimensionCount INT =
(SELECT COUNT(*) FROM dbo.RealFeedbackEmbedding WHERE FeedbackId = @FeedbackId);
DECLARE @Norm FLOAT =
(SELECT SQRT(SUM(Value * Value)) FROM dbo.RealFeedbackEmbedding WHERE FeedbackId = @FeedbackId);
SELECT
@FeedbackId AS feedback_id,
@FeedbackText AS feedback_text,
@ModelName AS model_name,
@DimensionCount AS dimension_count,
CAST(@Norm AS DECIMAL(18, 6)) AS norm,
(
SELECT CAST(Value AS DECIMAL(18, 8)) AS v
FROM dbo.RealFeedbackEmbedding
WHERE FeedbackId = @FeedbackId
ORDER BY DimensionIndex
FOR JSON PATH
) AS vector_json
FOR JSON PATH, WITHOUT_ARRAY_WRAPPER, INCLUDE_NULL_VALUES;
"""
def fallback_feedback_embedding_query(feedback_id: int) -> str:
    """Build T-SQL describing the pseudo embedding stored for *feedback_id*.

    This mirrors real_feedback_embedding_query, but reads ``dbo.FeedbackEmbedding``
    and reports the fixed model name "fallback-pseudo". Each ``vector_json``
    element is ``{"k": DimensionName, "v": value}``, ordered by DimensionName.
    """
    # feedback_id is interpolated unquoted; callers must pass a real int.
    return f"""
DECLARE @FeedbackId INT = {feedback_id};
DECLARE @FeedbackText NVARCHAR(MAX) =
(SELECT FeedbackText FROM dbo.CustomerFeedback WHERE FeedbackId = @FeedbackId);
DECLARE @DimensionCount INT =
(SELECT COUNT(*) FROM dbo.FeedbackEmbedding WHERE FeedbackId = @FeedbackId);
DECLARE @Norm FLOAT =
(SELECT SQRT(SUM(Value * Value)) FROM dbo.FeedbackEmbedding WHERE FeedbackId = @FeedbackId);
SELECT
@FeedbackId AS feedback_id,
@FeedbackText AS feedback_text,
N'fallback-pseudo' AS model_name,
@DimensionCount AS dimension_count,
CAST(@Norm AS DECIMAL(18, 6)) AS norm,
(
SELECT
DimensionName AS k,
CAST(Value AS DECIMAL(18, 8)) AS v
FROM dbo.FeedbackEmbedding
WHERE FeedbackId = @FeedbackId
ORDER BY DimensionName
FOR JSON PATH
) AS vector_json
FOR JSON PATH, WITHOUT_ARRAY_WRAPPER, INCLUDE_NULL_VALUES;
"""
def overview_query() -> str:
    """Return T-SQL for the dashboard overview as a single JSON object.

    The object reports server version and edition, the database name, the
    feedback count, the critical and VIP counts, and embedding statistics. The
    embedding counts come from ``FeedbackEmbedding`` when a table of that name
    exists. Failing that, they come from ``RealFeedbackEmbedding``, and
    otherwise they are 0.
    """
    # NOTE(review): the sys.tables checks match the table name in any schema,
    # while the counts read dbo.<table>; presumably everything lives in dbo.
    return """
DECLARE @PseudoRows INT = ISNULL((SELECT COUNT(*) FROM sys.tables WHERE name = N'FeedbackEmbedding'), 0);
DECLARE @RealRows INT = ISNULL((SELECT COUNT(*) FROM sys.tables WHERE name = N'RealFeedbackEmbedding'), 0);
DECLARE @EmbeddingRows INT = 0;
DECLARE @EmbeddedFeedback INT = 0;
IF @PseudoRows > 0
BEGIN
SELECT
@EmbeddingRows = COUNT(*),
@EmbeddedFeedback = COUNT(DISTINCT FeedbackId)
FROM dbo.FeedbackEmbedding;
END
ELSE IF @RealRows > 0
BEGIN
SELECT
@EmbeddingRows = COUNT(*),
@EmbeddedFeedback = COUNT(DISTINCT FeedbackId)
FROM dbo.RealFeedbackEmbedding;
END
SELECT
CAST(SERVERPROPERTY('ProductVersion') AS NVARCHAR(40)) AS product_version,
CAST(SERVERPROPERTY('Edition') AS NVARCHAR(128)) AS edition,
DB_NAME() AS database_name,
(SELECT COUNT(*) FROM dbo.CustomerFeedback) AS feedback_count,
@EmbeddingRows AS embedding_rows,
@EmbeddedFeedback AS embedded_feedback_count,
(SELECT COUNT(*) FROM dbo.CustomerFeedback WHERE RiskLevel = N'Critical') AS critical_count,
(SELECT COUNT(*) FROM dbo.CustomerFeedback WHERE CustomerSegment = N'VIP') AS vip_count
FOR JSON PATH, WITHOUT_ARRAY_WRAPPER, INCLUDE_NULL_VALUES;
"""
def issue_distribution_query() -> str:
    """Return the T-SQL that counts feedback rows per SourceIssueGroup.

    The result is a JSON array of ``{"SourceIssueGroup": ..., "count": ...}``
    objects, largest group first.
    """
    clauses = (
        "",
        "SELECT",
        "SourceIssueGroup,",
        "COUNT(*) AS count",
        "FROM dbo.CustomerFeedback",
        "GROUP BY SourceIssueGroup",
        "ORDER BY count DESC",
        "FOR JSON PATH, INCLUDE_NULL_VALUES;",
        "",
    )
    # Empty first/last entries reproduce the leading and trailing newline.
    return "\n".join(clauses)
def read_request_json(handler: BaseHTTPRequestHandler) -> dict[str, Any]:
    """Read and decode the JSON object in *handler*'s request body.

    Exactly ``Content-Length`` bytes are read from ``handler.rfile`` and
    decoded as UTF-8. If there is no body, an empty dict is returned. That
    covers a missing, empty or non-positive Content-Length, and a body that
    is only whitespace.

    Raises:
        ValueError: if Content-Length is not an integer, the body is not
            valid UTF-8 or JSON, or the decoded JSON value is not an object.
            ``UnicodeDecodeError`` and ``json.JSONDecodeError`` are both
            ``ValueError`` subclasses.
    """
    # ``or "0"`` also covers a header that is present but empty.
    length = int(handler.headers.get("Content-Length") or "0")
    if length <= 0:
        return {}
    raw = handler.rfile.read(length).decode("utf-8")
    if not raw.strip():
        return {}
    payload = json.loads(raw)
    # Callers use payload.get(...); a list or scalar would fail there with
    # an unhelpful AttributeError, so reject it with a clear message.
    if not isinstance(payload, dict):
        raise ValueError("Request body must be a JSON object")
    return payload
class DemoHandler(BaseHTTPRequestHandler):
    """Request handler for the AI helpdesk vector-search demo UI.

    ``GET /`` and ``GET /static/...`` serve files from ``STATIC_ROOT``.
    ``GET /api/overview`` and the ``POST /api/...`` endpoints return JSON.

    Every API endpoint answers from the CSV backend when
    ``csv_backend_enabled()`` is true. Otherwise it queries SQL Server through
    ``run_sql``: with the real Ollama embeddings when
    ``real_embeddings_available()`` is true, or with the pseudo-embedding
    fallback when it is not.
    """

    server_version = "HelpdeskVectorDemo/1.0"

    # Default Vietnamese question used when a request omits "question".
    # Roughly: "customer was charged even though the transaction failed".
    _DEFAULT_QUESTION = "khách hàng bị trừ tiền dù giao dịch thất bại"

    # Content-Type per static-file suffix (matched lower-cased). Any other
    # suffix is served as plain text.
    _CONTENT_TYPES = {
        ".html": "text/html; charset=utf-8",
        ".css": "text/css; charset=utf-8",
        ".js": "application/javascript; charset=utf-8",
        ".json": "application/json; charset=utf-8",
        ".svg": "image/svg+xml",
        ".png": "image/png",
        ".ico": "image/x-icon",
    }

    def do_GET(self) -> None:
        """Serve the UI page, a static asset, or the overview JSON; else 404."""
        parsed = urllib.parse.urlparse(self.path)
        if parsed.path == "/":
            self.send_static("index.html")
            return
        if parsed.path == "/api/overview":
            # Report backend failures as JSON, like do_POST does, instead of
            # letting the exception drop the connection with no response.
            try:
                if csv_backend_enabled():
                    body = csv_overview_payload()
                else:
                    body = {
                        "overview": run_sql(overview_query()),
                        "issues": run_sql(issue_distribution_query()),
                        "realEmbedding": run_sql(real_embedding_status_query()),
                        "embeddingMode": EMBEDDING_MODE,
                        "ollamaModel": OLLAMA_MODEL,
                    }
            except Exception as exc:
                self.send_json({"error": str(exc)}, status=500)
                return
            self.send_json(body)
            return
        if parsed.path.startswith("/static/"):
            self.send_static(parsed.path.removeprefix("/static/"))
            return
        self.send_error(404)

    def do_POST(self) -> None:
        """Decode the JSON body and dispatch it to the matching API route.

        Responses:
            400: the body is unreadable or is not a JSON object.
            404: the path is not a known route.
            500: a route handler raises; the body is ``{"error": ...}``.
        """
        try:
            parsed = urllib.parse.urlparse(self.path)
            try:
                payload = read_request_json(self)
            except ValueError as exc:
                # Malformed JSON, non-UTF-8 bytes or a bad Content-Length are
                # client errors, not server failures.
                self.send_json({"error": f"Invalid request body: {exc}"}, status=400)
                return
            if not isinstance(payload, dict):
                self.send_json({"error": "Request body must be a JSON object"}, status=400)
                return
            routes = {
                "/api/search": self._post_search,
                "/api/similar": self._post_similar,
                "/api/embedding": self._post_embedding,
                "/api/triage": self._post_triage,
            }
            route = routes.get(parsed.path)
            if route is None:
                self.send_error(404)
                return
            route(payload)
        except Exception as exc:
            self.send_json({"error": str(exc)}, status=500)

    def _post_search(self, payload: dict[str, Any]) -> None:
        """Handle ``POST /api/search`` (``mode`` "keyword" or "semantic")."""
        if csv_backend_enabled():
            self.send_json(csv_search_payload(payload))
            return
        mode = str(payload.get("mode") or "semantic")
        top = sql_int(payload.get("top"), 10, 1, 50)
        if mode == "keyword":
            keyword = str(payload.get("keyword") or payload.get("question") or "trừ tiền")
            rows = run_sql(keyword_query(keyword, top))
            self.send_json({"mode": "keyword", "rows": rows})
            return
        question = str(payload.get("question") or self._DEFAULT_QUESTION)
        segment = payload.get("segment") or None
        risk = payload.get("risk") or None
        days_back = sql_int(payload.get("daysBack"), 30, 1, 365)
        if real_embeddings_available():
            rows = run_sql(
                real_semantic_query(
                    embedding_json=query_embedding_json(question),
                    segment=segment,
                    risk=risk,
                    days_back=days_back,
                    top=top,
                )
            )
            self.send_json(
                {
                    "mode": "semantic",
                    "queryProfile": f"real_ollama:{OLLAMA_MODEL}",
                    "rows": rows,
                }
            )
            return
        query_name = choose_query_profile(question)
        rows = run_sql(
            semantic_query(
                query_name=query_name,
                segment=segment,
                risk=risk,
                days_back=days_back,
                top=top,
            )
        )
        self.send_json({"mode": "semantic", "queryProfile": f"fallback:{query_name}", "rows": rows})

    def _post_similar(self, payload: dict[str, Any]) -> None:
        """Handle ``POST /api/similar``: feedback nearest to ``feedbackId``."""
        feedback_id = sql_int(payload.get("feedbackId"), 1, 1, 2_147_483_647)
        top = sql_int(payload.get("top"), 15, 1, 50)
        if csv_backend_enabled():
            self.send_json(csv_similar_payload(feedback_id, top))
            return
        if real_embeddings_available():
            rows = run_sql(real_similar_query(feedback_id, top))
            profile = f"real_ollama:{OLLAMA_MODEL}"
        else:
            rows = run_sql(similar_query(feedback_id, top))
            profile = "fallback"
        self.send_json({"feedbackId": feedback_id, "queryProfile": profile, "rows": rows})

    def _post_embedding(self, payload: dict[str, Any]) -> None:
        """Handle ``POST /api/embedding``: return one feedback's vector.

        The response carries the vector's ``values`` (the ``v`` fields). For
        the pseudo fallback it also carries ``labels`` (the ``k`` dimension
        names). An unknown id gets a 404.
        """
        feedback_id = sql_int(payload.get("feedbackId"), 1, 1, 2_147_483_647)
        if csv_backend_enabled():
            body, status = csv_embedding_payload(feedback_id)
            self.send_json(body, status=status)
            return
        is_real = real_embeddings_available()
        if is_real:
            raw = run_sql(real_feedback_embedding_query(feedback_id))
        else:
            raw = run_sql(fallback_feedback_embedding_query(feedback_id))
        if self._is_missing_feedback(raw):
            self.send_json({"error": f"Feedback #{feedback_id} not found"}, status=404)
            return
        items = self._normalize_vector(raw.get("vector_json"))
        body = {
            "feedbackId": feedback_id,
            "source": f"real_ollama:{OLLAMA_MODEL}" if is_real else "fallback-pseudo",
            "modelName": raw.get("model_name"),
            "dimensionCount": raw.get("dimension_count"),
            "norm": raw.get("norm"),
            "feedbackText": raw.get("feedback_text"),
            "values": [item["v"] for item in items],
        }
        if not is_real:
            body["labels"] = [item.get("k") for item in items]
        self.send_json(body)

    def _post_triage(self, payload: dict[str, Any]) -> None:
        """Handle ``POST /api/triage`` for the given (or default) question."""
        question = str(payload.get("question") or self._DEFAULT_QUESTION)
        if csv_backend_enabled():
            self.send_json(csv_triage_payload(question))
            return
        if real_embeddings_available():
            rows = run_sql(real_triage_query(query_embedding_json(question)))
            self.send_json({"queryProfile": f"real_ollama:{OLLAMA_MODEL}", "rows": rows})
            return
        query_name = choose_query_profile(question)
        rows = run_sql(triage_query(query_name))
        self.send_json({"queryProfile": f"fallback:{query_name}", "rows": rows})

    @staticmethod
    def _normalize_vector(vec: Any) -> list[dict[str, Any]]:
        """Coerce a ``vector_json`` column into a list of item dicts.

        The column may arrive as a JSON string (decoded here), as an
        already-decoded list (returned as-is), or as None or empty, which
        gives ``[]``. Any other type also gives ``[]``.
        """
        if vec is None:
            return []
        if isinstance(vec, str):
            return json.loads(vec) if vec else []
        if isinstance(vec, list):
            return vec
        return []

    @staticmethod
    def _is_missing_feedback(raw: Any) -> bool:
        """Return True when an embedding lookup found no such feedback.

        The fallback embedding query (and presumably the real one) always
        SELECTs one row built from variables. An unknown id therefore still
        yields an object, with NULL ``feedback_text`` and zero (or NULL)
        ``dimension_count``. A result that is not a dict also counts as
        missing.
        """
        if not isinstance(raw, dict):
            return True
        return raw.get("feedback_text") is None and not raw.get("dimension_count")

    def send_json(self, value: Any, status: int = 200) -> None:
        """Send *value* as a UTF-8 JSON response with the given status."""
        body = json.dumps(value, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def send_static(self, name: str) -> None:
        """Serve file *name* from ``STATIC_ROOT``, or 404 if it is outside or absent."""
        static_root = STATIC_ROOT.resolve()
        path = (static_root / (name.strip("/") or "index.html")).resolve()
        # is_relative_to compares whole path components. The previous string
        # prefix check also accepted sibling directories such as
        # "static_old/" reached via "../static_old/...".
        if not path.is_relative_to(static_root) or not path.is_file():
            self.send_error(404)
            return
        content_type = self._CONTENT_TYPES.get(path.suffix.lower(), "text/plain; charset=utf-8")
        body = path.read_bytes()
        self.send_response(200)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format: str, *args: Any) -> None:
        """Write access-log lines to stderr as '<client> - <message>'."""
        # ``format`` keeps the base-class parameter name on purpose.
        sys.stderr.write("%s - %s\n" % (self.address_string(), format % args))
def main() -> None:
    """Start the demo UI server and block until interrupted.

    The port comes from HELPDESK_UI_PORT, then PORT. It is parsed by
    ``sql_int`` with default 8080 and bounds 1024-65535. The host comes
    from HELPDESK_UI_HOST (default 127.0.0.1). The listening socket is
    closed on exit, and Ctrl+C stops the server without a traceback.
    """
    port = sql_int(os.environ.get("HELPDESK_UI_PORT") or os.environ.get("PORT"), 8080, 1024, 65535)
    host = os.environ.get("HELPDESK_UI_HOST", "127.0.0.1")
    # The with-block guarantees server_close() runs however serving ends.
    with ThreadingHTTPServer((host, port), DemoHandler) as server:
        print(f"AI Helpdesk demo UI: http://{host}:{port}")
        auth_mode = "SQL auth" if SQL_USER else "integrated auth"
        backend = "csv" if csv_backend_enabled() else "sql"
        # Flush so the banner appears right away even when stdout is a pipe
        # (e.g. container or hosted-app logs), where it would be buffered.
        print(
            f"Backend: {backend} | SQL Server: {SQL_SERVER} | Database: {SQL_DATABASE} | Auth: {auth_mode}",
            flush=True,
        )
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            print("Shutting down.", flush=True)


if __name__ == "__main__":
    main()