# ADY201M / scraper.py — header residue from a GitHub page capture
# (repo: Zok213, commit f2826d8 "fix"); kept as a comment so the file parses.
# Ensure scraper.py contains a complete function that can be imported
from playwright.sync_api import sync_playwright
import pandas as pd
from datetime import datetime
import os
import psycopg2
import calendar
from dotenv import load_dotenv
# Load environment variables from a local .env file at import time so that
# DB_CONNECTION_STRING is available via os.getenv() inside get_china_cpi().
load_dotenv()
def get_china_cpi(output_dir='data/raw/'):
    """
    Scrape China CPI data from investing.com and upload to two Aiven PostgreSQL tables:
    - china_cpi_raw: unprocessed data
    - china_cpi_processed: processed data (year, month, actual)

    A timestamped CSV snapshot of the raw scrape is also written to
    ``output_dir`` (created if missing).

    Parameters
    ----------
    output_dir : str
        Directory that receives the raw CSV snapshot.
    """
    url = 'https://www.investing.com/economic-calendar/chinese-cpi-459'
    os.makedirs(output_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_file = os.path.join(output_dir, f"china_cpi_{timestamp}.csv")

    # --- Scrape ----------------------------------------------------------
    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=True,
            args=['--disable-blink-features=AutomationControlled']
        )
        try:
            context = browser.new_context(
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                viewport={'width': 1920, 'height': 1080}
            )
            page = context.new_page()
            page.goto(url, timeout=120000, wait_until='domcontentloaded')
            # Dismiss the cookie-consent popup if present (best effort only,
            # but narrowed from a bare except so KeyboardInterrupt/SystemExit
            # are not swallowed).
            try:
                page.click('button#onetrust-accept-btn-handler', timeout=5000)
            except Exception:
                pass
            page.wait_for_selector('#eventHistoryTable459', timeout=30000)
            _expand_history(page)
            data = _extract_rows(page)
        finally:
            # Fix: the original leaked the browser if scraping raised.
            browser.close()

    if not data:
        print("No data collected.")
        return

    df = pd.DataFrame(data)
    df_raw = df.copy()  # preserve raw data before any processing
    # Fix: output_file was computed in the original but never written, so the
    # output_dir parameter was dead. Persist the raw snapshot here.
    df_raw.to_csv(output_file, index=False)
    df = df.sort_values('Release Date', ascending=False).reset_index(drop=True)
    df = _process_cpi(df)

    # --- Upload ----------------------------------------------------------
    conn = psycopg2.connect(os.getenv('DB_CONNECTION_STRING'))
    try:
        cur = conn.cursor()
        try:
            _upload_raw(cur, df_raw)
            _upload_processed(cur, df)
            conn.commit()
        finally:
            cur.close()
    finally:
        # Fix: the original leaked the connection if any statement raised.
        conn.close()
    print("Data successfully uploaded to Aiven PostgreSQL tables: china_cpi_raw and china_cpi_processed")


def _expand_history(page, max_clicks=70):
    """Click the table's 'show more' link until all history rows are loaded.

    Stops after ``max_clicks`` clicks, when the link disappears, or when a
    click no longer increases the row count.
    """
    previous_row_count = 0
    for _ in range(max_clicks):
        if not page.query_selector('#showMoreHistory459 a:visible'):
            break
        current_row_count = len(
            page.query_selector_all('#eventHistoryTable459 tbody tr'))
        if current_row_count == previous_row_count:
            break
        # Click via in-page JS: overlays can intercept a normal Playwright click.
        page.evaluate('document.querySelector("#showMoreHistory459 a").click()')
        page.wait_for_function(
            f"document.querySelectorAll('#eventHistoryTable459 tbody tr').length > {current_row_count}",
            timeout=45000
        )
        previous_row_count = current_row_count


def _extract_rows(page):
    """Return the history table's body rows as a list of dicts (one per row)."""
    table = page.query_selector('#eventHistoryTable459')
    data = []
    for row in table.query_selector_all('tbody tr'):
        cells = row.query_selector_all('td')
        if len(cells) < 5:
            continue  # header/spacer rows lack the five data cells
        # First cell appears to be "Mon DD, YYYY (Ref)" — release date plus the
        # reference month in parentheses; TODO confirm against live markup.
        date_text = cells[0].inner_text().strip()
        date_parts = date_text.split(' (')
        release_date = pd.to_datetime(date_parts[0], format='%b %d, %Y', errors='coerce')
        reference_month = date_parts[1][:-1] if len(date_parts) > 1 else None
        actual_span = cells[2].query_selector('span')
        actual_value = actual_span.inner_text().strip() if actual_span else None
        actual_status = actual_span.get_attribute('title') if actual_span else None
        data.append({
            'Release Date': release_date,
            'Reference Month': reference_month,
            'Time': cells[1].inner_text().strip(),
            'Actual': actual_value,
            'Actual Status': actual_status,
            'Forecast': cells[3].inner_text().strip(),
            'Previous': cells[4].inner_text().strip()
        })
    return data


def _process_cpi(df):
    """Convert percent strings to fractions and derive integer Year/Month keys.

    Rows whose reference month cannot be mapped to a month number are dropped
    so the integer casts below cannot fail.
    """
    for col in ['Actual', 'Forecast', 'Previous']:
        df[col] = pd.to_numeric(df[col].str.replace('%', '', regex=False), errors='coerce') / 100
    month_abbr = {name.lower(): num for num, name in enumerate(calendar.month_abbr) if num}
    df['Reference Month Num'] = df['Reference Month'].str.lower().map(month_abbr)
    df = df.dropna(subset=['Reference Month Num'])
    df['Release Month'] = df['Release Date'].dt.month
    df['Release Year'] = df['Release Date'].dt.year
    # December CPI is released the following January, so a reference month that
    # is not strictly before the release month belongs to the previous year.
    df['Ref Year'] = df.apply(
        lambda x: x['Release Year'] if x['Reference Month Num'] < x['Release Month'] else x['Release Year'] - 1,
        axis=1
    )
    df['Year'] = df['Ref Year'].astype(int)
    df['Month'] = df['Reference Month Num'].astype(int)
    return df


def _upload_raw(cur, df_raw):
    """Create china_cpi_raw if needed; insert rows, deduplicating on release_date."""
    cur.execute("""
        CREATE TABLE IF NOT EXISTS china_cpi_raw (
            id SERIAL PRIMARY KEY,
            release_date TIMESTAMP,
            reference_month TEXT,
            time TEXT,
            actual TEXT,
            actual_status TEXT,
            forecast TEXT,
            previous TEXT,
            UNIQUE (release_date)
        )
    """)
    for _, row in df_raw.iterrows():
        cur.execute("""
            INSERT INTO china_cpi_raw (release_date, reference_month, time, actual, actual_status, forecast, previous)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (release_date) DO NOTHING
        """, (
            row['Release Date'],
            row['Reference Month'],
            row['Time'],
            row['Actual'],
            row['Actual Status'],
            row['Forecast'],
            row['Previous']
        ))


def _upload_processed(cur, df):
    """Create china_cpi_processed if needed; insert (year, month, actual) rows."""
    cur.execute("""
        CREATE TABLE IF NOT EXISTS china_cpi_processed (
            year INTEGER,
            month INTEGER,
            actual FLOAT,
            PRIMARY KEY (year, month)
        )
    """)
    for _, row in df.iterrows():
        cur.execute("""
            INSERT INTO china_cpi_processed (year, month, actual)
            VALUES (%s, %s, %s)
            ON CONFLICT (year, month) DO NOTHING
        """, (
            row['Year'],
            row['Month'],
            row['Actual']
        ))
# Script entry point: perform one scrape-and-upload run when executed directly
# (no effect when the module is imported for its get_china_cpi function).
if __name__ == "__main__":
    get_china_cpi()