Paperbag commited on
Commit
40e8192
·
1 Parent(s): d9cd41c

feat: process agent questions concurrently using a thread pool.

Browse files
Files changed (1) hide show
  1. app.py +31 -12
app.py CHANGED
@@ -106,30 +106,49 @@ def run_and_submit_all( profile: gr.OAuthProfile | None):
106
  # 3. Run your Agent
107
  results_log = []
108
  answers_payload = []
109
- # print(f"Running agent on {len(questions_data)} questions...")
110
- print(f"Running agent on {len(questions_data)} questions temporarily...")
111
- for item in questions_data:
112
  task_id = item.get("task_id")
113
- question_text = item.get("question")
114
  file_name = item.get("file_name")
115
- if not task_id or question_text is None:
116
- print(f"Skipping item with missing task_id or question: {item}")
117
- continue
118
 
 
119
  if file_name:
120
- # Actually download the file to local cache and get absolute path
121
  resolved_path = file_extract(file_name, task_id)
122
  if resolved_path:
123
  question_text += f"\n\n[Attached File Local Path: {resolved_path}]"
124
  else:
125
  question_text += f"\n\n[Attached File: {file_name} (Download Failed)]"
 
126
  try:
127
  submitted_answer = agent(question_text)
128
- answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
129
- results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
 
 
 
130
  except Exception as e:
131
- print(f"Error running agent on task {task_id}: {e}")
132
- results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
133
 
134
  if not answers_payload:
135
  print("Agent did not produce any answers to submit.")
 
106
  # 3. Run your Agent
107
  results_log = []
108
  answers_payload = []
109
+ print(f"Running agent on {len(questions_data)} questions concurrently...")
110
+
111
+ def process_item(item):
112
  task_id = item.get("task_id")
113
+ orig_question_text = item.get("question")
114
  file_name = item.get("file_name")
115
+
116
+ if not task_id or orig_question_text is None:
117
+ return None
118
 
119
+ question_text = orig_question_text
120
  if file_name:
 
121
  resolved_path = file_extract(file_name, task_id)
122
  if resolved_path:
123
  question_text += f"\n\n[Attached File Local Path: {resolved_path}]"
124
  else:
125
  question_text += f"\n\n[Attached File: {file_name} (Download Failed)]"
126
+
127
  try:
128
  submitted_answer = agent(question_text)
129
+ return {
130
+ "task_id": task_id,
131
+ "submitted_answer": submitted_answer,
132
+ "question": orig_question_text
133
+ }
134
  except Exception as e:
135
+ return {
136
+ "task_id": task_id,
137
+ "submitted_answer": f"AGENT ERROR: {e}",
138
+ "question": orig_question_text,
139
+ "error": True
140
+ }
141
+
142
+ import concurrent.futures
143
+ # Max workers = 10 -> Process 10 questions at the same time
144
+ with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
145
+ # Submit all items
146
+ futures = {executor.submit(process_item, item): item for item in questions_data}
147
+ for future in concurrent.futures.as_completed(futures):
148
+ res = future.result()
149
+ if res:
150
+ answers_payload.append({"task_id": res["task_id"], "submitted_answer": res["submitted_answer"]})
151
+ results_log.append({"Task ID": res["task_id"], "Question": res["question"], "Submitted Answer": res["submitted_answer"]})
152
 
153
  if not answers_payload:
154
  print("Agent did not produce any answers to submit.")