import csv
import json
import os
from datetime import datetime
from functools import wraps
from io import StringIO

from dotenv import load_dotenv
from flask import Flask, request, jsonify
from flask_cors import CORS

# Firebase Admin SDK
import firebase_admin
from firebase_admin import credentials, auth, storage, db as firebase_db, initialize_app

# Exa.ai
from exa_py import Exa

# Google GenAI (Gemini)
from google import genai

# ─── Load environment ───────────────────────────────────────────────────────────
load_dotenv()

EXA_API_KEY = os.getenv("EXA_API_KEY")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
FIREBASE_JSON = os.getenv("FIREBASE")
FIREBASE_DB_URL = os.getenv("Firebase_DB")
STORAGE_BUCKET = os.getenv("Firebase_Storage")

# Fail fast at import time, and name the variables that are actually missing
# so a misconfigured deployment can be fixed without reading this file.
_missing_env = [
    name
    for name, value in (
        ("EXA_API_KEY", EXA_API_KEY),
        ("GEMINI_API_KEY", GEMINI_API_KEY),
        ("FIREBASE", FIREBASE_JSON),
        ("Firebase_DB", FIREBASE_DB_URL),
        ("Firebase_Storage", STORAGE_BUCKET),
    )
    if not value
]
if _missing_env:
    raise RuntimeError(
        "Missing one or more required env vars: " + ", ".join(_missing_env)
    )

# ─── Initialize Firebase ────────────────────────────────────────────────────────
# The FIREBASE env var holds the service-account key as a JSON string.
cred = credentials.Certificate(json.loads(FIREBASE_JSON))
initialize_app(cred, {
    "storageBucket": STORAGE_BUCKET,
    "databaseURL": FIREBASE_DB_URL
})
bucket = storage.bucket()

# ─── Ensure dummy admin exists ───────────────────────────────────────────────────
# SECURITY(review): the fallback credentials below are committed to source.
# Set ADMIN_EMAIL / ADMIN_PASSWORD in the environment to override them, and
# rotate the hard-coded password before any real deployment.
_ADMIN_EMAIL = os.getenv("ADMIN_EMAIL", "gsamukange@yahoo.com")
_ADMIN_PASSWORD = os.getenv("ADMIN_PASSWORD", "marco2025")

try:
    admin_user = auth.get_user_by_email(_ADMIN_EMAIL)
except auth.UserNotFoundError:
    # Use the public exception exported by firebase_admin.auth rather than the
    # private firebase_admin._auth_utils module.
    admin_user = auth.create_user(
        email=_ADMIN_EMAIL,
        email_verified=True,
        password=_ADMIN_PASSWORD,
        display_name="Admin"
    )
# Applied on every start so an already-existing account also carries the claim.
auth.set_custom_user_claims(admin_user.uid, {"admin": True})

# ─── Initialize Exa.ai ──────────────────────────────────────────────────────────
exa = Exa(EXA_API_KEY)

# ─── Initialize Gemini Client ───────────────────────────────────────────────────
client = genai.Client(api_key=GEMINI_API_KEY)
MODEL = "gemini-2.0-flash-001"

# ─── Flask App ──────────────────────────────────────────────────────────────────
app = Flask(__name__)
CORS(app)
# ─── Helpers ────────────────────────────────────────────────────────────────────
# Number of anonymous /chat requests allowed per IP before login is required.
_FREE_QUERY_LIMIT = 5

# Intent labels the classifier prompt in /chat asks the model to choose from.
_KNOWN_INTENTS = ("self-help", "product_search")


def _bearer_token(header_value):
    """Return the token from a ``Bearer <token>`` header value, or ``None``."""
    scheme, _, token = header_value.partition(" ")
    token = token.strip()
    if scheme != "Bearer" or not token:
        return None
    return token


def _json_object():
    """Return the request's JSON body if it parses to an object, else ``None``.

    ``silent=True`` makes Flask return ``None`` instead of raising on a
    missing or malformed body, so each view can answer with its own 400.
    """
    body = request.get_json(silent=True)
    return body if isinstance(body, dict) else None


def _generate_text(prompt):
    """Send *prompt* to the Gemini model and return the stripped reply text.

    Returns an empty string when the response carries no text, so callers
    never call ``.strip()`` on ``None``.
    """
    reply = client.models.generate_content(model=MODEL, contents=prompt)
    return (reply.text or "").strip()


def _parse_intents(raw_text):
    """Extract the list of intent labels from the classifier's reply.

    Tries strict JSON first (after removing a Markdown code fence). If the
    reply is not valid JSON — for example a single-quoted list, which is the
    format the prompt itself shows — falls back to looking for the known
    labels in the raw text. Defaults to ``["self-help"]`` when nothing usable
    is found.
    """
    text = (raw_text or "").strip()
    if text.startswith("```"):
        # Drop the surrounding fence and an optional "json" language tag.
        text = text.strip("`").strip()
        if text[:4].lower() == "json":
            text = text[4:].strip()
    try:
        parsed = json.loads(text)
    except ValueError:
        found = [label for label in _KNOWN_INTENTS if label in text]
        return found or ["self-help"]
    if isinstance(parsed, str):
        parsed = [parsed]
    if not isinstance(parsed, list):
        return ["self-help"]
    return [item for item in parsed if isinstance(item, str)]


def verify_id_token(f):
    """Decorator: require a valid Firebase ID token in the Authorization header.

    On success the decoded token claims are stored on ``request.user`` and the
    wrapped view is called; otherwise a 401 JSON error is returned.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        id_token = _bearer_token(request.headers.get("Authorization", ""))
        if id_token is None:
            return jsonify({"error": "Missing or invalid Authorization header"}), 401
        try:
            request.user = auth.verify_id_token(id_token)
        except Exception:
            # Any verification failure is reported to the client the same way.
            return jsonify({"error": "Invalid or expired token"}), 401
        return f(*args, **kwargs)
    return wrapper


def require_admin(f):
    """Decorator: allow the view only when ``request.user`` has a truthy ``admin`` claim.

    Must be applied beneath ``verify_id_token``, which populates ``request.user``.
    Returns a 403 JSON error otherwise.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        user = getattr(request, "user", None)
        if not user or not user.get("admin", False):
            return jsonify({"error": "Admin privileges required"}), 403
        return f(*args, **kwargs)
    return wrapper


def increment_query_count(ip):
    """Atomically increment and return the query counter stored for *ip*.

    Dots in *ip* are replaced with underscores to form the key under
    ``ip_queries/``. The update runs in a database transaction so concurrent
    requests from the same IP do not lose increments, and a missing or
    malformed record is treated as a count of zero.
    """
    sanitized_ip = ip.replace(".", "_")
    ref = firebase_db.reference(f"ip_queries/{sanitized_ip}")

    def _bump(current):
        previous = current.get("count", 0) if isinstance(current, dict) else 0
        return {"count": previous + 1}

    return ref.transaction(_bump)["count"]


def store_chat(ip, message, response):
    """Append one chat exchange (with a UTC ISO timestamp) under ``chats``."""
    ref = firebase_db.reference("chats")
    ref.push({
        "ip": ip,
        "message": message,
        "response": response,
        "timestamp": datetime.utcnow().isoformat()
    })


def get_user_record(uid):
    """Return whatever is stored at ``users/<uid>`` in the realtime database."""
    return firebase_db.reference(f"users/{uid}").get()


# ─── Routes ─────────────────────────────────────────────────────────────────────
@app.route("/chat", methods=["POST"])
def chat():
    """Answer a user message, combining Gemini guidance with Exa search links.

    Every request increments the per-IP counter; once it exceeds the free
    limit, a valid Bearer token is required (403 otherwise). The JSON body
    must contain a non-empty string ``message`` (400 otherwise). The reply is
    stored via ``store_chat`` and returned as ``{"response": ...}``.
    """
    user_ip = (request.remote_addr or "0.0.0.0").replace(".", "_")
    count = increment_query_count(user_ip)
    need_login = count > _FREE_QUERY_LIMIT

    # The token is optional here: it is only needed once the free quota is used.
    user_info = None
    token = _bearer_token(request.headers.get("Authorization", ""))
    if token:
        try:
            user_info = auth.verify_id_token(token)
        except Exception:
            # An invalid token is treated the same as no token at all.
            user_info = None

    if need_login and not user_info:
        return jsonify({
            "error": f"Please log in to continue after {_FREE_QUERY_LIMIT} free queries",
            "login_required": True
        }), 403

    data = _json_object() or {}
    user_message = data.get("message")
    if not isinstance(user_message, str) or not user_message.strip():
        return jsonify({"error": "'message' is required"}), 400
    user_message = user_message.strip()

    classify_prompt = (
        "Categorize the user's intent as JSON list of one or both:\n"
        "['self-help','product_search']\n\n"
        f"User: \"{user_message}\""
    )
    classify_resp = client.models.generate_content(model=MODEL, contents=classify_prompt)
    intents = _parse_intents(classify_resp.text)

    parts = []

    if "self-help" in intents:
        help_prompt = f"You are a helpful assistant. Give step-by-step guidance for: \"{user_message}\""
        guidance = _generate_text(help_prompt)
        if guidance:
            parts.append(guidance)

        search_q = user_message + " ingredients"
        exa_res = exa.search_and_contents(search_q, type="auto", text=True)
        links = [r.url for r in exa_res.results[:3]]
        if links:
            parts.append(
                "Here are some useful links to get supplies:\n"
                + "\n".join(f"- {u}" for u in links)
            )

    # Product recommendations run only when self-help was not also selected;
    # the self-help branch already appends supply links of its own.
    if "product_search" in intents and "self-help" not in intents:
        exa_res = exa.search_and_contents(user_message, type="auto", text=True)
        links = [r.url for r in exa_res.results[:5]]
        rec_prompt = (
            "You are a shopping assistant. Suggest products for:\n"
            f"\"{user_message}\"\n"
            "Always include links:\n" + "\n".join(links)
        )
        recommendations = _generate_text(rec_prompt)
        if recommendations:
            parts.append(recommendations)

    if not parts:
        # No recognised intent produced any text: ask the model directly.
        parts.append(_generate_text(f"Help the user with: \"{user_message}\""))

    final_response = "\n\n".join(parts)
    store_chat(user_ip, user_message, final_response)
    return jsonify({"response": final_response})


@app.route("/auth/signup", methods=["POST"])
def signup():
    """Create a Firebase Auth user and its ``users/<uid>`` database record.

    Expects a JSON body with ``email`` and ``password``. Returns 201 on
    success, or 400 with an error message when a field is missing or the
    Firebase calls fail.
    """
    data = _json_object() or {}
    email = data.get("email")
    pwd = data.get("password")
    # Without this check, None values would be passed straight to create_user.
    if not (email and pwd):
        return jsonify({"error": "Both 'email' and 'password' are required"}), 400
    try:
        user = auth.create_user(email=email, password=pwd)
        firebase_db.reference(f"users/{user.uid}").set({
            "email": email,
            "preferences": {}
        })
        return jsonify({"message": "User created"}), 201
    except Exception as e:
        return jsonify({"error": str(e)}), 400


@app.route("/dashboard", methods=["GET"])
@verify_id_token
def user_dashboard():
    """Return the authenticated user's email and preferences (404 if no record)."""
    uid = request.user["uid"]
    user_data = get_user_record(uid)
    if not user_data:
        return jsonify({"error": "User not found"}), 404
    return jsonify({
        "email": user_data.get("email"),
        "preferences": user_data.get("preferences", {})
    })


@app.route("/dashboard/preferences", methods=["POST"])
@verify_id_token
def update_preferences():
    """Replace the authenticated user's stored preferences.

    Expects a JSON object body; its ``preferences`` value (default ``{}``)
    must itself be an object. A missing or malformed body is rejected with
    400 rather than silently overwriting the stored preferences.
    """
    uid = request.user["uid"]
    payload = _json_object()
    if payload is None:
        return jsonify({"error": "JSON object body required"}), 400
    prefs = payload.get("preferences", {})
    if not isinstance(prefs, dict):
        return jsonify({"error": "'preferences' must be an object"}), 400
    ref = firebase_db.reference(f"users/{uid}/preferences")
    ref.set(prefs)
    return jsonify({"message": "Preferences updated"})


# ── Admin Routes ───────────────────────────────────────────────────────────────
@app.route("/admin/upload_links", methods=["POST"])
@verify_id_token
@require_admin
def upload_links():
    """Bulk-add sponsored links from an uploaded CSV with ``keyword``/``url`` columns.

    The file is read from the multipart field ``file`` and must be UTF-8
    (a leading BOM is tolerated). Rows missing either value are skipped and
    counted instead of being stored with empty fields.
    """
    if "file" not in request.files:
        return jsonify({"error": "CSV file required"}), 400
    f = request.files["file"]
    try:
        # utf-8-sig strips a BOM, which would otherwise become part of the
        # first header name and make every "keyword" lookup return None.
        text = f.stream.read().decode("utf-8-sig")
    except UnicodeDecodeError:
        return jsonify({"error": "CSV file must be UTF-8 encoded"}), 400

    reader = csv.DictReader(StringIO(text))
    ref = firebase_db.reference("sponsoredLinks")
    uploaded = 0
    skipped = 0
    for row in reader:
        keyword = (row.get("keyword") or "").strip()
        url = (row.get("url") or "").strip()
        if not (keyword and url):
            skipped += 1
            continue
        ref.push({"keyword": keyword, "url": url})
        uploaded += 1
    return jsonify({
        "message": "Links uploaded from CSV",
        "uploaded": uploaded,
        "skipped": skipped
    })


@app.route("/admin/add_link", methods=["POST"])
@verify_id_token
@require_admin
def add_link():
    """Add one sponsored link; the JSON body needs ``keyword`` and ``url``."""
    data = _json_object() or {}
    keyword = data.get("keyword")
    url = data.get("url")
    if not (keyword and url):
        return jsonify({"error": "Both 'keyword' and 'url' are required"}), 400
    ref = firebase_db.reference("sponsoredLinks")
    ref.push({"keyword": keyword, "url": url})
    return jsonify({"message": "Link added successfully"})


# The <link_id> URL variable is required: the view takes ``link_id`` as an
# argument, so a rule without it makes every request fail with a TypeError.
@app.route("/admin/update_link/<link_id>", methods=["PUT"])
@verify_id_token
@require_admin
def update_link(link_id):
    """Update the keyword and URL of sponsored link *link_id* (404 if absent)."""
    data = _json_object() or {}
    keyword = data.get("keyword")
    url = data.get("url")
    if not (keyword and url):
        return jsonify({"error": "Both 'keyword' and 'url' are required"}), 400
    ref = firebase_db.reference(f"sponsoredLinks/{link_id}")
    if not ref.get():
        return jsonify({"error": "Link not found"}), 404
    ref.update({"keyword": keyword, "url": url})
    return jsonify({"message": "Link updated successfully"})
@app.route("/admin/sponsored_links", methods=["GET"])
@verify_id_token
@require_admin
def get_sponsored_links():
    """Return every sponsored link as ``{"id", "keyword", "url"}`` objects."""
    ref = firebase_db.reference("sponsoredLinks")
    data = ref.get() or {}
    # Convert to a list of objects with id included
    links = [
        {"id": key, "keyword": val.get("keyword"), "url": val.get("url")}
        for key, val in data.items()
    ]
    return jsonify({"sponsored_links": links})


# The <link_id> URL variable is required: the view takes ``link_id`` as an
# argument, so a rule without it makes every request fail with a TypeError.
@app.route("/admin/delete_link/<link_id>", methods=["DELETE"])
@verify_id_token
@require_admin
def delete_link(link_id):
    """Delete sponsored link *link_id*; responds 404 when it does not exist."""
    ref = firebase_db.reference(f"sponsoredLinks/{link_id}")
    if not ref.get():
        return jsonify({"error": "Link not found"}), 404
    ref.delete()
    return jsonify({"message": "Link deleted successfully"})


@app.route("/admin/stats", methods=["GET"])
@verify_id_token
@require_admin
def stats():
    """Return the total number of auth users and the sum of per-IP query counts."""
    # list_users() returns one page at a time; iterate_all() walks every page
    # so the total is not capped at the first page's size.
    total_users = sum(1 for _ in auth.list_users().iterate_all())
    ip_data = firebase_db.reference("ip_queries").get() or {}
    total_queries = sum(
        val.get("count", 0)
        for val in ip_data.values()
        if isinstance(val, dict)  # ignore malformed entries instead of crashing
    )
    return jsonify({
        "total_users": total_users,
        "total_queries": total_queries
    })


# ─── Run ───────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Debug mode enables Werkzeug's interactive debugger, which allows code
    # execution from the browser; never enable it unconditionally while
    # listening on all interfaces. Opt in explicitly with FLASK_DEBUG=1.
    debug_enabled = os.getenv("FLASK_DEBUG", "").lower() in ("1", "true")
    app.run(host="0.0.0.0", port=7860, debug=debug_enabled)