# satyacheck-backend/core/pipeline.py
# (HuggingFace Space upload by omiii2005 — "Initial clean deploy", commit 87eb9ac)
"""
SatyaCheck β€” Master Analysis Pipeline (7 Layers)
ΰ€Έΰ€€ΰ₯ΰ€― ΰ€•ΰ₯€ ΰ€œΰ€Ύΰ€ΰ€š
Orchestrates all 7 layers in the correct sequence:
    Layer 1 (Semantic) ──────────┐
    Layer 2 (Vision) ────────────┤
    Layer 3 (Authority) ─────────┼──► Layer 4 (XAI Verdict)
    Layer 5 (Citation Network) ──┤
    Layer 6 (Indian Languages) ──┘
    Layer 7 (Continuous Learning) ──► runs after Layer 4
Layers 1, 2, 3, 5, 6 run concurrently (asyncio.gather).
Layer 4 waits for 1+2+3.
Layer 7 waits for all layers.
Redis cache is checked first β€” bypasses pipeline on hit.
"""
import logging
import asyncio
import time
from datetime import datetime, timezone
from core.schemas import AnalyzeRequest, AnalysisResponse
from core.layer1_semantic import run_layer1
from core.layer2_multimodal import run_layer2
from core.layer3_authority import run_layer3
from core.layer4_xai import run_layer4
from core.layer5_citation_network import run_layer5
from core.layer6_indian_languages import run_layer6
from core.layer7_continuous_learning import run_layer7
from cache.redis_client import RedisClient
logger = logging.getLogger("satyacheck.pipeline")
async def run_full_pipeline(request: AnalyzeRequest) -> AnalysisResponse:
    """
    Execute the complete 7-layer SatyaCheck analysis pipeline.

    Execution flow:
        1. Check Redis cache → return instantly if hit
        2. Run Layers 1 + 2 + 3 + 5 + 6 concurrently (asyncio.gather)
        3. Run Layer 4 (synthesises 1+2+3) → final verdict
        4. Run Layer 7 (community feedback + benchmark data)
        5. Cache result in Redis
        6. Return complete AnalysisResponse

    Args:
        request: AnalyzeRequest from the Chrome Extension

    Returns:
        AnalysisResponse with all 7 layer results. `cached` is True when the
        response was served from Redis, False when freshly computed.

    Raises:
        Exception: re-raises any failure from Layers 1/2/3/5/6 after logging.
            Redis errors (read or write) are logged and swallowed so a cache
            outage never blocks an analysis.
    """
    start_time = time.monotonic()
    logger.info(f"🚀 Pipeline starting for: {request.url[:80]}...")

    # ── Step 0: Redis Cache Check ─────────────────────────────────────────────
    # A cache failure must not abort the analysis — fall through to the full
    # pipeline on any Redis error (mirrors the guarded cache write below).
    try:
        cached = await RedisClient.get(request.url)
    except Exception as exc:
        logger.warning(f"⚠️ Cache read failed: {exc}")
        cached = None
    if cached:
        logger.info(f"⚡ Cache HIT — returning instant result for: {request.url[:60]}...")
        # The stored payload was serialized with cached=False at compute time;
        # override so the caller can tell this result came from the cache.
        return AnalysisResponse(**{**cached, "cached": True})

    # ── Step 1+2+3+5+6: Run concurrently ──────────────────────────────────────
    logger.info("⏳ Running Layers 1, 2, 3, 5, 6 concurrently...")
    try:
        l1_result, l2_result, l3_result, l5_result, l6_result = await asyncio.gather(
            run_layer1(request.title, request.body_text),
            run_layer2(request.image_urls),
            run_layer3(request.domain, request.title, request.body_text),
            run_layer5(request.url, request.title, request.body_text, request.domain),
            run_layer6(request.title, request.body_text, request.url),
        )
    except Exception as exc:
        # Any single layer failure aborts the pipeline — the XAI verdict
        # (Layer 4) needs all of its inputs, so partial results are useless.
        logger.error(f"❌ Layer execution failed: {exc}")
        raise

    # ── Step 4: XAI Verdict ───────────────────────────────────────────────────
    # Layer 4 synthesises Layers 1+2+3 into the final explainable verdict.
    logger.info("⏳ Running Layer 4 (XAI)...")
    l4_result = await run_layer4(l1_result, l2_result, l3_result)

    # ── Step 7: Continuous Learning ───────────────────────────────────────────
    # Feeds the per-layer outcomes back for feedback/benchmark collection.
    logger.info("⏳ Running Layer 7 (Continuous Learning)...")
    l7_result = await run_layer7(
        url=request.url,
        domain=request.domain,
        l1_status=l1_result.status.value,
        l2_status=l2_result.status.value,
        l3_status=l3_result.status.value,
        l4_risk=l4_result.overall_risk.value,
        l5_status=l5_result.status,
        l6_status=l6_result.status,
    )

    # ── Build Response ────────────────────────────────────────────────────────
    elapsed_ms = (time.monotonic() - start_time) * 1000

    # Convert Layer5, Layer6 and Layer7 results to dicts for the response
    l5_dict = l5_result.to_dict()
    l6_dict = l6_result.to_dict()
    l7_dict = l7_result.to_dict()

    response = AnalysisResponse(
        url=request.url,
        title=request.title,
        domain=request.domain,
        analyzed_at=datetime.now(timezone.utc).isoformat(),
        cached=False,
        processing_time_ms=round(elapsed_ms, 1),
        layer1=l1_result,
        layer2=l2_result,
        layer3=l3_result,
        layer4=l4_result,
        layer5=l5_dict,
        layer6=l6_dict,
        layer7=l7_dict,
    )

    # ── Cache Result ──────────────────────────────────────────────────────────
    # Best-effort write: a Redis outage degrades to "no caching", never to
    # a failed request.
    try:
        await RedisClient.set(request.url, response.model_dump())
        logger.info(f"✅ Result cached for: {request.url[:60]}...")
    except Exception as exc:
        logger.warning(f"⚠️ Cache write failed: {exc}")

    logger.info(
        f"✅ Pipeline complete in {elapsed_ms:.0f}ms — "
        f"verdict={l4_result.overall_risk}, "
        f"confidence={l4_result.confidence_score:.1f}%"
    )
    return response