testtt / app.py
lljz66's picture
Update app.py
342a97d verified
from playwright.sync_api import sync_playwright
import os, json, time, logging, gzip, shutil
from urllib.parse import urlparse, parse_qs
from huggingface_hub import HfApi
# =========================
# 1. إعداد السجلات (Logging)
# =========================
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s | %(levelname)-8s | %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
log = logging.getLogger(__name__)
# =========================
# 2. الدوال المساعدة (Helpers)
# =========================
def extract_id(url):
"""استخراج معرف الاستشارة من الرابط"""
try:
return parse_qs(urlparse(url).query).get("refConsultation", [""])[0]
except:
return "UNKNOWN"
def extract_field_value(page, label_text):
"""
استخراج القيمة لحقل معين بذكاء.
يبحث عن التسمية، ثم يحاول استخراج النص المجاور لها أو الذي يليها.
"""
try:
# نبحث عن العناصر التي تحتوي على نص التسمية
# نستخدم first للحصول على أقرب تطابق، لكن قد نحتاج لتجربة عدة عناصر إذا فشل الأول
locators = page.locator(f"text={label_text}").all()
for el in locators:
if not el.is_visible():
continue
# الطريقة 1: التحقق من النص الكامل للعنصر (غالباً التسمية والقيمة في نفس الـ td/div)
full_text = el.inner_text().strip()
# تنظيف النص من الرموز غير المرئية
clean_full = " ".join(full_text.split())
# محاولة الفصل بالنقطتين (العربية أو الإنجليزية مع مسافات)
for sep in [" : ", ":", " : "]:
if sep in clean_full:
parts = clean_full.split(sep, 1)
if len(parts) > 1:
val = parts[1].strip()
if val and val != label_text:
return val
# الطريقة 2: إذا لم نجد فاصلاً، قد تكون القيمة في عنصر sibling (td التالي مثلاً)
# نحاول الوصول للأب ثم نبحث عن td آخر
parent = el.locator("xpath=..")
if parent.count() > 0:
# البحث عن أي td أو div داخل الأب لا يحتوي على التسمية نفسها (لنفترض أنه القيمة)
# هذه طريقة تقريبية تعتمد على هيكل الجداول الشائع
siblings = parent.locator("td, div").all()
for sib in siblings:
sib_text = sib.inner_text().strip()
if sib_text and label_text not in sib_text and len(sib_text) > 2:
return sib_text
# الطريقة 3: إذا كان النص طويلاً ويحتوي على التسمية في البداية فقط
if clean_full.startswith(label_text):
val = clean_full[len(label_text):].strip()
# إزالة النقطتين في البداية إذا وجدت
val = val.lstrip(":").strip()
if val:
return val
return ""
except Exception as e:
# log.debug(f"Error extracting {label_text}: {e}")
return ""
def safe_download(page, locator, file_prefix, download_dir="downloads"):
"""تحميل ملف بأمان وإرجاع المسار المحلي"""
os.makedirs(download_dir, exist_ok=True)
if locator.count() == 0:
return False, None, "Not found"
try:
with page.expect_download(timeout=20000) as dl_info:
locator.first.click()
download = dl_info.value
# تحديد الامتداد
suggested_name = download.suggested_filename or "file"
_, ext = os.path.splitext(suggested_name)
if not ext:
ext = ".bin"
local_path = os.path.join(download_dir, f"{file_prefix}{ext}")
download.save_as(local_path)
return True, local_path, "Success"
except Exception as e:
return False, None, str(e)
def compress_file_gzip(input_path):
"""ضغط ملف باستخدام gzip لتوفير المساحة على HF"""
output_path = input_path + ".gz"
with open(input_path, 'rb') as f_in, gzip.open(output_path, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
return output_path
def upload_consultation_to_hf(consultation_id, metadata, files_dict, repo_id, api):
"""
رفع بيانات استشارة واحدة إلى Hugging Face Dataset
الهيكل: data/consultations/{ID}/metadata.json + files/
"""
base_repo_path = f"data/consultations/{consultation_id}"
# 1. رفع ملف البيانات الوصفية (Metadata)
meta_filename = f"temp_meta_{consultation_id}.json"
try:
with open(meta_filename, "w", encoding="utf-8") as f:
json.dump(metadata, f, ensure_ascii=False, indent=2)
api.upload_file(
path_or_fileobj=meta_filename,
path_in_repo=f"{base_repo_path}/metadata.json",
repo_id=repo_id,
repo_type="dataset"
)
log.info(f"[UPLOAD] {base_repo_path}/metadata.json")
finally:
if os.path.exists(meta_filename):
os.remove(meta_filename)
# 2. رفع الملفات المرفقة (Files)
for file_type, local_path in files_dict.items():
if not local_path or not os.path.exists(local_path):
continue
file_name = os.path.basename(local_path)
repo_file_path = f"{base_repo_path}/files/{file_name}"
# ضغط الملفات الكبيرة (> 5MB) لتوفير المساحة
file_size = os.path.getsize(local_path)
upload_path = local_path
if file_size > 5 * 1024 * 1024:
log.info(f"[COMPRESS] Compressing {file_name} ({file_size/1024/1024:.2f} MB)...")
upload_path = compress_file_gzip(local_path)
repo_file_path += ".gz"
try:
api.upload_file(
path_or_fileobj=upload_path,
path_in_repo=repo_file_path,
repo_id=repo_id,
repo_type="dataset"
)
log.info(f"[UPLOAD] {repo_file_path}")
except Exception as e:
log.error(f"[UPLOAD ERROR] Failed to upload {file_name}: {e}")
finally:
# حذف الملف المضغوط المؤقت إذا وجد
if upload_path != local_path and os.path.exists(upload_path):
os.remove(upload_path)
# =========================
# 3. الوظيفة الرئيسية (Main Run)
# =========================
def run():
repo_id = "lljz66/opentender_morocco_data"
# تأكد من وجود متغير البيئة HF_TOKEN
api = HfApi()
with sync_playwright() as p:
browser = p.chromium.launch(headless=True, args=["--no-sandbox", "--disable-dev-shm-usage"])
context = browser.new_context(accept_downloads=True)
page = context.new_page()
log.info("Starting crawl...")
# الذهاب لصفحة البحث المتقدم
page.goto("https://www.marchespublics.gov.ma/index.php?page=entreprise.EntrepriseAdvancedSearch&searchAnnCons&consAnnulee=1")
page.wait_for_load_state("domcontentloaded")
# === إعداد معايير البحث ===
# ملاحظة: عدل التواريخ حسب الحاجة
page.locator("#ctl0_CONTENU_PAGE_AdvancedSearch_dateMiseEnLigneStart").fill("25/04/2026")
page.locator("#ctl0_CONTENU_PAGE_AdvancedSearch_dateMiseEnLigneEnd").fill("25/10/2026")
page.locator("#ctl0_CONTENU_PAGE_AdvancedSearch_dateMiseEnLigneCalculeStart").fill("25/10/2025")
page.locator("#ctl0_CONTENU_PAGE_AdvancedSearch_dateMiseEnLigneCalculeEnd").fill("25/04/2026")
# النقر على بحث
page.locator("#ctl0_CONTENU_PAGE_AdvancedSearch_lancerRecherche").click()
page.wait_for_timeout(5000) # انتظار تحميل النتائج
max_test = 10 # عدد الصفوف للتجربة
processed_count = 0
for i in range(max_test):
log.info(f"Processing row {i+1}/{max_test}")
# ⚠️ هام: إعادة تعريف locator للصفوف في كل مرة لتجنب مشكلة Stale Element
rows = page.locator("tr:has(div[id*='panelAction'])")
# التحقق من وجود الصف
if rows.count() <= i:
log.warning(f"Row {i+1} not found. Ending loop.")
break
row = rows.nth(i)
link = row.locator("a[href*='EntrepriseDetailConsultation']").first
# استخراج الرابط
try:
url = link.get_attribute("href", timeout=5000)
except:
log.warning(f"Could not get href for row {i+1}. Skipping.")
continue
if not url:
continue
full_url = "https://www.marchespublics.gov.ma/" + url
log.info(f"Opening: {full_url}")
# === فتح صفحة التفاصيل ===
link.click()
page.wait_for_load_state("domcontentloaded")
# انتظار إضافي لضمان تحميل المحتوى الديناميكي
page.wait_for_timeout(2000)
consultation_id = extract_id(page.url)
if not consultation_id or consultation_id == "UNKNOWN":
log.warning("Could not extract Consultation ID. Skipping.")
# محاولة العودة قبل المتابعة
try: page.go_back(); page.wait_for_timeout(2000)
except: pass
continue
log.info(f"Consultation ID: {consultation_id}")
# === استخراج البيانات النصية ===
metadata = {
"consultation_id": consultation_id,
"reference": extract_field_value(page, "Référence"),
"objet": extract_field_value(page, "Objet"),
"date_limite": extract_field_value(page, "Date et heure limite de remise des plis"),
"date_annulation": extract_field_value(page, "Date d'annulation"),
"source_url": full_url,
"scraped_at": time.strftime("%Y-%m-%d %H:%M:%S")
}
log.info(f"Extracted Data -> Ref: '{metadata['reference'][:20]}...', DateLim: '{metadata['date_limite']}'")
# === التنقل لتبويب الوثائق (Onglet 1) ===
# نتحقق من وجود التبويب وننقر عليه إذا لزم الأمر
onglet = page.locator("#ongletLayer1").first
if onglet.is_visible():
onglet.click()
page.wait_for_timeout(2000)
files_dict = {} # لتخزين مسارات الملفات المحلية
# --- 1. معالجة RC (Règlement de consultation) ---
rc_locator = page.locator("text=Règlement de consultation").first
rc_ok, rc_path, rc_msg = safe_download(page, rc_locator, f"rc_{consultation_id}")
metadata["rc_downloaded"] = rc_ok
if rc_ok and rc_path:
files_dict["rc"] = rc_path
log.info(f"[RC] Downloaded: {os.path.basename(rc_path)}")
elif rc_msg != "Not found":
log.warning(f"[RC] Failed: {rc_msg}")
# --- 2. معالجة PV (Extrait de PV) ---
# الـ PV غالباً صفحة نصية وليست ملف تحميل مباشر
pv_locator = page.locator("text=Extrait de PV").first
if pv_locator.is_visible():
try:
pv_locator.click()
page.wait_for_load_state("domcontentloaded")
page.wait_for_timeout(1500)
pv_text = page.locator("body").inner_text().strip()
metadata["pv_available"] = True
metadata["pv_preview"] = pv_text[:300]
# حفظ النص كملف محلي
pv_file = f"downloads/pv_{consultation_id}.txt"
with open(pv_file, "w", encoding="utf-8") as f:
f.write(pv_text)
files_dict["pv"] = pv_file
log.info("[PV] Extracted and saved")
# العودة مرتين (كما في المنطق السابق الناجح)
for _ in range(2):
ret_btn = page.locator("#ctl0_CONTENU_PAGE_linkRetourBas").first
if ret_btn.is_visible():
ret_btn.click()
page.wait_for_load_state("domcontentloaded")
page.wait_for_timeout(1000)
except Exception as e:
log.warning(f"[PV] Error processing: {e}")
metadata["pv_available"] = False
# محاولة عودة طارئة
try:
page.locator("#ctl0_CONTENU_PAGE_linkRetourBas").click()
page.wait_for_timeout(1000)
except: pass
else:
metadata["pv_available"] = False
# --- 3. معالجة AVIS (Fichier joint / Avis) ---
# نبحث عن "Fichier joint" أو "Avis de..."
avis_locator = page.locator("text=Fichier joint").first
if not avis_locator.is_visible():
avis_locator = page.locator("text=Avis de").first # محاولة بديلة
avis_ok, avis_path, avis_msg = safe_download(page, avis_locator, f"avis_{consultation_id}")
metadata["avis_downloaded"] = avis_ok
if avis_ok and avis_path:
files_dict["avis"] = avis_path
log.info(f"[AVIS] Downloaded: {os.path.basename(avis_path)}")
elif avis_msg != "Not found":
log.warning(f"[AVIS] Failed: {avis_msg}")
# === العودة لصفحة النتائج ===
try:
# استخدام الزر المحدد للعودة
back_btn = page.locator("#ctl0_CONTENU_PAGE_linkRetourBas").first
if back_btn.is_visible():
back_btn.click()
else:
page.go_back() # بديل
page.wait_for_load_state("domcontentloaded")
page.wait_for_timeout(2000) # استقرار الصفحة
log.info("Returned to search results")
except Exception as e:
log.error(f"Failed to return to search page: {e}")
break # إيقاف الحلقة إذا فشل العودة
# === الرفع إلى Hugging Face ===
try:
upload_consultation_to_hf(consultation_id, metadata, files_dict, repo_id, api)
processed_count += 1
log.info(f"[COMPLETE] {consultation_id} successfully processed and uploaded.")
except Exception as e:
log.error(f"[FATAL UPLOAD ERROR] for {consultation_id}: {e}")
log.info(f"Crawl finished. Total processed: {processed_count}")
browser.close()
if __name__ == "__main__":
run()