noticecheck / traces /runtime.py
kingabzpro's picture
Simplify image trace classification
818c308
Raw
History Blame Contribute Delete
26 kB
"""Fast, deterministic, privacy-safe pipeline tracing."""
from __future__ import annotations
import json
import os
import queue
import re
import threading
import time
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
ROOT = Path(__file__).resolve().parents[1]
TRACE_ROOT = Path(os.getenv("TRACE_DIR", ROOT / "traces"))
PENDING_DIR = TRACE_ROOT / "pending"
DATASET_REPO = os.getenv(
"HF_TRACE_DATASET_REPO",
"build-small-hackathon/pakistan-notice-helper-traces",
)
BATCH_SIZE = max(1, int(os.getenv("TRACE_BATCH_SIZE", "20")))
FLUSH_SECONDS = max(1.0, float(os.getenv("TRACE_FLUSH_SECONDS", "60")))
MAX_QUEUE_SIZE = max(1, int(os.getenv("TRACE_MAX_QUEUE_SIZE", "5000")))
RISK_LABELS = {
"Looks normal",
"Verify first",
"Suspicious",
"Likely scam",
"Inappropriate",
"none",
}
SIGNAL_PATTERNS = {
"otp": r"\b(?:otp|one[- ]time (?:pin|password)|verification code)\b|او ٹی پی",
"cnic": r"\bcnic\b|شناختی کارڈ",
"credentials": (
r"\b(?:pin|password|cvv|card details?|bank details?)\b|"
r"پاس ورڈ|کارڈ کی تفصیلات"
),
"link": (
r"(?:https?://|www\.|bit\.ly|tinyurl\.|cutt\.ly|\.xyz\b|\.top\b)|"
r"\b(?:link|url|domain)\b|لنک"
),
"urgency": (
r"\b(?:urgent|immediately|today|now|within \d+|last warning)\b|"
r"فوری|فوراً|آج|آخری وارننگ"
),
"payment": (
r"\b(?:pay|payment|fee|fine|transfer|send money|rs\.?|pkr)\b|"
r"ادائیگی|جرمانہ|رقم"
),
"refund_or_prize": (
r"\b(?:refund|prize|winner|lottery|cashback|reward)\b|"
r"انعام|ریفنڈ|قرعہ اندازی"
),
"courier": (
r"\b(?:parcel|courier|delivery|pakistan post|leopards|tcs|customs)\b|"
r"پارسل|کوریئر|ڈیلیوری|پاکستان پوسٹ"
),
"challan": (
r"\b(?:challan|traffic fine|traffic violation|e-challan)\b|"
r"چالان|ٹریفک جرمانہ"
),
"account_threat": (
r"\b(?:account|sim|service|electricity)\b.{0,50}"
r"\b(?:block|blocked|suspend|closed|disconnect)\b"
),
"off_platform_contact": (
r"\b(?:move|continue|contact|chat|message)\b.{0,50}"
r"\b(?:whatsapp|telegram|outside|another (?:app|platform)|phone number)\b|"
r"\bwhatsapp\b|واٹس ایپ"
),
"impersonation": (
r"\b(?:claims? to be|pretends? to be|impersonat(?:e|es|ing|ion)|"
r"fake (?:sender|branding|authority)|unverified (?:sender|identity)|"
r"(?:sender )?identity is unverified|official branding)\b|جعلی|نقالی"
),
}
EXAMPLE_PROFILES = {
"text-courier": ("text", "courier", {"link", "urgency", "payment", "courier"}),
"text-fbr": ("text", "fbr", {"cnic", "credentials", "urgency", "refund_or_prize"}),
"text-bank": ("text", "bank", {"otp", "urgency", "account_threat"}),
"image-courier": ("image", "courier", {"link", "urgency", "courier"}),
"image-mobile": ("image", "marketplace", {"credentials"}),
"image-traffic": ("image", "traffic_challan", {"link", "urgency", "payment", "challan"}),
}
RESULT_GUIDANCE = {
"Looks normal": (
"No strong scam indicators were found, but verify through an official "
"channel."
),
"Verify first": (
"Limited or ambiguous warning signs were found; verify independently."
),
"Suspicious": (
"Multiple warning signs were found; use caution and verify independently."
),
"Likely scam": (
"Strong scam indicators were found; avoid payments, links, and sharing "
"credentials."
),
"Inappropriate": "The content was not suitable for a scam-risk assessment.",
"none": "No completed assessment result was available.",
}
CATEGORY_DISPLAY_NAMES = {
"fbr": "FBR",
"bank": "bank",
"wallet": "mobile-wallet",
"utility": "utility",
"traffic_challan": "traffic-challan",
"courier": "courier",
"customs": "customs",
"university": "education",
"job": "job",
"marketplace": "marketplace",
"unknown": "unclassified",
}
SENSITIVE_VALUE_PATTERN = re.compile(
r"\b(?:password|passcode|pin|otp|cvv|account(?: number)?|card(?: number)?|"
r"tracking(?: id| number)?|consignment(?: id| number)?|reference(?: id| number)?)"
r"\s*(?:is|:|#|-)?\s*[A-Za-z0-9@._/-]{2,}",
re.I,
)
TITLE_CASE_PATTERN = re.compile(r"\b[A-Z][a-z]{2,}\b")
TRACE_FIELDS = (
"trace_id",
"timestamp",
"input",
"input_category",
"urgency",
"scam_tactics",
"result_summary",
"risk_label",
"reply_draft_policy",
)
REPLY_DRAFT_POLICIES = {"allowed", "suppressed", "not_applicable"}
RESIDUAL_IDENTIFIER_PATTERNS = {
"URL": re.compile(r"https?://|www\.", re.I),
"email": re.compile(r"\b[\w.+-]+@[\w.-]+\.[A-Za-z]{2,}\b"),
"CNIC": re.compile(r"\b\d{5}-\d{7}-\d\b"),
"phone number": re.compile(r"(?<!\d)(?:\+?92[- ]?|0)?3\d{2}[- ]?\d{7}(?!\d)"),
"account number": re.compile(r"\bPK\d{2}[A-Z0-9]{10,30}\b", re.I),
"card number": re.compile(r"(?<!\d)(?:\d[ -]?){12,19}(?!\d)"),
}
CATEGORY_TERMS = (
("fbr", ("fbr", "taxpayer", "tax refund", "revenue board", "ایف بی آر", "ٹیکس")),
(
"bank",
(
"bank",
"banking",
"hbl",
"ubl",
"meezan",
"alfalah",
"debit card",
"credit card",
"بینک",
),
),
(
"wallet",
(
"easypaisa",
"easy paisa",
"jazzcash",
"mobile wallet",
"ایزی پیسہ",
"جاز کیش",
),
),
(
"utility",
(
"electricity",
"gas bill",
"utility bill",
"lesco",
"k-electric",
"meter",
"بجلی",
"گیس بل",
),
),
(
"traffic_challan",
(
"challan",
"traffic fine",
"traffic violation",
"e-challan",
"vehicle fine",
"چالان",
"ٹریفک جرمانہ",
),
),
(
"courier",
(
"parcel",
"package",
"courier",
"delivery",
"shipment",
"consignment",
"pakistan post",
"leopards",
"tcs",
"پارسل",
"کوریئر",
"ڈیلیوری",
"پاکستان پوسٹ",
),
),
("customs", ("customs", "custom duty", "import duty", "کسٹمز")),
(
"university",
(
"university",
"admission",
"scholarship",
"student portal",
"tuition",
"hec",
"یونیورسٹی",
"داخلہ",
"اسکالرشپ",
),
),
(
"job",
(
"job",
"salary",
"recruiter",
"recruitment",
"employment",
"interview",
"work from home",
"نوکری",
"تنخواہ",
),
),
(
"marketplace",
(
"buyer",
"seller",
"marketplace",
"listing",
"product",
"whatsapp",
"move the conversation",
"خریدار",
"فروخت",
"واٹس ایپ",
),
),
)
def detect_signals(text: str, example_id: str = "") -> dict[str, bool]:
detected = {
name: bool(re.search(pattern, text or "", re.I | re.S))
for name, pattern in SIGNAL_PATTERNS.items()
}
profile = EXAMPLE_PROFILES.get(example_id)
if profile:
for name in profile[2]:
detected[name] = True
return detected
def detect_category(text: str, signals: dict[str, bool], example_id: str = "") -> str:
profile = EXAMPLE_PROFILES.get(example_id)
if profile:
return profile[1]
if signals["challan"]:
return "traffic_challan"
lowered = (text or "").lower()
for category, terms in CATEGORY_TERMS:
if any(term in lowered for term in terms):
return category
if signals["courier"]:
return "courier"
return "unknown"
def safe_description(category: str, signals: dict[str, bool]) -> str:
category_labels = {
"fbr": "FBR-style",
"bank": "Bank-style",
"wallet": "Wallet-style",
"utility": "Utility-style",
"traffic_challan": "Traffic-challan-style",
"courier": "Courier-style",
"customs": "Customs-style",
"university": "Education-style",
"job": "Job-style",
"marketplace": "Marketplace-style",
"unknown": "Unclassified",
}
signal_labels = {
"otp": "OTP",
"cnic": "CNIC",
"credentials": "credential",
"link": "link",
"urgency": "urgency",
"payment": "payment",
"refund_or_prize": "refund-or-prize",
"courier": "courier",
"challan": "challan",
"account_threat": "account-threat",
"off_platform_contact": "off-platform-contact",
"impersonation": "impersonation",
}
active = [signal_labels[name] for name, enabled in signals.items() if enabled][:4]
suffix = f" with {', '.join(active)} signals" if active else " with no mapped signals"
return f"{category_labels[category]} content{suffix}"
def redact_text(text: str) -> str:
value = re.sub(r"\s+", " ", text or "").strip()
value = SENSITIVE_VALUE_PATTERN.sub(
lambda match: match.group(0).split()[0] + " [REDACTED]",
value,
)
replacements = (
(r"https?://\S+|www\.\S+", "[LINK]"),
(r"\b[\w.+-]+@[\w.-]+\.[A-Za-z]{2,}\b", "[EMAIL]"),
(r"\b\d{5}-\d{7}-\d\b", "[CNIC]"),
(r"(?<!\d)(?:\+?92[- ]?|0)?3\d{2}[- ]?\d{7}(?!\d)", "[PHONE]"),
(r"\bPK\d{2}[A-Z0-9]{10,30}\b", "[ACCOUNT]"),
(r"(?<!\d)(?:\d[ -]?){12,19}(?!\d)", "[CARD_NUMBER]"),
(r"\b\d{3,}\b", "[NUMBER]"),
(
r"\b(?:address|location|house|street|road|flat)\b"
r"[^,.;]{0,60}",
"[ADDRESS]",
),
)
for pattern, replacement in replacements:
value = re.sub(pattern, replacement, value, flags=re.I)
value = TITLE_CASE_PATTERN.sub("[NAME_OR_ENTITY]", value)
value = re.sub(
r"(?:\[[A-Z_]+\]\s*){2,}",
lambda match: match.group(0).strip() + " ",
value,
)
return value[:500] or "[EMPTY]"
def result_summary(
risk_label: str,
category: str,
signals: dict[str, bool],
) -> str:
pattern = safe_description(category, signals)
novelty = (
"Unclassified pattern; this does not confirm a new scam type."
if category == "unknown"
else f"Known {CATEGORY_DISPLAY_NAMES[category]} pattern."
)
return f"{risk_label}: {pattern}. {novelty} {RESULT_GUIDANCE[risk_label]}"
def assessment_evidence(assessment: dict[str, Any] | None) -> str:
"""Return transient model evidence used only for allow-list classification."""
if not isinstance(assessment, dict):
return ""
values: list[str] = [str(assessment.get("simple_explanation", ""))]
red_flags = assessment.get("red_flags", [])
if isinstance(red_flags, list):
values.extend(str(item) for item in red_flags)
return " ".join(values)[:4000]
def build_input_profile(
text: str,
image_data_url: str,
example_id: str = "",
assessment: dict[str, Any] | None = None,
) -> dict[str, Any]:
profile = EXAMPLE_PROFILES.get(example_id)
if profile:
input_type = profile[0]
elif image_data_url:
input_type = "image"
else:
input_type = "text"
classification_text = text
if input_type == "image" and not example_id:
classification_text = " ".join(
part for part in (text, assessment_evidence(assessment)) if part
)
signals = detect_signals(classification_text, example_id)
category = detect_category(classification_text, signals, example_id)
tactics = [name for name, enabled in signals.items() if enabled]
if input_type == "image" and not assessment and not example_id:
input_description = "image: Assessment unavailable"
else:
input_description = f"image: {safe_description(category, signals)}"
return {
"input": (
f"text: {redact_text(text)}"
if input_type == "text"
else input_description
),
"input_category": category,
"urgency": signals["urgency"],
"scam_tactics": ", ".join(tactics) if tactics else "none",
}
def build_trace_record(
*,
text: str,
image_data_url: str,
example_id: str,
assessment: dict[str, Any] | None,
) -> dict[str, Any]:
trace_id = str(uuid.uuid4())
risk_label = str((assessment or {}).get("risk_label", "none"))
if risk_label not in RISK_LABELS:
risk_label = "none"
input_profile = build_input_profile(
text,
image_data_url,
example_id,
assessment,
)
category = input_profile["input_category"]
tactic_values = (
[]
if input_profile["scam_tactics"] == "none"
else input_profile["scam_tactics"].split(", ")
)
signals = {name: name in tactic_values for name in SIGNAL_PATTERNS}
assessment = assessment or {}
return {
"trace_id": trace_id,
"timestamp": datetime.now(timezone.utc).isoformat(),
**input_profile,
"result_summary": result_summary(risk_label, category, signals),
"risk_label": risk_label,
"reply_draft_policy": (
"allowed"
if risk_label in {"Verify first", "Suspicious"}
else "suppressed"
if risk_label != "none"
else "not_applicable"
),
}
def validate_trace(record: Any) -> list[str]:
errors: list[str] = []
if not isinstance(record, dict):
return ["Trace must be an object."]
required = set(TRACE_FIELDS)
missing = required - record.keys()
if missing:
errors.append("Missing fields: " + ", ".join(sorted(missing)))
unexpected = record.keys() - required
if unexpected:
errors.append("Unexpected fields: " + ", ".join(sorted(unexpected)))
if record and next(iter(record)) != "trace_id":
errors.append("trace_id must be the first column.")
trace_id = record.get("trace_id")
try:
parsed_trace_id = uuid.UUID(trace_id) if isinstance(trace_id, str) else None
if parsed_trace_id is None or str(parsed_trace_id) != trace_id:
raise ValueError
except (ValueError, AttributeError):
errors.append("Trace ID must be a canonical UUID.")
timestamp = record.get("timestamp")
try:
parsed_timestamp = (
datetime.fromisoformat(timestamp) if isinstance(timestamp, str) else None
)
if (
parsed_timestamp is None
or parsed_timestamp.utcoffset() != timezone.utc.utcoffset(None)
):
raise ValueError
except (ValueError, TypeError):
errors.append("Timestamp must be an ISO 8601 UTC value.")
input_value = record.get("input")
if not (
isinstance(input_value, str)
and (
input_value.startswith("text: ")
or input_value.startswith("image: ")
)
):
errors.append("Input must use a fixed text: or image: description.")
elif len(input_value) > 506:
errors.append("Input exceeds the 500-character content limit.")
else:
for label, pattern in RESIDUAL_IDENTIFIER_PATTERNS.items():
if pattern.search(input_value):
errors.append(f"Input contains an unredacted {label}.")
if record.get("input_category") not in CATEGORY_DISPLAY_NAMES:
errors.append("Invalid input category.")
if not isinstance(record.get("urgency"), bool):
errors.append("Urgency must be boolean.")
tactics = record.get("scam_tactics")
tactic_values: list[str] = []
if not isinstance(tactics, str):
errors.append("Scam tactics must be a string.")
else:
tactic_values = [] if tactics == "none" else tactics.split(", ")
if (
any(value not in SIGNAL_PATTERNS for value in tactic_values)
or len(tactic_values) != len(set(tactic_values))
):
errors.append("Invalid scam tactics.")
result = record.get("result_summary")
if not isinstance(result, str) or not result or len(result) > 500:
errors.append("Result summary must be a non-empty string of at most 500 characters.")
if record.get("risk_label") not in RISK_LABELS:
errors.append("Invalid risk label.")
if record.get("reply_draft_policy") not in REPLY_DRAFT_POLICIES:
errors.append("Invalid reply draft policy.")
risk_label = record.get("risk_label")
category = record.get("input_category")
if risk_label in RISK_LABELS and category in CATEGORY_DISPLAY_NAMES:
expected_policy = (
"allowed"
if risk_label in {"Verify first", "Suspicious"}
else "suppressed"
if risk_label != "none"
else "not_applicable"
)
if record.get("reply_draft_policy") != expected_policy:
errors.append("Reply draft policy does not match the risk label.")
signals = {name: name in tactic_values for name in SIGNAL_PATTERNS}
expected_summary = result_summary(risk_label, category, signals)
if result != expected_summary:
errors.append("Result summary does not match the deterministic fields.")
if isinstance(input_value, str) and input_value.startswith("image: "):
expected_input = f"image: {safe_description(category, signals)}"
unavailable_input = (
risk_label == "none"
and input_value == "image: Assessment unavailable"
)
if input_value != expected_input and not unavailable_input:
errors.append("Image input does not match the deterministic fields.")
if any(isinstance(value, (dict, list)) for value in record.values()):
errors.append("Trace columns must contain scalar values only.")
forbidden_keys = {
"schema_version",
"app_commit",
"request_source",
"pipeline_steps",
"cache",
"failure",
"text_byte_bucket",
"text_character_bucket",
"image_size_bucket",
"language_hint",
"modal",
"exception_text_stored",
"identifiers_stored",
"input_storage",
"raw_image_stored",
"raw_input_stored",
"raw_model_output_stored",
"red_flag_count",
"reply_draft_returned",
"safe_next_step_count",
"signal_account_threat",
"signal_challan",
"signal_cnic",
"signal_courier",
"signal_credentials",
"signal_link",
"signal_otp",
"signal_payment",
"signal_refund_or_prize",
"raw_input",
"raw_text",
"image_data_url",
"raw_model_output",
"reply_draft",
"simple_explanation",
"error",
"exception",
"url",
"phone",
"account_number",
}
def walk(value: Any) -> None:
if isinstance(value, dict):
for key, child in value.items():
if key.lower() in forbidden_keys:
errors.append(f"Forbidden field: {key}")
walk(child)
elif isinstance(value, list):
for child in value:
walk(child)
walk(record)
return sorted(set(errors))
class TracePublisher:
def __init__(self) -> None:
self.queue: queue.Queue[dict[str, Any]] = queue.Queue(MAX_QUEUE_SIZE)
self.lock = threading.Lock()
self.thread: threading.Thread | None = None
self.counters = {
"queued": 0,
"persisted": 0,
"uploaded": 0,
"upload_failures": 0,
"dropped": 0,
}
def enqueue(self, record: dict[str, Any]) -> str:
if validate_trace(record):
with self.lock:
self.counters["dropped"] += 1
return "invalid"
try:
self.queue.put_nowait(record)
except queue.Full:
with self.lock:
self.counters["dropped"] += 1
return "dropped"
with self.lock:
self.counters["queued"] += 1
self._ensure_worker()
return "queued"
def status(self) -> dict[str, Any]:
with self.lock:
counters = dict(self.counters)
counters.update(
{
"queue_size": self.queue.qsize(),
"pending_shards": len(list(PENDING_DIR.glob("*.jsonl")))
if PENDING_DIR.exists()
else 0,
"dataset_repo": DATASET_REPO,
}
)
return counters
def _ensure_worker(self) -> None:
with self.lock:
if self.thread and self.thread.is_alive():
return
self.thread = threading.Thread(
target=self._worker,
name="privacy-safe-trace-publisher",
daemon=True,
)
self.thread.start()
def _worker(self) -> None:
batch: list[dict[str, Any]] = []
deadline: float | None = None
self._upload_pending()
while True:
timeout = (
max(0.05, deadline - time.monotonic())
if deadline is not None
else FLUSH_SECONDS
)
try:
record = self.queue.get(timeout=timeout)
batch.append(record)
if deadline is None:
deadline = time.monotonic() + FLUSH_SECONDS
except queue.Empty:
pass
if batch and (
len(batch) >= BATCH_SIZE
or (deadline is not None and time.monotonic() >= deadline)
):
self._persist_batch(batch)
batch = []
deadline = None
self._upload_pending()
def _persist_batch(self, records: list[dict[str, Any]]) -> None:
PENDING_DIR.mkdir(parents=True, exist_ok=True)
pending_count = self._pending_record_count()
capacity = max(0, MAX_QUEUE_SIZE - pending_count)
if capacity == 0:
with self.lock:
self.counters["dropped"] += len(records)
return
accepted = records[:capacity]
dropped = len(records) - len(accepted)
if dropped:
with self.lock:
self.counters["dropped"] += dropped
timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%fZ")
filename = f"trace-{timestamp}-{uuid.uuid4().hex[:8]}.jsonl"
final_path = PENDING_DIR / filename
temporary_path = final_path.with_suffix(".tmp")
content = "".join(
json.dumps(record, ensure_ascii=True) + "\n"
for record in accepted
)
temporary_path.write_text(content, encoding="utf-8")
os.replace(temporary_path, final_path)
with self.lock:
self.counters["persisted"] += len(accepted)
def _pending_record_count(self) -> int:
if not PENDING_DIR.exists():
return 0
count = 0
for path in PENDING_DIR.glob("*.jsonl"):
try:
count += sum(
1
for line in path.read_text(encoding="utf-8").splitlines()
if line
)
except OSError:
continue
return count
def _upload_pending(self) -> None:
token = os.getenv("HF_TOKEN", "").strip()
if not token or not PENDING_DIR.exists():
return
try:
from huggingface_hub import HfApi
api = HfApi(token=token)
for path in sorted(PENDING_DIR.glob("*.jsonl")):
date_path = datetime.now(timezone.utc).strftime("%Y/%m/%d")
uploaded = False
for attempt in range(3):
try:
api.upload_file(
path_or_fileobj=str(path),
path_in_repo=f"data/{date_path}/{path.name}",
repo_id=DATASET_REPO,
repo_type="dataset",
commit_message=f"Add privacy-safe trace shard {path.name}",
)
uploaded = True
break
except Exception:
if attempt < 2:
time.sleep(2**attempt)
if not uploaded:
with self.lock:
self.counters["upload_failures"] += 1
return
count = sum(1 for line in path.read_text(encoding="utf-8").splitlines() if line)
path.unlink(missing_ok=True)
with self.lock:
self.counters["uploaded"] += count
except Exception:
with self.lock:
self.counters["upload_failures"] += 1
PUBLISHER = TracePublisher()
def start_trace_worker() -> None:
PUBLISHER._ensure_worker()
def queue_trace(**kwargs: Any) -> tuple[str, str]:
record = build_trace_record(**kwargs)
return record["trace_id"], PUBLISHER.enqueue(record)
def trace_status() -> dict[str, Any]:
return PUBLISHER.status()