File size: 3,746 Bytes
173c8bb
c6d679f
 
 
 
 
 
 
 
 
173c8bb
c6d679f
 
173c8bb
 
 
c6d679f
173c8bb
c6d679f
 
 
 
 
 
 
 
173c8bb
c6d679f
 
173c8bb
c6d679f
 
 
173c8bb
c6d679f
173c8bb
 
 
 
 
c6d679f
 
173c8bb
 
c6d679f
173c8bb
 
 
 
 
 
 
c6d679f
173c8bb
 
 
c6d679f
 
 
173c8bb
c6d679f
 
173c8bb
c6d679f
 
 
 
 
173c8bb
c6d679f
173c8bb
c6d679f
 
 
173c8bb
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# scraper.py

import time
import re
import logging
import pandas as pd
from datetime import datetime, timedelta
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
import undetected_chromedriver as uc

# --- FIX: Changed relative imports to direct imports ---
from csv_util import ensure_csv_header, read_existing_data, write_data_to_csv, merge_new_data
from detail_parser import parse_detail_table, detail_data_to_string

logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
logger = logging.getLogger(__name__)

def parse_calendar_day(driver, the_date: datetime, scrape_details=False, existing_df=None) -> pd.DataFrame:
    """Scrape one day's ForexFactory economic-calendar page into a DataFrame.

    Args:
        driver: A Selenium WebDriver (undetected_chromedriver instance) already started.
        the_date: The calendar day to scrape; each row's clock time (if any) is
            merged into this date to build the event's DateTime.
        scrape_details: Reserved for detail-table scraping; currently unused here
            but kept for interface compatibility with callers.
        existing_df: Reserved for dedup against previously scraped data; currently
            unused here but kept for interface compatibility with callers.

    Returns:
        A DataFrame with columns DateTime/Currency/Impact/Event/Actual/Forecast/
        Previous/Detail, or an empty DataFrame if the page failed to load.
    """
    # ForexFactory expects a lowercase month abbreviation, e.g. "?day=jan01.2024".
    date_str = the_date.strftime('%b%d.%Y').lower()
    url = f"https://www.forexfactory.com/calendar?day={date_str}"
    # FIX: lazy %-style logging args instead of eager f-strings (per logging best practice).
    logger.info("Scraping URL: %s", url)
    driver.get(url)
    try:
        WebDriverWait(driver, 20).until(
            EC.visibility_of_element_located((By.XPATH, '//table[contains(@class,"calendar__table")]'))
        )
    except TimeoutException:
        logger.warning("Page did not load for day=%s", the_date.date())
        return pd.DataFrame()
    rows = driver.find_elements(By.XPATH, '//tr[contains(@class,"calendar__row")]')
    data_list = []
    for row in rows:
        # FIX: fetch the class attribute once instead of twice per row.
        row_class = row.get_attribute("class")
        if "day-breaker" in row_class or "no-event" in row_class:
            continue
        try:
            time_el, currency_el, impact_el, event_el, actual_el, forecast_el, previous_el = [
                row.find_element(By.XPATH, f'.//td[contains(@class,"calendar__{cell}")]')
                for cell in ["time", "currency", "impact", "event", "actual", "forecast", "previous"]
            ]
        except NoSuchElementException:
            # Row does not have the full set of calendar cells (e.g. a spacer row).
            continue
        time_text = time_el.text.strip()
        currency_text = currency_el.text.strip()
        # Impact is encoded in the title attribute of a nested span; best-effort,
        # deliberately broad catch so a missing/odd span never aborts the row.
        try:
            impact_text = impact_el.find_element(By.XPATH, './/span').get_attribute("title") or ""
        except Exception:
            impact_text = ""
        event_text = event_el.text.strip()
        event_dt = the_date
        # Times look like "8:30am"; rows with "All Day"/"Tentative" keep the bare date.
        m = re.match(r'(\d{1,2}):(\d{2})(am|pm)', time_text.lower())
        if m:
            hh, mm, ampm = int(m.group(1)), int(m.group(2)), m.group(3)
            if ampm == 'pm' and hh < 12:
                hh += 12
            if ampm == 'am' and hh == 12:
                hh = 0
            # FIX: also zero microseconds so DateTime strings are clean even if
            # the_date carried a sub-second component.
            event_dt = event_dt.replace(hour=hh, minute=mm, second=0, microsecond=0)
        data_list.append({
            "DateTime": event_dt.isoformat(), "Currency": currency_text, "Impact": impact_text,
            "Event": event_text, "Actual": actual_el.text.strip(), "Forecast": forecast_el.text.strip(),
            "Previous": previous_el.text.strip(), "Detail": ""
        })
    return pd.DataFrame(data_list)

def scrape_range_pandas(from_date: datetime, to_date: datetime, output_csv: str, scrape_details=False):
    """Scrape every calendar day in [from_date, to_date] inclusive into a CSV.

    The CSV is (re)written after each successfully scraped day so partial
    progress survives a crash or interruption.

    Args:
        from_date: First day to scrape (inclusive).
        to_date: Last day to scrape (inclusive).
        output_csv: Path of the CSV file to create/append via the csv_util helpers.
        scrape_details: Passed through to parse_calendar_day (currently unused there).
    """
    ensure_csv_header(output_csv)
    existing_df = read_existing_data(output_csv)
    # FIX: create the driver inside try with a None sentinel so a failure in
    # set_window_size (or any later step) still quits the browser process;
    # the original started the driver before the try and could leak it.
    driver = None
    try:
        driver = uc.Chrome(headless=True, use_subprocess=False)
        driver.set_window_size(1400, 1000)
        current_day = from_date
        while current_day <= to_date:
            logger.info("Scraping day %s...", current_day.strftime('%Y-%m-%d'))
            df_new = parse_calendar_day(driver, current_day, scrape_details=scrape_details, existing_df=existing_df)
            if not df_new.empty:
                existing_df = merge_new_data(existing_df, df_new)
                # Persist after every day so a later failure loses at most one day.
                write_data_to_csv(existing_df, output_csv)
            current_day += timedelta(days=1)
    finally:
        if driver:
            driver.quit()
    logger.info("Scraping done.")