Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import sqlite3 | |
| import random | |
| import time | |
| import requests | |
| from datetime import datetime | |
| from flask import Flask, render_template, request, jsonify, send_from_directory | |
| from werkzeug.utils import secure_filename | |
app = Flask(__name__)

# Secrets come from the environment so they are never committed with the code.
# When SECRET_KEY is unset a random per-process key is generated; anything
# signed with it stops validating after a restart, so set it for deployments.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY') or os.urandom(32).hex()
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024  # 100MB max upload
app.config['UPLOAD_FOLDER'] = os.path.join(app.instance_path, 'uploads')

# Ensure directories exist
os.makedirs(app.instance_path, exist_ok=True)
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)

DB_PATH = os.path.join(app.instance_path, "forest_sentry.db")

# SiliconFlow API key, supplied via the SILICONFLOW_API_KEY environment variable.
# An empty value makes call_ai_reasoning() return {"error": "No API Key"}.
# NOTE(review): a literal key used to be committed on this line; treat that key
# as leaked and revoke it with the provider.
SILICONFLOW_API_KEY = os.environ.get("SILICONFLOW_API_KEY", "")
| # --- Database --- | |
class _AutoClosingConnection(sqlite3.Connection):
    """SQLite connection whose ``with`` block also closes it on exit.

    The stock ``sqlite3.Connection`` context manager only commits (or rolls
    back on error) and leaves the connection open. The callers in this module
    open a fresh connection per ``with`` block, so closing on exit releases
    the file handle deterministically.
    """

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            # Keep the base behaviour: commit on success, roll back on error.
            return super().__exit__(exc_type, exc_value, traceback)
        finally:
            self.close()


def get_db_connection():
    """Open a connection to the database at ``DB_PATH``.

    Returns:
        A ``sqlite3.Connection`` (subclass) whose rows are ``sqlite3.Row``
        objects, so columns can be read by name and rows converted with
        ``dict(row)``. Used as a context manager, the connection is closed
        when the block exits; used directly, the caller must close it.
    """
    conn = sqlite3.connect(DB_PATH, factory=_AutoClosingConnection)
    conn.row_factory = sqlite3.Row
    return conn
def init_db():
    """Create the ``plots`` and ``logs`` tables and seed demo plots if needed.

    Both tables are created only when missing. When ``plots`` holds no rows,
    ``seed_data`` is called to insert the demo plots. A confirmation line is
    printed once the work is done.
    """
    schema_statements = (
        '''
        CREATE TABLE IF NOT EXISTS plots (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            location TEXT,
            type TEXT,
            carbon_stock REAL,
            fire_risk INTEGER,
            health_score INTEGER,
            last_scan TEXT,
            status TEXT
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT,
            action TEXT,
            details TEXT
        )
        ''',
    )
    with get_db_connection() as conn:
        for statement in schema_statements:
            conn.execute(statement)
        # An empty plots table means a fresh database: add the demo rows.
        plot_count = conn.execute('SELECT count(*) FROM plots').fetchone()[0]
        if plot_count == 0:
            seed_data(conn)
    print("数据库初始化完成 (Database initialized).")
def seed_data(conn):
    """Insert the four demo plots into the ``plots`` table and commit.

    Args:
        conn: An open SQLite connection whose database already contains the
            ``plots`` table.

    The ``last_scan`` timestamp is computed once so every seeded row carries
    the same value, even if the call straddles a minute boundary.
    """
    scanned_at = datetime.now().strftime("%Y-%m-%d %H:%M")
    # (name, location, type, carbon_stock, fire_risk, health_score, status)
    demo_plots = [
        ("阿尔法林区 (Alpha)", "45.12, 122.34", "针叶林", 1250.5, 15, 92, "正常"),
        ("贝塔山脊 (Beta)", "45.15, 122.38", "混合林", 890.2, 45, 78, "警告"),
        ("伽马谷地 (Gamma)", "45.10, 122.30", "阔叶林", 1500.0, 5, 98, "正常"),
        ("德尔塔荒地 (Delta)", "45.18, 122.40", "灌木丛", 300.5, 85, 45, "危急"),
    ]
    # Splice the shared timestamp in just before the trailing status column.
    rows = [(*plot[:-1], scanned_at, plot[-1]) for plot in demo_plots]
    conn.executemany(
        'INSERT INTO plots (name, location, type, carbon_stock, fire_risk, health_score, last_scan, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?)',
        rows,
    )
    conn.commit()
| # --- AI Service --- | |
def _build_user_prompt(context_data, prompt_type):
    """Return the user message for *prompt_type* ("" for an unknown type)."""
    if prompt_type == "analyze":
        return (
            f"分析此林地数据: {json.dumps(context_data, ensure_ascii=False)}。\n"
            "评估火灾风险 (0-100),碳汇潜力,并建议 3 个具体行动。\n"
            '返回 JSON: { "risk_analysis": "string", "carbon_insight": "string", '
            '"actions": ["string", "string", "string"], "alert_level": "正常/警告/危急" }'
        )
    if prompt_type == "plan":
        return (
            f"为以下情况创建无人机任务计划: {json.dumps(context_data, ensure_ascii=False)}。\n"
            '返回 JSON: { "mission_name": "string", "waypoints": ["lat,lon", ...], '
            '"estimated_time": "string", "priority": "string" }'
        )
    return ""


def _strip_code_fence(content):
    """Return the text inside the first markdown code fence, if there is one."""
    if "```json" in content:
        return content.split("```json")[1].split("```")[0]
    if "```" in content:
        return content.split("```")[1].split("```")[0]
    return content


def _fallback_response(prompt_type):
    """Return the canned reply used when the AI service cannot be used."""
    if prompt_type == "analyze":
        return {
            "risk_analysis": "AI 服务连接超时。基于启发式规则:检测到高温和低湿度,建议加强巡逻。",
            "carbon_insight": "预估生长稳定,碳汇潜力中等。",
            "actions": ["人工巡逻", "检查传感器状态", "更新卫星图像"],
            "alert_level": "警告"
        }
    return {"mission_name": "紧急扫描任务", "waypoints": ["45.12, 122.35", "45.13, 122.36"], "estimated_time": "15分钟", "priority": "高"}


def call_ai_reasoning(context_data, prompt_type="analyze"):
    """Ask the SiliconFlow chat-completions API for a JSON analysis or plan.

    Args:
        context_data: JSON-serialisable data embedded in the prompt.
        prompt_type: ``"analyze"`` for a risk/carbon assessment or ``"plan"``
            for a drone mission plan. Any other value sends an empty user
            message.

    Returns:
        A dict. ``{"error": "No API Key"}`` when no key is configured;
        otherwise the JSON object parsed from the model reply, or a canned
        fallback when the request fails, the status is not 200, or the reply
        is not a JSON object.
    """
    if not SILICONFLOW_API_KEY:
        return {"error": "No API Key"}

    headers = {
        "Authorization": f"Bearer {SILICONFLOW_API_KEY}",
        "Content-Type": "application/json"
    }
    system_prompt = (
        "你是一个“森林哨兵”AI,是林业、生态和野火预防方面的专家。\n"
        "你分析传感器数据并提供可操作的见解。\n"
        "输出必须仅为有效的 JSON,JSON 块之外不得有 markdown 格式。\n"
        "请使用中文回复。"
    )
    payload = {
        "model": "Qwen/Qwen2.5-7B-Instruct",
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": _build_user_prompt(context_data, prompt_type)}
        ],
        "response_format": {"type": "json_object"}
    }

    try:
        response = requests.post("https://api.siliconflow.cn/v1/chat/completions", headers=headers, json=payload, timeout=10)
        if response.status_code != 200:
            print(f"AI Error: {response.text}")
            raise RuntimeError("API Error")
        content = response.json()['choices'][0]['message']['content']
        parsed = json.loads(_strip_code_fence(content))
        # Callers use dict methods on the result; a bare list or string reply
        # would crash them, so treat it like any other bad reply.
        if not isinstance(parsed, dict):
            raise ValueError("AI reply is not a JSON object")
        return parsed
    except Exception as e:
        # Deliberate best-effort boundary: any failure degrades to the mock.
        print(f"Fallback AI due to: {e}")
        return _fallback_response(prompt_type)
| # --- Routes --- | |
def index():
    """Render and return the ``index.html`` template.

    NOTE(review): no route decorator is visible on this function in this
    view of the file; confirm it is registered with the app.
    """
    page = render_template('index.html')
    return page
def get_plots():
    """Return every row of the ``plots`` table as a JSON array of objects."""
    with get_db_connection() as conn:
        rows = conn.execute('SELECT * FROM plots').fetchall()
    # sqlite3.Row is not JSON-serialisable; convert each row to a plain dict.
    payload = list(map(dict, rows))
    return jsonify(payload)
# Alert levels that trigger a database update, mapped to the stored status.
# English levels are accepted in case the AI replies in English.
_ALERT_LEVEL_TO_STATUS = {
    '警告': '警告',
    '危急': '危急',
    'High': '警告',
    'Critical': '危急',
}


def analyze_plot():
    """Run an AI analysis of a plot using simulated sensor readings.

    Reads the plot description from the JSON request body, attaches randomly
    generated sensor values, and passes them to ``call_ai_reasoning``. When
    the analysis reports a warning or critical alert level and the request
    carries a plot ``id``, that plot's ``status`` and ``fire_risk`` are
    updated.

    Returns:
        JSON ``{"sensors": ..., "analysis": ...}``, or
        ``{"error": "No data"}`` with HTTP 400 when the body is empty.
    """
    data = request.json
    if not data:
        return jsonify({"error": "No data"}), 400

    # Simulate reading real-time sensors
    sensor_data = {
        "temp": random.uniform(15, 35),
        "humidity": random.uniform(20, 80),
        "wind_speed": random.uniform(0, 15),
        "soil_moisture": random.uniform(10, 60),
        "plot_info": data
    }

    # AI Reasoning
    analysis = call_ai_reasoning(sensor_data, "analyze")

    alert_level = analysis.get('alert_level') if isinstance(analysis, dict) else None
    # Only strings can be valid levels; this also avoids hashing a list/dict.
    status = _ALERT_LEVEL_TO_STATUS.get(alert_level) if isinstance(alert_level, str) else None
    # A payload without an id (or one that is not an object) has no row to
    # update, so the write is skipped instead of raising.
    plot_id = data.get('id') if isinstance(data, dict) else None

    if status is not None and plot_id is not None:
        with get_db_connection() as conn:
            conn.execute('UPDATE plots SET status = ?, fire_risk = ? WHERE id = ?',
                         (status, random.randint(70, 95), plot_id))
            conn.commit()

    return jsonify({"sensors": sensor_data, "analysis": analysis})
def take_action():
    """Execute a simulated field action for a plot.

    The JSON request body supplies ``type`` and ``plot_id``:

    * ``drone_scan``: requests a mission plan from ``call_ai_reasoning``,
      includes it in the response, and sets the plot's ``last_scan`` to now.
    * ``alert_rangers``: only changes the response message.
    * anything else: returns the generic success message.

    Returns:
        JSON ``{"status": "success", "message": ...}`` (plus ``data`` for a
        drone scan), or ``{"error": "No data"}`` with HTTP 400 when the body
        is not a JSON object.
    """
    data = request.json
    # Same 400 response analyze_plot gives for an unusable body, instead of
    # an AttributeError on data.get below.
    if not isinstance(data, dict):
        return jsonify({"error": "No data"}), 400

    action_type = data.get('type')
    plot_id = data.get('plot_id')

    # Tool Execution Simulation
    result = {"status": "success", "message": "行动已完成"}

    if action_type == 'drone_scan':
        # Simulate mission planning
        plan = call_ai_reasoning({"plot_id": plot_id, "action": "drone_scan"}, "plan")
        mission_name = plan.get('mission_name', '扫描任务') if isinstance(plan, dict) else '扫描任务'
        result["data"] = plan
        result["message"] = f"无人机已派遣: {mission_name}"
        # Update Last Scan
        with get_db_connection() as conn:
            conn.execute('UPDATE plots SET last_scan = ? WHERE id = ?',
                         (datetime.now().strftime("%Y-%m-%d %H:%M"), plot_id))
            conn.commit()
    elif action_type == 'alert_rangers':
        result["message"] = "已通过短信/无线电通知阿尔法护林队。"

    return jsonify(result)
def upload_file():
    """Save an uploaded file into the configured upload folder.

    Expects a multipart form field named ``file``.

    Returns:
        JSON with ``status``, ``message`` and the saved ``path`` on success;
        an ``error`` message with HTTP 400 when the field is missing, no file
        was selected, or the name has no usable characters once sanitised.
    """
    if 'file' not in request.files:
        return jsonify({"error": "No file part"}), 400

    file = request.files['file']
    if not file.filename:
        return jsonify({"error": "No selected file"}), 400

    filename = secure_filename(file.filename)
    # secure_filename() can return "" (e.g. for a name made only of non-ASCII
    # characters); joining that would point the save at the upload directory.
    if not filename:
        return jsonify({"error": "Invalid filename"}), 400

    # Mock saving to dataset if it's large, or just save locally
    file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    file.save(file_path)
    return jsonify({"status": "success", "message": f"文件 {filename} 上传成功", "path": file_path})
def reset_db():
    """Delete the database file, if present, and rebuild it via ``init_db``.

    Returns:
        JSON success message, or ``{"error": <exception text>}`` with HTTP
        500 when removing or re-initialising the database raises.
    """
    try:
        db_present = os.path.exists(DB_PATH)
        if db_present:
            os.remove(DB_PATH)
        init_db()
        outcome = {"status": "success", "message": "数据库已重置"}
        return jsonify(outcome)
    except Exception as exc:
        failure = {"error": str(exc)}
        return jsonify(failure), 500
if __name__ == "__main__":
    # Script entry point: make sure the schema (and demo data) exists before
    # the server starts accepting requests.
    init_db()
    # Listen on all interfaces, port 7860.
    app.run(port=7860, host="0.0.0.0")