# ADY201M / app.py
# Zok213
# fix
# f2826d8
from fastapi import FastAPI, Query, BackgroundTasks
from typing import Optional
from datetime import date
import psycopg2
import os
from dotenv import load_dotenv
# Import your scraper function
from scraper import get_china_cpi
# Load environment variables from .env file (optional, for local development).
# This runs at import time, before any handler reads DB_CONNECTION_STRING.
load_dotenv()
# Application instance; the route decorators below register their handlers on it.
app = FastAPI()
def get_db_connection():
    """Open and return a new psycopg2 connection.

    The connection string is read from the ``DB_CONNECTION_STRING``
    environment variable on every call. Nothing here closes the connection;
    that is left to the caller.
    """
    dsn = os.getenv('DB_CONNECTION_STRING')
    return psycopg2.connect(dsn)
# Endpoint that triggers the scraper
@app.get("/run-scraper")
async def run_scraper(background_tasks: BackgroundTasks):
    """Run the China CPI scraper in the background"""
    # The scraper is only queued here; this handler does not call it inline
    # and does not wait for its result before building the response.
    background_tasks.add_task(get_china_cpi)
    acknowledgement = {"message": "Scraper started in background"}
    return acknowledgement
# Root endpoint: a static index of the API
@app.get("/")
def root():
    """Root endpoint with API information"""
    # (path, description) pairs; every listed route is a GET.
    routes = (
        ("/", "This information"),
        ("/run-scraper", "Trigger the data scraper"),
        ("/data", "Get CPI data with optional filters"),
        ("/latest", "Get the latest CPI data"),
    )
    listing = [
        {"path": path, "method": "GET", "description": text}
        for path, text in routes
    ]
    return {"message": "China CPI API", "endpoints": listing}
# CPI data listing endpoint
@app.get("/data")
def get_data(
    start_date: Optional[date] = Query(None),
    end_date: Optional[date] = Query(None),
    limit: int = Query(100, ge=1, le=1000)
):
    """Return CPI records ordered by release date, newest first.

    Args:
        start_date: If given, keep only rows released on or after this date.
        end_date: If given, keep only rows released on or before this date.
        limit: Maximum number of rows to return (1 to 1000, default 100).

    Returns:
        A list of dicts with the keys year, month, actual, release_date,
        forecast, previous, time and actual_status.
    """
    # Output keys, in the same order as the SELECT list below.
    columns = (
        "year",
        "month",
        "actual",
        "release_date",
        "forecast",
        "previous",
        "time",
        "actual_status",
    )
    # Processed rows are matched to raw rows on the year and month of the
    # raw row's release_date. "WHERE 1=1" lets each optional filter be
    # appended as a plain " AND ..." clause.
    query = """
        SELECT p.year, p.month, p.actual, r.release_date, r.forecast, r.previous, r.time, r.actual_status
        FROM china_cpi_processed p
        JOIN china_cpi_raw r ON
        (p.year = EXTRACT(YEAR FROM r.release_date) AND p.month = EXTRACT(MONTH FROM r.release_date))
        WHERE 1=1
    """
    params = []
    if start_date is not None:
        query += " AND DATE(r.release_date) >= %s"
        params.append(start_date)
    if end_date is not None:
        query += " AND DATE(r.release_date) <= %s"
        params.append(end_date)
    query += " ORDER BY r.release_date DESC LIMIT %s"
    params.append(limit)

    conn = get_db_connection()
    try:
        # The cursor context manager closes the cursor; the finally clause
        # closes the connection. Both now happen even when the query raises,
        # so a failing request no longer leaks a database connection.
        with conn.cursor() as cur:
            cur.execute(query, params)
            rows = cur.fetchall()
    finally:
        conn.close()

    return [dict(zip(columns, row)) for row in rows]
@app.get("/latest")
def get_latest():
    """Return the CPI record with the most recent release date.

    Returns:
        A dict with the keys year, month, actual, release_date, forecast,
        previous, time and actual_status, or ``{"error": "No data found"}``
        when the query yields no row.
    """
    # Output keys, in the same order as the SELECT list below.
    columns = (
        "year",
        "month",
        "actual",
        "release_date",
        "forecast",
        "previous",
        "time",
        "actual_status",
    )
    query = """
        SELECT p.year, p.month, p.actual, r.release_date, r.forecast, r.previous, r.time, r.actual_status
        FROM china_cpi_processed p
        JOIN china_cpi_raw r ON
        (p.year = EXTRACT(YEAR FROM r.release_date) AND p.month = EXTRACT(MONTH FROM r.release_date))
        ORDER BY r.release_date DESC LIMIT 1
    """

    conn = get_db_connection()
    try:
        # The cursor context manager closes the cursor; the finally clause
        # closes the connection. Both now happen even when the query raises,
        # so a failing request no longer leaks a database connection.
        with conn.cursor() as cur:
            cur.execute(query)
            row = cur.fetchone()
    finally:
        conn.close()

    if row:
        return dict(zip(columns, row))
    return {"error": "No data found"}