File size: 7,029 Bytes
f2826d8
a705291
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f2826d8
a705291
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# Standalone scraper module: exposes get_china_cpi() for import or direct execution.
from playwright.sync_api import sync_playwright
import pandas as pd
from datetime import datetime
import os
import psycopg2
import calendar
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

def get_china_cpi(output_dir='data/raw/'):
    """
    Scrape China CPI data from investing.com and upload it to two Aiven
    PostgreSQL tables:

    - china_cpi_raw: unprocessed rows exactly as scraped
    - china_cpi_processed: processed rows (year, month, actual)

    A timestamped CSV snapshot of the raw data is also written to
    ``output_dir``.

    Parameters
    ----------
    output_dir : str
        Directory for the raw CSV snapshot (created if missing).

    Returns
    -------
    None
        Prints progress and returns early if no rows were scraped.
    """
    url = 'https://www.investing.com/economic-calendar/chinese-cpi-459'
    os.makedirs(output_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_file = os.path.join(output_dir, f"china_cpi_{timestamp}.csv")

    data = _scrape_history_rows(url)
    if not data:
        print("No data collected.")
        return

    df = pd.DataFrame(data)
    df_raw = df.copy()  # preserve unmodified values for the raw table / CSV
    # Fix: output_file was previously computed but never written.
    df_raw.to_csv(output_file, index=False)
    df = df.sort_values('Release Date', ascending=False).reset_index(drop=True)
    df_processed = _process_frame(df)
    _upload_to_db(df_raw, df_processed)
    print("Data successfully uploaded to Aiven PostgreSQL tables: china_cpi_raw and china_cpi_processed")


def _scrape_history_rows(url):
    """Open the event-history page, expand the full table, and return a
    list of row dicts (one per CPI release)."""
    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=True,
            args=['--disable-blink-features=AutomationControlled']
        )
        try:
            context = browser.new_context(
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                viewport={'width': 1920, 'height': 1080}
            )
            page = context.new_page()
            page.goto(url, timeout=120000, wait_until='domcontentloaded')

            # Best-effort dismissal of the cookie-consent popup; it may not
            # appear at all. except Exception (not bare except) so that
            # KeyboardInterrupt/SystemExit still propagate.
            try:
                page.click('button#onetrust-accept-btn-handler', timeout=5000)
            except Exception:
                pass

            page.wait_for_selector('#eventHistoryTable459', timeout=30000)
            _expand_history(page)
            return _extract_rows(page)
        finally:
            # Close the browser even if scraping raised.
            browser.close()


def _expand_history(page, max_clicks=70):
    """Click 'Show More' until all history is loaded or max_clicks is hit."""
    previous_row_count = 0
    for _ in range(max_clicks):
        show_more_btn = page.query_selector('#showMoreHistory459 a:visible')
        if not show_more_btn:
            break
        current_row_count = len(page.query_selector_all('#eventHistoryTable459 tbody tr'))
        if current_row_count == previous_row_count:
            break  # no growth since last click: table is fully expanded
        # Click via JS to avoid overlay/interception issues.
        page.evaluate('document.querySelector("#showMoreHistory459 a").click()')
        page.wait_for_function(
            f"document.querySelectorAll('#eventHistoryTable459 tbody tr').length > {current_row_count}",
            timeout=45000
        )
        previous_row_count = current_row_count


def _extract_rows(page):
    """Parse the expanded history table into a list of dicts."""
    table = page.query_selector('#eventHistoryTable459')
    data = []
    for row in table.query_selector_all('tbody tr'):
        cells = row.query_selector_all('td')
        if len(cells) < 5:
            continue  # skip header/filler rows
        # The date cell looks like "Jan 12, 2024 (Dec)": release date plus
        # the reference month in parentheses.
        date_text = cells[0].inner_text().strip()
        date_parts = date_text.split(' (')
        release_date = pd.to_datetime(date_parts[0], format='%b %d, %Y', errors='coerce')
        reference_month = date_parts[1][:-1] if len(date_parts) > 1 else None
        actual_span = cells[2].query_selector('span')
        actual_value = actual_span.inner_text().strip() if actual_span else None
        actual_status = actual_span.get_attribute('title') if actual_span else None
        data.append({
            'Release Date': release_date,
            'Reference Month': reference_month,
            'Time': cells[1].inner_text().strip(),
            'Actual': actual_value,
            'Actual Status': actual_status,
            'Forecast': cells[3].inner_text().strip(),
            'Previous': cells[4].inner_text().strip()
        })
    return data


def _process_frame(df):
    """Convert percentage strings to fractions and derive the (Year, Month)
    of the reference period for each release. Returns the processed frame."""
    for col in ['Actual', 'Forecast', 'Previous']:
        df[col] = pd.to_numeric(df[col].str.replace('%', '', regex=False), errors='coerce') / 100

    month_abbr = {name.lower(): num for num, name in enumerate(calendar.month_abbr) if num}
    df['Reference Month Num'] = df['Reference Month'].str.lower().map(month_abbr)
    # Drop rows with invalid reference months so integer casts below are safe.
    df = df.dropna(subset=['Reference Month Num'])
    df['Release Month'] = df['Release Date'].dt.month
    df['Release Year'] = df['Release Date'].dt.year
    # CPI for December is released in January: when the reference month is
    # not strictly before the release month, it belongs to the prior year.
    df['Ref Year'] = df.apply(
        lambda x: x['Release Year'] if x['Reference Month Num'] < x['Release Month'] else x['Release Year'] - 1,
        axis=1
    )
    df['Year'] = df['Ref Year'].astype(int)
    df['Month'] = df['Reference Month Num'].astype(int)
    return df


def _upload_to_db(df_raw, df_processed):
    """Create the two target tables if needed and batch-insert the rows.
    Duplicate keys are skipped via ON CONFLICT DO NOTHING. The connection
    is always closed, even on error."""
    conn = psycopg2.connect(os.getenv('DB_CONNECTION_STRING'))
    try:
        with conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS china_cpi_raw (
                    id SERIAL PRIMARY KEY,
                    release_date TIMESTAMP,
                    reference_month TEXT,
                    time TEXT,
                    actual TEXT,
                    actual_status TEXT,
                    forecast TEXT,
                    previous TEXT,
                    UNIQUE (release_date)
                )
            """)
            raw_rows = [
                (r['Release Date'], r['Reference Month'], r['Time'], r['Actual'],
                 r['Actual Status'], r['Forecast'], r['Previous'])
                for _, r in df_raw.iterrows()
            ]
            # executemany instead of one round-trip-building execute per row.
            cur.executemany("""
                INSERT INTO china_cpi_raw (release_date, reference_month, time, actual, actual_status, forecast, previous)
                VALUES (%s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (release_date) DO NOTHING
            """, raw_rows)

            cur.execute("""
                CREATE TABLE IF NOT EXISTS china_cpi_processed (
                    year INTEGER,
                    month INTEGER,
                    actual FLOAT,
                    PRIMARY KEY (year, month)
                )
            """)
            # Cast numpy scalars to Python types (psycopg2 cannot adapt
            # numpy int64/float64) and map NaN -> NULL.
            processed_rows = [
                (int(r['Year']), int(r['Month']),
                 float(r['Actual']) if pd.notna(r['Actual']) else None)
                for _, r in df_processed.iterrows()
            ]
            cur.executemany("""
                INSERT INTO china_cpi_processed (year, month, actual)
                VALUES (%s, %s, %s)
                ON CONFLICT (year, month) DO NOTHING
            """, processed_rows)
        conn.commit()
    finally:
        conn.close()

# Run the scraper only when the file is executed directly (not on import).
if __name__ == "__main__":
    get_china_cpi()