import os
import hydra
import aiflows
from aiflows.backends.api_info import ApiInfo
from aiflows.utils.general_helpers import read_yaml_file, quick_load_api_keys
from aiflows import logging
from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache
from aiflows.utils import serving
from aiflows.workers import run_dispatch_worker_thread
from aiflows.messages import FlowMessage
from aiflows.interfaces import KeyInterface
from aiflows.utils.colink_utils import start_colink_server
from aiflows import flow_verse
import pandas as pd
import sys
from copy import deepcopy
import requests
import time

dependencies = [
    {"url": "aiflows/FunSearchFlowModule", "revision": "../FunSearchFlowModule"}
]
flow_verse.sync_dependencies(dependencies)

from flow_modules.aiflows.FunSearchFlowModule.Loader import Loader

logging.set_verbosity_debug()


def load_problem(id, ds_location="./data/codeforces.jsonl.gz"):
    """Load a Codeforces problem: returns its tests and a textual problem description."""

    def make_problem_descriptions_str(row):
        def write_public_tests_individual_io_str(row):
            public_tests = row.public_tests_individual_io
            tests = ""
            for i, test in enumerate(public_tests):
                test_input, test_output = test[0], test[1]
                tests += f"Test {i+1}:\n Input: {test_input}\n Output: '{test_output}'\n"
            return tests

        problem_description = row.problem_description
        input_description = row.input_description
        output_description = row.output_description
        public_tests = write_public_tests_individual_io_str(row)

        problem_description_str = f"Problem Description:\n{problem_description}\n\n"
        input_description_str = f"Input Description:\n{input_description}\n\n"
        output_description_str = f"Output Description:\n{output_description}\n\n"
        public_tests_str = f"Public Tests:\n{public_tests}\n"
        final_str = problem_description_str + input_description_str + output_description_str + public_tests_str
        return final_str

    df = pd.read_json(ds_location, lines=True, compression="gzip")
    row = df[df.id == id].iloc[0]
    assert row.non_unique_output == False, "Problem has non-unique output. Not supported yet."
Not supported yet" problem_description = make_problem_descriptions_str(row) public_test = row.public_tests_individual_io tests = {} test_counter = 1 for public_test in public_test: tests["test_"+str(test_counter)] = {"tests_inputs": public_test[0], "expected_outputs": public_test[1]} test_counter += 1 for hidden_test in row.hidden_tests_io: tests["test_"+str(test_counter)] = {"tests_inputs": hidden_test[0], "expected_outputs": hidden_test[1]} test_counter += 1 return tests, problem_description def download_codeforces_data(data_folder_path,file_name): print("Downloading data....") os.makedirs(data_folder_path, exist_ok=True) url = "https://github.com/epfl-dlab/cc_flows/raw/main/data/codeforces/codeforces.jsonl.gz" response = requests.get(url, stream=True) if response.status_code == 200: with open(os.path.join(data_folder_path,file_name), 'wb') as file: for chunk in response: file.write(chunk) print("Download complete") else: print("Failed to download data", response.status_code) def get_configs(problem_id, ds_location = "./data/codeforces.jsonl.gz"): tests, problem_description = load_problem(problem_id,ds_location) path = os.path.join(".", "demo.yaml") funsearch_cfg = read_yaml_file(path) evaluate_function_file_path: str = "./cf_functions.py" evaluate_function_name: str = "evaluate" evolve_function_name:str = "solve_function" loader = Loader(file_path = evaluate_function_file_path, target_name = evaluate_function_name) evaluate_function: str= loader.load_target() evaluate_file_full_content = loader.load_full_file() evaluate_file_full_content = f"\"\"\"{problem_description}\"\"\"\n\n" + evaluate_file_full_content #~~~~~ ProgramDBFlow Overrides ~~~~~~~~ funsearch_cfg["subflows_config"]["ProgramDBFlow"]["evaluate_function"] = evaluate_function funsearch_cfg["subflows_config"]["ProgramDBFlow"]["evaluate_file_full_content"] = evaluate_file_full_content funsearch_cfg["subflows_config"]["ProgramDBFlow"]["artifact_to_evolve_name"] = evolve_function_name if len(tests) > 0: first_test = tests["test_1"] dummy_solution = f"def {evolve_function_name}(input) -> str:" +\ "\n \"\"\"Attempt at solving the problem given the input input and returns the predicted output (see the top of the file for problem description)\"\"\"" +\ f"\n return \'{first_test['expected_outputs']}\'\n" else: dummy_solution = f"def {evolve_function_name}(input) -> str:" +\ "\n \"\"\"Attempt at solving the problem given the input input and returns the predicted output (see the top of the file for problem description)\"\"\"" +\ f"\n return 0.0\"\"\n" #~~~~~~~~~~Evaluator overrides~~~~~~~~~~~~ funsearch_cfg["subflows_config"]["EvaluatorFlow"]["py_file"] = evaluate_file_full_content funsearch_cfg["subflows_config"]["EvaluatorFlow"]["run_error_score"] = -1 funsearch_cfg["subflows_config"]["EvaluatorFlow"]["function_to_run_name"] = evaluate_function_name funsearch_cfg["subflows_config"]["EvaluatorFlow"]["test_inputs"] = tests #Hides test inputs from LLM (necessary for hidden tests. Makes same setup as in a real contest.) 
funsearch_cfg["subflows_config"]["EvaluatorFlow"]["use_test_input_as_key"] = False #~~~~~~~~~~Sampler overrides~~~~~~~~~~~~ funsearch_cfg["subflows_config"]["SamplerFlow"]["system_message_prompt_template"]["partial_variables"] = \ { "evaluate_name": evaluate_function_name, "evolve_name": evolve_function_name, "artifacts_per_prompt": 2 } return funsearch_cfg, dummy_solution FLOW_MODULES_PATH = "./" if __name__ == "__main__": cl = start_colink_server() problem_id = "1789B" #put the problem id here if not os.path.exists("./data/codeforces.jsonl.gz"): download_codeforces_data("./data", "codeforces.jsonl.gz") funsearch_cfg, dummy_solution = get_configs(problem_id) #Serve Program Database and get its flow type explicitly api_information = [ApiInfo(backend_used="openai", api_key = os.getenv("OPENAI_API_KEY"))] serving.recursive_serve_flow( cl=cl, flow_class_name="flow_modules.aiflows.FunSearchFlowModule.FunSearch", flow_endpoint="FunSearch", ) # #Serve the rest # serving.recursive_serve_flow( # cl=cl, # flow_type="FunSearch_served", # default_config=funsearch_cfg, # default_state=None, # default_dispatch_point="coflows_dispatch", # ) n_workers = 10 for i in range(n_workers): run_dispatch_worker_thread(cl) quick_load_api_keys(funsearch_cfg, api_information, key="api_infos") config_overrides = None #Mount ProgramDBFlow first to get it's flow ref funsearch_proxy = serving.get_flow_instance( cl=cl, flow_endpoint="FunSearch", config_overrides=funsearch_cfg, ) data = { "from": "SamplerFlow", "operation": "register_program", "api_output": dummy_solution } input_message = funsearch_proxy.package_input_message(data = data) funsearch_proxy.send_message(input_message) data = { "from": "FunSearch", "operation": "start", "content": {"num_samplers": 5}, } input_message = funsearch_proxy.package_input_message(data = data) funsearch_proxy.send_message(input_message) data = { "from": "FunSearch", "operation": "stop", "content": {}, } input_message = funsearch_proxy.package_input_message(data = data) funsearch_proxy.send_message(input_message) wait_time = 1000 print(f"Waiting {wait_time} seconds before requesting result...") time.sleep(wait_time) data = { "from": "FunSearch", "operation": "get_best_programs_per_island", "content": {} } input_message = funsearch_proxy.package_input_message(data = data) future = funsearch_proxy.get_reply_future(input_message) print("waiting for response....") response = future.get_data() print(response)