| from playwright.sync_api import sync_playwright |
| import os, json, time, logging, gzip, shutil |
| from urllib.parse import urlparse, parse_qs |
| from huggingface_hub import HfApi |
|
|
| |
| |
| |
# Process-wide logging setup: timestamped, level-aligned, pipe-separated
# records at INFO and above. Kept in one mapping so the settings read as data.
_LOG_SETTINGS = {
    "level": logging.INFO,
    "format": '%(asctime)s | %(levelname)-8s | %(message)s',
    "datefmt": '%Y-%m-%d %H:%M:%S',
}
logging.basicConfig(**_LOG_SETTINGS)

# Module logger shared by every function in this file.
log = logging.getLogger(__name__)
|
|
| |
| |
| |
|
|
def extract_id(url):
    """Extract the consultation ID from a consultation URL.

    Args:
        url: URL whose query string may carry a ``refConsultation`` parameter.

    Returns:
        The first ``refConsultation`` value, ``""`` when the parameter is
        absent, or ``"UNKNOWN"`` when the URL cannot be parsed at all.
    """
    try:
        query = urlparse(url).query
        return parse_qs(query).get("refConsultation", [""])[0]
    except (ValueError, TypeError, AttributeError):
        # ValueError: malformed URL (e.g. unbalanced IPv6 brackets);
        # TypeError / AttributeError: *url* is not a str/bytes-like value.
        # Deliberately narrower than a bare ``except`` so KeyboardInterrupt
        # and SystemExit are no longer swallowed.
        return "UNKNOWN"
|
|
def extract_field_value(page, label_text):
    """Return the value displayed next to the field labelled *label_text*.

    Looks up every visible element matching the label text and, for each one,
    tries three strategies in order:

    1. the element's own text has the form ``label : value``;
    2. the value lives in a ``td``/``div`` under the element's parent;
    3. the element's text starts with the label and the value follows it.

    Args:
        page: Playwright page (or any object exposing the same ``locator``
            API) currently showing the consultation details.
        label_text: Label to search for, e.g. ``"Objet"``.

    Returns:
        The first non-empty value found, or ``""`` when nothing matches or
        any lookup raises.
    """
    try:
        for el in page.locator(f"text={label_text}").all():
            if not el.is_visible():
                continue

            # Collapse every whitespace run (newlines, tabs, NBSP) into a
            # single space so the separator checks below are reliable.
            clean_full = " ".join(el.inner_text().split())

            # Strategy 1: "label : value" inside the same element. The spaced
            # separator is tried first so a colon inside the label part does
            # not win over the real " : " delimiter.
            for sep in (" : ", ":"):
                _, found, tail = clean_full.partition(sep)
                val = tail.strip()
                if found and val and val != label_text:
                    return val

            # Strategy 2: value held by a td/div under the label's parent.
            parent = el.locator("xpath=..")
            if parent.count() > 0:
                for sib in parent.locator("td, div").all():
                    sib_text = sib.inner_text().strip()
                    # Skip the label cell itself and near-empty cells.
                    if sib_text and label_text not in sib_text and len(sib_text) > 2:
                        return sib_text

            # Strategy 3: text begins with the label; the remainder is the value.
            if clean_full.startswith(label_text):
                val = clean_full[len(label_text):].strip().lstrip(":").strip()
                if val:
                    return val

        return ""
    except Exception as exc:
        # Best-effort extraction: a missing or detached element must not abort
        # the crawl, but the reason is recorded instead of silently dropped.
        logging.getLogger(__name__).debug(
            "Field extraction failed for %r: %s", label_text, exc
        )
        return ""
|
|
def safe_download(page, locator, file_prefix, download_dir="downloads", *, timeout_ms=20000):
    """Click *locator*, capture the resulting download and save it locally.

    Args:
        page: Playwright page used to wait for the download event.
        locator: Locator of the element whose click triggers the download.
        file_prefix: Base name (without extension) for the saved file.
        download_dir: Directory the file is saved into; created if missing.
        timeout_ms: How long to wait for the download to start, in
            milliseconds. Defaults to the previously hard-coded 20 seconds.

    Returns:
        A ``(ok, local_path, message)`` tuple: ``(True, path, "Success")`` on
        success, ``(False, None, "Not found")`` when the locator matches
        nothing, and ``(False, None, <error text>)`` when the click or the
        download fails.
    """
    # Created up front, even when nothing is downloaded, so the directory
    # exists afterwards regardless of the outcome.
    os.makedirs(download_dir, exist_ok=True)
    if locator.count() == 0:
        return False, None, "Not found"

    try:
        with page.expect_download(timeout=timeout_ms) as dl_info:
            locator.first.click()
        download = dl_info.value

        # Keep the server-suggested extension; fall back to ".bin" when the
        # suggested name has none (or is missing entirely).
        suggested_name = download.suggested_filename or "file"
        ext = os.path.splitext(suggested_name)[1] or ".bin"

        local_path = os.path.join(download_dir, f"{file_prefix}{ext}")
        download.save_as(local_path)
        return True, local_path, "Success"
    except Exception as e:
        # Deliberate boundary: a failed download is reported to the caller
        # through the return value rather than aborting the crawl.
        return False, None, str(e)
|
|
def compress_file_gzip(input_path):
    """Gzip-compress a file next to the original to save space on the Hub.

    Args:
        input_path: Path of the file to compress, as a ``str`` or any
            ``os.PathLike`` object.

    Returns:
        The path of the compressed file as a string: *input_path* with
        ``.gz`` appended. The original file is left untouched.

    Raises:
        OSError: If the input cannot be read or the archive cannot be
            written. A partially written archive is removed before the
            error propagates.
    """
    source = os.fspath(input_path)  # accept pathlib.Path as well as str
    output_path = source + ".gz"

    # Open the source first so a missing input never touches an archive that
    # may already exist at output_path from an earlier run.
    with open(source, "rb") as f_in:
        try:
            with gzip.open(output_path, "wb") as f_out:
                shutil.copyfileobj(f_in, f_out)
        except BaseException:
            # Do not leave a truncated archive behind; it would look valid
            # to a later upload step. Re-raised unchanged below.
            try:
                os.remove(output_path)
            except OSError:
                pass
            raise
    return output_path
|
|
def upload_consultation_to_hf(consultation_id, metadata, files_dict, repo_id, api):
    """Upload one consultation's data to a Hugging Face dataset repository.

    Repository layout::

        data/consultations/{ID}/metadata.json
        data/consultations/{ID}/files/<file name>[.gz]

    Args:
        consultation_id: Identifier used as the per-consultation folder name.
        metadata: JSON-serialisable mapping written to ``metadata.json``.
        files_dict: Mapping of file kind to local file path. Empty paths and
            paths that do not exist on disk are skipped.
        repo_id: Target dataset repository, e.g. ``"user/name"``.
        api: ``HfApi``-like client exposing ``upload_file``.

    Raises:
        Exception: Whatever ``api.upload_file`` or ``json.dumps`` raises for
            the metadata upload. Failures on individual attached files are
            logged and do not propagate.
    """
    base_repo_path = f"data/consultations/{consultation_id}"
    compress_threshold = 5 * 1024 * 1024  # files above 5 MiB are gzipped

    # Upload the metadata straight from memory: no temp file in the working
    # directory, and no dependence on the ID being a valid file name.
    meta_bytes = json.dumps(metadata, ensure_ascii=False, indent=2).encode("utf-8")
    api.upload_file(
        path_or_fileobj=meta_bytes,
        path_in_repo=f"{base_repo_path}/metadata.json",
        repo_id=repo_id,
        repo_type="dataset",
    )
    log.info(f"[UPLOAD] {base_repo_path}/metadata.json")

    for local_path in files_dict.values():
        if not local_path or not os.path.exists(local_path):
            continue

        file_name = os.path.basename(local_path)
        repo_file_path = f"{base_repo_path}/files/{file_name}"
        upload_path = local_path

        try:
            # Compression sits inside the try so that one unreadable file
            # cannot abort the uploads of the remaining files.
            file_size = os.path.getsize(local_path)
            if file_size > compress_threshold:
                log.info(f"[COMPRESS] Compressing {file_name} ({file_size/1024/1024:.2f} MB)...")
                upload_path = compress_file_gzip(local_path)
                repo_file_path += ".gz"

            api.upload_file(
                path_or_fileobj=upload_path,
                path_in_repo=repo_file_path,
                repo_id=repo_id,
                repo_type="dataset",
            )
            log.info(f"[UPLOAD] {repo_file_path}")
        except Exception as e:
            log.error(f"[UPLOAD ERROR] Failed to upload {file_name}: {e}")
        finally:
            # Remove only the temporary .gz produced here; the original
            # download is left in place.
            if upload_path != local_path and os.path.exists(upload_path):
                os.remove(upload_path)
|
|
| |
| |
| |
_BASE_URL = "https://www.marchespublics.gov.ma/"
_RETURN_LINK = "#ctl0_CONTENU_PAGE_linkRetourBas"


def _open_search_results(page):
    """Open the advanced search for cancelled consultations and submit it."""
    page.goto("https://www.marchespublics.gov.ma/index.php?page=entreprise.EntrepriseAdvancedSearch&searchAnnCons&consAnnulee=1")
    page.wait_for_load_state("domcontentloaded")

    # NOTE(review): the date windows are hard-coded; confirm they are the
    # intended ranges before reusing this for another period.
    page.locator("#ctl0_CONTENU_PAGE_AdvancedSearch_dateMiseEnLigneStart").fill("25/04/2026")
    page.locator("#ctl0_CONTENU_PAGE_AdvancedSearch_dateMiseEnLigneEnd").fill("25/10/2026")
    page.locator("#ctl0_CONTENU_PAGE_AdvancedSearch_dateMiseEnLigneCalculeStart").fill("25/10/2025")
    page.locator("#ctl0_CONTENU_PAGE_AdvancedSearch_dateMiseEnLigneCalculeEnd").fill("25/04/2026")

    page.locator("#ctl0_CONTENU_PAGE_AdvancedSearch_lancerRecherche").click()
    page.wait_for_timeout(5000)


def _scrape_metadata(page, consultation_id, full_url):
    """Build the metadata record from the consultation detail page."""
    return {
        "consultation_id": consultation_id,
        "reference": extract_field_value(page, "Référence"),
        "objet": extract_field_value(page, "Objet"),
        "date_limite": extract_field_value(page, "Date et heure limite de remise des plis"),
        "date_annulation": extract_field_value(page, "Date d'annulation"),
        "source_url": full_url,
        "scraped_at": time.strftime("%Y-%m-%d %H:%M:%S"),
    }


def _download_and_record(page, locator, kind, consultation_id, metadata, files_dict):
    """Download one document and record the outcome in metadata/files_dict."""
    tag = kind.upper()
    ok, path, msg = safe_download(page, locator, f"{kind}_{consultation_id}")
    metadata[f"{kind}_downloaded"] = ok
    if ok and path:
        files_dict[kind] = path
        log.info(f"[{tag}] Downloaded: {os.path.basename(path)}")
    elif msg != "Not found":
        # A missing link is normal; only real failures are worth a warning.
        log.warning(f"[{tag}] Failed: {msg}")


def _capture_pv(page, consultation_id, metadata, files_dict):
    """Open the PV extract, save its text to disk, then navigate back."""
    pv_locator = page.locator("text=Extrait de PV").first
    if not pv_locator.is_visible():
        metadata["pv_available"] = False
        return

    try:
        pv_locator.click()
        page.wait_for_load_state("domcontentloaded")
        page.wait_for_timeout(1500)

        pv_text = page.locator("body").inner_text().strip()
        metadata["pv_available"] = True
        metadata["pv_preview"] = pv_text[:300]

        # Do not rely on an earlier download having created the directory.
        os.makedirs("downloads", exist_ok=True)
        pv_file = f"downloads/pv_{consultation_id}.txt"
        with open(pv_file, "w", encoding="utf-8") as f:
            f.write(pv_text)
        files_dict["pv"] = pv_file
        log.info("[PV] Extracted and saved")

        # NOTE(review): the return link is clicked up to twice, as in the
        # original flow; confirm the second click does not leave the
        # consultation detail page.
        for _ in range(2):
            ret_btn = page.locator(_RETURN_LINK).first
            if ret_btn.is_visible():
                ret_btn.click()
                page.wait_for_load_state("domcontentloaded")
                page.wait_for_timeout(1000)
    except Exception as e:
        log.warning(f"[PV] Error processing: {e}")
        metadata["pv_available"] = False
        # Best-effort attempt to get back to the detail page.
        try:
            page.locator(_RETURN_LINK).click()
            page.wait_for_timeout(1000)
        except Exception as nav_err:
            log.debug(f"[PV] Could not navigate back after error: {nav_err}")


def _return_to_results(page):
    """Go back to the search results; return False if navigation failed."""
    try:
        back_btn = page.locator(_RETURN_LINK).first
        if back_btn.is_visible():
            back_btn.click()
        else:
            page.go_back()

        page.wait_for_load_state("domcontentloaded")
        page.wait_for_timeout(2000)
    except Exception as e:
        log.error(f"Failed to return to search page: {e}")
        return False
    log.info("Returned to search results")
    return True


def _crawl(page, api, repo_id, max_test):
    """Process up to *max_test* result rows; return how many were uploaded."""
    # Local import: the file-level import only brings in urlparse/parse_qs.
    from urllib.parse import urljoin

    processed_count = 0
    for i in range(max_test):
        log.info(f"Processing row {i+1}/{max_test}")

        # Re-query on every iteration: the results page is reloaded after
        # each consultation, so earlier locators may be stale.
        rows = page.locator("tr:has(div[id*='panelAction'])")
        if rows.count() <= i:
            log.warning(f"Row {i+1} not found. Ending loop.")
            break

        link = rows.nth(i).locator("a[href*='EntrepriseDetailConsultation']").first
        try:
            url = link.get_attribute("href", timeout=5000)
        except Exception:
            log.warning(f"Could not get href for row {i+1}. Skipping.")
            continue
        if not url:
            continue

        # urljoin copes with relative, root-relative and absolute hrefs,
        # where plain concatenation would produce a malformed URL.
        full_url = urljoin(_BASE_URL, url)
        log.info(f"Opening: {full_url}")

        link.click()
        page.wait_for_load_state("domcontentloaded")
        page.wait_for_timeout(2000)

        consultation_id = extract_id(page.url)
        if not consultation_id or consultation_id == "UNKNOWN":
            log.warning("Could not extract Consultation ID. Skipping.")
            try:
                page.go_back()
                page.wait_for_timeout(2000)
            except Exception as e:
                log.debug(f"Could not go back after missing ID: {e}")
            continue

        log.info(f"Consultation ID: {consultation_id}")

        metadata = _scrape_metadata(page, consultation_id, full_url)
        log.info(f"Extracted Data -> Ref: '{metadata['reference'][:20]}...', DateLim: '{metadata['date_limite']}'")

        # Switch to the documents tab when the page has one.
        onglet = page.locator("#ongletLayer1").first
        if onglet.is_visible():
            onglet.click()
            page.wait_for_timeout(2000)

        files_dict = {}

        rc_locator = page.locator("text=Règlement de consultation").first
        _download_and_record(page, rc_locator, "rc", consultation_id, metadata, files_dict)

        _capture_pv(page, consultation_id, metadata, files_dict)

        avis_locator = page.locator("text=Fichier joint").first
        if not avis_locator.is_visible():
            avis_locator = page.locator("text=Avis de").first
        _download_and_record(page, avis_locator, "avis", consultation_id, metadata, files_dict)

        returned = _return_to_results(page)

        # Upload even when navigating back failed, so the consultation that
        # was just scraped is not thrown away.
        try:
            upload_consultation_to_hf(consultation_id, metadata, files_dict, repo_id, api)
            processed_count += 1
            log.info(f"[COMPLETE] {consultation_id} successfully processed and uploaded.")
        except Exception as e:
            log.error(f"[FATAL UPLOAD ERROR] for {consultation_id}: {e}")

        if not returned:
            # Without the results page there are no further rows to read.
            break

    return processed_count


def run(max_test=10):
    """Crawl cancelled consultations and upload each one to the HF dataset.

    Launches headless Chromium, submits the advanced search, then for each of
    the first *max_test* result rows scrapes the metadata, downloads the
    attached documents and uploads everything to the dataset repository.

    Args:
        max_test: Maximum number of result rows to process.
    """
    repo_id = "lljz66/opentender_morocco_data"

    api = HfApi()

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True, args=["--no-sandbox", "--disable-dev-shm-usage"])
        try:
            context = browser.new_context(accept_downloads=True)
            page = context.new_page()

            log.info("Starting crawl...")
            _open_search_results(page)

            processed_count = _crawl(page, api, repo_id, max_test)
            log.info(f"Crawl finished. Total processed: {processed_count}")
        finally:
            # Close the browser even when the crawl raises part-way through.
            browser.close()
|
|
# Script entry point: start the crawl only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    run()