Spaces:
Sleeping
Sleeping
| import os | |
| import uuid | |
| import asyncio | |
| import subprocess | |
| import shutil | |
| import requests | |
| import sqlite3 | |
| import datetime | |
| import json | |
| from flask import Flask, render_template, request, jsonify, send_file | |
| from werkzeug.utils import secure_filename | |
| import edge_tts | |
| from groq import Groq | |
| import google.generativeai as genai | |
| from flask_cors import CORS | |
| from PIL import Image, ImageDraw, ImageFont, ImageFilter, ImageEnhance | |
| app = Flask(__name__) | |
| CORS(app) | |
| # --- CONFIGURATION --- | |
| # FIX: Use /tmp for database to avoid Permission Error | |
| OUTPUT_FOLDER = '/tmp/output' | |
| UPLOAD_FOLDER = '/tmp/uploads' | |
| CHUNKS_FOLDER = '/tmp/chunks' | |
| DB_FILE = '/tmp/users.db' | |
| for f in [OUTPUT_FOLDER, UPLOAD_FOLDER, CHUNKS_FOLDER]: | |
| os.makedirs(f, exist_ok=True) | |
| # --- API KEYS --- | |
| SERVER_GROQ_KEY = os.environ.get("GROQ_API_KEY") | |
| SERVER_GEMINI_KEY = os.environ.get("GEMINI_API_KEY") | |
| ADMIN_PASSWORD = os.environ.get("ADMIN_PASS", "admin123") # Default password if not set | |
| # --- DATABASE SETUP --- | |
| def init_db(): | |
| conn = sqlite3.connect(DB_FILE) | |
| c = conn.cursor() | |
| c.execute('''CREATE TABLE IF NOT EXISTS users | |
| (access_code TEXT PRIMARY KEY, user_type TEXT, credits INTEGER, | |
| daily_usage INTEGER, last_active TEXT)''') | |
| # Create Default Codes | |
| c.execute("INSERT OR IGNORE INTO users (access_code, user_type, credits, daily_usage, last_active) VALUES (?, ?, ?, ?, ?)", | |
| ('11112222', 'free', 100, 0, datetime.date.today().isoformat())) | |
| c.execute("INSERT OR IGNORE INTO users (access_code, user_type, credits, daily_usage, last_active) VALUES (?, ?, ?, ?, ?)", | |
| ('88889999', 'premium', 250, 0, datetime.date.today().isoformat())) | |
| conn.commit() | |
| conn.close() | |
| # Initialize DB immediately | |
| init_db() | |
| # --- HELPER: AUTH --- | |
| def check_auth(request): | |
| code = request.headers.get('X-Access-Code') | |
| if not code: return None, "No access code" | |
| conn = sqlite3.connect(DB_FILE) | |
| c = conn.cursor() | |
| c.execute("SELECT user_type, credits, daily_usage, last_active FROM users WHERE access_code=?", (code,)) | |
| user = c.fetchone() | |
| if not user: | |
| conn.close() | |
| return None, "Invalid Access Code" | |
| user_type, credits, usage, last_date = user | |
| today = datetime.date.today().isoformat() | |
| if last_date != today: | |
| usage = 0 | |
| c.execute("UPDATE users SET daily_usage=0, last_active=? WHERE access_code=?", (today, code)) | |
| conn.commit() | |
| conn.close() | |
| return {"code": code, "type": user_type, "credits": credits, "usage": usage}, None | |
| def update_usage(code, credit_cost=0, count_increment=0): | |
| conn = sqlite3.connect(DB_FILE) | |
| c = conn.cursor() | |
| c.execute("UPDATE users SET credits = credits - ?, daily_usage = daily_usage + ? WHERE access_code=?", | |
| (credit_cost, count_increment, code)) | |
| conn.commit() | |
| conn.close() | |
| # --- ROUTES --- | |
| def home(): return render_template('index.html') | |
| def login(): | |
| code = request.json.get('code') | |
| user, err = check_auth(type('obj', (object,), {'headers': {'X-Access-Code': code}})) | |
| if err: return jsonify({"error": err}), 401 | |
| return jsonify(user) | |
| def stats(): | |
| conn = sqlite3.connect(DB_FILE) | |
| c = conn.cursor() | |
| today = datetime.date.today().isoformat() | |
| c.execute("SELECT COUNT(*) FROM users WHERE last_active=?", (today,)) | |
| active = c.fetchone()[0] | |
| conn.close() | |
| return jsonify({"active_users": active}) | |
| # --- ADMIN ROUTES (NEW) --- | |
| def admin_login(): | |
| if request.json.get('password') == ADMIN_PASSWORD: | |
| return jsonify({"status": "ok"}) | |
| return jsonify({"error": "Wrong Password"}), 401 | |
| def admin_generate(): | |
| if request.headers.get('X-Admin-Pass') != ADMIN_PASSWORD: return jsonify({"error": "Unauthorized"}), 401 | |
| type_ = request.json.get('type') # 'free' or 'premium' | |
| credits = 100 if type_ == 'free' else 250 | |
| new_code = str(uuid.uuid4().int)[:8] # Generate 8 digit code | |
| conn = sqlite3.connect(DB_FILE) | |
| c = conn.cursor() | |
| c.execute("INSERT INTO users (access_code, user_type, credits, daily_usage, last_active) VALUES (?, ?, ?, ?, ?)", | |
| (new_code, type_, credits, 0, datetime.date.today().isoformat())) | |
| conn.commit() | |
| conn.close() | |
| return jsonify({"code": new_code, "type": type_, "credits": credits}) | |
| def admin_add_credit(): | |
| if request.headers.get('X-Admin-Pass') != ADMIN_PASSWORD: return jsonify({"error": "Unauthorized"}), 401 | |
| target_code = request.json.get('code') | |
| amount = int(request.json.get('amount')) | |
| conn = sqlite3.connect(DB_FILE) | |
| c = conn.cursor() | |
| c.execute("UPDATE users SET credits = credits + ? WHERE access_code=?", (amount, target_code)) | |
| if c.rowcount == 0: | |
| conn.close(); return jsonify({"error": "User not found"}), 404 | |
| conn.commit() | |
| conn.close() | |
| return jsonify({"status": "success", "added": amount}) | |
| # --- TAB FUNCTIONS (UNCHANGED LOGIC) --- | |
| # ... (All previous Transcript, Rewrite, TTS, Video, Thumbnail routes remain exactly the same) ... | |
| # For brevity, I am keeping the key function structures. Ensure you keep the previous logic for these: | |
| def convert_mp3(): | |
| # ... (Same as before) ... | |
| user, err = check_auth(request); | |
| if err: return jsonify({"error": err}), 401 | |
| file = request.files['file']; filename = secure_filename(file.filename); filepath = os.path.join(UPLOAD_FOLDER, filename); file.save(filepath) | |
| audio_path = os.path.join(UPLOAD_FOLDER, f"audio_{uuid.uuid4().hex}.mp3") | |
| cmd = ["ffmpeg", "-y", "-i", filepath, "-vn", "-acodec", "libmp3lame", "-ar", "16000", "-ac", "1", "-b:a", "64k", audio_path] | |
| try: subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE); os.remove(filepath); return jsonify({"audio_url": f"/download/{os.path.basename(audio_path)}", "filename": os.path.basename(audio_path)}) | |
| except Exception as e: return jsonify({"error": str(e)}), 500 | |
| def transcribe_mp3(): | |
| # ... (Same logic: check auth, check limits, run whisper) ... | |
| user, err = check_auth(request); | |
| if err: return jsonify({"error": err}), 401 | |
| limit = 1 if user['type'] == 'free' else 5 | |
| if user['usage'] >= limit: return jsonify({"error": "Daily limit reached."}), 403 | |
| if user['credits'] < 10: return jsonify({"error": "Insufficient Credits."}), 403 | |
| file = request.files['file']; filepath = os.path.join(UPLOAD_FOLDER, secure_filename(file.filename)); file.save(filepath) | |
| user_key = request.form.get('api_key'); api_key = user_key if user_key else SERVER_GROQ_KEY; client = Groq(api_key=api_key) | |
| try: | |
| if os.path.exists(CHUNKS_FOLDER): shutil.rmtree(CHUNKS_FOLDER) | |
| os.makedirs(CHUNKS_FOLDER) | |
| subprocess.run(f'ffmpeg -i "{filepath}" -f segment -segment_time 600 -c copy "{CHUNKS_FOLDER}/chunk_%03d.mp3"', shell=True, check=True) | |
| full_text = "" | |
| for chunk in sorted(os.listdir(CHUNKS_FOLDER)): | |
| with open(os.path.join(CHUNKS_FOLDER, chunk), "rb") as f: | |
| transcription = client.audio.transcriptions.create(file=(chunk, f.read()), model="whisper-large-v3", response_format="json") | |
| full_text += transcription.text + " " | |
| cost = 10 if not user_key else 0 | |
| update_usage(user['code'], credit_cost=cost, count_increment=1) | |
| shutil.rmtree(CHUNKS_FOLDER); os.remove(filepath) | |
| return jsonify({"transcript": full_text.strip()}) | |
| except Exception as e: return jsonify({"error": str(e)}), 500 | |
| def rewrite(): | |
| user, err = check_auth(request); | |
| if err: return jsonify({"error": err}), 401 | |
| data = request.json; user_key = data.get('api_key'); api_key = user_key if user_key else SERVER_GEMINI_KEY | |
| genai.configure(api_key=api_key); model = genai.GenerativeModel("gemini-2.5-flash") | |
| try: res = model.generate_content(f"Role: Professional Burmese Movie Recap Narrator. Style: Spoken. Text: {data.get('text')}"); return jsonify({"script": res.text}) | |
| except Exception as e: return jsonify({"error": str(e)}), 500 | |
| def tts(): | |
| user, err = check_auth(request); | |
| if err: return jsonify({"error": err}), 401 | |
| data = request.json; path = os.path.join(OUTPUT_FOLDER, f"voice_{uuid.uuid4().hex}.mp3") | |
| try: asyncio.run(edge_tts.Communicate(data['script'], data['voice'], rate=data['speed'], pitch=data['pitch']).save(path)); return jsonify({"audio_url": f"/download/{os.path.basename(path)}"}) | |
| except Exception as e: return jsonify({"error": str(e)}), 500 | |
| def process_video(): | |
| user, err = check_auth(request) | |
| if err: return jsonify({"error": err}), 401 | |
| if user['type'] == 'free': return jsonify({"error": "Feature Locked."}), 403 | |
| try: | |
| v_file = request.files['video']; a_file = request.files['audio'] | |
| v_path = os.path.join(UPLOAD_FOLDER, secure_filename(v_file.filename)); a_path = os.path.join(UPLOAD_FOLDER, secure_filename(a_file.filename)) | |
| v_file.save(v_path); a_file.save(a_path); out_path = os.path.join(OUTPUT_FOLDER, f"final_{uuid.uuid4().hex}.mp4") | |
| cmd = ["ffmpeg", "-y", "-i", v_path, "-i", a_path, "-map", "0:v", "-map", "1:a", "-c:v", "libx264", "-shortest", out_path] | |
| subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE) | |
| os.remove(v_path); os.remove(a_path); update_usage(user['code'], credit_cost=20) | |
| return jsonify({"video_url": f"/download/{os.path.basename(out_path)}"}) | |
| except Exception as e: return jsonify({"error": str(e)}), 500 | |
| def thumbnail(): | |
| user, err = check_auth(request) | |
| if err: return jsonify({"error": err}), 401 | |
| if user['type'] == 'free': return jsonify({"error": "Feature Locked."}), 403 | |
| try: | |
| file = request.files['image']; title = request.form.get('title'); img = Image.open(file.stream).convert("RGB") | |
| enhancer = ImageEnhance.Contrast(img); img = enhancer.enhance(1.2) | |
| draw = ImageDraw.Draw(img) | |
| try: font = ImageFont.truetype("/usr/share/fonts/truetype/noto/NotoSansMyanmar-Bold.ttf", 100) | |
| except: font = ImageFont.load_default() | |
| draw.text((50, img.height - 200), title, font=font, fill="yellow", stroke_width=8, stroke_fill="black") | |
| out_path = os.path.join(OUTPUT_FOLDER, f"thumb_{uuid.uuid4().hex}.jpg"); img.save(out_path) | |
| return jsonify({"url": f"/download/{os.path.basename(out_path)}"}) | |
| except Exception as e: return jsonify({"error": str(e)}), 500 | |
| def srt_translate(): | |
| user, err = check_auth(request) | |
| if err: return jsonify({"error": err}), 401 | |
| if user['type'] == 'free': return jsonify({"error": "Feature Locked."}), 403 | |
| return jsonify({"srt_content": "1\n00:00:01,000 --> 00:00:04,000\nDemo Translated Subtitle\n\n"}) | |
| def download(filename): return send_file(os.path.join(OUTPUT_FOLDER, filename), as_attachment=True) | |
| if __name__ == '__main__': app.run(host='0.0.0.0', port=7860) | |