File size: 2,728 Bytes
2f073d3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
"""
Async worker for processing analysis jobs via Redis queue.
Handles heavy inference tasks that are offloaded from the main API.

Run: python -m backend.app.workers.analysis_worker
"""
import asyncio
import hashlib
import json
import time

import redis

from backend.app.core.config import settings
from backend.app.core.logging import setup_logging, get_logger
from backend.app.services.hf_service import detect_ai_text, get_embeddings, detect_harm
from backend.app.services.groq_service import compute_perplexity
from backend.app.services.stylometry import compute_stylometry_score
from backend.app.services.ensemble import compute_ensemble
from backend.app.services.vector_db import compute_cluster_score, upsert_embedding

# Set up logging from the configured level at import time, then create this
# module's logger.
setup_logging(settings.LOG_LEVEL)
logger = get_logger(__name__)

# Redis key that run_worker() pops jobs from with BRPOP.
QUEUE_NAME = "analysis_jobs"


def run_worker():
    """Run the blocking worker loop that processes analysis jobs from Redis.

    Jobs are popped from ``QUEUE_NAME`` with ``BRPOP``. Each job is a JSON
    object read via ``job.get``: ``"text"`` (defaults to ``""``) and ``"id"``
    (defaults to ``"unknown"``). The text is passed to ``_process_job`` and the
    JSON-encoded result is stored under ``result:<id>`` with a 3600-second TTL.

    The loop never returns. Any exception raised while handling a job is
    logged and the loop continues with the next job.
    """
    r = redis.from_url(settings.REDIS_URL, decode_responses=True)
    logger.info("Worker started, listening on queue", queue=QUEUE_NAME)

    while True:
        try:
            # BRPOP returns None (not a tuple) when the timeout expires, so
            # check for it before unpacking; an idle queue is not an error.
            popped = r.brpop(QUEUE_NAME, timeout=30)
            if popped is None:
                continue
            _, raw = popped

            job = json.loads(raw)
            text = job.get("text", "")
            job_id = job.get("id", "unknown")
            logger.info("Processing job", job_id=job_id)

            # Each job runs on its own short-lived event loop.
            result = asyncio.run(_process_job(text))
            r.setex(f"result:{job_id}", 3600, json.dumps(result))
            logger.info("Job completed", job_id=job_id)
        except redis.RedisError as e:
            logger.error("Worker error", error=str(e))
            # Redis failures (e.g. connection refused) return immediately;
            # pause briefly so an outage does not become a hot loop that
            # floods the log.
            time.sleep(1)
        except Exception as e:
            logger.error("Worker error", error=str(e))


async def _process_job(text: str) -> dict:
    """Process a single analysis job and return its scores.

    Steps, in order:

    1. ``detect_ai_text`` -> ``p_ai`` (``None`` if the call raises).
    2. ``compute_perplexity`` -> ``s_perp``, only when ``p_ai`` exceeds
       ``settings.PERPLEXITY_THRESHOLD``; otherwise ``None``.
    3. ``get_embeddings`` / ``compute_cluster_score`` -> ``s_embed_cluster``,
       then the embedding is upserted. Failures here are logged and leave
       whatever was computed so far (``None`` if nothing was).
    4. ``detect_harm`` -> ``p_ext`` and ``compute_stylometry_score`` -> ``s_styl``.
    5. ``compute_ensemble`` combines all five signals.

    Args:
        text: The text to analyse.

    Returns:
        A dict with keys ``p_ai``, ``s_perp``, ``s_embed_cluster``, ``p_ext``,
        ``s_styl``, ``threat_score`` and ``explainability``.

    Raises:
        Exception: Whatever ``compute_perplexity``, ``detect_harm``,
            ``compute_stylometry_score`` or ``compute_ensemble`` raise; those
            steps are not guarded.
    """
    try:
        p_ai = await detect_ai_text(text)
    except Exception as e:
        # Best-effort signal: continue without it, but leave a trace.
        logger.warning("AI text detection failed", error=str(e))
        p_ai = None

    s_perp = None
    if p_ai is not None and p_ai > settings.PERPLEXITY_THRESHOLD:
        s_perp = await compute_perplexity(text)

    s_embed_cluster = None
    try:
        embeddings = await get_embeddings(text)
        # Score against the existing vectors before inserting this one.
        s_embed_cluster = await compute_cluster_score(embeddings)
        # Built-in hash() of a str is salted per process, so it would give the
        # same text a different ID after every worker restart. A content
        # digest keeps the ID stable so re-analysed text overwrites its entry.
        digest = hashlib.sha256(
            text.encode("utf-8", errors="surrogatepass")
        ).hexdigest()
        await upsert_embedding(f"worker_{digest}", embeddings)
    except Exception as e:
        # Best-effort signal: keep any score already computed and carry on.
        logger.warning("Embedding analysis failed", error=str(e))

    p_ext = await detect_harm(text)
    s_styl = compute_stylometry_score(text)

    ensemble_result = compute_ensemble(
        p_ai=p_ai, s_perp=s_perp, s_embed_cluster=s_embed_cluster,
        p_ext=p_ext, s_styl=s_styl,
    )

    return {
        "p_ai": p_ai,
        "s_perp": s_perp,
        "s_embed_cluster": s_embed_cluster,
        "p_ext": p_ext,
        "s_styl": s_styl,
        "threat_score": ensemble_result["threat_score"],
        "explainability": ensemble_result["explainability"],
    }


# Script entry point: start the blocking worker loop when the module is run
# directly (e.g. with ``python -m``), not when it is imported.
if __name__ == "__main__":
    run_worker()