Spaces:
Sleeping
Sleeping
File size: 5,313 Bytes
84ed1d1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# '''
# Description:
# Author: Manda
# Version:
# Date: 2025-03-30 16:42:47
# LastEditors: mdhuang555 67590178+mdhuang555@users.noreply.github.com
# LastEditTime: 2025-03-30 16:59:19
# '''
import mysql.connector
import os
from datetime import datetime
from collections import defaultdict
from dataBaseConnecter import DatabaseConnector
import sys
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
def connect_to_database(db_config: dict) -> mysql.connector.MySQLConnection:
"""连接到MySQL数据库"""
try:
conn = mysql.connector.connect(
host=db_config['host'],
user=db_config['user'],
password=db_config['password'],
database=db_config['database'],
charset='utf8mb4'
)
return conn
except Exception as e:
print(f"数据库连接错误: {e}")
return None
def get_time_slot(hour: int, minute: int) -> str:
"""将时间转换为40分钟一段的时间段"""
# 计算一天中的第几个40分钟
total_minutes = hour * 60 + minute
slot_index = total_minutes // 40
# 计算时间段的起始和结束时间
start_minutes = slot_index * 40
end_minutes = start_minutes + 40
start_hour = start_minutes // 60
start_minute = start_minutes % 60
end_hour = end_minutes // 60
end_minute = end_minutes % 60
# 格式化时间段字符串
return f"{start_hour:02d}:{start_minute:02d}-{end_hour:02d}:{end_minute:02d}"
def analyze_time_slots(db_connector: DatabaseConnector) -> dict:
"""分析时间段分布"""
try:
# 连接数据库
conn = db_connector.connect_db()
if not conn:
print("无法连接到数据库")
return {}
cursor = conn.cursor(dictionary=True)
# 获取UCtodolist的数据和对应的ToDoList用户ID
query = """
SELECT uc.todo_id, uc.last_modified, t.user_id
FROM UCtodolist uc
JOIN ToDoList t ON uc.todo_id = t.todo_id
WHERE uc.last_modified IS NOT NULL
"""
cursor.execute(query)
results = cursor.fetchall()
# 按用户ID分组统计时间段
user_time_slots = defaultdict(lambda: defaultdict(int))
for row in results:
if isinstance(row['last_modified'], datetime):
hour = row['last_modified'].hour
minute = row['last_modified'].minute
time_slot = get_time_slot(hour, minute)
user_time_slots[row['user_id']][time_slot] += 1
return dict(user_time_slots)
except Exception as e:
print(f"分析时间段时出错: {e}")
return {}
finally:
if 'cursor' in locals():
cursor.close()
if 'conn' in locals() and conn:
conn.close()
def save_analysis_results(results: dict, output_dir: str = 'time_analysis'):
"""保存分析结果到文件"""
if not os.path.exists(output_dir):
os.makedirs(output_dir)
for user_id, time_slots in results.items():
filename = os.path.join(output_dir, f'user_{user_id}_time_analysis.txt')
try:
with open(filename, 'w', encoding='utf-8') as f:
f.write(f"分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"用户ID: {user_id}\n")
f.write("=" * 50 + "\n\n")
f.write("时间段使用频率统计(前6名):\n")
# 按频率排序并获取前6个时段
top_slots = sorted(time_slots.items(), key=lambda x: x[1], reverse=True)[:6]
for i, (slot, count) in enumerate(top_slots, 1):
f.write(f"第{i}名: {slot}\n")
f.write(f" 出现次数: {count}\n")
percentage = (count / sum(time_slots.values())) * 100
f.write(f" 占比: {percentage:.2f}%\n")
f.write("-" * 30 + "\n")
# 添加总计信息
f.write(f"\n总修改次数: {sum(time_slots.values())}\n")
f.write(f"总时间段数: {len(time_slots)}/36\n")
print(f"已保存用户 {user_id} 的时间分析到文件: {filename}")
except Exception as e:
print(f"保存用户 {user_id} 的分析结果时出错: {e}")
def main():
print("正在连接数据库...")
try:
# 创建数据库连接器实例
db_connector = DatabaseConnector()
print("正在分析时间段分布...")
results = analyze_time_slots(db_connector)
if results:
print(f"分析完成,共有 {len(results)} 个用户的数据")
save_analysis_results(results)
print("分析结果已保存到文件中")
else:
print("未找到可分析的数据")
except Exception as e:
print(f"处理过程中出错: {e}")
if __name__ == "__main__":
main() |