"""Flask backend for an access-code-gated media toolkit.

Exposes JSON endpoints for audio extraction, transcription, script
rewriting, text-to-speech, video/audio muxing and thumbnail generation.
Callers identify themselves with an ``X-Access-Code`` header; per-code
credits and daily usage are tracked in a SQLite database.
"""
import asyncio
import datetime
import hmac
import json
import os
import shutil
import sqlite3
import subprocess
import tempfile
import uuid
from contextlib import closing, suppress

import edge_tts
import google.generativeai as genai
import requests
from flask import Flask, render_template, request, jsonify, send_file
from flask_cors import CORS
from groq import Groq
from PIL import Image, ImageDraw, ImageFont, ImageFilter, ImageEnhance
from werkzeug.utils import secure_filename

app = Flask(__name__)
CORS(app)

# --- CONFIGURATION ---
# Everything lives under /tmp (including the database) to avoid permission
# errors on read-only application directories.
OUTPUT_FOLDER = '/tmp/output'
UPLOAD_FOLDER = '/tmp/uploads'
CHUNKS_FOLDER = '/tmp/chunks'
DB_FILE = '/tmp/users.db'

for f in [OUTPUT_FOLDER, UPLOAD_FOLDER, CHUNKS_FOLDER]:
    os.makedirs(f, exist_ok=True)

# --- API KEYS ---
SERVER_GROQ_KEY = os.environ.get("GROQ_API_KEY")
SERVER_GEMINI_KEY = os.environ.get("GEMINI_API_KEY")
# NOTE(review): falls back to a well-known default when ADMIN_PASS is unset;
# set ADMIN_PASS in any real deployment.
ADMIN_PASSWORD = os.environ.get("ADMIN_PASS", "admin123")


# --- DATABASE SETUP ---
def init_db():
    """Create the ``users`` table if needed and seed the two default codes."""
    today = datetime.date.today().isoformat()
    seed_sql = (
        "INSERT OR IGNORE INTO users "
        "(access_code, user_type, credits, daily_usage, last_active) "
        "VALUES (?, ?, ?, ?, ?)"
    )
    with closing(sqlite3.connect(DB_FILE)) as conn:
        conn.execute('''CREATE TABLE IF NOT EXISTS users
                        (access_code TEXT PRIMARY KEY, user_type TEXT,
                         credits INTEGER, daily_usage INTEGER,
                         last_active TEXT)''')
        # Default codes; INSERT OR IGNORE keeps existing rows untouched.
        conn.execute(seed_sql, ('11112222', 'free', 100, 0, today))
        conn.execute(seed_sql, ('88889999', 'premium', 250, 0, today))
        conn.commit()


# Initialize DB immediately so the first request finds the table.
init_db()


# --- HELPERS ---
def _json_body():
    """Return the request's JSON object, or ``{}`` if absent or not a dict."""
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else {}


def _is_admin(supplied):
    """Return True if *supplied* equals ADMIN_PASSWORD (constant-time compare)."""
    if not isinstance(supplied, str):
        return False
    # Compare as bytes: compare_digest rejects non-ASCII str operands.
    return hmac.compare_digest(
        supplied.encode('utf-8'), ADMIN_PASSWORD.encode('utf-8')
    )


def _save_upload(storage):
    """Save an uploaded file under UPLOAD_FOLDER and return its path.

    A random prefix keeps concurrent uploads with the same client filename
    from overwriting each other, and guarantees a non-empty name even when
    ``secure_filename`` strips everything.
    """
    safe_name = secure_filename(storage.filename or '') or 'upload'
    path = os.path.join(UPLOAD_FOLDER, f"{uuid.uuid4().hex}_{safe_name}")
    storage.save(path)
    return path


def _remove_quietly(path):
    """Best-effort removal of a temporary file; a missing file is not an error."""
    if path:
        with suppress(OSError):
            os.remove(path)


def _lookup_user(code):
    """Look up an access code and apply the daily usage reset.

    Returns ``(user_dict, None)`` on success or ``(None, error_message)``
    when the code is missing or unknown. If the stored ``last_active`` date
    is not today, ``daily_usage`` is reset to 0 and ``last_active`` updated.
    """
    if not code:
        return None, "No access code"
    if not isinstance(code, str):
        # JSON bodies may carry the code as a number; the column is TEXT.
        code = str(code)

    today = datetime.date.today().isoformat()
    with closing(sqlite3.connect(DB_FILE)) as conn:
        row = conn.execute(
            "SELECT user_type, credits, daily_usage, last_active "
            "FROM users WHERE access_code=?",
            (code,),
        ).fetchone()
        if row is None:
            return None, "Invalid Access Code"

        user_type, balance, usage, last_date = row
        if last_date != today:
            usage = 0
            conn.execute(
                "UPDATE users SET daily_usage=0, last_active=? "
                "WHERE access_code=?",
                (today, code),
            )
            conn.commit()

    return {"code": code, "type": user_type,
            "credits": balance, "usage": usage}, None


def check_auth(request):
    """Authenticate a request via its ``X-Access-Code`` header.

    Returns ``(user_dict, None)`` or ``(None, error_message)``.
    """
    return _lookup_user(request.headers.get('X-Access-Code'))


def update_usage(code, credit_cost=0, count_increment=0):
    """Subtract *credit_cost* credits and add *count_increment* daily uses."""
    with closing(sqlite3.connect(DB_FILE)) as conn:
        conn.execute(
            "UPDATE users SET credits = credits - ?, "
            "daily_usage = daily_usage + ? WHERE access_code=?",
            (credit_cost, count_increment, code),
        )
        conn.commit()


# --- ROUTES ---
@app.route('/')
def home():
    """Serve the single-page front end."""
    return render_template('index.html')


@app.route('/api/login', methods=['POST'])
def login():
    """Validate the access code in the JSON body and return the user record."""
    user, err = _lookup_user(_json_body().get('code'))
    if err:
        return jsonify({"error": err}), 401
    return jsonify(user)


@app.route('/api/stats', methods=['GET'])
def stats():
    """Return the number of users whose ``last_active`` date is today."""
    today = datetime.date.today().isoformat()
    with closing(sqlite3.connect(DB_FILE)) as conn:
        active = conn.execute(
            "SELECT COUNT(*) FROM users WHERE last_active=?", (today,)
        ).fetchone()[0]
    return jsonify({"active_users": active})


# --- ADMIN ROUTES ---
@app.route('/api/admin/login', methods=['POST'])
def admin_login():
    """Check the admin password supplied in the JSON body."""
    if _is_admin(_json_body().get('password')):
        return jsonify({"status": "ok"})
    return jsonify({"error": "Wrong Password"}), 401


@app.route('/api/admin/generate', methods=['POST'])
def admin_generate():
    """Create a new 8-digit access code of type ``free`` or ``premium``."""
    if not _is_admin(request.headers.get('X-Admin-Pass')):
        return jsonify({"error": "Unauthorized"}), 401

    type_ = _json_body().get('type')  # 'free' or 'premium'
    if type_ not in ('free', 'premium'):
        return jsonify({"error": "type must be 'free' or 'premium'"}), 400
    starting_credits = 100 if type_ == 'free' else 250
    today = datetime.date.today().isoformat()

    with closing(sqlite3.connect(DB_FILE)) as conn:
        # An 8-digit space can collide with an existing primary key; retry a
        # few times instead of surfacing the IntegrityError as a 500.
        for _ in range(5):
            new_code = str(uuid.uuid4().int)[:8]
            try:
                conn.execute(
                    "INSERT INTO users (access_code, user_type, credits, "
                    "daily_usage, last_active) VALUES (?, ?, ?, ?, ?)",
                    (new_code, type_, starting_credits, 0, today),
                )
            except sqlite3.IntegrityError:
                continue
            conn.commit()
            break
        else:
            return jsonify({"error": "Could not allocate a unique code"}), 500

    return jsonify({"code": new_code, "type": type_,
                    "credits": starting_credits})


@app.route('/api/admin/add_credit', methods=['POST'])
def admin_add_credit():
    """Add ``amount`` credits to the user identified by ``code``."""
    if not _is_admin(request.headers.get('X-Admin-Pass')):
        return jsonify({"error": "Unauthorized"}), 401

    body = _json_body()
    target_code = body.get('code')
    try:
        amount = int(body.get('amount'))
    except (TypeError, ValueError):
        return jsonify({"error": "amount must be an integer"}), 400

    with closing(sqlite3.connect(DB_FILE)) as conn:
        cur = conn.execute(
            "UPDATE users SET credits = credits + ? WHERE access_code=?",
            (amount, target_code),
        )
        if cur.rowcount == 0:
            return jsonify({"error": "User not found"}), 404
        conn.commit()
    return jsonify({"status": "success", "added": amount})


# --- TAB FUNCTIONS ---
@app.route('/api/convert_mp3', methods=['POST'])
def convert_mp3():
    """Extract a mono 16 kHz / 64 kbps MP3 from the uploaded media file."""
    user, err = check_auth(request)
    if err:
        return jsonify({"error": err}), 401

    filepath = _save_upload(request.files['file'])
    # Written to OUTPUT_FOLDER because that is the directory /download serves.
    audio_name = f"audio_{uuid.uuid4().hex}.mp3"
    audio_path = os.path.join(OUTPUT_FOLDER, audio_name)
    cmd = ["ffmpeg", "-y", "-i", filepath, "-vn", "-acodec", "libmp3lame",
           "-ar", "16000", "-ac", "1", "-b:a", "64k", audio_path]
    try:
        subprocess.run(cmd, check=True,
                       stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
        return jsonify({"audio_url": f"/download/{audio_name}",
                        "filename": audio_name})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
    finally:
        _remove_quietly(filepath)


@app.route('/api/transcribe_mp3', methods=['POST'])
def transcribe_mp3():
    """Transcribe an uploaded audio file with Whisper via Groq.

    Free users get 1 transcription per day, others 5. Costs 10 credits unless
    the caller supplies their own ``api_key`` form field. The audio is split
    into 600-second segments that are transcribed in filename order.
    """
    user, err = check_auth(request)
    if err:
        return jsonify({"error": err}), 401

    limit = 1 if user['type'] == 'free' else 5
    if user['usage'] >= limit:
        return jsonify({"error": "Daily limit reached."}), 403
    if user['credits'] < 10:
        return jsonify({"error": "Insufficient Credits."}), 403

    filepath = _save_upload(request.files['file'])
    user_key = request.form.get('api_key')
    chunk_dir = None
    try:
        client = Groq(api_key=user_key if user_key else SERVER_GROQ_KEY)

        # Per-request scratch directory: a shared folder that is wiped on
        # every call would let concurrent requests delete each other's chunks.
        os.makedirs(CHUNKS_FOLDER, exist_ok=True)
        chunk_dir = tempfile.mkdtemp(dir=CHUNKS_FOLDER)

        # Argument list, not a shell string, so the path is never interpreted
        # by a shell.
        subprocess.run(
            ["ffmpeg", "-i", filepath, "-f", "segment",
             "-segment_time", "600", "-c", "copy",
             os.path.join(chunk_dir, "chunk_%03d.mp3")],
            check=True, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
        )

        pieces = []
        for chunk in sorted(os.listdir(chunk_dir)):
            with open(os.path.join(chunk_dir, chunk), "rb") as fh:
                transcription = client.audio.transcriptions.create(
                    file=(chunk, fh.read()),
                    model="whisper-large-v3",
                    response_format="json",
                )
            pieces.append(transcription.text)

        cost = 10 if not user_key else 0
        update_usage(user['code'], credit_cost=cost, count_increment=1)
        return jsonify({"transcript": " ".join(pieces).strip()})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
    finally:
        if chunk_dir:
            shutil.rmtree(chunk_dir, ignore_errors=True)
        _remove_quietly(filepath)


@app.route('/api/rewrite', methods=['POST'])
def rewrite():
    """Rewrite the supplied text as a spoken Burmese movie-recap script."""
    user, err = check_auth(request)
    if err:
        return jsonify({"error": err}), 401

    data = _json_body()
    user_key = data.get('api_key')
    try:
        genai.configure(api_key=user_key if user_key else SERVER_GEMINI_KEY)
        model = genai.GenerativeModel("gemini-2.5-flash")
        res = model.generate_content(
            "Role: Professional Burmese Movie Recap Narrator. Style: Spoken. "
            f"Text: {data.get('text')}"
        )
        return jsonify({"script": res.text})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route('/api/tts', methods=['POST'])
def tts():
    """Synthesize ``script`` with edge-tts and return a download URL."""
    user, err = check_auth(request)
    if err:
        return jsonify({"error": err}), 401

    data = _json_body()
    path = os.path.join(OUTPUT_FOLDER, f"voice_{uuid.uuid4().hex}.mp3")
    try:
        communicate = edge_tts.Communicate(
            data['script'], data['voice'],
            rate=data['speed'], pitch=data['pitch'],
        )
        asyncio.run(communicate.save(path))
        return jsonify({"audio_url": f"/download/{os.path.basename(path)}"})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route('/api/process_video', methods=['POST'])
def process_video():
    """Mux the uploaded video stream with the uploaded audio (non-free only).

    Costs 20 credits, charged only after ffmpeg succeeds.
    """
    user, err = check_auth(request)
    if err:
        return jsonify({"error": err}), 401
    if user['type'] == 'free':
        return jsonify({"error": "Feature Locked."}), 403
    # Refuse up front rather than letting the balance go negative.
    if user['credits'] < 20:
        return jsonify({"error": "Insufficient Credits."}), 403

    v_path = a_path = None
    try:
        v_path = _save_upload(request.files['video'])
        a_path = _save_upload(request.files['audio'])
        out_path = os.path.join(OUTPUT_FOLDER, f"final_{uuid.uuid4().hex}.mp4")
        cmd = ["ffmpeg", "-y", "-i", v_path, "-i", a_path,
               "-map", "0:v", "-map", "1:a",
               "-c:v", "libx264", "-shortest", out_path]
        subprocess.run(cmd, check=True,
                       stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
        update_usage(user['code'], credit_cost=20)
        return jsonify({"video_url": f"/download/{os.path.basename(out_path)}"})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
    finally:
        _remove_quietly(v_path)
        _remove_quietly(a_path)


@app.route('/api/thumbnail', methods=['POST'])
def thumbnail():
    """Boost contrast on the uploaded image and draw the title on it."""
    user, err = check_auth(request)
    if err:
        return jsonify({"error": err}), 401
    if user['type'] == 'free':
        return jsonify({"error": "Feature Locked."}), 403

    try:
        upload = request.files['image']
        title = request.form.get('title') or ''
        img = Image.open(upload.stream).convert("RGB")
        img = ImageEnhance.Contrast(img).enhance(1.2)
        draw = ImageDraw.Draw(img)
        try:
            font = ImageFont.truetype(
                "/usr/share/fonts/truetype/noto/NotoSansMyanmar-Bold.ttf", 100
            )
        except OSError:
            # Font file not installed: fall back to PIL's built-in font.
            font = ImageFont.load_default()
        draw.text((50, img.height - 200), title, font=font, fill="yellow",
                  stroke_width=8, stroke_fill="black")
        out_path = os.path.join(OUTPUT_FOLDER, f"thumb_{uuid.uuid4().hex}.jpg")
        img.save(out_path)
        return jsonify({"url": f"/download/{os.path.basename(out_path)}"})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route('/api/srt_translate', methods=['POST'])
def srt_translate():
    """Return a fixed demo subtitle (placeholder; non-free users only)."""
    user, err = check_auth(request)
    if err:
        return jsonify({"error": err}), 401
    if user['type'] == 'free':
        return jsonify({"error": "Feature Locked."}), 403
    return jsonify({
        "srt_content":
            "1\n00:00:01,000 --> 00:00:04,000\nDemo Translated Subtitle\n\n"
    })


@app.route('/download/<filename>')
def download(filename):
    """Send a generated file from OUTPUT_FOLDER as an attachment."""
    # Sanitise so the name cannot escape OUTPUT_FOLDER; names generated by
    # this app pass through secure_filename unchanged.
    safe_name = secure_filename(filename)
    path = os.path.join(OUTPUT_FOLDER, safe_name)
    if not safe_name or not os.path.isfile(path):
        return jsonify({"error": "File not found"}), 404
    return send_file(path, as_attachment=True)


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=7860)