| from dotenv import load_dotenv |
| load_dotenv() |
|
|
| import os |
| import json |
| import pandas as pd |
| from datetime import datetime |
| import openpyxl |
| from openpyxl.utils import get_column_letter |
| from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix |
|
|
| |
# Filesystem layout: run outputs live under ./data/outputs and run metadata
# in ./data/history.json.  os.makedirs(..., exist_ok=True) creates missing
# parent directories too, so "./data" needs no separate existence check
# (the original LBYL check was redundant).
OUTPUT_DIR = "./data/outputs"
HISTORY_FILE = "./data/history.json"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Load previously saved run history (timestamp -> run metadata) if present;
# otherwise start with an empty record.
if os.path.exists(HISTORY_FILE):
    with open(HISTORY_FILE, "r") as f:
        history = json.load(f)
else:
    history = {}
|
|
| |
# OpenAI SDK: the module object itself is used as the "client", so API calls
# (client.chat.completions.create below) rely on module-level configuration
# (e.g. the API key picked up from the environment populated by load_dotenv()).
# NOTE(review): module-level chat.completions access implies a specific SDK
# version -- confirm the installed `openai` package supports this call style.
import openai
client = openai
|
|
def run_openai_inference(prompt: str, passage: str, model: str):
    """Score a single passage with a chat model.

    Args:
        prompt: Task instructions, used as the system prompt.
        passage: Text to analyze; embedded in <passage> tags.
        model: Model name passed to the chat-completions API.

    Returns:
        (score, reason): score as an int and reason as a short string, or
        (None, None) when the model reply cannot be parsed.
    """
    passage_prompt = f"""
Here is the passage you need to analyze:
<passage>
{passage}
</passage>
"""

    system_prompt = f"{prompt}\n\n{passage_prompt}"

    # Renamed from `format` to avoid shadowing the builtin.  The instruction
    # asks the model for a strict JSON object so the reply can be json-parsed.
    output_format = """
Based on the identified type, extract and return the following data:
- score
**Output format:**
{ "score": "return numeric score here", "reason": "return a short one liner reason for your score here" }
"""

    completion = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": output_format},
        ]
    )

    # Parse the reply ONCE with json.loads.  The original called eval() twice
    # on model output -- eval executes arbitrary code from an untrusted
    # source, and the double call wasted work.
    try:
        payload = json.loads(completion.choices[0].message.content)
        score = int(payload['score'])
        reason = payload['reason']
    except (json.JSONDecodeError, KeyError, TypeError, ValueError):
        # Best effort: a malformed reply yields (None, None) rather than
        # crashing the batch (same contract as before).
        score = None
        reason = None

    return score, reason
|
|
def process_dataframe(df, prompt: str, model: str, max_iterations: int = 5):
    """Score a random sample of rows and return the rows that got predictions.

    Builds a 'passage' column from LeftContext / Keyword / RightContext,
    runs the model on up to `max_iterations` rows (rows with a missing
    'Category' are skipped), writes 'Prediction' and 'Prediction Reason'
    back onto `df`, and returns only the rows with a non-null prediction.

    Note: mutates `df` in place (adds the passage and prediction columns).

    Args:
        df: Input dataframe with LeftContext, Keyword, RightContext, Category.
        prompt: System prompt forwarded to run_openai_inference.
        model: Model name forwarded to run_openai_inference.
        max_iterations: Maximum number of rows to score.

    Returns:
        The subset of `df` whose 'Prediction' is non-null.
    """
    print("Starting process_dataframe function...")

    print("Creating 'passage' column...")
    df['passage'] = (
        df['LeftContext'].astype(str) +
        " <expression>" + df['Keyword'].astype(str) + "</expression> " +
        df['RightContext'].astype(str)
    )

    # Ensure the prediction columns exist even when zero rows qualify, so the
    # dropna() below cannot raise KeyError on an empty run.
    if 'Prediction' not in df.columns:
        df['Prediction'] = None
    if 'Prediction Reason' not in df.columns:
        df['Prediction Reason'] = None

    print("Iterating over rows in random order...")
    # The original collected "tasks" and then ran an identity comprehension
    # over them -- a leftover from an async design.  The calls are
    # synchronous, so run each one directly and assign its result in place.
    processed = 0
    for idx, row in df.sample(frac=1.0).iterrows():
        if processed >= max_iterations:
            print(f"Max iterations reached: {max_iterations}")
            break
        if pd.isna(row['Category']):
            print(f"Skipping row {idx} due to missing 'Category'")
            continue
        print(f"Scheduling task for row {idx} with passage: {row['passage']}")
        score, reason = run_openai_inference(prompt, row['passage'], model)
        print(f"Row {idx}: Assigned score: {score}, reason: {reason}")
        df.at[idx, 'Prediction'] = score
        df.at[idx, 'Prediction Reason'] = reason
        processed += 1

    print("Dropping rows with missing predictions...")
    df_out = df.dropna(subset=['Prediction'])

    print("Finished processing dataframe.")
    return df_out
|
|
|
|
def evaluate_dataframe(y_true, y_pred):
    """Compute binary-classification metrics, best-effort.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels.

    Returns:
        dict with keys accuracy, precision, recall, f1 (floats or None) and
        conf_matrix (nested list or None).  Each metric is computed
        independently; any failure (e.g. non-binary labels) yields None for
        that metric instead of raising, matching the original contract.
    """
    def _safe(compute):
        # The original used five bare `except:` blocks, which also swallow
        # KeyboardInterrupt/SystemExit.  Keep the per-metric best-effort
        # behavior, but catch only Exception.
        try:
            return compute()
        except Exception:
            return None

    cm = _safe(lambda: confusion_matrix(y_true, y_pred))
    return {
        "accuracy": _safe(lambda: accuracy_score(y_true, y_pred)),
        "precision": _safe(lambda: precision_score(y_true, y_pred, average='binary')),
        "recall": _safe(lambda: recall_score(y_true, y_pred, average='binary')),
        "f1": _safe(lambda: f1_score(y_true, y_pred, average='binary')),
        "conf_matrix": cm.tolist() if cm is not None else None,
    }
|
|
def save_results(df_out, prompt, model):
    """Write results to a timestamped .xlsx and record the run in history.

    Creates two sheets -- "Outputs" (the scored dataframe) and "Inputs"
    (the prompt/model used) -- auto-sizes every column, then appends an
    entry to the module-level `history` dict and persists it to
    HISTORY_FILE as JSON.

    Args:
        df_out: Scored dataframe; must contain a 'Prediction' column.
        prompt: Prompt text used for this run.
        model: Model name used for this run.

    Returns:
        Path of the workbook that was written.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_file = os.path.join(OUTPUT_DIR, f"{timestamp}_processed.xlsx")

    df_prompt = pd.DataFrame({"Prompt": [prompt], "Model": [model]})

    # Auto-size columns inside the writer context via writer.sheets.  The
    # original saved the workbook, re-loaded it with openpyxl, resized, and
    # saved again -- writing the whole file twice for no benefit.
    with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
        for sheet_name, frame in [("Outputs", df_out), ("Inputs", df_prompt)]:
            frame.to_excel(writer, sheet_name=sheet_name, index=False)
            ws = writer.sheets[sheet_name]
            for col_idx, col in enumerate(frame.columns, 1):
                # Widest cell in the column, padded; header length also counts.
                max_length = max((len(str(cell)) for cell in frame[col].values), default=0)
                max_length = max(max_length, len(col)) + 2
                ws.column_dimensions[get_column_letter(col_idx)].width = max_length

    history[timestamp] = {
        "file": output_file,
        "prompt": prompt,
        "model": model,
        # Guard the empty case: mean() of an empty series is NaN, which is
        # not valid JSON.
        "score": df_out['Prediction'].mean() if not df_out['Prediction'].empty else None,
        "samples": len(df_out)
    }
    with open(HISTORY_FILE, "w") as f:
        json.dump(history, f, indent=4)
    return output_file
|
|
def list_previous_files():
    """Return the module-level run history (timestamp -> run metadata dict).

    Note: this returns the live `history` dict, not a copy -- mutations by
    the caller will be written out by the next save_results() call.
    """
    return history
|
|