| from flask import Flask, render_template, request, jsonify, redirect, url_for
|
| import requests
|
| from bs4 import BeautifulSoup
|
| import re
|
| import os
|
| from dotenv import load_dotenv
|
| from pymongo import MongoClient
|
| import threading
|
| import time
|
|
|
|
|
| load_dotenv()
|
|
|
| app = Flask(__name__)
|
|
|
|
|
| MONGO_URI = os.environ.get("MONGO_URI")
|
| client = MongoClient(MONGO_URI)
|
| db = client["steam_tracker"]
|
| watchlist_col = db["watchlist"]
|
|
|
|
|
| def get_app_id_from_name(game_name):
|
| """Searches Steam for a game name and returns its App ID"""
|
| search_url = f"https://store.steampowered.com/api/storesearch/?term={game_name}&l=english&cc=IN"
|
| try:
|
| response = requests.get(search_url, timeout=5)
|
| data = response.json()
|
| if data.get('total', 0) > 0:
|
|
|
| return str(data['items'][0]['id'])
|
| except Exception:
|
| pass
|
| return None
|
|
|
|
|
| def scrape_steam_price(app_id):
|
| url = f"https://store.steampowered.com/app/{app_id}/?cc=in"
|
| cookies = {'birthtime': '283993201', 'lastagecheckage': '1-January-1979'}
|
| headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
|
|
|
| try:
|
| response = requests.get(url, cookies=cookies, headers=headers, timeout=10)
|
| if response.status_code != 200:
|
| return None
|
|
|
| soup = BeautifulSoup(response.text, 'html.parser')
|
| title_element = soup.find('div', id='appHubAppName')
|
| game_title = title_element.text.strip() if title_element else f"App ID: {app_id}"
|
|
|
| price_element = soup.find('div', class_='discount_final_price') or soup.find('div', class_='game_purchase_price')
|
|
|
| if price_element:
|
| raw_price = price_element.text.strip()
|
|
|
| clean_price = raw_price.replace(',', '')
|
| numeric_parts = re.findall(r'\d+\.?\d*', clean_price)
|
| if numeric_parts:
|
| price_float = float(numeric_parts[0])
|
| price_str = raw_price
|
| if '₹' not in price_str and 'rs' not in price_str.lower():
|
| price_str = f"₹ {price_float:,.2f}"
|
| return {"title": game_title, "price_str": price_str, "price_float": price_float}
|
| elif "free" in raw_price.lower():
|
| return {"title": game_title, "price_str": "Free to Play", "price_float": 0.0}
|
| return None
|
| except Exception:
|
| return None
|
|
|
|
|
| def background_price_checker():
|
| while True:
|
| time.sleep(60)
|
| try:
|
| games = list(watchlist_col.find({}, {"_id": 1, "alert_type": 1, "threshold": 1, "triggered": 1, "initial_price": 1}))
|
| for game in games:
|
| app_id = game["_id"]
|
| alert_type = game.get("alert_type")
|
| threshold = int(game.get("threshold", 0))
|
| triggered = game.get("triggered", 0)
|
|
|
| data = scrape_steam_price(app_id)
|
| if data:
|
| current_price = data['price_float']
|
| price_str = data['price_str']
|
|
|
| is_triggered = 0
|
| if alert_type == "target_price" and current_price <= threshold:
|
| is_triggered = 1
|
| elif alert_type == "discount_drop" and "discount" in price_str.lower():
|
| is_triggered = 1
|
|
|
| update_fields = {
|
| "current_price": current_price,
|
| "price_str": price_str
|
| }
|
| update_fields["triggered"] = is_triggered
|
|
|
| watchlist_col.update_one({"_id": app_id}, {"$set": update_fields})
|
| except Exception as e:
|
| print(f"Background worker error: {e}")
|
|
|
| threading.Thread(target=background_price_checker, daemon=True).start()
|
|
|
|
|
| @app.route('/', methods=['GET', 'POST'])
|
| def index():
|
| error_msg = None
|
|
|
| if request.method == 'POST':
|
| user_input = request.form.get('app_id', '').strip()
|
| alert_type = request.form.get('alert_type')
|
| try:
|
| threshold = int(float(request.form.get('threshold', 0)))
|
| except ValueError:
|
| threshold = 0
|
|
|
| if user_input:
|
|
|
| if user_input.isdigit():
|
| app_id = user_input
|
| else:
|
| app_id = get_app_id_from_name(user_input)
|
|
|
| if not app_id:
|
| error_msg = f"Could not find a Steam game matching '{user_input}'. Try using the exact App ID."
|
| else:
|
| data = scrape_steam_price(app_id)
|
| if data:
|
|
|
| current_price = data['price_float']
|
| price_str = data['price_str']
|
| is_triggered = 0
|
| if alert_type == "target_price" and current_price <= threshold:
|
| is_triggered = 1
|
| elif alert_type == "discount_drop" and "discount" in price_str.lower():
|
| is_triggered = 1
|
|
|
| try:
|
| watchlist_col.replace_one(
|
| {"_id": app_id},
|
| {
|
| "_id": app_id,
|
| "app_id": app_id,
|
| "title": data['title'],
|
| "initial_price": data['price_float'],
|
| "current_price": data['price_float'],
|
| "initial_price_str": data['price_str'],
|
| "price_str": data['price_str'],
|
| "alert_type": alert_type,
|
| "threshold": threshold,
|
| "triggered": is_triggered
|
| },
|
| upsert=True
|
| )
|
| except Exception as e:
|
| error_msg = f"Database error: {e}"
|
| else:
|
| error_msg = "Could not fetch game details. Check the App ID."
|
|
|
|
|
| search_query = request.args.get('search', '').strip()
|
| try:
|
| if search_query:
|
| cursor = watchlist_col.find({"title": {"$regex": re.escape(search_query), "$options": "i"}})
|
| else:
|
| cursor = watchlist_col.find()
|
|
|
| watchlist = []
|
| for doc in cursor:
|
| watchlist.append((
|
| doc.get("_id"),
|
| doc.get("title", ""),
|
| doc.get("initial_price_str", ""),
|
| doc.get("price_str", ""),
|
| doc.get("alert_type", ""),
|
| int(doc.get("threshold", 0)),
|
| doc.get("triggered", 0),
|
| doc.get("initial_price", 0.0),
|
| doc.get("current_price", 0.0)
|
| ))
|
| except Exception as e:
|
| watchlist = []
|
| error_msg = f"Database error: {e}"
|
|
|
| return render_template('index.html', watchlist=watchlist, error=error_msg, search_query=search_query)
|
|
|
|
|
| @app.route('/delete/<app_id>', methods=['POST'])
|
| def delete_game(app_id):
|
| watchlist_col.delete_one({"_id": app_id})
|
| return redirect(url_for('index'))
|
|
|
| @app.route('/api/alerts')
|
| def check_alerts():
|
| alerts = []
|
| try:
|
| cursor = watchlist_col.find({"triggered": 1}, {"_id": 1, "title": 1, "price_str": 1})
|
| for doc in cursor:
|
| alerts.append({
|
| "app_id": doc.get("_id"),
|
| "title": doc.get("title", ""),
|
| "price": doc.get("price_str", "")
|
| })
|
| except Exception:
|
| pass
|
| return jsonify(alerts)
|
|
|
| @app.route('/api/dismiss', methods=['POST'])
|
| def dismiss_alert():
|
| return jsonify({"status": "success"})
|
|
|
| if __name__ == '__main__':
|
| app.run(debug=True) |