ToDoAgent / Notify /usrSpareTime.py
Siyu Wang
updated to KK_Server
84ed1d1
# '''
# Description:
# Author: Manda
# Version:
# Date: 2025-03-30 16:42:47
# LastEditors: mdhuang555 67590178+mdhuang555@users.noreply.github.com
# LastEditTime: 2025-03-30 16:59:19
# '''
import mysql.connector
import os
from datetime import datetime
from collections import defaultdict
from dataBaseConnecter import DatabaseConnector
import sys
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
def connect_to_database(db_config: dict) -> mysql.connector.MySQLConnection:
"""连接到MySQL数据库"""
try:
conn = mysql.connector.connect(
host=db_config['host'],
user=db_config['user'],
password=db_config['password'],
database=db_config['database'],
charset='utf8mb4'
)
return conn
except Exception as e:
print(f"数据库连接错误: {e}")
return None
def get_time_slot(hour: int, minute: int) -> str:
"""将时间转换为40分钟一段的时间段"""
# 计算一天中的第几个40分钟
total_minutes = hour * 60 + minute
slot_index = total_minutes // 40
# 计算时间段的起始和结束时间
start_minutes = slot_index * 40
end_minutes = start_minutes + 40
start_hour = start_minutes // 60
start_minute = start_minutes % 60
end_hour = end_minutes // 60
end_minute = end_minutes % 60
# 格式化时间段字符串
return f"{start_hour:02d}:{start_minute:02d}-{end_hour:02d}:{end_minute:02d}"
def analyze_time_slots(db_connector: DatabaseConnector) -> dict:
"""分析时间段分布"""
try:
# 连接数据库
conn = db_connector.connect_db()
if not conn:
print("无法连接到数据库")
return {}
cursor = conn.cursor(dictionary=True)
# 获取UCtodolist的数据和对应的ToDoList用户ID
query = """
SELECT uc.todo_id, uc.last_modified, t.user_id
FROM UCtodolist uc
JOIN ToDoList t ON uc.todo_id = t.todo_id
WHERE uc.last_modified IS NOT NULL
"""
cursor.execute(query)
results = cursor.fetchall()
# 按用户ID分组统计时间段
user_time_slots = defaultdict(lambda: defaultdict(int))
for row in results:
if isinstance(row['last_modified'], datetime):
hour = row['last_modified'].hour
minute = row['last_modified'].minute
time_slot = get_time_slot(hour, minute)
user_time_slots[row['user_id']][time_slot] += 1
return dict(user_time_slots)
except Exception as e:
print(f"分析时间段时出错: {e}")
return {}
finally:
if 'cursor' in locals():
cursor.close()
if 'conn' in locals() and conn:
conn.close()
def save_analysis_results(results: dict, output_dir: str = 'time_analysis'):
"""保存分析结果到文件"""
if not os.path.exists(output_dir):
os.makedirs(output_dir)
for user_id, time_slots in results.items():
filename = os.path.join(output_dir, f'user_{user_id}_time_analysis.txt')
try:
with open(filename, 'w', encoding='utf-8') as f:
f.write(f"分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"用户ID: {user_id}\n")
f.write("=" * 50 + "\n\n")
f.write("时间段使用频率统计(前6名):\n")
# 按频率排序并获取前6个时段
top_slots = sorted(time_slots.items(), key=lambda x: x[1], reverse=True)[:6]
for i, (slot, count) in enumerate(top_slots, 1):
f.write(f"第{i}名: {slot}\n")
f.write(f" 出现次数: {count}\n")
percentage = (count / sum(time_slots.values())) * 100
f.write(f" 占比: {percentage:.2f}%\n")
f.write("-" * 30 + "\n")
# 添加总计信息
f.write(f"\n总修改次数: {sum(time_slots.values())}\n")
f.write(f"总时间段数: {len(time_slots)}/36\n")
print(f"已保存用户 {user_id} 的时间分析到文件: {filename}")
except Exception as e:
print(f"保存用户 {user_id} 的分析结果时出错: {e}")
def main():
print("正在连接数据库...")
try:
# 创建数据库连接器实例
db_connector = DatabaseConnector()
print("正在分析时间段分布...")
results = analyze_time_slots(db_connector)
if results:
print(f"分析完成,共有 {len(results)} 个用户的数据")
save_analysis_results(results)
print("分析结果已保存到文件中")
else:
print("未找到可分析的数据")
except Exception as e:
print(f"处理过程中出错: {e}")
if __name__ == "__main__":
main()