# https://ynsdfz.aliwork.com/APP_DKQG7TSUK6ZNU3RDFGVO/workbench?corpid=dingd16cf4422967594bf2c783f7214b6d69&dd_addcookie=true&from_login=success&login_host=ynsdfz.aliwork.com&login_pt=4-296-307-385&dtcode=e1bec99a908e374fa77d5e8a06109972&code_type=jsapi&dd_enable_replace=true&ddtab=true import json import requests import os import re import subprocess from datetime import datetime from flask import Flask, render_template, request, jsonify import socket from xpinyin import Pinyin app = Flask(__name__) p = Pinyin() STUDENTS_DATA = [] STUDENTS_FILE = "students.json" DOT_NOTIFY_URL = "https://dot.mindreset.tech/api/open/text" DOT_AUTH_TOKEN = "dot_app_XdfKhLlhOiyNWfSepKUytXlxwBfERNkiFZzYYkRrLOeXVjOIacBEjhqJDedXNKkw" DOT_ICON = "" def load_students_data(): global STUDENTS_DATA try: if os.path.exists(STUDENTS_FILE): with open(STUDENTS_FILE, 'r', encoding='utf-8') as f: STUDENTS_DATA = json.load(f) print(f"✅ 成功加载学生数据,共 {len(STUDENTS_DATA)} 条记录") else: print(f"⚠️ 学生数据文件 {STUDENTS_FILE} 不存在") STUDENTS_DATA = [] except Exception as e: print(f"❌ 加载学生数据失败: {e}") STUDENTS_DATA = [] class DingTalkFormQueryClient: def __init__(self): self.base_url = "https://ynsdfz.aliwork.com/dingtalk/web/APP_DKQG7TSUK6ZNU3RDFGVO/v1/form/searchFormDatas.json" self.form_uuid = "FORM-72EF2444A3564C6D8020F1CDC36487EDHWE1" self.cookies_file = "cookies.txt" # 字段映射表 self.field_mapping = { # 基本信息 'textField_mhsv4lgl': '姓名', # 新版表单 'radioField_ts1jbr0': '姓名', # 旧版表单 'textField_lqrf9fj': '学号', 'textField_sm64ysf': '考试名称', 'textField_mhsv4lgk': '身份证号', # 新版表单 'radioField_8a4voiy': '身份证号', # 旧版表单 # 语文 'textField_4ecclpd': '语文', 'textField_jqsfnel': '语文校次', # 数学 'textField_djh9jbz': '数学', 'textField_sm6gl9b': '数学校次', # 英语 'textField_s5awu09': '英语', 'textField_yrtoxm6': '英语校次', # 物理 'textField_j7nxew8': '物理', 'textField_8iavn0r': '物理校次', # 历史 'textField_7vh26my': '历史', 'textField_3nigx7n': '历史校次', # 化学 'textField_d9v8hqi': '化学', 'textField_eecpqqn': '化学校次', # 生物 'textField_6mk8ivr': '生物', 'textField_gjcqbzz': '生物校次', # 政治 'textField_mqmwhoa': '政治', 'textField_q60j1vs': '政治校次', # 地理 'textField_5iiw6pt': '地理', 'textField_xshus0b': '地理校次', # 总分和排名 'textField_4bphzli': '总分', 'textField_y4uc1lx': '组合排名', 'textField_a2l7vuc': '大类排名', } # 查询身份证号时优先尝试的字段顺序(兼容新旧表单) self.id_field_priority = [ 'textField_mhsv4lgk', # 新版 'radioField_8a4voiy', # 旧版 ] # 考试排序列表(时间正序) self.exam_order = [ "26届高一上中", "26届高一上末", "26届高一下中", "26届高一下末", "26届高二上中", "26届高二上末", "26届高二下中", "26届高二下末", "26届高二月考1", "26届高三月考2", "26届高三月考3", "26届高三月考4" ] def load_cookies_from_file(self): """从文件加载cookies""" if not os.path.exists(self.cookies_file): # 在Web应用中,打印到控制台可能不是最佳选择,但为了简单起见,暂时保留 print(f"❌ Cookie文件 {self.cookies_file} 不存在") return None try: with open(self.cookies_file, 'r', encoding='utf-8') as f: cookie_string = f.read().strip() if not cookie_string: print(f"❌ Cookie文件 {self.cookies_file} 为空") return None cookies = {} for item in cookie_string.split(';'): if '=' in item: key, value = item.strip().split('=', 1) cookies[key.strip()] = value.strip() print(f"✅ 成功从 {self.cookies_file} 加载cookies") # 控制台日志 return cookies except Exception as e: print(f"❌ 读取cookie文件失败: {e}") # 控制台日志 return None def extract_csrf_token(self, cookies): """从cookies中提取CSRF令牌""" csrf_token = cookies.get('tianshu_csrf_token') if not csrf_token: print("❌ 在cookies中未找到tianshu_csrf_token") # 控制台日志 return None return csrf_token def validate_id_number(self, id_number): """验证身份证号格式""" pattern = r'^\d{17}[\dXx]$' if not re.match(pattern, id_number): return False return True def build_request_url(self, id_number, csrf_token, search_field_id): """构建请求URL""" if not search_field_id: raise ValueError("search_field_id 不能为空") search_json = json.dumps({search_field_id: id_number}) params = { 'formUuid': self.form_uuid, 'searchFieldJson': search_json, 'currentPage': 1, 'pageSize': 30, '_csrf_token': csrf_token } param_string = '&'.join([f"{k}={requests.utils.quote(str(v))}" for k, v in params.items()]) return f"{self.base_url}?{param_string}" def query_scores(self, id_number): """查询成绩 - 修复版本""" # 验证身份证号 if not self.validate_id_number(id_number): return {"success": False, "error": "身份证号格式不正确,请输入18位身份证号"} # 加载cookies cookies = self.load_cookies_from_file() if not cookies: return {"success": False, "error": f"无法加载cookies,请检查服务器端的 {self.cookies_file} 文件"} # 提取CSRF令牌 csrf_token = self.extract_csrf_token(cookies) if not csrf_token: return {"success": False, "error": "无法提取CSRF令牌,请检查cookie中的tianshu_csrf_token"} headers = { 'Accept': 'application/json, text/json', 'Accept-Encoding': 'gzip, deflate, br, zstd', 'Accept-Language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7', 'Referer': 'https://ynsdfz.aliwork.com/APP_DKQG7TSUK6ZNU3RDFGVO/workbench', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'X-Requested-With': 'XMLHttpRequest' } last_success_empty = None for search_field in self.id_field_priority: result = self._perform_query( id_number=id_number, cookies=cookies, csrf_token=csrf_token, headers=headers, search_field=search_field ) if not result.get("success", False): # 直接返回错误信息 return result if result.get("data"): if search_field != self.id_field_priority[0]: print(f"ℹ️ 通过备用字段 {search_field} 匹配到成绩数据") return result last_success_empty = result # 所有字段都无数据,返回最后一次成功但无记录的结果或通用提示 if last_success_empty: return last_success_empty return {"success": True, "data": [], "message": "未找到相关成绩记录"} def _perform_query(self, id_number, cookies, csrf_token, headers, search_field): """执行实际的HTTP查询,并统一处理异常""" try: url = self.build_request_url(id_number, csrf_token, search_field) except ValueError as e: return {"success": False, "error": str(e)} try: print(f"🔍 正在使用字段 {search_field} 查询身份证号: {id_number}") # 控制台日志 response = requests.get( url, headers=headers, cookies=cookies, timeout=(10, 30) ) print(f"📡 HTTP状态码: {response.status_code}") # 控制台日志 if response.status_code != 200: return {"success": False, "error": f"HTTP请求失败,状态码: {response.status_code}"} try: data = response.json() print(f"📦 收到响应数据") # 控制台日志 except json.JSONDecodeError as e: print(f"❌ JSON解析失败: {e}") # 控制台日志 return {"success": False, "error": "服务器返回的不是有效的JSON格式"} parsed = self._parse_response(data) if parsed.get("success") and not parsed.get("data"): parsed.setdefault("message", "未找到相关成绩记录") return parsed except requests.exceptions.Timeout: return {"success": False, "error": "请求超时,请检查网络连接"} except requests.exceptions.ConnectionError: return {"success": False, "error": "网络连接错误,请检查网络状态"} except Exception as e: return {"success": False, "error": f"请求失败: {e}"} def _parse_response(self, data): """解析服务器响应 - 修复核心逻辑""" try: if not isinstance(data, dict): return {"success": False, "error": "服务器返回的数据格式错误:不是字典类型"} print(f"📋 响应字段: {list(data.keys())}") # 控制台日志 success = data.get('success') if success is False: error_msg = data.get('errorMsg', '未知API错误') return {"success": False, "error": f"API返回错误: {error_msg}"} if success is not True: print(f"⚠️ success字段值: {success}") # 控制台日志 content = data.get('content') if not content: return {"success": False, "error": "响应中缺少content字段"} if not isinstance(content, dict): return {"success": False, "error": "content字段格式错误"} records = content.get('data') if records is None: # 可能表示没有数据,但请求本身是成功的 return {"success": True, "data": [], "message": "未找到相关成绩记录"} if not isinstance(records, list): return {"success": False, "error": "data字段不是数组格式"} if len(records) == 0: return {"success": True, "data": [], "message": "未找到相关成绩记录"} print(f"📊 找到 {len(records)} 条成绩记录") # 控制台日志 processed_data = self.process_score_data(records) return {"success": True, "data": processed_data} except Exception as e: return {"success": False, "error": f"解析响应数据时出错: {e}"} def process_score_data(self, records): """处理成绩数据""" processed_records = [] for i, record in enumerate(records): try: # print(f"处理第 {i+1} 条记录...") # 控制台日志,可选择性保留 form_data = record.get('formData', {}) if not form_data: # print(f"⚠️ 第 {i+1} 条记录缺少formData") # 控制台日志 continue basic_info = { '考试时间': self.timestamp_to_date(record.get('gmtCreate', 0)), '标题': record.get('title', ''), '表单ID': record.get('formInstId', '') } mapped_data = {} for field_id, value in form_data.items(): field_name = self.field_mapping.get(field_id, field_id) if value == "" or value == "-" or value is None: mapped_data[field_name] = "无数据" else: mapped_data[field_name] = str(value) processed_record = {**basic_info, **mapped_data} processed_records.append(processed_record) except Exception as e: # print(f"⚠️ 处理第 {i+1} 条记录时出错: {e}") # 控制台日志 continue # 对成绩记录进行排序 def get_sort_key(record): exam_name = record.get('考试名称', '') try: return self.exam_order.index(exam_name) except ValueError: # 如果不在列表中,排在最后 return len(self.exam_order) + 1 processed_records.sort(key=get_sort_key) # 为每条记录添加排序索引,方便前端排序 for i, record in enumerate(processed_records): record['sort_index'] = i # 计算每次考试的趋势(相比上一次) for i in range(len(processed_records)): current = processed_records[i] if i == 0: current['rank_trend'] = "无变化" current['score_trend'] = "N/A" else: prev = processed_records[i-1] # 计算排名趋势 try: curr_rank_str = str(current.get('大类排名', '0')) prev_rank_str = str(prev.get('大类排名', '0')) # 提取数字 curr_rank_match = re.search(r'\d+', curr_rank_str) prev_rank_match = re.search(r'\d+', prev_rank_str) if curr_rank_match and prev_rank_match: curr_rank = int(curr_rank_match.group()) prev_rank = int(prev_rank_match.group()) diff = prev_rank - curr_rank if diff == 0: current['rank_trend'] = "无变化" elif diff > 0: current['rank_trend'] = f"↑{diff}名" else: current['rank_trend'] = f"↓{abs(diff)}名" else: current['rank_trend'] = "无数据" except Exception: current['rank_trend'] = "计算错误" # 计算总分趋势 try: curr_score_str = str(current.get('总分', '0')) prev_score_str = str(prev.get('总分', '0')) # 提取数字(支持小数) curr_score_match = re.search(r'\d+(\.\d+)?', curr_score_str) prev_score_match = re.search(r'\d+(\.\d+)?', prev_score_str) if curr_score_match and prev_score_match: curr_score = float(curr_score_match.group()) prev_score = float(prev_score_match.group()) diff = curr_score - prev_score if diff == 0: current['score_trend'] = "无变化" elif diff > 0: current['score_trend'] = f"↑{diff:.1f}" else: current['score_trend'] = f"↓{abs(diff):.1f}" else: current['score_trend'] = "无数据" except Exception: current['score_trend'] = "计算错误" # 反转列表,使最新的考试排在前面 processed_records.reverse() return processed_records def timestamp_to_date(self, timestamp): """时间戳转日期""" try: if timestamp and timestamp > 0: dt = datetime.fromtimestamp(timestamp / 1000) return dt.strftime('%Y-%m-%d %H:%M:%S') return "未知时间" except: return "时间转换错误" def format_score_report(self, data): """格式化成绩报告""" if not data: return "无成绩数据" report = [] report.append("=" * 80) report.append(f"学生成绩查询报告") report.append("=" * 80) for i, record in enumerate(data, 1): report.append(f"\n📊 考试记录 {i}:") report.append("-" * 50) # 基本信息 report.append(f"考试名称: {record.get('考试名称', '未知')}") report.append(f"考试时间: {record.get('考试时间', '未知')}") report.append(f"学生姓名: {record.get('姓名', '未知')}") report.append(f"学号: {record.get('学号', '未知')}") # 成绩信息 report.append("\n📝 各科成绩:") subjects = ['语文', '数学', '英语', '物理', '化学', '生物', '历史', '政治', '地理'] for subject in subjects: score = record.get(subject, '无数据') rank = record.get(f"{subject}校次", '无数据') if score != '无数据' or rank != '无数据': report.append(f" {subject}: {score} (校次: {rank})") # 总分和排名 report.append("\n🏆 总分与排名:") report.append(f" 总分: {record.get('总分', '无数据')}") report.append(f" 大类排名: {record.get('大类排名', '无数据')}") report.append(f" 组合排名: {record.get('组合排名', '无数据')}") return "\n".join(report) def compute_rank_trend(self, records): """计算相比上次考试的组合排名变化。返回字符串如 '↓3名' 或 '↑2名' 或 '无变化'。""" try: # records 按时间倒序排列(最新的在最前面) if not records or len(records) < 2: return "无变化" # 提取最近两次的组合排名字段并尝试解析为整数 def parse_rank(rec): v = rec.get('大类排名') or rec.get('大类排名'.strip()) if not v: return None # 移除非数字字符 s = re.sub(r"[^0-9]", "", str(v)) return int(s) if s.isdigit() else None latest = parse_rank(records[0]) # 最新的在第一个 prev = parse_rank(records[1]) # 上一次的在第二个 if latest is None or prev is None: return "无数据" diff = prev - latest if diff == 0: return "无变化" if diff > 0: return f"↑{diff}名" return f"↓{abs(diff)}名" except Exception: return "计算错误" def _get_local_ip(self): """获取本机局域网IP地址""" try: import subprocess # 使用ifconfig获取所有IP地址 result = subprocess.run(['ifconfig'], capture_output=True, text=True) if result.returncode == 0: ips = [] lines = result.stdout.split('\n') for line in lines: if 'inet ' in line and '127.0.0.1' not in line and 'inet 169.254.' not in line: parts = line.strip().split() for i, part in enumerate(parts): if part == 'inet' and i + 1 < len(parts): ip = parts[i + 1] ips.append(ip) # 优先级:192.168.x.x > 10.x.x.x > 172.x.x.x > 其他 for ip in ips: if ip.startswith('192.168.'): return ip for ip in ips: if ip.startswith('10.'): return ip for ip in ips: if ip.startswith('172.'): return ip if ips: return ips[0] except Exception: # 备用方法 try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) local_ip = s.getsockname()[0] s.close() return local_ip except Exception: pass return "127.0.0.1" def send_dot_notification(self, title, message, signature, icon=None): """发送到 dot.mindreset.tech 的通知请求。""" try: url = DOT_NOTIFY_URL headers = { "Authorization": f"Bearer {DOT_AUTH_TOKEN}", "Content-Type": "application/json" } # 获取本机局域网IP local_ip = self._get_local_ip() data = { "refreshNow": False, "deviceId": "E4B063CC56DC", "title": title, "message": message, "signature": signature, "icon": icon or DOT_ICON, "link": f"https://{local_ip}:1111" } resp = requests.post(url, json=data, headers=headers, timeout=10) try: return resp.json() except Exception: return {"status": "non-json-response", "code": resp.status_code} except Exception as e: return {"error": str(e)} # 全局客户端实例 client = DingTalkFormQueryClient() # 应用启动时加载学生数据 load_students_data() @app.route('/', methods=['GET']) def index(): return render_template('index.html', result=None) @app.route('/search_student', methods=['POST']) def search_student(): """根据姓名查找学生""" name = request.form.get('name', '').strip() if not name: return jsonify({"success": False, "message": "请输入学生姓名"}) if not STUDENTS_DATA: return jsonify({"success": False, "message": "学生数据未加载,请检查students.json文件"}) # 模糊匹配学生姓名(支持拼音) matched_students = [] name_lower = name.lower() for student in STUDENTS_DATA: student_name = student.get("学生姓名", "") if not student_name: continue # 1. 直接包含匹配 if name in student_name: match_type = "name" # 2. 拼音匹配 else: # 获取全拼 (e.g. "zhouyangguang") pinyin_full = p.get_pinyin(student_name, '').lower() # 获取首字母 (e.g. "zyg") pinyin_initials = p.get_initials(student_name, '').lower() if name_lower in pinyin_full or name_lower in pinyin_initials: match_type = "pinyin" else: match_type = None if match_type: # 直接显示完整身份证号,不进行掩码处理 id_number = student.get("学号", "") matched_students.append({ "姓名": student_name, "身份证号": id_number, "显示身份证号": id_number, "is_self": False }) if not matched_students: return jsonify({"success": False, "message": "未找到匹配的学生"}) # 添加调试信息 print(f"🔍 搜索姓名: {name}") print(f"📋 找到 {len(matched_students)} 个匹配的学生") return jsonify({"success": True, "students": matched_students}) @app.route('/compare') def compare_page(): """成绩比较页面""" return render_template('compare.html') @app.route('/api/get_student_scores', methods=['POST']) def get_student_scores(): """获取学生成绩数据API""" id_number = request.form.get('id_number') if not id_number: return jsonify({"success": False, "error": "身份证号不能为空"}) result = client.query_scores(id_number) return jsonify(result) @app.route('/query', methods=['POST']) def query(): id_number = request.form.get('id_number') result = None if not id_number: result = {"success": False, "error": "身份证号不能为空"} else: result = client.query_scores(id_number) # 如果返回成功且检测到周洋光,则计算趋势并发送通知 try: # 查询学生姓名是否为周洋光:在 students.json 中查找 student_name = None for s in STUDENTS_DATA: if s.get('学号') == id_number: student_name = s.get('学生姓名') break if student_name == '周洋光' and result.get('success') and result.get('data'): print("✅ 检测到周洋光,准备发送通知...") trend = client.compute_rank_trend(result.get('data')) # 取最新考试名称(列表已按倒序排列,取第一个) latest_exam = result.get('data')[0].get('考试名称', '') if result.get('data') else '' title = f"本次考试排名: {result.get('data')[0].get('大类排名','最新排名')}名" message = f"相比上次考试趋势: {trend}" signature = latest_exam send_resp = client.send_dot_notification(title, message, signature) print(f"通知发送结果: {send_resp}") # 将趋势返回给客户端 result['trend'] = trend except Exception as e: print(f"发送通知或计算趋势时出错: {e}") # 准备传递给模板的数据 template_data = { 'id_number_submitted': id_number, 'success': result.get('success'), 'error_message': result.get('error'), 'message': result.get('message'), 'scores_data': result.get('data'), 'trend': result.get('trend') # 添加趋势数据传递给模板 } # 传递 self_name,用于模板判断显示“本人”徽章 self_name = None for s in STUDENTS_DATA: if s.get('学号') == id_number: self_name = s.get('学生姓名') break return render_template('index.html', result=template_data, self_name=self_name) def main(): """主函数 - 改为启动Flask服务器""" # 确保 templates 文件夹存在 if not os.path.exists("templates"): os.makedirs("templates") print("创建 'templates' 文件夹。请将 'index.html' 文件放入其中。") # 提示信息 print("=" * 60) print("钉钉表单成绩查询工具 - Web版") print("=" * 60) print("请在浏览器中打开 http://127.0.0.1:1111/ 进行访问") print("注意:") print(f"1. 请确保在程序目录下有 {client.cookies_file} 文件且内容正确") print(f"2. 请确保在程序目录下有 {STUDENTS_FILE} 文件包含学生信息") print("3. 服务器日志会显示在此控制台") print("=" * 60) app.run(host='0.0.0.0', port=1111, debug=True) if __name__ == "__main__": main()