PYffmpeg / app.py
github-actions[bot]
Deploy from GitHub Actions (5840e834f99f4d7ac8eff10e242e827ecdc597bf)
b9f2ad1
import os
import uuid
import sys
import subprocess
import sqlite3
import datetime
import threading
import time
from flask import Flask, render_template, request, send_from_directory, url_for, redirect, flash, session
import ffmpeg
from werkzeug.utils import secure_filename
# 检查FFmpeg是否已安装
def check_ffmpeg_installed():
try:
# 首先尝试执行ffmpeg命令(通过环境变量)
subprocess.run(['ffmpeg', '-version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
return True
except (subprocess.SubprocessError, FileNotFoundError):
# 如果环境变量中找不到,尝试直接使用指定路径
try:
# 尝试使用用户提供的路径
ffmpeg_path = "D:\\ffmpeg\\bin\\ffmpeg.exe"
subprocess.run([ffmpeg_path, '-version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
# 如果成功,将路径添加到环境变量中,以便后续使用
os.environ['PATH'] = os.environ['PATH'] + os.pathsep + "D:\\ffmpeg\\bin"
return True
except (subprocess.SubprocessError, FileNotFoundError):
return False
# 如果FFmpeg未安装,显示错误信息并退出
if not check_ffmpeg_installed():
print("错误: FFmpeg未安装或未添加到系统环境变量!")
print("请安装FFmpeg并确保它已添加到系统环境变量中。")
print("Windows安装指南:")
print("1. 下载FFmpeg: https://ffmpeg.org/download.html")
print("2. 解压到一个目录,例如 C:\\ffmpeg")
print("3. 将FFmpeg的bin目录添加到系统环境变量PATH中")
print("4. 重启终端并再次运行此应用")
sys.exit(1)
# 历史记录存储路径
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
HISTORY_DB_FILE = os.path.join(BASE_DIR, 'conversion_history.db')
DEBUG_MODE = True
CLEANUP_SCHEDULE_HOURS = (4, 16)
SCHEDULER_POLL_INTERVAL_SECONDS = 30
_cleanup_scheduler_started = False
_cleanup_scheduler_lock = threading.Lock()
def get_db_connection():
connection = sqlite3.connect(HISTORY_DB_FILE)
connection.row_factory = sqlite3.Row
return connection
def build_history_entry(row):
entry = dict(row)
stored_output_name = entry.get('stored_output_name')
if stored_output_name:
entry['download_url'] = url_for('download_file', filename=stored_output_name)
return entry
def prune_history_entries(connection):
connection.execute(
'''
DELETE FROM history_entries
WHERE id NOT IN (
SELECT id
FROM history_entries
ORDER BY id DESC
LIMIT 100
)
'''
)
def delete_output_file(stored_output_name):
if not stored_output_name:
return
file_path = os.path.join(app.config['OUTPUT_FOLDER'], stored_output_name)
try:
if os.path.isfile(file_path):
os.remove(file_path)
except Exception as e:
print(f"删除文件时出错: {e}")
def delete_history_rows(connection, row_ids):
if not row_ids:
return
placeholders = ','.join('?' for _ in row_ids)
connection.execute(
f'DELETE FROM history_entries WHERE id IN ({placeholders})',
tuple(row_ids)
)
def cleanup_history_entries(rows, connection=None):
row_ids = []
for row in rows:
row_id = row['id']
row_ids.append(row_id)
delete_output_file(row['stored_output_name'])
if not row_ids:
return 0
if connection is None:
with get_db_connection() as owned_connection:
delete_history_rows(owned_connection, row_ids)
else:
delete_history_rows(connection, row_ids)
return len(row_ids)
def get_cleanup_slot_boundary(now):
slot_hour = now.hour
if slot_hour not in CLEANUP_SCHEDULE_HOURS:
return None
return now.replace(minute=0, second=0, microsecond=0)
def get_cleanup_slot_key(now):
boundary = get_cleanup_slot_boundary(now)
if boundary is None:
return None
return boundary.strftime('%Y-%m-%d-%H')
def claim_cleanup_slot(slot_key, now=None):
if not slot_key:
return False
executed_at = (now or datetime.datetime.now()).strftime('%Y-%m-%d %H:%M:%S')
try:
with get_db_connection() as connection:
connection.execute(
'''
INSERT INTO scheduler_runs (slot_key, executed_at)
VALUES (?, ?)
''',
(slot_key, executed_at)
)
return True
except sqlite3.IntegrityError:
return False
def run_scheduled_cleanup(now=None):
now = now or datetime.datetime.now()
boundary = get_cleanup_slot_boundary(now)
if boundary is None:
return 0
with get_db_connection() as connection:
rows = connection.execute(
'''
SELECT id, stored_output_name
FROM history_entries
WHERE timestamp < ?
ORDER BY id ASC
''',
(boundary.strftime('%Y-%m-%d %H:%M:%S'),)
).fetchall()
deleted_count = cleanup_history_entries(rows, connection=connection)
print(f"定时清理完成: 删除 {deleted_count} 条记录,截止时间 {boundary.strftime('%Y-%m-%d %H:%M:%S')}")
return deleted_count
def cleanup_scheduler_loop():
while True:
now = datetime.datetime.now()
slot_key = get_cleanup_slot_key(now)
if slot_key and claim_cleanup_slot(slot_key, now=now):
try:
run_scheduled_cleanup(now=now)
except Exception as e:
print(f"定时清理失败: {e}")
time.sleep(SCHEDULER_POLL_INTERVAL_SECONDS)
def should_start_cleanup_scheduler():
if os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
return True
return not DEBUG_MODE
def start_cleanup_scheduler():
global _cleanup_scheduler_started
if not should_start_cleanup_scheduler():
return False
with _cleanup_scheduler_lock:
if _cleanup_scheduler_started:
return False
scheduler_thread = threading.Thread(
target=cleanup_scheduler_loop,
name='scheduled-cleanup-thread',
daemon=True
)
scheduler_thread.start()
_cleanup_scheduler_started = True
return True
def ensure_history_db_exists():
with get_db_connection() as connection:
connection.execute(
'''
CREATE TABLE IF NOT EXISTS history_entries (
id INTEGER PRIMARY KEY AUTOINCREMENT,
history_owner_id TEXT,
timestamp TEXT NOT NULL,
original_name TEXT NOT NULL,
converted_name TEXT,
stored_output_name TEXT,
status TEXT NOT NULL,
error_message TEXT
)
'''
)
connection.execute(
'''
CREATE TABLE IF NOT EXISTS scheduler_runs (
slot_key TEXT PRIMARY KEY,
executed_at TEXT NOT NULL
)
'''
)
connection.execute(
'''
CREATE INDEX IF NOT EXISTS idx_history_owner_timestamp
ON history_entries (history_owner_id, id DESC)
'''
)
connection.execute(
'''
CREATE INDEX IF NOT EXISTS idx_history_owner_stored_status
ON history_entries (history_owner_id, stored_output_name, status)
'''
)
def get_or_create_history_owner_id():
history_owner_id = session.get('history_owner_id')
if history_owner_id:
return history_owner_id
history_owner_id = str(uuid.uuid4())
session['history_owner_id'] = history_owner_id
return history_owner_id
def create_stored_output_name(display_output_name):
return f"{uuid.uuid4()}_{display_output_name}"
# 保存转换历史记录
def save_conversion_history(history_entry):
history_entry = history_entry.copy()
history_entry['timestamp'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
history_entry['history_owner_id'] = get_or_create_history_owner_id()
with get_db_connection() as connection:
connection.execute(
'''
INSERT INTO history_entries (
history_owner_id,
timestamp,
original_name,
converted_name,
stored_output_name,
status,
error_message
) VALUES (?, ?, ?, ?, ?, ?, ?)
''',
(
history_entry.get('history_owner_id'),
history_entry.get('timestamp', ''),
history_entry.get('original_name', ''),
history_entry.get('converted_name'),
history_entry.get('stored_output_name'),
history_entry.get('status', ''),
history_entry.get('error_message')
)
)
prune_history_entries(connection)
# 获取转换历史记录
def get_conversion_history(history_owner_id):
with get_db_connection() as connection:
rows = connection.execute(
'''
SELECT
history_owner_id,
timestamp,
original_name,
converted_name,
stored_output_name,
status,
error_message
FROM history_entries
WHERE history_owner_id = ?
ORDER BY id DESC
''',
(history_owner_id,)
).fetchall()
return [build_history_entry(row) for row in rows]
def get_download_entry_for_owner(filename, history_owner_id):
with get_db_connection() as connection:
row = connection.execute(
'''
SELECT
history_owner_id,
timestamp,
original_name,
converted_name,
stored_output_name,
status,
error_message
FROM history_entries
WHERE history_owner_id = ?
AND status = 'success'
AND stored_output_name = ?
ORDER BY id DESC
LIMIT 1
''',
(history_owner_id, filename)
).fetchone()
if not row:
return None
return build_history_entry(row)
app = Flask(__name__)
app.config['SECRET_KEY'] = 'video_converter_secret_key'
app.config['UPLOAD_FOLDER'] = os.path.join(BASE_DIR, 'uploads')
app.config['OUTPUT_FOLDER'] = os.path.join(BASE_DIR, 'outputs')
app.config['ALLOWED_EXTENSIONS'] = {'mp4', 'avi', 'mov', 'mkv', 'flv', 'wmv', 'webm'}
# 确保上传、输出和历史记录文件存在
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
os.makedirs(app.config['OUTPUT_FOLDER'], exist_ok=True)
ensure_history_db_exists()
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in app.config['ALLOWED_EXTENSIONS']
@app.route('/')
def index():
return render_template('index.html')
@app.route('/mp3')
def mp3_converter():
return render_template('mp3.html')
@app.route('/upload', methods=['POST'])
def upload_file():
if 'files[]' not in request.files:
flash('没有选择文件', 'error')
return redirect(request.url)
files = request.files.getlist('files[]')
target_format = request.form.get('format', 'mp4')
if not files or files[0].filename == '':
flash('没有选择文件', 'error')
return redirect(request.url)
conversion_results = []
history_entries = []
for file in files:
if not file or not file.filename:
continue
client_filename = file.filename
if '.' not in client_filename or client_filename.rsplit('.', 1)[1] == '':
safe_name = secure_filename(client_filename) or client_filename
flash(f'文件 "{safe_name}" 没有有效的扩展名', 'error')
continue
file_extension = client_filename.rsplit('.', 1)[1].lower()
if file_extension not in app.config['ALLOWED_EXTENSIONS']:
safe_name = secure_filename(client_filename) or client_filename
flash(f'文件 "{safe_name}" 的格式不支持', 'error')
continue
# 安全地获取文件名并保存上传的文件
safe_base = secure_filename(client_filename.rsplit('.', 1)[0])
if not safe_base:
safe_base = 'file'
original_filename = f"{safe_base}.{file_extension}"
# 生成唯一文件名以避免冲突
unique_id = str(uuid.uuid4())
unique_filename = f"{unique_id}.{file_extension}"
input_path = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)
file.save(input_path)
# 保持原文件名,只改变扩展名
original_name_without_ext = original_filename.rsplit('.', 1)[0]
# 检查文件名是否以连字符开头,如果是,添加前缀
if original_name_without_ext.startswith('-'):
original_name_without_ext = f"file{original_name_without_ext}"
output_filename = f"{original_name_without_ext}.{target_format}"
stored_output_name = create_stored_output_name(output_filename)
output_path = os.path.join(app.config['OUTPUT_FOLDER'], stored_output_name)
try:
# 使用ffmpeg进行格式转换
(ffmpeg
.input(input_path)
.output(output_path)
.run(capture_stdout=True, capture_stderr=True, overwrite_output=True))
# 记录转换结果
result = {
'original_name': original_filename,
'converted_name': output_filename,
'stored_output_name': stored_output_name,
'download_url': url_for('download_file', filename=stored_output_name),
'status': 'success'
}
conversion_results.append(result)
history_entries.append(result.copy())
except ffmpeg.Error as e:
error_message = e.stderr.decode() if e.stderr else '转换过程中发生错误'
print(f"FFmpeg错误: {error_message}")
result = {
'original_name': original_filename,
'status': 'error',
'error_message': error_message
}
conversion_results.append(result)
history_entries.append(result.copy())
# 删除上传的原始文件以节省空间
try:
os.remove(input_path)
except Exception as e:
print(f"删除文件时出错: {e}")
# 批量保存所有转换结果到历史记录
for entry in history_entries:
save_conversion_history(entry)
# 添加调试日志,记录转换结果数量
print(f"转换结果数量: {len(conversion_results)}")
for i, result in enumerate(conversion_results):
print(f"结果 {i+1}: {result['original_name']} -> {result.get('converted_name', '转换失败')}")
# 确保结果页面显示所有转换结果
if len(conversion_results) > 0:
print("正在渲染结果页面...")
return render_template('results.html', results=conversion_results)
else:
flash('没有文件被转换', 'error')
return redirect(url_for('index'))
@app.route('/download/<filename>')
def download_file(filename):
history_owner_id = get_or_create_history_owner_id()
history_entry = get_download_entry_for_owner(filename, history_owner_id)
if not history_entry:
flash('无法访问该文件', 'error')
return redirect(url_for('view_history'))
return send_from_directory(
app.config['OUTPUT_FOLDER'],
history_entry['stored_output_name'],
as_attachment=True,
download_name=history_entry['converted_name']
)
@app.route('/history')
def view_history():
history_owner_id = get_or_create_history_owner_id()
history = get_conversion_history(history_owner_id)
return render_template('history.html', history=history)
@app.route('/convert_to_mp3', methods=['POST'])
def convert_to_mp3():
if 'files[]' not in request.files:
flash('没有选择文件', 'error')
return redirect(request.url)
files = request.files.getlist('files[]')
audio_quality = request.form.get('quality', '192k')
if not files or files[0].filename == '':
flash('没有选择文件', 'error')
return redirect(request.url)
conversion_results = []
history_entries = []
for file in files:
if not file or not file.filename:
continue
client_filename = file.filename
if '.' not in client_filename or client_filename.rsplit('.', 1)[1] == '':
safe_name = secure_filename(client_filename) or client_filename
flash(f'文件 "{safe_name}" 没有有效的扩展名', 'error')
continue
file_extension = client_filename.rsplit('.', 1)[1].lower()
if file_extension not in app.config['ALLOWED_EXTENSIONS']:
safe_name = secure_filename(client_filename) or client_filename
flash(f'文件 "{safe_name}" 的格式不支持', 'error')
continue
# 安全地获取文件名并保存上传的文件
safe_base = secure_filename(client_filename.rsplit('.', 1)[0])
if not safe_base:
safe_base = 'file'
original_filename = f"{safe_base}.{file_extension}"
# 生成唯一文件名以避免冲突
unique_id = str(uuid.uuid4())
unique_filename = f"{unique_id}.{file_extension}"
input_path = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)
file.save(input_path)
# 保持原文件名,只改变扩展名为mp3
original_name_without_ext = original_filename.rsplit('.', 1)[0]
# 检查文件名是否以连字符开头,如果是,添加前缀
if original_name_without_ext.startswith('-'):
original_name_without_ext = f"file{original_name_without_ext}"
output_filename = f"{original_name_without_ext}.mp3"
stored_output_name = create_stored_output_name(output_filename)
output_path = os.path.join(app.config['OUTPUT_FOLDER'], stored_output_name)
try:
# 使用ffmpeg提取音频并转换为MP3格式
(ffmpeg
.input(input_path)
.output(output_path, acodec='libmp3lame', audio_bitrate=audio_quality, format='mp3')
.run(capture_stdout=True, capture_stderr=True, overwrite_output=True))
# 记录转换结果
result = {
'original_name': original_filename,
'converted_name': output_filename,
'stored_output_name': stored_output_name,
'download_url': url_for('download_file', filename=stored_output_name),
'status': 'success'
}
conversion_results.append(result)
history_entries.append(result.copy())
except ffmpeg.Error as e:
error_message = e.stderr.decode() if e.stderr else '转换过程中发生错误'
print(f"FFmpeg错误: {error_message}")
result = {
'original_name': original_filename,
'status': 'error',
'error_message': error_message
}
conversion_results.append(result)
history_entries.append(result.copy())
# 删除上传的原始文件以节省空间
try:
os.remove(input_path)
except Exception as e:
print(f"删除文件时出错: {e}")
# 批量保存所有转换结果到历史记录
for entry in history_entries:
save_conversion_history(entry)
# 添加调试日志,记录转换结果数量
print(f"转换结果数量: {len(conversion_results)}")
for i, result in enumerate(conversion_results):
print(f"结果 {i+1}: {result['original_name']} -> {result.get('converted_name', '转换失败')}")
# 确保结果页面显示所有转换结果
if len(conversion_results) > 0:
print("正在渲染结果页面...")
return render_template('results.html', results=conversion_results)
else:
flash('没有文件被转换', 'error')
return redirect(url_for('mp3_converter'))
@app.route('/clear', methods=['POST'])
def clear_outputs():
history_owner_id = get_or_create_history_owner_id()
with get_db_connection() as connection:
rows = connection.execute(
'''
SELECT id, stored_output_name
FROM history_entries
WHERE history_owner_id = ?
''',
(history_owner_id,)
).fetchall()
cleanup_history_entries(rows, connection=connection)
flash('当前浏览器的转换记录和文件已清除', 'success')
return redirect(url_for('view_history'))
@app.route('/health')
def health():
return {'status': 'healthy'}, 200
if __name__ == '__main__':
print("视频格式转换网页应用已启动!")
print(f"请访问: http://127.0.0.1:5000/")
if start_cleanup_scheduler():
print("已启动定时清理线程,按服务器本地时间在 04:00 和 16:00 执行清理")
app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 5000)), debug=DEBUG_MODE)