# StockFinance / refresh_data.py
# Source: Hugging Face Space "StockFinance" — commit 48512cb ("Update refresh_data.py", bhartiya75)
import requests
import json
import sqlite3
import pandas as pd
from datetime import datetime
import os
import time
import json
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload
import io
from googleapiclient.errors import HttpError
# Path of the local SQLite cache; synced to/from Google Drive around each refresh.
DB_PATH = "stocks.db"
# Google Drive sync credentials and folder (use Secrets in production).
# GOOGLE_SERVICE_ACCOUNT_JSON holds the raw JSON key, parsed at call time.
GOOGLE_SERVICE_ACCOUNT_JSON = os.environ.get("GOOGLE_SERVICE_ACCOUNT_JSON", "YOUR_GOOGLE_SERVICE_ACCOUNT_JSON")
GOOGLE_DRIVE_FOLDER_ID = os.environ.get("GOOGLE_DRIVE_FOLDER_ID", "YOUR_GOOGLE_DRIVE_FOLDER_ID")
# Alpha Vantage API key (set as environment variable or hardcode temporarily for testing, update for Spaces)
ALPHA_VANTAGE_API_KEY = os.environ.get("ALPHA_VANTAGE_API_KEY", "YOUR_ALPHA_VANTAGE_API_KEY")
# Tickers refreshed on every run.
# NOTE(review): "FB" is Meta's pre-2022 symbol and "META" is also listed —
# the FB fetch presumably fails and gets skipped; confirm and prune.
popular_stocks = ["AAPL", "GOOGL", "GOOG", "MSFT", "AMZN", "TSLA", "FB", "NVDA", "IBM", "INTC", "ORCL", "META"]
def fetch_alpha_vantage_quote(ticker, api_key):
    """Fetch the latest GLOBAL_QUOTE for *ticker* from Alpha Vantage.

    Returns the "Global Quote" dict on success, or None on any failure:
    network/HTTP error, malformed JSON, or an error payload. Alpha Vantage
    reports rate limiting and unknown symbols with HTTP 200, so a 2xx
    response alone is not success.
    """
    url = f"https://www.alphavantage.co/query?function=GLOBAL_QUOTE&symbol={ticker}&apikey={api_key}"
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        data = response.json()  # may raise ValueError on non-JSON bodies
    except (requests.exceptions.RequestException, ValueError) as e:
        print(f"Error fetching quote for {ticker}: {str(e)}")
        return None
    quote = data.get("Global Quote")
    # An unknown symbol yields {"Global Quote": {}}; treat empty as failure
    # so callers' truthiness checks behave consistently.
    if quote:
        return quote
    return None
def fetch_alpha_vantage_history(ticker, api_key):
    """Fetch ~100 days of daily history for *ticker* from Alpha Vantage.

    Returns a JSON string of the 'Close' and 'Volume' columns indexed by
    date (oldest first), or None on any failure (network error, bad JSON,
    rate-limit note, unknown symbol, or unexpected payload shape).
    """
    url = f"https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&symbol={ticker}&outputsize=compact&apikey={api_key}"
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        data = response.json()  # may raise ValueError on non-JSON bodies
    except (requests.exceptions.RequestException, ValueError) as e:
        print(f"Error fetching history for {ticker}: {str(e)}")
        return None
    daily_data = data.get("Time Series (Daily)")
    if not daily_data:
        # Rate-limit notes and unknown symbols arrive as HTTP 200 without
        # the time-series key.
        return None
    try:
        df = pd.DataFrame.from_dict(daily_data, orient='index').astype(float)
        df.index = pd.to_datetime(df.index)
        df = df.sort_index(ascending=True)
        # API columns are "1. open" .. "5. volume"; the positional rename
        # below would raise if the shape ever changes, so guard it.
        if len(df.columns) != 5:
            print(f"Unexpected history columns for {ticker}: {list(df.columns)}")
            return None
        df.columns = ['Open', 'High', 'Low', 'Close', 'Volume']
        return df[['Close', 'Volume']].to_json()
    except (ValueError, KeyError) as e:
        # Malformed values (non-numeric strings, bad dates) must not crash
        # the whole refresh run.
        print(f"Error parsing history for {ticker}: {str(e)}")
        return None
def fetch_alpha_vantage_overview(ticker, api_key):
    """Fetch the company OVERVIEW (fundamentals) for *ticker*.

    Returns the overview dict on success, or None on failure — including
    the HTTP-200 error payloads Alpha Vantage uses for rate limiting
    ("Note"/"Information") and unknown symbols (empty dict), which the
    original code passed through as if they were valid data.
    """
    url = f"https://www.alphavantage.co/query?function=OVERVIEW&symbol={ticker}&apikey={api_key}"
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        data = response.json()  # may raise ValueError on non-JSON bodies
    except (requests.exceptions.RequestException, ValueError) as e:
        print(f"Error fetching overview for {ticker}: {str(e)}")
        return None
    # Reject empty and error payloads so they are never stored in the DB.
    if not data or any(k in data for k in ("Note", "Error Message", "Information")):
        return None
    return data
def sync_database_to_drive():
    """Upload the local SQLite database to the configured Drive folder.

    Updates the existing stocks.db in place when one is already present —
    the previous implementation called files().create unconditionally, so
    every run accumulated another duplicate copy in the folder. Errors are
    logged and swallowed (best-effort sync).
    """
    try:
        # Parse service account JSON from Secrets
        service_account_info = json.loads(GOOGLE_SERVICE_ACCOUNT_JSON)
        credentials = service_account.Credentials.from_service_account_info(service_account_info)
        drive_service = build("drive", "v3", credentials=credentials)
        media = MediaFileUpload(DB_PATH, mimetype="application/octet-stream")
        # Look for an existing copy so we update rather than duplicate.
        existing = drive_service.files().list(
            q=f"'{GOOGLE_DRIVE_FOLDER_ID}' in parents and name='stocks.db' and trashed=false",
            fields="files(id)",
        ).execute().get("files", [])
        if existing:
            file = drive_service.files().update(
                fileId=existing[0]["id"], media_body=media, fields="id"
            ).execute()
        else:
            file_metadata = {
                "name": "stocks.db",
                "parents": [GOOGLE_DRIVE_FOLDER_ID]
            }
            file = drive_service.files().create(
                body=file_metadata, media_body=media, fields="id"
            ).execute()
        print(f"Successfully synced SQLite database to Google Drive (File ID: {file.get('id')})")
    except HttpError as e:
        if e.resp.status == 404:
            print("Google Drive folder not found. Check GOOGLE_DRIVE_FOLDER_ID and permissions.")
            # Optionally, create a new file or folder, but this requires additional logic
        else:
            print(f"Error syncing database to Google Drive: {str(e)}")
    except Exception as e:
        print(f"Error syncing database to Google Drive: {str(e)}")
def sync_database_from_drive():
    """Download stocks.db from the Drive folder into DB_PATH.

    Picks the most recently modified, non-trashed copy when several exist
    (the original query had no ordering or trash filter, so files[0] was an
    arbitrary — possibly stale or trashed — copy). Falls back to creating
    an empty local database when nothing is found or on a 404.
    """
    try:
        # Parse service account JSON from Secrets
        service_account_info = json.loads(GOOGLE_SERVICE_ACCOUNT_JSON)
        credentials = service_account.Credentials.from_service_account_info(service_account_info)
        drive_service = build("drive", "v3", credentials=credentials)
        # Prefer the newest copy; ignore trashed files.
        file_metadata = drive_service.files().list(
            q=f"'{GOOGLE_DRIVE_FOLDER_ID}' in parents and name='stocks.db' and trashed=false",
            orderBy="modifiedTime desc",
            fields="files(id)",
        ).execute()
        files = file_metadata.get("files", [])
        if files:
            request = drive_service.files().get_media(fileId=files[0]["id"])
            fh = io.BytesIO()
            downloader = MediaIoBaseDownload(fh, request)
            done = False
            while not done:
                status, done = downloader.next_chunk()
            with open(DB_PATH, "wb") as f:
                f.write(fh.getvalue())
            print("Successfully synced SQLite database from Google Drive.")
        else:
            print("No stocks.db found in Google Drive. Creating a new database.")
            conn = sqlite3.connect(DB_PATH)
            conn.close()
    except HttpError as e:
        if e.resp.status == 404:
            print("stocks.db not found in Google Drive. Creating a new local database.")
            conn = sqlite3.connect(DB_PATH)
            conn.close()
        else:
            print(f"Error syncing database from Google Drive: {str(e)}")
    except Exception as e:
        print(f"Error syncing database from Google Drive: {str(e)}")
def update_database():
    """Refresh quote/history/overview for every ticker and persist to SQLite.

    Flow: pull the latest DB copy from Google Drive, upsert one row per
    ticker, then push the updated file back to Drive. A ticker with any
    missing piece of data is skipped rather than partially written.

    Rate limiting: the free tier allows 5 requests/minute, and each ticker
    costs THREE requests — the previous single 12 s sleep per ticker fired
    bursts of three back-to-back requests, so we now pause 12 s after
    every individual API call.
    """
    # Sync database from Google Drive before updating
    sync_database_from_drive()
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        # Create table if it doesn't exist
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS stocks (
                ticker TEXT PRIMARY KEY,
                quote TEXT,
                history TEXT,
                overview TEXT,
                last_updated TEXT
            )
        """)
        for ticker in popular_stocks:
            quote = fetch_alpha_vantage_quote(ticker, ALPHA_VANTAGE_API_KEY)
            time.sleep(12)
            history = fetch_alpha_vantage_history(ticker, ALPHA_VANTAGE_API_KEY)
            time.sleep(12)
            overview = fetch_alpha_vantage_overview(ticker, ALPHA_VANTAGE_API_KEY)
            time.sleep(12)
            if quote and history and overview:
                cursor.execute("""
                    REPLACE INTO stocks (ticker, quote, history, overview, last_updated)
                    VALUES (?, ?, ?, ?, ?)
                """, (ticker, json.dumps(quote), history, json.dumps(overview),
                      datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
                # Commit per ticker so a mid-run crash keeps earlier progress.
                conn.commit()
            else:
                print(f"Skipping {ticker} due to missing data")
    finally:
        # Always release the DB handle, even if a fetch/parse step raises.
        conn.close()
    # Sync updated database to Google Drive
    sync_database_to_drive()

if __name__ == "__main__":
    update_database()