# Ensure scraper.py contains a complete function that can be imported
from playwright.sync_api import sync_playwright
import pandas as pd
from datetime import datetime
import os
import psycopg2
import calendar
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()


def get_china_cpi(output_dir='data/raw/'):
    """
    Scrape China CPI data from investing.com and upload it to two Aiven
    PostgreSQL tables:
      - china_cpi_raw: unprocessed data (also snapshotted to a timestamped
        CSV file under *output_dir*)
      - china_cpi_processed: processed data (year, month, actual)

    Parameters
    ----------
    output_dir : str
        Directory for the raw CSV snapshot; created if it does not exist.

    Returns
    -------
    None. Side effects: writes one CSV file and upserts rows into the two
    PostgreSQL tables reached via the DB_CONNECTION_STRING env variable.
    """
    url = 'https://www.investing.com/economic-calendar/chinese-cpi-459'
    os.makedirs(output_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_file = os.path.join(output_dir, f"china_cpi_{timestamp}.csv")

    # Launch Playwright browser
    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=True,
            args=['--disable-blink-features=AutomationControlled']
        )
        context = browser.new_context(
            user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            viewport={'width': 1920, 'height': 1080}
        )
        page = context.new_page()
        page.goto(url, timeout=120000, wait_until='domcontentloaded')

        # Handle cookie consent popup (best effort; absence is not an error).
        # Fix: bare `except:` also swallowed KeyboardInterrupt/SystemExit.
        try:
            page.click('button#onetrust-accept-btn-handler', timeout=5000)
        except Exception:
            pass

        # Wait for the data table to load
        page.wait_for_selector('#eventHistoryTable459', timeout=30000)

        # Expand table to load all historical data by repeatedly clicking
        # "Show more" until the button disappears, the row count stops
        # growing, or the safety cap is hit.
        max_clicks = 70
        click_count = 0
        previous_row_count = 0
        while click_count < max_clicks:
            show_more_btn = page.query_selector('#showMoreHistory459 a:visible')
            if not show_more_btn:
                break
            current_row_count = len(
                page.query_selector_all('#eventHistoryTable459 tbody tr'))
            if current_row_count == previous_row_count:
                break
            # Click via JS to avoid overlay/interception issues.
            page.evaluate('document.querySelector("#showMoreHistory459 a").click()')
            page.wait_for_function(
                f"document.querySelectorAll('#eventHistoryTable459 tbody tr').length > {current_row_count}",
                timeout=45000
            )
            click_count += 1
            previous_row_count = current_row_count

        # Extract table data into a list of dictionaries
        table = page.query_selector('#eventHistoryTable459')
        data = []
        for row in table.query_selector_all('tbody tr'):
            cells = row.query_selector_all('td')
            if len(cells) < 5:
                continue
            # First cell looks like "Dec 09, 2023 (Nov)": release date plus
            # the reference month in parentheses — TODO confirm against the
            # live page layout.
            date_text = cells[0].inner_text().strip()
            date_parts = date_text.split(' (')
            release_date = pd.to_datetime(
                date_parts[0], format='%b %d, %Y', errors='coerce')
            reference_month = date_parts[1][:-1] if len(date_parts) > 1 else None
            actual_span = cells[2].query_selector('span')
            actual_value = actual_span.inner_text().strip() if actual_span else None
            actual_status = actual_span.get_attribute('title') if actual_span else None
            data.append({
                'Release Date': release_date,
                'Reference Month': reference_month,
                'Time': cells[1].inner_text().strip(),
                'Actual': actual_value,
                'Actual Status': actual_status,
                'Forecast': cells[3].inner_text().strip(),
                'Previous': cells[4].inner_text().strip()
            })
        browser.close()

    if not data:
        print("No data collected.")
        return

    # Create DataFrame and preserve raw data before any processing
    df = pd.DataFrame(data)
    df_raw = df.copy()
    # Fix: output_file was built and the directory created, but the CSV was
    # never actually written.
    df_raw.to_csv(output_file, index=False)
    # Fix: rows whose release date failed to parse (NaT) would crash the
    # psycopg2 insert and break the year/month arithmetic below.
    df = df.dropna(subset=['Release Date'])
    df = df.sort_values('Release Date', ascending=False).reset_index(drop=True)

    # Connect to Aiven PostgreSQL. try/finally + `with conn` guarantee
    # commit-or-rollback and connection release even if an insert fails
    # (the original leaked the connection on any exception).
    conn = psycopg2.connect(os.getenv('DB_CONNECTION_STRING'))
    try:
        with conn:  # commits on success, rolls back on exception
            with conn.cursor() as cur:
                # Create and populate the unprocessed data table: china_cpi_raw
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS china_cpi_raw (
                        id SERIAL PRIMARY KEY,
                        release_date TIMESTAMP,
                        reference_month TEXT,
                        time TEXT,
                        actual TEXT,
                        actual_status TEXT,
                        forecast TEXT,
                        previous TEXT,
                        UNIQUE (release_date)
                    )
                """)
                for _, row in df_raw.iterrows():
                    # Fix: convert NaT to None (SQL NULL); psycopg2 cannot
                    # adapt pandas NaT.
                    release_date = (None if pd.isna(row['Release Date'])
                                    else row['Release Date'])
                    cur.execute("""
                        INSERT INTO china_cpi_raw
                            (release_date, reference_month, time, actual,
                             actual_status, forecast, previous)
                        VALUES (%s, %s, %s, %s, %s, %s, %s)
                        ON CONFLICT (release_date) DO NOTHING
                    """, (
                        release_date,
                        row['Reference Month'],
                        row['Time'],
                        row['Actual'],
                        row['Actual Status'],
                        row['Forecast'],
                        row['Previous']
                    ))

                # Process data for the processed table: strip '%' and scale
                # to a fraction (e.g. "0.2%" -> 0.002).
                for col in ['Actual', 'Forecast', 'Previous']:
                    df[col] = pd.to_numeric(
                        df[col].str.replace('%', '', regex=False),
                        errors='coerce') / 100

                # Map "Jan".."Dec" (case-insensitive) to 1..12.
                month_abbr = {name.lower(): num
                              for num, name in enumerate(calendar.month_abbr)
                              if num}
                df['Reference Month Num'] = (
                    df['Reference Month'].str.lower().map(month_abbr))
                # Drop rows with invalid reference months to ensure integer
                # columns
                df = df.dropna(subset=['Reference Month Num'])
                df['Release Month'] = df['Release Date'].dt.month
                df['Release Year'] = df['Release Date'].dt.year
                # The reference month precedes the release: a December figure
                # released in January belongs to the previous year.
                df['Ref Year'] = df.apply(
                    lambda x: x['Release Year']
                    if x['Reference Month Num'] < x['Release Month']
                    else x['Release Year'] - 1,
                    axis=1
                )
                df['Year'] = df['Ref Year'].astype(int)
                df['Month'] = df['Reference Month Num'].astype(int)

                # Create and populate the processed data table:
                # china_cpi_processed
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS china_cpi_processed (
                        year INTEGER,
                        month INTEGER,
                        actual FLOAT,
                        PRIMARY KEY (year, month)
                    )
                """)
                for _, row in df.iterrows():
                    cur.execute("""
                        INSERT INTO china_cpi_processed (year, month, actual)
                        VALUES (%s, %s, %s)
                        ON CONFLICT (year, month) DO NOTHING
                    """, (
                        row['Year'],
                        row['Month'],
                        row['Actual']
                    ))
    finally:
        conn.close()

    print("Data successfully uploaded to Aiven PostgreSQL tables: china_cpi_raw and china_cpi_processed")


if __name__ == "__main__":
    get_china_cpi()