from playwright.sync_api import sync_playwright import os, json, time, logging, gzip, shutil from urllib.parse import urlparse, parse_qs from huggingface_hub import HfApi # ========================= # 1. إعداد السجلات (Logging) # ========================= logging.basicConfig( level=logging.INFO, format='%(asctime)s | %(levelname)-8s | %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) log = logging.getLogger(__name__) # ========================= # 2. الدوال المساعدة (Helpers) # ========================= def extract_id(url): """استخراج معرف الاستشارة من الرابط""" try: return parse_qs(urlparse(url).query).get("refConsultation", [""])[0] except: return "UNKNOWN" def extract_field_value(page, label_text): """ استخراج القيمة لحقل معين بذكاء. يبحث عن التسمية، ثم يحاول استخراج النص المجاور لها أو الذي يليها. """ try: # نبحث عن العناصر التي تحتوي على نص التسمية # نستخدم first للحصول على أقرب تطابق، لكن قد نحتاج لتجربة عدة عناصر إذا فشل الأول locators = page.locator(f"text={label_text}").all() for el in locators: if not el.is_visible(): continue # الطريقة 1: التحقق من النص الكامل للعنصر (غالباً التسمية والقيمة في نفس الـ td/div) full_text = el.inner_text().strip() # تنظيف النص من الرموز غير المرئية clean_full = " ".join(full_text.split()) # محاولة الفصل بالنقطتين (العربية أو الإنجليزية مع مسافات) for sep in [" : ", ":", " : "]: if sep in clean_full: parts = clean_full.split(sep, 1) if len(parts) > 1: val = parts[1].strip() if val and val != label_text: return val # الطريقة 2: إذا لم نجد فاصلاً، قد تكون القيمة في عنصر sibling (td التالي مثلاً) # نحاول الوصول للأب ثم نبحث عن td آخر parent = el.locator("xpath=..") if parent.count() > 0: # البحث عن أي td أو div داخل الأب لا يحتوي على التسمية نفسها (لنفترض أنه القيمة) # هذه طريقة تقريبية تعتمد على هيكل الجداول الشائع siblings = parent.locator("td, div").all() for sib in siblings: sib_text = sib.inner_text().strip() if sib_text and label_text not in sib_text and len(sib_text) > 2: return sib_text # الطريقة 3: إذا كان النص طويلاً ويحتوي على التسمية في البداية فقط if clean_full.startswith(label_text): val = clean_full[len(label_text):].strip() # إزالة النقطتين في البداية إذا وجدت val = val.lstrip(":").strip() if val: return val return "" except Exception as e: # log.debug(f"Error extracting {label_text}: {e}") return "" def safe_download(page, locator, file_prefix, download_dir="downloads"): """تحميل ملف بأمان وإرجاع المسار المحلي""" os.makedirs(download_dir, exist_ok=True) if locator.count() == 0: return False, None, "Not found" try: with page.expect_download(timeout=20000) as dl_info: locator.first.click() download = dl_info.value # تحديد الامتداد suggested_name = download.suggested_filename or "file" _, ext = os.path.splitext(suggested_name) if not ext: ext = ".bin" local_path = os.path.join(download_dir, f"{file_prefix}{ext}") download.save_as(local_path) return True, local_path, "Success" except Exception as e: return False, None, str(e) def compress_file_gzip(input_path): """ضغط ملف باستخدام gzip لتوفير المساحة على HF""" output_path = input_path + ".gz" with open(input_path, 'rb') as f_in, gzip.open(output_path, 'wb') as f_out: shutil.copyfileobj(f_in, f_out) return output_path def upload_consultation_to_hf(consultation_id, metadata, files_dict, repo_id, api): """ رفع بيانات استشارة واحدة إلى Hugging Face Dataset الهيكل: data/consultations/{ID}/metadata.json + files/ """ base_repo_path = f"data/consultations/{consultation_id}" # 1. رفع ملف البيانات الوصفية (Metadata) meta_filename = f"temp_meta_{consultation_id}.json" try: with open(meta_filename, "w", encoding="utf-8") as f: json.dump(metadata, f, ensure_ascii=False, indent=2) api.upload_file( path_or_fileobj=meta_filename, path_in_repo=f"{base_repo_path}/metadata.json", repo_id=repo_id, repo_type="dataset" ) log.info(f"[UPLOAD] {base_repo_path}/metadata.json") finally: if os.path.exists(meta_filename): os.remove(meta_filename) # 2. رفع الملفات المرفقة (Files) for file_type, local_path in files_dict.items(): if not local_path or not os.path.exists(local_path): continue file_name = os.path.basename(local_path) repo_file_path = f"{base_repo_path}/files/{file_name}" # ضغط الملفات الكبيرة (> 5MB) لتوفير المساحة file_size = os.path.getsize(local_path) upload_path = local_path if file_size > 5 * 1024 * 1024: log.info(f"[COMPRESS] Compressing {file_name} ({file_size/1024/1024:.2f} MB)...") upload_path = compress_file_gzip(local_path) repo_file_path += ".gz" try: api.upload_file( path_or_fileobj=upload_path, path_in_repo=repo_file_path, repo_id=repo_id, repo_type="dataset" ) log.info(f"[UPLOAD] {repo_file_path}") except Exception as e: log.error(f"[UPLOAD ERROR] Failed to upload {file_name}: {e}") finally: # حذف الملف المضغوط المؤقت إذا وجد if upload_path != local_path and os.path.exists(upload_path): os.remove(upload_path) # ========================= # 3. الوظيفة الرئيسية (Main Run) # ========================= def run(): repo_id = "lljz66/opentender_morocco_data" # تأكد من وجود متغير البيئة HF_TOKEN api = HfApi() with sync_playwright() as p: browser = p.chromium.launch(headless=True, args=["--no-sandbox", "--disable-dev-shm-usage"]) context = browser.new_context(accept_downloads=True) page = context.new_page() log.info("Starting crawl...") # الذهاب لصفحة البحث المتقدم page.goto("https://www.marchespublics.gov.ma/index.php?page=entreprise.EntrepriseAdvancedSearch&searchAnnCons&consAnnulee=1") page.wait_for_load_state("domcontentloaded") # === إعداد معايير البحث === # ملاحظة: عدل التواريخ حسب الحاجة page.locator("#ctl0_CONTENU_PAGE_AdvancedSearch_dateMiseEnLigneStart").fill("25/04/2026") page.locator("#ctl0_CONTENU_PAGE_AdvancedSearch_dateMiseEnLigneEnd").fill("25/10/2026") page.locator("#ctl0_CONTENU_PAGE_AdvancedSearch_dateMiseEnLigneCalculeStart").fill("25/10/2025") page.locator("#ctl0_CONTENU_PAGE_AdvancedSearch_dateMiseEnLigneCalculeEnd").fill("25/04/2026") # النقر على بحث page.locator("#ctl0_CONTENU_PAGE_AdvancedSearch_lancerRecherche").click() page.wait_for_timeout(5000) # انتظار تحميل النتائج max_test = 10 # عدد الصفوف للتجربة processed_count = 0 for i in range(max_test): log.info(f"Processing row {i+1}/{max_test}") # ⚠️ هام: إعادة تعريف locator للصفوف في كل مرة لتجنب مشكلة Stale Element rows = page.locator("tr:has(div[id*='panelAction'])") # التحقق من وجود الصف if rows.count() <= i: log.warning(f"Row {i+1} not found. Ending loop.") break row = rows.nth(i) link = row.locator("a[href*='EntrepriseDetailConsultation']").first # استخراج الرابط try: url = link.get_attribute("href", timeout=5000) except: log.warning(f"Could not get href for row {i+1}. Skipping.") continue if not url: continue full_url = "https://www.marchespublics.gov.ma/" + url log.info(f"Opening: {full_url}") # === فتح صفحة التفاصيل === link.click() page.wait_for_load_state("domcontentloaded") # انتظار إضافي لضمان تحميل المحتوى الديناميكي page.wait_for_timeout(2000) consultation_id = extract_id(page.url) if not consultation_id or consultation_id == "UNKNOWN": log.warning("Could not extract Consultation ID. Skipping.") # محاولة العودة قبل المتابعة try: page.go_back(); page.wait_for_timeout(2000) except: pass continue log.info(f"Consultation ID: {consultation_id}") # === استخراج البيانات النصية === metadata = { "consultation_id": consultation_id, "reference": extract_field_value(page, "Référence"), "objet": extract_field_value(page, "Objet"), "date_limite": extract_field_value(page, "Date et heure limite de remise des plis"), "date_annulation": extract_field_value(page, "Date d'annulation"), "source_url": full_url, "scraped_at": time.strftime("%Y-%m-%d %H:%M:%S") } log.info(f"Extracted Data -> Ref: '{metadata['reference'][:20]}...', DateLim: '{metadata['date_limite']}'") # === التنقل لتبويب الوثائق (Onglet 1) === # نتحقق من وجود التبويب وننقر عليه إذا لزم الأمر onglet = page.locator("#ongletLayer1").first if onglet.is_visible(): onglet.click() page.wait_for_timeout(2000) files_dict = {} # لتخزين مسارات الملفات المحلية # --- 1. معالجة RC (Règlement de consultation) --- rc_locator = page.locator("text=Règlement de consultation").first rc_ok, rc_path, rc_msg = safe_download(page, rc_locator, f"rc_{consultation_id}") metadata["rc_downloaded"] = rc_ok if rc_ok and rc_path: files_dict["rc"] = rc_path log.info(f"[RC] Downloaded: {os.path.basename(rc_path)}") elif rc_msg != "Not found": log.warning(f"[RC] Failed: {rc_msg}") # --- 2. معالجة PV (Extrait de PV) --- # الـ PV غالباً صفحة نصية وليست ملف تحميل مباشر pv_locator = page.locator("text=Extrait de PV").first if pv_locator.is_visible(): try: pv_locator.click() page.wait_for_load_state("domcontentloaded") page.wait_for_timeout(1500) pv_text = page.locator("body").inner_text().strip() metadata["pv_available"] = True metadata["pv_preview"] = pv_text[:300] # حفظ النص كملف محلي pv_file = f"downloads/pv_{consultation_id}.txt" with open(pv_file, "w", encoding="utf-8") as f: f.write(pv_text) files_dict["pv"] = pv_file log.info("[PV] Extracted and saved") # العودة مرتين (كما في المنطق السابق الناجح) for _ in range(2): ret_btn = page.locator("#ctl0_CONTENU_PAGE_linkRetourBas").first if ret_btn.is_visible(): ret_btn.click() page.wait_for_load_state("domcontentloaded") page.wait_for_timeout(1000) except Exception as e: log.warning(f"[PV] Error processing: {e}") metadata["pv_available"] = False # محاولة عودة طارئة try: page.locator("#ctl0_CONTENU_PAGE_linkRetourBas").click() page.wait_for_timeout(1000) except: pass else: metadata["pv_available"] = False # --- 3. معالجة AVIS (Fichier joint / Avis) --- # نبحث عن "Fichier joint" أو "Avis de..." avis_locator = page.locator("text=Fichier joint").first if not avis_locator.is_visible(): avis_locator = page.locator("text=Avis de").first # محاولة بديلة avis_ok, avis_path, avis_msg = safe_download(page, avis_locator, f"avis_{consultation_id}") metadata["avis_downloaded"] = avis_ok if avis_ok and avis_path: files_dict["avis"] = avis_path log.info(f"[AVIS] Downloaded: {os.path.basename(avis_path)}") elif avis_msg != "Not found": log.warning(f"[AVIS] Failed: {avis_msg}") # === العودة لصفحة النتائج === try: # استخدام الزر المحدد للعودة back_btn = page.locator("#ctl0_CONTENU_PAGE_linkRetourBas").first if back_btn.is_visible(): back_btn.click() else: page.go_back() # بديل page.wait_for_load_state("domcontentloaded") page.wait_for_timeout(2000) # استقرار الصفحة log.info("Returned to search results") except Exception as e: log.error(f"Failed to return to search page: {e}") break # إيقاف الحلقة إذا فشل العودة # === الرفع إلى Hugging Face === try: upload_consultation_to_hf(consultation_id, metadata, files_dict, repo_id, api) processed_count += 1 log.info(f"[COMPLETE] {consultation_id} successfully processed and uploaded.") except Exception as e: log.error(f"[FATAL UPLOAD ERROR] for {consultation_id}: {e}") log.info(f"Crawl finished. Total processed: {processed_count}") browser.close() if __name__ == "__main__": run()