Spaces:
Running
Running
File size: 30,061 Bytes
a5bcb12 cdd8c08 a5bcb12 cdd8c08 a5bcb12 4514b82 a5bcb12 4514b82 a5bcb12 4514b82 a5bcb12 4514b82 648651f 6bb04a4 648651f a5bcb12 4514b82 a5bcb12 4514b82 a5bcb12 4514b82 a5bcb12 4514b82 a5bcb12 4514b82 a5bcb12 4514b82 a5bcb12 4514b82 a5bcb12 4514b82 a5bcb12 4514b82 a5bcb12 4514b82 a5bcb12 4514b82 a5bcb12 648651f 6bb04a4 cdd8c08 f2b0f1f a5bcb12 f2b0f1f a5bcb12 f2b0f1f a5bcb12 4514b82 a5bcb12 4514b82 a5bcb12 4514b82 a5bcb12 cdd8c08 a5bcb12 cdd8c08 a5bcb12 cdd8c08 a5bcb12 cdd8c08 a5bcb12 4514b82 a5bcb12 4514b82 a5bcb12 4514b82 a5bcb12 4514b82 a5bcb12 f2b0f1f a5bcb12 4514b82 a5bcb12 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 |
# https://ynsdfz.aliwork.com/APP_DKQG7TSUK6ZNU3RDFGVO/workbench?corpid=dingd16cf4422967594bf2c783f7214b6d69&dd_addcookie=true&from_login=success&login_host=ynsdfz.aliwork.com&login_pt=4-296-307-385&dtcode=e1bec99a908e374fa77d5e8a06109972&code_type=jsapi&dd_enable_replace=true&ddtab=true
import json
import requests
import os
import re
import subprocess
from datetime import datetime
from flask import Flask, render_template, request, jsonify
import socket
from xpinyin import Pinyin
app = Flask(__name__)
p = Pinyin()
STUDENTS_DATA = []
STUDENTS_FILE = "students.json"
DOT_NOTIFY_URL = "https://dot.mindreset.tech/api/open/text"
DOT_AUTH_TOKEN = "dot_app_XdfKhLlhOiyNWfSepKUytXlxwBfERNkiFZzYYkRrLOeXVjOIacBEjhqJDedXNKkw"
DOT_ICON = "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAACgAAAAoCAYAAACM/rhtAAAEj0lEQVR4nO2YXYhVVRTHf/fOTI2Og2apo6IDpZNFZkUfplRYPURRCX0oigZWlPQgEUlRhhVBUA+h9ZAWRBJFX2IPfuLLJGlFYkb6YGWNIPYxpKmYOjO3h/Vfs/c9c+49Z65jQcyCyz37Y/33f6+99lrrHBiU/7kUBgijmILVo99/JgWgLmNOMceczEVqkSLBOk3AlcBUoBnoBg4AO4EfUuafdXGLDAeWA38ApQq/zcDNml9gYFwqF7kZwH6R+AZYAkwHLsIsOQf4ICK6guCnZ41kUf83AieBY8DcDJ2pwOcYyTUEvx1wkr77FuBX7Fgvi8bqtXCRcDHqNX4usFYkn1XfGV2cNHHANVpoltrnZOg5yWHAPuBv4EL1FVM1ahAHmiByn6rdkFPfSd4n/VcS/WcsDdjxPqoFbiJfDHTxizEE+B34WeTSgnsfqWTmAsGvukXsWqAL2C29Uk6CJeGcAr4AWoGRhLhYR5WLk0awTqBdIue7n4BdkD/VX0k/Da9HOt8JuxUL8CX1uxEy8XxCM/AgsAn4Ddt9CQstq7FQExOoRg6gEZgH7CLExi6gA3gfuCMPnpObj/lKSf/rgVWE3ftvI9BWBdT77gR+ifROCW81duTe/xUW7PvgxQ7/qibvAm6n/KbO0dgi4EU9H8EuTRLUn5/UvD3AvUAnsDexkRbgmYjowiSePyzXhJWJxer1GwWcwCwKcAtwGDhKCNp1ke5jwntP7XvUXqY5jZT73OWE9HlrkuR1GnhXbb/FhcTElwlWBPPFbqxyiUPHFM3bpHljMOsdAUYTakhfy0+qTZs+iBUjvTd7mwCaCOkqFgdsAr4Xqbs0tkxk5kYbWYf52jDgPIKvPZC0TCSelR7W3KU+cCnB9FA5wjvpydjtKwFPAedjuXm7xkcSMsbVWE1YAl6oQg6CERqAQ8CPPvdxAVxCuemrkWwF2qV3ELP+aeAG4Gn1/6T/k8AjGeRc3DirpHsxwDtYEh8a7aSaxL6zEKsHT2PBOI5xncAbWI0Y62UR9LTag0USPsaOKF44S5KLPSfAJ4CPRHhsNJ43b7sF52EbnV3EUtdw7ALklR5sI42U3/jXgS3RQl4XdqdgVJICFtIAjhWBrwXSRrYPxuJZwSucLj0f0PhV9P9FyV3kerX3AUwkvDdA/jrNw9E4Ed2s/gvU/qSfeG6cZuAvzLd73W0tdtvG9wPU49ab2AZvi8beUt+0GvCel+6CmHkbdkTthKjeQHrAjt837hfYBrW9rhuPWWEvMCKBF19Cx/M1ZxHeFD0z9d6whwjvsi0JYmnV7yJCvBtLOCLH89y7EwvuWXizsVPsRPGPyEAOuligR7FKZGICZChWJKwnVCmTkmAR3gKC47+E5eiYWCMwE/hQczqwyj2JVwY6A9gRAXcAX2L14PGo/zUsPKWCRXhXYKHH9Q5htd+3mBt4/9tYURHr9jGzv4OApa27taMxWLbZD2wFPsOKUCdXKZzEeNdgBcZ0rKLxbzjtWHGR+ztO3jiY9wtBrre3SnjVFF3Bc2y8WC3f/hzPj9TXrxVvUAblX5F/AN1tJCuIaUiQAAAAAElFTkSuQmCC"
def load_students_data():
global STUDENTS_DATA
try:
if os.path.exists(STUDENTS_FILE):
with open(STUDENTS_FILE, 'r', encoding='utf-8') as f:
STUDENTS_DATA = json.load(f)
print(f"✅ 成功加载学生数据,共 {len(STUDENTS_DATA)} 条记录")
else:
print(f"⚠️ 学生数据文件 {STUDENTS_FILE} 不存在")
STUDENTS_DATA = []
except Exception as e:
print(f"❌ 加载学生数据失败: {e}")
STUDENTS_DATA = []
class DingTalkFormQueryClient:
def __init__(self):
self.base_url = "https://ynsdfz.aliwork.com/dingtalk/web/APP_DKQG7TSUK6ZNU3RDFGVO/v1/form/searchFormDatas.json"
self.form_uuid = "FORM-72EF2444A3564C6D8020F1CDC36487EDHWE1"
self.cookies_file = "cookies.txt"
# 字段映射表
self.field_mapping = {
# 基本信息
'textField_mhsv4lgl': '姓名', # 新版表单
'radioField_ts1jbr0': '姓名', # 旧版表单
'textField_lqrf9fj': '学号',
'textField_sm64ysf': '考试名称',
'textField_mhsv4lgk': '身份证号', # 新版表单
'radioField_8a4voiy': '身份证号', # 旧版表单
# 语文
'textField_4ecclpd': '语文',
'textField_jqsfnel': '语文校次',
# 数学
'textField_djh9jbz': '数学',
'textField_sm6gl9b': '数学校次',
# 英语
'textField_s5awu09': '英语',
'textField_yrtoxm6': '英语校次',
# 物理
'textField_j7nxew8': '物理',
'textField_8iavn0r': '物理校次',
# 历史
'textField_7vh26my': '历史',
'textField_3nigx7n': '历史校次',
# 化学
'textField_d9v8hqi': '化学',
'textField_eecpqqn': '化学校次',
# 生物
'textField_6mk8ivr': '生物',
'textField_gjcqbzz': '生物校次',
# 政治
'textField_mqmwhoa': '政治',
'textField_q60j1vs': '政治校次',
# 地理
'textField_5iiw6pt': '地理',
'textField_xshus0b': '地理校次',
# 总分和排名
'textField_4bphzli': '总分',
'textField_y4uc1lx': '组合排名',
'textField_a2l7vuc': '大类排名',
}
# 查询身份证号时优先尝试的字段顺序(兼容新旧表单)
self.id_field_priority = [
'textField_mhsv4lgk', # 新版
'radioField_8a4voiy', # 旧版
]
# 考试排序列表(时间正序)
self.exam_order = [
"26届高一上中",
"26届高一上末",
"26届高一下中",
"26届高一下末",
"26届高二上中",
"26届高二上末",
"26届高二下中",
"26届高二下末",
"26届高二月考1",
"26届高三月考2",
"26届高三月考3",
"26届高三月考4"
]
def load_cookies_from_file(self):
"""从文件加载cookies"""
if not os.path.exists(self.cookies_file):
# 在Web应用中,打印到控制台可能不是最佳选择,但为了简单起见,暂时保留
print(f"❌ Cookie文件 {self.cookies_file} 不存在")
return None
try:
with open(self.cookies_file, 'r', encoding='utf-8') as f:
cookie_string = f.read().strip()
if not cookie_string:
print(f"❌ Cookie文件 {self.cookies_file} 为空")
return None
cookies = {}
for item in cookie_string.split(';'):
if '=' in item:
key, value = item.strip().split('=', 1)
cookies[key.strip()] = value.strip()
print(f"✅ 成功从 {self.cookies_file} 加载cookies") # 控制台日志
return cookies
except Exception as e:
print(f"❌ 读取cookie文件失败: {e}") # 控制台日志
return None
def extract_csrf_token(self, cookies):
"""从cookies中提取CSRF令牌"""
csrf_token = cookies.get('tianshu_csrf_token')
if not csrf_token:
print("❌ 在cookies中未找到tianshu_csrf_token") # 控制台日志
return None
return csrf_token
def validate_id_number(self, id_number):
"""验证身份证号格式"""
pattern = r'^\d{17}[\dXx]$'
if not re.match(pattern, id_number):
return False
return True
def build_request_url(self, id_number, csrf_token, search_field_id):
"""构建请求URL"""
if not search_field_id:
raise ValueError("search_field_id 不能为空")
search_json = json.dumps({search_field_id: id_number})
params = {
'formUuid': self.form_uuid,
'searchFieldJson': search_json,
'currentPage': 1,
'pageSize': 30,
'_csrf_token': csrf_token
}
param_string = '&'.join([f"{k}={requests.utils.quote(str(v))}" for k, v in params.items()])
return f"{self.base_url}?{param_string}"
def query_scores(self, id_number):
"""查询成绩 - 修复版本"""
# 验证身份证号
if not self.validate_id_number(id_number):
return {"success": False, "error": "身份证号格式不正确,请输入18位身份证号"}
# 加载cookies
cookies = self.load_cookies_from_file()
if not cookies:
return {"success": False, "error": f"无法加载cookies,请检查服务器端的 {self.cookies_file} 文件"}
# 提取CSRF令牌
csrf_token = self.extract_csrf_token(cookies)
if not csrf_token:
return {"success": False, "error": "无法提取CSRF令牌,请检查cookie中的tianshu_csrf_token"}
headers = {
'Accept': 'application/json, text/json',
'Accept-Encoding': 'gzip, deflate, br, zstd',
'Accept-Language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
'Referer': 'https://ynsdfz.aliwork.com/APP_DKQG7TSUK6ZNU3RDFGVO/workbench',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'X-Requested-With': 'XMLHttpRequest'
}
last_success_empty = None
for search_field in self.id_field_priority:
result = self._perform_query(
id_number=id_number,
cookies=cookies,
csrf_token=csrf_token,
headers=headers,
search_field=search_field
)
if not result.get("success", False):
# 直接返回错误信息
return result
if result.get("data"):
if search_field != self.id_field_priority[0]:
print(f"ℹ️ 通过备用字段 {search_field} 匹配到成绩数据")
return result
last_success_empty = result
# 所有字段都无数据,返回最后一次成功但无记录的结果或通用提示
if last_success_empty:
return last_success_empty
return {"success": True, "data": [], "message": "未找到相关成绩记录"}
def _perform_query(self, id_number, cookies, csrf_token, headers, search_field):
"""执行实际的HTTP查询,并统一处理异常"""
try:
url = self.build_request_url(id_number, csrf_token, search_field)
except ValueError as e:
return {"success": False, "error": str(e)}
try:
print(f"🔍 正在使用字段 {search_field} 查询身份证号: {id_number}") # 控制台日志
response = requests.get(
url,
headers=headers,
cookies=cookies,
timeout=(10, 30)
)
print(f"📡 HTTP状态码: {response.status_code}") # 控制台日志
if response.status_code != 200:
return {"success": False, "error": f"HTTP请求失败,状态码: {response.status_code}"}
try:
data = response.json()
print(f"📦 收到响应数据") # 控制台日志
except json.JSONDecodeError as e:
print(f"❌ JSON解析失败: {e}") # 控制台日志
return {"success": False, "error": "服务器返回的不是有效的JSON格式"}
parsed = self._parse_response(data)
if parsed.get("success") and not parsed.get("data"):
parsed.setdefault("message", "未找到相关成绩记录")
return parsed
except requests.exceptions.Timeout:
return {"success": False, "error": "请求超时,请检查网络连接"}
except requests.exceptions.ConnectionError:
return {"success": False, "error": "网络连接错误,请检查网络状态"}
except Exception as e:
return {"success": False, "error": f"请求失败: {e}"}
def _parse_response(self, data):
"""解析服务器响应 - 修复核心逻辑"""
try:
if not isinstance(data, dict):
return {"success": False, "error": "服务器返回的数据格式错误:不是字典类型"}
print(f"📋 响应字段: {list(data.keys())}") # 控制台日志
success = data.get('success')
if success is False:
error_msg = data.get('errorMsg', '未知API错误')
return {"success": False, "error": f"API返回错误: {error_msg}"}
if success is not True:
print(f"⚠️ success字段值: {success}") # 控制台日志
content = data.get('content')
if not content:
return {"success": False, "error": "响应中缺少content字段"}
if not isinstance(content, dict):
return {"success": False, "error": "content字段格式错误"}
records = content.get('data')
if records is None: # 可能表示没有数据,但请求本身是成功的
return {"success": True, "data": [], "message": "未找到相关成绩记录"}
if not isinstance(records, list):
return {"success": False, "error": "data字段不是数组格式"}
if len(records) == 0:
return {"success": True, "data": [], "message": "未找到相关成绩记录"}
print(f"📊 找到 {len(records)} 条成绩记录") # 控制台日志
processed_data = self.process_score_data(records)
return {"success": True, "data": processed_data}
except Exception as e:
return {"success": False, "error": f"解析响应数据时出错: {e}"}
def process_score_data(self, records):
"""处理成绩数据"""
processed_records = []
for i, record in enumerate(records):
try:
# print(f"处理第 {i+1} 条记录...") # 控制台日志,可选择性保留
form_data = record.get('formData', {})
if not form_data:
# print(f"⚠️ 第 {i+1} 条记录缺少formData") # 控制台日志
continue
basic_info = {
'考试时间': self.timestamp_to_date(record.get('gmtCreate', 0)),
'标题': record.get('title', ''),
'表单ID': record.get('formInstId', '')
}
mapped_data = {}
for field_id, value in form_data.items():
field_name = self.field_mapping.get(field_id, field_id)
if value == "" or value == "-" or value is None:
mapped_data[field_name] = "无数据"
else:
mapped_data[field_name] = str(value)
processed_record = {**basic_info, **mapped_data}
processed_records.append(processed_record)
except Exception as e:
# print(f"⚠️ 处理第 {i+1} 条记录时出错: {e}") # 控制台日志
continue
# 对成绩记录进行排序
def get_sort_key(record):
exam_name = record.get('考试名称', '')
try:
return self.exam_order.index(exam_name)
except ValueError:
# 如果不在列表中,排在最后
return len(self.exam_order) + 1
processed_records.sort(key=get_sort_key)
# 为每条记录添加排序索引,方便前端排序
for i, record in enumerate(processed_records):
record['sort_index'] = i
# 计算每次考试的趋势(相比上一次)
for i in range(len(processed_records)):
current = processed_records[i]
if i == 0:
current['rank_trend'] = "无变化"
current['score_trend'] = "N/A"
else:
prev = processed_records[i-1]
# 计算排名趋势
try:
curr_rank_str = str(current.get('大类排名', '0'))
prev_rank_str = str(prev.get('大类排名', '0'))
# 提取数字
curr_rank_match = re.search(r'\d+', curr_rank_str)
prev_rank_match = re.search(r'\d+', prev_rank_str)
if curr_rank_match and prev_rank_match:
curr_rank = int(curr_rank_match.group())
prev_rank = int(prev_rank_match.group())
diff = prev_rank - curr_rank
if diff == 0:
current['rank_trend'] = "无变化"
elif diff > 0:
current['rank_trend'] = f"↑{diff}名"
else:
current['rank_trend'] = f"↓{abs(diff)}名"
else:
current['rank_trend'] = "无数据"
except Exception:
current['rank_trend'] = "计算错误"
# 计算总分趋势
try:
curr_score_str = str(current.get('总分', '0'))
prev_score_str = str(prev.get('总分', '0'))
# 提取数字(支持小数)
curr_score_match = re.search(r'\d+(\.\d+)?', curr_score_str)
prev_score_match = re.search(r'\d+(\.\d+)?', prev_score_str)
if curr_score_match and prev_score_match:
curr_score = float(curr_score_match.group())
prev_score = float(prev_score_match.group())
diff = curr_score - prev_score
if diff == 0:
current['score_trend'] = "无变化"
elif diff > 0:
current['score_trend'] = f"↑{diff:.1f}"
else:
current['score_trend'] = f"↓{abs(diff):.1f}"
else:
current['score_trend'] = "无数据"
except Exception:
current['score_trend'] = "计算错误"
# 反转列表,使最新的考试排在前面
processed_records.reverse()
return processed_records
def timestamp_to_date(self, timestamp):
"""时间戳转日期"""
try:
if timestamp and timestamp > 0:
dt = datetime.fromtimestamp(timestamp / 1000)
return dt.strftime('%Y-%m-%d %H:%M:%S')
return "未知时间"
except:
return "时间转换错误"
def format_score_report(self, data):
"""格式化成绩报告"""
if not data:
return "无成绩数据"
report = []
report.append("=" * 80)
report.append(f"学生成绩查询报告")
report.append("=" * 80)
for i, record in enumerate(data, 1):
report.append(f"\n📊 考试记录 {i}:")
report.append("-" * 50)
# 基本信息
report.append(f"考试名称: {record.get('考试名称', '未知')}")
report.append(f"考试时间: {record.get('考试时间', '未知')}")
report.append(f"学生姓名: {record.get('姓名', '未知')}")
report.append(f"学号: {record.get('学号', '未知')}")
# 成绩信息
report.append("\n📝 各科成绩:")
subjects = ['语文', '数学', '英语', '物理', '化学', '生物', '历史', '政治', '地理']
for subject in subjects:
score = record.get(subject, '无数据')
rank = record.get(f"{subject}校次", '无数据')
if score != '无数据' or rank != '无数据':
report.append(f" {subject}: {score} (校次: {rank})")
# 总分和排名
report.append("\n🏆 总分与排名:")
report.append(f" 总分: {record.get('总分', '无数据')}")
report.append(f" 大类排名: {record.get('大类排名', '无数据')}")
report.append(f" 组合排名: {record.get('组合排名', '无数据')}")
return "\n".join(report)
def compute_rank_trend(self, records):
"""计算相比上次考试的组合排名变化。返回字符串如 '↓3名' 或 '↑2名' 或 '无变化'。"""
try:
# records 按时间倒序排列(最新的在最前面)
if not records or len(records) < 2:
return "无变化"
# 提取最近两次的组合排名字段并尝试解析为整数
def parse_rank(rec):
v = rec.get('大类排名') or rec.get('大类排名'.strip())
if not v:
return None
# 移除非数字字符
s = re.sub(r"[^0-9]", "", str(v))
return int(s) if s.isdigit() else None
latest = parse_rank(records[0]) # 最新的在第一个
prev = parse_rank(records[1]) # 上一次的在第二个
if latest is None or prev is None:
return "无数据"
diff = prev - latest
if diff == 0:
return "无变化"
if diff > 0:
return f"↑{diff}名"
return f"↓{abs(diff)}名"
except Exception:
return "计算错误"
def _get_local_ip(self):
"""获取本机局域网IP地址"""
try:
import subprocess
# 使用ifconfig获取所有IP地址
result = subprocess.run(['ifconfig'], capture_output=True, text=True)
if result.returncode == 0:
ips = []
lines = result.stdout.split('\n')
for line in lines:
if 'inet ' in line and '127.0.0.1' not in line and 'inet 169.254.' not in line:
parts = line.strip().split()
for i, part in enumerate(parts):
if part == 'inet' and i + 1 < len(parts):
ip = parts[i + 1]
ips.append(ip)
# 优先级:192.168.x.x > 10.x.x.x > 172.x.x.x > 其他
for ip in ips:
if ip.startswith('192.168.'):
return ip
for ip in ips:
if ip.startswith('10.'):
return ip
for ip in ips:
if ip.startswith('172.'):
return ip
if ips:
return ips[0]
except Exception:
# 备用方法
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
local_ip = s.getsockname()[0]
s.close()
return local_ip
except Exception:
pass
return "127.0.0.1"
def send_dot_notification(self, title, message, signature, icon=None):
"""发送到 dot.mindreset.tech 的通知请求。"""
try:
url = DOT_NOTIFY_URL
headers = {
"Authorization": f"Bearer {DOT_AUTH_TOKEN}",
"Content-Type": "application/json"
}
# 获取本机局域网IP
local_ip = self._get_local_ip()
data = {
"refreshNow": False,
"deviceId": "E4B063CC56DC",
"title": title,
"message": message,
"signature": signature,
"icon": icon or DOT_ICON,
"link": f"https://{local_ip}:1111"
}
resp = requests.post(url, json=data, headers=headers, timeout=10)
try:
return resp.json()
except Exception:
return {"status": "non-json-response", "code": resp.status_code}
except Exception as e:
return {"error": str(e)}
# 全局客户端实例
client = DingTalkFormQueryClient()
# 应用启动时加载学生数据
load_students_data()
@app.route('/', methods=['GET'])
def index():
return render_template('index.html', result=None)
@app.route('/search_student', methods=['POST'])
def search_student():
"""根据姓名查找学生"""
name = request.form.get('name', '').strip()
if not name:
return jsonify({"success": False, "message": "请输入学生姓名"})
if not STUDENTS_DATA:
return jsonify({"success": False, "message": "学生数据未加载,请检查students.json文件"})
# 模糊匹配学生姓名(支持拼音)
matched_students = []
name_lower = name.lower()
for student in STUDENTS_DATA:
student_name = student.get("学生姓名", "")
if not student_name:
continue
# 1. 直接包含匹配
if name in student_name:
match_type = "name"
# 2. 拼音匹配
else:
# 获取全拼 (e.g. "zhouyangguang")
pinyin_full = p.get_pinyin(student_name, '').lower()
# 获取首字母 (e.g. "zyg")
pinyin_initials = p.get_initials(student_name, '').lower()
if name_lower in pinyin_full or name_lower in pinyin_initials:
match_type = "pinyin"
else:
match_type = None
if match_type:
# 直接显示完整身份证号,不进行掩码处理
id_number = student.get("学号", "")
matched_students.append({
"姓名": student_name,
"身份证号": id_number,
"显示身份证号": id_number,
"is_self": False
})
if not matched_students:
return jsonify({"success": False, "message": "未找到匹配的学生"})
# 添加调试信息
print(f"🔍 搜索姓名: {name}")
print(f"📋 找到 {len(matched_students)} 个匹配的学生")
return jsonify({"success": True, "students": matched_students})
@app.route('/compare')
def compare_page():
"""成绩比较页面"""
return render_template('compare.html')
@app.route('/api/get_student_scores', methods=['POST'])
def get_student_scores():
"""获取学生成绩数据API"""
id_number = request.form.get('id_number')
if not id_number:
return jsonify({"success": False, "error": "身份证号不能为空"})
result = client.query_scores(id_number)
return jsonify(result)
@app.route('/query', methods=['POST'])
def query():
id_number = request.form.get('id_number')
result = None
if not id_number:
result = {"success": False, "error": "身份证号不能为空"}
else:
result = client.query_scores(id_number)
# 如果返回成功且检测到周洋光,则计算趋势并发送通知
try:
# 查询学生姓名是否为周洋光:在 students.json 中查找
student_name = None
for s in STUDENTS_DATA:
if s.get('学号') == id_number:
student_name = s.get('学生姓名')
break
if student_name == '周洋光' and result.get('success') and result.get('data'):
print("✅ 检测到周洋光,准备发送通知...")
trend = client.compute_rank_trend(result.get('data'))
# 取最新考试名称(列表已按倒序排列,取第一个)
latest_exam = result.get('data')[0].get('考试名称', '') if result.get('data') else ''
title = f"本次考试排名: {result.get('data')[0].get('大类排名','最新排名')}名"
message = f"相比上次考试趋势: {trend}"
signature = latest_exam
send_resp = client.send_dot_notification(title, message, signature)
print(f"通知发送结果: {send_resp}")
# 将趋势返回给客户端
result['trend'] = trend
except Exception as e:
print(f"发送通知或计算趋势时出错: {e}")
# 准备传递给模板的数据
template_data = {
'id_number_submitted': id_number,
'success': result.get('success'),
'error_message': result.get('error'),
'message': result.get('message'),
'scores_data': result.get('data'),
'trend': result.get('trend') # 添加趋势数据传递给模板
}
# 传递 self_name,用于模板判断显示“本人”徽章
self_name = None
for s in STUDENTS_DATA:
if s.get('学号') == id_number:
self_name = s.get('学生姓名')
break
return render_template('index.html', result=template_data, self_name=self_name)
def main():
"""主函数 - 改为启动Flask服务器"""
# 确保 templates 文件夹存在
if not os.path.exists("templates"):
os.makedirs("templates")
print("创建 'templates' 文件夹。请将 'index.html' 文件放入其中。")
# 提示信息
print("=" * 60)
print("钉钉表单成绩查询工具 - Web版")
print("=" * 60)
print("请在浏览器中打开 http://127.0.0.1:1111/ 进行访问")
print("注意:")
print(f"1. 请确保在程序目录下有 {client.cookies_file} 文件且内容正确")
print(f"2. 请确保在程序目录下有 {STUDENTS_FILE} 文件包含学生信息")
print("3. 服务器日志会显示在此控制台")
print("=" * 60)
app.run(host='0.0.0.0', port=1111, debug=True)
if __name__ == "__main__":
main()
|