import os import base64 import requests import json from flask import Flask, render_template, request, jsonify from PIL import Image import io app = Flask(__name__) app.config['MAX_CONTENT_LENGTH'] = 10 * 1024 * 1024 # 10MB max file size # Gemini API configuration GEMINI_API_KEY = os.environ.get('GEMINI_API_KEY') GEMINI_API_URL = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash-image-preview:generateContent" ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'} def allowed_file(filename): return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS def optimize_image(image_bytes, max_size=(1024, 1024), quality=85): """优化图像大小和质量""" try: # 使用 io.BytesIO 将字节数据转换为文件对象,供 Pillow 使用 img = Image.open(io.BytesIO(image_bytes)) if img.mode in ('RGBA', 'LA'): background = Image.new('RGB', img.size, (255, 255, 255)) if img.mode == 'RGBA': background.paste(img, mask=img.split()[-1]) else: background.paste(img, mask=img.split()[-1]) img = background img.thumbnail(max_size, Image.Resampling.LANCZOS) img_byte_arr = io.BytesIO() img.save(img_byte_arr, format='JPEG', quality=quality, optimize=True) img_byte_arr.seek(0) return img_byte_arr.getvalue() except Exception as e: print(f"Image optimization error: {e}") return image_bytes # 优化失败,返回原始字节数据 def call_gemini_api(text_content=None, image_data=None): """调用Gemini API进行分析""" if not GEMINI_API_KEY: return {"error": "未配置Gemini API密钥,请设置环境变量GEMINI_API_KEY"} system_prompt = """你是一位经验丰富的皮肤科专家医生,具有多年的临床诊疗经验。请基于提供的信息进行专业的皮肤病分析。 分析要求: 仔细观察皮肤病变的形态、颜色、分布、边界等特征 结合患者描述的症状和病史 提供可能的诊断方向(按可能性大小排序) 给出详细的诊疗建议 必要时建议进一步检查 提醒患者注意事项 请按以下格式回复: 🔍 图像观察分析 [详细描述观察到的皮肤病变特征] 📋 症状综合评估 [结合文字描述的症状分析] 🎯 可能诊断 最可能诊断: [诊断名称] - 可能性: X% 依据: [具体理由] 次要考虑: [其他可能诊断] 💊 治疗建议 药物治疗 [具体用药建议] 生活护理 [日常护理要点] 🏥 进一步检查建议 [如需要其他检查,请列出] ⚠️ 注意事项 [重要提醒和注意事项] 免责声明: 此分析仅供临床参考,最终诊断需结合完整病史、体格检查等综合判断。建议患者及时就医,接受专业医师诊疗。""" parts = [] if text_content: parts.append({"text": f"{system_prompt}\n\n患者症状描述:{text_content}"}) else: parts.append({"text": system_prompt + "\n\n请基于图像进行分析。"}) if image_data: image_base64 = base64.b64encode(image_data).decode('utf-8') parts.append({ "inline_data": { "mime_type": "image/jpeg", "data": image_base64 } }) request_body = { "contents": [ { "parts": parts } ], "generationConfig": { "temperature": 0.1, "topK": 40, "topP": 0.95, "maxOutputTokens": 2048, }, "safetySettings": [ { "category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE" }, { "category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_MEDIUM_AND_ABOVE" }, { "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_MEDIUM_AND_ABOVE" }, { "category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE" } ] } headers = { 'Content-Type': 'application/json', 'x-goog-api-key': GEMINI_API_KEY } try: response = requests.post( GEMINI_API_URL, headers=headers, json=request_body, timeout=30 ) response.raise_for_status() result = response.json() if 'candidates' in result and len(result['candidates']) > 0: content = result['candidates'][0]['content']['parts'][0]['text'] return {"result": content} else: return {"error": "API返回了空的响应"} except requests.exceptions.Timeout: return {"error": "请求超时,请稍后重试"} except requests.exceptions.RequestException as e: return {"error": f"网络请求失败: {str(e)}"} except json.JSONDecodeError: return {"error": "API返回了无效的JSON格式"} except Exception as e: return {"error": f"处理响应时发生错误: {str(e)}"} @app.route('/') def index(): """主页路由""" return render_template('index.html') @app.route('/analyze', methods=['POST']) def analyze(): """分析皮肤图像和症状的API端点""" try: text_content = request.form.get('text', '').strip() image_data = None if 'image' in request.files: file = request.files['image'] if file.filename != '': if allowed_file(file.filename): try: # 核心修复:一次性将文件流读入内存 original_image_bytes = file.read() # 调用优化函数,它现在接受字节数据 image_data = optimize_image(original_image_bytes) except Exception as e: return jsonify({"error": f"图像处理失败: {str(e)}"}), 400 else: return jsonify({"error": "不支持的文件格式,请上传PNG、JPG、JPEG或GIF格式的图像"}), 400 if not text_content and not image_data: return jsonify({"error": "请至少提供症状描述或上传皮肤图像"}), 400 result = call_gemini_api(text_content, image_data) return jsonify(result) except Exception as e: return jsonify({"error": f"服务器内部错误: {str(e)}"}), 500 @app.route('/health') def health_check(): """健康检查端点""" return jsonify({ "status": "healthy", "service": "dermatology-assistant", "version": "1.0.0" }) @app.errorhandler(413) def too_large(e): return jsonify({"error": "上传的文件过大,请确保文件小于10MB"}), 413 @app.errorhandler(404) def not_found(e): return jsonify({"error": "请求的资源不存在"}), 404 @app.errorhandler(500) def internal_error(e): return jsonify({"error": "服务器内部错误"}), 500 if __name__ == '__main__': os.makedirs('templates', exist_ok=True) if not GEMINI_API_KEY: print("⚠️ 警告: 未设置GEMINI_API_KEY环境变量") print(" 请设置环境变量: export GEMINI_API_KEY=your_api_key") port = int(os.environ.get('PORT', 7860)) app.run(host='0.0.0.0', port=port, debug=False)