superxu520 commited on
Commit
1004573
·
1 Parent(s): b47f655

"perf:memory-optimization-with-explicit-gc"

Browse files
Files changed (1) hide show
  1. sync_data.py +29 -0
sync_data.py CHANGED
@@ -8,6 +8,7 @@ import sys
8
  import logging
9
  import time
10
  import threading
 
11
  from datetime import datetime, timedelta
12
  from concurrent.futures import ThreadPoolExecutor, as_completed
13
  from typing import List, Optional, Dict, Any
@@ -456,6 +457,8 @@ def sync_stock_daily(targets: List[Dict[str, str]], last_trade_day: str) -> Dict
456
 
457
  if all_new_data:
458
  inc_df = pd.concat(all_new_data, ignore_index=True)
 
 
459
  total_records = len(inc_df)
460
  # 识别变动月份
461
  changed = inc_df.assign(yr=inc_df['trade_date'].dt.year, mo=inc_df['trade_date'].dt.month)[['yr', 'mo']].drop_duplicates().values
@@ -488,12 +491,20 @@ def sync_stock_daily(targets: List[Dict[str, str]], last_trade_day: str) -> Dict
488
  month_inc = inc_df[(inc_df['trade_date'].dt.year == yr) & (inc_df['trade_date'].dt.month == mo)]
489
  if old_df is not None:
490
  final_month_df = pd.concat([old_df, month_inc]).drop_duplicates(subset=['code', 'trade_date'])
 
 
491
  else:
492
  final_month_df = month_inc
493
 
494
  final_month_df.to_parquet(local_path)
495
  changed_files.append(filename) # 记录变更的文件
496
  logger.info(f"Saved updated data for {filename}")
 
 
 
 
 
 
497
  else:
498
  total_records = 0
499
 
@@ -654,6 +665,8 @@ def sync_fund_flow(targets: List[Dict[str, str]], last_trade_day: str) -> Dict[s
654
  # 5. 按月分表保存
655
  if all_data:
656
  new_df = pd.concat(all_data, ignore_index=True)
 
 
657
  total_records = len(new_df)
658
 
659
  # 确定需要更新的月份
@@ -682,16 +695,23 @@ def sync_fund_flow(targets: List[Dict[str, str]], last_trade_day: str) -> Dict[s
682
 
683
  if old_month_df is not None:
684
  final_month_df = pd.concat([old_month_df, month_data]).drop_duplicates(subset=['code', 'trade_date'])
 
 
685
  else:
686
  final_month_df = month_data
687
 
688
  final_month_df.to_parquet(local_path)
689
  changed_files.append(filename) # 记录变更的文件
690
  logger.info(f"Saved fund flow data for {filename}")
 
 
691
 
692
  current += 1
693
 
694
  logger.info(f"Fund flow updated: {len(new_df)} new records")
 
 
 
695
 
696
  return {
697
  'count': len(success_codes),
@@ -829,6 +849,8 @@ def sync_valuation(targets: List[Dict[str, str]], last_trade_day: str) -> Dict[s
829
  # 5. 按月分表保存
830
  if all_data:
831
  new_df = pd.concat(all_data, ignore_index=True)
 
 
832
  total_records = len(new_df)
833
 
834
  if not new_df.empty:
@@ -855,16 +877,23 @@ def sync_valuation(targets: List[Dict[str, str]], last_trade_day: str) -> Dict[s
855
 
856
  if old_month_df is not None:
857
  final_month_df = pd.concat([old_month_df, month_data]).drop_duplicates(subset=['code', 'trade_date'])
 
 
858
  else:
859
  final_month_df = month_data
860
 
861
  final_month_df.to_parquet(local_path)
862
  changed_files.append(filename) # 记录变更的文件
863
  logger.info(f"Saved valuation data for {filename}")
 
 
864
 
865
  current += 1
866
 
867
  logger.info(f"Valuation updated: {len(new_df)} new records")
 
 
 
868
 
869
  return {
870
  'count': success_count,
 
8
  import logging
9
  import time
10
  import threading
11
+ import gc
12
  from datetime import datetime, timedelta
13
  from concurrent.futures import ThreadPoolExecutor, as_completed
14
  from typing import List, Optional, Dict, Any
 
457
 
458
  if all_new_data:
459
  inc_df = pd.concat(all_new_data, ignore_index=True)
460
+ # 方案3:及时释放内存
461
+ del all_new_data
462
  total_records = len(inc_df)
463
  # 识别变动月份
464
  changed = inc_df.assign(yr=inc_df['trade_date'].dt.year, mo=inc_df['trade_date'].dt.month)[['yr', 'mo']].drop_duplicates().values
 
491
  month_inc = inc_df[(inc_df['trade_date'].dt.year == yr) & (inc_df['trade_date'].dt.month == mo)]
492
  if old_df is not None:
493
  final_month_df = pd.concat([old_df, month_inc]).drop_duplicates(subset=['code', 'trade_date'])
494
+ # 方案3:释放旧数据内存
495
+ del old_df, month_inc
496
  else:
497
  final_month_df = month_inc
498
 
499
  final_month_df.to_parquet(local_path)
500
  changed_files.append(filename) # 记录变更的文件
501
  logger.info(f"Saved updated data for {filename}")
502
+ # 方案3:释放最终数据内存
503
+ del final_month_df
504
+
505
+ # 方案3:循环结束后释放inc_df并触发GC
506
+ del inc_df
507
+ gc.collect()
508
  else:
509
  total_records = 0
510
 
 
665
  # 5. 按月分表保存
666
  if all_data:
667
  new_df = pd.concat(all_data, ignore_index=True)
668
+ # 方案3:及时释放内存
669
+ del all_data
670
  total_records = len(new_df)
671
 
672
  # 确定需要更新的月份
 
695
 
696
  if old_month_df is not None:
697
  final_month_df = pd.concat([old_month_df, month_data]).drop_duplicates(subset=['code', 'trade_date'])
698
+ # 方案3:释放旧数据内存
699
+ del old_month_df, month_data
700
  else:
701
  final_month_df = month_data
702
 
703
  final_month_df.to_parquet(local_path)
704
  changed_files.append(filename) # 记录变更的文件
705
  logger.info(f"Saved fund flow data for {filename}")
706
+ # 方案3:释放最终数据内存
707
+ del final_month_df
708
 
709
  current += 1
710
 
711
  logger.info(f"Fund flow updated: {len(new_df)} new records")
712
+ # 方案3:释放new_df并触发GC
713
+ del new_df
714
+ gc.collect()
715
 
716
  return {
717
  'count': len(success_codes),
 
849
  # 5. 按月分表保存
850
  if all_data:
851
  new_df = pd.concat(all_data, ignore_index=True)
852
+ # 方案3:及时释放内存
853
+ del all_data
854
  total_records = len(new_df)
855
 
856
  if not new_df.empty:
 
877
 
878
  if old_month_df is not None:
879
  final_month_df = pd.concat([old_month_df, month_data]).drop_duplicates(subset=['code', 'trade_date'])
880
+ # 方案3:释放旧数据内存
881
+ del old_month_df, month_data
882
  else:
883
  final_month_df = month_data
884
 
885
  final_month_df.to_parquet(local_path)
886
  changed_files.append(filename) # 记录变更的文件
887
  logger.info(f"Saved valuation data for {filename}")
888
+ # 方案3:释放最终数据内存
889
+ del final_month_df
890
 
891
  current += 1
892
 
893
  logger.info(f"Valuation updated: {len(new_df)} new records")
894
+ # 方案3:释放new_df并触发GC
895
+ del new_df
896
+ gc.collect()
897
 
898
  return {
899
  'count': success_count,