| import os |
| import uuid |
| import sys |
| import subprocess |
| import sqlite3 |
| import datetime |
| import threading |
| import time |
| from flask import Flask, render_template, request, send_from_directory, url_for, redirect, flash, session |
| import ffmpeg |
| from werkzeug.utils import secure_filename |
|
|
| |
def check_ffmpeg_installed(fallback_dir="D:\\ffmpeg\\bin"):
    """Return True when an ``ffmpeg`` executable can be launched.

    The command ``ffmpeg -version`` is tried first through the normal PATH
    lookup. If that fails, ``ffmpeg.exe`` inside *fallback_dir* is tried and,
    on success, *fallback_dir* is appended to ``os.environ['PATH']`` so later
    ``ffmpeg`` invocations from this process can resolve it.

    Args:
        fallback_dir: Directory probed when ``ffmpeg`` is not on PATH. A falsy
            value disables the fallback probe.

    Returns:
        bool: True if either probe succeeded, otherwise False.
    """

    def _can_run(executable):
        # OSError covers FileNotFoundError as well as PermissionError and
        # similar launch failures, so a broken binary reports "not installed"
        # instead of crashing the caller.
        try:
            subprocess.run(
                [executable, '-version'],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                check=True,
            )
        except (subprocess.SubprocessError, OSError):
            return False
        return True

    if _can_run('ffmpeg'):
        return True

    if not fallback_dir:
        return False
    if not _can_run(os.path.join(fallback_dir, 'ffmpeg.exe')):
        return False

    # PATH may be missing entirely in a stripped-down environment.
    current_path = os.environ.get('PATH', '')
    if current_path:
        os.environ['PATH'] = current_path + os.pathsep + fallback_dir
    else:
        os.environ['PATH'] = fallback_dir
    return True
|
|
| |
# Abort at import time when FFmpeg cannot be launched: print the install
# guide line by line, then exit with status 1.
if not check_ffmpeg_installed():
    for guide_line in (
        "错误: FFmpeg未安装或未添加到系统环境变量!",
        "请安装FFmpeg并确保它已添加到系统环境变量中。",
        "Windows安装指南:",
        "1. 下载FFmpeg: https://ffmpeg.org/download.html",
        "2. 解压到一个目录,例如 C:\\ffmpeg",
        "3. 将FFmpeg的bin目录添加到系统环境变量PATH中",
        "4. 重启终端并再次运行此应用",
    ):
        print(guide_line)
    sys.exit(1)
|
|
| |
# Value passed as ``debug=`` to ``app.run``; also consulted when deciding
# whether this process should start the cleanup scheduler.
DEBUG_MODE = True

# Directory containing this file; the history database lives next to it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
HISTORY_DB_FILE = os.path.join(BASE_DIR, 'conversion_history.db')

# Hours of the day (server local time) in which a cleanup slot exists, and
# how often (in seconds) the scheduler loop wakes up to check for one.
CLEANUP_SCHEDULE_HOURS = (4, 16)
SCHEDULER_POLL_INTERVAL_SECONDS = 30

# Guard state so the scheduler thread is started at most once per process.
_cleanup_scheduler_lock = threading.Lock()
_cleanup_scheduler_started = False
|
|
|
|
class _ClosingConnection(sqlite3.Connection):
    """SQLite connection whose ``with`` block also closes the connection.

    The stock ``sqlite3.Connection`` context manager only commits or rolls
    back; it leaves the connection open. Every caller in this module opens a
    fresh connection per ``with`` block, so closing on exit prevents handles
    from piling up until garbage collection.
    """

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            # Keep the standard commit-on-success / rollback-on-error step.
            return super().__exit__(exc_type, exc_value, traceback)
        finally:
            self.close()


def get_db_connection(db_path=None):
    """Open a connection to the history database.

    Args:
        db_path: Optional database path; defaults to ``HISTORY_DB_FILE``.

    Returns:
        sqlite3.Connection: A connection with ``row_factory`` set to
        ``sqlite3.Row``. When used as a context manager it commits (or rolls
        back on error) and is then closed.
    """
    target = HISTORY_DB_FILE if db_path is None else db_path
    connection = sqlite3.connect(target, factory=_ClosingConnection)
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
|
|
def build_history_entry(row):
    """Convert a history row into a plain dict for templates.

    When the row carries a non-empty ``stored_output_name`` a
    ``download_url`` pointing at the ``download_file`` endpoint is added.
    """
    entry = dict(row)
    stored_name = entry.get('stored_output_name')
    if not stored_name:
        return entry
    entry['download_url'] = url_for('download_file', filename=stored_name)
    return entry
|
|
|
|
|
|
def prune_history_entries(connection, max_entries=100):
    """Keep only the newest *max_entries* history rows (by ``id``).

    Rows that fall outside the retained window are deleted, and any output
    file they reference is removed first. Without that step the file would
    stay on disk with no row left to drive its later cleanup.

    Args:
        connection: Open SQLite connection; the caller owns the transaction.
        max_entries: Number of most recent rows to keep (default 100).
    """
    stale_rows = connection.execute(
        '''
        SELECT id, stored_output_name
        FROM history_entries
        WHERE id NOT IN (
            SELECT id
            FROM history_entries
            ORDER BY id DESC
            LIMIT ?
        )
        ''',
        (max_entries,)
    ).fetchall()
    if not stale_rows:
        return

    for stale_row in stale_rows:
        # Positional access works for both tuples and sqlite3.Row.
        stored_output_name = stale_row[1]
        if stored_output_name:
            delete_output_file(stored_output_name)

    connection.execute(
        '''
        DELETE FROM history_entries
        WHERE id NOT IN (
            SELECT id
            FROM history_entries
            ORDER BY id DESC
            LIMIT ?
        )
        ''',
        (max_entries,)
    )
|
|
|
|
|
|
def delete_output_file(stored_output_name):
    """Best-effort removal of a stored output file.

    Does nothing for an empty name. Errors raised while checking or removing
    the file are printed and not propagated.
    """
    if stored_output_name:
        target = os.path.join(app.config['OUTPUT_FOLDER'], stored_output_name)
        try:
            if os.path.isfile(target):
                os.remove(target)
        except Exception as exc:
            print(f"删除文件时出错: {exc}")
|
|
|
|
|
|
def delete_history_rows(connection, row_ids):
    """Delete the history rows whose ids are listed in *row_ids*.

    An empty *row_ids* is a no-op. The ids are bound as query parameters;
    only the ``?`` placeholders are formatted into the SQL text.
    """
    if row_ids:
        placeholders = ','.join('?' for _ in row_ids)
        statement = f'DELETE FROM history_entries WHERE id IN ({placeholders})'
        connection.execute(statement, tuple(row_ids))
|
|
|
|
|
|
def cleanup_history_entries(rows, connection=None):
    """Remove the output files and history rows for *rows*.

    Args:
        rows: Iterable of row mappings exposing ``id`` and
            ``stored_output_name``.
        connection: Connection to delete through. When omitted, a connection
            is opened here just for the delete.

    Returns:
        int: Number of rows processed.
    """
    removed_ids = []
    for record in rows:
        removed_ids.append(record['id'])
        delete_output_file(record['stored_output_name'])

    if removed_ids:
        if connection is not None:
            delete_history_rows(connection, removed_ids)
        else:
            with get_db_connection() as owned_connection:
                delete_history_rows(owned_connection, removed_ids)

    return len(removed_ids)
|
|
|
|
|
|
def get_cleanup_slot_boundary(now):
    """Return the top of the current hour if it is a cleanup hour.

    Returns ``None`` when ``now.hour`` is not in ``CLEANUP_SCHEDULE_HOURS``;
    otherwise *now* with minute, second and microsecond zeroed.
    """
    if now.hour in CLEANUP_SCHEDULE_HOURS:
        return now.replace(minute=0, second=0, microsecond=0)
    return None
|
|
|
|
|
|
def get_cleanup_slot_key(now):
    """Return the ``YYYY-MM-DD-HH`` key of the cleanup slot for *now*.

    Returns ``None`` when *now* does not fall in a cleanup hour.
    """
    slot_start = get_cleanup_slot_boundary(now)
    return None if slot_start is None else slot_start.strftime('%Y-%m-%d-%H')
|
|
|
|
|
|
def claim_cleanup_slot(slot_key, now=None):
    """Try to record *slot_key* in ``scheduler_runs``.

    Args:
        slot_key: Slot identifier; a falsy value is rejected immediately.
        now: Timestamp stored as ``executed_at``; defaults to the current
            local time.

    Returns:
        bool: True if the row was inserted, False if the key was falsy or the
        insert raised ``sqlite3.IntegrityError`` (``slot_key`` is the table's
        primary key, so the slot was already recorded).
    """
    if not slot_key:
        return False

    executed_at = (now or datetime.datetime.now()).strftime('%Y-%m-%d %H:%M:%S')
    insert_sql = '''
        INSERT INTO scheduler_runs (slot_key, executed_at)
        VALUES (?, ?)
    '''
    try:
        with get_db_connection() as connection:
            connection.execute(insert_sql, (slot_key, executed_at))
    except sqlite3.IntegrityError:
        return False
    return True
|
|
|
|
|
|
def run_scheduled_cleanup(now=None):
    """Delete history rows (and their files) older than the current slot.

    Args:
        now: Reference time; defaults to the current local time.

    Returns:
        int: Number of rows removed, or 0 when *now* is not in a cleanup hour.
    """
    reference = now or datetime.datetime.now()
    cutoff = get_cleanup_slot_boundary(reference)
    if cutoff is None:
        return 0

    cutoff_text = cutoff.strftime('%Y-%m-%d %H:%M:%S')
    select_sql = '''
        SELECT id, stored_output_name
        FROM history_entries
        WHERE timestamp < ?
        ORDER BY id ASC
    '''
    with get_db_connection() as connection:
        expired_rows = connection.execute(select_sql, (cutoff_text,)).fetchall()
        # Delete through the same connection so the removal is part of this
        # connection's transaction.
        deleted_count = cleanup_history_entries(expired_rows, connection=connection)

    print(f"定时清理完成: 删除 {deleted_count} 条记录,截止时间 {cutoff_text}")
    return deleted_count
|
|
|
|
|
|
def cleanup_scheduler_loop():
    """Poll forever and run the scheduled cleanup once per claimed slot.

    Each iteration computes the slot key for the current time, tries to claim
    it, runs the cleanup when the claim succeeds, then sleeps for
    ``SCHEDULER_POLL_INTERVAL_SECONDS``.

    The whole iteration is guarded, not only the cleanup call. An unexpected
    error while claiming the slot (for example a database error other than
    ``IntegrityError``) is therefore logged and retried on the next poll
    instead of terminating the scheduler thread.
    """
    while True:
        try:
            now = datetime.datetime.now()
            slot_key = get_cleanup_slot_key(now)
            if slot_key and claim_cleanup_slot(slot_key, now=now):
                run_scheduled_cleanup(now=now)
        except Exception as e:
            print(f"定时清理失败: {e}")

        time.sleep(SCHEDULER_POLL_INTERVAL_SECONDS)
|
|
|
|
|
|
def should_start_cleanup_scheduler():
    """Decide whether this process should run the cleanup scheduler.

    True when the ``WERKZEUG_RUN_MAIN`` environment variable equals
    ``'true'``, or when ``DEBUG_MODE`` is off.
    """
    return os.environ.get('WERKZEUG_RUN_MAIN') == 'true' or not DEBUG_MODE
|
|
|
|
|
|
def start_cleanup_scheduler():
    """Start the daemon cleanup thread at most once.

    Returns:
        bool: True if the thread was started by this call; False when the
        scheduler should not run in this process or was already started.
    """
    global _cleanup_scheduler_started

    if should_start_cleanup_scheduler():
        with _cleanup_scheduler_lock:
            if not _cleanup_scheduler_started:
                worker = threading.Thread(
                    target=cleanup_scheduler_loop,
                    name='scheduled-cleanup-thread',
                    daemon=True,
                )
                worker.start()
                _cleanup_scheduler_started = True
                return True
    return False
|
|
|
|
|
|
def ensure_history_db_exists():
    """Create the history/scheduler tables and indexes if they are missing.

    Every statement uses ``IF NOT EXISTS``; the statements are executed in
    order on a single connection.
    """
    schema_statements = (
        '''
        CREATE TABLE IF NOT EXISTS history_entries (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            history_owner_id TEXT,
            timestamp TEXT NOT NULL,
            original_name TEXT NOT NULL,
            converted_name TEXT,
            stored_output_name TEXT,
            status TEXT NOT NULL,
            error_message TEXT
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS scheduler_runs (
            slot_key TEXT PRIMARY KEY,
            executed_at TEXT NOT NULL
        )
        ''',
        '''
        CREATE INDEX IF NOT EXISTS idx_history_owner_timestamp
        ON history_entries (history_owner_id, id DESC)
        ''',
        '''
        CREATE INDEX IF NOT EXISTS idx_history_owner_stored_status
        ON history_entries (history_owner_id, stored_output_name, status)
        ''',
    )
    with get_db_connection() as connection:
        for statement in schema_statements:
            connection.execute(statement)
|
|
|
|
|
|
def get_or_create_history_owner_id():
    """Return the session's history owner id, creating one if absent.

    A new id is a random UUID4 string stored under
    ``session['history_owner_id']``.
    """
    owner_id = session.get('history_owner_id')
    if not owner_id:
        owner_id = str(uuid.uuid4())
        session['history_owner_id'] = owner_id
    return owner_id
|
|
|
|
|
|
def create_stored_output_name(display_output_name):
    """Prefix *display_output_name* with a random UUID4 and an underscore."""
    return "{}_{}".format(uuid.uuid4(), display_output_name)
|
|
|
|
| |
def save_conversion_history(history_entry):
    """Insert one conversion result into ``history_entries`` and prune.

    Works on a copy of *history_entry*: the copy is stamped with the current
    local time and the session's history owner id before being inserted.
    ``prune_history_entries`` then runs on the same connection.
    """
    record = history_entry.copy()
    record['timestamp'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    record['history_owner_id'] = get_or_create_history_owner_id()

    insert_sql = '''
        INSERT INTO history_entries (
            history_owner_id,
            timestamp,
            original_name,
            converted_name,
            stored_output_name,
            status,
            error_message
        ) VALUES (?, ?, ?, ?, ?, ?, ?)
    '''
    # Values in the same order as the column list above.
    values = (
        record.get('history_owner_id'),
        record.get('timestamp', ''),
        record.get('original_name', ''),
        record.get('converted_name'),
        record.get('stored_output_name'),
        record.get('status', ''),
        record.get('error_message'),
    )

    with get_db_connection() as connection:
        connection.execute(insert_sql, values)
        prune_history_entries(connection)
|
|
|
|
| |
def get_conversion_history(history_owner_id):
    """Return the owner's history entries, newest first.

    Each row is converted with ``build_history_entry``.
    """
    select_sql = '''
        SELECT
            history_owner_id,
            timestamp,
            original_name,
            converted_name,
            stored_output_name,
            status,
            error_message
        FROM history_entries
        WHERE history_owner_id = ?
        ORDER BY id DESC
    '''
    with get_db_connection() as connection:
        cursor = connection.execute(select_sql, (history_owner_id,))
        fetched_rows = cursor.fetchall()

    history = []
    for fetched_row in fetched_rows:
        history.append(build_history_entry(fetched_row))
    return history
|
|
|
|
|
|
def get_download_entry_for_owner(filename, history_owner_id):
    """Look up the newest successful entry for *filename* owned by the caller.

    Returns:
        dict | None: The entry built by ``build_history_entry``, or ``None``
        when no row with status ``'success'`` matches both the owner id and
        the stored output name.
    """
    select_sql = '''
        SELECT
            history_owner_id,
            timestamp,
            original_name,
            converted_name,
            stored_output_name,
            status,
            error_message
        FROM history_entries
        WHERE history_owner_id = ?
          AND status = 'success'
          AND stored_output_name = ?
        ORDER BY id DESC
        LIMIT 1
    '''
    with get_db_connection() as connection:
        match = connection.execute(select_sql, (history_owner_id, filename)).fetchone()

    return build_history_entry(match) if match else None
|
|
# Application object and configuration.
app = Flask(__name__)

# NOTE(security): the session cookie (which carries ``history_owner_id``) is
# signed with this key. Set the SECRET_KEY environment variable in any real
# deployment; the literal fallback is kept only so existing setups keep
# working and must be treated as publicly known.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY') or 'video_converter_secret_key'
app.config['UPLOAD_FOLDER'] = os.path.join(BASE_DIR, 'uploads')
app.config['OUTPUT_FOLDER'] = os.path.join(BASE_DIR, 'outputs')
# Lower-case extensions accepted for uploaded source files.
app.config['ALLOWED_EXTENSIONS'] = {'mp4', 'avi', 'mov', 'mkv', 'flv', 'wmv', 'webm'}

# Make sure the working directories and the history database exist before
# any request is served.
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
os.makedirs(app.config['OUTPUT_FOLDER'], exist_ok=True)
ensure_history_db_exists()
|
|
def allowed_file(filename):
    """Return True if *filename* has an extension listed in ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared case-insensitively.
    """
    _, dot, extension = filename.rpartition('.')
    return bool(dot) and extension.lower() in app.config['ALLOWED_EXTENSIONS']
|
|
@app.route('/')
def index():
    """Render the ``index.html`` template."""
    page = render_template('index.html')
    return page
|
|
@app.route('/mp3')
def mp3_converter():
    """Render the ``mp3.html`` template."""
    page = render_template('mp3.html')
    return page
|
|
@app.route('/upload', methods=['POST'])
def upload_file():
    """Convert each uploaded video to the requested target format.

    Form fields:
        files[]: One or more uploaded files.
        format: Target extension (default ``'mp4'``). Only ASCII letters and
            digits are accepted, because the value becomes part of the output
            file path.

    For every file with an allowed extension the upload is saved under a
    random name, converted with ffmpeg, and the result (success or error) is
    recorded in the conversion history. The temporary upload is always
    removed afterwards.

    Returns:
        The rendered ``results.html`` page when at least one file was
        processed, otherwise a redirect to the index page with a flashed
        error.
    """
    # This route only accepts POST, so validation failures redirect to the
    # form page; redirecting to request.url would issue a GET that is
    # rejected with 405.
    if 'files[]' not in request.files:
        flash('没有选择文件', 'error')
        return redirect(url_for('index'))

    files = request.files.getlist('files[]')
    target_format = request.form.get('format', 'mp4')

    if not files or files[0].filename == '':
        flash('没有选择文件', 'error')
        return redirect(url_for('index'))

    # The target format is user input that ends up in a filesystem path;
    # restricting it to ASCII alphanumerics rules out path separators and
    # ".." sequences.
    format_chars = set('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789')
    if not target_format or not set(target_format) <= format_chars:
        flash('不支持的目标格式', 'error')
        return redirect(url_for('index'))

    conversion_results = []
    history_entries = []

    for file in files:
        if not file or not file.filename:
            continue

        client_filename = file.filename

        if '.' not in client_filename or client_filename.rsplit('.', 1)[1] == '':
            safe_name = secure_filename(client_filename) or client_filename
            flash(f'文件 "{safe_name}" 没有有效的扩展名', 'error')
            continue

        file_extension = client_filename.rsplit('.', 1)[1].lower()
        if file_extension not in app.config['ALLOWED_EXTENSIONS']:
            safe_name = secure_filename(client_filename) or client_filename
            flash(f'文件 "{safe_name}" 的格式不支持', 'error')
            continue

        # Sanitised display name derived from the client-supplied name.
        safe_base = secure_filename(client_filename.rsplit('.', 1)[0])
        if not safe_base:
            safe_base = 'file'
        original_filename = f"{safe_base}.{file_extension}"

        # The upload is stored under a random name, never the client's name.
        unique_id = str(uuid.uuid4())
        unique_filename = f"{unique_id}.{file_extension}"

        input_path = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)
        file.save(input_path)

        original_name_without_ext = original_filename.rsplit('.', 1)[0]
        # Avoid a display name that starts with '-'.
        if original_name_without_ext.startswith('-'):
            original_name_without_ext = f"file{original_name_without_ext}"
        output_filename = f"{original_name_without_ext}.{target_format}"
        stored_output_name = create_stored_output_name(output_filename)
        output_path = os.path.join(app.config['OUTPUT_FOLDER'], stored_output_name)

        try:
            (ffmpeg
             .input(input_path)
             .output(output_path)
             .run(capture_stdout=True, capture_stderr=True, overwrite_output=True))

            result = {
                'original_name': original_filename,
                'converted_name': output_filename,
                'stored_output_name': stored_output_name,
                'download_url': url_for('download_file', filename=stored_output_name),
                'status': 'success'
            }
        except ffmpeg.Error as conversion_error:
            # ffmpeg output is not guaranteed to be valid UTF-8.
            if conversion_error.stderr:
                error_message = conversion_error.stderr.decode(errors='replace')
            else:
                error_message = '转换过程中发生错误'
            print(f"FFmpeg错误: {error_message}")
            # A failed run can leave a partial output file that no history
            # row references; remove it now.
            delete_output_file(stored_output_name)
            result = {
                'original_name': original_filename,
                'status': 'error',
                'error_message': error_message
            }
        finally:
            # Always drop the temporary upload, even on unexpected errors.
            try:
                os.remove(input_path)
            except OSError as remove_error:
                print(f"删除文件时出错: {remove_error}")

        conversion_results.append(result)
        history_entries.append(result.copy())

    for entry in history_entries:
        save_conversion_history(entry)

    print(f"转换结果数量: {len(conversion_results)}")
    for i, result in enumerate(conversion_results):
        print(f"结果 {i+1}: {result['original_name']} -> {result.get('converted_name', '转换失败')}")

    if conversion_results:
        print("正在渲染结果页面...")
        return render_template('results.html', results=conversion_results)

    flash('没有文件被转换', 'error')
    return redirect(url_for('index'))
|
|
@app.route('/download/<filename>')
def download_file(filename):
    """Send a converted file to the session that owns it.

    The file is served only when a successful history entry for *filename*
    belongs to the current session's owner id; otherwise an error is flashed
    and the client is redirected to the history page.
    """
    owner_id = get_or_create_history_owner_id()
    entry = get_download_entry_for_owner(filename, owner_id)
    if entry:
        return send_from_directory(
            app.config['OUTPUT_FOLDER'],
            entry['stored_output_name'],
            as_attachment=True,
            download_name=entry['converted_name'],
        )

    flash('无法访问该文件', 'error')
    return redirect(url_for('view_history'))
|
|
@app.route('/history')
def view_history():
    """Render ``history.html`` with the current session's history entries."""
    entries = get_conversion_history(get_or_create_history_owner_id())
    return render_template('history.html', history=entries)
|
|
@app.route('/convert_to_mp3', methods=['POST'])
def convert_to_mp3():
    """Extract MP3 audio from each uploaded video.

    Form fields:
        files[]: One or more uploaded files.
        quality: Audio bitrate passed to ffmpeg (default ``'192k'``).

    For every file with an allowed extension the upload is saved under a
    random name, converted with ffmpeg (``libmp3lame``), and the result
    (success or error) is recorded in the conversion history. The temporary
    upload is always removed afterwards.

    Returns:
        The rendered ``results.html`` page when at least one file was
        processed, otherwise a redirect to the MP3 page with a flashed error.
    """
    # This route only accepts POST, so validation failures redirect to the
    # form page; redirecting to request.url would issue a GET that is
    # rejected with 405.
    if 'files[]' not in request.files:
        flash('没有选择文件', 'error')
        return redirect(url_for('mp3_converter'))

    files = request.files.getlist('files[]')
    audio_quality = request.form.get('quality', '192k')

    if not files or files[0].filename == '':
        flash('没有选择文件', 'error')
        return redirect(url_for('mp3_converter'))

    conversion_results = []
    history_entries = []

    for file in files:
        if not file or not file.filename:
            continue

        client_filename = file.filename

        if '.' not in client_filename or client_filename.rsplit('.', 1)[1] == '':
            safe_name = secure_filename(client_filename) or client_filename
            flash(f'文件 "{safe_name}" 没有有效的扩展名', 'error')
            continue

        file_extension = client_filename.rsplit('.', 1)[1].lower()
        if file_extension not in app.config['ALLOWED_EXTENSIONS']:
            safe_name = secure_filename(client_filename) or client_filename
            flash(f'文件 "{safe_name}" 的格式不支持', 'error')
            continue

        # Sanitised display name derived from the client-supplied name.
        safe_base = secure_filename(client_filename.rsplit('.', 1)[0])
        if not safe_base:
            safe_base = 'file'
        original_filename = f"{safe_base}.{file_extension}"

        # The upload is stored under a random name, never the client's name.
        unique_id = str(uuid.uuid4())
        unique_filename = f"{unique_id}.{file_extension}"

        input_path = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)
        file.save(input_path)

        original_name_without_ext = original_filename.rsplit('.', 1)[0]
        # Avoid a display name that starts with '-'.
        if original_name_without_ext.startswith('-'):
            original_name_without_ext = f"file{original_name_without_ext}"
        output_filename = f"{original_name_without_ext}.mp3"
        stored_output_name = create_stored_output_name(output_filename)
        output_path = os.path.join(app.config['OUTPUT_FOLDER'], stored_output_name)

        try:
            (ffmpeg
             .input(input_path)
             .output(output_path, acodec='libmp3lame', audio_bitrate=audio_quality, format='mp3')
             .run(capture_stdout=True, capture_stderr=True, overwrite_output=True))

            result = {
                'original_name': original_filename,
                'converted_name': output_filename,
                'stored_output_name': stored_output_name,
                'download_url': url_for('download_file', filename=stored_output_name),
                'status': 'success'
            }
        except ffmpeg.Error as conversion_error:
            # ffmpeg output is not guaranteed to be valid UTF-8.
            if conversion_error.stderr:
                error_message = conversion_error.stderr.decode(errors='replace')
            else:
                error_message = '转换过程中发生错误'
            print(f"FFmpeg错误: {error_message}")
            # A failed run can leave a partial output file that no history
            # row references; remove it now.
            delete_output_file(stored_output_name)
            result = {
                'original_name': original_filename,
                'status': 'error',
                'error_message': error_message
            }
        finally:
            # Always drop the temporary upload, even on unexpected errors.
            try:
                os.remove(input_path)
            except OSError as remove_error:
                print(f"删除文件时出错: {remove_error}")

        conversion_results.append(result)
        history_entries.append(result.copy())

    for entry in history_entries:
        save_conversion_history(entry)

    print(f"转换结果数量: {len(conversion_results)}")
    for i, result in enumerate(conversion_results):
        print(f"结果 {i+1}: {result['original_name']} -> {result.get('converted_name', '转换失败')}")

    if conversion_results:
        print("正在渲染结果页面...")
        return render_template('results.html', results=conversion_results)

    flash('没有文件被转换', 'error')
    return redirect(url_for('mp3_converter'))
|
|
@app.route('/clear', methods=['POST'])
def clear_outputs():
    """Delete every history row and output file owned by this session.

    Flashes a success message and redirects to the history page.
    """
    owner_id = get_or_create_history_owner_id()
    select_sql = '''
        SELECT id, stored_output_name
        FROM history_entries
        WHERE history_owner_id = ?
    '''

    with get_db_connection() as connection:
        owned_rows = connection.execute(select_sql, (owner_id,)).fetchall()
        # Reuse this connection so the row deletion is part of its
        # transaction.
        cleanup_history_entries(owned_rows, connection=connection)

    flash('当前浏览器的转换记录和文件已清除', 'success')
    return redirect(url_for('view_history'))
|
|
@app.route('/health')
def health():
    """Return a fixed ``{'status': 'healthy'}`` payload with HTTP 200."""
    payload = {'status': 'healthy'}
    return payload, 200
|
|
if __name__ == '__main__':
    # Resolve the port once so the printed URL matches the port actually
    # served when the PORT environment variable is set.
    port = int(os.environ.get('PORT', 5000))

    print("视频格式转换网页应用已启动!")
    print(f"请访问: http://127.0.0.1:{port}/")
    if start_cleanup_scheduler():
        print("已启动定时清理线程,按服务器本地时间在 04:00 和 16:00 执行清理")

    # NOTE(security): with DEBUG_MODE enabled this exposes the Werkzeug
    # debugger on every network interface (host 0.0.0.0). Disable debug mode
    # or bind to 127.0.0.1 before running on an untrusted network.
    app.run(host='0.0.0.0', port=port, debug=DEBUG_MODE)
|
|