Copilot copilot-swe-agent[bot] BlueSkyXN commited on
Commit
447f0bd
·
unverified ·
1 Parent(s): ff725f8

Code review: security hardening and bug fixes (#6)

Browse files

* Initial plan

* Code review and fixes: security improvements, bug fixes, code quality

Co-authored-by: BlueSkyXN <63384277+BlueSkyXN@users.noreply.github.com>

* Delete CODE_REVIEW_REPORT.md

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: BlueSkyXN <63384277+BlueSkyXN@users.noreply.github.com>

Files changed (2) hide show
  1. README.md +1 -1
  2. main.py +84 -23
README.md CHANGED
@@ -8,7 +8,7 @@ app_port: 8000 # 你的 FastAPI 应用在容器内部监听的端口 (必须与
8
  pinned: false # 是否在你的个人资料页置顶这个 Space (可选)
9
  ---
10
 
11
- # 🧙‍♂️ Magick 动态图像转换 API (V3)
12
 
13
  本项目提供一个基于 FastAPI 和 ImageMagick 的高性能 REST API,支持通过动态 URL 路径对图像进行多格式转换,包括动画图像处理。
14
 
 
8
  pinned: false # 是否在你的个人资料页置顶这个 Space (可选)
9
  ---
10
 
11
+ # 🧙‍♂️ Magick 动态图像转换 API (V4)
12
 
13
  本项目提供一个基于 FastAPI 和 ImageMagick 的高性能 REST API,支持通过动态 URL 路径对图像进行多格式转换,包括动画图像处理。
14
 
main.py CHANGED
@@ -13,7 +13,6 @@ ImageMagick 动态图像转换 API
13
  - GET /health
14
  """
15
 
16
- import fastapi
17
  from fastapi import (
18
  FastAPI,
19
  File,
@@ -24,16 +23,16 @@ from fastapi import (
24
  Form,
25
  Request
26
  )
27
- from fastapi.responses import FileResponse, JSONResponse, HTMLResponse
28
  from fastapi.staticfiles import StaticFiles
29
  from fastapi.templating import Jinja2Templates
30
- import subprocess
31
  import asyncio
32
  import tempfile
33
  import os
34
  import shutil
35
  import logging
36
  import uuid
 
37
  from typing import Literal
38
 
39
  # --- 1. 应用配置 ---
@@ -101,6 +100,56 @@ async def get_upload_file_size(upload_file: UploadFile) -> int:
101
  upload_file.file.seek(current_position) # 恢复原始指针位置
102
  return size
103
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
104
  def cleanup_temp_dir(temp_dir: str):
105
  """
106
  在后台任务中安全地清理临时会话目录。
@@ -189,6 +238,10 @@ async def _perform_conversion(
189
  """
190
  logger.info(f"开始转换: {target_format}/{mode}/{setting} (文件: {file.filename})")
191
 
 
 
 
 
192
  # 预检查: AVIF/HEIF 格式需要 heif-enc 依赖
193
  if target_format in ["avif", "heif"]:
194
  try:
@@ -201,13 +254,13 @@ async def _perform_conversion(
201
  if proc_check.returncode != 0:
202
  raise HTTPException(
203
  status_code=503,
204
- detail=f"AVIF/HEIF encoding is not available. heif-enc encoder not found."
205
  )
206
  except Exception as e:
207
  logger.error(f"依赖检查失败: {e}")
208
  raise HTTPException(
209
  status_code=503,
210
- detail=f"Unable to verify AVIF/HEIF encoder availability."
211
  )
212
 
213
  # 1. 验证文件扩展名
@@ -222,7 +275,16 @@ async def _perform_conversion(
222
  detail=f"Unsupported file format: {file_ext}. Allowed formats: {', '.join(allowed_extensions)}"
223
  )
224
 
225
- # 2. 验证文件大小
 
 
 
 
 
 
 
 
 
226
  file_size_mb = await get_upload_file_size(file) / (1024 * 1024)
227
  if file_size_mb > MAX_FILE_SIZE_MB:
228
  logger.warning(f"文件过大: {file_size_mb:.2f}MB (最大: {MAX_FILE_SIZE_MB}MB)")
@@ -231,7 +293,7 @@ async def _perform_conversion(
231
  detail=f"File too large. Max size is {MAX_FILE_SIZE_MB}MB."
232
  )
233
 
234
- # 3. 创建唯一的临时工作目录
235
  session_id = str(uuid.uuid4())
236
  temp_dir = os.path.join(TEMP_DIR, session_id)
237
  os.makedirs(temp_dir, exist_ok=True)
@@ -242,15 +304,14 @@ async def _perform_conversion(
242
 
243
  logger.info(f"正在临时目录中处理: {temp_dir}")
244
 
245
- cleanup_scheduled = False
246
  try:
247
- # 4. 保存上传的文件到临时输入路径
248
  logger.info(f"正在保存上传的文件 '{file.filename}' 至 '{input_path}'")
249
  with open(input_path, "wb") as buffer:
250
  shutil.copyfileobj(file.file, buffer)
251
  logger.info("文件保存成功。")
252
 
253
- # 5. 动态构建 ImageMagick 命令行参数
254
  cmd = ['magick', input_path]
255
 
256
  # 关键: 仅对动画格式使用 -coalesce 以优化性能
@@ -260,7 +321,7 @@ async def _perform_conversion(
260
  if file_extension.lower() in animated_formats or target_format in ['gif', 'webp']:
261
  cmd.append('-coalesce')
262
 
263
- # --- 5a. 无损 (lossless) 模式逻辑 ---
264
  if mode == "lossless":
265
  # 'setting' (0-100) 代表压缩速度 (0=最佳/最慢, 100=最快/最差)
266
 
@@ -301,9 +362,8 @@ async def _perform_conversion(
301
  # GIF 始终是基于调色板的无损
302
  # -layers optimize 用于优化动图帧
303
  cmd.extend(['-layers', 'optimize'])
304
- pass # Magick 默认值适用于无损GIF
305
 
306
- # --- 5b. 有损 (lossy) 模式逻辑 ---
307
  elif mode == "lossy":
308
  # 'setting' (0-100) 代表 质量 (0=最差, 100=最佳)
309
  quality = setting
@@ -339,14 +399,14 @@ async def _perform_conversion(
339
  cmd.extend(['-layers', 'optimize'])
340
 
341
 
342
- # 6. 添加输出路径并完成命令构建
343
  cmd.append(output_path)
344
  command_str = ' '.join(cmd)
345
  logger.info(f"正在执行命令: {command_str}")
346
 
347
- # 7. 异步执行 Magick 命令 (使用信号量限制并发)
348
  async with conversion_semaphore:
349
- logger.info(f"获取并发许可,开始ImageMagick处理")
350
  process = await asyncio.subprocess.create_subprocess_exec(
351
  *cmd,
352
  stdout=asyncio.subprocess.PIPE,
@@ -357,18 +417,19 @@ async def _perform_conversion(
357
  timeout=TIMEOUT_SECONDS
358
  )
359
 
360
- # 8. 检查命令执行结果
361
  if process.returncode != 0:
362
- error_message = f"Magick failed: {stderr.decode()[:1000]}"
363
- logger.error(error_message)
364
- raise HTTPException(status_code=500, detail=error_message)
 
365
 
366
  if not os.path.exists(output_path):
367
  error_message = "Magick 命令成功执行,但未找到输出文件。"
368
  logger.error(error_message)
369
- raise HTTPException(status_code=500, detail=error_message)
370
 
371
- # 9. 成功:准备并返回文件响应
372
  logger.info(f"转换成功。输出文件: '{output_path}'")
373
 
374
  original_filename_base = os.path.splitext(file.filename)[0]
@@ -403,7 +464,7 @@ async def _perform_conversion(
403
  # 确保关闭上传的文件句柄
404
  await file.close()
405
  # 备用清理:仅当未注册后台任务时立即清理
406
- if not cleanup_scheduled and os.path.exists(temp_dir):
407
  cleanup_temp_dir(temp_dir)
408
 
409
  @app.post("/", response_class=FileResponse, summary="简化上传转换")
 
13
  - GET /health
14
  """
15
 
 
16
  from fastapi import (
17
  FastAPI,
18
  File,
 
23
  Form,
24
  Request
25
  )
26
+ from fastapi.responses import FileResponse, JSONResponse
27
  from fastapi.staticfiles import StaticFiles
28
  from fastapi.templating import Jinja2Templates
 
29
  import asyncio
30
  import tempfile
31
  import os
32
  import shutil
33
  import logging
34
  import uuid
35
+ import imghdr
36
  from typing import Literal
37
 
38
  # --- 1. 应用配置 ---
 
100
  upload_file.file.seek(current_position) # 恢复原始指针位置
101
  return size
102
 
103
+ async def validate_image_content(upload_file: UploadFile) -> bool:
104
+ """
105
+ 验证上传文件是否为有效的图像文件(通过文件头魔数检测)。
106
+
107
+ 此函数通过检查文件头部的魔数(magic bytes)来验证文件的真实类型,
108
+ 防止恶意文件通过修改扩展名绕过验证。
109
+
110
+ Args:
111
+ upload_file: FastAPI 的 UploadFile 对象。
112
+
113
+ Returns:
114
+ True 如果文件是有效的图像,False 否则。
115
+ """
116
+ # 保存当前位置
117
+ current_position = upload_file.file.tell()
118
+ upload_file.file.seek(0)
119
+
120
+ # 读取文件头部用于检测
121
+ file_header = upload_file.file.read(32)
122
+ upload_file.file.seek(current_position) # 恢复原始指针位置
123
+
124
+ # 使用 imghdr 检测图像类型
125
+ img_type = imghdr.what(None, h=file_header)
126
+
127
+ # 支持的图像类型
128
+ valid_types = {'jpeg', 'png', 'gif', 'webp', 'bmp', 'tiff'}
129
+
130
+ if img_type in valid_types:
131
+ return True
132
+
133
+ # imghdr 不支持某些格式,需要手动检测魔数
134
+ # AVIF/HEIF 检测 (ftyp box)
135
+ if len(file_header) >= 12:
136
+ # HEIF/AVIF 文件以 ftyp box 开头
137
+ ftyp_offset = file_header[4:8]
138
+ if ftyp_offset == b'ftyp':
139
+ # 检查品牌类型
140
+ brand = file_header[8:12]
141
+ # 常见的 HEIF/AVIF 品牌
142
+ heif_brands = [b'heic', b'heix', b'hevc', b'hevx', b'mif1', b'msf1', b'avif', b'avis']
143
+ if brand in heif_brands:
144
+ return True
145
+
146
+ # WebP 可能 imghdr 检测不到的情况
147
+ if len(file_header) >= 12:
148
+ if file_header[:4] == b'RIFF' and file_header[8:12] == b'WEBP':
149
+ return True
150
+
151
+ return False
152
+
153
  def cleanup_temp_dir(temp_dir: str):
154
  """
155
  在后台任务中安全地清理临时会话目录。
 
238
  """
239
  logger.info(f"开始转换: {target_format}/{mode}/{setting} (文件: {file.filename})")
240
 
241
+ # 初始化临时目录变量,确保 finally 块中可以安全访问
242
+ temp_dir = None
243
+ cleanup_scheduled = False
244
+
245
  # 预检查: AVIF/HEIF 格式需要 heif-enc 依赖
246
  if target_format in ["avif", "heif"]:
247
  try:
 
254
  if proc_check.returncode != 0:
255
  raise HTTPException(
256
  status_code=503,
257
+ detail="AVIF/HEIF encoding is not available. heif-enc encoder not found."
258
  )
259
  except Exception as e:
260
  logger.error(f"依赖检查失败: {e}")
261
  raise HTTPException(
262
  status_code=503,
263
+ detail="Unable to verify AVIF/HEIF encoder availability."
264
  )
265
 
266
  # 1. 验证文件扩展名
 
275
  detail=f"Unsupported file format: {file_ext}. Allowed formats: {', '.join(allowed_extensions)}"
276
  )
277
 
278
+ # 2. 验证文件内容(魔数检查,防止恶意文件)
279
+ is_valid_image = await validate_image_content(file)
280
+ if not is_valid_image:
281
+ logger.warning(f"文件内容验证失败: {file.filename} - 文件头魔数不匹配图像格式")
282
+ raise HTTPException(
283
+ status_code=400,
284
+ detail="Invalid image file content. The file does not appear to be a valid image."
285
+ )
286
+
287
+ # 3. 验证文件大小
288
  file_size_mb = await get_upload_file_size(file) / (1024 * 1024)
289
  if file_size_mb > MAX_FILE_SIZE_MB:
290
  logger.warning(f"文件过大: {file_size_mb:.2f}MB (最大: {MAX_FILE_SIZE_MB}MB)")
 
293
  detail=f"File too large. Max size is {MAX_FILE_SIZE_MB}MB."
294
  )
295
 
296
+ # 4. 创建唯一的临时工作目录
297
  session_id = str(uuid.uuid4())
298
  temp_dir = os.path.join(TEMP_DIR, session_id)
299
  os.makedirs(temp_dir, exist_ok=True)
 
304
 
305
  logger.info(f"正在临时目录中处理: {temp_dir}")
306
 
 
307
  try:
308
+ # 5. 保存上传的文件到临时输入路径
309
  logger.info(f"正在保存上传的文件 '{file.filename}' 至 '{input_path}'")
310
  with open(input_path, "wb") as buffer:
311
  shutil.copyfileobj(file.file, buffer)
312
  logger.info("文件保存成功。")
313
 
314
+ # 6. 动态构建 ImageMagick 命令行参数
315
  cmd = ['magick', input_path]
316
 
317
  # 关键: 仅对动画格式使用 -coalesce 以优化性能
 
321
  if file_extension.lower() in animated_formats or target_format in ['gif', 'webp']:
322
  cmd.append('-coalesce')
323
 
324
+ # --- 6a. 无损 (lossless) 模式逻辑 ---
325
  if mode == "lossless":
326
  # 'setting' (0-100) 代表压缩速度 (0=最佳/最慢, 100=最快/最差)
327
 
 
362
  # GIF 始终是基于调色板的无损
363
  # -layers optimize 用于优化动图帧
364
  cmd.extend(['-layers', 'optimize'])
 
365
 
366
+ # --- 6b. 有损 (lossy) 模式逻辑 ---
367
  elif mode == "lossy":
368
  # 'setting' (0-100) 代表 质量 (0=最差, 100=最佳)
369
  quality = setting
 
399
  cmd.extend(['-layers', 'optimize'])
400
 
401
 
402
+ # 7. 添加输出路径并完成命令构建
403
  cmd.append(output_path)
404
  command_str = ' '.join(cmd)
405
  logger.info(f"正在执行命令: {command_str}")
406
 
407
+ # 8. 异步执行 Magick 命令 (使用信号量限制并发)
408
  async with conversion_semaphore:
409
+ logger.info("获取并发许可,开始ImageMagick处理")
410
  process = await asyncio.subprocess.create_subprocess_exec(
411
  *cmd,
412
  stdout=asyncio.subprocess.PIPE,
 
417
  timeout=TIMEOUT_SECONDS
418
  )
419
 
420
+ # 9. 检查命令执行结果
421
  if process.returncode != 0:
422
+ error_detail = stderr.decode()
423
+ logger.error(f"Magick failed: {error_detail}")
424
+ # 不向用户暴露完整的错误信息,防止信息泄露
425
+ raise HTTPException(status_code=500, detail="Image conversion failed. Please check your input file and parameters.")
426
 
427
  if not os.path.exists(output_path):
428
  error_message = "Magick 命令成功执行,但未找到输出文件。"
429
  logger.error(error_message)
430
+ raise HTTPException(status_code=500, detail="Conversion completed but output file not found.")
431
 
432
+ # 10. 成功:准备并返回文件响应
433
  logger.info(f"转换成功。输出文件: '{output_path}'")
434
 
435
  original_filename_base = os.path.splitext(file.filename)[0]
 
464
  # 确保关闭上传的文件句柄
465
  await file.close()
466
  # 备用清理:仅当未注册后台任务时立即清理
467
+ if temp_dir is not None and not cleanup_scheduled and os.path.exists(temp_dir):
468
  cleanup_temp_dir(temp_dir)
469
 
470
  @app.post("/", response_class=FileResponse, summary="简化上传转换")