Spaces:
Sleeping
Sleeping
| """ | |
| وحدة جمع البيانات الحقيقية من المصادر الست | |
| Real-time data scraping from 6 conflict monitoring sources | |
| """ | |
| import requests | |
| from bs4 import BeautifulSoup | |
| import json | |
| import re | |
| import time | |
| import hashlib | |
| import logging | |
| from datetime import datetime, timedelta | |
| from dataclasses import dataclass, field, asdict | |
| from typing import List, Optional, Dict | |
| from abc import ABC, abstractmethod | |
| import threading | |
| import traceback | |
# Configure the root logger at import time and create the module-level logger
# used by every scraper and by DataManager below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("scraper")
| # ===== نموذج البيانات ===== | |
@dataclass
class Event:
    """A single conflict event collected from one of the monitored sources.

    The class is a dataclass so that it can be built with keyword arguments,
    gets a per-instance ``extra`` dict, and can be serialised with
    ``asdict``.  ``minutes_ago`` and ``is_expired`` are properties: the rest
    of the module reads them as plain attributes (``e.is_expired``).
    """

    id: str
    title: str
    event_type: str  # strike, missile, aircraft, troops, naval, drone, defense
    severity: str  # critical, high, medium, low
    category: str  # Arabic display label, e.g. "غارة جوية", "صاروخ"
    lat: float
    lng: float
    location: str
    region: str  # middleeast, europe, africa, redSea, asia
    details: str
    source: str
    source_url: str = ""
    timestamp: float = 0  # unix timestamp; 0 means "unknown"
    raw_time: str = ""
    extra: Dict = field(default_factory=dict)

    def to_dict(self):
        """Return the event as a plain ``dict`` (recursively, via ``asdict``)."""
        return asdict(self)

    @property
    def minutes_ago(self):
        """Whole minutes elapsed since ``timestamp``.

        Returns at least 1 for a known timestamp, and the placeholder value
        30 when the timestamp is unset (``<= 0``).
        """
        if self.timestamp > 0:
            return max(1, int((time.time() - self.timestamp) / 60))
        return 30

    @property
    def is_expired(self):
        """True once the event is more than 24 hours (1440 minutes) old."""
        return self.minutes_ago > 1440
@dataclass
class Flight:
    """A single tracked aircraft reported by one of the flight sources.

    Declared as a dataclass so the scrapers can construct it with keyword
    arguments and ``to_dict`` can serialise it with ``asdict``.
    """

    callsign: str
    aircraft_type: str
    altitude: str
    speed: str
    lat: float
    lng: float
    heading: str
    origin: str
    flight_type: str  # military, recon, cargo, drone
    source: str
    hex_code: str = ""
    squawk: str = ""
    timestamp: float = 0

    def to_dict(self):
        """Return the flight as a plain ``dict`` (via ``asdict``)."""
        return asdict(self)
| # ===== مصنف المنطقة الجغرافية ===== | |
def classify_region(lat: float, lng: float, text: str = "") -> str:
    """Classify an event into one of the dashboard regions.

    Keyword hits in ``text`` take priority and are checked in the order
    Red Sea, Middle East, Europe, Africa.  When no keyword matches, the
    coordinates are tested against fixed bounding boxes.  Anything that
    matches neither falls back to ``"middleeast"``.

    ASCII keywords only match at the start of a word, so "oman" does not
    fire inside "woman" or "Romania" while "Syrian" still matches "syria".
    Non-ASCII (Arabic) keywords keep plain substring matching, because the
    definite article and other prefixes are written attached to the word.

    Args:
        lat: Latitude in degrees.
        lng: Longitude in degrees.
        text: Optional free text (title, location) to scan for place names.

    Returns:
        One of "redSea", "middleeast", "europe", "africa", "asia".
    """
    text_lower = text.lower() if text else ""

    # Checked in this order; the first group with a hit wins.
    keyword_groups = (
        ("redSea", ["red sea", "البحر الأحمر", "bab al-mandab", "باب المندب",
                    "houthi", "حوثي", "aden", "عدن"]),
        ("middleeast", ["syria", "iraq", "iran", "yemen", "lebanon", "gaza", "israel",
                        "سوريا", "عراق", "إيران", "يمن", "لبنان", "غزة", "فلسطين",
                        "jordan", "الأردن", "qatar", "قطر", "saudi", "سعودي", "bahrain",
                        "kuwait", "كويت", "uae", "إمارات", "oman", "عمان", "turkey", "تركيا"]),
        ("europe", ["ukraine", "أوكرانيا", "russia", "روسيا", "donbas", "دونباس",
                    "crimea", "القرم", "belgorod", "بيلغورود", "poland", "romania"]),
        ("africa", ["sudan", "سودان", "libya", "ليبيا", "ethiopia", "إثيوبيا",
                    "somalia", "صومال", "mali", "niger", "chad", "تشاد"]),
    )

    def mentions(keyword: str) -> bool:
        """Return True when ``keyword`` occurs in the lower-cased text."""
        if keyword.isascii():
            # Anchor at a word start only, so suffixed forms still match.
            return re.search(r"\b" + re.escape(keyword), text_lower) is not None
        return keyword in text_lower

    if text_lower:
        for region, keywords in keyword_groups:
            if any(mentions(kw) for kw in keywords):
                return region

    # Fall back to coordinate bounding boxes.
    if 10 <= lat <= 42 and 25 <= lng <= 63:
        # The Red Sea box is carved out of the wider Middle East box.
        if 12 <= lat <= 20 and 38 <= lng <= 50:
            return "redSea"
        return "middleeast"
    if 44 <= lat <= 60 and 22 <= lng <= 45:
        return "europe"
    if -5 <= lat < 30 and -20 <= lng <= 55:
        return "africa"
    if 5 <= lat <= 40 and 63 <= lng <= 100:
        return "asia"
    return "middleeast"
| # ===== مصنف نوع الحدث ===== | |
def classify_event_type(text: str) -> tuple:
    """Map free text to an ``(event_type, category_label, severity)`` triple.

    The keyword families are tried in a fixed order (missile, strike,
    defense, aircraft, naval, troops); the first family with a keyword that
    appears as a substring of the lower-cased text decides the result.  Text
    that matches nothing is reported as a generic medium-severity strike.
    """
    lowered = text.lower()

    missile_terms = ["missile", "rocket", "intercept", "ballistic", "cruise",
                     "صاروخ", "اعتراض", "باليستي", "كروز", "إطلاق صاروخ"]
    strike_terms = ["airstrike", "strike", "bombing", "shelling", "bombardment", "explosion",
                    "غارة", "قصف", "ضربة", "انفجار", "تدمير", "استهداف", "air raid"]
    defense_terms = ["defense", "air defense", "interception", "iron dome", "patriot",
                     "دفاع", "دفاع جوي", "اعتراض", "قبة حديدية"]
    aircraft_terms = ["aircraft", "plane", "jet", "fighter", "drone", "uav", "flight",
                      "طائرة", "مقاتلة", "تحليق", "طيران", "مسيّرة"]
    naval_terms = ["naval", "ship", "carrier", "destroyer", "navy", "maritime",
                   "بحري", "سفينة", "حاملة", "مدمرة", "بارجة", "بحر"]
    troop_terms = ["troops", "forces", "military movement", "convoy", "deployment",
                   "قوات", "تحركات", "رتل", "انتشار", "مدرعات", "مشاة", "تعزيز"]

    # Priority-ordered rule table: (keywords, verdict).
    rules = (
        (missile_terms, ("missile", "صاروخ", "high")),
        (strike_terms, ("strike", "غارة جوية", "critical")),
        (defense_terms, ("missile", "اعتراض / دفاع", "high")),
        (aircraft_terms, ("aircraft", "حركة طيران", "medium")),
        (naval_terms, ("naval", "نشاط بحري", "medium")),
        (troop_terms, ("troops", "تحرك قوات", "high")),
    )
    for terms, verdict in rules:
        if any(term in lowered for term in terms):
            return verdict
    return ("strike", "حدث عسكري", "medium")
| # ===== مصنف الخطورة ===== | |
def classify_severity(text: str, event_type: str) -> str:
    """Return "critical", "high" or "medium" for an event.

    Critical keywords are checked before high keywords (substring match on
    the lower-cased text).  With no keyword hit, strikes and missiles are
    rated "high" and everything else "medium".
    """
    haystack = text.lower()
    ladder = (
        ("critical", ["breaking", "urgent", "عاجل", "critical", "mass", "multiple",
                      "killed", "casualties", "قتلى", "شهداء", "مجزرة", "كبير"]),
        ("high", ["alert", "warning", "تنبيه", "تحذير", "confirmed", "مؤكد"]),
    )
    for level, words in ladder:
        if any(word in haystack for word in words):
            return level
    return "high" if event_type in ("strike", "missile") else "medium"
| # ===== معرّف فريد ===== | |
def make_event_id(source: str, text: str) -> str:
    """Derive a short, stable identifier from a source tag and event text.

    Only the first 80 characters of ``text`` take part; the result is the
    first 12 hex digits of the MD5 digest of ``"<source>:<text[:80]>"``.
    """
    fingerprint = "{}:{}".format(source, text[:80])
    digest = hashlib.md5(fingerprint.encode())
    return digest.hexdigest()[:12]
| # ===== رؤوس الطلبات ===== | |
# Default HTTP headers passed to requests.get by the scrapers in this module.
# The User-Agent is a desktop Chrome string; Accept-Language lists English
# first and Arabic second.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9,ar;q=0.8",
    "Accept-Encoding": "gzip, deflate, br",
    "Connection": "keep-alive",
}
| # ===== الكاشطات ===== | |
class BaseScraper(ABC):
    """Common base class for all source scrapers.

    Subclasses set ``name`` and ``url`` and implement ``fetch``.  Callers
    use ``safe_fetch``, which never raises and records the outcome in
    ``status`` ("offline" until the first call, then "online" or "error"),
    ``last_fetch`` and ``error_msg``.
    """

    name: str = "Unknown"
    url: str = ""
    status: str = "offline"
    last_fetch: float = 0
    error_msg: str = ""

    @abstractmethod
    def fetch(self) -> List[Event]:
        """Collect and return the items currently published by the source."""

    def safe_fetch(self) -> List[Event]:
        """Run ``fetch`` and translate any failure into an empty result.

        Returns:
            The list produced by ``fetch``, or ``[]`` if it raised.  On
            failure ``status`` becomes "error" and ``error_msg`` holds the
            exception text; on success ``last_fetch`` is set to the current
            time and ``error_msg`` is cleared.
        """
        try:
            logger.info(f"[{self.name}] جارٍ الجمع...")
            events = self.fetch()
            self.status = "online"
            self.last_fetch = time.time()
            self.error_msg = ""
            logger.info(f"[{self.name}] تم جمع {len(events)} حدث")
            return events
        except Exception as e:
            # Top-level boundary: one broken source must not stop the others.
            self.status = "error"
            self.error_msg = str(e)
            logger.error(f"[{self.name}] خطأ: {e}")
            return []
class BamqamScraper(BaseScraper):
    """Collect events from Bamqam.

    Three strategies run in order against ``url``: JSON arrays embedded in
    ``<script>`` tags, event-like HTML cards, and a few guessed JSON API
    paths.  Items without a usable position fall back to (33.0, 44.0)
    instead of aborting the whole fetch.
    """

    name = "Bamqam"
    url = "https://bamqam.com"

    @staticmethod
    def _to_float(value, default: float) -> float:
        """Return ``float(value)``, or ``default`` when it cannot be converted."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _pick(mapping: dict, keys, default):
        """Return the value of the first key present in ``mapping``."""
        for key in keys:
            if key in mapping:
                return mapping[key]
        return default

    def _new_event(self, id_source, title, lat, lng, location, details,
                   type_text=None, severity_text=None, region_text=None) -> Event:
        """Build an Event; the ``*_text`` arguments default to ``title``."""
        etype, category, _ = classify_event_type(title if type_text is None else type_text)
        return Event(
            id=make_event_id(id_source, title),
            title=title,
            event_type=etype,
            severity=classify_severity(title if severity_text is None else severity_text, etype),
            category=category,
            lat=lat,
            lng=lng,
            location=location,
            region=classify_region(lat, lng, title if region_text is None else region_text),
            details=details,
            source="Bamqam",
            source_url=self.url,
            timestamp=time.time(),
        )

    def _event_from_item(self, item: dict, id_source, title_keys, location_keys, details_keys):
        """Turn one JSON object into an Event, or None when it has no title."""
        title = self._pick(item, title_keys, "")
        if not title:
            return None
        title = str(title)
        lat = self._to_float(self._pick(item, ("lat", "latitude"), 33.0), 33.0)
        lng = self._to_float(self._pick(item, ("lng", "longitude"), 44.0), 44.0)
        return self._new_event(
            id_source, title, lat, lng,
            location=self._pick(item, location_keys, "—"),
            details=self._pick(item, details_keys, title),
        )

    def _collect_from_scripts(self, soup, events: List[Event]) -> None:
        """Append events found in JSON arrays embedded in ``<script>`` tags."""
        for script in soup.find_all("script"):
            text = script.string or ""
            for match in re.findall(r'(?:events|data|items)\s*[:=]\s*(\[.*?\])', text, re.DOTALL):
                try:
                    items = json.loads(match)
                except json.JSONDecodeError:
                    continue
                for item in items:
                    if not isinstance(item, dict):
                        continue
                    event = self._event_from_item(
                        item, "bamqam",
                        ("title", "name", "text"),
                        ("location", "place"),
                        ("description", "details"),
                    )
                    if event is not None:
                        events.append(event)

    def _collect_from_cards(self, soup, events: List[Event]) -> None:
        """Append events parsed from the first 20 event-like HTML elements."""
        cards = soup.select("article, .event, .card, .item, .post, [class*='event'], [class*='alert'], [class*='incident']")
        for card in cards[:20]:
            title_el = card.select_one("h1,h2,h3,h4,.title,[class*='title'],.heading")
            if not title_el:
                continue
            title = title_el.get_text(strip=True)
            if len(title) < 5:
                continue
            desc_el = card.select_one("p,.desc,.description,.text,[class*='desc'],[class*='text']")
            desc = desc_el.get_text(strip=True) if desc_el else title
            loc_el = card.select_one("[class*='loc'],[class*='place'],[class*='region']")
            loc = loc_el.get_text(strip=True) if loc_el else ""

            lat, lng = 33.0, 44.0
            data_lat = card.get("data-lat") or card.get("data-latitude")
            data_lng = card.get("data-lng") or card.get("data-longitude")
            if data_lat and data_lng:
                try:
                    lat, lng = float(data_lat), float(data_lng)
                except (TypeError, ValueError):
                    # Unparseable attributes: keep the default position.
                    lat, lng = 33.0, 44.0

            events.append(self._new_event(
                "bamqam", title, lat, lng,
                location=loc or "—",
                details=desc,
                type_text=title + " " + desc,
                severity_text=title + desc,
                region_text=title + " " + loc,
            ))

    def _collect_from_api(self, events: List[Event]) -> None:
        """Probe a few guessed API paths and append up to 30 events from each."""
        for api_path in ["/api/events", "/api/incidents", "/api/data", "/api/v1/events"]:
            try:
                api_resp = requests.get(f"{self.url}{api_path}", headers=HEADERS, timeout=10)
                if api_resp.status_code != 200:
                    continue
                data = api_resp.json()
                items = data if isinstance(data, list) else data.get("data", data.get("events", data.get("items", [])))
                for item in items[:30]:
                    if not isinstance(item, dict):
                        continue
                    event = self._event_from_item(
                        item, "bamqam_api",
                        ("title", "name"),
                        ("location",),
                        ("description",),
                    )
                    if event is not None:
                        events.append(event)
            except Exception as exc:
                # Best-effort probe: these endpoints are guesses and may not exist.
                logger.debug("[Bamqam] API probe %s failed: %s", api_path, exc)
                continue

    def fetch(self) -> List[Event]:
        """Return every event that could be extracted from the site.

        Failures are logged as warnings and whatever was collected before
        the failure is still returned.
        """
        events: List[Event] = []
        try:
            resp = requests.get(self.url, headers=HEADERS, timeout=15)
            resp.raise_for_status()
            soup = BeautifulSoup(resp.text, "html.parser")
            self._collect_from_scripts(soup, events)
            self._collect_from_cards(soup, events)
            self._collect_from_api(events)
        except Exception as e:
            logger.warning(f"[Bamqam] فشل: {e}")
        return events
class WorldMonitorScraper(BaseScraper):
    """Collect events from WorldMonitor.

    Looks for JSON embedded in ``<script>`` tags (framework state objects
    and named arrays), then falls back to event-like HTML elements.  Items
    with missing or unparseable coordinates are placed at (33, 44) rather
    than being dropped or aborting the fetch.
    """

    name = "WorldMonitor"
    url = "https://worldmonitor.app"

    # Tried in order against the text of every <script> tag.
    _SCRIPT_PATTERNS = (
        r'__NEXT_DATA__\s*=\s*({.*?})\s*;',
        r'__NUXT__\s*=\s*({.*?})\s*;',
        r'window\.__data\s*=\s*({.*?})\s*;',
        r'"events"\s*:\s*(\[.*?\])',
        r'"incidents"\s*:\s*(\[.*?\])',
        r'"markers"\s*:\s*(\[.*?\])',
    )

    @staticmethod
    def _to_float(value, default: float) -> float:
        """Return ``float(value)``, or ``default`` when it cannot be converted."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _pick(mapping: dict, keys, default):
        """Return the value of the first key present in ``mapping``."""
        for key in keys:
            if key in mapping:
                return mapping[key]
        return default

    @staticmethod
    def _extract_items(data) -> list:
        """Locate the list of items inside a decoded JSON payload.

        A list is returned as-is.  For a dict, the known top-level keys are
        scanned in order: a list value is returned immediately; a dict value
        is searched one or two levels deeper for the first list.
        """
        if isinstance(data, list):
            return data
        if not isinstance(data, dict):
            return []
        items = []
        for key in ("events", "incidents", "markers", "data", "props"):
            if key not in data:
                continue
            val = data[key]
            if isinstance(val, list):
                return val
            if isinstance(val, dict):
                for k2 in ("events", "data", "items", "pageProps"):
                    inner = val.get(k2)
                    if isinstance(inner, list):
                        items = inner
                        break
                    if isinstance(inner, dict):
                        for candidate in inner.values():
                            if isinstance(candidate, list):
                                items = candidate
                                break
                        # Only the first nested container is inspected.
                        break
        return items

    def _new_event(self, id_source, title, lat, lng, location, details) -> Event:
        """Build an Event classified from its title."""
        etype, category, _ = classify_event_type(title)
        return Event(
            id=make_event_id(id_source, title),
            title=title,
            event_type=etype,
            severity=classify_severity(title, etype),
            category=category,
            lat=lat,
            lng=lng,
            location=location,
            region=classify_region(lat, lng, title),
            details=details,
            source="WorldMonitor",
            source_url=self.url,
            timestamp=time.time(),
        )

    def _collect_from_scripts(self, soup, events: List[Event]) -> None:
        """Append up to 30 events per embedded JSON payload."""
        for script in soup.find_all("script"):
            text = script.string or ""
            for pattern in self._SCRIPT_PATTERNS:
                for match in re.findall(pattern, text, re.DOTALL):
                    try:
                        data = json.loads(match)
                    except (json.JSONDecodeError, ValueError):
                        continue
                    for item in self._extract_items(data)[:30]:
                        if not isinstance(item, dict):
                            continue
                        title = self._pick(item, ("title", "name", "description"), "")
                        if not title or len(str(title)) < 5:
                            continue
                        title = str(title)
                        lat = self._to_float(self._pick(item, ("lat", "latitude", "y"), 33), 33.0)
                        lng = self._to_float(self._pick(item, ("lng", "longitude", "lon", "x"), 44), 44.0)
                        events.append(self._new_event(
                            "worldmonitor", title, lat, lng,
                            location=self._pick(item, ("location", "place", "region"), "—"),
                            details=self._pick(item, ("details", "description"), title),
                        ))

    def _collect_from_cards(self, soup, events: List[Event]) -> None:
        """Append events parsed from the first 20 event-like HTML elements."""
        cards = soup.select("[class*='event'],[class*='incident'],[class*='marker'],[class*='card'],[class*='alert'],article,.item")
        for card in cards[:20]:
            title_el = card.select_one("h1,h2,h3,h4,h5,.title,[class*='title']")
            if not title_el:
                continue
            title = title_el.get_text(strip=True)
            if len(title) < 5:
                continue
            lat = self._to_float(card.get("data-lat", 33), 33.0)
            lng = self._to_float(card.get("data-lng", 44), 44.0)
            events.append(self._new_event(
                "worldmonitor_html", title, lat, lng,
                location="—",
                details=title,
            ))

    def fetch(self) -> List[Event]:
        """Return every event that could be extracted from the site.

        Failures are logged as warnings and whatever was collected before
        the failure is still returned.
        """
        events: List[Event] = []
        try:
            resp = requests.get(self.url, headers=HEADERS, timeout=15)
            resp.raise_for_status()
            soup = BeautifulSoup(resp.text, "html.parser")
            self._collect_from_scripts(soup, events)
            self._collect_from_cards(soup, events)
        except Exception as e:
            logger.warning(f"[WorldMonitor] فشل: {e}")
        return events
class ConflictlyScraper(BaseScraper):
    """Collect events from Conflictly.

    Reads JSON embedded in ``<script>`` tags (including GeoJSON-style
    feature lists) and then probes a few guessed JSON API paths.  Features
    with a null or non-point geometry fall back to ``lat``/``lng``
    properties, and finally to (33, 44), instead of aborting the fetch.
    """

    name = "Conflictly"
    url = "https://www.conflictly.app"

    # Tried in order against the text of every <script> tag.
    _SCRIPT_PATTERNS = (
        r'"features"\s*:\s*(\[.*?\])',
        r'"events"\s*:\s*(\[.*?\])',
        r'(?:markers|points|incidents)\s*[:=]\s*(\[.*?\])',
        r'__NEXT_DATA__\s*=\s*({.*?})\s*;',
    )

    @staticmethod
    def _to_float(value, default: float) -> float:
        """Return ``float(value)``, or ``default`` when it cannot be converted."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _pick(mapping: dict, keys, default):
        """Return the value of the first key present in ``mapping``."""
        for key in keys:
            if key in mapping:
                return mapping[key]
        return default

    def _position(self, props: dict, geom: dict):
        """Return ``(lat, lng)`` from the geometry, else from the properties."""
        coords = geom.get("coordinates", [])
        if isinstance(coords, (list, tuple)) and len(coords) >= 2:
            try:
                # GeoJSON order is [longitude, latitude].
                lng, lat = float(coords[0]), float(coords[1])
                return lat, lng
            except (TypeError, ValueError):
                # Nested (non-point) or malformed coordinates: use properties.
                pass
        lat = self._to_float(self._pick(props, ("lat", "latitude"), 33), 33.0)
        lng = self._to_float(self._pick(props, ("lng", "longitude"), 44), 44.0)
        return lat, lng

    def _new_event(self, id_source, title, lat, lng, location, details) -> Event:
        """Build an Event classified from its title."""
        etype, category, _ = classify_event_type(title)
        return Event(
            id=make_event_id(id_source, title),
            title=title,
            event_type=etype,
            severity=classify_severity(title, etype),
            category=category,
            lat=lat,
            lng=lng,
            location=location,
            region=classify_region(lat, lng, title),
            details=details,
            source="Conflictly",
            source_url=self.url,
            timestamp=time.time(),
        )

    def _collect_from_scripts(self, soup, events: List[Event]) -> None:
        """Append up to 30 events per embedded JSON payload."""
        for script in soup.find_all("script"):
            text = script.string or ""
            for pattern in self._SCRIPT_PATTERNS:
                for match in re.findall(pattern, text, re.DOTALL):
                    try:
                        data = json.loads(match)
                    except (json.JSONDecodeError, ValueError):
                        continue
                    items = data if isinstance(data, list) else []
                    if isinstance(data, dict):
                        items = data.get("features", data.get("events", []))
                    if not isinstance(items, list):
                        items = []
                    for item in items[:30]:
                        if not isinstance(item, dict):
                            continue
                        # GeoJSON features keep attributes under "properties";
                        # plain objects carry them at the top level.
                        props = item.get("properties", item)
                        if not isinstance(props, dict):
                            props = item
                        geom = item.get("geometry", {})
                        if not isinstance(geom, dict):
                            geom = {}
                        title = self._pick(props, ("title", "name", "description"), "")
                        if not title:
                            continue
                        title = str(title)
                        lat, lng = self._position(props, geom)
                        events.append(self._new_event(
                            "conflictly", title, lat, lng,
                            location=self._pick(props, ("location", "place"), "—"),
                            details=props.get("details", title),
                        ))

    def _collect_from_api(self, events: List[Event]) -> None:
        """Probe a few guessed API paths and append up to 30 events from each."""
        for path in ["/api/events", "/api/incidents", "/api/conflicts", "/api/v1/data"]:
            try:
                api_resp = requests.get(f"{self.url}{path}", headers={**HEADERS, "Accept": "application/json"}, timeout=10)
                if api_resp.status_code != 200 or "json" not in api_resp.headers.get("content-type", ""):
                    continue
                data = api_resp.json()
                items = data if isinstance(data, list) else data.get("data", data.get("events", []))
                for item in items[:30]:
                    if not isinstance(item, dict):
                        continue
                    title = self._pick(item, ("title", "name"), "")
                    if not title:
                        continue
                    title = str(title)
                    lat = self._to_float(self._pick(item, ("lat", "latitude"), 33), 33.0)
                    lng = self._to_float(self._pick(item, ("lng", "longitude"), 44), 44.0)
                    events.append(self._new_event(
                        "conflictly_api", title, lat, lng,
                        location=item.get("location", "—"),
                        details=item.get("description", title),
                    ))
            except Exception as exc:
                # Best-effort probe: these endpoints are guesses and may not exist.
                logger.debug("[Conflictly] API probe %s failed: %s", path, exc)
                continue

    def fetch(self) -> List[Event]:
        """Return every event that could be extracted from the site.

        Failures are logged as warnings and whatever was collected before
        the failure is still returned.
        """
        events: List[Event] = []
        try:
            resp = requests.get(self.url, headers=HEADERS, timeout=15)
            resp.raise_for_status()
            soup = BeautifulSoup(resp.text, "html.parser")
            self._collect_from_scripts(soup, events)
            self._collect_from_api(events)
        except Exception as e:
            logger.warning(f"[Conflictly] فشل: {e}")
        return events
class PizzintScraper(BaseScraper):
    """Collect events from PIZZINT.

    Reads JSON embedded in ``<script>`` tags and then falls back to
    event-like HTML elements, which are all placed at (33, 44).  Items with
    unparseable coordinates use the same default position instead of
    cutting the rest of the payload short.
    """

    name = "PIZZINT"
    url = "https://www.pizzint.watch"

    # Tried in order against the text of every <script> tag.
    # NOTE(review): the last pattern expects a closing </script> tag, which
    # presumably never appears inside script.string, so it likely never
    # matches. Left unchanged; confirm against the live page.
    _SCRIPT_PATTERNS = (
        r'"events"\s*:\s*(\[.*?\])',
        r'"incidents"\s*:\s*(\[.*?\])',
        r'"alerts"\s*:\s*(\[.*?\])',
        r'__NEXT_DATA__\s*=\s*({.*?})\s*</script>',
    )

    @staticmethod
    def _to_float(value, default: float) -> float:
        """Return ``float(value)``, or ``default`` when it cannot be converted."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _extract_items(data) -> list:
        """Locate the list of items inside a decoded JSON payload.

        A list is returned as-is.  For a dict, the known keys are scanned in
        order: a list value is returned immediately; for a dict value the
        first list found among its values is remembered.
        """
        if isinstance(data, list):
            return data
        if not isinstance(data, dict):
            return []
        items = []
        for k in ("props", "pageProps", "events", "data"):
            if k not in data:
                continue
            v = data[k]
            if isinstance(v, list):
                return v
            if isinstance(v, dict):
                for candidate in v.values():
                    if isinstance(candidate, list):
                        items = candidate
                        break
        return items

    def _new_event(self, id_source, title, lat, lng, location, details) -> Event:
        """Build an Event classified from its title."""
        etype, category, _ = classify_event_type(title)
        return Event(
            id=make_event_id(id_source, title),
            title=title,
            event_type=etype,
            severity=classify_severity(title, etype),
            category=category,
            lat=lat,
            lng=lng,
            location=location,
            region=classify_region(lat, lng, title),
            details=details,
            source="PIZZINT",
            source_url=self.url,
            timestamp=time.time(),
        )

    def _collect_from_scripts(self, soup, events: List[Event]) -> None:
        """Append up to 30 events per embedded JSON payload."""
        for script in soup.find_all("script"):
            text = script.string or ""
            for pattern in self._SCRIPT_PATTERNS:
                for match in re.findall(pattern, text, re.DOTALL):
                    try:
                        data = json.loads(match)
                    except (json.JSONDecodeError, ValueError):
                        continue
                    for item in self._extract_items(data)[:30]:
                        if not isinstance(item, dict):
                            continue
                        title = item.get("title", item.get("name", ""))
                        if not title:
                            continue
                        title = str(title)
                        lat = self._to_float(item.get("lat", item.get("latitude", 33)), 33.0)
                        lng = self._to_float(item.get("lng", item.get("longitude", 44)), 44.0)
                        events.append(self._new_event(
                            "pizzint", title, lat, lng,
                            location=item.get("location", "—"),
                            details=item.get("description", title),
                        ))

    def _collect_from_cards(self, soup, events: List[Event]) -> None:
        """Append events parsed from the first 15 event-like HTML elements."""
        cards = soup.select("[class*='event'],[class*='alert'],[class*='incident'],[class*='card'],article,.post")
        for card in cards[:15]:
            title_el = card.select_one("h1,h2,h3,h4,.title,[class*='title']")
            if not title_el:
                continue
            title = title_el.get_text(strip=True)
            if len(title) < 5:
                continue
            # The HTML fallback has no position data; use the default point.
            events.append(self._new_event(
                "pizzint_html", title, 33.0, 44.0,
                location="—",
                details=title,
            ))

    def fetch(self) -> List[Event]:
        """Return every event that could be extracted from the site.

        Failures are logged as warnings and whatever was collected before
        the failure is still returned.
        """
        events: List[Event] = []
        try:
            resp = requests.get(self.url, headers=HEADERS, timeout=15)
            resp.raise_for_status()
            soup = BeautifulSoup(resp.text, "html.parser")
            self._collect_from_scripts(soup, events)
            self._collect_from_cards(soup, events)
        except Exception as e:
            logger.warning(f"[PIZZINT] فشل: {e}")
        return events
class ADSBExchangeScraper(BaseScraper):
    """Collect likely-military flights from ADS-B Exchange.

    For each region in ``MILITARY_BOUNDS`` the aircraft list around the
    region centre is downloaded and filtered by callsign prefix, aircraft
    type, squawk code and a "no squawk above 25,000 ft" rule.  Each aircraft
    record is parsed on its own, so one malformed record is skipped instead
    of discarding the whole region.
    """

    name = "ADS-B Exchange"
    url = "https://globe.adsbexchange.com"

    # Regions of interest (Arabic display name plus lat/lng bounds).
    MILITARY_BOUNDS = [
        {"name": "شرق المتوسط", "lat1": 32, "lat2": 37, "lng1": 30, "lng2": 37},
        {"name": "الخليج العربي", "lat1": 23, "lat2": 32, "lng1": 45, "lng2": 57},
        {"name": "البحر الأحمر", "lat1": 12, "lat2": 22, "lng1": 38, "lng2": 46},
        {"name": "البحر الأسود", "lat1": 41, "lat2": 47, "lng1": 27, "lng2": 42},
    ]

    # Type-code fragment -> (Arabic display name, flight_type).
    MILITARY_TYPES = {
        "C17": ("شحن عسكري", "cargo"), "C130": ("شحن عسكري", "cargo"),
        "C5": ("شحن عسكري", "cargo"), "KC135": ("تزويد بالوقود", "military"),
        "KC10": ("تزويد بالوقود", "military"), "KC46": ("تزويد بالوقود", "military"),
        "E3": ("إنذار مبكر", "recon"), "E8": ("استطلاع", "recon"),
        "RC135": ("استطلاع إلكتروني", "recon"), "EP3": ("استطلاع", "recon"),
        "P8": ("مراقبة بحرية", "recon"), "P3": ("مراقبة بحرية", "recon"),
        "RQ4": ("استطلاع بدون طيار", "drone"), "MQ9": ("مسيّرة هجومية", "drone"),
        "MQ1": ("مسيّرة", "drone"), "FORTE": ("استطلاع", "recon"),
        "F15": ("مقاتلة", "military"), "F16": ("مقاتلة", "military"),
        "F18": ("مقاتلة", "military"), "F22": ("مقاتلة", "military"),
        "F35": ("مقاتلة", "military"), "B52": ("قاذفة", "military"),
        "B1": ("قاذفة", "military"), "B2": ("قاذفة", "military"),
        "A10": ("هجوم أرضي", "military"), "AH64": ("هليكوبتر هجومية", "military"),
        "V22": ("أوسبري", "military"), "GLEX": ("استطلاع", "recon"),
    }

    MILITARY_CALLSIGN_PREFIXES = [
        "FORTE", "JAKE", "VIPER", "REAP", "RCH", "HAWK", "DUKE",
        "NAVY", "ARMY", "EVAC", "REACH", "SKULL", "DOOM", "FURY",
        "RAGE", "IRON", "BOLT", "STEEL", "COBRA", "TIGER", "EAGLE",
        "SPARTN", "TOPCAT", "WRATH", "ROGUE", "SABER", "LANCE",
        "DEMON", "GHOST", "STING", "TORCH", "ATLAS", "GIANT",
    ]

    @staticmethod
    def _altitude_feet(alt) -> int:
        """Return the altitude as an int; "ground" and unparseable values give 0."""
        if isinstance(alt, str):
            alt = alt.replace("ground", "0")
        try:
            return int(float(alt))
        except (TypeError, ValueError):
            return 0

    def _download_region(self, lat_c: float, lng_c: float):
        """Request the aircraft list around a point, trying fallbacks on non-200.

        Returns the last response obtained; the caller checks its status.
        """
        resp = requests.get(
            "https://globe.adsbexchange.com/data/aircraft.json",
            params={"lat": lat_c, "lon": lng_c, "range": 500},
            headers={**HEADERS, "Referer": "https://globe.adsbexchange.com/"},
            timeout=15,
        )
        if resp.status_code != 200:
            for alt_url in [
                f"https://globe.adsbexchange.com/data/globe_{int(lat_c)}_{int(lng_c)}.json",
                f"https://adsbexchange.com/api/aircraft/json/lat/{lat_c}/lon/{lng_c}/dist/500/",
            ]:
                try:
                    resp = requests.get(alt_url, headers=HEADERS, timeout=10)
                    if resp.status_code == 200:
                        break
                except requests.RequestException:
                    continue
        return resp

    def _flight_from_record(self, ac: dict, region_name: str):
        """Return a Flight for a record that looks military, else None."""
        callsign = str(ac.get("flight", ac.get("call", ac.get("Icao", ""))) or "").strip()
        ac_type = ac.get("t", ac.get("type", ac.get("Mdl", "")))
        alt = ac.get("alt_baro", ac.get("Alt", ac.get("altitude", 0)))
        speed = ac.get("gs", ac.get("Spd", ac.get("speed", 0)))
        lat = ac.get("lat", ac.get("Lat", 0))
        lng = ac.get("lon", ac.get("Long", ac.get("lng", 0)))
        heading = ac.get("track", ac.get("Trak", ac.get("heading", 0)))
        hex_code = ac.get("hex", ac.get("Icao", ""))
        raw_squawk = ac.get("squawk", ac.get("Sqk", ""))
        squawk = "" if raw_squawk is None else str(raw_squawk)

        if not callsign and not ac_type:
            return None
        if not lat or not lng:
            return None

        callsign_upper = callsign.upper()
        type_upper = str(ac_type or "").upper()

        # Rule 1: known military callsign prefix.
        is_military = callsign_upper.startswith(tuple(self.MILITARY_CALLSIGN_PREFIXES))
        flight_type = "military"
        aircraft_name = ac_type

        # Rule 2: known type code inside the type field or the callsign.
        for mil_type, (type_name, ft) in self.MILITARY_TYPES.items():
            if mil_type in type_upper or mil_type in callsign_upper:
                is_military = True
                aircraft_name = type_name
                flight_type = ft
                break

        # Rule 3: squawk 7700/7600/7500 or any code starting with "0".
        if squawk in ("7700", "7600", "7500") or squawk.startswith("0"):
            is_military = True

        # Rule 4: no squawk at all while above 25,000 ft.
        if not squawk and alt and self._altitude_feet(alt) > 25000:
            is_military = True

        if not is_military:
            return None

        return Flight(
            callsign=callsign or hex_code,
            aircraft_type=aircraft_name or ac_type or "غير معروف",
            altitude=f"{alt} ft" if alt else "—",
            speed=f"{speed} kts" if speed else "—",
            lat=float(lat),
            lng=float(lng),
            heading=f"{heading}°" if heading else "—",
            origin=region_name,
            flight_type=flight_type,
            source="ADS-B Exchange",
            hex_code=hex_code,
            squawk=squawk,
            timestamp=time.time(),
        )

    def fetch(self) -> List[Flight]:
        """Return the flights that pass the military filter in every region.

        A failure in one region is logged as a warning and the remaining
        regions are still processed.
        """
        flights: List[Flight] = []
        for bound in self.MILITARY_BOUNDS:
            try:
                lat_c = (bound["lat1"] + bound["lat2"]) / 2
                lng_c = (bound["lng1"] + bound["lng2"]) / 2
                resp = self._download_region(lat_c, lng_c)
                if resp.status_code != 200:
                    continue
                data = resp.json()
                ac_list = data.get("ac", data.get("aircraft", data.get("acList", []))) or []
                for ac in ac_list:
                    if not isinstance(ac, dict):
                        continue
                    try:
                        flight = self._flight_from_record(ac, bound["name"])
                    except (TypeError, ValueError, AttributeError):
                        # Malformed record: skip it, keep the rest of the region.
                        continue
                    if flight is not None:
                        flights.append(flight)
            except Exception as e:
                logger.warning(f"[ADS-B] منطقة {bound['name']}: {e}")
        return flights
class FlightRadarScraper(BaseScraper):
    """Collect likely-military flights from the Flightradar24 zone feed.

    Each zone in ``ZONES`` is requested from the feed endpoint and every
    aircraft row is filtered with the type and callsign tables defined on
    ``ADSBExchangeScraper`` plus a squawk rule.  Rows are parsed one at a
    time, so a malformed row is skipped without losing the zone.
    """

    name = "Flightradar24"
    url = "https://www.flightradar24.com"

    # Monitored zones (Arabic display name plus the feed's "bounds" string).
    # NOTE(review): the ordering of the four numbers is not consistent
    # between entries, and may not match what the feed expects — confirm
    # against the feed before relying on these boxes.
    ZONES = [
        {"name": "الشرق الأوسط", "bounds": "37,25,55,32"},
        {"name": "شرق المتوسط", "bounds": "33,30,37,36"},
        {"name": "البحر الأحمر", "bounds": "12,38,22,46"},
    ]

    def _flight_from_row(self, val: list, zone_name: str):
        """Return a Flight for a feed row that looks military, else None.

        The caller guarantees ``len(val) >= 10``.  Indices used:
        0 hex, 1 lat, 2 lng, 3 heading, 4 altitude, 5 speed, 6 squawk,
        8 aircraft type, 16 (or 13) callsign.
        """
        hex_code = val[0]
        lat = float(val[1])
        lng = float(val[2])
        heading = val[3]
        alt = val[4]
        speed = val[5]
        squawk = "" if val[6] is None else str(val[6])
        ac_type = val[8]
        if len(val) > 16:
            raw_callsign = val[16]
        elif len(val) > 13:
            raw_callsign = val[13]
        else:
            raw_callsign = ""
        callsign = "" if raw_callsign is None else str(raw_callsign)

        if not lat or not lng:
            return None

        callsign_upper = callsign.upper()
        type_upper = str(ac_type or "").upper()

        is_military = False
        flight_type = "military"
        aircraft_name = ac_type

        # Known type code inside the type field or the callsign.
        for mil_type, (tname, ft) in ADSBExchangeScraper.MILITARY_TYPES.items():
            if mil_type in type_upper or mil_type in callsign_upper:
                is_military = True
                aircraft_name = tname
                flight_type = ft
                break

        # Known military callsign prefix.
        if callsign_upper.startswith(tuple(ADSBExchangeScraper.MILITARY_CALLSIGN_PREFIXES)):
            is_military = True

        # Squawk starting with "0" (includes "0000") or 7700/7600/7500.
        if squawk.startswith("0") or squawk in ("7700", "7600", "7500"):
            is_military = True

        if not is_military:
            return None

        return Flight(
            callsign=callsign.strip() or hex_code,
            aircraft_type=aircraft_name or ac_type or "غير معروف",
            altitude=f"{alt} ft",
            speed=f"{speed} kts",
            lat=lat,
            lng=lng,
            heading=f"{heading}°",
            origin=zone_name,
            flight_type=flight_type,
            source="Flightradar24",
            hex_code=str(hex_code),
            squawk=squawk,
            timestamp=time.time(),
        )

    def fetch(self) -> List[Flight]:
        """Return the flights that pass the military filter in every zone.

        A failure in one zone is logged as a warning and the remaining
        zones are still processed.
        """
        flights: List[Flight] = []
        for zone in self.ZONES:
            try:
                params = {
                    "bounds": zone["bounds"],
                    "faa": "1",
                    "satellite": "1",
                    "mlat": "1",
                    "flarm": "0",
                    "adsb": "1",
                    "gnd": "0",
                    "air": "1",
                    "vehicles": "0",
                    "estimated": "0",
                    "gliders": "0",
                    "stats": "0",
                }
                resp = requests.get(
                    "https://data-cloud.flightradar24.com/zones/fcgi/feed.js",
                    params=params,
                    headers={
                        **HEADERS,
                        "Referer": "https://www.flightradar24.com/",
                        "Origin": "https://www.flightradar24.com",
                    },
                    timeout=15,
                )
                if resp.status_code != 200:
                    continue
                data = resp.json()
                for key, val in data.items():
                    # Skip the feed's metadata entries; aircraft rows are lists.
                    if key in ("full_count", "version", "stats"):
                        continue
                    if not isinstance(val, list) or len(val) < 10:
                        continue
                    try:
                        flight = self._flight_from_row(val, zone["name"])
                    except (TypeError, ValueError, IndexError, AttributeError):
                        # Malformed row: skip it, keep the rest of the zone.
                        continue
                    if flight is not None:
                        flights.append(flight)
            except Exception as e:
                logger.warning(f"[FR24] منطقة {zone['name']}: {e}")
        return flights
| # ===== مدير البيانات المركزي ===== | |
class DataManager:
    """Central store that collects, de-duplicates and filters all data.

    ``fetch_all`` runs every scraper and merges the results; the ``get_*``
    methods return filtered snapshots.  ``events``, ``flights`` and
    ``source_status`` are read and replaced under ``lock``.
    """

    def __init__(self):
        self.events: List[Event] = []
        self.flights: List[Flight] = []
        self.source_status: Dict[str, Dict] = {}
        self.last_update: float = 0
        self.is_fetching: bool = False
        self.lock = threading.Lock()
        self.event_scrapers = [
            BamqamScraper(),
            WorldMonitorScraper(),
            ConflictlyScraper(),
            PizzintScraper(),
        ]
        self.flight_scrapers = [
            ADSBExchangeScraper(),
            FlightRadarScraper(),
        ]

    @staticmethod
    def _status_of(scraper, count: int) -> Dict:
        """Summarise a scraper's last run for ``source_status``."""
        return {
            "status": scraper.status,
            "count": count,
            "last_fetch": scraper.last_fetch,
            "error": scraper.error_msg,
            "url": scraper.url,
        }

    def fetch_all(self):
        """Fetch from every source and merge the results into the store.

        Returns immediately if another fetch is already running.  The
        ``is_fetching`` flag is claimed under the lock and always released,
        even if a scraper or the merge step raises.
        """
        with self.lock:
            if self.is_fetching:
                return
            self.is_fetching = True

        try:
            new_events = []
            new_flights = []
            statuses = {}

            # Event sources.
            for scraper in self.event_scrapers:
                items = scraper.safe_fetch()
                new_events.extend(items)
                statuses[scraper.name] = self._status_of(scraper, len(items))

            # Flight sources.
            for scraper in self.flight_scrapers:
                items = scraper.safe_fetch()
                if isinstance(items, list) and len(items) > 0:
                    if isinstance(items[0], Flight):
                        new_flights.extend(items)
                    else:
                        # A flight scraper may hand back events instead.
                        new_events.extend([e for e in items if isinstance(e, Event)])
                statuses[scraper.name] = self._status_of(scraper, len(items))

            with self.lock:
                self.source_status.update(statuses)

                # Drop expired events, then add only events not seen before.
                existing = {e.id: e for e in self.events if not e.is_expired}
                for ev in new_events:
                    if ev.id not in existing:
                        existing[ev.id] = ev
                self.events = list(existing.values())

                # Flights are replaced wholesale, keyed by callsign or hex
                # code; the previous list is kept when nothing new arrived.
                seen_flights = {}
                for fl in new_flights:
                    key = fl.callsign or fl.hex_code
                    if key and key not in seen_flights:
                        seen_flights[key] = fl
                if seen_flights:
                    self.flights = list(seen_flights.values())
                self.last_update = time.time()
        finally:
            self.is_fetching = False

        logger.info(f"تم التحديث: {len(self.events)} حدث, {len(self.flights)} طائرة")

    def get_events(self, region="all", severity=None, event_type=None, search=""):
        """Return non-expired events, newest first, after applying filters.

        Args:
            region: Region key or its Arabic display label; "all" disables
                the filter.
            severity: Optional iterable of severity keys or display labels.
            event_type: Optional iterable of type keys or display labels.
            search: Optional case-sensitive substring looked up in the
                title, location and details.
        """
        with self.lock:
            filtered = [e for e in self.events if not e.is_expired]

        if region and region != "all" and region != "جميع المناطق":
            # Accept either the internal key or the display label.
            region_key = {v: k for k, v in REGION_MAP.items()}.get(region, region)
            filtered = [e for e in filtered if e.region == region_key]
        if severity:
            sev_map = {"🔴 حرج": "critical", "🟠 مرتفع": "high", "🟡 متوسط": "medium", "🔵 منخفض": "low"}
            active = [sev_map.get(s, s) for s in severity]
            filtered = [e for e in filtered if e.severity in active]
        if event_type:
            type_map = {"💥 غارات": "strike", "🚀 صواريخ": "missile", "✈️ طيران": "aircraft", "🎖️ قوات": "troops", "🚢 بحري": "naval"}
            active = [type_map.get(t, t) for t in event_type]
            filtered = [e for e in filtered if e.event_type in active]
        if search:
            # Scraped fields are not guaranteed to be strings; coerce before
            # the substring test so one odd value cannot break the search.
            filtered = [
                e for e in filtered
                if search in str(e.title) or search in str(e.location) or search in str(e.details)
            ]
        filtered.sort(key=lambda x: x.timestamp, reverse=True)
        return filtered

    def get_flights(self):
        """Return a copy of the current flight list."""
        with self.lock:
            return list(self.flights)

    def get_source_status(self):
        """Return a shallow copy of the per-source status table."""
        with self.lock:
            return dict(self.source_status)

    def get_stats(self):
        """Return counters over the non-expired events plus the flight count."""
        with self.lock:
            events = [e for e in self.events if not e.is_expired]
            flight_count = len(self.flights)
        return {
            "total": len(events),
            "strikes": len([e for e in events if e.event_type == "strike"]),
            "missiles": len([e for e in events if e.event_type == "missile"]),
            "aircraft": len([e for e in events if e.event_type == "aircraft"]),
            "troops": len([e for e in events if e.event_type == "troops"]),
            "naval": len([e for e in events if e.event_type == "naval"]),
            "critical": len([e for e in events if e.severity == "critical"]),
            "flights": flight_count,
        }
# Internal region key -> Arabic display label. DataManager.get_events inverts
# this mapping so that a display label can be passed in place of a key.
REGION_MAP = {
    "all": "جميع المناطق",
    "middleeast": "الشرق الأوسط",
    "europe": "أوروبا الشرقية",
    "africa": "شمال أفريقيا",
    "redSea": "البحر الأحمر",
    "asia": "جنوب آسيا",
}