Spaces:
Sleeping
Sleeping
| from flask import Flask, render_template, request, jsonify, redirect, url_for, send_file, flash, session | |
| from flask_sqlalchemy import SQLAlchemy | |
| from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user | |
| from flask_bcrypt import Bcrypt | |
| from flask_cors import CORS | |
| from datetime import datetime, timedelta | |
| import time | |
| import random | |
| import json | |
| import csv | |
| import io | |
| import secrets | |
| import os | |
| import logging | |
| # Configure logging | |
| logging.basicConfig(level=logging.DEBUG) | |
| logger = logging.getLogger(__name__) | |
| app = Flask(__name__) | |
| # ==================== CONFIGURATION ==================== | |
| app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DATABASE_URL', 'sqlite:///ai_cms.db') | |
| app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False | |
| app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'dev-key-change-in-production-xyz123') | |
| app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=7) | |
| app.config['SESSION_COOKIE_SECURE'] = True # ← HF Spaces uses HTTPS | |
| app.config['SESSION_COOKIE_HTTPONLY'] = True | |
| app.config['SESSION_COOKIE_SAMESITE'] = 'None' # ← Allow cross-site cookies | |
| app.config['SQLALCHEMY_ENGINE_OPTIONS'] = { | |
| 'pool_recycle': 300, | |
| 'pool_pre_ping': True, | |
| } | |
| db = SQLAlchemy(app) | |
| bcrypt = Bcrypt(app) | |
| login_manager = LoginManager(app) | |
| login_manager.login_view = 'login' | |
| CORS(app, supports_credentials=True) | |
| # ==================== DATABASE MODELS ==================== | |
| class User(UserMixin, db.Model): | |
| __tablename__ = 'users' | |
| id = db.Column(db.Integer, primary_key=True) | |
| username = db.Column(db.String(80), unique=True, nullable=False) | |
| password_hash = db.Column(db.String(120), nullable=False) | |
| created_at = db.Column(db.DateTime, default=datetime.utcnow) | |
| is_admin = db.Column(db.Boolean, default=False) | |
| def set_password(self, password): | |
| self.password_hash = bcrypt.generate_password_hash(password).decode('utf-8') | |
| def check_password(self, password): | |
| return bcrypt.check_password_hash(self.password_hash, password) | |
| class APIKey(db.Model): | |
| __tablename__ = 'api_keys' | |
| id = db.Column(db.Integer, primary_key=True) | |
| key = db.Column(db.String(64), unique=True, nullable=False) | |
| user_id = db.Column(db.Integer, db.ForeignKey('users.id', ondelete='CASCADE'), nullable=False) | |
| name = db.Column(db.String(100)) | |
| created_at = db.Column(db.DateTime, default=datetime.utcnow) | |
| last_used = db.Column(db.DateTime) | |
| is_active = db.Column(db.Boolean, default=True) | |
| usage_count = db.Column(db.Integer, default=0) | |
| user = db.relationship('User', backref='api_keys') | |
| def generate_key(): | |
| return secrets.token_hex(32) | |
| def to_dict(self): | |
| return { | |
| 'id': self.id, | |
| 'name': self.name, | |
| 'key': self.key, | |
| 'created_at': self.created_at.strftime("%Y-%m-%d %H:%M:%S"), | |
| 'last_used': self.last_used.strftime("%Y-%m-%d %H:%M:%S") if self.last_used else 'Never', | |
| 'usage_count': self.usage_count, | |
| 'is_active': self.is_active | |
| } | |
| class APIEndpoint(db.Model): | |
| __tablename__ = 'api_endpoints' | |
| id = db.Column(db.Integer, primary_key=True) | |
| user_id = db.Column(db.Integer, db.ForeignKey('users.id', ondelete='CASCADE'), nullable=False) | |
| route = db.Column(db.String(100), nullable=False) | |
| model = db.Column(db.String(50), nullable=False) | |
| prompt = db.Column(db.Text, nullable=False) | |
| created_at = db.Column(db.DateTime, default=datetime.utcnow) | |
| is_active = db.Column(db.Boolean, default=True) | |
| total_calls = db.Column(db.Integer, default=0) | |
| total_tokens = db.Column(db.Integer, default=0) | |
| total_cost = db.Column(db.Float, default=0.0) | |
| user = db.relationship('User', backref='endpoints') | |
| logs = db.relationship('APILog', backref='endpoint', lazy=True, cascade='all, delete-orphan') | |
| def to_dict(self): | |
| return { | |
| 'id': self.id, | |
| 'route': self.route, | |
| 'model': self.model, | |
| 'prompt': self.prompt, | |
| 'created_at': self.created_at.strftime("%Y-%m-%d %H:%M:%S"), | |
| 'is_active': self.is_active, | |
| 'total_calls': self.total_calls, | |
| 'total_tokens': self.total_tokens, | |
| 'total_cost': round(self.total_cost, 4) | |
| } | |
| class APILog(db.Model): | |
| __tablename__ = 'api_logs' | |
| id = db.Column(db.Integer, primary_key=True) | |
| endpoint_id = db.Column(db.Integer, db.ForeignKey('api_endpoints.id', ondelete='CASCADE'), nullable=False) | |
| api_key_id = db.Column(db.Integer, db.ForeignKey('api_keys.id', ondelete='SET NULL'), nullable=True) | |
| timestamp = db.Column(db.DateTime, default=datetime.utcnow) | |
| input_data = db.Column(db.Text) | |
| output_data = db.Column(db.Text) | |
| tokens_used = db.Column(db.Integer) | |
| latency = db.Column(db.Float) | |
| cost = db.Column(db.Float) | |
| status_code = db.Column(db.Integer, default=200) | |
| api_key = db.relationship('APIKey', backref='logs') | |
| def to_dict(self): | |
| return { | |
| 'id': self.id, | |
| 'endpoint_id': self.endpoint_id, | |
| 'timestamp': self.timestamp.strftime("%Y-%m-%d %H:%M:%S"), | |
| 'tokens_used': self.tokens_used, | |
| 'latency': round(self.latency, 3), | |
| 'cost': round(self.cost, 4), | |
| 'status_code': self.status_code | |
| } | |
| # ==================== LOGIN MANAGER ==================== | |
| def load_user(user_id): | |
| return User.query.get(int(user_id)) | |
| # ==================== HELPER FUNCTIONS ==================== | |
| def generate_mock_response(prompt, input_data): | |
| time.sleep(random.uniform(0.3, 0.8)) | |
| return f"AI Response based on prompt: '{prompt[:30]}...' and input: {json.dumps(input_data)}" | |
| # ==================== AUTH ROUTES ==================== | |
| def login(): | |
| logger.info("Login route accessed") | |
| if current_user.is_authenticated: | |
| logger.info("User already authenticated, redirecting to dashboard") | |
| return redirect(url_for('dashboard')) | |
| if request.method == 'POST': | |
| try: | |
| logger.info(f"Login attempt - Content-Type: {request.content_type}") | |
| data = request.get_json() if request.is_json else request.form | |
| logger.info(f"Login data received") | |
| username = data.get('username') if data else None | |
| password = data.get('password') if data else None | |
| if not username or not password: | |
| logger.warning("Missing username or password") | |
| if request.is_json: | |
| return jsonify({"status": "error", "message": "Username and password required"}), 400 | |
| flash('Username and password required', 'error') | |
| return render_template('login.html') | |
| logger.info(f"Looking for user: {username}") | |
| user = User.query.filter_by(username=username).first() | |
| if not user: | |
| logger.warning(f"User not found: {username}") | |
| if request.is_json: | |
| return jsonify({"status": "error", "message": "Invalid credentials"}), 401 | |
| flash('Invalid username or password', 'error') | |
| return render_template('login.html') | |
| logger.info(f"User found: {user.username}, checking password...") | |
| if user.check_password(password): | |
| logger.info("Password correct, logging in user...") | |
| login_user(user, remember=True) | |
| session.permanent = True | |
| logger.info(f"User logged in: {user.username}") | |
| next_page = request.args.get('next') | |
| if request.is_json: | |
| return jsonify({"status": "success", "message": "Login successful"}) | |
| return redirect(next_page) if next_page else redirect(url_for('dashboard')) | |
| else: | |
| logger.warning("Password incorrect") | |
| if request.is_json: | |
| return jsonify({"status": "error", "message": "Invalid credentials"}), 401 | |
| flash('Invalid username or password', 'error') | |
| except Exception as e: | |
| logger.error(f"Login error: {str(e)}", exc_info=True) | |
| if request.is_json: | |
| return jsonify({"status": "error", "message": f"Internal error: {str(e)}"}), 500 | |
| flash('An error occurred', 'error') | |
| return render_template('login.html') | |
| def register(): | |
| if current_user.is_authenticated: | |
| return redirect(url_for('dashboard')) | |
| if request.method == 'POST': | |
| try: | |
| data = request.get_json() if request.is_json else request.form | |
| username = data.get('username') | |
| password = data.get('password') | |
| if User.query.filter_by(username=username).first(): | |
| if request.is_json: | |
| return jsonify({"status": "error", "message": "Username exists"}), 400 | |
| flash('Username already exists', 'error') | |
| return render_template('register.html') | |
| user = User(username=username) | |
| user.set_password(password) | |
| if User.query.count() == 0: | |
| user.is_admin = True | |
| db.session.add(user) | |
| db.session.commit() | |
| logger.info(f"User registered: {username}") | |
| if request.is_json: | |
| return jsonify({"status": "success", "message": "Registration successful"}) | |
| return redirect(url_for('login')) | |
| except Exception as e: | |
| logger.error(f"Register error: {str(e)}", exc_info=True) | |
| if request.is_json: | |
| return jsonify({"status": "error", "message": str(e)}), 500 | |
| flash('An error occurred', 'error') | |
| return render_template('register.html') | |
| def logout(): | |
| logout_user() | |
| return redirect(url_for('login')) | |
| # ==================== FRONTEND ROUTES ==================== | |
| def dashboard(): | |
| return render_template('index.html') | |
| # ==================== CMS API ENDPOINTS ==================== | |
| def create_api(): | |
| data = request.json | |
| existing = APIEndpoint.query.filter_by(route=data['route'], user_id=current_user.id).first() | |
| if existing: | |
| return jsonify({"status": "error", "message": "Route already exists"}), 400 | |
| new_api = APIEndpoint( | |
| user_id=current_user.id, | |
| route=data['route'], | |
| model=data['model'], | |
| prompt=data['prompt'] | |
| ) | |
| db.session.add(new_api) | |
| db.session.commit() | |
| return jsonify({"status": "success", "message": "API Created Successfully", "api": new_api.to_dict()}) | |
| def list_apis(): | |
| apis = APIEndpoint.query.filter_by(user_id=current_user.id).all() | |
| return jsonify([api.to_dict() for api in apis]) | |
| def get_stats(): | |
| apis = APIEndpoint.query.filter_by(user_id=current_user.id).all() | |
| total_calls = sum(api.total_calls for api in apis) | |
| total_tokens = sum(api.total_tokens for api in apis) | |
| total_cost = sum(api.total_cost for api in apis) | |
| return jsonify({ | |
| "total_calls": total_calls, | |
| "total_tokens": total_tokens, | |
| "total_cost": round(total_cost, 4), | |
| "active_apis": len([api for api in apis if api.is_active]) | |
| }) | |
| def get_logs(): | |
| user_endpoint_ids = [api.id for api in APIEndpoint.query.filter_by(user_id=current_user.id).all()] | |
| logs = APILog.query.filter(APILog.endpoint_id.in_(user_endpoint_ids)).order_by(APILog.timestamp.desc()).limit(50).all() | |
| return jsonify([log.to_dict() for log in logs]) | |
| def delete_api(api_id): | |
| api = APIEndpoint.query.filter_by(id=api_id, user_id=current_user.id).first_or_404() | |
| db.session.delete(api) | |
| db.session.commit() | |
| return jsonify({"status": "success", "message": "API Deleted"}) | |
| def export_logs(): | |
| user_endpoint_ids = [api.id for api in APIEndpoint.query.filter_by(user_id=current_user.id).all()] | |
| logs = APILog.query.filter(APILog.endpoint_id.in_(user_endpoint_ids)).order_by(APILog.timestamp.desc()).all() | |
| output = io.StringIO() | |
| writer = csv.writer(output) | |
| writer.writerow(['Timestamp', 'Endpoint', 'Tokens', 'Latency', 'Cost', 'Status']) | |
| for log in logs: | |
| writer.writerow([ | |
| log.timestamp.strftime("%Y-%m-%d %H:%M:%S"), | |
| log.endpoint.route, | |
| log.tokens_used, | |
| round(log.latency, 3), | |
| round(log.cost, 4), | |
| log.status_code | |
| ]) | |
| output.seek(0) | |
| return send_file( | |
| io.BytesIO(output.getvalue().encode('utf-8')), | |
| mimetype='text/csv', | |
| as_attachment=True, | |
| download_name=f'api_logs_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv' | |
| ) | |
| # ==================== API KEY MANAGEMENT ==================== | |
| def list_keys(): | |
| keys = APIKey.query.filter_by(user_id=current_user.id).all() | |
| return jsonify([k.to_dict() for k in keys]) | |
| def create_key(): | |
| data = request.json | |
| new_key = APIKey( | |
| user_id=current_user.id, | |
| key=APIKey.generate_key(), | |
| name=data.get('name', 'Unnamed Key') | |
| ) | |
| db.session.add(new_key) | |
| db.session.commit() | |
| return jsonify({"status": "success", "key": new_key.key}) | |
| def delete_key(key_id): | |
| key = APIKey.query.filter_by(id=key_id, user_id=current_user.id).first_or_404() | |
| db.session.delete(key) | |
| db.session.commit() | |
| return jsonify({"status": "success", "message": "Key Deleted"}) | |
| def toggle_key(key_id): | |
| key = APIKey.query.filter_by(id=key_id, user_id=current_user.id).first_or_404() | |
| key.is_active = not key.is_active | |
| db.session.commit() | |
| return jsonify({"status": "success", "is_active": key.is_active}) | |
| # ==================== DYNAMIC USER API ENDPOINTS ==================== | |
| def dynamic_endpoint(route): | |
| api_key = request.headers.get('X-API-Key') or request.args.get('api_key') | |
| target_api = APIEndpoint.query.filter_by(route=route, is_active=True).first() | |
| if not target_api: | |
| return jsonify({"error": "Endpoint not found"}), 404 | |
| api_key_obj = None | |
| if api_key: | |
| api_key_obj = APIKey.query.filter_by(key=api_key, is_active=True).first() | |
| if not api_key_obj: | |
| return jsonify({"error": "Invalid API Key"}), 401 | |
| api_key_obj.last_used = datetime.utcnow() | |
| api_key_obj.usage_count += 1 | |
| start_time = time.time() | |
| input_data = request.json if request.is_json else request.args.to_dict() | |
| response_text = generate_mock_response(target_api.prompt, input_data) | |
| latency = time.time() - start_time | |
| tokens_used = len(response_text) // 4 | |
| cost = (tokens_used / 1000) * 0.002 | |
| target_api.total_calls += 1 | |
| target_api.total_tokens += tokens_used | |
| target_api.total_cost += cost | |
| log = APILog( | |
| endpoint_id=target_api.id, | |
| api_key_id=api_key_obj.id if api_key_obj else None, | |
| input_data=json.dumps(input_data), | |
| output_data=response_text, | |
| tokens_used=tokens_used, | |
| latency=latency, | |
| cost=cost, | |
| status_code=200 | |
| ) | |
| db.session.add(log) | |
| db.session.commit() | |
| return jsonify({ | |
| "response": response_text, | |
| "meta": { | |
| "model": target_api.model, | |
| "latency": round(latency, 3), | |
| "tokens": tokens_used, | |
| "cost": round(cost, 4) | |
| } | |
| }) | |
| # ==================== DEBUG ROUTES ==================== | |
| def debug_db(): | |
| try: | |
| users = User.query.all() | |
| return jsonify({ | |
| 'status': 'ok', | |
| 'users': [{'id': u.id, 'username': u.username} for u in users], | |
| 'user_count': len(users) | |
| }) | |
| except Exception as e: | |
| return jsonify({ | |
| 'status': 'error', | |
| 'error': str(e) | |
| }), 500 | |
| def debug_session(): | |
| return jsonify({ | |
| 'is_authenticated': current_user.is_authenticated, | |
| 'user_id': current_user.id if current_user.is_authenticated else None, | |
| 'username': current_user.username if current_user.is_authenticated else None, | |
| 'session_id': session.get('_id', 'No session') | |
| }) | |
| # ==================== ERROR HANDLERS ==================== | |
| def internal_error(error): | |
| db.session.rollback() | |
| logger.error(f"Internal error: {str(error)}") | |
| if request.is_json: | |
| return jsonify({"status": "error", "message": "Internal server error"}), 500 | |
| return render_template('500.html'), 500 | |
| def not_found(error): | |
| if request.is_json: | |
| return jsonify({"status": "error", "message": "Not found"}), 404 | |
| return render_template('404.html'), 404 | |
| # ==================== DATABASE INITIALIZATION ==================== | |
| def init_db(): | |
| with app.app_context(): | |
| db.create_all() | |
| if User.query.count() == 0: | |
| admin = User(username='admin', is_admin=True) | |
| admin.set_password('admin123') | |
| db.session.add(admin) | |
| db.session.commit() | |
| logger.info("✅ Default admin created (username: admin, password: admin123)") | |
| logger.info("✅ Database initialized!") | |
| # ==================== MAIN ==================== | |
| if __name__ == '__main__': | |
| init_db() | |
| port = int(os.environ.get('PORT', 7860)) | |
| app.run(host='0.0.0.0', port=port, debug=False) |