1oscon commited on
Commit
2c0494e
·
verified ·
1 Parent(s): be9406d

Update 配置.py

Browse files
Files changed (1) hide show
  1. 配置.py +69 -58
配置.py CHANGED
@@ -73,7 +73,7 @@ def _sniff_delimiter(path):
73
 
74
  def 读取CSV预览(file_obj):
75
  if file_obj is None:
76
- return "未选择文件", [], ""
77
  path = file_obj.name
78
  delim = _sniff_delimiter(path)
79
  headers = []
@@ -87,12 +87,13 @@ def 读取CSV预览(file_obj):
87
  if i < 5:
88
  head_rows.append(row)
89
  total += 1
90
- info = f"分隔符: '{delim}' | 列: {headers} | 总行(不含表头): ~{total}"
91
- return info, headers, "\n".join([str(x) for x in head_rows])
 
92
  except Exception as e:
93
- return f"读取失败: {e}", [], ""
94
 
95
- def 标准化CSV(file_obj, 映射, 输出文件名):
96
  if file_obj is None:
97
  return "请先上传CSV", ""
98
  path = file_obj.name
@@ -102,13 +103,11 @@ def 标准化CSV(file_obj, 映射, 输出文件名):
102
  os.makedirs(save_dir, exist_ok=True)
103
  out_path = os.path.join(save_dir, 输出文件名)
104
 
105
- # 校验映射
106
- 必要 = ["时间","","","",""]
107
- for k in 必要:
108
- if not 映射 or not 映射.get(k):
109
- return f"映射缺少必要列: {k}", ""
110
- vol_col = 映射.get("量") # 可选
111
-
112
  # 读取、转换并写出标准格式
113
  delim = _sniff_delimiter(path)
114
  写入 = 0
@@ -120,12 +119,12 @@ def 标准化CSV(file_obj, 映射, 输出文件名):
120
  wr.writerow(["时间","开","高","低","收","量"]) # 标准表头
121
  for row in r:
122
  try:
123
- tms = _parse_time_to_ms(row[映射["时间"]])
124
- o = float(row[映射[""]])
125
- h = float(row[映射[""]])
126
- l = float(row[映射[""]])
127
- c = float(row[映射[""]])
128
- v = float(row[vol_col]) if vol_col and row.get(vol_col) not in (None, "") else 0.0
129
  if tms is None:
130
  continue
131
  wr.writerow([tms, o, h, l, c, v])
@@ -213,58 +212,70 @@ def 构建设置面板(容器, 默认参数=None, 记录函数=None):
213
  with gr.Row():
214
  with gr.Column(scale=1):
215
  csv文件 = gr.File(label="上传 1m K线 CSV", file_types=[".csv"])
216
- 预览信息 = gr.Textbox(label="文件预览", lines=3, interactive=False)
 
 
 
217
  预览前几行 = gr.Textbox(label="前几行示例", lines=8, interactive=False)
218
- 表头下拉容器 = gr.Column()
219
- with 表头下拉容器:
220
- # 动态下拉在上传后创建(避免名未知)
221
- pass
 
 
 
 
 
 
222
  输出文件名 = gr.Textbox(label="输出文件名", value="uploaded_1m.csv")
223
  保存按钮 = gr.Button("标准化并保存")
224
  保存结果 = gr.Textbox(label="结果", interactive=False)
225
  输出路径 = gr.Textbox(label="输出路径", interactive=False)
 
226
  with gr.Column(scale=1):
227
- gr.Markdown("标准格式说明")
228
- gr.Markdown("目标列: 时间(ms), 开, 高, 低, 收, 量\n- 时间列自动识别秒/毫秒/ISO时间\n- 缺量则填0\n- 输出路径位于 /tmp/app_data/csv/")
 
229
 
230
- # 上传后预览并生成映射下拉
231
- def on_csv_upload(file_obj):
232
- info, headers, head = 读取CSV预览(file_obj)
233
- if not headers:
234
- return info, head, gr.update(visible=False), None, None, None, None, None
 
 
 
 
 
235
  映射 = _guess_mapping(headers)
236
- # 生成下拉组件
237
- with 表头下拉容器:
238
- 映射_时间 = gr.Dropdown(choices=headers, label="映射:时间列", value=映射.get("时间"))
239
- 映射_开 = gr.Dropdown(choices=headers, label="映射:开(Open)", value=映射.get("开"))
240
- 映射_高 = gr.Dropdown(choices=headers, label="映射:高(High)", value=映射.get("高"))
241
- 映射_低 = gr.Dropdown(choices=headers, label="映射:低(Low)", value=映射.get("低"))
242
- 映射_收 = gr.Dropdown(choices=headers, label="映射:收(Close)", value=映射.get(""))
243
- 映射_量 = gr.Dropdown(choices=headers, label="映射:量(Volume,可选)", value=映射.get(""))
244
- return info, head, gr.update(visible=True), 映射_时间, 映射_开, 映射_高, 映射_低, 映射_收, 映射_量
245
-
246
- # 用于保存时收集映射
247
- 动态映射状态 = gr.State(value=None)
248
-
249
- def 收集映射(时间c, 开c, 高c, 低c, 收c, 量c):
250
- return {"时间": 时间c, "开": 开c, "高": 高c, "低": 低c, "收": 收c, "量": 量c}
251
 
252
- # 绑定上传
253
- out = csv文件.upload(
254
- fn=on_csv_upload,
255
  inputs=csv文件,
256
- outputs=[预览信息, 预览前几行, 表头下拉容器, gr.State(), gr.State(), gr.State(), gr.State(), gr.State(), gr.State()],
 
 
 
257
  )
258
-
259
- # 这里 out.outputs[3:] 分别是我们返回的 6 个下拉组(以State占位接收)
260
- def 保存触发(file_obj, 输出名, m时间, m开, m高, m低, m收, m量):
261
- 映射 = {"时间": m时间, "开": m开, "高": m高, "低": m低, "收": m收, "量": m量}
262
- msg, path = 标准化CSV(file_obj, 映射, 输出名)
263
- return msg, path
264
-
265
  保存按钮.click(
266
- 保存触发,
267
- inputs=[csv文件, 输出文件名] + out[1][3:], # 取动态返回的6个下拉
268
  outputs=[保存结果, 输出路径]
269
  )
270
 
 
73
 
74
  def 读取CSV预览(file_obj):
75
  if file_obj is None:
76
+ return "未选择文件", "", ""
77
  path = file_obj.name
78
  delim = _sniff_delimiter(path)
79
  headers = []
 
87
  if i < 5:
88
  head_rows.append(row)
89
  total += 1
90
+ info = f"分隔符: '{delim}' | 列: {len(headers)} | 总行(不含表头): ~{total}"
91
+ preview = "\n".join([",".join(row) for row in head_rows])
92
+ return info, ",".join(headers), preview
93
  except Exception as e:
94
+ return f"读取失败: {e}", "", ""
95
 
96
+ def 标准化CSV(file_obj, 输出文件名, 时间列, 开列, 高列, 低列, 收列, 量列):
97
  if file_obj is None:
98
  return "请先上传CSV", ""
99
  path = file_obj.name
 
103
  os.makedirs(save_dir, exist_ok=True)
104
  out_path = os.path.join(save_dir, 输出文件名)
105
 
106
+ # 校验必要列
107
+ 必要 = [时间, , , , ]
108
+ if not all(必要):
109
+ return f"请确保时间、开、高、低、收列都已选择", ""
110
+
 
 
111
  # 读取、转换并写出标准格式
112
  delim = _sniff_delimiter(path)
113
  写入 = 0
 
119
  wr.writerow(["时间","开","高","低","收","量"]) # 标准表头
120
  for row in r:
121
  try:
122
+ tms = _parse_time_to_ms(row[时间])
123
+ o = float(row[开])
124
+ h = float(row[高])
125
+ l = float(row[低])
126
+ c = float(row[收])
127
+ v = float(row[量列]) if 量列 and 量列 in row and row[量列] not in (None, "") else 0.0
128
  if tms is None:
129
  continue
130
  wr.writerow([tms, o, h, l, c, v])
 
212
  with gr.Row():
213
  with gr.Column(scale=1):
214
  csv文件 = gr.File(label="上传 1m K线 CSV", file_types=[".csv"])
215
+
216
+ # 预览区域
217
+ 预览信息 = gr.Textbox(label="文件信息", lines=2, interactive=False)
218
+ 表头显示 = gr.Textbox(label="检测到的表头", lines=2, interactive=False)
219
  预览前几行 = gr.Textbox(label="前几行示例", lines=8, interactive=False)
220
+
221
+ # 映射下拉菜单(静态创建)
222
+ gr.Markdown("###映射")
223
+ 时间列 = gr.Dropdown(label="时间列", choices=[], value=None)
224
+ 开列 = gr.Dropdown(label="开盘价列", choices=[], value=None)
225
+ 高列 = gr.Dropdown(label="最高价列", choices=[], value=None)
226
+ 低列 = gr.Dropdown(label="最低价列", choices=[], value=None)
227
+ 收列 = gr.Dropdown(label="收盘价列", choices=[], value=None)
228
+ 量列 = gr.Dropdown(label="成交量列(可选)", choices=[], value=None)
229
+
230
  输出文件名 = gr.Textbox(label="输出文件名", value="uploaded_1m.csv")
231
  保存按钮 = gr.Button("标准化并保存")
232
  保存结果 = gr.Textbox(label="结果", interactive=False)
233
  输出路径 = gr.Textbox(label="输出路径", interactive=False)
234
+
235
  with gr.Column(scale=1):
236
+ gr.Markdown("### 使用说明")
237
+ gr.Markdown("1. 上传 CSV 文件\n2. 系统自动检测表头\n3. 选择对应的列映射\n4. 点击保存生成标准格式")
238
+ gr.Markdown("标准格式: 时间(ms), 开, 高, 低, 收, 量")
239
 
240
+ # CSV 上传后的处理函数
241
+ def 处理CSV上传(file_obj):
242
+ if file_obj is None:
243
+ return ("", "", "", [], None, None, None, None, None, None)
244
+
245
+ info, headers_str, preview = 读取CSV预览(file_obj)
246
+ if not headers_str:
247
+ return (info, "", preview, [], None, None, None, None, None, None)
248
+
249
+ headers = headers_str.split(",")
250
  映射 = _guess_mapping(headers)
251
+
252
+ return (
253
+ info,
254
+ headers_str,
255
+ preview,
256
+ headers,
257
+ 映射.get("时间"),
258
+ 映射.get(""),
259
+ 映射.get("高"),
260
+ 映射.get("低"),
261
+ 映射.get("收"),
262
+ 映射.get("量")
263
+ )
 
 
264
 
265
+ # 更新下拉菜单选项和默认值
266
+ csv文件.change(
267
+ fn=处理CSV上传,
268
  inputs=csv文件,
269
+ outputs=[
270
+ 预览信息, 表头显示, 预览前几行,
271
+ 时间列, 时间列, 开列, 高列, 低列, 收列, 量列
272
+ ]
273
  )
274
+
275
+ # 保存按钮点击事
 
 
 
 
 
276
  保存按钮.click(
277
+ fn=标准化CSV,
278
+ inputs=[csv文件, 输出文件名, 时间列, 开列, 高列, 低列, 收列, 量列],
279
  outputs=[保存结果, 输出路径]
280
  )
281