Spaces:
Running
Running
| import os, jwt, datetime, json | |
| from flask import Flask, request, jsonify | |
| from werkzeug.security import generate_password_hash, check_password_hash | |
| from models import db, User, SearchHistory | |
| from flask_cors import CORS | |
| from functools import wraps | |
| from datetime import datetime as dt | |
# --- Application setup -------------------------------------------------------
# Key used to sign and verify JWTs.
# NOTE(review): the hard-coded fallback is only acceptable for local
# development; set SECRET_KEY in the environment for any real deployment.
SECRET_KEY = os.environ.get("SECRET_KEY", "ultra_secret_dev_key")

app = Flask(__name__)
app.config.update(
    SQLALCHEMY_DATABASE_URI="sqlite:///users.db",
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
)

# Bind the SQLAlchemy handle imported from `models` to this app, then enable
# CORS with flask_cors' default settings.
db.init_app(app)
CORS(app)

# Table creation runs at import time and needs an application context.
with app.app_context():
    db.create_all()
def token_required(f):
    """Decorate a view so it only runs for requests carrying a valid JWT.

    The token is read from the ``Authorization`` header; both
    ``Bearer <token>`` and a bare ``<token>`` are accepted. On success the
    wrapped view is called as ``f(current_user, *args, **kwargs)``.

    Returns a ``(json_response, 401)`` tuple instead of calling ``f`` when the
    header is missing/empty, the token cannot be decoded or lacks a
    ``user_id`` claim, or the referenced user no longer exists.
    """

    # wraps() keeps f's __name__, so each protected view keeps a distinct
    # Flask endpoint name instead of all being registered as "decorated".
    @wraps(f)
    def decorated(*args, **kwargs):
        # Keep the last space-separated piece so "Bearer abc" and "abc" both
        # yield "abc"; a missing header yields "".
        token = request.headers.get("Authorization", "").split(" ")[-1]
        if not token:
            return jsonify({"message": "Token is missing!"}), 401

        try:
            data = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
            user_id = data["user_id"]
        except (jwt.InvalidTokenError, KeyError):
            # Bad signature, expired, malformed, or no user_id claim.
            return jsonify({"message": "Token is invalid!"}), 401

        # The lookup is outside the try block so database failures are not
        # reported to the client as an invalid token.
        current_user = User.query.get(user_id)
        if current_user is None:
            # Token was valid but the user it names is gone.
            return jsonify({"message": "Token is invalid!"}), 401

        return f(current_user, *args, **kwargs)

    return decorated
def signup():
    """Create a user from a JSON body containing ``username`` and ``password``.

    Returns:
        ``(json, 201)`` when the user is created.
        ``(json, 400)`` when the body is not a JSON object, when either field
        is missing/empty/not a string, or when the username is already taken.

    NOTE(review): no route decorator is visible on this view in this copy of
    the file; confirm how it is registered with the app.
    """
    # silent=True makes a missing or malformed JSON body come back as None so
    # it can be reported as a 400 below instead of surfacing as an exception.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"message": "Request body must be a JSON object"}), 400

    username = data.get("username")
    password = data.get("password")
    if not (isinstance(username, str) and username
            and isinstance(password, str) and password):
        return jsonify({"message": "Username and password are required"}), 400

    if User.query.filter_by(username=username).first():
        return jsonify({"message": "User already exists"}), 400

    # Only the hash is stored; the plaintext password never reaches the model.
    new_user = User(
        username=username,
        password_hash=generate_password_hash(password),
    )
    db.session.add(new_user)
    db.session.commit()
    return jsonify({"message": "User created"}), 201
def login():
    """Exchange a username/password JSON body for a signed JWT.

    Returns:
        ``json`` ``{"token": <jwt>}`` (HTTP 200) on success; the token carries
        ``user_id`` and an ``exp`` claim two hours in the future, signed with
        ``SECRET_KEY`` using HS256.
        ``(json, 400)`` when the body is not a JSON object or a field is
        missing or not a string.
        ``(json, 401)`` when the user is unknown or the password is wrong.

    NOTE(review): no route decorator is visible on this view in this copy of
    the file; confirm how it is registered with the app.
    """
    # silent=True makes a missing or malformed JSON body come back as None so
    # it can be reported as a 400 below instead of surfacing as an exception.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"message": "Request body must be a JSON object"}), 400

    username = data.get("username")
    password = data.get("password")
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({"message": "Username and password are required"}), 400

    user = User.query.filter_by(username=username).first()
    # Same response for "no such user" and "wrong password".
    if not user or not check_password_hash(user.password_hash, password):
        return jsonify({"message": "Invalid credentials"}), 401

    # Timezone-aware "now": datetime.utcnow() is deprecated and returns a
    # naive value.
    expires_at = (
        datetime.datetime.now(datetime.timezone.utc)
        + datetime.timedelta(hours=2)
    )
    token = jwt.encode(
        {"user_id": user.id, "exp": expires_at},
        SECRET_KEY,
        algorithm="HS256",
    )
    # PyJWT 1.x returns bytes, which jsonify cannot serialise; 2.x returns str.
    if isinstance(token, bytes):
        token = token.decode("utf-8")
    return jsonify({"token": token})
def secure_search(current_user):
    """Record a search in the user's history and return simulated results.

    Reads ``query`` (default ``""``) and ``filters`` (default ``{}``) from the
    JSON request body.

    Args:
        current_user: the authenticated user; ``id`` and ``username`` are read.
            NOTE(review): presumably supplied by ``token_required``, but no
            decorator is visible on this view in this copy of the file.

    Returns:
        ``json`` with ``user``, ``results`` and ``ai_preview`` on success.
        ``(json, 400)`` when the body or ``filters`` is not a JSON object.
    """
    data = request.get_json()
    if not isinstance(data, dict):
        return jsonify({"message": "Request body must be a JSON object"}), 400

    query = data.get("query", "")
    # An explicit null is treated like an absent key.
    filters = data.get("filters")
    if filters is None:
        filters = {}
    # Validate before writing: otherwise a bad value is committed to history
    # and only then fails on .items() below.
    if not isinstance(filters, dict):
        return jsonify({"message": "filters must be a JSON object"}), 400

    # Save search to history
    entry = SearchHistory(
        query=query,
        filters=json.dumps(filters),
        user_id=current_user.id,
        timestamp=dt.utcnow(),
    )
    db.session.add(entry)
    db.session.commit()

    # Simulate filtered results: the title lists the names of truthy filters.
    active_filters = ", ".join(k for k, v in filters.items() if v)
    results = [{"title": f"{query} ({active_filters})", "url": "#"}]

    return jsonify({
        "user": current_user.username,
        "results": results,
        "ai_preview": "https://fake-preview.com/image.png",
    })
def get_history(current_user):
    """Return the user's 20 most recent searches, newest first, as JSON.

    Each item has ``query``, ``filters`` (decoded from its stored JSON text,
    ``{}`` when empty) and ``timestamp`` (ISO 8601 string).

    Args:
        current_user: the authenticated user; only ``id`` is read.
    """
    recent = (
        SearchHistory.query
        .filter_by(user_id=current_user.id)
        .order_by(SearchHistory.timestamp.desc())
        .limit(20)
        .all()
    )

    payload = []
    for row in recent:
        payload.append({
            "query": row.query,
            "filters": json.loads(row.filters or '{}'),
            "timestamp": row.timestamp.isoformat(),
        })
    return jsonify(payload)
def clear_history(current_user):
    """Delete every search-history row of the user and report how many.

    Args:
        current_user: the authenticated user; only ``id`` is read.

    Returns:
        ``(json, 200)`` whose ``message`` states the number of deleted rows.
    """
    user_entries = SearchHistory.query.filter_by(user_id=current_user.id)
    # Query.delete() returns the count of rows matched by the bulk delete.
    removed_count = user_entries.delete()
    db.session.commit()

    message = f"{removed_count} entries deleted."
    return jsonify({"message": message}), 200
# Script entry point: when run directly, serve the app with Flask's built-in
# server on all interfaces, port 7860.
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860)