Spaces:
Sleeping
Sleeping
# Standard library
import base64
import io
import json
import logging
import os
import re
import tempfile
import time
import traceback
import urllib.parse
import uuid
import zipfile
from datetime import datetime

# Third-party
import pandas as pd
import requests
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS, cross_origin
from fpdf import FPDF
from PIL import ImageFont, ImageDraw, Image, UnidentifiedImageError
import firebase_admin
from firebase_admin import credentials, db, storage, auth

# Local
from stories import generateResponse
from video_gen import create_video
from styled_video_gen import create_styled_video, DEFAULT_WIDTH, DEFAULT_HEIGHT, DEFAULT_FPS, DEFAULT_TRANSITION_DURATION, DEFAULT_FONT, DEFAULT_LOGO_PATH
# Initialize Flask app and CORS
app = Flask(__name__)
CORS(app)

# Firebase initialization: target URLs come from environment variables.
Firebase_DB = os.getenv("Firebase_DB")
Firebase_Storage = os.getenv("Firebase_Storage")
LOG_FILE_PATH = "/tmp/video_gen.log"

try:
    # The service-account JSON is supplied whole via the FIREBASE secret.
    credentials_json_string = os.environ.get("FIREBASE")
    if credentials_json_string:
        service_account_info = json.loads(credentials_json_string)
        firebase_admin.initialize_app(
            credentials.Certificate(service_account_info),
            {
                'databaseURL': f'{Firebase_DB}',
                'storageBucket': f'{Firebase_Storage}',
            },
        )
        print("Firebase Admin SDK initialized successfully.")
    else:
        print("FIREBASE secret not set.")
except Exception as e:
    print(f"Error initializing Firebase: {e}")

# Default bucket handle used by all upload helpers below.
# NOTE(review): this raises at import time if initialize_app() above failed.
bucket = storage.bucket()
| # Helper: Upload a local file to Firebase Storage and return its public URL | |
def upload_to_storage(local_path, destination_blob_name):
    """Upload a local file to Firebase Storage and return its public URL."""
    target_blob = bucket.blob(destination_blob_name)
    target_blob.upload_from_filename(local_path)
    return target_blob.public_url
# Gemini API initialization
api_key = os.environ['Gemini']

def configure_gemini():
    """Configure the Gemini client and return a GenerativeModel handle.

    Fix: `genai` was referenced here but never imported anywhere in this
    file, so every call raised NameError. The import is done lazily inside
    the function so the module still loads when the google-generativeai
    package is unavailable.
    """
    import google.generativeai as genai  # deferred: optional dependency
    genai.configure(api_key=api_key)
    return genai.GenerativeModel('gemini-2.0-flash-thinking-exp')
| # Helper functions | |
| def verify_token(token): | |
| try: | |
| decoded_token = auth.verify_id_token(token) | |
| return decoded_token['uid'] | |
| except Exception as e: | |
| return None | |
def verify_admin(auth_header):
    """Validate a 'Bearer <token>' header and require the caller to be an admin.

    Returns the admin's UID. Raises ValueError for a malformed header and
    PermissionError for an unverifiable token or a non-admin user.
    """
    if not (auth_header and auth_header.startswith('Bearer ')):
        raise ValueError('Invalid token')
    uid = verify_token(auth_header.split(' ')[1])
    if not uid:
        raise PermissionError('Invalid user')
    record = db.reference(f'users/{uid}').get()
    if not (record and record.get('is_admin', False)):
        raise PermissionError('Admin access required')
    return uid
# ---------- Dummy Admin Creation on Startup ----------
# NOTE(review): the function below is deliberately disabled by wrapping it in
# a module-level string literal (evaluated and discarded at import). It
# references `admin_email` and `admin_password`, which are not defined
# anywhere in this file — define them before re-enabling.
"""
def create_dummy_admin():
    try:
        # Try to get the user if it exists
        admin_user = auth.get_user_by_email(admin_email)
    except firebase_admin.auth.UserNotFoundError:
        # Create the dummy admin if not found
        admin_user = auth.create_user(email=admin_email, password=admin_password)
    # Set or update admin record in the database
    admin_ref = db.reference(f'users/{admin_user.uid}')
    admin_data = admin_ref.get() or {}
    if not admin_data.get('is_admin', False):
        admin_ref.set({
            'email': admin_email,
            'credits': 9999,  # Optionally, give admin lots of credits
            'is_admin': True,
            'created_at': datetime.utcnow().isoformat()
        })
    print(f"Dummy admin ready: {admin_email}")
"""
# ---------- Authentication Endpoints ----------
def signup():
    """Create a Firebase Auth user and seed their DB profile with 30 starting credits."""
    try:
        payload = request.get_json()
        email = payload.get('email')
        password = payload.get('password')
        if not (email and password):
            return jsonify({'error': 'Email and password are required'}), 400
        # Create the account in Firebase Auth first.
        new_user = auth.create_user(email=email, password=password)
        # Then seed the realtime-database profile for the new account.
        profile = {
            'email': email,
            'credits': 30,
            'is_admin': False,
            'created_at': datetime.utcnow().isoformat(),
        }
        db.reference(f'users/{new_user.uid}').set(profile)
        return jsonify({'success': True, 'user': {'uid': new_user.uid, **profile}}), 201
    except Exception as e:
        return jsonify({'error': str(e)}), 400
# ---------- User Profile ----------
def get_user_profile():
    """Return the authenticated user's profile: uid, email, credits, is_admin.

    Fix: the previous revision print()-ed the raw Authorization header
    (containing the bearer token) and the full user record to stdout.
    Debug output now goes through the module's logging setup and no longer
    echoes the token itself.
    """
    try:
        auth_header = request.headers.get('Authorization', '')
        logging.debug("Authorization header present: %s", bool(auth_header))
        if not auth_header.startswith('Bearer '):
            return jsonify({'error': 'Missing or invalid token'}), 401
        token = auth_header.split(' ')[1]
        uid = verify_token(token)
        if not uid:
            return jsonify({'error': 'Invalid or expired token'}), 401
        user_data = db.reference(f'users/{uid}').get()
        logging.debug("Fetched user data for uid=%s: found=%s", uid, user_data is not None)
        if not user_data:
            return jsonify({'error': 'User not found'}), 404
        return jsonify({
            'uid': uid,
            'email': user_data.get('email'),
            'credits': user_data.get('credits', 0),
            'is_admin': user_data.get('is_admin', False)
        })
    except Exception as e:
        logging.error("Error fetching user profile: %s", e)
        return jsonify({'error': str(e)}), 500
def google_signin():
    """Sign in with a Google-issued Firebase ID token; create a DB profile on first login."""
    try:
        auth_header = request.headers.get('Authorization', '')
        if not auth_header.startswith('Bearer '):
            return jsonify({'error': 'Missing or invalid token'}), 401
        # Verify the token and pull identity straight from its claims.
        decoded = auth.verify_id_token(auth_header.split(' ')[1])
        uid = decoded['uid']
        user_ref = db.reference(f'users/{uid}')
        profile = user_ref.get()
        if not profile:
            # First login: persist a fresh profile with the initial credit grant.
            profile = {
                'email': decoded.get('email'),
                'credits': 30,
                'is_admin': False,
                'created_at': datetime.utcnow().isoformat(),
            }
            user_ref.set(profile)
        return jsonify({'success': True, 'user': {'uid': uid, **profile}}), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 400
def upload_log():
    """Upload the log file to Firebase Storage; return its URL, or None on failure."""
    try:
        return upload_to_storage(LOG_FILE_PATH, f"logs/{uuid.uuid4().hex}.log")
    except Exception as e:
        logging.error(f"❌ ERROR: Failed to upload log file: {e}")
        return None
def submit_feedback():
    """Record a user's feedback, bug report, or feature request in the DB."""
    try:
        # --- Authentication ---
        auth_header = request.headers.get('Authorization', '')
        if not auth_header.startswith('Bearer '):
            return jsonify({'error': 'Missing or invalid token'}), 401
        uid = verify_token(auth_header.split(' ')[1])
        if not uid:
            return jsonify({'error': 'Invalid or expired token'}), 401
        # --- Parse Feedback Data ---
        payload = request.get_json()
        feedback_type = payload.get('type', 'general')  # "bug" | "feature_request" | "general"
        message = payload.get('message')
        if not message:
            return jsonify({'error': 'message is required'}), 400
        # Attach the submitter's email for triage convenience.
        profile = db.reference(f'users/{uid}').get() or {}
        # Persist under a fresh UUID in the "feedback" node.
        feedback_id = str(uuid.uuid4())
        db.reference(f'feedback/{feedback_id}').set({
            "user_id": uid,
            "user_email": profile.get('email', 'unknown'),
            "type": feedback_type,
            "message": message,
            "created_at": datetime.utcnow().isoformat(),
            "status": "open",  # admins can later mark "resolved" or "in progress"
        })
        return jsonify({"success": True, "feedback_id": feedback_id}), 201
    except Exception as e:
        return jsonify({'error': str(e)}), 500
# -----------------------
# Content
# -----------------------
# ---------- Story Generation Endpoint ----------
# Configure logging to write to a file
logging.basicConfig(filename=LOG_FILE_PATH, level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
def generate_story_endpoint():
    """Generate a five-section story (text + image + narration per section),
    persist it to Firebase, and deduct 5 credits from the caller.

    Accepts multipart/form-data with `input_type` in
    {text, pdf, wiki, bible, youtube, dataframe} plus type-specific fields.

    Fixes in this revision:
      * Chart decoding called `base64.b64decode(chart_bytes)` where
        `chart_bytes` was not yet defined (NameError) — it now decodes the
        extracted `base64_data`, and `base64` is imported at module level.
      * For dataframe input the uploaded file was parsed twice; the second
        `get_df` call read from an already-consumed stream. The DataFrame
        is now parsed once and reused for chart generation.
    """
    try:
        # --- Authentication ---
        auth_header = request.headers.get('Authorization', '')
        if not auth_header.startswith('Bearer '):
            return jsonify({'error': 'Missing or invalid token'}), 401
        token = auth_header.split(' ')[1]
        uid = verify_token(token)
        if not uid:
            return jsonify({'error': 'Invalid or expired token'}), 401
        # --- Check User Credits Before Generation ---
        user_ref = db.reference(f"users/{uid}")
        user_data = user_ref.get() or {}
        current_credits = user_data.get("credits", 0)
        if current_credits < 5:
            return jsonify({'error': 'Insufficient credits. You need at least 5 credits to generate a story.'}), 403
        # --- Read Request Data (multipart/form-data) ---
        data = request.form.to_dict()
        input_type = data.get('input_type', 'text')
        prompt = data.get('prompt')  # only meaningful for input_type == "text"
        story_type = data.get('story_type', 'free_form')
        style = data.get('style', 'whimsical')
        voice_model = data.get('voice_model', 'aura-asteria-en')
        image_model = data.get('image_model', 'hf')
        audio_model = data.get('audio_model', 'deepgram')
        if input_type not in ["text", "pdf", "wiki", "bible", "youtube", "dataframe"]:
            return jsonify({'error': 'Unsupported input_type'}), 400
        # Optional, input-type-specific fields
        wiki_url = data.get("wiki_url")
        bible_reference = data.get("bible_reference")
        youtube_url = data.get("youtube_url")
        ext = data.get("ext")  # file-extension hint for dataframe input
        # Prepare for story generation (deferred imports keep startup light)
        from stories import (
            generate_story_from_text,
            get_pdf_text,
            get_df,
            generate_story_from_dataframe,
            generateResponse
        )
        story_gen_start = time.time()
        full_story = None
        df = None  # parsed once for dataframe input; reused for chart generation
        # 1) Generate the full story text
        if input_type == "text":
            if not prompt:
                return jsonify({'error': 'Prompt is required for text input'}), 400
            full_story = generate_story_from_text(prompt, story_type)
        elif input_type == "pdf":
            uploaded_file = request.files.get("file")
            if not uploaded_file:
                return jsonify({'error': 'No PDF file uploaded'}), 400
            pdf_text = get_pdf_text(uploaded_file)
            full_story = generate_story_from_text(pdf_text, story_type)
        elif input_type == "dataframe":
            uploaded_file = request.files.get("file")
            if not uploaded_file or not ext:
                return jsonify({'error': 'File and ext are required for dataframe input'}), 400
            df = get_df(uploaded_file, ext)
            if df is None:
                return jsonify({'error': f'Failed to read {ext} file'}), 400
            full_story = generate_story_from_dataframe(df, story_type)
        elif input_type == "wiki":
            if not wiki_url:
                return jsonify({'error': 'wiki_url is required for input_type "wiki"'}), 400
            from stories import generate_story_from_wiki
            full_story = generate_story_from_wiki(wiki_url, story_type)
        elif input_type == "bible":
            if not bible_reference:
                return jsonify({'error': 'bible_reference is required for input_type "bible"'}), 400
            from stories import generate_story_from_bible
            full_story = generate_story_from_bible(bible_reference, story_type)
        elif input_type == "youtube":
            if not youtube_url:
                return jsonify({'error': 'youtube_url is required for input_type "youtube"'}), 400
            from stories import generate_story_from_youtube
            full_story = generate_story_from_youtube(youtube_url, story_type)
        # Measure generation time
        story_gen_end = time.time()
        story_generation_time = story_gen_end - story_gen_start
        if not full_story:
            return jsonify({'error': 'Story generation failed'}), 500
        # 2) Split into exactly 5 sections (pad with placeholders or truncate)
        sections_raw = [s.strip() for s in full_story.split("[break]") if s.strip()]
        if len(sections_raw) < 5:
            sections_raw += ["(Placeholder section)"] * (5 - len(sections_raw))
        elif len(sections_raw) > 5:
            sections_raw = sections_raw[:5]
        sections = []
        image_generation_times = []
        audio_generation_times = []
        from image_gen import generate_image_with_retry
        from audio_gen import generate_audio
        # 3) Process each section: image (chart for dataframe input), then audio
        for section_text in sections_raw:
            # The story text embeds an image prompt in <angle brackets>
            img_prompt_match = re.search(r"<(.*?)>", section_text)
            img_prompt = img_prompt_match.group(1).strip() if img_prompt_match else section_text[:100]
            image_start = time.time()
            image_obj = None
            # Attempt chart generation if dataframe
            if input_type == "dataframe" and df is not None:
                try:
                    chart_str = generateResponse(img_prompt, df)
                    logging.info(f"chart string: {chart_str}")
                    if chart_str and chart_str.startswith("data:image/png;base64,"):
                        base64_data = chart_str.split(",", 1)[1]
                        logging.info(f"base64 data: {base64_data}")
                        # FIX: decode the extracted payload (previously decoded
                        # the not-yet-defined name `chart_bytes` -> NameError)
                        chart_bytes = base64.b64decode(base64_data)
                        image_obj = Image.open(io.BytesIO(chart_bytes))
                        logging.info(f"Image: {image_obj}")
                except Exception as e:
                    logging.error(f"error {e}, DataFrame chart generation error")
                    print("DataFrame chart generation error:", e)
            # Fallback to generate_image_with_retry
            if not image_obj:
                image_obj, _ = generate_image_with_retry(img_prompt, style, model=image_model)
            image_end = time.time()
            image_generation_times.append(image_end - image_start)
            # Save & upload the section image, then remove the temp file
            image_filename = f"/tmp/{uuid.uuid4().hex}.jpg"
            image_obj.save(image_filename, format="JPEG")
            image_blob_name = f"stories/{uid}/{uuid.uuid4().hex}.jpg"
            image_url = upload_to_storage(image_filename, image_blob_name)
            os.remove(image_filename)
            # Generate audio with the <image> description stripped out
            audio_text = re.sub(r"<.*?>", "", section_text)
            audio_start = time.time()
            audio_file_path = generate_audio(audio_text, voice_model, audio_model=audio_model)
            audio_end = time.time()
            audio_generation_times.append(audio_end - audio_start)
            audio_blob_name = f"stories/{uid}/{uuid.uuid4().hex}.mp3"
            audio_url = upload_to_storage(audio_file_path, audio_blob_name)
            os.remove(audio_file_path)
            sections.append({
                "section_text": section_text,
                "image_url": image_url,
                "audio_url": audio_url
            })
        # 4) Store the story
        story_id = str(uuid.uuid4())
        story_ref = db.reference(f"stories/{story_id}")
        # Persist the original request parameters for reproducibility
        input_params = {
            "input_type": input_type,
            "prompt": prompt,  # may be None for non-text inputs
            "wiki_url": wiki_url,  # may be None
            "bible_reference": bible_reference,
            "youtube_url": youtube_url,
            "story_type": story_type,
            "style": style,
            "voice_model": voice_model,
            "image_model": image_model,
            "audio_model": audio_model,
            "ext": ext  # for dataframe
        }
        story_record = {
            "uid": uid,
            "full_story": full_story,
            "sections": sections,
            "generation_times": {
                "story_generation_time": story_generation_time,
                "image_generation_times": image_generation_times,
                "audio_generation_times": audio_generation_times
            },
            "created_at": datetime.utcnow().isoformat(),
            "input_type": input_type,
            "story_type": story_type,
            "input_params": input_params
        }
        story_ref.set(story_record)
        # Subtract 5 Credits
        new_credits = current_credits - 5
        user_ref.update({"credits": new_credits})
        preview = sections[0] if sections else {}
        return jsonify({
            "story_id": story_id,
            "full_story": full_story,
            "preview": preview,
            "sections": sections,
            "generation_times": story_record["generation_times"],
            "new_credits": new_credits,
            "input_params": input_params
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
# ---------- Video Generation Endpoint ----------
# Configure logging to write to a file
# NOTE(review): logging.basicConfig() only configures the root logger on its
# first call, so this repeated invocation is a no-op when the story
# endpoint's identical call has already run.
logging.basicConfig(filename=LOG_FILE_PATH, level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
def generate_video_endpoint():
    """Assemble an MP4 from a stored story's images and narration; costs 5 credits.

    Flow: authenticate (Bearer token) -> require >= 5 credits -> load the
    story record -> download each section's image (into PIL) and audio
    (to /tmp) -> create_video() -> upload the MP4 to Firebase Storage ->
    save `video_url` on the story -> deduct credits. Error responses also
    include a `log_url` pointing at the uploaded debug log.
    """
    try:
        logging.info("➡️ Received video generation request...")
        auth_header = request.headers.get('Authorization', '')
        if not auth_header.startswith('Bearer '):
            logging.error("❌ ERROR: Missing or invalid token")
            return jsonify({
                'error': 'Missing or invalid token',
                'log_url': upload_log()  # Upload log file so you can see the error
            }), 401
        token = auth_header.split(' ')[1]
        uid = verify_token(token)
        if not uid:
            logging.error("❌ ERROR: Invalid or expired token")
            return jsonify({
                'error': 'Invalid or expired token',
                'log_url': upload_log()
            }), 401
        user_ref = db.reference(f"users/{uid}")
        user_data = user_ref.get() or {}
        current_credits = user_data.get("credits", 0)
        if current_credits < 5:
            return jsonify({'error': 'Insufficient credits. You need at least 5 credits to generate a video.'}), 403
        data = request.get_json()
        story_id = data.get('story_id')
        if not story_id:
            logging.error("❌ ERROR: story_id is required")
            return jsonify({
                'error': 'story_id is required',
                'log_url': upload_log()
            }), 400
        logging.info(f"Fetching story {story_id} from Firebase...")
        story_ref = db.reference(f"stories/{story_id}")
        story_data = story_ref.get()
        if not story_data:
            logging.error("❌ ERROR: Story not found")
            return jsonify({
                'error': 'Story not found',
                'log_url': upload_log()
            }), 404
        sections = story_data.get("sections", [])
        if not sections:
            logging.error("❌ ERROR: No sections found in the story")
            return jsonify({
                'error': 'No sections found in the story',
                'log_url': upload_log()
            }), 404
        image_files = []  # PIL Image objects, in section order
        audio_files = []  # /tmp file paths for downloaded narration clips
        logging.info(f"Processing {len(sections)} sections...")
        # Download each image and audio file
        # NOTE(review): a failed image download is only logged, so images and
        # audio clips can end up misaligned if one section's image fails while
        # its audio succeeds — confirm whether that is acceptable upstream.
        for section in sections:
            image_url = section.get("image_url")
            audio_url = section.get("audio_url")
            logging.info(f"➡️ Downloading image from: {image_url}")
            img_resp = requests.get(image_url)
            if img_resp.status_code == 200:
                img = Image.open(io.BytesIO(img_resp.content))
                image_files.append(img)
                logging.info("✅ Image downloaded successfully.")
            else:
                logging.error(f"❌ ERROR: Failed to download image {image_url}")
            logging.info(f"➡️ Downloading audio from: {audio_url}")
            aud_resp = requests.get(audio_url)
            if aud_resp.status_code == 200:
                aud_path = f"/tmp/{uuid.uuid4().hex}.mp3"
                with open(aud_path, "wb") as f:
                    f.write(aud_resp.content)
                audio_files.append(aud_path)
                logging.info("✅ Audio downloaded successfully.")
            else:
                logging.error(f"❌ ERROR: Failed to download audio {audio_url}")
        if not image_files:
            logging.error("❌ ERROR: No valid images found")
            return jsonify({
                'error': 'No images available for video generation',
                'log_url': upload_log()
            }), 500
        # Create the video
        video_output_path = f"/tmp/{uuid.uuid4().hex}.mp4"
        logging.info("Starting create_video...")
        video_file = create_video(image_files, audio_files, output_path=video_output_path)
        if not video_file:
            logging.error("❌ ERROR: Video generation failed")
            return jsonify({
                'error': 'Video generation failed',
                'log_url': upload_log()
            }), 500
        logging.info(f"✅ Video generated successfully: {video_file}")
        # Upload the video to Firebase Storage
        logging.info("Uploading video to Firebase Storage...")
        video_blob_name = f"stories/{uid}/{uuid.uuid4().hex}.mp4"
        video_url = upload_to_storage(video_file, video_blob_name)
        logging.info(f"✅ Video uploaded to {video_url}")
        # Update the story record with the video URL
        story_ref.update({"video_url": video_url})
        # Deduct 5 credits (clamped at zero)
        new_credits = max(0, current_credits - 5)
        user_ref.update({"credits": new_credits})
        logging.info(f"✅ Deducted 5 credits. New credit balance: {new_credits}")
        return jsonify({
            "video_url": video_url,
            "new_credits": new_credits,
            "log_url": upload_log()  # Upload the final log so you can inspect it
        })
    except Exception as e:
        trace = traceback.format_exc()
        logging.error(f"❌ EXCEPTION: {str(e)}\n{trace}")
        return jsonify({
            'error': str(e),
            'log_url': upload_log()
        }), 500
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
| # --- Styled Video Generation Endpoint --- | |
| # Uncomment when deploying in Flask app | |
| def generate_styled_video_endpoint(): # Use this if testing standalone | |
| """ | |
| Generates a video based on story data from Firebase, applying client-specified options. | |
| """ | |
| # --- Temporary file list for cleanup --- | |
| temp_files_to_clean = [] | |
| video_output_path = None # Define here for broader scope in finally block | |
| try: | |
| logging.info("➡️ Received video generation request...") | |
| # --- Authentication & Authorization --- | |
| auth_header = request.headers.get('Authorization', '') | |
| if not auth_header.startswith('Bearer '): | |
| logging.error("❌ Auth Error: Missing or invalid token format") | |
| return jsonify({'error': 'Missing or invalid token', 'log_url': upload_log()}), 401 | |
| token = auth_header.split(' ')[1] | |
| uid = verify_token(token) # Assumes verify_token returns UID or None | |
| if not uid: | |
| logging.error("❌ Auth Error: Invalid or expired token") | |
| return jsonify({'error': 'Invalid or expired token', 'log_url': upload_log()}), 401 | |
| logging.info(f"Authenticated user: {uid}") | |
| # --- Check User Credits --- | |
| user_ref = db.reference(f"users/{uid}") # Assumes this DB path structure | |
| user_data = user_ref.get() | |
| if not user_data: user_data = {} # Handle case where user node might not exist yet | |
| current_credits = user_data.get("credits", 0) | |
| video_cost = 10 # Define video cost | |
| if current_credits < video_cost: | |
| logging.warning(f"Insufficient credits for user {uid} (has {current_credits}, needs {video_cost})") | |
| return jsonify({'error': f'Insufficient credits. You need at least {video_cost} credits.', 'log_url': upload_log()}), 403 | |
| # --- Get Request Data --- | |
| data = request.get_json() | |
| if not data: | |
| logging.error("❌ Request Error: Invalid or missing JSON payload") | |
| return jsonify({'error': 'Invalid JSON payload', 'log_url': upload_log()}), 400 | |
| story_id = data.get('story_id') | |
| if not story_id: | |
| logging.error("❌ Request Error: story_id is required") | |
| return jsonify({'error': 'story_id is required', 'log_url': upload_log()}), 400 | |
| # Get video customization options from the request (or use empty dict for defaults) | |
| video_options = data.get('video_options', {}) | |
| logging.info(f"Received video options: {video_options}") | |
| # --- Fetch Story Data --- | |
| logging.info(f"Fetching story '{story_id}' for user '{uid}' from Firebase...") | |
| # Consider structuring story data under UID: db.reference(f"stories/{uid}/{story_id}") | |
| # Using the path from your original code for now: | |
| story_ref = db.reference(f"stories/{story_id}") | |
| story_data = story_ref.get() | |
| if not story_data: | |
| logging.error(f"❌ Firebase Error: Story '{story_id}' not found.") | |
| return jsonify({'error': 'Story not found', 'log_url': upload_log()}), 404 | |
| sections = story_data.get("sections", []) | |
| if not sections or not isinstance(sections, list): | |
| logging.error(f"❌ Data Error: No valid 'sections' array found in story '{story_id}'.") | |
| return jsonify({'error': 'No sections found in the story', 'log_url': upload_log()}), 404 | |
| # --- Download Assets and Prepare Data --- | |
| image_pil_list = [] # Stores downloaded PIL Images | |
| audio_file_paths = [] # Stores paths to downloaded audio files | |
| section_texts_list = [] # Stores text for each section | |
| valid_section_indices = [] # Keep track of sections processed successfully | |
| logging.info(f"Processing {len(sections)} sections for assets...") | |
| download_errors = False | |
| for i, section in enumerate(sections): | |
| if not isinstance(section, dict): | |
| logging.warning(f"Skipping section {i+1}, expected a dictionary, got {type(section)}") | |
| continue # Skip malformed section | |
| image_url = section.get("image_url") | |
| audio_url = section.get("audio_url") | |
| section_text = section.get("section_text") # Get the text | |
| logging.info(f"--- Processing Section {i+1}/{len(sections)} ---") | |
| logging.info(f" Image URL: {image_url}") | |
| logging.info(f" Audio URL: {audio_url}") | |
| logging.info(f" Text: {str(section_text)[:50] if section_text else 'None'}") | |
| # Download Image (Required for a section to be valid) | |
| img_object = None | |
| if image_url: | |
| try: | |
| img_resp = requests.get(image_url, timeout=30) | |
| img_resp.raise_for_status() | |
| img_object = Image.open(io.BytesIO(img_resp.content)) | |
| # Convert to RGB early to prevent potential palette issues later | |
| img_object = img_object.convert("RGB") | |
| logging.info(" ✅ Image downloaded and opened.") | |
| except requests.exceptions.RequestException as e: | |
| logging.error(f" ❌ ERROR downloading image {image_url}: {e}") | |
| download_errors = True | |
| except UnidentifiedImageError: # Catch PIL errors | |
| logging.error(f" ❌ ERROR: Cannot identify image file from {image_url}. Invalid format or corrupt?") | |
| download_errors = True | |
| except Exception as e: | |
| logging.error(f" ❌ ERROR processing image {image_url}: {e}") | |
| download_errors = True | |
| else: | |
| logging.warning(f" ⚠️ No image_url for section {i+1}. Skipping section.") | |
| # Don't add placeholders if image fails, just skip the index | |
| # If image succeeded, process audio and text for this section index | |
| if img_object: | |
| image_pil_list.append(img_object) | |
| section_texts_list.append(section_text) # Add text (can be None) | |
| valid_section_indices.append(i) # Mark this index as valid | |
| # Download Audio (Optional, will use silence if fails) | |
| audio_path = None | |
| if audio_url: | |
| try: | |
| aud_resp = requests.get(audio_url, timeout=60) | |
| aud_resp.raise_for_status() | |
| # Use a descriptive temp file name in system's temp dir | |
| temp_dir = tempfile.gettempdir() | |
| aud_filename = f"story_{story_id}_sec_{i}_audio_{uuid.uuid4().hex}.mp3" # Assume mp3, adjust if needed | |
| audio_path = os.path.join(temp_dir, aud_filename) | |
| with open(audio_path, "wb") as f: | |
| f.write(aud_resp.content) | |
| temp_files_to_clean.append(audio_path) # Add to cleanup list | |
| logging.info(f" ✅ Audio downloaded to {audio_path}") | |
| except requests.exceptions.RequestException as e: | |
| logging.error(f" ❌ ERROR downloading audio {audio_url}: {e}. Will use silence.") | |
| # download_errors = True # Don't mark as overall error if only audio fails | |
| audio_path = None # Ensure path is None on failure | |
| except Exception as e: | |
| logging.error(f" ❌ ERROR saving audio {audio_url}: {e}. Will use silence.") | |
| # download_errors = True | |
| audio_path = None | |
| else: | |
| logging.info(" No audio_url for this section. Will use silence.") | |
| audio_file_paths.append(audio_path) # Add path or None | |
| else: # Image failed, so skip adding audio/text for this section index | |
| logging.warning(f"Skipping audio/text for section {i+1} due to image failure.") | |
| # Check if any valid sections remain | |
| if not image_pil_list: | |
| logging.error("❌ ERROR: No valid images could be downloaded or processed for any section.") | |
| # upload_log() # Upload log before returning | |
| return jsonify({'error': 'No images available for video generation', 'log_url': upload_log()}), 500 | |
| logging.info(f"Successfully processed {len(image_pil_list)} sections with images.") | |
| if download_errors: | |
| logging.warning("⚠️ Some assets encountered download/processing errors.") | |
| # --- Handle Custom Logo/Watermark Download --- | |
| custom_logo_path = None # Path to downloaded custom logo | |
| watermark_opts_from_client = video_options.get("watermark_options", {}) | |
| watermark_final_config = watermark_opts_from_client.copy() # Start with client options | |
| watermark_final_config["enabled"] = False # Default to disabled unless successfully set up | |
| custom_logo_url = watermark_opts_from_client.get("custom_logo_url") | |
| if watermark_opts_from_client.get("enabled") and custom_logo_url: | |
| logging.info(f"➡️ Downloading custom logo/watermark: {custom_logo_url}") | |
| try: | |
| logo_resp = requests.get(custom_logo_url, timeout=20) | |
| logo_resp.raise_for_status() | |
| # Save custom logo to a temp file, try to get extension | |
| file_ext = os.path.splitext(urllib.parse.urlparse(custom_logo_url).path)[1] or '.png' # Default to png | |
| custom_logo_filename = f"custom_logo_{uid}_{uuid.uuid4().hex}{file_ext}" | |
| custom_logo_path = os.path.join(tempfile.gettempdir(), custom_logo_filename) | |
| with open(custom_logo_path, "wb") as f: | |
| f.write(logo_resp.content) | |
| # Verify it's a valid image | |
| try: | |
| Image.open(custom_logo_path).verify() # Quick check | |
| watermark_final_config["path"] = custom_logo_path # Update config with temp path | |
| watermark_final_config["enabled"] = True # Enable watermark | |
| temp_files_to_clean.append(custom_logo_path) | |
| logging.info(f"✅ Custom logo downloaded and verified: {custom_logo_path}") | |
| except Exception as img_err: | |
| logging.error(f"❌ Custom logo file from {custom_logo_url} is not a valid image: {img_err}. Disabling watermark.") | |
| # Clean up invalid downloaded file immediately | |
| if os.path.exists(custom_logo_path): os.remove(custom_logo_path) | |
| except requests.exceptions.RequestException as e: | |
| logging.error(f"❌ ERROR downloading custom logo {custom_logo_url}: {e}. Watermark disabled.") | |
| except Exception as e: | |
| logging.error(f"❌ ERROR processing custom logo {custom_logo_url}: {e}. Watermark disabled.") | |
| elif watermark_opts_from_client.get("enabled"): | |
| logging.warning("⚠️ Watermark enabled in options, but no 'custom_logo_url' provided. Watermark disabled.") | |
| # --- Prepare Final Config for create_video --- | |
| final_video_config = { | |
| "width": video_options.get("width", DEFAULT_WIDTH), | |
| "height": video_options.get("height", DEFAULT_HEIGHT), | |
| "fps": video_options.get("fps", DEFAULT_FPS), | |
| "transition": video_options.get("transition", "fade"), | |
| "transition_duration": video_options.get("transition_duration", DEFAULT_TRANSITION_DURATION), | |
| "font_path": video_options.get("font_path", DEFAULT_FONT), # Allow overriding default font | |
| "subtitle_options": video_options.get("subtitle_options", {"enabled": True}), # Default enabled | |
| "particle_options": video_options.get("particle_options", {"enabled": False}), # Default disabled | |
| "watermark_options": watermark_final_config, # Use the processed watermark config | |
| # Use default Sozo logo unless custom one provided? (Simplifying: only supporting default end logo for now) | |
| "end_logo_options": { | |
| "enabled": video_options.get("use_end_logo", True), # Control if end logo is used | |
| "path": video_options.get("end_logo_path", DEFAULT_LOGO_PATH), # Allow overriding default logo path via options | |
| "duration": video_options.get("end_logo_duration", 3.0) | |
| }, | |
| } | |
| # Ensure particle types list matches the number of *final* valid sections | |
| particle_opts_config = final_video_config["particle_options"] | |
| if particle_opts_config.get("enabled"): | |
| particle_types_list_orig = particle_opts_config.get("types_per_section", []) | |
| if isinstance(particle_types_list_orig, list): | |
| # Filter the original particle list based on the indices of sections that were successfully processed | |
| filtered_particle_types = [particle_types_list_orig[i] for i in valid_section_indices if i < len(particle_types_list_orig)] | |
| # Pad with None if the original list was too short | |
| if len(filtered_particle_types) < len(image_pil_list): | |
| filtered_particle_types.extend([None] * (len(image_pil_list) - len(filtered_particle_types))) | |
| particle_opts_config["types_per_section"] = filtered_particle_types | |
| logging.info(f"Aligned particle types for {len(image_pil_list)} sections: {particle_opts_config['types_per_section']}") | |
| else: | |
| logging.warning("particle_options.types_per_section was not a list. Disabling particles.") | |
| particle_opts_config["enabled"] = False | |
| # --- Create the Video --- | |
| # Define output path in temp directory | |
| video_output_filename = f"final_video_{uid}_{story_id}_{uuid.uuid4().hex}.mp4" | |
| video_output_path = os.path.join(tempfile.gettempdir(), video_output_filename) | |
| # Don't add to cleanup list immediately - only if creation fails or after successful upload | |
| logging.info("🚀 Starting video creation with MoviePy...") | |
| logging.info(f"Video Config Passed to create_video: {final_video_config}") | |
| logging.info(f"Output Path: {video_output_path}") | |
| logging.info(f"Number of image inputs: {len(image_pil_list)}") | |
| logging.info(f"Number of audio inputs: {len(audio_file_paths)}") | |
| logging.info(f"Number of text inputs: {len(section_texts_list)}") | |
| # Call the MoviePy function from video_gen.py | |
| generated_video_path = create_styled_video( | |
| images=image_pil_list, # List of PIL images | |
| audio_files=audio_file_paths, # List of paths (or None) | |
| section_texts=section_texts_list,# List of strings (or None) | |
| output_path=video_output_path, | |
| config=final_video_config # The dictionary of options | |
| ) | |
| # --- Handle Video Creation Result --- | |
| if not generated_video_path or not os.path.exists(generated_video_path): | |
| logging.error("❌ ERROR: Video generation failed (create_video returned None or file missing).") | |
| # Add the intended output path to cleanup just in case a partial file exists | |
| temp_files_to_clean.append(video_output_path) | |
| # upload_log() # Upload log before returning | |
| return jsonify({'error': 'Video generation failed', 'log_url': upload_log()}), 500 | |
| logging.info(f"✅ Video generated successfully: {generated_video_path}") | |
| # Add the successfully generated video path for cleanup after upload | |
| temp_files_to_clean.append(generated_video_path) | |
| # --- Upload Video to Firebase Storage --- | |
| logging.info(f"☁️ Uploading video '{os.path.basename(generated_video_path)}' to Firebase Storage...") | |
| # Make blob name more descriptive | |
| video_blob_name = f"stories/{uid}/{story_id}/video_{uuid.uuid4().hex}.mp4" | |
| try: | |
| # Assuming upload_to_storage handles the upload and returns public URL | |
| video_url = upload_to_storage(generated_video_path, video_blob_name) | |
| if not video_url: | |
| raise Exception("Upload function returned no URL") | |
| logging.info(f"✅ Video uploaded successfully to: {video_url}") | |
| except Exception as upload_err: | |
| logging.error(f"❌ Firebase Storage Error: Failed to upload video: {upload_err}") | |
| # upload_log() # Upload log before returning | |
| # Don't deduct credits if upload fails | |
| return jsonify({'error': 'Video generated but failed to upload to storage.', 'log_url': upload_log()}), 500 | |
| # --- Update Firebase Realtime Database --- | |
| try: | |
| story_ref.update({"video_url": video_url, "last_generated": time.time()}) # Add timestamp | |
| logging.info(f"✅ Updated story '{story_id}' record with video URL.") | |
| except Exception as db_err: | |
| logging.error(f"❌ Firebase DB Error: Failed to update story record: {db_err}") | |
| # Decide if this is critical. Maybe log and continue, but don't deduct credits? | |
| # For now, let's return an error as the client won't see the video URL in the story record. | |
| # upload_log() | |
| return jsonify({'error': 'Video generated and uploaded, but failed to update story record.', 'log_url': upload_log()}), 500 | |
| # --- Deduct Credits (Only after successful generation, upload, and DB update) --- | |
| try: | |
| new_credits = max(0, current_credits - video_cost) | |
| user_ref.update({"credits": new_credits}) | |
| logging.info(f"✅ Deducted {video_cost} credits for user {uid}. New balance: {new_credits}") | |
| except Exception as credit_err: | |
| logging.error(f"❌ Firebase DB Error: Failed to update user credits: {credit_err}") | |
| # This is less critical, log it but still return success to user | |
| # upload_log() # Upload log | |
| # --- Success Response --- | |
| # final_log_url = upload_log() | |
| return jsonify({ | |
| "message": "Video generated and uploaded successfully!", | |
| "video_url": video_url, | |
| "new_credits": new_credits, | |
| # "log_url": final_log_url | |
| "log_url": "Log upload function placeholder" # Replace with actual call if needed | |
| }), 200 # Use 200 OK for success | |
| except Exception as e: | |
| # --- Generic Error Handler --- | |
| trace = traceback.format_exc() | |
| logging.error(f"❌ UNHANDLED EXCEPTION in generate_video_endpoint: {str(e)}\n{trace}") | |
| # log_url = upload_log() # Upload log before returning | |
| return jsonify({ | |
| 'error': f"An unexpected error occurred: {str(e)}", | |
| # 'log_url': log_url | |
| 'log_url': "Log upload function placeholder" | |
| }), 500 | |
| finally: | |
| # --- Cleanup Temporary Files --- | |
| logging.info(f"🧹 Cleaning up {len(temp_files_to_clean)} temporary files...") | |
| cleaned_count = 0 | |
| failed_count = 0 | |
| for file_path in temp_files_to_clean: | |
| if file_path and os.path.exists(file_path): | |
| try: | |
| os.remove(file_path) | |
| # logging.debug(f" - Removed: {file_path}") | |
| cleaned_count += 1 | |
| except Exception as e: | |
| logging.error(f" - Failed to remove temp file {file_path}: {e}") | |
| failed_count += 1 | |
| #else: | |
| # logging.debug(f" - Skipping non-existent path: {file_path}") | |
| logging.info(f"✅ Cleanup complete. Removed {cleaned_count} files, failed to remove {failed_count}.") | |
| #----------Image Editing Endpoint ---------- | |
def edit_section_image_endpoint(story_id, section_idx):
    """Re-generate a single section image from a Gemini edit prompt.

    Flow: authenticate -> verify story ownership -> check credits (2) ->
    edit the existing image -> upload the result to Firebase Storage ->
    update the story record -> deduct credits.

    Args:
        story_id: Key of the story under ``stories/`` in the realtime DB.
        section_idx: Zero-based index of the section whose image to edit.

    Returns:
        Flask JSON response with the new image URL and remaining credits,
        or an error payload with an appropriate HTTP status code.
    """
    IMAGE_EDIT_COST = 2  # credits charged per image edit
    try:
        # 1) Auth: Firebase ID token expected in the Authorization header
        auth_header = request.headers.get('Authorization', '')
        if not auth_header.startswith('Bearer '):
            return jsonify({'error': 'Missing or invalid token'}), 401
        token = auth_header.split(' ')[1]
        uid = verify_token(token)
        if not uid:
            return jsonify({'error': 'Invalid or expired token'}), 401
        # 2) Fetch story and verify ownership
        story_ref = db.reference(f"stories/{story_id}")
        story_data = story_ref.get()
        if not story_data:
            return jsonify({'error': 'Story not found'}), 404
        if story_data.get('uid') != uid:
            return jsonify({'error': 'Unauthorized'}), 403
        sections = story_data.get("sections", [])
        if section_idx < 0 or section_idx >= len(sections):
            return jsonify({'error': 'Invalid section index'}), 400
        # 3) Check user credits
        user_ref = db.reference(f"users/{uid}")
        user_data = user_ref.get() or {}
        current_credits = user_data.get("credits", 0)
        if current_credits < IMAGE_EDIT_COST:
            return jsonify({'error': 'Not enough credits to edit image. Need 2 credits.'}), 403
        # 4) Read gemini_prompt.
        # silent=True: a missing/malformed JSON body yields {} and a clean
        # 400 instead of an AttributeError-driven 500.
        data = request.get_json(silent=True) or {}
        gemini_prompt = data.get('gemini_prompt')
        if not gemini_prompt:
            return jsonify({'error': 'gemini_prompt is required'}), 400
        # 5) Edit the image
        from image_gen import edit_section_image
        old_image_url = sections[section_idx].get("image_url")
        if not old_image_url:
            return jsonify({'error': 'No existing image in this section'}), 400
        edited_image_obj = edit_section_image(old_image_url, gemini_prompt)
        if not edited_image_obj:
            return jsonify({'error': 'Failed to edit image'}), 500
        # 6) Upload new image; the temp file is removed even if the upload
        # raises (the original leaked it on failure).
        new_filename = os.path.join(tempfile.gettempdir(), f"{uuid.uuid4().hex}.jpg")
        try:
            edited_image_obj.save(new_filename, format="JPEG")
            new_blob_name = f"stories/{uid}/{uuid.uuid4().hex}.jpg"
            new_image_url = upload_to_storage(new_filename, new_blob_name)
        finally:
            if os.path.exists(new_filename):
                os.remove(new_filename)
        # 7) Update the story record
        sections[section_idx]["image_url"] = new_image_url
        story_ref.update({"sections": sections})
        # 8) Deduct credits only after every step above succeeded
        new_credits = current_credits - IMAGE_EDIT_COST
        user_ref.update({"credits": new_credits})
        return jsonify({
            'success': True,
            'new_image_url': new_image_url,
            'new_credits': new_credits
        }), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
| #----------Text & Audio Editing Endpoint ---------- | |
def edit_section_text_endpoint(story_id, section_idx):
    """Rewrite a section's text and regenerate its narration audio.

    Flow: authenticate -> verify story ownership -> check credits (2) ->
    edit the text and synthesize replacement audio -> upload the audio ->
    update the story record -> deduct credits.

    Args:
        story_id: Key of the story under ``stories/`` in the realtime DB.
        section_idx: Zero-based index of the section to edit.

    Returns:
        Flask JSON response with the updated text, the new audio URL and
        the remaining credits, or an error payload with an HTTP status.
    """
    TEXT_EDIT_COST = 2  # credits charged per text/audio edit
    try:
        # 1) Auth: Firebase ID token expected in the Authorization header
        auth_header = request.headers.get('Authorization', '')
        if not auth_header.startswith('Bearer '):
            return jsonify({'error': 'Missing or invalid token'}), 401
        token = auth_header.split(' ')[1]
        uid = verify_token(token)
        if not uid:
            return jsonify({'error': 'Invalid or expired token'}), 401
        # 2) Fetch story and verify ownership
        story_ref = db.reference(f"stories/{story_id}")
        story_data = story_ref.get()
        if not story_data:
            return jsonify({'error': 'Story not found'}), 404
        if story_data.get('uid') != uid:
            return jsonify({'error': 'Unauthorized'}), 403
        sections = story_data.get("sections", [])
        if section_idx < 0 or section_idx >= len(sections):
            return jsonify({'error': 'Invalid section index'}), 400
        # 3) Check user credits
        user_ref = db.reference(f"users/{uid}")
        user_data = user_ref.get() or {}
        current_credits = user_data.get("credits", 0)
        if current_credits < TEXT_EDIT_COST:
            return jsonify({'error': 'Not enough credits to edit text. Need 2 credits.'}), 403
        # 4) Read new_text.
        # silent=True: a missing/malformed JSON body yields {} and a clean
        # 400 instead of an AttributeError-driven 500.
        data = request.get_json(silent=True) or {}
        new_text = data.get('new_text')
        if not new_text:
            return jsonify({'error': 'new_text is required'}), 400
        voice_model = data.get('voice_model', 'aura-asteria-en')
        audio_model = data.get('audio_model', 'deepgram')
        from audio_gen import edit_section_text
        old_section_text = sections[section_idx].get("section_text", "")
        # 5) Edit text -> generate new audio
        updated_text, new_audio_path = edit_section_text(
            old_section_text, new_text,
            voice_model=voice_model,
            audio_model=audio_model
        )
        if not updated_text or not new_audio_path:
            return jsonify({'error': 'Failed to edit text/audio'}), 500
        # 6) Upload new audio; the temp file is removed even if the upload
        # raises (the original leaked it on failure).
        try:
            new_blob_name = f"stories/{uid}/{uuid.uuid4().hex}.mp3"
            new_audio_url = upload_to_storage(new_audio_path, new_blob_name)
        finally:
            if os.path.exists(new_audio_path):
                os.remove(new_audio_path)
        # 7) Update the story record
        sections[section_idx]["section_text"] = updated_text
        sections[section_idx]["audio_url"] = new_audio_url
        story_ref.update({"sections": sections})
        # 8) Deduct credits only after every step above succeeded
        new_credits = current_credits - TEXT_EDIT_COST
        user_ref.update({"credits": new_credits})
        return jsonify({
            'success': True,
            'updated_text': updated_text,
            'new_audio_url': new_audio_url,
            'new_credits': new_credits
        }), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
| #----------View projects and videos Endpoints ---------- | |
def view_projects():
    """Return every story owned by the authenticated user, newest first."""
    try:
        # --- Authentication ---
        header = request.headers.get('Authorization', '')
        if not header.startswith('Bearer '):
            return jsonify({'error': 'Missing or invalid token'}), 401
        uid = verify_token(header.split(' ')[1])
        if not uid:
            return jsonify({'error': 'Invalid or expired token'}), 401
        # --- Collect this user's stories, projecting only the exposed fields ---
        all_stories = db.reference('stories').get() or {}
        user_stories = {
            sid: {
                "story_id": sid,
                "full_story": rec.get("full_story", ""),
                "sections": rec.get("sections", []),
                "generation_times": rec.get("generation_times", {}),
                "created_at": rec.get("created_at", ""),
                "input_type": rec.get("input_type", ""),
                "input_params": rec.get("input_params", {}),
                "story_type": rec.get("story_type", ""),
                "video_url": rec.get("video_url", "")  # Include video URL if present
            }
            for sid, rec in all_stories.items()
            if rec.get('uid') == uid
        }
        # --- Order by creation date, newest first ---
        ordered = dict(
            sorted(user_stories.items(), key=lambda kv: kv[1]["created_at"], reverse=True)
        )
        return jsonify({"projects": ordered})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
| # view videos endpoint | |
def view_videos():
    """
    Return only the authenticated user's stories that carry a 'video_url'
    field, i.e. those whose video has already been generated.
    """
    try:
        # --- Authentication ---
        header = request.headers.get('Authorization', '')
        if not header.startswith('Bearer '):
            return jsonify({'error': 'Missing or invalid token'}), 401
        uid = verify_token(header.split(' ')[1])
        if not uid:
            return jsonify({'error': 'Invalid or expired token'}), 401
        # --- Keep only this user's stories that have a generated video ---
        all_stories = db.reference('stories').get() or {}
        user_videos = {
            sid: {
                "story_id": sid,
                "full_story": rec.get("full_story", ""),
                "sections": rec.get("sections", []),
                "video_url": rec.get("video_url", ""),
                "created_at": rec.get("created_at", "")
            }
            for sid, rec in all_stories.items()
            if rec.get('uid') == uid and rec.get('video_url')
        }
        # --- Order by creation date, newest first ---
        ordered = dict(
            sorted(user_videos.items(), key=lambda kv: kv[1]["created_at"], reverse=True)
        )
        return jsonify({"videos": ordered})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
| #download archives endpoint | |
| # Configure lo)s") | |
| # Ensure log file exists or create it | |
# NOTE(review): this rebinds the module-level LOG_FILE_PATH (set to
# "/tmp/video_gen.log" near the top of the file), so anything reading it
# after this point — presumably upload_log() — targets this file instead;
# confirm that is intended.
LOG_FILE_PATH = "story_download.log"
# Pre-create the log file so later code never hits a missing path.
if not os.path.exists(LOG_FILE_PATH):
    with open(LOG_FILE_PATH, 'w'):
        pass # create empty file
# NOTE(review): logging.basicConfig is a no-op if the root logger was
# already configured earlier in the process — verify this call takes effect.
logging.basicConfig(filename=LOG_FILE_PATH, level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
def download_story_archive(story_id):
    """Bundle a story's exports into a single ZIP and stream it back.

    The archive contains: (1) a generated PDF of the story, (2) a plain
    text export, (3) every section image, (4) every section audio track.
    The caller must own the story.

    Args:
        story_id: Key of the story under ``stories/`` in the realtime DB.

    Returns:
        A ZIP file response on success, or a JSON error payload with an
        appropriate HTTP status code.
    """
    try:
        logging.info(f"🔹 [START] Processing story {story_id}")
        # --- Authentication ---
        auth_header = request.headers.get('Authorization', '')
        if not auth_header.startswith('Bearer '):
            logging.warning("❌ Missing or invalid token")
            return jsonify({'error': 'Missing or invalid token'}), 401
        token = auth_header.split(' ')[1]
        uid = verify_token(token)
        if not uid:
            logging.warning("❌ Invalid or expired token")
            return jsonify({'error': 'Invalid or expired token'}), 401
        # --- Fetch the Story ---
        stories_ref = db.reference('stories')
        story_record = stories_ref.child(story_id).get()
        if not story_record:
            logging.error(f"❌ Story {story_id} not found")
            return jsonify({'error': 'Story not found'}), 404
        if story_record.get('uid') != uid:
            logging.warning(f"❌ Unauthorized access attempt for story {story_id}")
            return jsonify({'error': 'Unauthorized'}), 403
        full_text = story_record.get("full_story", "")
        sections = story_record.get("sections", [])
        # Title = everything before the first period of the story text,
        # falling back to "Untitled" for empty/period-less text.
        split_sentences = full_text.split('.', 1)
        title = split_sentences[0].strip() if split_sentences and split_sentences[0].strip() else "Untitled"
        logging.info(f"📌 Story title: {title}")
        # Create ZIP buffer (built entirely in memory; no temp ZIP on disk)
        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zipf:
            # =========== 1) Generate PDF ============
            logging.info(f"📄 Generating PDF for {story_id}")
            pdf = FPDF()
            pdf.set_auto_page_break(auto=True, margin=15)
            # --- Add and use DejaVu fonts for Unicode support ---
            # NOTE(review): font files are loaded by relative path, so they
            # must sit in the process working directory — confirm deployment.
            pdf.add_font("DejaVu", "", "DejaVuSans.ttf", uni=True)
            pdf.add_font("DejaVu", "B", "dejavu-sans-bold.ttf", uni=True)
            pdf.add_page()
            # Use our newly added DejaVu font
            pdf.set_font("DejaVu", size=12)
            # Keep text within a safe width (190) so it doesn't go off-screen
            max_width = 190
            pdf.multi_cell(max_width, 10, f"Story ID: {story_id}")
            pdf.multi_cell(max_width, 10, f"Title: {title}")
            pdf.ln(10) # spacing before sections
            bucket = storage.bucket()
            # Add sections in PDF: one page per section with header, text, image
            for idx, section_obj in enumerate(sections):
                section_text = section_obj.get("section_text", "")
                image_url = section_obj.get("image_url", "")
                pdf.add_page()
                # Use bold font for section headers
                pdf.set_font("DejaVu", "B", 14)
                pdf.multi_cell(max_width, 10, f"Section {idx + 1}")
                pdf.ln(5)
                pdf.set_font("DejaVu", size=12)
                pdf.multi_cell(max_width, 10, section_text)
                pdf.ln(10)
                if image_url:
                    try:
                        logging.info(f"📷 Downloading image for section {idx + 1}: {image_url}")
                        file_path = extract_firebase_path(image_url)
                        if not file_path:
                            logging.error(f"❌ Could not parse image URL => {image_url}")
                            pdf.multi_cell(max_width, 10, "[Image URL invalid]")
                            continue
                        # Download to a temp file because pdf.image() takes a path
                        with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as temp_file:
                            temp_img_path = temp_file.name
                        blob = bucket.blob(file_path)
                        blob.download_to_filename(temp_img_path)
                        # Insert the image, using full page width minus margins.
                        # NOTE(review): fixed h=100 with a computed width may
                        # distort the aspect ratio of non-matching images.
                        pdf.image(temp_img_path, x=10, w=pdf.w - 20, h=100)
                        # NOTE(review): temp_img_path is only removed on this
                        # success path — an exception above leaks the temp file.
                        os.unlink(temp_img_path)
                        logging.info(f"✅ Image embedded in PDF for section {idx + 1}")
                    except Exception as img_error:
                        logging.error(f"❌ Error embedding image for section {idx + 1}: {str(img_error)}")
                        pdf.multi_cell(max_width, 10, "[Image could not be included]")
            # Save PDF to memory
            # NOTE(review): pdf.output(buffer) writing to a file-like object
            # requires fpdf2; classic PyFPDF's output() differs — confirm the
            # installed package.
            pdf_buffer = io.BytesIO()
            pdf.output(pdf_buffer)
            pdf_buffer.seek(0)
            zipf.writestr(f"{story_id}.pdf", pdf_buffer.read())
            logging.info(f"✅ PDF added to ZIP for {story_id}")
            # =========== 2) Generate TXT ============
            logging.info(f"📄 Generating TXT for {story_id}")
            txt_content = f"Story ID: {story_id}\nTitle: {title}\n\nFull Story:\n\n{full_text}\n\n"
            for idx, section_obj in enumerate(sections):
                section_text = section_obj.get("section_text", "")
                txt_content += f"\n\nSection {idx + 1}:\n{section_text}"
            zipf.writestr(f"{story_id}.txt", txt_content)
            logging.info(f"✅ TXT added to ZIP for {story_id}")
            # =========== 3) Images to ZIP ============
            # Downloaded as bytes and written straight into the archive.
            for idx, section_obj in enumerate(sections):
                image_url = section_obj.get("image_url", "")
                if image_url:
                    try:
                        logging.info(f"📷 Downloading image for ZIP (section {idx + 1}): {image_url}")
                        file_path = extract_firebase_path(image_url)
                        if not file_path:
                            logging.error(f"❌ Could not parse image URL => {image_url}")
                            continue
                        blob = bucket.blob(file_path)
                        image_data = blob.download_as_bytes()
                        zipf.writestr(f"image_section_{idx + 1}.jpg", image_data)
                        logging.info(f"✅ Image added to ZIP for section {idx + 1}")
                    except Exception as img_error:
                        logging.error(f"❌ Error downloading image for section {idx + 1}: {str(img_error)}")
            # =========== 4) Audio to ZIP ============
            for idx, section_obj in enumerate(sections):
                audio_url = section_obj.get("audio_url", "")
                if audio_url:
                    try:
                        logging.info(f"🔊 Downloading audio for ZIP (section {idx + 1}): {audio_url}")
                        file_path = extract_firebase_path(audio_url)
                        if not file_path:
                            logging.error(f"❌ Could not parse audio URL => {audio_url}")
                            continue
                        blob = bucket.blob(file_path)
                        audio_data = blob.download_as_bytes()
                        zipf.writestr(f"audio_section_{idx + 1}.mp3", audio_data)
                        logging.info(f"✅ Audio added to ZIP for section {idx + 1}")
                    except Exception as audio_error:
                        logging.error(f"❌ Error downloading audio for section {idx + 1}: {str(audio_error)}")
        # Upload log to Firebase (best-effort; failure is only logged)
        log_url = upload_log()
        if log_url:
            logging.info(f"📤 Log uploaded: {log_url}")
        else:
            logging.error("❌ Failed to upload log")
        # Serve ZIP File (rewind the buffer before streaming)
        zip_buffer.seek(0)
        return send_file(
            zip_buffer,
            mimetype='application/zip',
            as_attachment=True,
            download_name=f"story_{story_id}.zip"
        )
    except Exception as e:
        logging.error(f"❌ ERROR: {str(e)}")
        traceback.print_exc()
        return jsonify({'error': str(e)}), 500
def extract_firebase_path(public_url: str) -> str:
    """
    Reduce a Firebase/Google Storage URL to the object path inside the
    bucket (e.g. "stories/ABC123/file.jpg"), or "" when parsing fails.
    """
    # Classic download URLs embed the url-encoded object path between
    # "/o/" and the query string: .../o/<encoded-path>?alt=media&token=...
    if "/o/" in public_url and "?" in public_url:
        encoded = public_url.split('/o/', 1)[1].split('?', 1)[0]
        return urllib.parse.unquote(encoded)
    # Domain-style links look like:
    #   https://storage.googleapis.com/<bucket-host>/<object-path>
    # Split off scheme/host, then drop the leading bucket segment.
    try:
        parts = public_url.split('/', 3)  # scheme, '', host, remainder
        if len(parts) < 4:
            return ""
        remainder = parts[3].split('/', 1)  # bucket, object path
        if len(remainder) < 2:
            return ""
        return urllib.parse.unquote(remainder[1])
    except Exception as e:
        logging.error(f"❌ extract_firebase_path error: {str(e)}")
        return ""
| # Delete endpoints | |
| #projects | |
def delete_project(story_id):
    """
    Deletes the entire project (story) from the database (and optionally its assets in storage).
    """
    try:
        # --- Authentication ---
        header = request.headers.get('Authorization', '')
        if not header or not header.startswith('Bearer '):
            return jsonify({'error': 'Missing or invalid token'}), 401
        uid = verify_token(header.split(' ')[1])
        if not uid:
            return jsonify({'error': 'Invalid or expired token'}), 401
        # --- Ownership check before deleting anything ---
        story_ref = db.reference(f"stories/{story_id}")
        record = story_ref.get()
        if not record:
            return jsonify({'error': 'Story not found'}), 404
        if record.get('uid') != uid:
            return jsonify({'error': 'Unauthorized'}), 403
        # --- Remove the story node from the DB ---
        story_ref.delete()
        return jsonify({'success': True, 'message': f'Story {story_id} deleted.'}), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
| #videos | |
def delete_video(story_id):
    """
    Remove a story's generated video: best-effort delete of the file in
    Firebase Storage, then clear 'video_url' in the database.

    Storage deletion failures (e.g. the object was already removed) are
    logged but do not abort the request — the stale video_url is still
    cleared. The original version 500ed on blob.delete() and left the
    dangling URL in place.
    """
    try:
        # --- Authentication ---
        auth_header = request.headers.get('Authorization', '')
        if not auth_header or not auth_header.startswith('Bearer '):
            return jsonify({'error': 'Missing or invalid token'}), 401
        token = auth_header.split(' ')[1]
        uid = verify_token(token)
        if not uid:
            return jsonify({'error': 'Invalid or expired token'}), 401
        # --- Fetch Story and verify ownership ---
        story_ref = db.reference(f"stories/{story_id}")
        story_data = story_ref.get()
        if not story_data:
            return jsonify({'error': 'Story not found'}), 404
        if story_data.get('uid') != uid:
            return jsonify({'error': 'Unauthorized'}), 403
        video_url = story_data.get('video_url')
        if not video_url:
            return jsonify({'error': 'No video to delete'}), 400
        # --- Best-effort delete of the storage object ---
        file_path = extract_firebase_path(video_url)
        if file_path:
            try:
                storage.bucket().blob(file_path).delete()
            except Exception as storage_err:
                logging.warning(f"Could not delete video blob {file_path}: {storage_err}")
        # --- Remove video_url from DB (always, even if storage delete failed) ---
        story_ref.update({'video_url': None})
        return jsonify({'success': True, 'message': f'Video removed from story {story_id}.'}), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
| # ---------- Credit Request Endpoints ---------- | |
def request_credits():
    """Create a pending credit request for the authenticated user.

    Validates that the requested amount is a positive number before
    storing it: the original accepted any value, and a negative request,
    once approved, would silently *reduce* the user's balance in
    process_credit_request (credits + float(requested_credits)).

    Returns:
        Flask JSON response with the new request id, or an error payload.
    """
    try:
        auth_header = request.headers.get('Authorization', '')
        if not auth_header.startswith('Bearer '):
            return jsonify({'error': 'Authorization header missing or malformed'}), 401
        token = auth_header.split(' ')[1]
        uid = verify_token(token)
        if not uid:
            return jsonify({'error': 'Invalid token'}), 401
        # silent=True: a missing/malformed JSON body yields {} instead of
        # raising and turning into a 500.
        data = request.get_json(silent=True) or {}
        requested_credits = data.get('requested_credits')
        if requested_credits is None:
            return jsonify({'error': 'requested_credits is required'}), 400
        # Reject non-numeric or non-positive amounts.
        try:
            requested_credits = float(requested_credits)
        except (TypeError, ValueError):
            return jsonify({'error': 'requested_credits must be a number'}), 400
        if requested_credits <= 0:
            return jsonify({'error': 'requested_credits must be positive'}), 400
        # Create a credit request entry
        credit_request_ref = db.reference('credit_requests').push()
        credit_request_ref.set({
            'user_id': uid,
            'requested_credits': requested_credits,
            'status': 'pending',
            'requested_at': datetime.utcnow().isoformat()
        })
        return jsonify({'success': True, 'request_id': credit_request_ref.key})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
| # ---------- Admin Endpoints for Credit Requests ---------- | |
| # admin profile | |
def get_admin_profile():
    """Return the admin's own profile plus aggregate user/credit stats."""
    try:
        auth_header = request.headers.get('Authorization', '')
        print("Received Auth Header (admin):", auth_header)  # Debugging
        # Reject non-admin callers up front.
        admin_uid = verify_admin(auth_header)
        if not admin_uid:
            return jsonify({'error': 'Unauthorized: Admin access required'}), 401
        # Load the admin's own profile record.
        admin_data = db.reference(f'users/{admin_uid}').get()
        print("Fetched Admin Data:", admin_data)  # Debugging
        if not admin_data:
            return jsonify({'error': 'Admin user not found'}), 404
        # Aggregate statistics across the whole user base.
        all_users_data = db.reference('users').get() or {}
        everyone = list(all_users_data.values())
        regular = [u for u in everyone if not u.get('is_admin', False)]
        credits_everyone = sum(u.get('credits', 0) for u in everyone)
        credits_regular = sum(u.get('credits', 0) for u in regular)
        # Normal users are assumed to start with 3 credits each, so the
        # difference from their current total is the amount spent.
        starting_credits = len(regular) * 3
        spent_credits = starting_credits - credits_regular
        return jsonify({
            'uid': admin_uid,
            'email': admin_data.get('email'),
            'credits': admin_data.get('credits', 0),
            'is_admin': True,
            'aggregated_stats': {
                'total_users': len(all_users_data),
                'total_normal_users': len(regular),
                'total_current_credits': credits_everyone,
                'total_normal_current_credits': credits_regular,
                'total_initial_credits_normal_users': starting_credits,
                'credit_usage': spent_credits
            }
        })
    except Exception as e:
        print(f"Error fetching admin profile: {str(e)}")
        return jsonify({'error': str(e)}), 500
def list_credit_requests():
    """List every credit request with its DB key (admin only).

    Fix: the original called verify_admin() but ignored its return value.
    get_admin_profile shows that verify_admin signals failure by returning
    a falsy value, so the unchecked call let non-admin callers through.
    """
    try:
        admin_uid = verify_admin(request.headers.get('Authorization', ''))
        if not admin_uid:
            return jsonify({'error': 'Unauthorized: Admin access required'}), 401
        requests_ref = db.reference('credit_requests')
        credit_requests = requests_ref.get() or {}
        # Convert dict to list with id
        requests_list = [{'id': req_id, **data} for req_id, data in credit_requests.items()]
        return jsonify({'credit_requests': requests_list})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def process_credit_request(request_id):
    """Approve or decline a pending credit request (admin only).

    On approval, the requested amount is added to the user's balance and
    the request is stamped with the processing admin and timestamp.

    Fix: the original never checked verify_admin()'s return value;
    get_admin_profile shows failure is signalled by a falsy return, so a
    non-admin could process requests (with processed_by left falsy).
    """
    try:
        admin_uid = verify_admin(request.headers.get('Authorization', ''))
        if not admin_uid:
            return jsonify({'error': 'Unauthorized: Admin access required'}), 401
        req_ref = db.reference(f'credit_requests/{request_id}')
        req_data = req_ref.get()
        if not req_data:
            return jsonify({'error': 'Credit request not found'}), 404
        # silent=True: a missing/malformed JSON body yields {} and a clean
        # 400 instead of an AttributeError-driven 500.
        data = request.get_json(silent=True) or {}
        decision = data.get('decision')
        if decision not in ['approved', 'declined']:
            return jsonify({'error': 'decision must be "approved" or "declined"'}), 400
        # If approved, add credits to the user
        if decision == 'approved':
            user_ref = db.reference(f'users/{req_data["user_id"]}')
            user_data = user_ref.get()
            if not user_data:
                return jsonify({'error': 'User not found'}), 404
            new_total = user_data.get('credits', 0) + float(req_data.get('requested_credits', 0))
            user_ref.update({'credits': new_total})
            req_ref.update({
                'status': 'approved',
                'processed_by': admin_uid,
                'processed_at': datetime.utcnow().isoformat()
            })
            return jsonify({'success': True, 'new_user_credits': new_total})
        else:
            req_ref.update({
                'status': 'declined',
                'processed_by': admin_uid,
                'processed_at': datetime.utcnow().isoformat()
            })
            return jsonify({'success': True, 'message': 'Credit request declined'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def admin_list_users():
    """Admin endpoint: list every registered user with a field summary."""
    try:
        verify_admin(request.headers.get('Authorization', ''))
        all_users = db.reference('users').get() or {}
        # Project each profile down to the fields the admin UI consumes.
        summaries = [
            {
                'uid': uid,
                'email': profile.get('email'),
                'credits': profile.get('credits', 0),
                'is_admin': profile.get('is_admin', False),
                'created_at': profile.get('created_at', ''),
                'suspended': profile.get('suspended', False),
            }
            for uid, profile in all_users.items()
        ]
        return jsonify({'users': summaries}), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def admin_search_users():
    """Admin endpoint: find users whose email contains the given substring.

    Query param: ?email=<fragment> (case-insensitive partial match).
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))
        needle = request.args.get('email', '').lower().strip()
        if not needle:
            return jsonify({'error': 'email query param is required'}), 400
        all_users = db.reference('users').get() or {}
        hits = []
        for uid, profile in all_users.items():
            # Substring match (not exact) against the lowercased email.
            if needle not in profile.get('email', '').lower():
                continue
            hits.append({
                'uid': uid,
                'email': profile.get('email'),
                'credits': profile.get('credits', 0),
                'is_admin': profile.get('is_admin', False),
                'created_at': profile.get('created_at', ''),
                'suspended': profile.get('suspended', False),
            })
        return jsonify({'matched_users': hits}), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def admin_suspend_user(uid):
    """Admin endpoint: toggle a user's suspended flag.

    Body: {"action": "suspend" | "unsuspend"}.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))
        payload = request.get_json()
        action = payload.get('action')  # "suspend" or "unsuspend"
        if action not in ("suspend", "unsuspend"):
            return jsonify({'error': 'action must be "suspend" or "unsuspend"'}), 400
        user_ref = db.reference(f'users/{uid}')
        if not user_ref.get():
            return jsonify({'error': 'User not found'}), 404
        # The flag is simply whether the requested action was "suspend".
        user_ref.update({'suspended': action == "suspend"})
        return jsonify({'success': True, 'message': f'User {uid} is now {action}ed'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def admin_list_stories():
    """Admin endpoint: total story count plus a per-author (by email) tally."""
    try:
        verify_admin(request.headers.get('Authorization', ''))
        all_stories = db.reference('stories').get() or {}
        users_data = db.reference('users').get() or {}
        # Tally stories by author email; authors with no user record
        # are grouped under 'Unknown'. Stories without a uid are skipped.
        per_email = {}
        for story in all_stories.values():
            author_uid = story.get('uid')
            if not author_uid:
                continue
            email = users_data.get(author_uid, {}).get('email', 'Unknown')
            per_email[email] = per_email.get(email, 0) + 1
        return jsonify({
            'total_stories': len(all_stories),
            'stories_per_user': per_email
        }), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def send_notifications():
    """Admin endpoint: broadcast a message to one, several, or all users.

    Body: {"message": str, "recipients": "all" | user_id | [user_id, ...]}.
    Writes one node per recipient under notifications/{user_id}/{notif_id}.
    """
    try:
        # Admin gate: verify the caller before touching anything.
        admin_uid = verify_admin(request.headers.get('Authorization', ''))
        if not admin_uid:
            return jsonify({'error': 'Unauthorized: Admin access required'}), 401
        body = request.get_json()
        message = body.get('message')
        if not message:
            return jsonify({'error': 'message is required'}), 400
        recipients = body.get('recipients', "all")
        known_users = db.reference('users').get() or {}
        # Resolve the recipients spec into a concrete list of valid UIDs.
        if recipients == "all":
            targets = list(known_users.keys())
        elif isinstance(recipients, list):
            # Silently drop UIDs that don't exist, as before.
            targets = [uid for uid in recipients if uid in known_users]
        elif isinstance(recipients, str):
            if recipients not in known_users:
                return jsonify({'error': 'Invalid single user_id'}), 400
            targets = [recipients]
        else:
            return jsonify({'error': 'recipients must be "all", a user_id, or a list of user_ids'}), 400
        # One notification node per target: notifications/{uid}/{uuid4}.
        timestamp = datetime.utcnow().isoformat()
        for target_uid in targets:
            db.reference(f'notifications/{target_uid}/{uuid.uuid4()}').set({
                "from_admin": admin_uid,
                "message": message,
                "created_at": timestamp,
                "read": False
            })
        return jsonify({
            'success': True,
            'message': f"Notification sent to {len(targets)} user(s)."
        }), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def admin_view_feedback():
    """Admin endpoint: list feedback entries, optionally filtered.

    Query params: ?type=<bug|feature_request|general>, ?status=<open|resolved>.
    Missing params match everything.
    """
    try:
        admin_uid = verify_admin(request.headers.get('Authorization', ''))
        if not admin_uid:
            return jsonify({'error': 'Unauthorized: Admin access required'}), 401
        wanted_type = request.args.get('type')
        wanted_status = request.args.get('status')
        all_feedback = db.reference('feedback').get() or {}
        results = []
        for fb_id, entry in all_feedback.items():
            # Apply optional filters; an unset filter passes all entries.
            if wanted_type and entry.get('type') != wanted_type:
                continue
            if wanted_status and entry.get('status') != wanted_status:
                continue
            results.append({
                'feedback_id': fb_id,
                'user_id': entry.get('user_id'),
                'user_email': entry.get('user_email'),
                'type': entry.get('type', 'general'),
                'message': entry.get('message', ''),
                'created_at': entry.get('created_at'),
                'status': entry.get('status', 'open')
            })
        return jsonify({'feedback': results}), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
# ---------- Admin Endpoint to Directly Update Credits ----------
def admin_update_credits(uid):
    """Admin endpoint: add (or subtract, if negative) credits for a user.

    Body: {"add_credits": number}. Returns the user's new total.
    400 for a missing or non-numeric amount, 404 if the user doesn't exist.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))
        data = request.get_json()
        add_credits = data.get('add_credits')
        if add_credits is None:
            return jsonify({'error': 'add_credits is required'}), 400
        # Validate up front: a non-numeric payload is a client error (400),
        # not a server error (the old code let float() raise into the 500 path).
        try:
            delta = float(add_credits)
        except (TypeError, ValueError):
            return jsonify({'error': 'add_credits must be a number'}), 400
        user_ref = db.reference(f'users/{uid}')
        user_data = user_ref.get()
        if not user_data:
            return jsonify({'error': 'User not found'}), 404
        new_total = user_data.get('credits', 0) + delta
        user_ref.update({'credits': new_total})
        return jsonify({'success': True, 'new_total_credits': new_total})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
# ---------- Main ----------
# Script entry point: start the Flask development server on all interfaces.
if __name__ == '__main__':
    # Create dummy admin account if it doesn't exist
    #create_dummy_admin()
    # NOTE(review): debug=True enables the interactive Werkzeug debugger and
    # auto-reload — confirm this is intended anywhere beyond local development.
    # Port 7860 is the conventional Hugging Face Spaces port.
    app.run(debug=True, host="0.0.0.0", port=7860)