"""
Core task: the complete analysis pipeline.

Formerly a Celery task; now a plain function called from FastAPI
BackgroundTasks through ``runner.run_with_concurrency``. It runs 11 steps and
updates the request's progress in Supabase after each one. If any step fails,
the request is marked ``status=failed`` with an ``error_message``.
"""
from __future__ import annotations

from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

from loguru import logger

from app.analyzers.influencer_detector import detect_influencers
from app.analyzers.network_analyzer import build_interaction_network
from app.analyzers.report_generator import generate_recommendations, generate_summary
from app.analyzers.sentiment_analyzer import analyze_sentiment
from app.analyzers.topic_modeler import extract_topics
from app.config import settings
from app.database.supabase_client import (
    create_notification,
    log_audit,
    save_results,
    update_request_status,
)
from app.datasets.registry import default_dataset_ids
from app.datasets.stream_manager import StreamManager
from app.processors.cleaner import clean_post, is_valid_for_analysis
from app.spark.aggregations import compute_overall_stats, dataset_breakdown
from app.spark.filters import apply_full_filter, build_dataframe_from_rows


def run_analysis(request_id: str, request_data: Dict[str, Any]) -> Dict[str, Any]:
    """Run one analysis pipeline from start to finish.

    Usually called from FastAPI BackgroundTasks (inside the runner).

    Args:
        request_id: id of the ``analysis_requests`` row being processed.
        request_data: the row as read from ``analysis_requests`` (``id``,
            ``user_id``, ``title``, ``keywords``, ``platforms``,
            ``analysis_type``, ``date_from``, ``date_to``, ...). Optional
            ``dataset_ids: list[str]``; when absent, ``default_dataset_ids()``
            is used.

    Returns:
        A short summary dict. On success it is ``{"status": "success",
        "request_id", "total_posts", "duration_seconds"}``. On failure it is
        the dict built by ``_fail`` (``status="failed"`` plus ``error``).
        Exceptions are not propagated to the caller.
    """
    started_at = datetime.now(timezone.utc)

    try:
        user_id = str(request_data.get("user_id") or "")
        title = request_data.get("title", "")
        keywords: List[str] = request_data.get("keywords") or []
        date_from = _to_iso_date(request_data.get("date_from"))
        date_to = _to_iso_date(request_data.get("date_to"))
        dataset_ids: Optional[List[str]] = (
            request_data.get("dataset_ids") or default_dataset_ids()
        )

        logger.info(
            f"[worker] analysis start id={request_id} kw={keywords} "
            f"datasets={dataset_ids} dates=[{date_from}, {date_to}]"
        )

        # ===== 1. Collecting =====
        update_request_status(request_id, "collecting", 10)

        # ===== 2. Stream datasets =====
        manager = StreamManager()
        raw_rows = list(
            manager.stream_many(
                dataset_ids=dataset_ids,
                max_total_rows=settings.max_posts_per_analysis,
            )
        )
        logger.info(f"[worker] streamed {len(raw_rows)} raw rows")
        update_request_status(request_id, "collecting", 25)
        if not raw_rows:
            return _fail(request_id, "لم يتم جلب أي بيانات من المصادر المحددة.")

        # ===== 3. Clean + validate =====
        cleaned = [cp for cp in map(clean_post, raw_rows) if is_valid_for_analysis(cp)]
        logger.info(f"[worker] valid after cleaning: {len(cleaned)}")
        update_request_status(request_id, "analyzing", 35)
        if not cleaned:
            return _fail(request_id, "لم تتبق بيانات صالحة بعد التنظيف.")

        # ===== 4-5. Build Spark DataFrame + filter =====
        posts, overall, breakdown = _select_posts(cleaned, keywords, date_from, date_to)
        update_request_status(request_id, "analyzing", 45)

        # ===== 6. Sentiment =====
        update_request_status(request_id, "analyzing", 55)
        sentiment = analyze_sentiment(posts)
        logger.info(
            f"[worker] sentiment: pos={sentiment['positive']} "
            f"neu={sentiment['neutral']} neg={sentiment['negative']}"
        )
        update_request_status(request_id, "analyzing", 65)

        # ===== 7. Topics =====
        topics = extract_topics(
            [p["cleaned_text"] for p in posts if p.get("cleaned_text")],
            n_topics=8,
            method="tfidf",
        )
        logger.info(f"[worker] topics: {len(topics)} found")
        update_request_status(request_id, "analyzing", 75)

        # ===== 8. Influencers =====
        influencers = detect_influencers(posts, top_n=10)
        logger.info(f"[worker] influencers: {len(influencers)}")
        update_request_status(request_id, "analyzing", 82)

        # ===== 9. Network =====
        network = build_interaction_network(posts, max_nodes=50, max_edges=200)
        logger.info(
            f"[worker] network: nodes={len(network['nodes'])} "
            f"edges={len(network['edges'])}"
        )
        update_request_status(request_id, "analyzing", 90)

        # ===== 10. Summary + Recommendations =====
        analysis_data = {
            "sentiment": {
                "positive": sentiment["positive"],
                "neutral": sentiment["neutral"],
                "negative": sentiment["negative"],
                "timeline": sentiment["timeline"],
            },
            "topics": topics,
            "influencers": influencers,
            "total_posts": overall["total_posts"],
            "total_users": overall["total_users"],
            "keywords": keywords,
        }
        summary = generate_summary(analysis_data)
        recommendations = generate_recommendations(analysis_data)

        # ===== 11. Save & notify =====
        results = {
            "total_posts": int(overall["total_posts"]),
            "total_users": int(overall["total_users"]),
            "total_reach": int(overall["total_reach"]),
            "sentiment_positive": float(sentiment["positive"]),
            "sentiment_neutral": float(sentiment["neutral"]),
            "sentiment_negative": float(sentiment["negative"]),
            "top_topics": topics,
            "top_influencers": influencers,
            "sentiment_timeline": sentiment["timeline"],
            "network_nodes": network["nodes"],
            "network_edges": network["edges"],
            "summary": summary,
            "recommendations": recommendations,
        }
        save_results(request_id, results)
        update_request_status(request_id, "completed", 100)

        duration = (datetime.now(timezone.utc) - started_at).total_seconds()
        # Results are saved and the request is marked completed at this point:
        # notification/audit are best-effort so a failure there cannot flip
        # the request to "failed" via the except branch below.
        _notify_completion(
            request_id=request_id,
            user_id=user_id,
            title=title,
            total_posts=overall["total_posts"],
            duration=duration,
            breakdown=breakdown,
        )

        logger.info(f"[worker] analysis {request_id} completed in {duration:.1f}s")
        return {
            "status": "success",
            "request_id": request_id,
            "total_posts": overall["total_posts"],
            "duration_seconds": duration,
        }

    except Exception as exc:
        logger.exception(f"[worker] analysis {request_id} failed")
        return _fail(request_id, f"فشل التحليل: {str(exc)[:300]}")


# =====================================================================
# Helpers
# =====================================================================

def _select_posts(
    cleaned: List[Dict[str, Any]],
    keywords: List[str],
    date_from: Optional[str],
    date_to: Optional[str],
) -> Tuple[List[Dict[str, Any]], Dict[str, Any], Any]:
    """Filter ``cleaned`` with Spark and choose the posts to analyse.

    Returns ``(posts, overall, breakdown)``. ``overall`` holds
    ``total_posts``/``total_users``/``total_reach`` describing ``posts``.
    ``breakdown`` is the per-dataset breakdown, or ``{}`` when the code falls
    back to unfiltered cleaned posts.
    """
    df = build_dataframe_from_rows(cleaned)
    df_filtered = apply_full_filter(
        df,
        keywords=keywords,
        date_from=date_from,
        date_to=date_to,
        deduplicate=True,
    )
    overall = compute_overall_stats(df_filtered)
    breakdown = dataset_breakdown(df_filtered)
    logger.info(
        f"[worker] after filter: posts={overall['total_posts']} "
        f"users={overall['total_users']} reach={overall['total_reach']}"
    )

    # When the filter finds too few results, fall back to all cleaned posts:
    # public datasets may not contain the requested keywords at all.
    if overall["total_posts"] < settings.min_posts_threshold:
        logger.warning(
            f"[worker] keyword filter returned {overall['total_posts']} posts "
            f"(below threshold {settings.min_posts_threshold}). "
            f"Falling back to all {len(cleaned)} cleaned posts."
        )
        posts = cleaned[: settings.max_posts_per_analysis]
        return posts, _overall_from_posts(posts), {}

    # Map Spark Rows back to the cleaned dicts (which carry the extracted
    # mentions/hashtags the analyzers need). Collect only on this path, where
    # the rows are actually used.
    # NOTE(review): assumes build_dataframe_from_rows stores `cleaned_text` in
    # the "text" column — confirm. Cleaned posts with identical text all
    # match, which can re-admit rows that deduplicate=True removed.
    filtered_texts = {row["text"] for row in df_filtered.collect()}
    posts = [
        p for p in cleaned
        if p.get("cleaned_text") and p["cleaned_text"] in filtered_texts
    ]
    if not posts:
        logger.warning(
            "[worker] no cleaned posts matched the filtered Spark rows; "
            "falling back to all cleaned posts."
        )
        posts = cleaned[: settings.max_posts_per_analysis]
        # Report stats for the posts actually analysed, not the filtered set.
        return posts, _overall_from_posts(posts), {}

    return posts, overall, breakdown


def _overall_from_posts(posts: List[Dict[str, Any]]) -> Dict[str, int]:
    """Compute overall stats locally (no Spark) for a list of cleaned posts."""
    return {
        "total_posts": len(posts),
        # Posts without a user_id are each counted as a distinct anonymous user.
        "total_users": len(
            {p.get("user_id") or f"anon_{i}" for i, p in enumerate(posts)}
        ),
        # NOTE(review): with no follower data, reach is estimated as 10 per
        # post — a placeholder, not a measured value.
        "total_reach": sum(p.get("user_followers") or 0 for p in posts)
        or len(posts) * 10,
    }


def _notify_completion(
    request_id: str,
    user_id: str,
    title: str,
    total_posts: Any,
    duration: float,
    breakdown: Any,
) -> None:
    """Best-effort user notification and audit entry for a completed analysis."""
    if user_id:
        try:
            create_notification(
                user_id=user_id,
                title="اكتمل التحليل",
                body=f"تحليل '{title}' جاهز للمراجعة",
                notif_type="analysis_complete",
                link=f"#analysis-{request_id}",
            )
        except Exception:
            logger.exception(f"[worker] notification for {request_id} failed")

    try:
        log_audit(
            action="ANALYSIS_COMPLETED",
            entity_type="analysis_request",
            entity_id=request_id,
            metadata={
                "total_posts": total_posts,
                "duration_seconds": duration,
                "datasets_used": breakdown,
            },
        )
    except Exception:
        logger.exception(f"[worker] audit log for {request_id} failed")


def _fail(request_id: str, message: str) -> Dict[str, Any]:
    """Mark the request failed, write an audit entry, and return a failure summary.

    Reporting is best-effort: errors while updating Supabase are logged rather
    than raised, so ``run_analysis`` never propagates an exception.
    """
    try:
        update_request_status(request_id, "failed", 0, error_message=message)
    except Exception:
        logger.exception(f"[worker] could not mark {request_id} as failed")

    try:
        log_audit(
            action="ANALYSIS_FAILED",
            entity_type="analysis_request",
            entity_id=request_id,
            metadata={"error": message},
        )
    except Exception:
        logger.exception(f"[worker] audit log for failed {request_id} failed")

    return {"status": "failed", "request_id": request_id, "error": message}


def _to_iso_date(value: Any) -> Optional[str]:
    """Normalize a date-like value to its first 10 ISO characters (``YYYY-MM-DD``).

    Falsy values return ``None``. Strings are truncated to 10 characters
    (assumed already ISO formatted). Objects with ``isoformat()`` (e.g.
    ``date``/``datetime``) are formatted then truncated. Anything else
    returns ``None``.
    """
    if not value:
        return None
    if isinstance(value, str):
        return value[:10]
    try:
        return value.isoformat()[:10]
    except (AttributeError, TypeError, ValueError):
        return None