# The-observer-V1 / scraper.py
# ali3133's picture
# Upload scraper.py
# 1a872a2 verified
"""
وحدة جمع البيانات الحقيقية من المصادر الست
Real-time data scraping from 6 conflict monitoring sources
"""
import requests
from bs4 import BeautifulSoup
import json
import re
import time
import hashlib
import logging
from datetime import datetime, timedelta
from dataclasses import dataclass, field, asdict
from typing import List, Optional, Dict
from abc import ABC, abstractmethod
import threading
import traceback
# Configure the root logger at INFO level as soon as this module is imported,
# then create the named logger that every scraper in this file writes to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("scraper")
# ===== نموذج البيانات =====
@dataclass
class Event:
    """One conflict event collected from a monitored source.

    The field order defines the positional constructor signature.
    """

    id: str
    title: str
    event_type: str  # strike, missile, aircraft, troops, naval, drone, defense
    severity: str  # critical, high, medium, low
    category: str  # display label, e.g. air strike, missile, etc.
    lat: float
    lng: float
    location: str
    region: str  # middleeast, europe, africa, redSea, asia
    details: str
    source: str
    source_url: str = ""
    timestamp: float = 0  # unix timestamp
    raw_time: str = ""
    extra: Dict = field(default_factory=dict)

    def to_dict(self):
        """Return the event as a plain dictionary built by ``asdict``."""
        as_mapping = asdict(self)
        return as_mapping

    @property
    def minutes_ago(self):
        """Whole minutes elapsed since ``timestamp`` (at least 1).

        Returns 30 when ``timestamp`` is not a positive number.
        """
        if not (self.timestamp > 0):
            return 30
        elapsed_seconds = time.time() - self.timestamp
        return max(1, int(elapsed_seconds / 60))

    @property
    def is_expired(self):
        """True once the event is more than 24 hours (1440 minutes) old."""
        one_day_in_minutes = 1440
        return self.minutes_ago > one_day_in_minutes
@dataclass
class Flight:
    """One aircraft observation reported by a flight-tracking source.

    The field order defines the positional constructor signature.
    """

    callsign: str
    aircraft_type: str
    altitude: str
    speed: str
    lat: float
    lng: float
    heading: str
    origin: str
    flight_type: str  # military, recon, cargo, drone
    source: str
    hex_code: str = ""
    squawk: str = ""
    timestamp: float = 0

    def to_dict(self):
        """Return the flight as a plain dictionary built by ``asdict``."""
        as_mapping = asdict(self)
        return as_mapping
# ===== مصنف المنطقة الجغرافية =====
def classify_region(lat: float, lng: float, text: str = "") -> str:
    """Classify an event into a coarse region key.

    The free text is checked first, in priority order: Red Sea, Middle East,
    Europe, Africa. If no keyword matches, the coordinates are tested against
    fixed bounding boxes. Anything still unmatched falls back to
    ``"middleeast"``.

    Latin-script keywords must begin at a word boundary, so "Romania" and
    "woman" do not match "oman", while inflected forms such as "Syrian" or
    "Houthis" still match. Arabic keywords stay plain substring matches
    because Arabic writes the article and prepositions attached to the word.

    Args:
        lat: Latitude in degrees.
        lng: Longitude in degrees.
        text: Optional free text (title, location) to search for place names.

    Returns:
        One of ``"redSea"``, ``"middleeast"``, ``"europe"``, ``"africa"``,
        ``"asia"``.
    """
    text_lower = text.lower() if text else ""

    me_keywords = ["syria", "iraq", "iran", "yemen", "lebanon", "gaza", "israel",
                   "سوريا", "عراق", "إيران", "يمن", "لبنان", "غزة", "فلسطين",
                   "jordan", "الأردن", "qatar", "قطر", "saudi", "سعودي", "bahrain",
                   "kuwait", "كويت", "uae", "إمارات", "oman", "عمان", "turkey", "تركيا"]
    eu_keywords = ["ukraine", "أوكرانيا", "russia", "روسيا", "donbas", "دونباس",
                   "crimea", "القرم", "belgorod", "بيلغورود", "poland", "romania"]
    af_keywords = ["sudan", "سودان", "libya", "ليبيا", "ethiopia", "إثيوبيا",
                   "somalia", "صومال", "mali", "niger", "chad", "تشاد"]
    rs_keywords = ["red sea", "البحر الأحمر", "bab al-mandab", "باب المندب",
                   "houthi", "حوثي", "aden", "عدن"]

    def mentions(keywords) -> bool:
        """Return True if any keyword occurs in the lower-cased text."""
        for kw in keywords:
            if kw.isascii():
                # Anchor only the start of the word so suffixes still match.
                if re.search(r"\b" + re.escape(kw), text_lower):
                    return True
            elif kw in text_lower:
                return True
        return False

    # Text first. Red Sea is tested before Middle East so that Yemen/Houthi
    # items mentioning the sea lane land in the more specific region.
    if text_lower:
        for region_key, keywords in (
            ("redSea", rs_keywords),
            ("middleeast", me_keywords),
            ("europe", eu_keywords),
            ("africa", af_keywords),
        ):
            if mentions(keywords):
                return region_key

    # Coordinates second.
    if 10 <= lat <= 42 and 25 <= lng <= 63:
        if 12 <= lat <= 20 and 38 <= lng <= 50:
            return "redSea"
        return "middleeast"
    if 44 <= lat <= 60 and 22 <= lng <= 45:
        return "europe"
    if -5 <= lat < 30 and -20 <= lng <= 55:
        return "africa"
    if 5 <= lat <= 40 and 63 <= lng <= 100:
        return "asia"
    return "middleeast"
# ===== مصنف نوع الحدث =====
def classify_event_type(text: str) -> tuple:
    """Map free text to an ``(event_type, category_label, severity)`` triple.

    Keyword groups are tried in a fixed priority order (missile, strike,
    defense, aircraft, naval, troops); the first group with any keyword
    occurring as a substring of the lower-cased text wins. Text matching no
    group is reported as a generic medium-severity strike.
    """
    haystack = text.lower()

    # (keywords, result) pairs, ordered by priority.
    rules = (
        (
            ("missile", "rocket", "intercept", "ballistic", "cruise",
             "صاروخ", "اعتراض", "باليستي", "كروز", "إطلاق صاروخ"),
            ("missile", "صاروخ", "high"),
        ),
        (
            ("airstrike", "strike", "bombing", "shelling", "bombardment", "explosion",
             "غارة", "قصف", "ضربة", "انفجار", "تدمير", "استهداف", "air raid"),
            ("strike", "غارة جوية", "critical"),
        ),
        (
            ("defense", "air defense", "interception", "iron dome", "patriot",
             "دفاع", "دفاع جوي", "اعتراض", "قبة حديدية"),
            ("missile", "اعتراض / دفاع", "high"),
        ),
        (
            ("aircraft", "plane", "jet", "fighter", "drone", "uav", "flight",
             "طائرة", "مقاتلة", "تحليق", "طيران", "مسيّرة"),
            ("aircraft", "حركة طيران", "medium"),
        ),
        (
            ("naval", "ship", "carrier", "destroyer", "navy", "maritime",
             "بحري", "سفينة", "حاملة", "مدمرة", "بارجة", "بحر"),
            ("naval", "نشاط بحري", "medium"),
        ),
        (
            ("troops", "forces", "military movement", "convoy", "deployment",
             "قوات", "تحركات", "رتل", "انتشار", "مدرعات", "مشاة", "تعزيز"),
            ("troops", "تحرك قوات", "high"),
        ),
    )

    for keywords, verdict in rules:
        if any(keyword in haystack for keyword in keywords):
            return verdict
    return ("strike", "حدث عسكري", "medium")
# ===== مصنف الخطورة =====
def classify_severity(text: str, event_type: str) -> str:
    """Return ``"critical"``, ``"high"`` or ``"medium"`` for an event.

    Critical keywords are checked before high keywords, both as substrings of
    the lower-cased text. Without any keyword hit, strikes and missiles are
    rated ``"high"`` and every other event type ``"medium"``.
    """
    haystack = text.lower()

    critical_terms = ("breaking", "urgent", "عاجل", "critical", "mass", "multiple",
                      "killed", "casualties", "قتلى", "شهداء", "مجزرة", "كبير")
    high_terms = ("alert", "warning", "تنبيه", "تحذير", "confirmed", "مؤكد")

    if any(term in haystack for term in critical_terms):
        return "critical"
    if any(term in haystack for term in high_terms):
        return "high"
    return "high" if event_type in ("strike", "missile") else "medium"
# ===== معرّف فريد =====
def make_event_id(source: str, text: str) -> str:
    """Build a short identifier for an event.

    The id is the first 12 hex characters of the MD5 digest of
    ``"<source>:<first 80 characters of text>"``, so the same source and
    title always produce the same id.
    """
    fingerprint = "{}:{}".format(source, text[:80])
    digest = hashlib.md5(fingerprint.encode()).hexdigest()
    return digest[:12]
# ===== رؤوس الطلبات =====
# Browser-like request headers shared by every scraper in this module.
#
# "br" is deliberately not advertised in Accept-Encoding: requests/urllib3 can
# only decode Brotli when an optional brotli package is installed, so asking
# for it unconditionally can leave ``resp.text`` holding undecoded bytes.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9,ar;q=0.8",
    "Accept-Encoding": "gzip, deflate",
    "Connection": "keep-alive",
}
# ===== الكاشطات =====
class BaseScraper(ABC):
    """Abstract base for all scrapers.

    Subclasses implement ``fetch``; callers use ``safe_fetch``, which records
    the outcome in ``status``, ``last_fetch`` and ``error_msg``.
    """

    name: str = "Unknown"
    url: str = ""
    status: str = "offline"
    last_fetch: float = 0
    error_msg: str = ""

    @abstractmethod
    def fetch(self) -> List[Event]:
        """Collect and return items from the source."""

    def safe_fetch(self) -> List[Event]:
        """Run ``fetch`` and never raise for a failing source.

        On success the scraper is marked ``"online"`` and the fetched list is
        returned. On any exception it is marked ``"error"``, the message is
        stored in ``error_msg`` and an empty list is returned.
        """
        collected: List[Event] = []
        try:
            logger.info(f"[{self.name}] جارٍ الجمع...")
            fetched = self.fetch()
            self.status = "online"
            self.last_fetch = time.time()
            self.error_msg = ""
            logger.info(f"[{self.name}] تم جمع {len(fetched)} حدث")
            collected = fetched
        except Exception as exc:
            self.status = "error"
            self.error_msg = str(exc)
            logger.error(f"[{self.name}] خطأ: {exc}")
        return collected
class BamqamScraper(BaseScraper):
    """Collect events from Bamqam.

    Each fetch tries three things in order: JSON arrays embedded in
    ``<script>`` tags, event-like HTML cards, and a few guessed API paths.
    Items without a usable position are placed at the default coordinates
    (33.0, 44.0).
    """

    name = "Bamqam"
    url = "https://bamqam.com"

    @staticmethod
    def _coord(value, default: float) -> float:
        """Return *value* as a float, or *default* if it is null or malformed."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    def fetch(self) -> List[Event]:
        """Return the events found on the site.

        Any failure is logged as a warning and whatever was collected up to
        that point is returned; this method does not raise.
        """
        events = []
        try:
            # Fetch the landing page.
            resp = requests.get(self.url, headers=HEADERS, timeout=15)
            resp.raise_for_status()
            soup = BeautifulSoup(resp.text, "html.parser")

            # 1) JSON arrays embedded in inline scripts.
            for script in soup.find_all("script"):
                text = script.string or ""
                json_matches = re.findall(r'(?:events|data|items)\s*[:=]\s*(\[.*?\])', text, re.DOTALL)
                for match in json_matches:
                    try:
                        items = json.loads(match)
                    except json.JSONDecodeError:
                        continue
                    for item in items:
                        if not isinstance(item, dict):
                            continue
                        title = item.get("title", item.get("name", item.get("text", "")))
                        if not title:
                            continue
                        # Titles are not guaranteed to be strings in scraped JSON.
                        title = str(title)
                        # A bad coordinate must not abort the whole scrape.
                        lat = self._coord(item.get("lat", item.get("latitude", 33.0)), 33.0)
                        lng = self._coord(item.get("lng", item.get("longitude", 44.0)), 44.0)
                        etype, cat, _ = classify_event_type(title)
                        events.append(Event(
                            id=make_event_id("bamqam", title),
                            title=title,
                            event_type=etype,
                            severity=classify_severity(title, etype),
                            category=cat,
                            lat=lat, lng=lng,
                            location=item.get("location", item.get("place", "—")),
                            region=classify_region(lat, lng, title),
                            details=item.get("description", item.get("details", title)),
                            source="Bamqam",
                            source_url=self.url,
                            timestamp=time.time(),
                        ))

            # 2) Event cards in the HTML (first 20 only).
            cards = soup.select("article, .event, .card, .item, .post, [class*='event'], [class*='alert'], [class*='incident']")
            for card in cards[:20]:
                title_el = card.select_one("h1,h2,h3,h4,.title,[class*='title'],.heading")
                if not title_el:
                    continue
                title = title_el.get_text(strip=True)
                if len(title) < 5:
                    continue
                desc_el = card.select_one("p,.desc,.description,.text,[class*='desc'],[class*='text']")
                desc = desc_el.get_text(strip=True) if desc_el else title
                loc_el = card.select_one("[class*='loc'],[class*='place'],[class*='region']")
                loc = loc_el.get_text(strip=True) if loc_el else ""

                # Coordinates come from data-* attributes when both are present.
                lat, lng = 33.0, 44.0
                data_lat = card.get("data-lat") or card.get("data-latitude")
                data_lng = card.get("data-lng") or card.get("data-longitude")
                if data_lat and data_lng:
                    try:
                        lat, lng = float(data_lat), float(data_lng)
                    except (TypeError, ValueError):
                        # Malformed attribute: keep the default position.
                        lat, lng = 33.0, 44.0

                etype, cat, _ = classify_event_type(title + " " + desc)
                events.append(Event(
                    id=make_event_id("bamqam", title),
                    title=title,
                    event_type=etype,
                    severity=classify_severity(title + desc, etype),
                    category=cat,
                    lat=lat, lng=lng,
                    location=loc or "—",
                    region=classify_region(lat, lng, title + " " + loc),
                    details=desc,
                    source="Bamqam",
                    source_url=self.url,
                    timestamp=time.time(),
                ))

            # 3) Guessed API endpoints. These are best-effort probes, so any
            #    failure on one path just moves on to the next.
            for api_path in ["/api/events", "/api/incidents", "/api/data", "/api/v1/events"]:
                try:
                    api_resp = requests.get(f"{self.url}{api_path}", headers=HEADERS, timeout=10)
                    if api_resp.status_code == 200:
                        data = api_resp.json()
                        items = data if isinstance(data, list) else data.get("data", data.get("events", data.get("items", [])))
                        for item in items[:30]:
                            if not isinstance(item, dict):
                                continue
                            title = item.get("title", item.get("name", ""))
                            if not title:
                                continue
                            title = str(title)
                            lat = self._coord(item.get("lat", item.get("latitude", 33.0)), 33.0)
                            lng = self._coord(item.get("lng", item.get("longitude", 44.0)), 44.0)
                            etype, cat, _ = classify_event_type(title)
                            events.append(Event(
                                id=make_event_id("bamqam_api", title),
                                title=title,
                                event_type=etype,
                                severity=classify_severity(title, etype),
                                category=cat,
                                lat=lat, lng=lng,
                                location=item.get("location", "—"),
                                region=classify_region(lat, lng, title),
                                details=item.get("description", title),
                                source="Bamqam",
                                source_url=self.url,
                                timestamp=time.time(),
                            ))
                except Exception:
                    continue
        except Exception as e:
            logger.warning(f"[Bamqam] فشل: {e}")
        return events
class WorldMonitorScraper(BaseScraper):
    """Collect events from WorldMonitor.

    Each fetch looks for JSON embedded in ``<script>`` tags (framework state
    objects and ``"events"``/``"incidents"``/``"markers"`` arrays) and then
    falls back to event-like HTML cards. Items without a usable position are
    placed at the default coordinates (33, 44).
    """

    name = "WorldMonitor"
    url = "https://worldmonitor.app"

    @staticmethod
    def _coord(value, default: float) -> float:
        """Return *value* as a float, or *default* if it is null or malformed."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    def fetch(self) -> List[Event]:
        """Return the events found on the site.

        Any failure is logged as a warning and whatever was collected up to
        that point is returned; this method does not raise.
        """
        events = []
        try:
            resp = requests.get(self.url, headers=HEADERS, timeout=15)
            resp.raise_for_status()
            soup = BeautifulSoup(resp.text, "html.parser")

            # 1) Data embedded in inline scripts.
            for script in soup.find_all("script"):
                text = script.string or ""
                # Next.js / Nuxt.js state objects and bare JSON arrays.
                for pattern in [
                    r'__NEXT_DATA__\s*=\s*({.*?})\s*;',
                    r'__NUXT__\s*=\s*({.*?})\s*;',
                    r'window\.__data\s*=\s*({.*?})\s*;',
                    r'"events"\s*:\s*(\[.*?\])',
                    r'"incidents"\s*:\s*(\[.*?\])',
                    r'"markers"\s*:\s*(\[.*?\])',
                ]:
                    matches = re.findall(pattern, text, re.DOTALL)
                    for match in matches:
                        try:
                            data = json.loads(match)
                        except (json.JSONDecodeError, ValueError):
                            continue
                        items = []
                        if isinstance(data, list):
                            items = data
                        elif isinstance(data, dict):
                            # Walk up to three levels down looking for a list.
                            for key in ["events", "incidents", "markers", "data", "props"]:
                                if key in data:
                                    val = data[key]
                                    if isinstance(val, list):
                                        items = val
                                        break
                                    elif isinstance(val, dict):
                                        for k2 in ["events", "data", "items", "pageProps"]:
                                            if k2 in val and isinstance(val[k2], (list, dict)):
                                                if isinstance(val[k2], list):
                                                    items = val[k2]
                                                elif isinstance(val[k2], dict):
                                                    for k3 in val[k2]:
                                                        if isinstance(val[k2][k3], list):
                                                            items = val[k2][k3]
                                                            break
                                                break
                        for item in items[:30]:
                            if not isinstance(item, dict):
                                continue
                            title = item.get("title", item.get("name", item.get("description", "")))
                            if not title or len(str(title)) < 5:
                                continue
                            # Null or malformed coordinates fall back to the
                            # default position so they cannot abort the scrape.
                            lat = self._coord(item.get("lat", item.get("latitude", item.get("y", 33))), 33.0)
                            lng = self._coord(item.get("lng", item.get("longitude", item.get("lon", item.get("x", 44)))), 44.0)
                            etype, cat, _ = classify_event_type(str(title))
                            events.append(Event(
                                id=make_event_id("worldmonitor", str(title)),
                                title=str(title),
                                event_type=etype,
                                severity=classify_severity(str(title), etype),
                                category=cat,
                                lat=lat, lng=lng,
                                location=item.get("location", item.get("place", item.get("region", "—"))),
                                region=classify_region(lat, lng, str(title)),
                                details=item.get("details", item.get("description", str(title))),
                                source="WorldMonitor",
                                source_url=self.url,
                                timestamp=time.time(),
                            ))

            # 2) HTML cards (first 20 only).
            cards = soup.select("[class*='event'],[class*='incident'],[class*='marker'],[class*='card'],[class*='alert'],article,.item")
            for card in cards[:20]:
                title_el = card.select_one("h1,h2,h3,h4,h5,.title,[class*='title']")
                if not title_el:
                    continue
                title = title_el.get_text(strip=True)
                if len(title) < 5:
                    continue
                etype, cat, _ = classify_event_type(title)
                # A non-numeric data-lat/data-lng must not stop the HTML pass.
                lat = self._coord(card.get("data-lat", 33), 33.0)
                lng = self._coord(card.get("data-lng", 44), 44.0)
                events.append(Event(
                    id=make_event_id("worldmonitor_html", title),
                    title=title,
                    event_type=etype,
                    severity=classify_severity(title, etype),
                    category=cat,
                    lat=lat, lng=lng,
                    location="—",
                    region=classify_region(lat, lng, title),
                    details=title,
                    source="WorldMonitor",
                    source_url=self.url,
                    timestamp=time.time(),
                ))
        except Exception as e:
            logger.warning(f"[WorldMonitor] فشل: {e}")
        return events
class ConflictlyScraper(BaseScraper):
    """Collect events from Conflictly.

    Each fetch looks for JSON embedded in ``<script>`` tags (plain items or
    GeoJSON-style features with ``properties``/``geometry``) and then probes
    a few guessed API paths. Items without a usable position are placed at
    the default coordinates (33, 44).
    """

    name = "Conflictly"
    url = "https://www.conflictly.app"

    @staticmethod
    def _coord(value, default: float) -> float:
        """Return *value* as a float, or *default* if it is null or malformed."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    def fetch(self) -> List[Event]:
        """Return the events found on the site.

        Any failure is logged as a warning and whatever was collected up to
        that point is returned; this method does not raise.
        """
        events = []
        try:
            resp = requests.get(self.url, headers=HEADERS, timeout=15)
            resp.raise_for_status()
            soup = BeautifulSoup(resp.text, "html.parser")

            # 1) Embedded JSON in inline scripts.
            for script in soup.find_all("script"):
                text = script.string or ""
                for pattern in [
                    r'"features"\s*:\s*(\[.*?\])',
                    r'"events"\s*:\s*(\[.*?\])',
                    r'(?:markers|points|incidents)\s*[:=]\s*(\[.*?\])',
                    r'__NEXT_DATA__\s*=\s*({.*?})\s*;',
                ]:
                    matches = re.findall(pattern, text, re.DOTALL)
                    for match in matches:
                        try:
                            data = json.loads(match)
                        except (json.JSONDecodeError, ValueError):
                            continue
                        items = data if isinstance(data, list) else []
                        if isinstance(data, dict):
                            # GeoJSON features, or a plain events list.
                            items = data.get("features", data.get("events", []))
                        for item in items[:30]:
                            if not isinstance(item, dict):
                                continue
                            # GeoJSON keeps fields under "properties"; plain
                            # items carry them directly. Null or non-dict
                            # values fall back to the item / an empty dict.
                            props = item.get("properties", item)
                            if not isinstance(props, dict):
                                props = item
                            geom = item.get("geometry", {})
                            if not isinstance(geom, dict):
                                geom = {}
                            title = props.get("title", props.get("name", props.get("description", "")))
                            if not title:
                                continue

                            # GeoJSON coordinates are [lng, lat]. Nested
                            # (non-point) or malformed coordinates fall back
                            # to the lat/lng properties.
                            coords = geom.get("coordinates", [])
                            lat = lng = None
                            if isinstance(coords, (list, tuple)) and len(coords) >= 2:
                                try:
                                    lng, lat = float(coords[0]), float(coords[1])
                                except (TypeError, ValueError):
                                    lat = lng = None
                            if lat is None or lng is None:
                                lat = self._coord(props.get("lat", props.get("latitude", 33)), 33.0)
                                lng = self._coord(props.get("lng", props.get("longitude", 44)), 44.0)

                            etype, cat, _ = classify_event_type(str(title))
                            events.append(Event(
                                id=make_event_id("conflictly", str(title)),
                                title=str(title),
                                event_type=etype,
                                severity=classify_severity(str(title), etype),
                                category=cat,
                                lat=lat, lng=lng,
                                location=props.get("location", props.get("place", "—")),
                                region=classify_region(lat, lng, str(title)),
                                details=props.get("details", str(title)),
                                source="Conflictly",
                                source_url=self.url,
                                timestamp=time.time(),
                            ))

            # 2) Guessed API endpoints. These are best-effort probes, so any
            #    failure on one path just moves on to the next.
            for path in ["/api/events", "/api/incidents", "/api/conflicts", "/api/v1/data"]:
                try:
                    api_resp = requests.get(f"{self.url}{path}", headers={**HEADERS, "Accept": "application/json"}, timeout=10)
                    if api_resp.status_code == 200 and "json" in api_resp.headers.get("content-type", ""):
                        data = api_resp.json()
                        items = data if isinstance(data, list) else data.get("data", data.get("events", []))
                        for item in items[:30]:
                            if not isinstance(item, dict):
                                continue
                            title = item.get("title", item.get("name", ""))
                            if not title:
                                continue
                            lat = self._coord(item.get("lat", item.get("latitude", 33)), 33.0)
                            lng = self._coord(item.get("lng", item.get("longitude", 44)), 44.0)
                            etype, cat, _ = classify_event_type(str(title))
                            events.append(Event(
                                id=make_event_id("conflictly_api", str(title)),
                                title=str(title),
                                event_type=etype,
                                severity=classify_severity(str(title), etype),
                                category=cat,
                                lat=lat, lng=lng,
                                location=item.get("location", "—"),
                                region=classify_region(lat, lng, str(title)),
                                details=item.get("description", str(title)),
                                source="Conflictly",
                                source_url=self.url,
                                timestamp=time.time(),
                            ))
                except Exception:
                    continue
        except Exception as e:
            logger.warning(f"[Conflictly] فشل: {e}")
        return events
class PizzintScraper(BaseScraper):
    """Collect events from PIZZINT.

    Each fetch looks for JSON embedded in ``<script>`` tags and then falls
    back to event-like HTML cards. HTML cards carry no position, so they are
    always placed at the default coordinates (33.0, 44.0).
    """

    name = "PIZZINT"
    url = "https://www.pizzint.watch"

    @staticmethod
    def _coord(value, default: float) -> float:
        """Return *value* as a float, or *default* if it is null or malformed."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    def fetch(self) -> List[Event]:
        """Return the events found on the site.

        Any failure is logged as a warning and whatever was collected up to
        that point is returned; this method does not raise.
        """
        events = []
        try:
            resp = requests.get(self.url, headers=HEADERS, timeout=15)
            resp.raise_for_status()
            soup = BeautifulSoup(resp.text, "html.parser")

            # 1) Embedded JSON in inline scripts.
            for script in soup.find_all("script"):
                text = script.string or ""
                for pattern in [
                    r'"events"\s*:\s*(\[.*?\])',
                    r'"incidents"\s*:\s*(\[.*?\])',
                    r'"alerts"\s*:\s*(\[.*?\])',
                    # The script text never includes the closing </script>
                    # tag, so the object is anchored to the end of the text.
                    r'__NEXT_DATA__\s*=\s*({.*})\s*;?\s*$',
                ]:
                    matches = re.findall(pattern, text, re.DOTALL)
                    for match in matches:
                        try:
                            data = json.loads(match)
                        except (json.JSONDecodeError, ValueError):
                            continue
                        items = data if isinstance(data, list) else []
                        if isinstance(data, dict):
                            # Look one or two levels down for the first list.
                            for k in ["props", "pageProps", "events", "data"]:
                                if k in data:
                                    v = data[k]
                                    if isinstance(v, list):
                                        items = v
                                        break
                                    elif isinstance(v, dict):
                                        for k2 in v:
                                            if isinstance(v[k2], list):
                                                items = v[k2]
                                                break
                        for item in items[:30]:
                            if not isinstance(item, dict):
                                continue
                            title = item.get("title", item.get("name", ""))
                            if not title:
                                continue
                            # Null or malformed coordinates fall back to the
                            # default position so they cannot abort the scrape.
                            lat = self._coord(item.get("lat", item.get("latitude", 33)), 33.0)
                            lng = self._coord(item.get("lng", item.get("longitude", 44)), 44.0)
                            etype, cat, _ = classify_event_type(str(title))
                            events.append(Event(
                                id=make_event_id("pizzint", str(title)),
                                title=str(title),
                                event_type=etype,
                                severity=classify_severity(str(title), etype),
                                category=cat,
                                lat=lat, lng=lng,
                                location=item.get("location", "—"),
                                region=classify_region(lat, lng, str(title)),
                                details=item.get("description", str(title)),
                                source="PIZZINT",
                                source_url=self.url,
                                timestamp=time.time(),
                            ))

            # 2) HTML fallback (first 15 cards only).
            cards = soup.select("[class*='event'],[class*='alert'],[class*='incident'],[class*='card'],article,.post")
            for card in cards[:15]:
                title_el = card.select_one("h1,h2,h3,h4,.title,[class*='title']")
                if not title_el:
                    continue
                title = title_el.get_text(strip=True)
                if len(title) < 5:
                    continue
                etype, cat, _ = classify_event_type(title)
                events.append(Event(
                    id=make_event_id("pizzint_html", title),
                    title=title,
                    event_type=etype,
                    severity=classify_severity(title, etype),
                    category=cat,
                    lat=33.0, lng=44.0,
                    location="—",
                    region=classify_region(33, 44, title),
                    details=title,
                    source="PIZZINT",
                    source_url=self.url,
                    timestamp=time.time(),
                ))
        except Exception as e:
            logger.warning(f"[PIZZINT] فشل: {e}")
        return events
class ADSBExchangeScraper(BaseScraper):
    """Collect likely-military flights from ADS-B Exchange.

    One request is made per region in ``MILITARY_BOUNDS`` (centre point,
    range 500). Each returned aircraft is kept only if a callsign prefix,
    aircraft type, squawk or altitude heuristic flags it as military.
    """

    name = "ADS-B Exchange"
    url = "https://globe.adsbexchange.com"

    # Regions of military interest (bounding boxes in degrees).
    MILITARY_BOUNDS = [
        {"name": "شرق المتوسط", "lat1": 32, "lat2": 37, "lng1": 30, "lng2": 37},
        {"name": "الخليج العربي", "lat1": 23, "lat2": 32, "lng1": 45, "lng2": 57},
        {"name": "البحر الأحمر", "lat1": 12, "lat2": 22, "lng1": 38, "lng2": 46},
        {"name": "البحر الأسود", "lat1": 41, "lat2": 47, "lng1": 27, "lng2": 42},
    ]

    # Type-code fragment -> (display name, flight_type).
    MILITARY_TYPES = {
        "C17": ("شحن عسكري", "cargo"), "C130": ("شحن عسكري", "cargo"),
        "C5": ("شحن عسكري", "cargo"), "KC135": ("تزويد بالوقود", "military"),
        "KC10": ("تزويد بالوقود", "military"), "KC46": ("تزويد بالوقود", "military"),
        "E3": ("إنذار مبكر", "recon"), "E8": ("استطلاع", "recon"),
        "RC135": ("استطلاع إلكتروني", "recon"), "EP3": ("استطلاع", "recon"),
        "P8": ("مراقبة بحرية", "recon"), "P3": ("مراقبة بحرية", "recon"),
        "RQ4": ("استطلاع بدون طيار", "drone"), "MQ9": ("مسيّرة هجومية", "drone"),
        "MQ1": ("مسيّرة", "drone"), "FORTE": ("استطلاع", "recon"),
        "F15": ("مقاتلة", "military"), "F16": ("مقاتلة", "military"),
        "F18": ("مقاتلة", "military"), "F22": ("مقاتلة", "military"),
        "F35": ("مقاتلة", "military"), "B52": ("قاذفة", "military"),
        "B1": ("قاذفة", "military"), "B2": ("قاذفة", "military"),
        "A10": ("هجوم أرضي", "military"), "AH64": ("هليكوبتر هجومية", "military"),
        "V22": ("أوسبري", "military"), "GLEX": ("استطلاع", "recon"),
    }

    MILITARY_CALLSIGN_PREFIXES = [
        "FORTE", "JAKE", "VIPER", "REAP", "RCH", "HAWK", "DUKE",
        "NAVY", "ARMY", "EVAC", "REACH", "SKULL", "DOOM", "FURY",
        "RAGE", "IRON", "BOLT", "STEEL", "COBRA", "TIGER", "EAGLE",
        "SPARTN", "TOPCAT", "WRATH", "ROGUE", "SABER", "LANCE",
        "DEMON", "GHOST", "STING", "TORCH", "ATLAS", "GIANT",
    ]

    @staticmethod
    def _altitude_ft(alt) -> float:
        """Return the altitude as a number; "ground", null or junk count as 0."""
        if alt is None or alt == "ground":
            return 0.0
        try:
            return float(alt)
        except (TypeError, ValueError):
            return 0.0

    def _to_flight(self, ac, origin: str) -> Optional[Flight]:
        """Convert one aircraft record to a ``Flight``.

        Returns ``None`` when the record is unusable (not a dict, no
        identity, no position) or is not flagged as military.
        """
        if not isinstance(ac, dict):
            return None

        # Field names differ between feed versions, hence the nested lookups.
        # Null values are normalised so .strip()/.upper() cannot raise.
        callsign = str(ac.get("flight", ac.get("call", ac.get("Icao", ""))) or "").strip()
        ac_type = ac.get("t", ac.get("type", ac.get("Mdl", "")))
        alt = ac.get("alt_baro", ac.get("Alt", ac.get("altitude", 0)))
        speed = ac.get("gs", ac.get("Spd", ac.get("speed", 0)))
        lat = ac.get("lat", ac.get("Lat", 0))
        lng = ac.get("lon", ac.get("Long", ac.get("lng", 0)))
        heading = ac.get("track", ac.get("Trak", ac.get("heading", 0)))
        hex_code = ac.get("hex", ac.get("Icao", ""))
        squawk = str(ac.get("squawk", ac.get("Sqk", "")) or "")

        if not callsign and not ac_type:
            return None
        if not lat or not lng:
            return None
        try:
            lat_f, lng_f = float(lat), float(lng)
        except (TypeError, ValueError):
            return None

        # Military filter.
        is_military = False
        flight_type = "military"
        aircraft_name = ac_type
        callsign_upper = callsign.upper()
        type_upper = str(ac_type or "").upper()

        # Callsign prefix.
        if callsign_upper.startswith(tuple(self.MILITARY_CALLSIGN_PREFIXES)):
            is_military = True

        # Aircraft type code, searched in both the type and the callsign.
        for mil_type, (type_name, ft) in self.MILITARY_TYPES.items():
            if mil_type in type_upper or mil_type in callsign_upper:
                is_military = True
                aircraft_name = type_name
                flight_type = ft
                break

        # Emergency squawks, or any squawk starting with "0".
        if squawk in ["7700", "7600", "7500"] or (squawk and squawk.startswith("0")):
            is_military = True

        # No squawk at high altitude is treated as possibly military. The
        # altitude may be an int, a float or the string "ground".
        if not squawk and alt and self._altitude_ft(alt) > 25000:
            is_military = True

        if not is_military:
            return None

        return Flight(
            callsign=callsign or hex_code,
            aircraft_type=aircraft_name or ac_type or "غير معروف",
            altitude=f"{alt} ft" if alt else "—",
            speed=f"{speed} kts" if speed else "—",
            lat=lat_f,
            lng=lng_f,
            heading=f"{heading}°" if heading else "—",
            origin=origin,
            flight_type=flight_type,
            source="ADS-B Exchange",
            hex_code=hex_code,
            squawk=squawk,
            timestamp=time.time(),
        )

    def fetch(self) -> List[Flight]:
        """Return military-flagged flights for every region of interest.

        A failure in one region is logged as a warning and the remaining
        regions are still queried; this method does not raise.
        """
        flights = []
        for bound in self.MILITARY_BOUNDS:
            try:
                # Query around the centre of the bounding box.
                lat_c = (bound["lat1"] + bound["lat2"]) / 2
                lng_c = (bound["lng1"] + bound["lng2"]) / 2
                api_url = "https://globe.adsbexchange.com/data/aircraft.json"
                params = {
                    "lat": lat_c,
                    "lon": lng_c,
                    "range": 500,
                }
                resp = requests.get(api_url, params=params, headers={
                    **HEADERS,
                    "Referer": "https://globe.adsbexchange.com/",
                }, timeout=15)
                if resp.status_code != 200:
                    # Try alternative endpoints until one answers 200.
                    for alt_url in [
                        f"https://globe.adsbexchange.com/data/globe_{int(lat_c)}_{int(lng_c)}.json",
                        f"https://adsbexchange.com/api/aircraft/json/lat/{lat_c}/lon/{lng_c}/dist/500/",
                    ]:
                        try:
                            resp = requests.get(alt_url, headers=HEADERS, timeout=10)
                            if resp.status_code == 200:
                                break
                        except Exception:
                            continue
                if resp.status_code != 200:
                    continue

                data = resp.json()
                ac_list = data.get("ac", data.get("aircraft", data.get("acList", []))) or []
                for ac in ac_list:
                    flight = self._to_flight(ac, bound["name"])
                    if flight is not None:
                        flights.append(flight)
            except Exception as e:
                logger.warning(f"[ADS-B] منطقة {bound['name']}: {e}")
        return flights
class FlightRadarScraper(BaseScraper):
    """Collect likely-military flights from Flightradar24.

    One feed request is made per zone in ``ZONES``. Each aircraft row is
    kept only if an aircraft type, callsign prefix or squawk heuristic
    (shared with ``ADSBExchangeScraper``) flags it as military.
    """

    name = "Flightradar24"
    url = "https://www.flightradar24.com"

    # Monitored zones.
    # NOTE(review): the three "bounds" strings do not follow one ordering
    # (compare "37,25,55,32" with "12,38,22,46"). Confirm the order the feed
    # expects and normalise them; left unchanged here.
    ZONES = [
        {"name": "الشرق الأوسط", "bounds": "37,25,55,32"},
        {"name": "شرق المتوسط", "bounds": "33,30,37,36"},
        {"name": "البحر الأحمر", "bounds": "12,38,22,46"},
    ]

    def fetch(self) -> List[Flight]:
        """Return military-flagged flights for every monitored zone.

        A failure in one zone is logged as a warning and the remaining zones
        are still queried; this method does not raise.
        """
        flights = []
        for zone in self.ZONES:
            try:
                bounds = zone["bounds"]
                api_url = "https://data-cloud.flightradar24.com/zones/fcgi/feed.js"
                params = {
                    "bounds": bounds,
                    "faa": "1",
                    "satellite": "1",
                    "mlat": "1",
                    "flarm": "0",
                    "adsb": "1",
                    "gnd": "0",
                    "air": "1",
                    "vehicles": "0",
                    "estimated": "0",
                    "gliders": "0",
                    "stats": "0",
                }
                resp = requests.get(api_url, params=params, headers={
                    **HEADERS,
                    "Referer": "https://www.flightradar24.com/",
                    "Origin": "https://www.flightradar24.com",
                }, timeout=15)
                if resp.status_code != 200:
                    continue

                data = resp.json()
                for key, val in data.items():
                    if key in ("full_count", "version", "stats"):
                        continue
                    # Aircraft rows are lists; anything shorter than 10
                    # entries is skipped.
                    if not isinstance(val, list) or len(val) < 10:
                        continue
                    # Row layout as read below: [hex, lat, lng, heading, alt,
                    # speed, squawk, radar, type, reg, timestamp, origin,
                    # dest, flight, ?, ?, callsign, ...]
                    try:
                        hex_code = val[0]
                        lat = float(val[1])
                        lng = float(val[2])
                        heading = val[3]
                        alt = val[4]
                        speed = val[5]
                        squawk = str(val[6] or "")
                        ac_type = val[8]
                        raw_callsign = val[16] if len(val) > 16 else val[13] if len(val) > 13 else ""
                        # A null callsign must not become the text "None".
                        callsign = str(raw_callsign or "")
                        if not lat or not lng:
                            continue

                        # Military filter.
                        is_military = False
                        flight_type = "military"
                        aircraft_name = ac_type
                        callsign_upper = callsign.upper()
                        type_upper = str(ac_type or "").upper()
                        for mil_type, (tname, ft) in ADSBExchangeScraper.MILITARY_TYPES.items():
                            if mil_type in type_upper or mil_type in callsign_upper:
                                is_military = True
                                aircraft_name = tname
                                flight_type = ft
                                break
                        if callsign_upper.startswith(tuple(ADSBExchangeScraper.MILITARY_CALLSIGN_PREFIXES)):
                            is_military = True
                        # Any squawk starting with "0" (including 0000) or
                        # an emergency code.
                        if squawk and (squawk.startswith("0") or squawk in ["7700", "7600", "7500"]):
                            is_military = True

                        if is_military:
                            flights.append(Flight(
                                callsign=callsign.strip() or hex_code,
                                aircraft_type=aircraft_name or ac_type or "غير معروف",
                                altitude=f"{alt} ft",
                                speed=f"{speed} kts",
                                lat=lat,
                                lng=lng,
                                heading=f"{heading}°",
                                origin=zone["name"],
                                flight_type=flight_type,
                                source="Flightradar24",
                                hex_code=str(hex_code),
                                squawk=squawk,
                                timestamp=time.time(),
                            ))
                    except (TypeError, ValueError, IndexError):
                        # Malformed row (e.g. null position): skip this
                        # aircraft only, not the whole zone.
                        continue
            except Exception as e:
                logger.warning(f"[FR24] منطقة {zone['name']}: {e}")
        return flights
# ===== مدير البيانات المركزي =====
class DataManager:
    """Runs every scraper and stores, de-duplicates and filters the results.

    ``lock`` guards ``events``, ``flights`` and the ``is_fetching`` flag so
    that readers and a concurrent ``fetch_all`` see consistent state.
    """

    def __init__(self):
        self.events: List[Event] = []
        self.flights: List[Flight] = []
        self.source_status: Dict[str, Dict] = {}
        self.last_update: float = 0
        self.is_fetching: bool = False
        self.lock = threading.Lock()
        self.event_scrapers = [
            BamqamScraper(),
            WorldMonitorScraper(),
            ConflictlyScraper(),
            PizzintScraper(),
        ]
        self.flight_scrapers = [
            ADSBExchangeScraper(),
            FlightRadarScraper(),
        ]

    @staticmethod
    def _status_entry(scraper, count: int) -> Dict:
        """Build the ``source_status`` record for one scraper run."""
        return {
            "status": scraper.status,
            "count": count,
            "last_fetch": scraper.last_fetch,
            "error": scraper.error_msg,
            "url": scraper.url,
        }

    @staticmethod
    def _as_text(value) -> str:
        """Return *value* as searchable text; ``None`` becomes an empty string."""
        if isinstance(value, str):
            return value
        return "" if value is None else str(value)

    def fetch_all(self):
        """Fetch from every source and merge the results into the store.

        Returns immediately if another fetch is already running. The
        ``is_fetching`` flag is claimed under the lock and always released,
        even if a scraper raises, so a failed run cannot block later ones.
        """
        with self.lock:
            if self.is_fetching:
                return
            self.is_fetching = True

        try:
            new_events = []
            new_flights = []

            # Event sources.
            for scraper in self.event_scrapers:
                items = scraper.safe_fetch()
                new_events.extend(items)
                self.source_status[scraper.name] = self._status_entry(scraper, len(items))

            # Flight sources. The first item decides how the batch is treated.
            for scraper in self.flight_scrapers:
                items = scraper.safe_fetch()
                if isinstance(items, list) and len(items) > 0:
                    if isinstance(items[0], Flight):
                        new_flights.extend(items)
                    else:
                        # Keep any events a flight scraper happened to return.
                        new_events.extend([e for e in items if isinstance(e, Event)])
                self.source_status[scraper.name] = self._status_entry(scraper, len(items))

            # De-duplicate and merge.
            with self.lock:
                # Drop expired events (older than 24 hours). Already-known
                # ids keep their first-seen record.
                existing = {e.id: e for e in self.events if not e.is_expired}
                for ev in new_events:
                    if ev.id not in existing:
                        existing[ev.id] = ev
                self.events = list(existing.values())

                # Flights are replaced wholesale, but only when this run
                # returned at least one; otherwise the last list is kept.
                seen_flights = {}
                for fl in new_flights:
                    key = fl.callsign or fl.hex_code
                    if key and key not in seen_flights:
                        seen_flights[key] = fl
                if seen_flights:
                    self.flights = list(seen_flights.values())
                self.last_update = time.time()

            logger.info(f"تم التحديث: {len(self.events)} حدث, {len(self.flights)} طائرة")
        finally:
            self.is_fetching = False

    def get_events(self, region="all", severity=None, event_type=None, search=""):
        """Return non-expired events matching the filters, newest first.

        Args:
            region: Region key or its display label from ``REGION_MAP``;
                ``"all"`` (or its label) disables the filter.
            severity: Iterable of severity keys or their UI labels.
            event_type: Iterable of event-type keys or their UI labels.
            search: Case-sensitive substring looked up in title, location
                and details.
        """
        with self.lock:
            filtered = [e for e in self.events if not e.is_expired]

        if region and region != "all" and region != "جميع المناطق":
            region_key = {v: k for k, v in REGION_MAP.items()}.get(region, region)
            filtered = [e for e in filtered if e.region == region_key]
        if severity:
            sev_map = {"🔴 حرج": "critical", "🟠 مرتفع": "high", "🟡 متوسط": "medium", "🔵 منخفض": "low"}
            active = [sev_map.get(s, s) for s in severity]
            filtered = [e for e in filtered if e.severity in active]
        if event_type:
            type_map = {"💥 غارات": "strike", "🚀 صواريخ": "missile", "✈️ طيران": "aircraft", "🎖️ قوات": "troops", "🚢 بحري": "naval"}
            active = [type_map.get(t, t) for t in event_type]
            filtered = [e for e in filtered if e.event_type in active]
        if search:
            # Scraped location/details are not guaranteed to be strings.
            text_of = self._as_text
            filtered = [
                e for e in filtered
                if search in text_of(e.title)
                or search in text_of(e.location)
                or search in text_of(e.details)
            ]
        filtered.sort(key=lambda x: x.timestamp, reverse=True)
        return filtered

    def get_flights(self):
        """Return a copy of the current flight list."""
        with self.lock:
            return list(self.flights)

    def get_source_status(self):
        """Return a shallow copy of the per-source status records."""
        return dict(self.source_status)

    def get_stats(self):
        """Return counts of non-expired events by type, plus the flight count."""
        with self.lock:
            events = [e for e in self.events if not e.is_expired]
            flight_count = len(self.flights)

        def count_type(kind: str) -> int:
            return sum(1 for e in events if e.event_type == kind)

        return {
            "total": len(events),
            "strikes": count_type("strike"),
            "missiles": count_type("missile"),
            "aircraft": count_type("aircraft"),
            "troops": count_type("troops"),
            "naval": count_type("naval"),
            "critical": sum(1 for e in events if e.severity == "critical"),
            "flights": flight_count,
        }
# Region key -> display label shown in the UI. DataManager.get_events inverts
# this mapping to accept either the key or the label as a filter value.
REGION_MAP = dict((
    ("all", "جميع المناطق"),
    ("middleeast", "الشرق الأوسط"),
    ("europe", "أوروبا الشرقية"),
    ("africa", "شمال أفريقيا"),
    ("redSea", "البحر الأحمر"),
    ("asia", "جنوب آسيا"),
))