| |
| """Best-Scoring-Notebook (24).ipynb |
| |
| Automatically generated by Colab. |
| |
| Original file is located at |
| https://colab.research.google.com/drive/1To0YVSRunnAEw2y5QkbuT_LBhin1rLgq |
| |
| # Setup The Environment |
| """ |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
import time

# Overall wall-clock allowance for the run: five hours, counted from here.
global_deadline = time.perf_counter() + 5 * 60 * 60
global_remaining = global_deadline - time.perf_counter()
# Remaining time minus a 350 s safety margin (not referenced elsewhere in the
# visible code).
cutoff_duration = global_remaining - 350


def get_global_remaining():
    """Return how many seconds are left before ``global_deadline`` (never negative)."""
    seconds_left = global_deadline - time.perf_counter()
    return seconds_left if seconds_left > 0 else 0
|
|
import os

# NOTE(review): CUDA_LAUNCH_BLOCKING=1 makes CUDA kernel launches synchronous,
# which is normally a debugging aid. Because it is set before the vLLM and
# sandbox servers are spawned (subprocess.Popen inherits os.environ), the
# servers run with it too — confirm it is wanted for timed runs.
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
import torch

# Verbose logging, kept for debugging (previously stored as a dead string literal):
# import logging
# logging.basicConfig(level=logging.DEBUG)

import asyncio
import copy
import glob
import json
import re
import secrets
import subprocess
import time
import traceback
import warnings
from collections import Counter, defaultdict
from typing import List

import httpx
import nest_asyncio
import pandas as pd
import polars as pl
import requests

pd.set_option('display.max_colwidth', None)
warnings.filterwarnings("ignore", category=SyntaxWarning)
# Allow run_until_complete() to be used while Colab's own event loop is running.
nest_asyncio.apply()

# Runtime / compiler settings; also inherited by the server subprocesses.
os.environ["TORCH_COMPILE_DISABLE"] = "1"
os.environ["TORCHDYNAMO_DISABLE"] = "1"
os.environ['TRANSFORMERS_NO_FLAX'] = '1'
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
os.environ['TOKENIZERS_PARALLELISM'] = 'false'
os.environ['TRITON_PTXAS_PATH'] = '/usr/local/cuda/bin/ptxas'
os.environ['TIKTOKEN_RS_CACHE_DIR'] = "/content/harmony_encoding"
os.environ["TORCH_CUDA_ARCH_LIST"] = '9.0'
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
os.environ["TORCHINDUCTOR_CACHE_DIR"] = "torch_cache"
|
|
import os, sys

# Make the patched NeMo-Skills checkout importable in this process (sys.path)
# and in the server subprocesses started later (they inherit PYTHONPATH).
path1 = '/content/modified-nemo-skills'
original_pythonpath = os.environ.get("PYTHONPATH", "")
# Bug fix: the fallback used to be the set literal ``{path1}``; assigning a set
# to os.environ raises TypeError whenever PYTHONPATH was initially unset.
merged_pythonpath = (
    f"{path1}{os.pathsep}{original_pythonpath}" if original_pythonpath else path1
)
os.environ["PYTHONPATH"] = merged_pythonpath
sys.path.append(path1)
|
|
| from nemo_skills.code_execution.sandbox import get_sandbox |
| from nemo_skills.inference.model import get_code_execution_model |
| from nemo_skills.prompt.utils import get_prompt |
| from nemo_skills.inference.model import get_model |
|
|
| """# Configuration Parameters""" |
|
|
# --- vLLM server endpoint ----------------------------------------------------
host = "127.0.0.1"
port = 5000
tp_size = 1

# --- Token / batching limits --------------------------------------------------
max_public = 10
max_tokens = 80000
max_input_tokens = 1800
# Generation budget: max_tokens minus the prompt allowance and a small slack
# (80000 - 1800 - 10 = 78190).
tokens_to_generate = max_tokens - max_input_tokens - 10
max_batch_size = 8
timeout_seconds = 300
# Seconds of slack shared across questions; Pipeline draws from and refills it.
global_buffer = 350
finish_at_last_n = 2

# --- Sandboxed code execution -------------------------------------------------
max_code_output_characters = 1100
code_execution_timeout = 5
max_code_executions = 125

# --- Score bookkeeping ----------------------------------------------------------
g_score = 0
g_count = 0
prompt_score = Counter()

sampling_params = dict(
    tokens_to_generate=tokens_to_generate,
    temperature=1,
    top_p=1,
)

thoughts = [""] * min(50, max_batch_size)
# Number of questions processed so far; advanced by predict().
i = 0

model_path = "/content/model"
|
|
| """# Start Server - Load Model & Sandbox""" |
|
|
server_started = False


def load_model():
    """Start the NeMo-Skills vLLM server for ``model_path`` as a background process.

    Server stdout/stderr are written to ``vllm.log``. Readiness is not checked
    here; ``wait_for_server`` polls for it.

    Returns:
        The ``subprocess.Popen`` handle of the server process.
    """
    cmd = [
        "python",
        "-m",
        "nemo_skills.inference.server.serve_vllm",
        f"--model={model_path}",
        f"--port={port}",  # same port the client side is configured with
        "--num_gpus=1",
        f"--max_model_len={max_tokens}",
        "--max_num_batched_tokens=65000",
        "--max_num_seqs=13",
        "--max-cudagraph-capture-size=2048",
        "--gpu_memory_utilization=0.96",
        "--kv_cache_dtype=fp8_e4m3",
        "--stream-interval=200",
        "--enable-prefix-caching",
        # NOTE(review): flag and value form a single argv element here; this only
        # works if serve_vllm re-joins unknown args into a shell string — confirm.
        "--uvicorn-log-level debug",
        "--enable-log-requests",
        "--enable-log-outputs",
        "--async-scheduling",
    ]
    # The child receives its own copy of the file descriptor, so the parent's
    # handle can be closed as soon as the process is spawned (it used to leak).
    with open("vllm.log", "w") as log_file:
        vllm_server = subprocess.Popen(cmd, stdout=log_file, stderr=log_file)
    return vllm_server


vllm_server = load_model()
|
|
def wait_for_server(url=f"http://{host}:{port}", timeout=1200, request_timeout=5, process=None):
    """Block until ``{url}/docs`` answers HTTP 200, polling about once per second.

    Args:
        url: Base URL of the server.
        timeout: Total seconds to wait before giving up.
        request_timeout: Per-request timeout in seconds. Without one, a server
            that accepts the connection but never replies would hang the loop
            indefinitely and ``timeout`` would never be checked.
        process: Optional ``subprocess.Popen`` of the server. If it has exited,
            waiting further is pointless and an error is raised immediately.

    Returns:
        True once the server is ready.

    Raises:
        TimeoutError: if the server is not ready within ``timeout`` seconds.
        RuntimeError: if ``process`` is given and has already exited.
    """
    start = time.perf_counter()
    while True:
        try:
            r = requests.get(f"{url}/docs", timeout=request_timeout)
            if r.status_code == 200:
                print("✅ Server is ready", time.perf_counter() - start)
                return True
        except requests.RequestException:
            pass  # not reachable yet (connection refused, read timeout, ...)

        if process is not None and process.poll() is not None:
            raise RuntimeError(f"Server process exited with code {process.returncode}")
        if time.perf_counter() - start > timeout:
            raise TimeoutError("Server did not start in time")

        time.sleep(1)
|
|
def sandbox_server():
    """Start the local NeMo-Skills code-execution sandbox server in the background.

    Output goes to ``sandbox.log``. After spawning, the function sleeps for a
    fixed 3 seconds; readiness is not otherwise checked here.

    Returns:
        The ``subprocess.Popen`` handle of the sandbox process (previously
        discarded; callers that ignore the return value are unaffected).
    """
    # The child keeps its own copy of the descriptor, so the parent's handle is
    # closed right away instead of leaking for the lifetime of the notebook.
    with open("sandbox.log", "w") as log_file:
        sandbox_process = subprocess.Popen(
            ["python", "-m", "nemo_skills.code_execution.local_sandbox.local_sandbox_server"],
            stdout=log_file,
            stderr=log_file,
        )
    time.sleep(3)
    return sandbox_process


time.sleep(2)
sandbox_server()
sandbox = get_sandbox()
|
|
| """# Prompt Types and Updating Prompt""" |
|
|
# System prompt shared by every sample. It asks for the final answer as JSON
# with "Answer" and "Confidence" fields instead of \boxed{}.
default_prompt = (
    'You are an elite mathematical problem solver with expertise at the International '
    'Mathematical Olympiad (IMO) level. Your goal is to find the correct answer through '
    'rigorous mathematical reasoning.\n\n'

    '# Problem-Solving Approach:\n'
    '1. UNDERSTAND: Carefully read and rephrase the problem in your own words. '
    'Identify what is given, what needs to be found, and any constraints.\n'
    '2. EXPLORE: Consider multiple solution strategies. Think about relevant theorems, '
    'techniques, patterns, or analogous problems. Don\'t commit to one approach immediately.\n'
    '3. PLAN: Select the most promising approach and outline key steps before executing.\n'
    '4. EXECUTE: Work through your solution methodically. Show all reasoning steps clearly.\n'
    '5. VERIFY: Check your answer by substituting back, testing edge cases, or using '
    'alternative methods. Ensure logical consistency throughout.\n\n'

    '# Mathematical Reasoning Principles:\n'
    '- Break complex problems into smaller, manageable sub-problems\n'
    '- Look for patterns, symmetries, and special cases that provide insight\n'
    '- Use concrete examples to build intuition before generalizing\n'
    '- Consider extreme cases and boundary conditions\n'
    '- If stuck, try working backwards from the desired result\n'
    '- Be willing to restart with a different approach if needed\n\n'

    '# Verification Requirements:\n'
    '- Cross-check arithmetic and algebraic manipulations\n'
    '- Verify that your solution satisfies all problem constraints\n'
    '- Test your answer with simple cases or special values when possible\n'
    '- Ensure dimensional consistency and reasonableness of the result\n\n'

    "#RESPONSE FORMAT:\n\n"
    # Fixed: a stray ". " after the newline, and a missing newline that fused the
    # JSON template with the next sentence ("...and 1>Do not output ...").
    "The final answer must be a non-negative integer.\nInstead of the \\boxed{} format use json format. Follow the instructions for the format-"
    ' "Answer": <non-negative integer>,"Confidence": <number between 0 and 1>\n'
    # The closing instruction is repeated, presumably for emphasis.
    "Do not output any additional reasoning after this JSON.\n"
    "Do not output any additional reasoning after this JSON.\n"
)
|
|
| |
# The system turn is only a placeholder: callers supply the actual system
# prompt as input_dict["system_prompt"] when filling the template.
system_message = '{system_prompt}'
prompt_template = get_prompt(
    prompt_config='gpt-oss/math',
    system_message=system_message,
    tokenizer=model_path,
    code_tags="gpt-oss",
)
# Extra chat-template arguments: expose the python tool and ask for high
# reasoning effort.
chat_template_kwargs = dict(
    builtin_tools=["python"],
    reasoning_effort="high",
)
|
|
def safe_concat(a, b, function_name):
    """Concatenate ``a`` and ``b``, rejecting ``None`` operands up front.

    Args:
        a: Left operand.
        b: Right operand.
        function_name: Caller's name, included in the error message.

    Returns:
        ``a + b``.

    Raises:
        ValueError: if either operand is ``None``.
    """
    has_missing = any(operand is None for operand in (a, b))
    if has_missing:
        raise ValueError(f"Cannot concatenate: a={a}, b={b}, Error Raised from function {function_name}")
    return a + b
|
|
| """# Data Extraction & Early Stopping""" |
|
|
class Result:
    """Majority-vote aggregator over per-sample answer entries, with early stopping.

    Each entry of ``answer_list`` is expected to be a dict with an ``"Answer"``
    key; ``-1`` marks a sample whose answer could not be extracted. Malformed
    entries (non-dicts, a missing key, non-integral values) are ignored instead
    of raising, so one bad sample cannot abort voting for the whole question.
    """

    # Minimum votes the leading answer needs before an early stop is allowed.
    EARLY_STOP_MIN_VOTES = 3
    # Required lead of the top answer over the runner-up for an early stop.
    EARLY_STOP_MIN_GAP = 1

    def __init__(self):
        self.early_stop_flag = False
        # Initialised here so best_voted_answer() is safe before any vote.
        self.best_answer = None
        self.best_count = 0
        self.second_count = 0
        self.answer_list = []
        self.valid_answers = []
        self.sorted_answers = []
        self.num_done = 0

    def best_voted_answer(self):
        return self.best_answer

    @staticmethod
    def _normalize_answer(entry):
        """Return the integer answer held by ``entry``, or ``None`` if unusable.

        Integral ints, floats and numeric strings map to ``int`` so that e.g.
        ``"5"`` and ``5`` are counted as the same vote.
        """
        if not isinstance(entry, dict):
            return None
        value = entry.get("Answer")
        if isinstance(value, bool):
            return None
        if isinstance(value, int):
            return value
        if isinstance(value, float):
            return int(value) if value.is_integer() else None
        if isinstance(value, str):
            try:
                return int(value.strip())
            except ValueError:
                return None
        return None

    def majority_voting(self, answer_list):
        """Tally valid answers and set best/second counts and ``sorted_answers``."""
        self.answer_list = answer_list
        normalized = (self._normalize_answer(x) for x in answer_list)
        self.valid_answers = [a for a in normalized if a is not None and a != -1]
        print("Answer_list after popping -1", self.valid_answers, "%%%%")

        if not self.valid_answers:
            self.best_answer = None
            self.best_count = 0
            self.second_count = 0
            self.sorted_answers = []
            return

        # most_common() is a stable descending sort, so ties keep first-seen order.
        self.sorted_answers = Counter(self.valid_answers).most_common()
        self.best_answer, self.best_count = self.sorted_answers[0]
        self.second_count = self.sorted_answers[1][1] if len(self.sorted_answers) > 1 else 0

        # When every candidate has a single vote and 0 happens to lead, prefer
        # the runner-up (presumably 0 is treated as a likely degenerate answer).
        if self.best_count == 1 and self.best_answer == 0 and len(self.sorted_answers) > 1:
            self.best_answer, self.best_count = self.sorted_answers[1]

    def early_stop(self, answer_list, num_done):
        """Vote, then set ``early_stop_flag`` once the leader is clear enough.

        The flag is set when the leading answer has at least
        ``EARLY_STOP_MIN_VOTES`` votes and leads the runner-up by at least
        ``EARLY_STOP_MIN_GAP``; once set it stays set. Returns False when there
        are no valid answers, otherwise the current flag.
        """
        print("Num_done is", num_done)
        self.num_done = num_done
        self.majority_voting(answer_list)
        n_valid = len(self.valid_answers)
        best = self.best_count
        gap = self.best_count - self.second_count
        print(f"Num done: {self.num_done}, Valid answers: {n_valid}, "
              f"Best count: {best}, Second count: {self.second_count}")

        if n_valid == 0:
            return False

        if best >= self.EARLY_STOP_MIN_VOTES and gap >= self.EARLY_STOP_MIN_GAP:
            self.early_stop_flag = True
            print(f">>> EARLY STOP at {self.num_done} completions | "
                  f"best={self.best_answer} (count={best}, gap={gap})")

        return self.early_stop_flag

    def get_best_answer(self, answer_list, num_done, flag):
        """Return ``(best_answer, early_stop_flag)``; ``flag`` enables the early-stop check."""
        if not flag:
            self.majority_voting(answer_list)
        else:
            self.early_stop(answer_list, num_done)
        return self.best_voted_answer(), self.early_stop_flag
|
|
| import re, requests |
|
|
class Answer:
    """Second-pass extractor that asks the model to read a finished generation.

    ``extract_answer`` sends the problem plus the cleaned model output back to
    the server and returns the answer dict it gets back.
    """

    # Used when no module-level ``promptobj`` (exposing
    # ``get_dprompt("extract_answer")``) exists. The original code referenced
    # ``promptobj`` unconditionally, but no such name is defined in the visible
    # notebook, so every extraction failed with NameError.
    EXTRACT_SYSTEM_PROMPT = (
        "You are given a math problem followed by an attempted solution. "
        "Report the final answer that the solution arrives at. "
        'Reply with only this JSON: {"Answer": <non-negative integer>, '
        '"Confidence": <number between 0 and 1>}. '
        'If the solution reaches no final answer, reply {"Answer": -1, "Confidence": 0}.'
    )

    def __init__(self):
        self.best_answer = None
        self.input_message = ""
        self.best_count = 0
        self.second_count = 0
        self.answer_list = []
        self.early_stop_flag = False
        self.sorted_answers = []
        self.valid_answers = []
        # Extraction is a short follow-up request, hence the small token budget.
        self.sampling_param = {
            "tokens_to_generate": 7000,
            "temperature": 0.9,
            "top_p": 0.95,
        }
        self.timeout = httpx.Timeout(
            connect=60.0,
            read=300.0,
            write=60.0,
            pool=120.0,
        )

    def clean_messages(self, text):
        """Remove special chat tokens such as ``<|start|>`` and trim whitespace."""
        cleaned = re.sub(r'<\|[^|]*\|>', '', text)
        return cleaned.strip()

    def _extraction_system_prompt(self):
        """Prefer a notebook-provided ``promptobj`` prompt; otherwise use the built-in one."""
        provider = globals().get("promptobj")
        if provider is not None:
            return provider.get_dprompt("extract_answer")
        return self.EXTRACT_SYSTEM_PROMPT

    async def extract_answer(self, question, model_output):
        """Return ``{"Answer": ..., "Confidence": ...}`` read from ``model_output``.

        Any failure (building the prompt, the request, or a non-dict reply)
        yields the sentinel ``{"Answer": -1, "Confidence": -0.1}`` instead of
        raising, so one bad sample cannot abort the whole batch.
        """
        failed = {"Answer": -1, "Confidence": -0.1}
        seed = secrets.randbits(32)
        rid = secrets.token_hex(8)
        print("extract_answer was called")
        try:
            input_message = self.clean_messages(model_output)
            message = prompt_template.fill(
                input_dict={
                    # Blank line so the problem and the solution don't run together.
                    "problem": safe_concat(question, "\n\n" + input_message, "extract_answer"),
                    "system_prompt": self._extraction_system_prompt(),
                },
                chat_template_kwargs=chat_template_kwargs,
                format_as_string=True,
            )
            data, _ = await server_obj.generate_response(
                prompt=message,
                random_seed=seed,
                stream=True,
                calling_function="extract_answer",
                extra_body={"request_id": rid, "reasoning_effort": "medium"},
                timeout=self.timeout,
                **self.sampling_param,
            )
        except Exception as e:
            print(f"[extract_answer failed] {type(e).__name__}: {e}")
            return failed

        return data if isinstance(data, dict) else failed
|
|
| """# Inference""" |
|
|
| |
| |
# Client for the local vLLM server started above. The code_execution settings
# presumably govern how model-emitted code is run in ``sandbox`` — see the
# configuration constants. The URL is built from the same host/port settings
# used elsewhere instead of a duplicated literal.
server_obj = get_code_execution_model(
    server_type='vllm',
    model=model_path,
    base_url=f"http://{host}:{port}/v1",
    api_key='EMPTY',
    sandbox=sandbox,
    code_execution={
        'max_code_output_characters': max_code_output_characters,
        'code_execution_timeout': code_execution_timeout,
        'max_code_executions': max_code_executions,
    },
)
|
|
async def abort_request(request_ids: str | list[str]):
    """Ask the server, one id at a time, to drop the given generation requests.

    Sends ``DELETE /v1/requests/<id>`` for each id with short timeouts, so an
    unresponsive server cannot stall the caller. Every failure is ignored: this
    is best-effort cleanup only.
    """
    ids = [request_ids] if isinstance(request_ids, str) else request_ids
    short_timeout = httpx.Timeout(connect=1.0, read=2.0, write=1.0, pool=1.0)

    async with httpx.AsyncClient(timeout=short_timeout) as client:
        for request_id in ids:
            try:
                await client.delete(f"http://{host}:{port}/v1/requests/{request_id}")
            except Exception:
                # Deliberately swallowed: aborting is purely an optimisation.
                pass
            await asyncio.sleep(0.05)
|
|
class ClientClass:
    """Fan one problem out to parallel generations and stream their answers back.

    ``predict_for_question`` builds ``max_batch_size`` copies of the prompt,
    launches them concurrently against ``server_obj`` and yields one answer dict
    per finished sample, in completion order. A sample that fails yields the
    ``{"Answer": -1, "Confidence": -0.1}`` sentinel.
    """

    # Strong references to fire-and-forget abort tasks. The event loop keeps only
    # weak references to tasks, so an unreferenced one may be collected early.
    _background_tasks = set()

    def __init__(self, prompt):
        self.thresh_hold = 3
        self.system_prompt = prompt
        self.answer = {}
        self.randomseed_list = []
        self.num_done = 0
        self.sampling_param = copy.deepcopy(sampling_params)
        self.question = ""
        self.finished_generations = []
        self.final_answer = None
        self.early_stop_flag = False
        self.flattened_prompt_list = []
        self.list_of_questions = []
        self.answer_list = []
        self.request_ids = []
        self.tasks = []
        self.timeout = httpx.Timeout(
            connect=30.0,
            read=500.0,
            write=30.0,
            pool=120.0,
        )
        self.answerobj = Answer()

    async def send_request_to_server(self):
        """Launch one streaming request per prompt and yield answers as they finish."""
        print("Request sent")
        self.request_ids = [secrets.token_hex(8) for _ in self.list_of_questions]
        # The seed is the sample's position, so sample k uses seed k on every problem.
        self.randomseed_list = list(range(len(self.list_of_questions)))
        for prompt, seed, rid in zip(self.list_of_questions, self.randomseed_list, self.request_ids):
            task = asyncio.create_task(
                server_obj.generate_async(
                    prompt=prompt,
                    random_seed=seed,
                    timeout=self.timeout,
                    remove_stop_phrases=False,
                    stream=True,
                    extra_body={"request_id": rid, "enable_thinking": True, "reasoning_effort": "high"},
                    **prompt_template.get_code_execution_args(),
                    **self.sampling_param,
                )
            )
            self.tasks.append(task)

        try:
            for completed in asyncio.as_completed(self.tasks):
                try:
                    result = await completed
                    self.num_done += 1
                    self.finished_generations.append(result["generation"])
                    self.answer = await self._answer_from_result(result)
                except Exception as e:
                    print(f"[ERROR] {type(e).__name__}")
                    traceback.print_exc()
                    self.answer = {"Answer": -1, "Confidence": -0.1}
                # When the consumer closes this generator early, GeneratorExit is
                # raised here and the finally clause below cleans up.
                yield self.answer
        finally:
            self._cleanup_tasks()

    async def _answer_from_result(self, result):
        """Turn one finished generation into an answer dict.

        Uses the generation's own JSON answer when it parses; otherwise asks the
        second-pass extractor (previously an unparseable answer became -1).
        """
        raw = result["answer"]
        if raw is not None:
            parsed = self._parse_answer(raw)
            if parsed is not None:
                print("The answer and confidence after json parsing", parsed)
                return parsed
            print("Could not parse answer", repr(raw), "- falling back to extraction")
        answer = await self.answerobj.extract_answer(self.question, result["generation"])
        print("The answer and confidence after interaction with 2nd model", answer)
        return answer

    @staticmethod
    def _parse_answer(raw):
        """Return an answer dict parsed from ``raw``, or ``None`` if it is unusable.

        A JSON object with an "Answer" key is returned as-is; a bare JSON number
        or string (e.g. ``42``) is wrapped as ``{"Answer": 42, "Confidence": -0.1}``.
        """
        if isinstance(raw, dict):
            parsed = raw
        else:
            try:
                parsed = json.loads(raw)
            except (TypeError, ValueError):
                return None
        if isinstance(parsed, dict):
            return parsed if "Answer" in parsed else None
        if isinstance(parsed, (int, float, str)) and not isinstance(parsed, bool):
            return {"Answer": parsed, "Confidence": -0.1}
        return None

    def _cleanup_tasks(self):
        """Cancel unfinished generations, reap finished ones, and abort them server-side."""
        pending_ids = []
        for task, rid in zip(self.tasks, self.request_ids):
            if not task.done():
                task.cancel()
                pending_ids.append(rid)
            elif not task.cancelled():
                # Retrieve the outcome so asyncio does not log "Task exception was
                # never retrieved". Checking cancelled() first matters: exception()
                # on a cancelled task raises CancelledError, which is not an
                # Exception subclass.
                task.exception()
        if pending_ids:
            abort_task = asyncio.create_task(abort_request(pending_ids))
            self._background_tasks.add(abort_task)
            abort_task.add_done_callback(self._background_tasks.discard)

    def flatten_prompt_list(self):
        """Use one copy of the system prompt per parallel sample (``max_batch_size``)."""
        self.flattened_prompt_list = [self.system_prompt for _ in range(max_batch_size)]

    def generate_question_copies(self, question):
        """Fill the chat template once per system prompt in ``flattened_prompt_list``."""
        self.question = question
        self.list_of_questions = [
            prompt_template.fill(
                input_dict={
                    "problem": question,
                    "system_prompt": system_prompt,
                },
                chat_template_kwargs=chat_template_kwargs,
                format_as_string=True,
            )
            for system_prompt in self.flattened_prompt_list
        ]

    async def predict_for_question(self, question):
        """Yield answer dicts for ``question`` as the parallel samples complete."""
        self.flatten_prompt_list()
        self.generate_question_copies(question)

        gen = self.send_request_to_server()
        try:
            async for answer in gen:
                yield answer
        except Exception as e:
            print("Error in predict_for_question:", e)
            raise
        finally:
            try:
                await gen.aclose()
            except Exception:
                pass
|
|
| import math |
|
|
class BufferBorrower:
    """Decide how much of the shared time buffer one question may borrow.

    Each question starts from a base budget (``allowed_time``). ``allocate_time``
    tops it up with part of the global buffer, scaled by a *time-pressure*
    factor: the ideal per-question pace (``total_available_time /
    total_questions``) divided by the pace still affordable
    (``remaining_time / questions left``), clamped to ``[0.3, 1.5]``.

    Parameters
    ----------
    b_max : float
        Stored for reference; not used by the current allocation (default 0.85).
    k : float
        Stored for reference; not used by the current allocation (default 6.0).
    threshold : float
        Stored for reference; not used by the current allocation (default 0.4).
    total_questions : int
        Number of questions the run is paced for (default 50).
    total_available_time : float
        Seconds available for all questions when pacing starts (default 15720).
    max_borrowable : float
        Upper bound, in seconds, on what one question may borrow (default 95).
    """

    def __init__(
        self,
        b_max: float = 0.85,
        k: float = 6.0,
        threshold: float = 0.4,
        total_questions: int = 50,
        total_available_time: int = 15720,
        max_borrowable: float = 95,
    ):
        self.b_max = b_max
        self.k = k
        self.threshold = threshold
        self.total_questions = total_questions
        self.total_available_time = total_available_time
        self.max_borrowable = max_borrowable

    def compute_time_pressure(
        self,
        remaining_time: float,
        questions_completed: int,
        global_buffer: float = 0.0,
    ) -> float:
        """Return the clamped pressure factor; values above 1 mean behind the ideal pace.

        ``global_buffer`` is accepted for interface compatibility and ignored.
        """
        if remaining_time <= 0:
            return 1.5
        remaining_q = max(1, self.total_questions - questions_completed)
        # Guard against total_questions <= 0, which used to divide by zero.
        ideal_pace = self.total_available_time / max(1, self.total_questions)
        available_pace = remaining_time / remaining_q
        pressure = ideal_pace / available_pace
        return max(0.3, min(1.5, pressure))

    def allocate_time(
        self,
        remaining_time: float,
        questions_completed: int,
        global_buffer: float = 0.0,
        allowed_time: float = 320,
    ) -> dict:
        """Split the budget for the next question into base time plus borrowed buffer.

        Parameters
        ----------
        remaining_time : float
            Seconds left in the whole run.
        questions_completed : int
            Questions already processed.
        global_buffer : float
            Shared buffer currently available, in seconds.
        allowed_time : float
            Base inference budget for this question, in seconds.

        Returns
        -------
        dict
            ``effective_inference`` (base + borrowed), ``global_buffer`` (after
            borrowing), ``borrowed`` and ``borrow_fraction`` (``1 / pressure``,
            reported only).
        """
        pressure = self.compute_time_pressure(remaining_time, questions_completed, global_buffer)
        borrow_fraction = 1 / pressure
        print("borrow fraction", borrow_fraction)
        # NOTE(review): the amount borrowed *grows* with pressure (more is taken
        # when time is scarce) while the reported borrow_fraction shrinks with
        # it. Kept as-is; confirm which direction is intended.
        borrowed = min(pressure * global_buffer, self.max_borrowable)
        if global_buffer > 0:
            # With pressure > 1 the product can exceed the buffer itself, which
            # used to drive the returned buffer negative. Never overdraw it.
            borrowed = min(borrowed, global_buffer)

        return {
            "effective_inference": allowed_time + borrowed,
            "global_buffer": global_buffer - borrowed,
            "borrowed": borrowed,
            "borrow_fraction": borrow_fraction,
        }
|
|
class TimeBudget:
    """Countdown of ``total_seconds`` measured on the ``time.perf_counter`` clock."""

    def __init__(self, total_seconds):
        now = time.perf_counter()
        self.start = now
        self.deadline = now + total_seconds

    @property
    def remaining(self):
        """Seconds left until the deadline, floored at 0."""
        left = self.deadline - time.perf_counter()
        return left if left > 0 else 0

    @property
    def elapsed(self):
        """Seconds since construction."""
        return time.perf_counter() - self.start

    @property
    def expired(self):
        """True once no time remains."""
        return not self.remaining > 0
|
|
class Pipeline:
    """Solve one problem by majority voting under a per-question time budget."""

    def __init__(self):
        self.k = 1  # rounds of max_batch_size samples to wait for
        self.budget_seconds = 0

    async def get_prediction(self, problem_text):
        """Return the voted answer for ``problem_text``, or ``None`` if nothing usable.

        The base budget of 320 s may be topped up from ``global_buffer`` via
        ``borrower``. Afterwards, unused budget is returned to ``global_buffer``
        and overruns are charged to it.
        """
        global global_buffer
        thresh_hold = 3  # minimum answers before early stopping is considered
        max_generation_count = self.k * max_batch_size
        answer_list = []
        num_done = 0
        finalanswerobj = Result()
        print("Pipeline step 1")

        allowed_time = 320
        self.budget_seconds = allowed_time
        if global_buffer > 0:
            result = borrower.allocate_time(
                remaining_time=get_global_remaining(),
                questions_completed=i,
                allowed_time=allowed_time,
                global_buffer=global_buffer,
            )
            self.budget_seconds = result["effective_inference"]
            global_buffer = result["global_buffer"]
            print(f'borrowed={result["borrowed"]:.0f}')
        print(f"Budget: base={allowed_time:.0f}s "
              f"= {self.budget_seconds:.0f}s (global remaining: {get_global_remaining():.0f}s)")
        # Created unconditionally: the deadline and the final buffer settlement
        # need it even when nothing was borrowed (it used to stay None then).
        budgetobj = TimeBudget(self.budget_seconds)

        clientobj = ClientClass(default_prompt)
        deadline = budgetobj.remaining
        operation_start_time = time.perf_counter()
        print("Deadline is", deadline)
        gen = clientobj.predict_for_question(problem_text)
        try:
            async with asyncio.timeout(deadline):
                async for answer in gen:
                    answer_list.append(answer)
                    print("Answer list so far:")
                    print(answer_list)
                    num_done = len(answer_list)
                    if num_done >= max_generation_count:
                        prediction, _ = finalanswerobj.get_best_answer(answer_list, num_done, False)
                        return prediction
                    if num_done >= thresh_hold:
                        prediction, early_stop_flag = finalanswerobj.get_best_answer(answer_list, num_done, True)
                        if early_stop_flag:
                            return prediction
            # The stream ended before max_generation_count answers arrived:
            # still vote on what was collected instead of returning None.
            prediction, _ = finalanswerobj.get_best_answer(answer_list, num_done, False)
            return prediction

        except (TimeoutError, asyncio.TimeoutError):
            traceback.print_exc()
            prediction, _ = finalanswerobj.get_best_answer(answer_list, num_done, False)
            return prediction

        except Exception as e:
            traceback.print_exc()
            print(f"UNEXPECTED ERROR: {type(e).__name__} {e}")
            if answer_list:
                prediction, _ = finalanswerobj.get_best_answer(answer_list, num_done, False)
                return prediction
            return None

        finally:
            await gen.aclose()
            print("Operation duration", time.perf_counter() - operation_start_time)
            # Settle with the shared buffer: positive when finished early,
            # negative when the question overran its budget.
            global_buffer += self.budget_seconds - budgetobj.elapsed
|
|
# Answer submitted when no usable prediction is available.
_FALLBACK_ANSWER = 29443


def _coerce_answer(prediction):
    """Return ``prediction`` as an int, or ``_FALLBACK_ANSWER`` if it is not integral."""
    if prediction is None or isinstance(prediction, bool):
        return _FALLBACK_ANSWER
    if isinstance(prediction, float) and not prediction.is_integer():
        return _FALLBACK_ANSWER
    try:
        return int(prediction)
    except (TypeError, ValueError, OverflowError):
        return _FALLBACK_ANSWER


def predict(id_: pl.Series, problem: pl.Series) -> pl.DataFrame | pd.DataFrame:
    """Make a prediction for one problem.

    Args:
        id_: Single-element series holding the problem id.
        problem: Single-element series holding the problem text.

    Returns:
        A one-row ``pl.DataFrame`` with columns ``id`` and ``answer``. ``answer``
        is always an int; the fallback 29443 is used when no usable prediction
        is available.
    """
    global server_started, i
    if server_started is False:
        server_started = wait_for_server()

    id_ = id_.item(0)
    problem_text: str = problem.item(0)

    prediction = None
    try:
        # With under 30 s of global time left, skip solving and use the fallback.
        if get_global_remaining() >= 30:
            pipelineobj = Pipeline()
            loop = asyncio.get_event_loop()
            prediction = loop.run_until_complete(pipelineobj.get_prediction(problem_text))
    finally:
        # Count every question (skipped or failed too), so the time-pressure
        # estimate sees how many questions are already behind us.
        i = i + 1

    out = pl.DataFrame({"id": id_, "answer": _coerce_answer(prediction)})
    print("Returned dataframe is ", out)
    return out
|
|
# Set by run_local_inference() before any prediction runs; Pipeline reads it.
borrower = None


def run_local_inference(file_path: str, output_path: str = "submission.csv", total_questions: int = 50):
    """Predict every problem in a local .xlsx/.xls/.csv file and save the answers as CSV.

    Args:
        file_path: Spreadsheet or CSV with a ``problem`` column and optionally ``id``.
        output_path: Destination CSV with columns ``id`` and ``answer``.
        total_questions: Question count the time pacing assumes (default 50).

    Returns:
        The DataFrame that was written.

    Raises:
        ValueError: if the input has no ``problem`` column.
    """
    global borrower
    import pandas as pd
    import polars as pl

    borrower = BufferBorrower(total_questions=total_questions, total_available_time=get_global_remaining())

    if file_path.lower().endswith((".xlsx", ".xls")):
        df = pd.read_excel(file_path)
    else:
        df = pd.read_csv(file_path)

    # An explicit raise (not assert) so the check survives ``python -O``.
    if "problem" not in df.columns:
        raise ValueError("Column 'problem' is required")
    df = df.dropna(subset=["problem"])
    # astype(str) keeps the blank-row filter working on non-text columns; copy()
    # makes the later "id" assignment act on an independent frame.
    df = df[df["problem"].astype(str).str.strip() != ""].copy()

    if "id" not in df.columns:
        df["id"] = range(len(df))

    results = []
    for idx, row in df.iterrows():
        id_val = row["id"]
        problem_text = row["problem"]
        try:
            pred_df = predict(pl.Series([id_val]), pl.Series([problem_text]))
            pred = pred_df.to_pandas() if isinstance(pred_df, pl.DataFrame) else pred_df
            # Store every row as a plain dict: mixing Series (successes) with
            # dicts (failures) in one list is fragile in pd.DataFrame(results).
            results.append(pred.iloc[0].to_dict())
        except Exception as e:
            print(f"Error at row {idx}: {e}")
            results.append({"id": id_val, "answer": 29443})

    final_df = pd.DataFrame(results, columns=["id", "answer"])
    final_df.to_csv(output_path, index=False)

    print(f"✅ Saved predictions to {output_path}")
    return final_df


run_local_inference("/content/AIMO_ReferenceProblems.xlsx")