# Ensure scraper.py contains a complete function that can be imported
from playwright.sync_api import sync_playwright
import pandas as pd
from datetime import datetime
import os
import psycopg2
import calendar
from dotenv import load_dotenv
# Load environment variables from a .env file at import time
# (supplies DB_CONNECTION_STRING read later by get_china_cpi).
load_dotenv()
def get_china_cpi(output_dir='data/raw/'):
    """Scrape China CPI data from investing.com and persist it.

    The full release history is scraped from the investing.com economic
    calendar, a timestamped raw CSV snapshot is written to *output_dir*,
    and the data is uploaded to two Aiven PostgreSQL tables:

    - ``china_cpi_raw``: unprocessed rows as scraped
    - ``china_cpi_processed``: (year, month, actual), with *actual* stored
      as a decimal fraction (e.g. 0.045 for "4.5%")

    Parameters
    ----------
    output_dir : str
        Directory for the raw CSV snapshot; created if missing.

    Returns
    -------
    None. Prints a status message; returns early if no rows were scraped.
    """
    url = 'https://www.investing.com/economic-calendar/chinese-cpi-459'
    os.makedirs(output_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_file = os.path.join(output_dir, f"china_cpi_{timestamp}.csv")

    data = _scrape_cpi_history(url)
    if not data:
        print("No data collected.")
        return

    df = pd.DataFrame(data)
    df_raw = df.copy()  # untouched copy for the raw table and CSV snapshot
    # Bug fix: output_file was previously computed but never written, so the
    # "raw data" CSV the output_dir parameter exists for was silently lost.
    df_raw.to_csv(output_file, index=False)
    df = df.sort_values('Release Date', ascending=False).reset_index(drop=True)
    df = _process_cpi(df)

    conn = psycopg2.connect(os.getenv('DB_CONNECTION_STRING'))
    try:
        # Cursor as a context manager so it is closed even if an insert fails.
        with conn.cursor() as cur:
            _upload_raw(cur, df_raw)
            _upload_processed(cur, df)
        conn.commit()
    finally:
        # Bug fix: previously the connection (and cursor) leaked whenever a
        # DB statement raised, because there was no try/finally.
        conn.close()
    print("Data successfully uploaded to Aiven PostgreSQL tables: china_cpi_raw and china_cpi_processed")


def _scrape_cpi_history(url):
    """Scrape the full CPI release-history table from *url* via Playwright.

    Returns a list of row dicts (Release Date, Reference Month, Time,
    Actual, Actual Status, Forecast, Previous); empty list if none found.
    """
    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=True,
            args=['--disable-blink-features=AutomationControlled']
        )
        context = browser.new_context(
            user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            viewport={'width': 1920, 'height': 1080}
        )
        page = context.new_page()
        page.goto(url, timeout=120000, wait_until='domcontentloaded')

        # Dismiss the cookie-consent banner if present; absence is fine.
        try:
            page.click('button#onetrust-accept-btn-handler', timeout=5000)
        except Exception:  # narrowed from a bare except: (would trap SystemExit)
            pass

        page.wait_for_selector('#eventHistoryTable459', timeout=30000)

        # Click "Show more" repeatedly until the table stops growing (or a
        # safety cap is hit) so the full history is loaded.
        max_clicks = 70
        click_count = 0
        previous_row_count = 0
        while click_count < max_clicks:
            show_more_btn = page.query_selector('#showMoreHistory459 a:visible')
            if not show_more_btn:
                break
            current_row_count = len(
                page.query_selector_all('#eventHistoryTable459 tbody tr'))
            if current_row_count == previous_row_count:
                break
            # Click through JS: the anchor can be overlapped by overlays,
            # which makes Playwright's own click flaky here.
            page.evaluate('document.querySelector("#showMoreHistory459 a").click()')
            page.wait_for_function(
                f"document.querySelectorAll('#eventHistoryTable459 tbody tr').length > {current_row_count}",
                timeout=45000
            )
            click_count += 1
            previous_row_count = current_row_count

        data = []
        table = page.query_selector('#eventHistoryTable459')
        for row in table.query_selector_all('tbody tr'):
            cells = row.query_selector_all('td')
            if len(cells) < 5:
                continue  # skip header/spacer rows
            # First cell looks like "Mar 09, 2024 (Feb)": release date plus
            # the reference month in parentheses.
            date_text = cells[0].inner_text().strip()
            date_parts = date_text.split(' (')
            release_date = pd.to_datetime(
                date_parts[0], format='%b %d, %Y', errors='coerce')
            reference_month = date_parts[1][:-1] if len(date_parts) > 1 else None
            actual_span = cells[2].query_selector('span')
            data.append({
                'Release Date': release_date,
                'Reference Month': reference_month,
                'Time': cells[1].inner_text().strip(),
                'Actual': actual_span.inner_text().strip() if actual_span else None,
                'Actual Status': actual_span.get_attribute('title') if actual_span else None,
                'Forecast': cells[3].inner_text().strip(),
                'Previous': cells[4].inner_text().strip()
            })
        browser.close()
    return data


def _process_cpi(df):
    """Return a copy of *df* with Year/Month ints and percentages as fractions.

    Rows whose reference month fails to parse are dropped so the integer
    casts below are safe.
    """
    df = df.copy()
    # "4.5%" -> 0.045; non-numeric entries become NaN.
    for col in ['Actual', 'Forecast', 'Previous']:
        df[col] = pd.to_numeric(
            df[col].str.replace('%', '', regex=False), errors='coerce') / 100
    month_abbr = {name.lower(): num
                  for num, name in enumerate(calendar.month_abbr) if num}
    df['Reference Month Num'] = df['Reference Month'].str.lower().map(month_abbr)
    df = df.dropna(subset=['Reference Month Num'])
    df['Release Month'] = df['Release Date'].dt.month
    df['Release Year'] = df['Release Date'].dt.year
    # CPI for a month is released afterwards, so when the reference month is
    # not strictly before the release month (e.g. December data released in
    # January) the reference year is the release year minus one.
    df['Ref Year'] = df.apply(
        lambda x: x['Release Year']
        if x['Reference Month Num'] < x['Release Month']
        else x['Release Year'] - 1,
        axis=1
    )
    df['Year'] = df['Ref Year'].astype(int)
    df['Month'] = df['Reference Month Num'].astype(int)
    return df


def _upload_raw(cur, df_raw):
    """Create china_cpi_raw if needed and bulk-insert the scraped rows."""
    cur.execute("""
    CREATE TABLE IF NOT EXISTS china_cpi_raw (
        id SERIAL PRIMARY KEY,
        release_date TIMESTAMP,
        reference_month TEXT,
        time TEXT,
        actual TEXT,
        actual_status TEXT,
        forecast TEXT,
        previous TEXT,
        UNIQUE (release_date)
    )
    """)
    cols = ['Release Date', 'Reference Month', 'Time', 'Actual',
            'Actual Status', 'Forecast', 'Previous']
    # executemany instead of a Python-level loop of single execute() calls.
    cur.executemany("""
    INSERT INTO china_cpi_raw (release_date, reference_month, time, actual, actual_status, forecast, previous)
    VALUES (%s, %s, %s, %s, %s, %s, %s)
    ON CONFLICT (release_date) DO NOTHING
    """, list(df_raw[cols].itertuples(index=False, name=None)))


def _upload_processed(cur, df):
    """Create china_cpi_processed if needed and bulk-insert (year, month, actual)."""
    cur.execute("""
    CREATE TABLE IF NOT EXISTS china_cpi_processed (
        year INTEGER,
        month INTEGER,
        actual FLOAT,
        PRIMARY KEY (year, month)
    )
    """)
    cur.executemany("""
    INSERT INTO china_cpi_processed (year, month, actual)
    VALUES (%s, %s, %s)
    ON CONFLICT (year, month) DO NOTHING
    """, list(df[['Year', 'Month', 'Actual']].itertuples(index=False, name=None)))
# Script entry point: run the scraper directly with `python scraper.py`.
if __name__ == "__main__":
    get_china_cpi()