# Source: insighthub-backend / app/workers/analysis_worker.py
# Commit b8f53d9 by im1d: "fix: fallback when keywords not found"
"""
المهمة الأساسية - pipeline التحليل الكامل.
سابقاً كانت Celery task. الآن دالة عادية تُستدعى من
FastAPI BackgroundTasks عبر runner.run_with_concurrency.
تنفذ 11 خطوة وتحدّث progress في Supabase عند كل خطوة.
عند الفشل في أي خطوة، نُعلِّم الطلب بـ status=failed مع error_message.
"""
from __future__ import annotations
from datetime import datetime
from typing import Any, Dict, List, Optional
from loguru import logger
from app.analyzers.influencer_detector import detect_influencers
from app.analyzers.network_analyzer import build_interaction_network
from app.analyzers.report_generator import generate_recommendations, generate_summary
from app.analyzers.sentiment_analyzer import analyze_sentiment
from app.analyzers.topic_modeler import extract_topics
from app.config import settings
from app.database.supabase_client import (
create_notification,
log_audit,
save_results,
update_request_status,
)
from app.datasets.registry import default_dataset_ids
from app.datasets.stream_manager import StreamManager
from app.processors.cleaner import clean_post, is_valid_for_analysis
from app.spark.aggregations import compute_overall_stats, dataset_breakdown
from app.spark.filters import apply_full_filter, build_dataframe_from_rows
def run_analysis(request_id: str, request_data: Dict[str, Any]) -> Dict[str, Any]:
    """Run a single analysis pipeline from start to finish.

    Usually invoked from FastAPI BackgroundTasks (inside the runner).

    ``request_data`` carries what was read from ``analysis_requests``:
    ``id, user_id, title, keywords, platforms, analysis_type, date_from,
    date_to, ...``. Optional key:

    - ``dataset_ids``: list[str]; when missing or empty,
      ``default_dataset_ids()`` is used.

    Progress is written with ``update_request_status`` after each step.

    Returns a short summary dict: on success ``status``, ``request_id``,
    ``total_posts`` and ``duration_seconds``; otherwise the dict produced by
    ``_fail``. Exceptions raised by pipeline steps are caught and routed
    through ``_fail`` instead of propagating.
    """
    started_at = datetime.utcnow()
    user_id = str(request_data.get("user_id") or "")
    title = request_data.get("title", "")
    keywords: List[str] = request_data.get("keywords") or []
    date_from = _to_iso_date(request_data.get("date_from"))
    date_to = _to_iso_date(request_data.get("date_to"))
    # `or` also replaces an explicitly empty list with the defaults.
    dataset_ids: List[str] = request_data.get("dataset_ids") or default_dataset_ids()
    logger.info(
        f"[worker] analysis start id={request_id} kw={keywords} "
        f"datasets={dataset_ids} dates=[{date_from}, {date_to}]"
    )
    try:
        # ===== 1. Collecting =====
        update_request_status(request_id, "collecting", 10)

        # ===== 2. Stream datasets =====
        manager = StreamManager()
        raw_rows = list(
            manager.stream_many(
                dataset_ids=dataset_ids,
                max_total_rows=settings.max_posts_per_analysis,
            )
        )
        logger.info(f"[worker] streamed {len(raw_rows)} raw rows")
        update_request_status(request_id, "collecting", 25)
        if not raw_rows:
            return _fail(request_id, "لم يتم جلب أي بيانات من المصادر المحددة.")

        # ===== 3. Clean + validate =====
        cleaned = [
            cp
            for cp in (clean_post(row) for row in raw_rows)
            if is_valid_for_analysis(cp)
        ]
        logger.info(f"[worker] valid after cleaning: {len(cleaned)}")
        update_request_status(request_id, "analyzing", 35)
        if not cleaned:
            return _fail(request_id, "لم تتبق بيانات صالحة بعد التنظيف.")

        # ===== 4-5. Build Spark DataFrame + filter =====
        df = build_dataframe_from_rows(cleaned)
        df_filtered = apply_full_filter(
            df,
            keywords=keywords,
            date_from=date_from,
            date_to=date_to,
            deduplicate=True,
        )
        overall = compute_overall_stats(df_filtered)
        breakdown = dataset_breakdown(df_filtered)
        update_request_status(request_id, "analyzing", 45)
        logger.info(
            f"[worker] after filter: posts={overall['total_posts']} "
            f"users={overall['total_users']} reach={overall['total_reach']}"
        )

        if overall["total_posts"] < settings.min_posts_threshold:
            # Too few matches (public datasets may simply not contain the
            # requested keywords): analyse every cleaned post instead, without
            # the keyword filter.
            logger.warning(
                f"[worker] keyword filter returned {overall['total_posts']} posts "
                f"(below threshold {settings.min_posts_threshold}). "
                f"Falling back to all {len(cleaned)} cleaned posts."
            )
            posts = cleaned[: settings.max_posts_per_analysis]
            overall = _overall_from_posts(posts)
            breakdown = {}
        else:
            # Map the filtered Spark rows back to the cleaned dicts (which keep
            # the extracted mentions/hashtags) by matching Spark's "text"
            # column against "cleaned_text". Collected only on this path,
            # since the fallback above never uses the rows.
            # NOTE(review): assumes build_dataframe_from_rows stores cleaned_text
            # in the "text" column, and that Spark-side dedup is by text
            # (duplicates in `cleaned` are all re-included here) — confirm.
            filtered_texts = {r["text"] for r in df_filtered.collect()}
            posts = [
                p
                for p in cleaned
                if p.get("cleaned_text") and p["cleaned_text"] in filtered_texts
            ]
            if not posts:
                # Nothing could be matched back: analyse all cleaned posts, and
                # recompute the stats so the saved totals describe the posts
                # that were actually analysed.
                logger.warning(
                    "[worker] filtered rows could not be matched to cleaned posts; "
                    "falling back to all cleaned posts."
                )
                posts = cleaned[: settings.max_posts_per_analysis]
                overall = _overall_from_posts(posts)
                breakdown = {}

        # ===== 6. Sentiment =====
        update_request_status(request_id, "analyzing", 55)
        sentiment = analyze_sentiment(posts)
        logger.info(
            f"[worker] sentiment: pos={sentiment['positive']} "
            f"neu={sentiment['neutral']} neg={sentiment['negative']}"
        )
        update_request_status(request_id, "analyzing", 65)

        # ===== 7. Topics =====
        topics = extract_topics(
            [p["cleaned_text"] for p in posts if p.get("cleaned_text")],
            n_topics=8,
            method="tfidf",
        )
        logger.info(f"[worker] topics: {len(topics)} found")
        update_request_status(request_id, "analyzing", 75)

        # ===== 8. Influencers =====
        influencers = detect_influencers(posts, top_n=10)
        logger.info(f"[worker] influencers: {len(influencers)}")
        update_request_status(request_id, "analyzing", 82)

        # ===== 9. Network =====
        network = build_interaction_network(posts, max_nodes=50, max_edges=200)
        logger.info(
            f"[worker] network: nodes={len(network['nodes'])} "
            f"edges={len(network['edges'])}"
        )
        update_request_status(request_id, "analyzing", 90)

        # ===== 10. Summary + Recommendations =====
        analysis_data = {
            "sentiment": {
                "positive": sentiment["positive"],
                "neutral": sentiment["neutral"],
                "negative": sentiment["negative"],
                "timeline": sentiment["timeline"],
            },
            "topics": topics,
            "influencers": influencers,
            "total_posts": overall["total_posts"],
            "total_users": overall["total_users"],
            "keywords": keywords,
        }
        summary = generate_summary(analysis_data)
        recommendations = generate_recommendations(analysis_data)

        # ===== 11. Save & notify =====
        results = {
            "total_posts": int(overall["total_posts"]),
            "total_users": int(overall["total_users"]),
            "total_reach": int(overall["total_reach"]),
            "sentiment_positive": float(sentiment["positive"]),
            "sentiment_neutral": float(sentiment["neutral"]),
            "sentiment_negative": float(sentiment["negative"]),
            "top_topics": topics,
            "top_influencers": influencers,
            "sentiment_timeline": sentiment["timeline"],
            "network_nodes": network["nodes"],
            "network_edges": network["edges"],
            "summary": summary,
            "recommendations": recommendations,
        }
        save_results(request_id, results)
        update_request_status(request_id, "completed", 100)
        duration = (datetime.utcnow() - started_at).total_seconds()

        # Results are saved and the request is already "completed". The
        # notification and audit entry are best-effort: an error here must not
        # reach the outer handler, which would re-mark the request "failed".
        if user_id:
            try:
                create_notification(
                    user_id=user_id,
                    title="اكتمل التحليل",
                    body=f"تحليل '{title}' جاهز للمراجعة",
                    notif_type="analysis_complete",
                    link=f"#analysis-{request_id}",
                )
            except Exception:
                logger.exception(
                    f"[worker] completion notification failed for {request_id}"
                )
        try:
            log_audit(
                action="ANALYSIS_COMPLETED",
                entity_type="analysis_request",
                entity_id=request_id,
                metadata={
                    "total_posts": overall["total_posts"],
                    "duration_seconds": duration,
                    "datasets_used": breakdown,
                },
            )
        except Exception:
            logger.exception(f"[worker] completion audit log failed for {request_id}")

        logger.info(f"[worker] analysis {request_id} completed in {duration:.1f}s")
        return {
            "status": "success",
            "request_id": request_id,
            "total_posts": overall["total_posts"],
            "duration_seconds": duration,
        }
    except Exception as exc:
        logger.exception(f"[worker] analysis {request_id} failed")
        return _fail(request_id, f"فشل التحليل: {str(exc)[:300]}")


def _overall_from_posts(posts: List[Dict[str, Any]]) -> Dict[str, int]:
    """Compute, in plain Python, the three overall stats the pipeline reads.

    Used when the analysed posts are not the Spark-filtered set. Posts without
    a ``user_id`` each count as a distinct anonymous user. When no follower
    counts are present, reach is estimated as 10 per post.
    """
    return {
        "total_posts": len(posts),
        "total_users": len(
            {p.get("user_id") or f"anon_{i}" for i, p in enumerate(posts)}
        ),
        "total_reach": sum(p.get("user_followers") or 0 for p in posts)
        or len(posts) * 10,
    }
# =====================================================================
# Helpers
# =====================================================================
def _fail(request_id: str, message: str) -> Dict[str, Any]:
    """Mark ``request_id`` as failed, record an audit entry, and return a summary.

    Both side effects are best-effort and attempted independently. If one
    raises, the error is logged and the other is still tried. This matters
    because ``_fail`` is called from ``run_analysis``'s ``except`` handler: an
    exception escaping from here would propagate to the background-task
    runner instead of producing a ``failed`` result.

    Args:
        request_id: id of the analysis request.
        message: user-facing error message stored on the request.

    Returns:
        ``{"status": "failed", "request_id": request_id, "error": message}``.
    """
    try:
        update_request_status(request_id, "failed", 0, error_message=message)
    except Exception:
        logger.exception(f"[worker] could not mark request {request_id} as failed")
    try:
        log_audit(
            action="ANALYSIS_FAILED",
            entity_type="analysis_request",
            entity_id=request_id,
            metadata={"error": message},
        )
    except Exception:
        logger.exception(f"[worker] could not write failure audit for {request_id}")
    return {"status": "failed", "request_id": request_id, "error": message}
def _to_iso_date(value: Any) -> Optional[str]:
if not value:
return None
if isinstance(value, str):
return value[:10]
try:
return value.isoformat()[:10]
except Exception:
return None