""" وحدة جمع البيانات الحقيقية من المصادر الست Real-time data scraping from 6 conflict monitoring sources """ import requests from bs4 import BeautifulSoup import json import re import time import hashlib import logging from datetime import datetime, timedelta from dataclasses import dataclass, field, asdict from typing import List, Optional, Dict from abc import ABC, abstractmethod import threading import traceback logging.basicConfig(level=logging.INFO) logger = logging.getLogger("scraper") # ===== نموذج البيانات ===== @dataclass class Event: id: str title: str event_type: str # strike, missile, aircraft, troops, naval, drone, defense severity: str # critical, high, medium, low category: str # غارة جوية, صاروخ, etc lat: float lng: float location: str region: str # middleeast, europe, africa, redSea, asia details: str source: str source_url: str = "" timestamp: float = 0 # unix timestamp raw_time: str = "" extra: Dict = field(default_factory=dict) def to_dict(self): return asdict(self) @property def minutes_ago(self): if self.timestamp > 0: return max(1, int((time.time() - self.timestamp) / 60)) return 30 @property def is_expired(self): """الحدث ينتهي بعد 24 ساعة""" return self.minutes_ago > 1440 @dataclass class Flight: callsign: str aircraft_type: str altitude: str speed: str lat: float lng: float heading: str origin: str flight_type: str # military, recon, cargo, drone source: str hex_code: str = "" squawk: str = "" timestamp: float = 0 def to_dict(self): return asdict(self) # ===== مصنف المنطقة الجغرافية ===== def classify_region(lat: float, lng: float, text: str = "") -> str: text_lower = text.lower() if text else "" # بالنص me_keywords = ["syria", "iraq", "iran", "yemen", "lebanon", "gaza", "israel", "سوريا", "عراق", "إيران", "يمن", "لبنان", "غزة", "فلسطين", "jordan", "الأردن", "qatar", "قطر", "saudi", "سعودي", "bahrain", "kuwait", "كويت", "uae", "إمارات", "oman", "عمان", "turkey", "تركيا"] eu_keywords = ["ukraine", "أوكرانيا", "russia", "روسيا", "donbas", "دونباس", "crimea", "القرم", "belgorod", "بيلغورود", "poland", "romania"] af_keywords = ["sudan", "سودان", "libya", "ليبيا", "ethiopia", "إثيوبيا", "somalia", "صومال", "mali", "niger", "chad", "تشاد"] rs_keywords = ["red sea", "البحر الأحمر", "bab al-mandab", "باب المندب", "houthi", "حوثي", "aden", "عدن"] for kw in rs_keywords: if kw in text_lower: return "redSea" for kw in me_keywords: if kw in text_lower: return "middleeast" for kw in eu_keywords: if kw in text_lower: return "europe" for kw in af_keywords: if kw in text_lower: return "africa" # بالإحداثيات if 10 <= lat <= 42 and 25 <= lng <= 63: if 12 <= lat <= 20 and 38 <= lng <= 50: return "redSea" return "middleeast" elif 44 <= lat <= 60 and 22 <= lng <= 45: return "europe" elif -5 <= lat <= 35 and -20 <= lng <= 55 and lat < 30: return "africa" elif 5 <= lat <= 40 and 63 <= lng <= 100: return "asia" return "middleeast" # ===== مصنف نوع الحدث ===== def classify_event_type(text: str) -> tuple: text_lower = text.lower() strike_kw = ["airstrike", "strike", "bombing", "shelling", "bombardment", "explosion", "غارة", "قصف", "ضربة", "انفجار", "تدمير", "استهداف", "air raid"] missile_kw = ["missile", "rocket", "intercept", "ballistic", "cruise", "صاروخ", "اعتراض", "باليستي", "كروز", "إطلاق صاروخ"] aircraft_kw = ["aircraft", "plane", "jet", "fighter", "drone", "uav", "flight", "طائرة", "مقاتلة", "تحليق", "طيران", "مسيّرة"] troops_kw = ["troops", "forces", "military movement", "convoy", "deployment", "قوات", "تحركات", "رتل", "انتشار", "مدرعات", "مشاة", "تعزيز"] naval_kw = ["naval", "ship", "carrier", "destroyer", "navy", "maritime", "بحري", "سفينة", "حاملة", "مدمرة", "بارجة", "بحر"] defense_kw = ["defense", "air defense", "interception", "iron dome", "patriot", "دفاع", "دفاع جوي", "اعتراض", "قبة حديدية"] for kw in missile_kw: if kw in text_lower: return ("missile", "صاروخ", "high") for kw in strike_kw: if kw in text_lower: return ("strike", "غارة جوية", "critical") for kw in defense_kw: if kw in text_lower: return ("missile", "اعتراض / دفاع", "high") for kw in aircraft_kw: if kw in text_lower: return ("aircraft", "حركة طيران", "medium") for kw in naval_kw: if kw in text_lower: return ("naval", "نشاط بحري", "medium") for kw in troops_kw: if kw in text_lower: return ("troops", "تحرك قوات", "high") return ("strike", "حدث عسكري", "medium") # ===== مصنف الخطورة ===== def classify_severity(text: str, event_type: str) -> str: text_lower = text.lower() critical_kw = ["breaking", "urgent", "عاجل", "critical", "mass", "multiple", "killed", "casualties", "قتلى", "شهداء", "مجزرة", "كبير"] high_kw = ["alert", "warning", "تنبيه", "تحذير", "confirmed", "مؤكد"] for kw in critical_kw: if kw in text_lower: return "critical" for kw in high_kw: if kw in text_lower: return "high" if event_type in ("strike", "missile"): return "high" return "medium" # ===== معرّف فريد ===== def make_event_id(source: str, text: str) -> str: raw = f"{source}:{text[:80]}" return hashlib.md5(raw.encode()).hexdigest()[:12] # ===== رؤوس الطلبات ===== HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.9,ar;q=0.8", "Accept-Encoding": "gzip, deflate, br", "Connection": "keep-alive", } # ===== الكاشطات ===== class BaseScraper(ABC): """كاشط أساسي""" name: str = "Unknown" url: str = "" status: str = "offline" last_fetch: float = 0 error_msg: str = "" @abstractmethod def fetch(self) -> List[Event]: pass def safe_fetch(self) -> List[Event]: try: logger.info(f"[{self.name}] جارٍ الجمع...") events = self.fetch() self.status = "online" self.last_fetch = time.time() self.error_msg = "" logger.info(f"[{self.name}] تم جمع {len(events)} حدث") return events except Exception as e: self.status = "error" self.error_msg = str(e) logger.error(f"[{self.name}] خطأ: {e}") return [] class BamqamScraper(BaseScraper): """جمع البيانات من Bamqam""" name = "Bamqam" url = "https://bamqam.com" def fetch(self) -> List[Event]: events = [] try: # محاولة جلب الصفحة الرئيسية resp = requests.get(self.url, headers=HEADERS, timeout=15) resp.raise_for_status() soup = BeautifulSoup(resp.text, "html.parser") # البحث عن بيانات JSON مضمنة في السكربتات scripts = soup.find_all("script") for script in scripts: text = script.string or "" # البحث عن أنماط بيانات JSON json_matches = re.findall(r'(?:events|data|items)\s*[:=]\s*(\[.*?\])', text, re.DOTALL) for match in json_matches: try: items = json.loads(match) for item in items: if isinstance(item, dict): title = item.get("title", item.get("name", item.get("text", ""))) if not title: continue lat = float(item.get("lat", item.get("latitude", 33.0))) lng = float(item.get("lng", item.get("longitude", 44.0))) etype, cat, sev = classify_event_type(title) events.append(Event( id=make_event_id("bamqam", title), title=title, event_type=etype, severity=classify_severity(title, etype), category=cat, lat=lat, lng=lng, location=item.get("location", item.get("place", "—")), region=classify_region(lat, lng, title), details=item.get("description", item.get("details", title)), source="Bamqam", source_url=self.url, timestamp=time.time(), )) except json.JSONDecodeError: continue # البحث عن بطاقات / عناصر الأحداث في HTML cards = soup.select("article, .event, .card, .item, .post, [class*='event'], [class*='alert'], [class*='incident']") for card in cards[:20]: title_el = card.select_one("h1,h2,h3,h4,.title,[class*='title'],.heading") if not title_el: continue title = title_el.get_text(strip=True) if len(title) < 5: continue desc_el = card.select_one("p,.desc,.description,.text,[class*='desc'],[class*='text']") desc = desc_el.get_text(strip=True) if desc_el else title loc_el = card.select_one("[class*='loc'],[class*='place'],[class*='region']") loc = loc_el.get_text(strip=True) if loc_el else "" # محاولة استخراج الإحداثيات lat, lng = 33.0, 44.0 data_lat = card.get("data-lat") or card.get("data-latitude") data_lng = card.get("data-lng") or card.get("data-longitude") if data_lat and data_lng: lat, lng = float(data_lat), float(data_lng) etype, cat, sev = classify_event_type(title + " " + desc) events.append(Event( id=make_event_id("bamqam", title), title=title, event_type=etype, severity=classify_severity(title + desc, etype), category=cat, lat=lat, lng=lng, location=loc or "—", region=classify_region(lat, lng, title + " " + loc), details=desc, source="Bamqam", source_url=self.url, timestamp=time.time(), )) # محاولة جلب API إن وُجد for api_path in ["/api/events", "/api/incidents", "/api/data", "/api/v1/events"]: try: api_resp = requests.get(f"{self.url}{api_path}", headers=HEADERS, timeout=10) if api_resp.status_code == 200: data = api_resp.json() items = data if isinstance(data, list) else data.get("data", data.get("events", data.get("items", []))) for item in items[:30]: if isinstance(item, dict): title = item.get("title", item.get("name", "")) if not title: continue lat = float(item.get("lat", item.get("latitude", 33.0))) lng = float(item.get("lng", item.get("longitude", 44.0))) etype, cat, sev = classify_event_type(title) events.append(Event( id=make_event_id("bamqam_api", title), title=title, event_type=etype, severity=classify_severity(title, etype), category=cat, lat=lat, lng=lng, location=item.get("location", "—"), region=classify_region(lat, lng, title), details=item.get("description", title), source="Bamqam", source_url=self.url, timestamp=time.time(), )) except Exception: continue except Exception as e: logger.warning(f"[Bamqam] فشل: {e}") return events class WorldMonitorScraper(BaseScraper): """جمع البيانات من WorldMonitor""" name = "WorldMonitor" url = "https://worldmonitor.app" def fetch(self) -> List[Event]: events = [] try: resp = requests.get(self.url, headers=HEADERS, timeout=15) resp.raise_for_status() soup = BeautifulSoup(resp.text, "html.parser") # البحث في السكربتات عن بيانات for script in soup.find_all("script"): text = script.string or "" # أنماط Next.js / Nuxt.js for pattern in [ r'__NEXT_DATA__\s*=\s*({.*?})\s*;', r'__NUXT__\s*=\s*({.*?})\s*;', r'window\.__data\s*=\s*({.*?})\s*;', r'"events"\s*:\s*(\[.*?\])', r'"incidents"\s*:\s*(\[.*?\])', r'"markers"\s*:\s*(\[.*?\])', ]: matches = re.findall(pattern, text, re.DOTALL) for match in matches: try: data = json.loads(match) items = [] if isinstance(data, list): items = data elif isinstance(data, dict): # traverse nested structure for key in ["events", "incidents", "markers", "data", "props"]: if key in data: val = data[key] if isinstance(val, list): items = val break elif isinstance(val, dict): for k2 in ["events", "data", "items", "pageProps"]: if k2 in val and isinstance(val[k2], (list, dict)): if isinstance(val[k2], list): items = val[k2] elif isinstance(val[k2], dict): for k3 in val[k2]: if isinstance(val[k2][k3], list): items = val[k2][k3] break break for item in items[:30]: if not isinstance(item, dict): continue title = item.get("title", item.get("name", item.get("description", ""))) if not title or len(str(title)) < 5: continue lat = float(item.get("lat", item.get("latitude", item.get("y", 33)))) lng = float(item.get("lng", item.get("longitude", item.get("lon", item.get("x", 44))))) etype, cat, sev = classify_event_type(str(title)) events.append(Event( id=make_event_id("worldmonitor", str(title)), title=str(title), event_type=etype, severity=classify_severity(str(title), etype), category=cat, lat=lat, lng=lng, location=item.get("location", item.get("place", item.get("region", "—"))), region=classify_region(lat, lng, str(title)), details=item.get("details", item.get("description", str(title))), source="WorldMonitor", source_url=self.url, timestamp=time.time(), )) except (json.JSONDecodeError, ValueError): continue # HTML parsing cards = soup.select("[class*='event'],[class*='incident'],[class*='marker'],[class*='card'],[class*='alert'],article,.item") for card in cards[:20]: title_el = card.select_one("h1,h2,h3,h4,h5,.title,[class*='title']") if not title_el: continue title = title_el.get_text(strip=True) if len(title) < 5: continue etype, cat, sev = classify_event_type(title) lat = float(card.get("data-lat", 33)) lng = float(card.get("data-lng", 44)) events.append(Event( id=make_event_id("worldmonitor_html", title), title=title, event_type=etype, severity=classify_severity(title, etype), category=cat, lat=lat, lng=lng, location="—", region=classify_region(lat, lng, title), details=title, source="WorldMonitor", source_url=self.url, timestamp=time.time(), )) except Exception as e: logger.warning(f"[WorldMonitor] فشل: {e}") return events class ConflictlyScraper(BaseScraper): """جمع البيانات من Conflictly""" name = "Conflictly" url = "https://www.conflictly.app" def fetch(self) -> List[Event]: events = [] try: resp = requests.get(self.url, headers=HEADERS, timeout=15) resp.raise_for_status() soup = BeautifulSoup(resp.text, "html.parser") # نفس الاستراتيجية - البحث عن JSON مضمن for script in soup.find_all("script"): text = script.string or "" for pattern in [ r'"features"\s*:\s*(\[.*?\])', r'"events"\s*:\s*(\[.*?\])', r'(?:markers|points|incidents)\s*[:=]\s*(\[.*?\])', r'__NEXT_DATA__\s*=\s*({.*?})\s*;', ]: matches = re.findall(pattern, text, re.DOTALL) for match in matches: try: data = json.loads(match) items = data if isinstance(data, list) else [] if isinstance(data, dict): # GeoJSON features items = data.get("features", data.get("events", [])) for item in items[:30]: if not isinstance(item, dict): continue # GeoJSON format props = item.get("properties", item) geom = item.get("geometry", {}) title = props.get("title", props.get("name", props.get("description", ""))) if not title: continue coords = geom.get("coordinates", []) if coords and len(coords) >= 2: lng, lat = float(coords[0]), float(coords[1]) else: lat = float(props.get("lat", props.get("latitude", 33))) lng = float(props.get("lng", props.get("longitude", 44))) etype, cat, sev = classify_event_type(str(title)) events.append(Event( id=make_event_id("conflictly", str(title)), title=str(title), event_type=etype, severity=classify_severity(str(title), etype), category=cat, lat=lat, lng=lng, location=props.get("location", props.get("place", "—")), region=classify_region(lat, lng, str(title)), details=props.get("details", str(title)), source="Conflictly", source_url=self.url, timestamp=time.time(), )) except (json.JSONDecodeError, ValueError): continue # API endpoints for path in ["/api/events", "/api/incidents", "/api/conflicts", "/api/v1/data"]: try: api_resp = requests.get(f"{self.url}{path}", headers={**HEADERS, "Accept": "application/json"}, timeout=10) if api_resp.status_code == 200 and "json" in api_resp.headers.get("content-type", ""): data = api_resp.json() items = data if isinstance(data, list) else data.get("data", data.get("events", [])) for item in items[:30]: if isinstance(item, dict): title = item.get("title", item.get("name", "")) if not title: continue lat = float(item.get("lat", item.get("latitude", 33))) lng = float(item.get("lng", item.get("longitude", 44))) etype, cat, sev = classify_event_type(str(title)) events.append(Event( id=make_event_id("conflictly_api", str(title)), title=str(title), event_type=etype, severity=classify_severity(str(title), etype), category=cat, lat=lat, lng=lng, location=item.get("location", "—"), region=classify_region(lat, lng, str(title)), details=item.get("description", str(title)), source="Conflictly", source_url=self.url, timestamp=time.time(), )) except Exception: continue except Exception as e: logger.warning(f"[Conflictly] فشل: {e}") return events class PizzintScraper(BaseScraper): """جمع البيانات من PIZZINT""" name = "PIZZINT" url = "https://www.pizzint.watch" def fetch(self) -> List[Event]: events = [] try: resp = requests.get(self.url, headers=HEADERS, timeout=15) resp.raise_for_status() soup = BeautifulSoup(resp.text, "html.parser") for script in soup.find_all("script"): text = script.string or "" for pattern in [ r'"events"\s*:\s*(\[.*?\])', r'"incidents"\s*:\s*(\[.*?\])', r'"alerts"\s*:\s*(\[.*?\])', r'__NEXT_DATA__\s*=\s*({.*?})\s*', ]: matches = re.findall(pattern, text, re.DOTALL) for match in matches: try: data = json.loads(match) items = data if isinstance(data, list) else [] if isinstance(data, dict): # Navigate nested for k in ["props", "pageProps", "events", "data"]: if k in data: v = data[k] if isinstance(v, list): items = v break elif isinstance(v, dict): for k2 in v: if isinstance(v[k2], list): items = v[k2] break for item in items[:30]: if not isinstance(item, dict): continue title = item.get("title", item.get("name", "")) if not title: continue lat = float(item.get("lat", item.get("latitude", 33))) lng = float(item.get("lng", item.get("longitude", 44))) etype, cat, sev = classify_event_type(str(title)) events.append(Event( id=make_event_id("pizzint", str(title)), title=str(title), event_type=etype, severity=classify_severity(str(title), etype), category=cat, lat=lat, lng=lng, location=item.get("location", "—"), region=classify_region(lat, lng, str(title)), details=item.get("description", str(title)), source="PIZZINT", source_url=self.url, timestamp=time.time(), )) except (json.JSONDecodeError, ValueError): continue # HTML fallback cards = soup.select("[class*='event'],[class*='alert'],[class*='incident'],[class*='card'],article,.post") for card in cards[:15]: title_el = card.select_one("h1,h2,h3,h4,.title,[class*='title']") if not title_el: continue title = title_el.get_text(strip=True) if len(title) < 5: continue etype, cat, sev = classify_event_type(title) events.append(Event( id=make_event_id("pizzint_html", title), title=title, event_type=etype, severity=classify_severity(title, etype), category=cat, lat=33.0, lng=44.0, location="—", region=classify_region(33, 44, title), details=title, source="PIZZINT", source_url=self.url, timestamp=time.time(), )) except Exception as e: logger.warning(f"[PIZZINT] فشل: {e}") return events class ADSBExchangeScraper(BaseScraper): """جمع بيانات الطيران من ADS-B Exchange""" name = "ADS-B Exchange" url = "https://globe.adsbexchange.com" # نقاط الاهتمام العسكرية MILITARY_BOUNDS = [ {"name": "شرق المتوسط", "lat1": 32, "lat2": 37, "lng1": 30, "lng2": 37}, {"name": "الخليج العربي", "lat1": 23, "lat2": 32, "lng1": 45, "lng2": 57}, {"name": "البحر الأحمر", "lat1": 12, "lat2": 22, "lng1": 38, "lng2": 46}, {"name": "البحر الأسود", "lat1": 41, "lat2": 47, "lng1": 27, "lng2": 42}, ] MILITARY_TYPES = { "C17": ("شحن عسكري", "cargo"), "C130": ("شحن عسكري", "cargo"), "C5": ("شحن عسكري", "cargo"), "KC135": ("تزويد بالوقود", "military"), "KC10": ("تزويد بالوقود", "military"), "KC46": ("تزويد بالوقود", "military"), "E3": ("إنذار مبكر", "recon"), "E8": ("استطلاع", "recon"), "RC135": ("استطلاع إلكتروني", "recon"), "EP3": ("استطلاع", "recon"), "P8": ("مراقبة بحرية", "recon"), "P3": ("مراقبة بحرية", "recon"), "RQ4": ("استطلاع بدون طيار", "drone"), "MQ9": ("مسيّرة هجومية", "drone"), "MQ1": ("مسيّرة", "drone"), "FORTE": ("استطلاع", "recon"), "F15": ("مقاتلة", "military"), "F16": ("مقاتلة", "military"), "F18": ("مقاتلة", "military"), "F22": ("مقاتلة", "military"), "F35": ("مقاتلة", "military"), "B52": ("قاذفة", "military"), "B1": ("قاذفة", "military"), "B2": ("قاذفة", "military"), "A10": ("هجوم أرضي", "military"), "AH64": ("هليكوبتر هجومية", "military"), "V22": ("أوسبري", "military"), "GLEX": ("استطلاع", "recon"), } MILITARY_CALLSIGN_PREFIXES = [ "FORTE", "JAKE", "VIPER", "REAP", "RCH", "HAWK", "DUKE", "NAVY", "ARMY", "EVAC", "REACH", "SKULL", "DOOM", "FURY", "RAGE", "IRON", "BOLT", "STEEL", "COBRA", "TIGER", "EAGLE", "SPARTN", "TOPCAT", "WRATH", "ROGUE", "SABER", "LANCE", "DEMON", "GHOST", "STING", "TORCH", "ATLAS", "GIANT", ] def fetch(self) -> List[Flight]: flights = [] for bound in self.MILITARY_BOUNDS: try: # ADS-B Exchange V2 API format lat_c = (bound["lat1"] + bound["lat2"]) / 2 lng_c = (bound["lng1"] + bound["lng2"]) / 2 api_url = f"https://globe.adsbexchange.com/data/aircraft.json" params = { "lat": lat_c, "lon": lng_c, "range": 500, } resp = requests.get(api_url, params=params, headers={ **HEADERS, "Referer": "https://globe.adsbexchange.com/", }, timeout=15) if resp.status_code != 200: # Try alternative endpoints for alt_url in [ f"https://globe.adsbexchange.com/data/globe_{int(lat_c)}_{int(lng_c)}.json", f"https://adsbexchange.com/api/aircraft/json/lat/{lat_c}/lon/{lng_c}/dist/500/", ]: try: resp = requests.get(alt_url, headers=HEADERS, timeout=10) if resp.status_code == 200: break except Exception: continue if resp.status_code == 200: data = resp.json() ac_list = data.get("ac", data.get("aircraft", data.get("acList", []))) for ac in ac_list: if not isinstance(ac, dict): continue callsign = (ac.get("flight", ac.get("call", ac.get("Icao", "")))).strip() ac_type = ac.get("t", ac.get("type", ac.get("Mdl", ""))) alt = ac.get("alt_baro", ac.get("Alt", ac.get("altitude", 0))) speed = ac.get("gs", ac.get("Spd", ac.get("speed", 0))) lat = ac.get("lat", ac.get("Lat", 0)) lng = ac.get("lon", ac.get("Long", ac.get("lng", 0))) heading = ac.get("track", ac.get("Trak", ac.get("heading", 0))) hex_code = ac.get("hex", ac.get("Icao", "")) squawk = ac.get("squawk", ac.get("Sqk", "")) if not callsign and not ac_type: continue if not lat or not lng: continue # فلترة: هل هي طائرة عسكرية؟ is_military = False flight_type = "military" aircraft_name = ac_type # تحقق من البادئة for prefix in self.MILITARY_CALLSIGN_PREFIXES: if callsign.upper().startswith(prefix): is_military = True break # تحقق من نوع الطائرة for mil_type, (type_name, ft) in self.MILITARY_TYPES.items(): if mil_type in (ac_type or "").upper() or mil_type in callsign.upper(): is_military = True aircraft_name = type_name flight_type = ft break # تحقق من squawk العسكري if squawk in ["7700", "7600", "7500"] or (squawk and squawk.startswith("0")): is_military = True # لا ترسبوندر = محتمل عسكري if not squawk and alt and int(str(alt).replace("ground","0")) > 25000: is_military = True if is_military: flights.append(Flight( callsign=callsign or hex_code, aircraft_type=aircraft_name or ac_type or "غير معروف", altitude=f"{alt} ft" if alt else "—", speed=f"{speed} kts" if speed else "—", lat=float(lat), lng=float(lng), heading=f"{heading}°" if heading else "—", origin=bound["name"], flight_type=flight_type, source="ADS-B Exchange", hex_code=hex_code, squawk=squawk, timestamp=time.time(), )) except Exception as e: logger.warning(f"[ADS-B] منطقة {bound['name']}: {e}") return flights class FlightRadarScraper(BaseScraper): """جمع بيانات الطيران من Flightradar24""" name = "Flightradar24" url = "https://www.flightradar24.com" # مناطق المراقبة ZONES = [ {"name": "الشرق الأوسط", "bounds": "37,25,55,32"}, {"name": "شرق المتوسط", "bounds": "33,30,37,36"}, {"name": "البحر الأحمر", "bounds": "12,38,22,46"}, ] def fetch(self) -> List[Flight]: flights = [] for zone in self.ZONES: try: bounds = zone["bounds"] api_url = f"https://data-cloud.flightradar24.com/zones/fcgi/feed.js" params = { "bounds": bounds, "faa": "1", "satellite": "1", "mlat": "1", "flarm": "0", "adsb": "1", "gnd": "0", "air": "1", "vehicles": "0", "estimated": "0", "gliders": "0", "stats": "0", } resp = requests.get(api_url, params=params, headers={ **HEADERS, "Referer": "https://www.flightradar24.com/", "Origin": "https://www.flightradar24.com", }, timeout=15) if resp.status_code == 200: data = resp.json() for key, val in data.items(): if key in ("full_count", "version", "stats"): continue if not isinstance(val, list) or len(val) < 10: continue # FR24 format: [hex, lat, lng, heading, alt, speed, squawk, radar, type, reg, timestamp, origin, dest, flight, ?, ?, callsign, ...] try: hex_code = val[0] if len(val) > 0 else "" lat = float(val[1]) if len(val) > 1 else 0 lng = float(val[2]) if len(val) > 2 else 0 heading = val[3] if len(val) > 3 else 0 alt = val[4] if len(val) > 4 else 0 speed = val[5] if len(val) > 5 else 0 squawk = str(val[6]) if len(val) > 6 else "" ac_type = val[8] if len(val) > 8 else "" callsign = str(val[16] if len(val) > 16 else val[13] if len(val) > 13 else "") if not lat or not lng: continue # فلترة عسكرية is_military = False flight_type = "military" aircraft_name = ac_type for mil_type, (tname, ft) in ADSBExchangeScraper.MILITARY_TYPES.items(): if mil_type in (ac_type or "").upper() or mil_type in callsign.upper(): is_military = True aircraft_name = tname flight_type = ft break for prefix in ADSBExchangeScraper.MILITARY_CALLSIGN_PREFIXES: if callsign.upper().startswith(prefix): is_military = True break # squawk 0000 or military ranges if squawk and (squawk == "0000" or squawk.startswith("0") or squawk in ["7700","7600","7500"]): is_military = True if is_military: flights.append(Flight( callsign=callsign.strip() or hex_code, aircraft_type=aircraft_name or ac_type or "غير معروف", altitude=f"{alt} ft", speed=f"{speed} kts", lat=float(lat), lng=float(lng), heading=f"{heading}°", origin=zone["name"], flight_type=flight_type, source="Flightradar24", hex_code=str(hex_code), squawk=squawk, timestamp=time.time(), )) except (ValueError, IndexError): continue except Exception as e: logger.warning(f"[FR24] منطقة {zone['name']}: {e}") return flights # ===== مدير البيانات المركزي ===== class DataManager: """يدير جمع وتخزين وفلترة كل البيانات""" def __init__(self): self.events: List[Event] = [] self.flights: List[Flight] = [] self.source_status: Dict[str, Dict] = {} self.last_update: float = 0 self.is_fetching: bool = False self.lock = threading.Lock() self.event_scrapers = [ BamqamScraper(), WorldMonitorScraper(), ConflictlyScraper(), PizzintScraper(), ] self.flight_scrapers = [ ADSBExchangeScraper(), FlightRadarScraper(), ] def fetch_all(self): """جلب كل البيانات من جميع المصادر""" if self.is_fetching: return self.is_fetching = True new_events = [] new_flights = [] # جمع الأحداث for scraper in self.event_scrapers: items = scraper.safe_fetch() new_events.extend(items) self.source_status[scraper.name] = { "status": scraper.status, "count": len(items), "last_fetch": scraper.last_fetch, "error": scraper.error_msg, "url": scraper.url, } # جمع بيانات الطيران for scraper in self.flight_scrapers: items = scraper.safe_fetch() if isinstance(items, list) and len(items) > 0: if isinstance(items[0], Flight): new_flights.extend(items) else: # convert events if returned new_events.extend([e for e in items if isinstance(e, Event)]) self.source_status[scraper.name] = { "status": scraper.status, "count": len(items), "last_fetch": scraper.last_fetch, "error": scraper.error_msg, "url": scraper.url, } # حذف التكرارات ودمج البيانات with self.lock: # حذف الأحداث المنتهية (أكثر من 24 ساعة) existing = {e.id: e for e in self.events if not e.is_expired} for ev in new_events: if ev.id not in existing: existing[ev.id] = ev self.events = list(existing.values()) # تحديث الطائرات (استبدال كامل) seen_flights = {} for fl in new_flights: key = fl.callsign or fl.hex_code if key and key not in seen_flights: seen_flights[key] = fl if seen_flights: self.flights = list(seen_flights.values()) self.last_update = time.time() self.is_fetching = False logger.info(f"تم التحديث: {len(self.events)} حدث, {len(self.flights)} طائرة") def get_events(self, region="all", severity=None, event_type=None, search=""): """جلب الأحداث مع الفلترة""" with self.lock: filtered = [e for e in self.events if not e.is_expired] if region and region != "all" and region != "جميع المناطق": region_key = {v: k for k, v in REGION_MAP.items()}.get(region, region) filtered = [e for e in filtered if e.region == region_key] if severity: sev_map = {"🔴 حرج": "critical", "🟠 مرتفع": "high", "🟡 متوسط": "medium", "🔵 منخفض": "low"} active = [sev_map.get(s, s) for s in severity] filtered = [e for e in filtered if e.severity in active] if event_type: type_map = {"💥 غارات": "strike", "🚀 صواريخ": "missile", "✈️ طيران": "aircraft", "🎖️ قوات": "troops", "🚢 بحري": "naval"} active = [type_map.get(t, t) for t in event_type] filtered = [e for e in filtered if e.event_type in active] if search: filtered = [e for e in filtered if search in e.title or search in e.location or search in e.details] filtered.sort(key=lambda x: x.timestamp, reverse=True) return filtered def get_flights(self): """جلب بيانات الطيران""" with self.lock: return list(self.flights) def get_source_status(self): return dict(self.source_status) def get_stats(self): events = [e for e in self.events if not e.is_expired] return { "total": len(events), "strikes": len([e for e in events if e.event_type == "strike"]), "missiles": len([e for e in events if e.event_type == "missile"]), "aircraft": len([e for e in events if e.event_type == "aircraft"]), "troops": len([e for e in events if e.event_type == "troops"]), "naval": len([e for e in events if e.event_type == "naval"]), "critical": len([e for e in events if e.severity == "critical"]), "flights": len(self.flights), } REGION_MAP = { "all": "جميع المناطق", "middleeast": "الشرق الأوسط", "europe": "أوروبا الشرقية", "africa": "شمال أفريقيا", "redSea": "البحر الأحمر", "asia": "جنوب آسيا", }