|
|
|
|
|
from playwright.sync_api import sync_playwright |
|
|
import pandas as pd |
|
|
from datetime import datetime |
|
|
import os |
|
|
import psycopg2 |
|
|
import calendar |
|
|
from dotenv import load_dotenv |
|
|
|
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
def get_china_cpi(output_dir='data/raw/'): |
|
|
""" |
|
|
Scrape China CPI data from investing.com and upload to two Aiven PostgreSQL tables: |
|
|
- china_cpi_raw: unprocessed data |
|
|
- china_cpi_processed: processed data (year, month, actual) |
|
|
""" |
|
|
url = 'https://www.investing.com/economic-calendar/chinese-cpi-459' |
|
|
os.makedirs(output_dir, exist_ok=True) |
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
|
output_file = os.path.join(output_dir, f"china_cpi_{timestamp}.csv") |
|
|
|
|
|
|
|
|
with sync_playwright() as p: |
|
|
browser = p.chromium.launch(headless=True, args=['--disable-blink-features=AutomationControlled']) |
|
|
context = browser.new_context( |
|
|
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', |
|
|
viewport={'width': 1920, 'height': 1080} |
|
|
) |
|
|
page = context.new_page() |
|
|
page.goto(url, timeout=120000, wait_until='domcontentloaded') |
|
|
|
|
|
|
|
|
try: |
|
|
page.click('button#onetrust-accept-btn-handler', timeout=5000) |
|
|
except: |
|
|
pass |
|
|
|
|
|
|
|
|
page.wait_for_selector('#eventHistoryTable459', timeout=30000) |
|
|
|
|
|
|
|
|
max_clicks = 70 |
|
|
click_count = 0 |
|
|
previous_row_count = 0 |
|
|
while click_count < max_clicks: |
|
|
show_more_btn = page.query_selector('#showMoreHistory459 a:visible') |
|
|
if not show_more_btn: |
|
|
break |
|
|
current_row_count = len(page.query_selector_all('#eventHistoryTable459 tbody tr')) |
|
|
if current_row_count == previous_row_count: |
|
|
break |
|
|
|
|
|
page.evaluate('document.querySelector("#showMoreHistory459 a").click()') |
|
|
page.wait_for_function( |
|
|
f"document.querySelectorAll('#eventHistoryTable459 tbody tr').length > {current_row_count}", |
|
|
timeout=45000 |
|
|
) |
|
|
click_count += 1 |
|
|
previous_row_count = current_row_count |
|
|
|
|
|
|
|
|
table = page.query_selector('#eventHistoryTable459') |
|
|
data = [] |
|
|
for row in table.query_selector_all('tbody tr'): |
|
|
cells = row.query_selector_all('td') |
|
|
if len(cells) < 5: |
|
|
continue |
|
|
date_text = cells[0].inner_text().strip() |
|
|
date_parts = date_text.split(' (') |
|
|
release_date = pd.to_datetime(date_parts[0], format='%b %d, %Y', errors='coerce') |
|
|
reference_month = date_parts[1][:-1] if len(date_parts) > 1 else None |
|
|
actual_span = cells[2].query_selector('span') |
|
|
actual_value = actual_span.inner_text().strip() if actual_span else None |
|
|
actual_status = actual_span.get_attribute('title') if actual_span else None |
|
|
data.append({ |
|
|
'Release Date': release_date, |
|
|
'Reference Month': reference_month, |
|
|
'Time': cells[1].inner_text().strip(), |
|
|
'Actual': actual_value, |
|
|
'Actual Status': actual_status, |
|
|
'Forecast': cells[3].inner_text().strip(), |
|
|
'Previous': cells[4].inner_text().strip() |
|
|
}) |
|
|
|
|
|
browser.close() |
|
|
|
|
|
if not data: |
|
|
print("No data collected.") |
|
|
return |
|
|
|
|
|
|
|
|
df = pd.DataFrame(data) |
|
|
df_raw = df.copy() |
|
|
df = df.sort_values('Release Date', ascending=False).reset_index(drop=True) |
|
|
|
|
|
|
|
|
conn = psycopg2.connect(os.getenv('DB_CONNECTION_STRING')) |
|
|
cur = conn.cursor() |
|
|
|
|
|
|
|
|
cur.execute(""" |
|
|
CREATE TABLE IF NOT EXISTS china_cpi_raw ( |
|
|
id SERIAL PRIMARY KEY, |
|
|
release_date TIMESTAMP, |
|
|
reference_month TEXT, |
|
|
time TEXT, |
|
|
actual TEXT, |
|
|
actual_status TEXT, |
|
|
forecast TEXT, |
|
|
previous TEXT, |
|
|
UNIQUE (release_date) |
|
|
) |
|
|
""") |
|
|
for _, row in df_raw.iterrows(): |
|
|
cur.execute(""" |
|
|
INSERT INTO china_cpi_raw (release_date, reference_month, time, actual, actual_status, forecast, previous) |
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s) |
|
|
ON CONFLICT (release_date) DO NOTHING |
|
|
""", ( |
|
|
row['Release Date'], |
|
|
row['Reference Month'], |
|
|
row['Time'], |
|
|
row['Actual'], |
|
|
row['Actual Status'], |
|
|
row['Forecast'], |
|
|
row['Previous'] |
|
|
)) |
|
|
|
|
|
|
|
|
for col in ['Actual', 'Forecast', 'Previous']: |
|
|
df[col] = pd.to_numeric(df[col].str.replace('%', '', regex=False), errors='coerce') / 100 |
|
|
|
|
|
month_abbr = {name.lower(): num for num, name in enumerate(calendar.month_abbr) if num} |
|
|
df['Reference Month Num'] = df['Reference Month'].str.lower().map(month_abbr) |
|
|
|
|
|
df = df.dropna(subset=['Reference Month Num']) |
|
|
df['Release Month'] = df['Release Date'].dt.month |
|
|
df['Release Year'] = df['Release Date'].dt.year |
|
|
df['Ref Year'] = df.apply( |
|
|
lambda x: x['Release Year'] if x['Reference Month Num'] < x['Release Month'] else x['Release Year'] - 1, |
|
|
axis=1 |
|
|
) |
|
|
df['Year'] = df['Ref Year'].astype(int) |
|
|
df['Month'] = df['Reference Month Num'].astype(int) |
|
|
|
|
|
|
|
|
cur.execute(""" |
|
|
CREATE TABLE IF NOT EXISTS china_cpi_processed ( |
|
|
year INTEGER, |
|
|
month INTEGER, |
|
|
actual FLOAT, |
|
|
PRIMARY KEY (year, month) |
|
|
) |
|
|
""") |
|
|
for _, row in df.iterrows(): |
|
|
cur.execute(""" |
|
|
INSERT INTO china_cpi_processed (year, month, actual) |
|
|
VALUES (%s, %s, %s) |
|
|
ON CONFLICT (year, month) DO NOTHING |
|
|
""", ( |
|
|
row['Year'], |
|
|
row['Month'], |
|
|
row['Actual'] |
|
|
)) |
|
|
|
|
|
|
|
|
conn.commit() |
|
|
cur.close() |
|
|
conn.close() |
|
|
print("Data successfully uploaded to Aiven PostgreSQL tables: china_cpi_raw and china_cpi_processed") |
|
|
|
|
|
if __name__ == "__main__": |
|
|
get_china_cpi() |