"""Forest Sentry: a small Flask service for monitoring forest plots.

The service stores plots in a SQLite database under the Flask instance
folder, asks a SiliconFlow-hosted chat model for risk analyses and drone
mission plans (with a canned fallback when the model is unavailable), and
exposes a handful of JSON endpoints used by the front-end page.
"""

import json
import os
import random
import sqlite3
import time
from contextlib import closing
from datetime import datetime

import requests
from flask import Flask, render_template, request, jsonify, send_from_directory
from werkzeug.utils import secure_filename

app = Flask(__name__)
# NOTE(review): the literal below is committed to source control. It is kept
# only as a backward-compatible default; set SECRET_KEY in the environment.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'forest-sentry-secret-key-2026')
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024  # 100MB max upload
app.config['UPLOAD_FOLDER'] = os.path.join(app.instance_path, 'uploads')

# Ensure directories exist
os.makedirs(app.instance_path, exist_ok=True)
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)

DB_PATH = os.path.join(app.instance_path, "forest_sentry.db")

# NOTE(review): this API key is committed to source control and should be
# rotated. The environment variable takes precedence; the literal remains only
# so existing deployments keep working until the variable is configured.
SILICONFLOW_API_KEY = os.environ.get(
    "SILICONFLOW_API_KEY",
    "sk-vimuseiptfbomzegyuvmebjzooncsqbyjtlddrfodzcdskgi",
)
SILICONFLOW_API_URL = "https://api.siliconflow.cn/v1/chat/completions"
SILICONFLOW_MODEL = "Qwen/Qwen2.5-7B-Instruct"
AI_TIMEOUT_SECONDS = 10

# Format used for every timestamp written to the ``last_scan`` column.
TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M"

# Alert levels that trigger a status update, mapped to the Chinese label that
# is stored (the model occasionally answers with the English level names).
ALERT_LEVEL_TO_STATUS = {
    '警告': '警告',
    '危急': '危急',
    'High': '警告',
    'Critical': '危急',
}


def _now_str() -> str:
    """Return the current local time formatted for the ``last_scan`` column."""
    return datetime.now().strftime(TIMESTAMP_FORMAT)


# --- Database ---
def get_db_connection() -> sqlite3.Connection:
    """Open a new connection to the application database.

    Rows are returned as ``sqlite3.Row`` so they can be converted to dicts.
    The caller owns the connection and must close it; using the connection
    itself as a context manager only commits or rolls back, it does not close.
    """
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def init_db() -> None:
    """Create the ``plots`` and ``logs`` tables and seed demo data if empty."""
    with closing(get_db_connection()) as conn:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS plots (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                location TEXT,
                type TEXT,
                carbon_stock REAL,
                fire_risk INTEGER,
                health_score INTEGER,
                last_scan TEXT,
                status TEXT
            )
        ''')
        conn.execute('''
            CREATE TABLE IF NOT EXISTS logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT,
                action TEXT,
                details TEXT
            )
        ''')
        # Check if empty, add demo data
        plot_count = conn.execute('SELECT count(*) FROM plots').fetchone()[0]
        if plot_count == 0:
            seed_data(conn)
        conn.commit()
    print("数据库初始化完成 (Database initialized).")


def seed_data(conn: sqlite3.Connection) -> None:
    """Insert the four demo plots using *conn* and commit.

    Args:
        conn: An open connection to a database that already has the
            ``plots`` table.
    """
    scanned_at = _now_str()
    plots = [
        ("阿尔法林区 (Alpha)", "45.12, 122.34", "针叶林", 1250.5, 15, 92, scanned_at, "正常"),
        ("贝塔山脊 (Beta)", "45.15, 122.38", "混合林", 890.2, 45, 78, scanned_at, "警告"),
        ("伽马谷地 (Gamma)", "45.10, 122.30", "阔叶林", 1500.0, 5, 98, scanned_at, "正常"),
        ("德尔塔荒地 (Delta)", "45.18, 122.40", "灌木丛", 300.5, 85, 45, scanned_at, "危急"),
    ]
    conn.executemany(
        'INSERT INTO plots (name, location, type, carbon_stock, fire_risk, '
        'health_score, last_scan, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?)',
        plots,
    )
    conn.commit()


# --- AI Service ---
def _strip_markdown_fence(content: str) -> str:
    """Return the text inside the first Markdown code fence, if there is one."""
    if "```json" in content:
        return content.split("```json")[1].split("```")[0]
    if "```" in content:
        return content.split("```")[1].split("```")[0]
    return content


def _fallback_ai_response(prompt_type: str) -> dict:
    """Return the canned answer used when the AI service cannot be used."""
    if prompt_type == "analyze":
        return {
            "risk_analysis": "AI 服务连接超时。基于启发式规则:检测到高温和低湿度,建议加强巡逻。",
            "carbon_insight": "预估生长稳定,碳汇潜力中等。",
            "actions": ["人工巡逻", "检查传感器状态", "更新卫星图像"],
            "alert_level": "警告",
        }
    return {
        "mission_name": "紧急扫描任务",
        "waypoints": ["45.12, 122.35", "45.13, 122.36"],
        "estimated_time": "15分钟",
        "priority": "高",
    }


def call_ai_reasoning(context_data, prompt_type="analyze") -> dict:
    """Ask the chat model for an analysis or a drone mission plan.

    Args:
        context_data: JSON-serialisable data embedded in the user prompt.
        prompt_type: ``"analyze"`` for a risk analysis or ``"plan"`` for a
            drone mission plan. Any other value sends an empty user prompt.

    Returns:
        The JSON object produced by the model. If no API key is configured,
        ``{"error": "No API Key"}`` is returned. If the request fails, the
        service answers with a non-200 status, or the answer is not a JSON
        object, a canned fallback response for *prompt_type* is returned.

    Raises:
        TypeError: If *context_data* cannot be serialised to JSON.
    """
    if not SILICONFLOW_API_KEY:
        return {"error": "No API Key"}

    headers = {
        "Authorization": f"Bearer {SILICONFLOW_API_KEY}",
        "Content-Type": "application/json",
    }

    system_prompt = (
        "你是一个“森林哨兵”AI,是林业、生态和野火预防方面的专家。 "
        "你分析传感器数据并提供可操作的见解。 "
        "输出必须仅为有效的 JSON,JSON 块之外不得有 markdown 格式。 "
        "请使用中文回复。"
    )

    user_content = ""
    if prompt_type == "analyze":
        user_content = (
            f"分析此林地数据: {json.dumps(context_data, ensure_ascii=False)}。 "
            "评估火灾风险 (0-100),碳汇潜力,并建议 3 个具体行动。 "
            '返回 JSON: { "risk_analysis": "string", "carbon_insight": "string", '
            '"actions": ["string", "string", "string"], '
            '"alert_level": "正常/警告/危急" }'
        )
    elif prompt_type == "plan":
        user_content = (
            f"为以下情况创建无人机任务计划: {json.dumps(context_data, ensure_ascii=False)}。 "
            '返回 JSON: { "mission_name": "string", "waypoints": ["lat,lon", ...], '
            '"estimated_time": "string", "priority": "string" }'
        )

    payload = {
        "model": SILICONFLOW_MODEL,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_content},
        ],
        "response_format": {"type": "json_object"},
    }

    try:
        response = requests.post(
            SILICONFLOW_API_URL,
            headers=headers,
            json=payload,
            timeout=AI_TIMEOUT_SECONDS,
        )
        if response.status_code != 200:
            print(f"AI Error: {response.text}")
            raise ValueError("API Error")

        content = response.json()['choices'][0]['message']['content']
        # Clean potential markdown
        parsed = json.loads(_strip_markdown_fence(content))
        # Callers use ``.get`` on the result, so anything but an object is
        # treated as a failed answer.
        if not isinstance(parsed, dict):
            raise ValueError("AI response is not a JSON object")
        return parsed
    except (requests.RequestException, ValueError, LookupError, TypeError) as e:
        # Network failure, bad status, malformed JSON or unexpected response
        # shape: degrade to the canned answer rather than failing the request.
        print(f"Fallback AI due to: {e}")
        return _fallback_ai_response(prompt_type)


# --- Routes ---
@app.route('/')
def index():
    """Serve the single-page front end."""
    return render_template('index.html')


@app.route('/api/plots', methods=['GET'])
def get_plots():
    """Return every row of the ``plots`` table as a JSON list of objects."""
    with closing(get_db_connection()) as conn:
        rows = conn.execute('SELECT * FROM plots').fetchall()
    return jsonify([dict(row) for row in rows])


@app.route('/api/analyze', methods=['POST'])
def analyze_plot():
    """Analyse a plot with simulated sensor readings and the AI service.

    Expects a JSON body describing the plot. When the analysis reports a
    warning or critical level and the body carries an ``id``, that plot's
    ``status`` and ``fire_risk`` are updated. Responds with the simulated
    sensor data and the analysis, or 400 when the body is missing or empty.
    """
    data = request.get_json(silent=True)
    if not data:
        return jsonify({"error": "No data"}), 400

    # Simulate reading real-time sensors
    sensor_data = {
        "temp": random.uniform(15, 35),
        "humidity": random.uniform(20, 80),
        "wind_speed": random.uniform(0, 15),
        "soil_moisture": random.uniform(10, 60),
        "plot_info": data,
    }

    # AI Reasoning
    analysis = call_ai_reasoning(sensor_data, "analyze")

    # Update DB with new risk if critical. The level is mapped to its Chinese
    # label in case the AI answered in English.
    raw_level = analysis.get('alert_level')
    status = ALERT_LEVEL_TO_STATUS.get(raw_level) if isinstance(raw_level, str) else None
    # Without an id there is no row to update; skip instead of failing after
    # the analysis has already been produced.
    plot_id = data.get('id') if isinstance(data, dict) else None
    if status is not None and plot_id is not None:
        with closing(get_db_connection()) as conn:
            conn.execute(
                'UPDATE plots SET status = ?, fire_risk = ? WHERE id = ?',
                (status, random.randint(70, 95), plot_id),
            )
            conn.commit()

    return jsonify({"sensors": sensor_data, "analysis": analysis})


@app.route('/api/action', methods=['POST'])
def take_action():
    """Execute a simulated action (``drone_scan`` or ``alert_rangers``).

    Expects a JSON object with ``type`` and, for drone scans, ``plot_id``.
    Unknown action types are acknowledged with the generic success message.
    Responds with 400 when the body is not a JSON object.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "No data"}), 400

    action_type = data.get('type')
    plot_id = data.get('plot_id')

    # Tool Execution Simulation
    result = {"status": "success", "message": "行动已完成"}

    if action_type == 'drone_scan':
        # Simulate mission planning
        plan = call_ai_reasoning({"plot_id": plot_id, "action": "drone_scan"}, "plan")
        result["data"] = plan
        result["message"] = f"无人机已派遣: {plan.get('mission_name', '扫描任务')}"

        # Update Last Scan
        with closing(get_db_connection()) as conn:
            conn.execute(
                'UPDATE plots SET last_scan = ? WHERE id = ?',
                (_now_str(), plot_id),
            )
            conn.commit()
    elif action_type == 'alert_rangers':
        result["message"] = "已通过短信/无线电通知阿尔法护林队。"

    return jsonify(result)


@app.route('/api/upload', methods=['POST'])
def upload_file():
    """Save an uploaded file into the configured upload folder.

    Responds with 400 when the ``file`` part is missing, has no filename, or
    its filename is empty after sanitisation.
    """
    uploaded = request.files.get('file')
    if uploaded is None:
        return jsonify({"error": "No file part"}), 400
    if not uploaded.filename:
        return jsonify({"error": "No selected file"}), 400

    filename = secure_filename(uploaded.filename)
    if not filename:
        # secure_filename() can strip a name down to nothing (e.g. a name made
        # only of non-ASCII characters); saving would then target the folder.
        return jsonify({"error": "Invalid filename"}), 400

    # Mock saving to dataset if it's large, or just save locally
    file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    uploaded.save(file_path)
    return jsonify({"status": "success", "message": f"文件 {filename} 上传成功", "path": file_path})


@app.route('/api/reset', methods=['POST'])
def reset_db():
    """Delete the database file and recreate it with the demo data."""
    try:
        if os.path.exists(DB_PATH):
            os.remove(DB_PATH)
        init_db()
    except (OSError, sqlite3.Error) as e:
        return jsonify({"error": str(e)}), 500
    return jsonify({"status": "success", "message": "数据库已重置"})


if __name__ == '__main__':
    init_db()
    app.run(host='0.0.0.0', port=7860)