Spaces:
Sleeping
Sleeping
| """ | |
| SatyaCheck β Master Analysis Pipeline (7 Layers) | |
| ΰ€Έΰ€€ΰ₯ΰ€― ΰ€ΰ₯ ΰ€ΰ€Ύΰ€ΰ€ | |
| Orchestrates all 7 layers in the correct sequence: | |
| Layer 1 (Semantic) ββ | |
| Layer 2 (Vision) ββ€ | |
| Layer 3 (Authority) ββ€ββΊ Layer 4 (XAI Verdict) | |
| Layer 5 (Citation Network) ββ€ | |
| Layer 6 (Indian Languages) ββ | |
| Layer 7 (Continuous Learning) ββΊ runs after Layer 4 | |
| Layers 1, 2, 3, 5, 6 run concurrently (asyncio.gather). | |
| Layer 4 waits for 1+2+3. | |
| Layer 7 waits for all layers. | |
| Redis cache is checked first β bypasses pipeline on hit. | |
| """ | |
| import logging | |
| import asyncio | |
| import time | |
| from datetime import datetime, timezone | |
| from core.schemas import AnalyzeRequest, AnalysisResponse | |
| from core.layer1_semantic import run_layer1 | |
| from core.layer2_multimodal import run_layer2 | |
| from core.layer3_authority import run_layer3 | |
| from core.layer4_xai import run_layer4 | |
| from core.layer5_citation_network import run_layer5 | |
| from core.layer6_indian_languages import run_layer6 | |
| from core.layer7_continuous_learning import run_layer7 | |
| from cache.redis_client import RedisClient | |
| logger = logging.getLogger("satyacheck.pipeline") | |
| async def run_full_pipeline(request: AnalyzeRequest) -> AnalysisResponse: | |
| """ | |
| Execute the complete 7-layer SatyaCheck analysis pipeline. | |
| Execution flow: | |
| 1. Check Redis cache β return instantly if hit | |
| 2. Run Layers 1 + 2 + 3 + 5 + 6 concurrently (asyncio.gather) | |
| 3. Run Layer 4 (synthesises 1+2+3) β final verdict | |
| 4. Run Layer 7 (community feedback + benchmark data) | |
| 5. Cache result in Redis | |
| 6. Return complete AnalysisResponse | |
| Args: | |
| request: AnalyzeRequest from the Chrome Extension | |
| Returns: | |
| AnalysisResponse with all 7 layer results | |
| """ | |
| start_time = time.monotonic() | |
| logger.info(f"π Pipeline starting for: {request.url[:80]}...") | |
| # ββ Step 0: Redis Cache Check βββββββββββββββββββββββββββββββββββββββββββββ | |
| cached = await RedisClient.get(request.url) | |
| if cached: | |
| logger.info(f"β‘ Cache HIT β returning instant result for: {request.url[:60]}...") | |
| return AnalysisResponse(**cached) | |
| # ββ Step 1+2+3+5+6: Run concurrently ββββββββββββββββββββββββββββββββββββββ | |
| logger.info("β³ Running Layers 1, 2, 3, 5, 6 concurrently...") | |
| try: | |
| l1_result, l2_result, l3_result, l5_result, l6_result = await asyncio.gather( | |
| run_layer1(request.title, request.body_text), | |
| run_layer2(request.image_urls), | |
| run_layer3(request.domain, request.title, request.body_text), | |
| run_layer5(request.url, request.title, request.body_text, request.domain), | |
| run_layer6(request.title, request.body_text, request.url), | |
| ) | |
| except Exception as exc: | |
| logger.error(f"β Layer execution failed: {exc}") | |
| raise | |
| # ββ Step 4: XAI Verdict βββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| logger.info("β³ Running Layer 4 (XAI)...") | |
| l4_result = await run_layer4(l1_result, l2_result, l3_result) | |
| # ββ Step 7: Continuous Learning βββββββββββββββββββββββββββββββββββββββββββ | |
| logger.info("β³ Running Layer 7 (Continuous Learning)...") | |
| l7_result = await run_layer7( | |
| url=request.url, | |
| domain=request.domain, | |
| l1_status=l1_result.status.value, | |
| l2_status=l2_result.status.value, | |
| l3_status=l3_result.status.value, | |
| l4_risk=l4_result.overall_risk.value, | |
| l5_status=l5_result.status, | |
| l6_status=l6_result.status, | |
| ) | |
| # ββ Build Response ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| elapsed_ms = (time.monotonic() - start_time) * 1000 | |
| # Convert Layer5 and Layer6 results to dicts for the response | |
| l5_dict = l5_result.to_dict() | |
| l6_dict = l6_result.to_dict() | |
| l7_dict = l7_result.to_dict() | |
| response = AnalysisResponse( | |
| url=request.url, | |
| title=request.title, | |
| domain=request.domain, | |
| analyzed_at=datetime.now(timezone.utc).isoformat(), | |
| cached=False, | |
| processing_time_ms=round(elapsed_ms, 1), | |
| layer1=l1_result, | |
| layer2=l2_result, | |
| layer3=l3_result, | |
| layer4=l4_result, | |
| layer5=l5_dict, | |
| layer6=l6_dict, | |
| layer7=l7_dict, | |
| ) | |
| # ββ Cache Result ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| try: | |
| await RedisClient.set(request.url, response.model_dump()) | |
| logger.info(f"β Result cached for: {request.url[:60]}...") | |
| except Exception as exc: | |
| logger.warning(f"β οΈ Cache write failed: {exc}") | |
| logger.info( | |
| f"β Pipeline complete in {elapsed_ms:.0f}ms β " | |
| f"verdict={l4_result.overall_risk}, " | |
| f"confidence={l4_result.confidence_score:.1f}%" | |
| ) | |
| return response | |