KoreAI-API / main.py
rairo's picture
Update main.py
ea090ec verified
raw
history blame
21.9 kB
# app.py
import os
import io
import uuid
import re
import json
import traceback
from datetime import datetime, timedelta
from flask import Flask, request, jsonify
from flask_cors import CORS
import firebase_admin
from firebase_admin import credentials, db, storage, auth
import pandas as pd
from pathlib import Path
# Import the Sozo business logic
from sozo_gen import (
generate_report_draft,
generate_single_chart,
generate_video_from_project,
load_dataframe_safely
)
# -----------------------------------------------------------------------------
# 1. CONFIGURATION & INITIALIZATION
# -----------------------------------------------------------------------------
app = Flask(__name__)
CORS(app)
try:
credentials_json_string = os.environ.get("FIREBASE")
if not credentials_json_string: raise ValueError("FIREBASE env var not set.")
credentials_json = json.loads(credentials_json_string)
firebase_db_url = os.environ.get("Firebase_DB")
firebase_storage_bucket = os.environ.get("Firebase_Storage")
if not firebase_db_url or not firebase_storage_bucket: raise ValueError("Firebase DB/Storage env vars must be set.")
cred = credentials.Certificate(credentials_json)
firebase_admin.initialize_app(cred, {'databaseURL': firebase_db_url, 'storageBucket': firebase_storage_bucket})
print("Firebase Admin SDK initialized successfully.")
except Exception as e:
print(f"FATAL: Error initializing Firebase: {e}")
bucket = storage.bucket()
# -----------------------------------------------------------------------------
# 2. HELPER FUNCTIONS
# -----------------------------------------------------------------------------
def verify_token(token):
try: return auth.verify_id_token(token)['uid']
except Exception: return None
def verify_admin(auth_header):
if not auth_header or not auth_header.startswith('Bearer '): raise ValueError('Invalid token')
token = auth_header.split(' ')[1]
uid = verify_token(token)
if not uid: raise PermissionError('Invalid user')
user_data = db.reference(f'users/{uid}').get()
if not user_data or not user_data.get('is_admin', False): raise PermissionError('Admin access required')
return uid
def is_valid_email(email):
"""Simple regex for basic email validation."""
regex = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return re.match(regex, email) is not None
# -----------------------------------------------------------------------------
# 3. AUTHENTICATION & USER MANAGEMENT
# -----------------------------------------------------------------------------
@app.route('/api/auth/signup', methods=['POST'])
def signup():
try:
data = request.get_json()
email = data.get('email')
password = data.get('password')
if not email or not password: return jsonify({'error': 'Email and password are required'}), 400
user = auth.create_user(email=email, password=password)
user_ref = db.reference(f'users/{user.uid}')
user_data = {'email': email, 'credits': 15, 'is_admin': False, 'created_at': datetime.utcnow().isoformat()}
user_ref.set(user_data)
return jsonify({'success': True, 'user': {'uid': user.uid, **user_data}}), 201
except Exception as e: return jsonify({'error': str(e)}), 400
@app.route('/api/user/profile', methods=['GET'])
def get_user_profile():
try:
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '): return jsonify({'error': 'Missing or invalid token'}), 401
token = auth_header.split(' ')[1]
uid = verify_token(token)
if not uid: return jsonify({'error': 'Invalid or expired token'}), 401
user_data = db.reference(f'users/{uid}').get()
if not user_data: return jsonify({'error': 'User not found'}), 404
return jsonify({'uid': uid, **user_data})
except Exception as e: return jsonify({'error': str(e)}), 500
@app.route('/api/auth/google-signin', methods=['POST'])
def google_signin():
try:
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '): return jsonify({'error': 'Missing or invalid token'}), 401
token = auth_header.split(' ')[1]
decoded_token = auth.verify_id_token(token)
uid = decoded_token['uid']
email = decoded_token.get('email')
user_ref = db.reference(f'users/{uid}')
user_data = user_ref.get()
if not user_data:
user_data = {'email': email, 'credits': 15, 'is_admin': False, 'created_at': datetime.utcnow().isoformat()}
user_ref.set(user_data)
return jsonify({'success': True, 'user': {'uid': uid, **user_data}}), 200
except Exception as e: return jsonify({'error': str(e)}), 400
# -----------------------------------------------------------------------------
# 4. SOZO BUSINESS STUDIO API ENDPOINTS
# -----------------------------------------------------------------------------
@app.route('/api/sozo/projects', methods=['POST'])
def create_sozo_project():
try:
token = request.headers.get('Authorization', '').split(' ')[1]
uid = verify_token(token)
if not uid: return jsonify({'error': 'Unauthorized'}), 401
if 'file' not in request.files: return jsonify({'error': 'No file part'}), 400
file = request.files['file']
if file.filename == '': return jsonify({'error': 'No selected file'}), 400
context = request.form.get('context', '')
project_id = uuid.uuid4().hex
file_bytes = file.read()
file.seek(0)
ext = Path(file.filename).suffix
blob_name = f"sozo_projects/{uid}/{project_id}/data{ext}"
blob = bucket.blob(blob_name)
blob.upload_from_string(file_bytes, content_type=file.content_type)
project_ref = db.reference(f'sozo_projects/{project_id}')
project_data = {'uid': uid, 'status': 'uploaded', 'createdAt': datetime.utcnow().isoformat(), 'userContext': context, 'originalDataUrl': blob.public_url, 'originalFilename': file.filename}
project_ref.set(project_data)
df = load_dataframe_safely(io.BytesIO(file_bytes), file.filename)
preview_json = df.head().to_json(orient='records')
return jsonify({'success': True, 'project_id': project_id, 'preview': json.loads(preview_json)}), 201
except Exception as e:
traceback.print_exc()
return jsonify({'error': str(e)}), 500
@app.route('/api/sozo/projects/<string:project_id>/generate-report', methods=['POST'])
def generate_sozo_report(project_id):
try:
token = request.headers.get('Authorization', '').split(' ')[1]
uid = verify_token(token)
if not uid: return jsonify({'error': 'Unauthorized'}), 401
project_ref = db.reference(f'sozo_projects/{project_id}')
project_data = project_ref.get()
if not project_data or project_data.get('uid') != uid: return jsonify({'error': 'Project not found or unauthorized'}), 404
blob_path = "/".join(project_data['originalDataUrl'].split('/')[4:])
blob = bucket.blob(blob_path)
file_bytes = blob.download_as_bytes()
draft_data = generate_report_draft(io.BytesIO(file_bytes), project_data['originalFilename'], project_data['userContext'], uid, project_id, bucket)
update_data = {'status': 'draft', 'rawMarkdown': draft_data['raw_md'], 'chartUrls': draft_data['chartUrls']}
project_ref.update(update_data)
return jsonify({'success': True, 'project': {**project_data, **update_data}}), 200
except Exception as e:
db.reference(f'sozo_projects/{project_id}').update({'status': 'failed', 'error': str(e)})
traceback.print_exc()
return jsonify({'error': str(e)}), 500
@app.route('/api/sozo/projects', methods=['GET'])
def get_sozo_projects():
try:
token = request.headers.get('Authorization', '').split(' ')[1]
uid = verify_token(token)
if not uid: return jsonify({'error': 'Unauthorized'}), 401
all_projects = db.reference('sozo_projects').order_by_child('uid').equal_to(uid).get()
return jsonify(all_projects or {}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/sozo/projects/<string:project_id>', methods=['GET'])
def get_sozo_project(project_id):
try:
token = request.headers.get('Authorization', '').split(' ')[1]
uid = verify_token(token)
if not uid: return jsonify({'error': 'Unauthorized'}), 401
project_data = db.reference(f'sozo_projects/{project_id}').get()
if not project_data or project_data.get('uid') != uid: return jsonify({'error': 'Project not found or unauthorized'}), 404
return jsonify(project_data), 200
except Exception as e: return jsonify({'error': str(e)}), 500
@app.route('/api/sozo/projects/<string:project_id>/markdown', methods=['PUT'])
def update_sozo_markdown(project_id):
try:
token = request.headers.get('Authorization', '').split(' ')[1]
uid = verify_token(token)
if not uid: return jsonify({'error': 'Unauthorized'}), 401
project_ref = db.reference(f'sozo_projects/{project_id}')
if not project_ref.get() or project_ref.get().get('uid') != uid: return jsonify({'error': 'Project not found or unauthorized'}), 404
data = request.get_json()
if 'raw_md' not in data: return jsonify({'error': 'raw_md is required'}), 400
project_ref.update({'rawMarkdown': data['raw_md']})
return jsonify({'success': True}), 200
except Exception as e: return jsonify({'error': str(e)}), 500
@app.route('/api/sozo/projects/<string:project_id>/charts', methods=['POST'])
def regenerate_sozo_chart(project_id):
try:
token = request.headers.get('Authorization', '').split(' ')[1]
uid = verify_token(token)
if not uid: return jsonify({'error': 'Unauthorized'}), 401
project_ref = db.reference(f'sozo_projects/{project_id}')
project_data = project_ref.get()
if not project_data or project_data.get('uid') != uid: return jsonify({'error': 'Project not found or unauthorized'}), 404
data = request.get_json()
description = data.get('description')
if not description: return jsonify({'error': 'Chart description is required'}), 400
blob_path = "/".join(project_data['originalDataUrl'].split('/')[4:])
blob = bucket.blob(blob_path)
file_bytes = blob.download_as_bytes()
df = load_dataframe_safely(io.BytesIO(file_bytes), project_data['originalFilename'])
new_chart_url = generate_single_chart(df, description, uid, project_id, bucket)
if not new_chart_url: return jsonify({'error': 'Chart generation failed'}), 500
project_ref.child('chartUrls').update({description: new_chart_url})
return jsonify({'success': True, 'description': description, 'new_url': new_chart_url}), 200
except Exception as e:
traceback.print_exc()
return jsonify({'error': str(e)}), 500
@app.route('/api/sozo/projects/<string:project_id>/generate-video', methods=['POST'])
def generate_sozo_video(project_id):
try:
token = request.headers.get('Authorization', '').split(' ')[1]
uid = verify_token(token)
if not uid: return jsonify({'error': 'Unauthorized'}), 401
project_ref = db.reference(f'sozo_projects/{project_id}')
project_data = project_ref.get()
if not project_data or project_data.get('uid') != uid: return jsonify({'error': 'Project not found or unauthorized'}), 404
data = request.get_json()
voice_model = data.get('voice_model', 'aura-2-andromeda-en')
if voice_model not in ['aura-2-andromeda-en', 'aura-2-orpheus-en']: return jsonify({'error': 'Invalid voice model specified'}), 400
project_ref.update({'status': 'generating_video'})
blob_path = "/".join(project_data['originalDataUrl'].split('/')[4:])
blob = bucket.blob(blob_path)
file_bytes = blob.download_as_bytes()
df = load_dataframe_safely(io.BytesIO(file_bytes), project_data['originalFilename'])
video_url = generate_video_from_project(df, project_data.get('rawMarkdown', ''), uid, project_id, voice_model, bucket)
if not video_url: raise Exception("Video generation failed in core function.")
project_ref.update({'status': 'video_complete', 'videoUrl': video_url})
return jsonify({'success': True, 'video_url': video_url}), 200
except Exception as e:
db.reference(f'sozo_projects/{project_id}').update({'status': 'failed', 'error': str(e)})
traceback.print_exc()
return jsonify({'error': str(e)}), 500
# -----------------------------------------------------------------------------
# 5. UNIVERSAL ENDPOINTS (Waitlist, Feedback, Credits)
# -----------------------------------------------------------------------------
@app.route('/join-waitlist', methods=['POST'])
def join_waitlist():
try:
data = request.get_json()
if not data: return jsonify({"status": "error", "message": "Invalid request. JSON payload expected."}), 400
email = data.get('email')
if not email or not is_valid_email(email): return jsonify({"status": "error", "message": "A valid email is required."}), 400
email = email.lower()
waitlist_ref = db.reference('sozo_waitlist')
if waitlist_ref.order_by_child('email').equal_to(email).get():
return jsonify({"status": "success", "message": "You are already on the waitlist!"}), 200
new_entry_ref = waitlist_ref.push()
new_entry_ref.set({'email': email, 'timestamp': datetime.utcnow().isoformat() + 'Z'})
return jsonify({"status": "success", "message": "Thank you for joining the waitlist!"}), 201
except Exception as e:
traceback.print_exc()
return jsonify({"status": "error", "message": "An internal server error occurred."}), 500
@app.route('/api/feedback', methods=['POST'])
def submit_feedback():
try:
token = request.headers.get('Authorization', '').split(' ')[1]
uid = verify_token(token)
if not uid: return jsonify({'error': 'Invalid or expired token'}), 401
data = request.get_json()
message = data.get('message')
if not message: return jsonify({'error': 'message is required'}), 400
user_email = (db.reference(f'users/{uid}').get() or {}).get('email', 'unknown')
feedback_ref = db.reference('feedback').push()
feedback_ref.set({"user_id": uid, "user_email": user_email, "type": data.get('type', 'general'), "message": message, "created_at": datetime.utcnow().isoformat(), "status": "open"})
return jsonify({"success": True, "feedback_id": feedback_ref.key}), 201
except Exception as e: return jsonify({'error': str(e)}), 500
@app.route('/api/user/request-credits', methods=['POST'])
def request_credits():
try:
token = request.headers.get('Authorization', '').split(' ')[1]
uid = verify_token(token)
if not uid: return jsonify({'error': 'Invalid token'}), 401
requested_credits = request.get_json().get('requested_credits')
if requested_credits is None: return jsonify({'error': 'requested_credits is required'}), 400
credit_request_ref = db.reference('credit_requests').push()
credit_request_ref.set({'user_id': uid, 'requested_credits': requested_credits, 'status': 'pending', 'requested_at': datetime.utcnow().isoformat()})
return jsonify({'success': True, 'request_id': credit_request_ref.key})
except Exception as e: return jsonify({'error': str(e)}), 500
# -----------------------------------------------------------------------------
# 6. ADMIN ENDPOINTS
# -----------------------------------------------------------------------------
@app.route('/api/admin/dashboard-stats', methods=['GET'])
def get_admin_dashboard_stats():
"""A singular endpoint to fetch all key metrics for the admin dashboard."""
try:
verify_admin(request.headers.get('Authorization', ''))
# Fetch all necessary data in one go
all_users = db.reference('users').get() or {}
all_projects = db.reference('sozo_projects').get() or {}
all_feedback = db.reference('feedback').get() or {}
all_credit_requests = db.reference('credit_requests').get() or {}
waitlist = db.reference('sozo_waitlist').get() or {}
# --- Initialize Stats ---
stats = {
"user_stats": {"total": 0, "new_24h": 0, "new_7d": 0},
"project_stats": {"total": 0, "new_24h": 0, "failed": 0, "videos_generated": 0},
"action_items": {"open_feedback": 0, "pending_credit_requests": 0},
"growth_stats": {"waitlist_total": 0}
}
now = datetime.utcnow()
one_day_ago = now - timedelta(days=1)
seven_days_ago = now - timedelta(days=7)
# --- Process Users ---
stats["user_stats"]["total"] = len(all_users)
for user_data in all_users.values():
created_at_str = user_data.get('created_at')
if created_at_str:
created_at_dt = datetime.fromisoformat(created_at_str)
if created_at_dt > one_day_ago:
stats["user_stats"]["new_24h"] += 1
if created_at_dt > seven_days_ago:
stats["user_stats"]["new_7d"] += 1
# --- Process Projects ---
stats["project_stats"]["total"] = len(all_projects)
for project_data in all_projects.values():
created_at_str = project_data.get('createdAt')
if created_at_str:
created_at_dt = datetime.fromisoformat(created_at_str)
if created_at_dt > one_day_ago:
stats["project_stats"]["new_24h"] += 1
if project_data.get('status') == 'failed':
stats["project_stats"]["failed"] += 1
if project_data.get('status') == 'video_complete':
stats["project_stats"]["videos_generated"] += 1
# --- Process Action Items ---
stats["action_items"]["open_feedback"] = sum(1 for fb in all_feedback.values() if fb.get('status') == 'open')
stats["action_items"]["pending_credit_requests"] = sum(1 for cr in all_credit_requests.values() if cr.get('status') == 'pending')
# --- Process Growth ---
stats["growth_stats"]["waitlist_total"] = len(waitlist)
return jsonify(stats), 200
except Exception as e:
traceback.print_exc()
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/credit_requests', methods=['GET'])
def list_credit_requests():
try:
verify_admin(request.headers.get('Authorization', ''))
requests_list = [{'id': req_id, **data} for req_id, data in (db.reference('credit_requests').get() or {}).items()]
return jsonify({'credit_requests': requests_list})
except Exception as e: return jsonify({'error': str(e)}), 500
@app.route('/api/admin/credit_requests/<string:request_id>', methods=['PUT'])
def process_credit_request(request_id):
try:
admin_uid = verify_admin(request.headers.get('Authorization', ''))
req_ref = db.reference(f'credit_requests/{request_id}')
req_data = req_ref.get()
if not req_data: return jsonify({'error': 'Credit request not found'}), 404
decision = request.get_json().get('decision')
if decision not in ['approved', 'declined']: return jsonify({'error': 'decision must be "approved" or "declined"'}), 400
if decision == 'approved':
user_ref = db.reference(f'users/{req_data["user_id"]}')
new_total = (user_ref.get() or {}).get('credits', 0) + float(req_data.get('requested_credits', 0))
user_ref.update({'credits': new_total})
req_ref.update({'status': 'approved', 'processed_by': admin_uid, 'processed_at': datetime.utcnow().isoformat()})
return jsonify({'success': True, 'new_user_credits': new_total})
else:
req_ref.update({'status': 'declined', 'processed_by': admin_uid, 'processed_at': datetime.utcnow().isoformat()})
return jsonify({'success': True, 'message': 'Credit request declined'})
except Exception as e: return jsonify({'error': str(e)}), 500
@app.route('/api/admin/users', methods=['GET'])
def admin_list_users():
try:
verify_admin(request.headers.get('Authorization', ''))
all_users = db.reference('users').get() or {}
user_list = [{'uid': uid, **data} for uid, data in all_users.items()]
return jsonify({'users': user_list}), 200
except Exception as e: return jsonify({'error': str(e)}), 500
@app.route('/api/admin/users/<string:uid>/credits', methods=['PUT'])
def admin_update_credits(uid):
try:
verify_admin(request.headers.get('Authorization', ''))
add_credits = request.get_json().get('add_credits')
if add_credits is None: return jsonify({'error': 'add_credits is required'}), 400
user_ref = db.reference(f'users/{uid}')
if not user_ref.get(): return jsonify({'error': 'User not found'}), 404
new_total = user_ref.get().get('credits', 0) + float(add_credits)
user_ref.update({'credits': new_total})
return jsonify({'success': True, 'new_total_credits': new_total})
except Exception as e: return jsonify({'error': str(e)}), 500
# -----------------------------------------------------------------------------
# 7. MAIN EXECUTION
# -----------------------------------------------------------------------------
if __name__ == '__main__':
app.run(debug=True, host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))