ai-cms-platform / app.py
zeltera's picture
Upload 3 files
1063df3 verified
from flask import Flask, render_template, request, jsonify, redirect, url_for, send_file, flash, session
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from flask_bcrypt import Bcrypt
from flask_cors import CORS
from datetime import datetime, timedelta
import time
import random
import json
import csv
import io
import secrets
import os
import logging
# Logging: root logger at DEBUG, plus the module-level logger used by the
# route handlers in this file.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# ==================== CONFIGURATION ====================
app.config.update(
    # Database location comes from DATABASE_URL, else a local SQLite file.
    SQLALCHEMY_DATABASE_URI=os.environ.get('DATABASE_URL', 'sqlite:///ai_cms.db'),
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
    SQLALCHEMY_ENGINE_OPTIONS={
        'pool_recycle': 300,
        'pool_pre_ping': True,
    },
    # NOTE(review): the fallback key below is a fixed, publicly visible string;
    # set SECRET_KEY in the environment for any real deployment.
    SECRET_KEY=os.environ.get('SECRET_KEY', 'dev-key-change-in-production-xyz123'),
    PERMANENT_SESSION_LIFETIME=timedelta(days=7),
    SESSION_COOKIE_SECURE=True,  # HF Spaces uses HTTPS
    SESSION_COOKIE_HTTPONLY=True,
    SESSION_COOKIE_SAMESITE='None',  # Allow cross-site cookies
)

# Extensions bound to the application instance.
db = SQLAlchemy(app)
bcrypt = Bcrypt(app)
login_manager = LoginManager(app)
login_manager.login_view = 'login'  # endpoint used when login is required
# NOTE(review): credentials are enabled without an explicit origin list here;
# confirm which origins are meant to be allowed.
CORS(app, supports_credentials=True)
# ==================== DATABASE MODELS ====================
class User(UserMixin, db.Model):
    """Account row in the ``users`` table, usable as a Flask-Login user."""

    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    password_hash = db.Column(db.String(120), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    is_admin = db.Column(db.Boolean, default=False)

    def set_password(self, password):
        """Hash ``password`` with bcrypt and store the result as text."""
        hashed = bcrypt.generate_password_hash(password)
        self.password_hash = hashed.decode('utf-8')

    def check_password(self, password):
        """Return whether ``password`` matches the stored bcrypt hash."""
        stored_hash = self.password_hash
        return bcrypt.check_password_hash(stored_hash, password)
class APIKey(db.Model):
    """API key owned by a user, with simple usage bookkeeping."""

    __tablename__ = 'api_keys'

    id = db.Column(db.Integer, primary_key=True)
    key = db.Column(db.String(64), unique=True, nullable=False)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id', ondelete='CASCADE'), nullable=False)
    name = db.Column(db.String(100))
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    last_used = db.Column(db.DateTime)
    is_active = db.Column(db.Boolean, default=True)
    usage_count = db.Column(db.Integer, default=0)

    user = db.relationship('User', backref='api_keys')

    @staticmethod
    def generate_key():
        """Return a fresh random key as a hex string built from 32 random bytes."""
        token = secrets.token_hex(32)
        return token

    def to_dict(self):
        """Serialize the key, including the key value itself, to a plain dict."""
        stamp_format = "%Y-%m-%d %H:%M:%S"
        created_text = self.created_at.strftime(stamp_format)
        if self.last_used:
            last_used_text = self.last_used.strftime(stamp_format)
        else:
            # Keys that were never presented report a placeholder string.
            last_used_text = 'Never'
        return {
            'id': self.id,
            'name': self.name,
            'key': self.key,
            'created_at': created_text,
            'last_used': last_used_text,
            'usage_count': self.usage_count,
            'is_active': self.is_active,
        }
class APIEndpoint(db.Model):
    """User-defined endpoint (route, model, prompt) with running usage totals."""

    __tablename__ = 'api_endpoints'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id', ondelete='CASCADE'), nullable=False)
    route = db.Column(db.String(100), nullable=False)
    model = db.Column(db.String(50), nullable=False)
    prompt = db.Column(db.Text, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    is_active = db.Column(db.Boolean, default=True)
    total_calls = db.Column(db.Integer, default=0)
    total_tokens = db.Column(db.Integer, default=0)
    total_cost = db.Column(db.Float, default=0.0)

    user = db.relationship('User', backref='endpoints')
    # Log rows are deleted together with their endpoint.
    logs = db.relationship('APILog', backref='endpoint', lazy=True, cascade='all, delete-orphan')

    def to_dict(self):
        """Serialize the endpoint to a plain dict; the cost is rounded to 4 places."""
        created_text = self.created_at.strftime("%Y-%m-%d %H:%M:%S")
        rounded_cost = round(self.total_cost, 4)
        return {
            'id': self.id,
            'route': self.route,
            'model': self.model,
            'prompt': self.prompt,
            'created_at': created_text,
            'is_active': self.is_active,
            'total_calls': self.total_calls,
            'total_tokens': self.total_tokens,
            'total_cost': rounded_cost,
        }
class APILog(db.Model):
    """One recorded call to a user-defined endpoint."""

    __tablename__ = 'api_logs'

    id = db.Column(db.Integer, primary_key=True)
    endpoint_id = db.Column(db.Integer, db.ForeignKey('api_endpoints.id', ondelete='CASCADE'), nullable=False)
    api_key_id = db.Column(db.Integer, db.ForeignKey('api_keys.id', ondelete='SET NULL'), nullable=True)
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)
    input_data = db.Column(db.Text)
    output_data = db.Column(db.Text)
    tokens_used = db.Column(db.Integer)
    latency = db.Column(db.Float)
    cost = db.Column(db.Float)
    status_code = db.Column(db.Integer, default=200)

    api_key = db.relationship('APIKey', backref='logs')

    def to_dict(self):
        """Serialize the log entry; input and output payloads are not included."""
        when = self.timestamp.strftime("%Y-%m-%d %H:%M:%S")
        latency_rounded = round(self.latency, 3)
        cost_rounded = round(self.cost, 4)
        return {
            'id': self.id,
            'endpoint_id': self.endpoint_id,
            'timestamp': when,
            'tokens_used': self.tokens_used,
            'latency': latency_rounded,
            'cost': cost_rounded,
            'status_code': self.status_code,
        }
# ==================== LOGIN MANAGER ====================
@login_manager.user_loader
def load_user(user_id):
    """Resolve the id stored in the session to a ``User``.

    Args:
        user_id: The identifier Flask-Login read back from the session.

    Returns:
        The matching ``User``, or ``None`` when the id is not a valid integer
        or no such user exists.
    """
    try:
        numeric_id = int(user_id)
    except (TypeError, ValueError):
        # A malformed id must yield "no user" rather than an exception, so the
        # request is treated as anonymous instead of failing with a 500.
        return None
    return User.query.get(numeric_id)
# ==================== HELPER FUNCTIONS ====================
def generate_mock_response(prompt, input_data):
    """Produce a placeholder AI reply after a short random delay.

    Args:
        prompt: Prompt text; only its first 30 characters appear in the reply.
        input_data: JSON-serializable request payload echoed into the reply.

    Returns:
        A string quoting the truncated prompt and the JSON-encoded input.
    """
    # Pause for a random 0.3-0.8 seconds before answering.
    delay_seconds = random.uniform(0.3, 0.8)
    time.sleep(delay_seconds)

    prompt_preview = prompt[:30]
    encoded_input = json.dumps(input_data)
    return f"AI Response based on prompt: '{prompt_preview}...' and input: {encoded_input}"
# ==================== AUTH ROUTES ====================
def _is_safe_redirect_target(target):
    """Return True if ``target`` is a redirect destination on this app's own host.

    Used to validate the post-login ``next`` parameter so it cannot be abused
    to send a freshly authenticated user to an arbitrary external site.
    """
    # Function-scope import: urllib.parse is stdlib and only needed here.
    from urllib.parse import urljoin, urlparse

    # Reject empty values and anything containing control characters.
    if not target or any(ch < ' ' or ch == '\x7f' for ch in target):
        return False
    # Treat backslashes like slashes so "/\host" is checked as "//host".
    normalized = target.strip().replace('\\', '/')
    if normalized.startswith('//'):
        # Scheme-relative URL: it names another host.
        return False
    own_host = urlparse(request.host_url)
    destination = urlparse(urljoin(request.host_url, normalized))
    return destination.scheme in ('http', 'https') and destination.netloc == own_host.netloc


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form (GET) or authenticate a user (POST).

    Accepts credentials as JSON or as form data. JSON clients receive JSON
    status objects (400 for missing credentials, 401 for bad credentials,
    500 on unexpected errors); form clients get flash messages and the
    re-rendered ``login.html``. On a successful form login the user is sent to
    the ``next`` query parameter when it points at this host, otherwise to
    the dashboard.
    """
    logger.info("Login route accessed")

    if current_user.is_authenticated:
        logger.info("User already authenticated, redirecting to dashboard")
        return redirect(url_for('dashboard'))

    if request.method == 'POST':
        try:
            logger.info(f"Login attempt - Content-Type: {request.content_type}")

            if request.is_json:
                # silent=True: a malformed body becomes "no data" (400 below)
                # instead of raising and being reported as a 500.
                data = request.get_json(silent=True)
                if not isinstance(data, dict):
                    data = None
            else:
                data = request.form
            logger.info("Login data received")

            username = data.get('username') if data else None
            password = data.get('password') if data else None

            if not username or not password:
                logger.warning("Missing username or password")
                if request.is_json:
                    return jsonify({"status": "error", "message": "Username and password required"}), 400
                flash('Username and password required', 'error')
                return render_template('login.html')

            logger.info(f"Looking for user: {username}")
            user = User.query.filter_by(username=username).first()

            if not user:
                logger.warning(f"User not found: {username}")
                if request.is_json:
                    return jsonify({"status": "error", "message": "Invalid credentials"}), 401
                flash('Invalid username or password', 'error')
                return render_template('login.html')

            logger.info(f"User found: {user.username}, checking password...")

            if user.check_password(password):
                logger.info("Password correct, logging in user...")
                login_user(user, remember=True)
                session.permanent = True
                logger.info(f"User logged in: {user.username}")

                if request.is_json:
                    return jsonify({"status": "success", "message": "Login successful"})

                # Only follow ``next`` when it stays on this host (open-redirect guard).
                next_page = request.args.get('next')
                if _is_safe_redirect_target(next_page):
                    return redirect(next_page)
                return redirect(url_for('dashboard'))
            else:
                logger.warning("Password incorrect")
                if request.is_json:
                    return jsonify({"status": "error", "message": "Invalid credentials"}), 401
                flash('Invalid username or password', 'error')
        except Exception as e:
            # Full details go to the log; the client only gets a generic message
            # so internal error text is not disclosed.
            logger.error(f"Login error: {str(e)}", exc_info=True)
            if request.is_json:
                return jsonify({"status": "error", "message": "Internal server error"}), 500
            flash('An error occurred', 'error')

    # GET requests, and form POSTs that failed, fall through to the form.
    return render_template('login.html')
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Render the registration form (GET) or create a new account (POST).

    Accepts JSON or form data. Missing credentials and duplicate usernames
    produce a 400 for JSON clients, or a flash message plus the re-rendered
    form otherwise. The first account ever created is marked as admin.
    Unexpected errors roll back the session and return a generic 500 to JSON
    clients.
    """
    if current_user.is_authenticated:
        return redirect(url_for('dashboard'))

    if request.method == 'POST':
        try:
            if request.is_json:
                # Malformed or non-object JSON is treated as an empty payload
                # so it is rejected with a 400 below rather than a 500.
                data = request.get_json(silent=True)
                if not isinstance(data, dict):
                    data = {}
            else:
                data = request.form

            username = data.get('username')
            password = data.get('password')

            # Same validation the login route applies: both fields are required.
            if not username or not password:
                if request.is_json:
                    return jsonify({"status": "error", "message": "Username and password required"}), 400
                flash('Username and password required', 'error')
                return render_template('register.html')

            if User.query.filter_by(username=username).first():
                if request.is_json:
                    return jsonify({"status": "error", "message": "Username exists"}), 400
                flash('Username already exists', 'error')
                return render_template('register.html')

            user = User(username=username)
            user.set_password(password)

            # The very first registered account becomes the administrator.
            if User.query.count() == 0:
                user.is_admin = True

            db.session.add(user)
            db.session.commit()
            logger.info(f"User registered: {username}")

            if request.is_json:
                return jsonify({"status": "success", "message": "Registration successful"})
            return redirect(url_for('login'))
        except Exception as e:
            # Discard the failed transaction so the session stays usable.
            db.session.rollback()
            logger.error(f"Register error: {str(e)}", exc_info=True)
            if request.is_json:
                # Details stay in the log; do not echo exception text to clients.
                return jsonify({"status": "error", "message": "Internal server error"}), 500
            flash('An error occurred', 'error')

    return render_template('register.html')
@app.route('/logout')
@login_required
def logout():
    """Log the current user out and redirect to the login page."""
    logout_user()
    login_url = url_for('login')
    return redirect(login_url)
# ==================== FRONTEND ROUTES ====================
@app.route('/')
@login_required
def dashboard():
    """Render the main dashboard page (``index.html``) for a logged-in user."""
    page = render_template('index.html')
    return page
# ==================== CMS API ENDPOINTS ====================
@app.route('/api/cms/create', methods=['POST'])
@login_required
def create_api():
    """Create a new endpoint definition for the current user.

    Expects a JSON object with ``route``, ``model`` and ``prompt``.

    Returns:
        JSON with the created endpoint on success; 400 with an error message
        when required fields are missing or the user already has an endpoint
        with the same route.
    """
    # silent=True plus the type check turn a missing/malformed/non-object body
    # into an empty payload, so it is reported as a 400 instead of crashing.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    missing = [field for field in ('route', 'model', 'prompt') if data.get(field) is None]
    if missing:
        return jsonify({
            "status": "error",
            "message": f"Missing required field(s): {', '.join(missing)}"
        }), 400

    # Route names only need to be unique among this user's own endpoints.
    existing = APIEndpoint.query.filter_by(route=data['route'], user_id=current_user.id).first()
    if existing:
        return jsonify({"status": "error", "message": "Route already exists"}), 400

    new_api = APIEndpoint(
        user_id=current_user.id,
        route=data['route'],
        model=data['model'],
        prompt=data['prompt']
    )
    db.session.add(new_api)
    db.session.commit()
    return jsonify({"status": "success", "message": "API Created Successfully", "api": new_api.to_dict()})
@app.route('/api/cms/list', methods=['GET'])
@login_required
def list_apis():
    """Return every endpoint owned by the current user as a JSON list."""
    owned = APIEndpoint.query.filter_by(user_id=current_user.id).all()
    payload = []
    for endpoint in owned:
        payload.append(endpoint.to_dict())
    return jsonify(payload)
@app.route('/api/cms/stats', methods=['GET'])
@login_required
def get_stats():
    """Return usage totals summed across the current user's endpoints."""
    owned = APIEndpoint.query.filter_by(user_id=current_user.id).all()

    # Single pass over the endpoints, accumulating each total in order.
    calls = 0
    tokens = 0
    cost = 0
    active = 0
    for endpoint in owned:
        calls += endpoint.total_calls
        tokens += endpoint.total_tokens
        cost += endpoint.total_cost
        if endpoint.is_active:
            active += 1

    return jsonify({
        "total_calls": calls,
        "total_tokens": tokens,
        "total_cost": round(cost, 4),
        "active_apis": active
    })
@app.route('/api/cms/logs', methods=['GET'])
@login_required
def get_logs():
    """Return the 50 most recent call logs for the current user's endpoints."""
    owned_endpoints = APIEndpoint.query.filter_by(user_id=current_user.id).all()
    owned_ids = [endpoint.id for endpoint in owned_endpoints]

    # Restrict to this user's endpoints, newest first, capped at 50 rows.
    query = APILog.query.filter(APILog.endpoint_id.in_(owned_ids))
    query = query.order_by(APILog.timestamp.desc())
    recent = query.limit(50).all()

    return jsonify([entry.to_dict() for entry in recent])
@app.route('/api/cms/delete/<int:api_id>', methods=['DELETE'])
@login_required
def delete_api(api_id):
    """Delete one of the current user's endpoints; 404 if it is not theirs."""
    # Filtering on user_id as well as id keeps users from deleting others' APIs.
    lookup = APIEndpoint.query.filter_by(id=api_id, user_id=current_user.id)
    endpoint = lookup.first_or_404()
    db.session.delete(endpoint)
    db.session.commit()
    result = {"status": "success", "message": "API Deleted"}
    return jsonify(result)
@app.route('/api/cms/export', methods=['GET'])
@login_required
def export_logs():
    """Download all call logs for the current user's endpoints as a CSV file."""
    owned_endpoints = APIEndpoint.query.filter_by(user_id=current_user.id).all()
    owned_ids = [endpoint.id for endpoint in owned_endpoints]
    entries = (
        APILog.query
        .filter(APILog.endpoint_id.in_(owned_ids))
        .order_by(APILog.timestamp.desc())
        .all()
    )

    # Build the CSV in memory: header first, then one row per log entry.
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(['Timestamp', 'Endpoint', 'Tokens', 'Latency', 'Cost', 'Status'])
    writer.writerows(
        [
            entry.timestamp.strftime("%Y-%m-%d %H:%M:%S"),
            entry.endpoint.route,
            entry.tokens_used,
            round(entry.latency, 3),
            round(entry.cost, 4),
            entry.status_code,
        ]
        for entry in entries
    )

    # send_file needs a binary stream, so encode the finished text once.
    csv_bytes = buffer.getvalue().encode('utf-8')
    filename = f'api_logs_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
    return send_file(
        io.BytesIO(csv_bytes),
        mimetype='text/csv',
        as_attachment=True,
        download_name=filename
    )
# ==================== API KEY MANAGEMENT ====================
@app.route('/api/cms/keys', methods=['GET'])
@login_required
def list_keys():
    """Return every API key owned by the current user as a JSON list."""
    owned_keys = APIKey.query.filter_by(user_id=current_user.id).all()
    payload = []
    for record in owned_keys:
        payload.append(record.to_dict())
    return jsonify(payload)
@app.route('/api/cms/keys', methods=['POST'])
@login_required
def create_key():
    """Create a new API key for the current user.

    Reads an optional ``name`` from the JSON body; without one the key is
    called ``'Unnamed Key'``.

    Returns:
        JSON containing the newly generated key value.
    """
    # A missing, malformed, null or non-object body is treated as "no options"
    # so the default name applies instead of the request failing.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    new_key = APIKey(
        user_id=current_user.id,
        key=APIKey.generate_key(),
        name=data.get('name', 'Unnamed Key')
    )
    db.session.add(new_key)
    db.session.commit()
    return jsonify({"status": "success", "key": new_key.key})
@app.route('/api/cms/keys/<int:key_id>', methods=['DELETE'])
@login_required
def delete_key(key_id):
    """Delete one of the current user's API keys; 404 if it is not theirs."""
    lookup = APIKey.query.filter_by(id=key_id, user_id=current_user.id)
    record = lookup.first_or_404()
    db.session.delete(record)
    db.session.commit()
    result = {"status": "success", "message": "Key Deleted"}
    return jsonify(result)
@app.route('/api/cms/keys/<int:key_id>/toggle', methods=['POST'])
@login_required
def toggle_key(key_id):
    """Flip the active flag of one of the current user's API keys."""
    lookup = APIKey.query.filter_by(id=key_id, user_id=current_user.id)
    record = lookup.first_or_404()
    flipped = not record.is_active
    record.is_active = flipped
    db.session.commit()
    return jsonify({"status": "success", "is_active": record.is_active})
# ==================== DYNAMIC USER API ENDPOINTS ====================
@app.route('/user-api/<path:route>', methods=['GET', 'POST'])
def dynamic_endpoint(route):
    """Serve a user-defined endpoint and record the call.

    Looks up the active endpoint registered under ``route``, optionally
    validates an API key taken from the ``X-API-Key`` header or the
    ``api_key`` query parameter, generates a mock response, updates the usage
    counters, writes an ``APILog`` row and returns the response with metadata.

    Returns:
        404 JSON when no active endpoint matches, 401 JSON when a key is
        supplied but not valid/active, otherwise the response payload.
    """
    supplied_key = request.headers.get('X-API-Key') or request.args.get('api_key')

    # NOTE(review): the lookup is by route only and is not scoped to an owner,
    # so the first active match wins if two users register the same route.
    endpoint = APIEndpoint.query.filter_by(route=route, is_active=True).first()
    if endpoint is None:
        return jsonify({"error": "Endpoint not found"}), 404

    # NOTE(review): a key is only checked when one is supplied; calls without
    # any key are served. Confirm this is intended.
    key_record = None
    if supplied_key:
        key_record = APIKey.query.filter_by(key=supplied_key, is_active=True).first()
        if key_record is None:
            return jsonify({"error": "Invalid API Key"}), 401
        key_record.last_used = datetime.utcnow()
        key_record.usage_count += 1

    # Time the input parsing plus the (mock) generation step.
    started_at = time.time()
    if request.is_json:
        input_data = request.json
    else:
        input_data = request.args.to_dict()
    response_text = generate_mock_response(endpoint.prompt, input_data)
    latency = time.time() - started_at

    # Token count is estimated as a quarter of the response length; cost is
    # charged at 0.002 per 1000 tokens.
    tokens_used = len(response_text) // 4
    cost = (tokens_used / 1000) * 0.002

    endpoint.total_calls += 1
    endpoint.total_tokens += tokens_used
    endpoint.total_cost += cost

    entry = APILog(
        endpoint_id=endpoint.id,
        api_key_id=key_record.id if key_record else None,
        input_data=json.dumps(input_data),
        output_data=response_text,
        tokens_used=tokens_used,
        latency=latency,
        cost=cost,
        status_code=200
    )
    db.session.add(entry)
    db.session.commit()

    meta = {
        "model": endpoint.model,
        "latency": round(latency, 3),
        "tokens": tokens_used,
        "cost": round(cost, 4)
    }
    return jsonify({"response": response_text, "meta": meta})
# ==================== DEBUG ROUTES ====================
@app.route('/debug-db')
def debug_db():
    """Report database connectivity and the number of users.

    The route stays reachable without authentication so it can be used to
    diagnose login problems, but the list of usernames and raw database error
    text are only returned to an authenticated admin. Other callers receive an
    empty ``users`` list and a generic error message.

    Returns:
        JSON with ``status``, ``users`` and ``user_count``; or status 500 with
        ``status`` and ``error`` when the query fails.
    """
    viewer_is_admin = False
    try:
        viewer_is_admin = bool(
            current_user.is_authenticated and getattr(current_user, 'is_admin', False)
        )
        if viewer_is_admin:
            users = User.query.all()
            listing = [{'id': u.id, 'username': u.username} for u in users]
            user_count = len(users)
        else:
            # Do not disclose account names to anonymous or non-admin callers.
            listing = []
            user_count = User.query.count()
        return jsonify({
            'status': 'ok',
            'users': listing,
            'user_count': user_count
        })
    except Exception as e:
        logger.error(f"debug-db error: {str(e)}", exc_info=True)
        return jsonify({
            'status': 'error',
            # Raw driver errors can expose connection details; admins only.
            'error': str(e) if viewer_is_admin else 'Database error'
        }), 500
@app.route('/debug-session')
def debug_session():
    """Report the caller's own authentication state and session id as JSON."""
    authenticated = current_user.is_authenticated
    if authenticated:
        user_id = current_user.id
        username = current_user.username
    else:
        user_id = None
        username = None
    return jsonify({
        'is_authenticated': authenticated,
        'user_id': user_id,
        'username': username,
        'session_id': session.get('_id', 'No session')
    })
# ==================== ERROR HANDLERS ====================
@app.errorhandler(500)
def internal_error(error):
    """Handle HTTP 500: roll back the DB session, log, and answer JSON or HTML."""
    # Undo any half-finished transaction left behind by the failing request.
    db.session.rollback()
    logger.error(f"Internal error: {str(error)}")
    if not request.is_json:
        return render_template('500.html'), 500
    body = {"status": "error", "message": "Internal server error"}
    return jsonify(body), 500
@app.errorhandler(404)
def not_found(error):
    """Handle HTTP 404 with JSON for JSON requests, otherwise the 404 page."""
    if not request.is_json:
        return render_template('404.html'), 404
    body = {"status": "error", "message": "Not found"}
    return jsonify(body), 404
# ==================== DATABASE INITIALIZATION ====================
def init_db():
    """Create all tables and, if there are no users yet, seed an admin account.

    The seeded credentials can be supplied through the ``ADMIN_USERNAME`` and
    ``ADMIN_PASSWORD`` environment variables. When they are not set the
    previous defaults (``admin`` / ``admin123``) are used and a warning is
    logged, because that password is publicly known.
    """
    with app.app_context():
        db.create_all()

        if User.query.count() == 0:
            admin_username = os.environ.get('ADMIN_USERNAME') or 'admin'
            configured_password = os.environ.get('ADMIN_PASSWORD')
            admin_password = configured_password or 'admin123'

            admin = User(username=admin_username, is_admin=True)
            admin.set_password(admin_password)
            db.session.add(admin)
            db.session.commit()

            if configured_password:
                # Never write an operator-supplied password to the log.
                logger.info(f"✅ Admin created from environment (username: {admin_username})")
            else:
                logger.info(f"✅ Default admin created (username: {admin_username}, password: admin123)")
                logger.warning(
                    "Default admin password is in use; set ADMIN_PASSWORD or change it after first login"
                )

        logger.info("✅ Database initialized!")
# ==================== MAIN ====================
if __name__ == '__main__':
    # Script entry point: prepare the database, then serve on all interfaces.
    init_db()
    app.run(
        host='0.0.0.0',
        # PORT from the environment, falling back to 7860.
        port=int(os.environ.get('PORT', 7860)),
        debug=False,
    )