"""Scrape listing pages of the marchespublics.gov.ma portal.

For every entry in ``URLS`` the script opens the page in headless Chromium,
stores the raw HTML on disk and in SQLite, extracts table rows into JSON and
finally exports the collected rows to one CSV file per source.
"""
import asyncio
import json
import os
import shutil
import sqlite3
import time
from contextlib import closing
from datetime import datetime
from pathlib import Path

import pandas as pd
from bs4 import BeautifulSoup
from playwright.async_api import async_playwright

# ========== Settings ==========
DB_PATH = "/tmp/marches_publics.db"
DATA_DIR = Path("/app/data")
RAW_DIR = DATA_DIR / "raw"
RAW_DIR.mkdir(parents=True, exist_ok=True)

# ========== All source URLs ==========
URLS = {
    "consultations_en_cours": "https://www.marchespublics.gov.ma/index.php?page=entreprise.EntrepriseAdvancedSearch&searchAnnCons",
    "recherche_avancee_consultations": "https://www.marchespublics.gov.ma/index.php?page=entreprise.EntrepriseAdvancedSearch&searchAnnCons",
    "avis_achat_en_cours": "https://www.marchespublics.gov.ma/bdc/entreprise/consultation/",
    "bons_commande_attribues": "https://www.marchespublics.gov.ma/index.php?page=entreprise.ListeAnnonceAnnuelle&typeAnnonce=1",
    "toutes_annonces_info": "https://www.marchespublics.gov.ma/index.php?page=entreprise.EntrepriseAdvancedSearch&AvisInformation",
    "tous_extraits_pv": "https://www.marchespublics.gov.ma/index.php?page=entreprise.EntrepriseAdvancedSearch&AllAnn",
    "tous_resultats_definitifs": "https://www.marchespublics.gov.ma/index.php?page=entreprise.EntrepriseAdvancedSearch&AllAnn",
    "tous_rapports_achevement": "https://www.marchespublics.gov.ma/index.php?page=entreprise.EntrepriseAdvancedSearch&AllAnn",
    "tous_rapports_presentation": "https://www.marchespublics.gov.ma/index.php?page=entreprise.EntrepriseAdvancedSearch&AvisRapportPresentation",
    "toutes_decisions_resiliation": "https://www.marchespublics.gov.ma/index.php?page=entreprise.EntrepriseAdvancedSearch&AllAnn",
    "recherche_avancee_toutes": "https://www.marchespublics.gov.ma/index.php?page=entreprise.EntrepriseAdvancedSearch&AllAnn",
    "programme_previsionnel": "https://www.marchespublics.gov.ma/index.php?page=entreprise.ListePPs",
    "synthese_rapport_audit": "https://www.marchespublics.gov.ma/index.php?page=entreprise.ListeSRA",
    "marches_attribues": "https://www.marchespublics.gov.ma/index.php?page=entreprise.ListeAnnonceAnnuelle&typeAnnonce=2",
    "conventions_contrats": "https://www.marchespublics.gov.ma/index.php?page=entreprise.ListeAnnonceAnnuelle&typeAnnonce=3",
    "consultations_annulees": "https://www.marchespublics.gov.ma/index.php?page=entreprise.EntrepriseAdvancedSearch&searchAnnCons&consAnnulee=1",
    "recherche_avancee_annulees": "https://www.marchespublics.gov.ma/index.php?page=entreprise.EntrepriseAdvancedSearch&searchAnnCons&consAnnulee=1",
}

# URL fragments of pages that are fetched with scrape_direct_list() instead of
# the search-form flow in advanced_search_with_retry().
_DIRECT_LIST_MARKERS = ("ListeAnnonceAnnuelle", "ListePPs", "ListeSRA")

_INSERT_PAGE_SQL = '''
    INSERT INTO scraped_data
        (source_name, source_url, page_number, total_pages,
         raw_html, parsed_data, status)
    VALUES (?,?,?,?,?,?,?)
'''


# ========== Database functions ==========
def init_db():
    """Create the SQLite tables and indexes at ``DB_PATH`` if they are missing."""
    # closing() guarantees the connection is released even if a statement fails.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        c = conn.cursor()
        c.execute('''
            CREATE TABLE IF NOT EXISTS scraped_data (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                source_name TEXT NOT NULL,
                source_url TEXT NOT NULL,
                scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                page_number INTEGER,
                total_pages INTEGER,
                raw_html TEXT,
                parsed_data JSON,
                status TEXT DEFAULT 'pending',
                error_message TEXT
            )
        ''')
        c.execute('''
            CREATE TABLE IF NOT EXISTS pagination_state (
                source_name TEXT PRIMARY KEY,
                last_page INTEGER DEFAULT 0,
                total_items INTEGER,
                completed BOOLEAN DEFAULT 0,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        c.execute('CREATE INDEX IF NOT EXISTS idx_source ON scraped_data(source_name)')
        c.execute('CREATE INDEX IF NOT EXISTS idx_status ON scraped_data(status)')
        conn.commit()


def backup_db():
    """Copy the database file into ``DATA_DIR``; failures are reported, not raised."""
    try:
        shutil.copy2(DB_PATH, DATA_DIR / "marches_publics_backup.db")
        print("🗄️ Database backed up")
    except OSError as e:
        # Best effort: a failed backup must not abort the scrape.
        print(f"❌ Backup error: {e}")


def export_all_csv():
    """Write one timestamped CSV per source containing its completed rows.

    Each stored page's ``parsed_data`` JSON is expanded into its ``items``;
    every item is tagged with the source name, page number and scrape time.
    Rows whose JSON cannot be read are reported and skipped.
    """
    query = (
        "SELECT source_name, page_number, parsed_data, scraped_at "
        "FROM scraped_data WHERE source_name = ? AND status = 'completed'"
    )
    with closing(sqlite3.connect(DB_PATH)) as conn:
        for source in URLS:
            df = pd.read_sql_query(query, conn, params=(source,))
            if df.empty:
                continue

            records = []
            for _, row in df.iterrows():
                try:
                    data = json.loads(row['parsed_data'])
                    for item in data.get('items', []):
                        item['source'] = row['source_name']
                        item['page'] = row['page_number']
                        item['scraped_at'] = row['scraped_at']
                        records.append(item)
                except (TypeError, ValueError, AttributeError) as e:
                    # NULL / malformed JSON or an unexpected shape: skip this
                    # stored page but say so instead of hiding it.
                    print(f"⚠️ Skipping unreadable row ({source}, page {row['page_number']}): {e}")

            if not records:
                continue

            out_df = pd.json_normalize(records)
            filename = f"{source}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
            out_df.to_csv(DATA_DIR / filename, index=False, encoding='utf-8-sig')
            print(f"📁 Exported: {filename}")


# ========== Data extraction ==========
def _column_keys(headers):
    """Return one unique, non-empty dict key per header text."""
    keys = []
    seen = set()
    for index, header in enumerate(headers):
        key = header or f'col_{index}'
        # Identical header texts would overwrite each other in the row dict.
        while key in seen:
            key = f'{key}_{index}'
        seen.add(key)
        keys.append(key)
    return keys


def extract_data(soup, source_name):
    """Extract row dictionaries from every ``<table>`` in *soup*.

    The first row of each table supplies the column names; empty or repeated
    names are replaced/suffixed so no cell is lost. A cell containing links is
    stored as ``{'text', 'href', 'links'}``, any other cell as its text. When
    no table row is found, the text of every ``<div>`` whose class contains
    ``result`` or ``item`` is collected instead.

    Returns:
        dict with keys ``source``, ``extracted_at`` (ISO timestamp),
        ``item_count`` and ``items``.
    """
    data = []
    for table in soup.find_all('table'):
        rows = table.find_all('tr')
        headers = [th.get_text(strip=True) for th in rows[0].find_all(['th', 'td'])] if rows else []
        keys = _column_keys(headers)
        for row in rows[1:]:
            row_data = {}
            for i, cell in enumerate(row.find_all(['td', 'th'])):
                key = keys[i] if i < len(keys) else f'col_{i}'
                links = cell.find_all('a')
                if links:
                    row_data[key] = {
                        'text': cell.get_text(strip=True),
                        'href': links[0].get('href'),
                        'links': [a.get('href') for a in links],
                    }
                else:
                    row_data[key] = cell.get_text(strip=True)
            if row_data:
                data.append(row_data)

    if not data:
        items = soup.find_all('div', class_=lambda x: x and ('result' in x.lower() or 'item' in x.lower()))
        for item in items:
            data.append({'text': item.get_text(strip=True)})

    return {
        'source': source_name,
        'extracted_at': datetime.now().isoformat(),
        'item_count': len(data),
        'items': data,
    }


# ========== Scraping functions ==========
async def advanced_search_with_retry(page, url, max_retries=2):
    """Open the advanced-search page and click its search button, retrying on failure.

    Args:
        page: Playwright page used for navigation.
        url: address of the search page.
        max_retries: number of attempts before giving up.

    Returns:
        The page HTML after a successful attempt, or whatever the page
        currently contains once every attempt has failed.
    """
    for attempt in range(max_retries):
        try:
            print(f" ⟳ Attempt {attempt+1}...")
            await page.goto(url, wait_until='networkidle', timeout=60000)
            submit = page.locator('input[type=submit], button[type=submit], input#rechercher, a.rechercher, form[method=post] input[type=submit]')
            if await submit.count() > 0:
                await submit.first.click()
                await page.wait_for_selector('table, #contenu, .resultats, .liste, .annonce', timeout=15000)
            else:
                # No submit control found: give the page time to render.
                await page.wait_for_timeout(5000)
            return await page.content()
        except Exception as e:
            print(f" ⚠️ محاولة {attempt+1} فشلت: {e}")
            # Pause only when another attempt follows.
            if attempt + 1 < max_retries:
                await asyncio.sleep(2)
    # Every attempt failed: best effort, hand back the current page content.
    return await page.content()


async def scrape_direct_list(page, url):
    """Fetch a page that shows its results directly and return its HTML."""
    await page.goto(url, wait_until='networkidle', timeout=60000)
    await page.wait_for_selector('table, #contenu, .resultats, .liste', timeout=20000)
    await page.wait_for_timeout(2000)
    return await page.content()


# ========== Page storage ==========
def save_page(source, page_num, html):
    """Write *html* to ``RAW_DIR/<source>/page_<page_num>.html``."""
    folder = RAW_DIR / source
    folder.mkdir(parents=True, exist_ok=True)
    (folder / f"page_{page_num}.html").write_text(html, encoding='utf-8')
    print(f" 💾 حفظ {source}/page_{page_num}.html")


# ========== Main scraping loop ==========
def _is_direct_list(url):
    """Tell whether *url* is one of the pages that list results directly."""
    return any(marker in url for marker in _DIRECT_LIST_MARKERS)


async def _fetch_page(page, url):
    """Fetch *url* with the strategy that matches its page type."""
    if _is_direct_list(url):
        return await scrape_direct_list(page, url)
    return await advanced_search_with_retry(page, url)


def _detect_total_pages(soup):
    """Return the largest numeric link label among ``page=`` links, or 1."""
    # NOTE(review): the portal's own URLs in URLS contain "page=", so this
    # selector may match links that are not pagination controls - confirm
    # against the live markup.
    labels = (a.get_text(strip=True) for a in soup.select('a[href*="page="]'))
    # isdecimal() only accepts characters int() can parse.
    nums = [int(label) for label in labels if label.isdecimal()]
    return max(nums) if nums else 1


def _record_page(conn, src_name, page_url, page_num, total_pages, html, soup):
    """Save one fetched page to disk and to SQLite; return its item count."""
    save_page(src_name, page_num, html)
    data = extract_data(soup, src_name)
    conn.execute(
        _INSERT_PAGE_SQL,
        (src_name, page_url, page_num, total_pages, html,
         json.dumps(data, ensure_ascii=False), 'completed'),
    )
    conn.commit()
    return data['item_count']


async def _scrape_source(context, conn, src_name, url):
    """Scrape every page of one source; an error is stored as an 'error' row."""
    print(f"\n📄 {src_name}")
    page = await context.new_page()
    try:
        await asyncio.sleep(2)
        html = await _fetch_page(page, url)
        soup = BeautifulSoup(html, 'html.parser')
        total_pages = _detect_total_pages(soup)
        print(f" 📊 إجمالي الصفحات: {total_pages}")
        total_items = _record_page(conn, src_name, page.url, 1, total_pages, html, soup)

        for pgnum in range(2, total_pages + 1):
            await asyncio.sleep(2)
            # NOTE(review): these URLs already carry a "page=" query
            # parameter, so appending another one may not select a results
            # page - verify how the site paginates.
            next_url = f"{url}&page={pgnum}" if '?' in url else f"{url}?page={pgnum}"
            html = await _fetch_page(page, next_url)
            total_items += _record_page(
                conn, src_name, page.url, pgnum, total_pages, html,
                BeautifulSoup(html, 'html.parser'),
            )
            print(f" ✅ صفحة {pgnum}")
            if pgnum % 10 == 0:
                backup_db()

        # total_items holds the number of extracted items, not the page count.
        conn.execute('''
            INSERT OR REPLACE INTO pagination_state
                (source_name, last_page, total_items, completed, updated_at)
            VALUES (?,?,?,1, CURRENT_TIMESTAMP)
        ''', (src_name, total_pages, total_items))
        conn.commit()
    except Exception as e:
        # One failing source must not stop the remaining ones.
        print(f" ❌ خطأ: {e}")
        conn.execute('''
            INSERT INTO scraped_data (source_name, source_url, status, error_message)
            VALUES (?,?,?,?)
        ''', (src_name, url, 'error', str(e)))
        conn.commit()
    finally:
        await page.close()


async def scrape_all():
    """Scrape every source in ``URLS``, then back up the database and export CSVs."""
    print("🚀 بدء عملية الكشط الكاملة...")
    init_db()
    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=True,
            args=['--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage', '--disable-gpu', '--single-process']
        )
        try:
            context = await browser.new_context(
                viewport={'width': 1920, 'height': 1080},
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                locale='fr-FR',
                timezone_id='Africa/Casablanca'
            )
            await context.add_init_script("""
                Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
                window.chrome = {runtime: {}};
            """)
            # closing() releases the connection even if a source aborts the loop.
            with closing(sqlite3.connect(DB_PATH)) as conn:
                for src_name, url in URLS.items():
                    await _scrape_source(context, conn, src_name, url)
        finally:
            await browser.close()
    backup_db()
    export_all_csv()
    print("✅ اكتمل الكشط.")


if __name__ == "__main__":
    asyncio.run(scrape_all())
    print("🔄 البرنامج في وضع السكون لمنع الإغلاق.")
    # Keep the process alive after the scrape (the message above says the
    # program sleeps to prevent shutdown).
    while True:
        time.sleep(3600)