scrape.gov / app.py
lljz66's picture
Update app.py
1f16459 verified
import asyncio
import sqlite3
import json
import os
import time
from datetime import datetime
from pathlib import Path
from playwright.async_api import async_playwright
from bs4 import BeautifulSoup
import pandas as pd
# ========== الإعدادات ==========
DB_PATH = "/tmp/marches_publics.db"
DATA_DIR = Path("/app/data")
RAW_DIR = DATA_DIR / "raw"
RAW_DIR.mkdir(parents=True, exist_ok=True)
# ========== جميع الروابط الصحيحة ==========
URLS = {
"consultations_en_cours": "https://www.marchespublics.gov.ma/index.php?page=entreprise.EntrepriseAdvancedSearch&searchAnnCons",
"recherche_avancee_consultations": "https://www.marchespublics.gov.ma/index.php?page=entreprise.EntrepriseAdvancedSearch&searchAnnCons",
"avis_achat_en_cours": "https://www.marchespublics.gov.ma/bdc/entreprise/consultation/",
"bons_commande_attribues": "https://www.marchespublics.gov.ma/index.php?page=entreprise.ListeAnnonceAnnuelle&typeAnnonce=1",
"toutes_annonces_info": "https://www.marchespublics.gov.ma/index.php?page=entreprise.EntrepriseAdvancedSearch&AvisInformation",
"tous_extraits_pv": "https://www.marchespublics.gov.ma/index.php?page=entreprise.EntrepriseAdvancedSearch&AllAnn",
"tous_resultats_definitifs": "https://www.marchespublics.gov.ma/index.php?page=entreprise.EntrepriseAdvancedSearch&AllAnn",
"tous_rapports_achevement": "https://www.marchespublics.gov.ma/index.php?page=entreprise.EntrepriseAdvancedSearch&AllAnn",
"tous_rapports_presentation": "https://www.marchespublics.gov.ma/index.php?page=entreprise.EntrepriseAdvancedSearch&AvisRapportPresentation",
"toutes_decisions_resiliation": "https://www.marchespublics.gov.ma/index.php?page=entreprise.EntrepriseAdvancedSearch&AllAnn",
"recherche_avancee_toutes": "https://www.marchespublics.gov.ma/index.php?page=entreprise.EntrepriseAdvancedSearch&AllAnn",
"programme_previsionnel": "https://www.marchespublics.gov.ma/index.php?page=entreprise.ListePPs",
"synthese_rapport_audit": "https://www.marchespublics.gov.ma/index.php?page=entreprise.ListeSRA",
"marches_attribues": "https://www.marchespublics.gov.ma/index.php?page=entreprise.ListeAnnonceAnnuelle&typeAnnonce=2",
"conventions_contrats": "https://www.marchespublics.gov.ma/index.php?page=entreprise.ListeAnnonceAnnuelle&typeAnnonce=3",
"consultations_annulees": "https://www.marchespublics.gov.ma/index.php?page=entreprise.EntrepriseAdvancedSearch&searchAnnCons&consAnnulee=1",
"recherche_avancee_annulees": "https://www.marchespublics.gov.ma/index.php?page=entreprise.EntrepriseAdvancedSearch&searchAnnCons&consAnnulee=1",
}
# ========== دوال قاعدة البيانات ==========
def init_db():
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute('''
CREATE TABLE IF NOT EXISTS scraped_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source_name TEXT NOT NULL,
source_url TEXT NOT NULL,
scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
page_number INTEGER,
total_pages INTEGER,
raw_html TEXT,
parsed_data JSON,
status TEXT DEFAULT 'pending',
error_message TEXT
)
''')
c.execute('''
CREATE TABLE IF NOT EXISTS pagination_state (
source_name TEXT PRIMARY KEY,
last_page INTEGER DEFAULT 0,
total_items INTEGER,
completed BOOLEAN DEFAULT 0,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
c.execute('CREATE INDEX IF NOT EXISTS idx_source ON scraped_data(source_name)')
c.execute('CREATE INDEX IF NOT EXISTS idx_status ON scraped_data(status)')
conn.commit()
conn.close()
def backup_db():
import shutil
try:
shutil.copy2(DB_PATH, DATA_DIR / "marches_publics_backup.db")
print("🗄️ Database backed up")
except Exception as e:
print(f"❌ Backup error: {e}")
def export_all_csv():
conn = sqlite3.connect(DB_PATH)
for source in URLS.keys():
df = pd.read_sql_query(
"SELECT source_name, page_number, parsed_data, scraped_at FROM scraped_data WHERE source_name = ? AND status = 'completed'",
conn, params=(source,)
)
if not df.empty:
records = []
for _, row in df.iterrows():
try:
data = json.loads(row['parsed_data'])
for item in data.get('items', []):
item['source'] = row['source_name']
item['page'] = row['page_number']
item['scraped_at'] = row['scraped_at']
records.append(item)
except: pass
if records:
out_df = pd.json_normalize(records)
filename = f"{source}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
out_df.to_csv(DATA_DIR / filename, index=False, encoding='utf-8-sig')
print(f"📁 Exported: {filename}")
conn.close()
# ========== استخراج البيانات ==========
def extract_data(soup, source_name):
data = []
tables = soup.find_all('table')
for table in tables:
rows = table.find_all('tr')
headers = [th.get_text(strip=True) for th in rows[0].find_all(['th', 'td'])] if rows else []
for row in rows[1:]:
cells = row.find_all(['td', 'th'])
row_data = {}
for i, cell in enumerate(cells):
key = headers[i] if i < len(headers) else f'col_{i}'
links = cell.find_all('a')
if links:
row_data[key] = {
'text': cell.get_text(strip=True),
'href': links[0].get('href'),
'links': [a.get('href') for a in links]
}
else:
row_data[key] = cell.get_text(strip=True)
if row_data:
data.append(row_data)
if not data:
items = soup.find_all('div', class_=lambda x: x and ('result' in x.lower() or 'item' in x.lower()))
for item in items:
data.append({'text': item.get_text(strip=True)})
return {
'source': source_name,
'extracted_at': datetime.now().isoformat(),
'item_count': len(data),
'items': data
}
# ========== وظائف الكشط ==========
async def advanced_search_with_retry(page, url, max_retries=2):
"""يفتح صفحة البحث المتقدم وينقر على زر البحث، مع إعادة المحاولة عند الفشل."""
for attempt in range(max_retries):
try:
print(f" ⟳ Attempt {attempt+1}...")
await page.goto(url, wait_until='networkidle', timeout=60000)
submit = page.locator('input[type=submit], button[type=submit], input#rechercher, a.rechercher, form[method=post] input[type=submit]')
if await submit.count() > 0:
await submit.first.click()
await page.wait_for_selector('table, #contenu, .resultats, .liste, .annonce', timeout=15000)
else:
await page.wait_for_timeout(5000)
return await page.content()
except Exception as e:
print(f" ⚠️ محاولة {attempt+1} فشلت: {e}")
await asyncio.sleep(2)
return await page.content()
async def scrape_direct_list(page, url):
"""للصفحات التي تعرض النتائج مباشرة."""
await page.goto(url, wait_until='networkidle', timeout=60000)
await page.wait_for_selector('table, #contenu, .resultats, .liste', timeout=20000)
await page.wait_for_timeout(2000)
return await page.content()
# ========== حفظ الصفحة ==========
def save_page(source, page_num, html):
folder = RAW_DIR / source
folder.mkdir(exist_ok=True)
(folder / f"page_{page_num}.html").write_text(html, encoding='utf-8')
print(f" 💾 حفظ {source}/page_{page_num}.html")
# ========== الحلقة الرئيسية للكشط ==========
async def scrape_all():
print("🚀 بدء عملية الكشط الكاملة...")
init_db()
async with async_playwright() as p:
browser = await p.chromium.launch(
headless=True,
args=['--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage', '--disable-gpu', '--single-process']
)
context = await browser.new_context(
viewport={'width': 1920, 'height': 1080},
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
locale='fr-FR',
timezone_id='Africa/Casablanca'
)
await context.add_init_script("""
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
window.chrome = {runtime: {}};
""")
conn = sqlite3.connect(DB_PATH)
for src_name, url in URLS.items():
print(f"\n📄 {src_name}")
page = await context.new_page()
try:
await asyncio.sleep(2)
if "ListeAnnonceAnnuelle" in url or "ListePPs" in url or "ListeSRA" in url:
html = await scrape_direct_list(page, url)
else:
html = await advanced_search_with_retry(page, url)
soup = BeautifulSoup(html, 'html.parser')
total_pages = 1
pagination = soup.select('a[href*="page="]')
if pagination:
nums = [int(a.text) for a in pagination if a.text.isdigit()]
if nums: total_pages = max(nums)
print(f" 📊 إجمالي الصفحات: {total_pages}")
save_page(src_name, 1, html)
data = extract_data(soup, src_name)
conn.execute('''
INSERT INTO scraped_data (source_name, source_url, page_number, total_pages, raw_html, parsed_data, status)
VALUES (?,?,?,?,?,?,?)
''', (src_name, page.url, 1, total_pages, html, json.dumps(data, ensure_ascii=False), 'completed'))
conn.commit()
for pgnum in range(2, total_pages + 1):
await asyncio.sleep(2)
next_url = f"{url}&page={pgnum}" if '?' in url else f"{url}?page={pgnum}"
if "ListeAnnonceAnnuelle" in url or "ListePPs" in url or "ListeSRA" in url:
html = await scrape_direct_list(page, next_url)
else:
html = await advanced_search_with_retry(page, next_url)
save_page(src_name, pgnum, html)
data = extract_data(BeautifulSoup(html, 'html.parser'), src_name)
conn.execute('''
INSERT INTO scraped_data (source_name, source_url, page_number, total_pages, raw_html, parsed_data, status)
VALUES (?,?,?,?,?,?,?)
''', (src_name, page.url, pgnum, total_pages, html, json.dumps(data, ensure_ascii=False), 'completed'))
conn.commit()
print(f" ✅ صفحة {pgnum}")
if pgnum % 10 == 0:
backup_db()
conn.execute('''
INSERT OR REPLACE INTO pagination_state (source_name, last_page, total_items, completed, updated_at)
VALUES (?,?,?,1, CURRENT_TIMESTAMP)
''', (src_name, total_pages, total_pages))
conn.commit()
except Exception as e:
print(f" ❌ خطأ: {e}")
conn.execute('''
INSERT INTO scraped_data (source_name, source_url, status, error_message)
VALUES (?,?,?,?)
''', (src_name, url, 'error', str(e)))
conn.commit()
finally:
await page.close()
conn.close()
await browser.close()
backup_db()
export_all_csv()
print("✅ اكتمل الكشط.")
if __name__ == "__main__":
asyncio.run(scrape_all())
print("🔄 البرنامج في وضع السكون لمنع الإغلاق.")
while True:
time.sleep(3600)