# Source: AHFIDAILabs (commit 1a2e3da, verified)
# Commit message: "Fix metrics: read by_language from model_config.json not hardcoded"
import logging
import sys
from pathlib import Path
from dotenv import load_dotenv
# Load variables from a .env file (python-dotenv) before any check runs.
load_dotenv()
# Root logging setup: INFO and above, written to stdout, with an HH:MM:SS
# timestamp and a left-aligned, 8-character level name.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)-8s | %(message)s",
    datefmt="%H:%M:%S",
    handlers=[logging.StreamHandler(sys.stdout)],
)
# Module-level logger used by every check in this file.
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Check 1 — Knowledge base exists and has chunks
# ---------------------------------------------------------------------------
def check_knowledge_base() -> bool:
    """Confirm the persisted knowledge base exists and holds at least one chunk.

    Looks for ``models/knowledge_base``, opens it with a
    ``chromadb.PersistentClient`` and counts the entries of the
    ``immuniwatch_kb`` collection.

    Returns:
        True when the collection reports a non-zero count. False when the
        path is missing, the collection is empty, or anything raises while
        importing or querying chromadb (the error is logged, not re-raised).
    """
    store_dir = Path("models/knowledge_base")
    if not store_dir.exists():
        log.error(" [FAIL] Knowledge base not found at %s", store_dir)
        return False

    try:
        # Imported inside the try so a missing chromadb package is reported
        # as a failed check rather than aborting the script.
        import chromadb

        chunk_total = (
            chromadb.PersistentClient(path=str(store_dir))
            .get_collection("immuniwatch_kb")
            .count()
        )
        if chunk_total != 0:
            log.info(" [PASS] Knowledge base: %d chunks stored", chunk_total)
            return True
        log.error(" [FAIL] Knowledge base is empty")
        return False
    except Exception as exc:
        log.error(" [FAIL] Knowledge base error: %s", exc)
        return False
# ---------------------------------------------------------------------------
# Check 2 — ONNX model file exists and is valid size
# ---------------------------------------------------------------------------
def check_onnx_model() -> bool:
    """Verify that the exported ONNX classifier exists and is not empty.

    The path checked is ``models/onnx/immuniwatch_classifier.onnx``.

    Returns:
        True when the path is a regular file with a non-zero size (the size
        in MB is logged). False when the path is missing, is not a regular
        file, or is a zero-byte file.
    """
    onnx_path = Path("models/onnx/immuniwatch_classifier.onnx")

    # is_file() rather than exists(): a directory at this path is not a model.
    if not onnx_path.is_file():
        log.error(" [FAIL] ONNX model not found: %s", onnx_path)
        return False

    size_bytes = onnx_path.stat().st_size
    # A zero-byte file (e.g. an interrupted export or download) exists on
    # disk but cannot be a usable model, so it must not pass the check.
    if size_bytes == 0:
        log.error(" [FAIL] ONNX model is empty (0 bytes): %s", onnx_path)
        return False

    log.info(" [PASS] ONNX model: %.1f MB", size_bytes / (1024 * 1024))
    return True
# ---------------------------------------------------------------------------
# Check 3 — Thresholds file correct
# ---------------------------------------------------------------------------
def check_thresholds() -> bool:
    """Validate ``models/onnx/thresholds.json``.

    The file must be readable JSON whose top-level object has a
    ``class_biases`` mapping with a numeric entry for each of ``factual``,
    ``misinformation`` and ``irrelevant``.

    Returns:
        True when all three biases are present and numeric. False when the
        file is missing, unreadable, not valid JSON, lacks a required label,
        or holds a non-numeric bias. Every failure is logged.
    """
    import json

    path = Path("models/onnx/thresholds.json")
    if not path.exists():
        log.error(" [FAIL] thresholds.json not found")
        return False

    try:
        with open(path, encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        log.error(" [FAIL] thresholds.json could not be read: %s", exc)
        return False

    biases = data.get("class_biases", {}) if isinstance(data, dict) else None
    if not isinstance(biases, dict):
        log.error(" [FAIL] thresholds.json has no 'class_biases' mapping")
        return False

    required = ["factual", "misinformation", "irrelevant"]
    for label in required:
        if label not in biases:
            log.error(" [FAIL] Missing bias for label: %s", label)
            return False
        value = biases[label]
        # Reject non-numbers explicitly: the %.1f log call below would fail
        # on them, but logging reports formatting errors without raising,
        # so the check would otherwise still return True.
        # bool is excluded because it is a subclass of int.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            log.error(" [FAIL] Bias for label %s is not numeric: %r", label, value)
            return False

    log.info(
        " [PASS] Thresholds: factual=%.1f misinfo=%.1f irrelevant=%.1f",
        biases["factual"],
        biases["misinformation"],
        biases["irrelevant"],
    )
    return True
# ---------------------------------------------------------------------------
# Check 4 — System design constants
# ---------------------------------------------------------------------------
def check_system_design_constants() -> bool:
    """Compare project constants against the values the system design expects.

    Constants are imported from the deduplication, RAG and counter modules
    and compared with fixed expected values. Each comparison logs one PASS
    or FAIL line.

    Returns:
        True only when every constant matches its expected value.
    """
    verdicts = []

    def expect(actual, wanted, fail_fmt, pass_msg):
        # Log and record the outcome of a single constant comparison.
        if actual != wanted:
            log.error(fail_fmt, actual)
            verdicts.append(False)
            return
        log.info(pass_msg)
        verdicts.append(True)

    # Deduplication settings.
    from src.ingestion.deduplication import JACCARD_THRESHOLD, EXACT_TTL_S

    expect(
        JACCARD_THRESHOLD,
        0.85,
        " [FAIL] JACCARD_THRESHOLD should be 0.85, got %s",
        " [PASS] Dedup Jaccard threshold: 0.85 (Section 4.3)",
    )
    expect(
        EXACT_TTL_S,
        86400,
        " [FAIL] EXACT_TTL_S should be 86400, got %s",
        " [PASS] Dedup TTL: 24 hours (Section 4.3)",
    )

    # Retrieval settings.
    from src.intelligence.rag import TOP_K, SIMILARITY_THRESHOLD

    expect(
        TOP_K,
        5,
        " [FAIL] TOP_K should be 5, got %s",
        " [PASS] RAG top-K: 5 (Section 5.3)",
    )
    expect(
        SIMILARITY_THRESHOLD,
        0.72,
        " [FAIL] SIMILARITY_THRESHOLD should be 0.72, got %s",
        " [PASS] RAG similarity threshold: 0.72 (Section 5.2)",
    )

    # Counter-response length limits.
    from src.intelligence.counter import SHORT_MAX_CHARS, MEDIUM_MAX_WORDS, LONG_MAX_WORDS

    expect(
        SHORT_MAX_CHARS,
        280,
        " [FAIL] SHORT_MAX_CHARS should be 280, got %s",
        " [PASS] Counter SHORT: ≤280 chars (Section 6.5)",
    )
    expect(
        MEDIUM_MAX_WORDS,
        200,
        " [FAIL] MEDIUM_MAX_WORDS should be 200, got %s",
        " [PASS] Counter MEDIUM: ≤200 words (Section 6.5)",
    )
    expect(
        LONG_MAX_WORDS,
        500,
        " [FAIL] LONG_MAX_WORDS should be 500, got %s",
        " [PASS] Counter LONG: ≤500 words (Section 6.5)",
    )

    return all(verdicts)
# ---------------------------------------------------------------------------
# Check 5 — Counter-response format compliance (no API calls)
# ---------------------------------------------------------------------------
def check_counter_format_compliance() -> bool:
    """Check that the counter-response length enforcers cap oversized text.

    Calls ``_enforce_short`` and ``_enforce_word_limit`` from
    ``src.intelligence.counter`` on deliberately oversized samples and
    compares the result against the imported limits. No API calls are made.

    Each sample is sized from the limit it is tested against (about twice
    the limit). A fixed 120-word sample sits below both word limits, so the
    MEDIUM and LONG checks would pass without any truncation happening.

    Returns:
        True when all three formats stay within their limits, False otherwise.
    """
    from src.intelligence.counter import (
        _enforce_short,
        _enforce_word_limit,
        SHORT_MAX_CHARS,
        MEDIUM_MAX_WORDS,
        LONG_MAX_WORDS,
    )

    phrase = "Vaccine dey safe for all pikin "
    words_per_phrase = len(phrase.split())

    def oversized_words(limit):
        # Roughly twice as many words as `limit`, so truncation must occur.
        return phrase * (2 * limit // words_per_phrase + 1)

    passed = True

    # SHORT: never fewer than the original 20 repeats, and at least about
    # twice the character limit if that limit is ever raised.
    char_sample = phrase * max(20, 2 * SHORT_MAX_CHARS // len(phrase) + 1)
    short = _enforce_short(char_sample)
    if len(short) <= SHORT_MAX_CHARS:
        log.info(" [PASS] SHORT format enforced: %d chars ≤ %d", len(short), SHORT_MAX_CHARS)
    else:
        log.error(" [FAIL] SHORT format exceeded: %d chars", len(short))
        passed = False

    # MEDIUM and LONG share the same word-limit enforcer.
    for label, limit in (("MEDIUM", MEDIUM_MAX_WORDS), ("LONG", LONG_MAX_WORDS)):
        word_count = len(_enforce_word_limit(oversized_words(limit), limit).split())
        # The +1 tolerance is kept from the original check; presumably it
        # allows for a trailing ellipsis token — confirm against
        # _enforce_word_limit.
        if word_count <= limit + 1:
            log.info(" [PASS] %s format enforced: %d words ≤ %d", label, word_count, limit)
        else:
            log.error(" [FAIL] %s format exceeded: %d words", label, word_count)
            passed = False

    return passed
# ---------------------------------------------------------------------------
# Check 6 — Required files exist
# ---------------------------------------------------------------------------
def check_required_files() -> bool:
    """Check that every path the deployment depends on exists.

    Paths are relative to the current working directory. One PASS or FAIL
    line is logged per path, in list order.

    Returns:
        True when none of the listed paths is missing.
    """
    expected_paths = [
        "models/onnx/immuniwatch_classifier.onnx",
        "models/onnx/thresholds.json",
        "models/onnx/model_config.json",
        "src/models/classifier.py",
        "src/api/main.py",
        "src/api/routes.py",
        "src/api/schemas.py",
        "src/ingestion/worker.py",
        "src/ingestion/deduplication.py",
        "src/ingestion/connectors/base.py",
        "src/ingestion/connectors/youtube.py",
        "src/ingestion/connectors/sociavault.py",
        "src/ingestion/connectors/bluesky.py",
        "src/intelligence/ingestion.py",
        "src/intelligence/rag.py",
        "src/intelligence/counter.py",
        "src/intelligence/evaluation.py",
        "docker-compose.yml",
    ]

    missing_total = 0
    for rel_path in expected_paths:
        if not Path(rel_path).exists():
            log.error(" [FAIL] MISSING: %s", rel_path)
            missing_total += 1
            continue
        log.info(" [PASS] %s", rel_path)
    return missing_total == 0
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def run_evaluation() -> None:
    """Run every lightweight check in order and log a pass/fail summary.

    A check that raises is logged as an unexpected error and counted as
    failed, so one broken check never stops the remaining ones. Nothing is
    returned; results are reported only through the log.
    """
    rule = "=" * 55
    log.info(rule)
    log.info("ImmuniWatch — Lightweight Evaluation")
    log.info("No heavy model loading — safe on CPU")
    log.info(rule)

    checks = [
        ("Knowledge Base", check_knowledge_base),
        ("ONNX Model", check_onnx_model),
        ("Thresholds", check_thresholds),
        ("System Design Constants", check_system_design_constants),
        ("Counter Format Compliance", check_counter_format_compliance),
        ("Required Files", check_required_files),
    ]

    results = []
    for label, check in checks:
        log.info("")
        log.info("[ %s ]", label)
        try:
            outcome = check()
        except Exception as exc:
            log.error(" [FAIL] Unexpected error: %s", exc)
            outcome = False
        results.append((label, outcome))

    # Summary
    log.info("")
    log.info(rule)
    log.info("Evaluation Summary")
    log.info(rule)
    passed_count = len([ok for _, ok in results if ok])
    for label, ok in results:
        log.info(" %s %s", "[PASS]" if ok else "[FAIL]", label)
    log.info("")
    total = len(checks)
    log.info(" %d / %d checks passed", passed_count, total)
    log.info(rule)

    shortfall = total - passed_count
    if shortfall == 0:
        log.info("System is ready for Docker deployment.")
    else:
        log.warning("%d check(s) need attention before deployment.", shortfall)


# Script entry point: run the evaluation only when executed directly.
if __name__ == "__main__":
    run_evaluation()