Spaces:
Sleeping
Sleeping
| # ''' | |
| # Description: | |
| # Author: Manda | |
| # Version: | |
| # Date: 2025-03-30 16:42:47 | |
| # LastEditors: mdhuang555 67590178+mdhuang555@users.noreply.github.com | |
| # LastEditTime: 2025-03-30 16:59:19 | |
| # ''' | |
| import mysql.connector | |
| import os | |
| from datetime import datetime | |
| from collections import defaultdict | |
| from dataBaseConnecter import DatabaseConnector | |
| import sys | |
| import io | |
| sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') | |
| def connect_to_database(db_config: dict) -> mysql.connector.MySQLConnection: | |
| """连接到MySQL数据库""" | |
| try: | |
| conn = mysql.connector.connect( | |
| host=db_config['host'], | |
| user=db_config['user'], | |
| password=db_config['password'], | |
| database=db_config['database'], | |
| charset='utf8mb4' | |
| ) | |
| return conn | |
| except Exception as e: | |
| print(f"数据库连接错误: {e}") | |
| return None | |
| def get_time_slot(hour: int, minute: int) -> str: | |
| """将时间转换为40分钟一段的时间段""" | |
| # 计算一天中的第几个40分钟 | |
| total_minutes = hour * 60 + minute | |
| slot_index = total_minutes // 40 | |
| # 计算时间段的起始和结束时间 | |
| start_minutes = slot_index * 40 | |
| end_minutes = start_minutes + 40 | |
| start_hour = start_minutes // 60 | |
| start_minute = start_minutes % 60 | |
| end_hour = end_minutes // 60 | |
| end_minute = end_minutes % 60 | |
| # 格式化时间段字符串 | |
| return f"{start_hour:02d}:{start_minute:02d}-{end_hour:02d}:{end_minute:02d}" | |
| def analyze_time_slots(db_connector: DatabaseConnector) -> dict: | |
| """分析时间段分布""" | |
| try: | |
| # 连接数据库 | |
| conn = db_connector.connect_db() | |
| if not conn: | |
| print("无法连接到数据库") | |
| return {} | |
| cursor = conn.cursor(dictionary=True) | |
| # 获取UCtodolist的数据和对应的ToDoList用户ID | |
| query = """ | |
| SELECT uc.todo_id, uc.last_modified, t.user_id | |
| FROM UCtodolist uc | |
| JOIN ToDoList t ON uc.todo_id = t.todo_id | |
| WHERE uc.last_modified IS NOT NULL | |
| """ | |
| cursor.execute(query) | |
| results = cursor.fetchall() | |
| # 按用户ID分组统计时间段 | |
| user_time_slots = defaultdict(lambda: defaultdict(int)) | |
| for row in results: | |
| if isinstance(row['last_modified'], datetime): | |
| hour = row['last_modified'].hour | |
| minute = row['last_modified'].minute | |
| time_slot = get_time_slot(hour, minute) | |
| user_time_slots[row['user_id']][time_slot] += 1 | |
| return dict(user_time_slots) | |
| except Exception as e: | |
| print(f"分析时间段时出错: {e}") | |
| return {} | |
| finally: | |
| if 'cursor' in locals(): | |
| cursor.close() | |
| if 'conn' in locals() and conn: | |
| conn.close() | |
| def save_analysis_results(results: dict, output_dir: str = 'time_analysis'): | |
| """保存分析结果到文件""" | |
| if not os.path.exists(output_dir): | |
| os.makedirs(output_dir) | |
| for user_id, time_slots in results.items(): | |
| filename = os.path.join(output_dir, f'user_{user_id}_time_analysis.txt') | |
| try: | |
| with open(filename, 'w', encoding='utf-8') as f: | |
| f.write(f"分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") | |
| f.write(f"用户ID: {user_id}\n") | |
| f.write("=" * 50 + "\n\n") | |
| f.write("时间段使用频率统计(前6名):\n") | |
| # 按频率排序并获取前6个时段 | |
| top_slots = sorted(time_slots.items(), key=lambda x: x[1], reverse=True)[:6] | |
| for i, (slot, count) in enumerate(top_slots, 1): | |
| f.write(f"第{i}名: {slot}\n") | |
| f.write(f" 出现次数: {count}\n") | |
| percentage = (count / sum(time_slots.values())) * 100 | |
| f.write(f" 占比: {percentage:.2f}%\n") | |
| f.write("-" * 30 + "\n") | |
| # 添加总计信息 | |
| f.write(f"\n总修改次数: {sum(time_slots.values())}\n") | |
| f.write(f"总时间段数: {len(time_slots)}/36\n") | |
| print(f"已保存用户 {user_id} 的时间分析到文件: {filename}") | |
| except Exception as e: | |
| print(f"保存用户 {user_id} 的分析结果时出错: {e}") | |
| def main(): | |
| print("正在连接数据库...") | |
| try: | |
| # 创建数据库连接器实例 | |
| db_connector = DatabaseConnector() | |
| print("正在分析时间段分布...") | |
| results = analyze_time_slots(db_connector) | |
| if results: | |
| print(f"分析完成,共有 {len(results)} 个用户的数据") | |
| save_analysis_results(results) | |
| print("分析结果已保存到文件中") | |
| else: | |
| print("未找到可分析的数据") | |
| except Exception as e: | |
| print(f"处理过程中出错: {e}") | |
| if __name__ == "__main__": | |
| main() |