# kwik-ai / functions.py
# Uploaded via huggingface_hub by autoahxan (commit d51140f, verified)
from dotenv import load_dotenv

# Load environment variables (e.g. OPENAI_API_KEY) before the OpenAI
# module is touched below.
load_dotenv()

import os
import json
import pandas as pd
from datetime import datetime
import openpyxl
from openpyxl.utils import get_column_letter
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix

# Global paths for generated spreadsheets and the run-history ledger.
OUTPUT_DIR = "./data/outputs"
HISTORY_FILE = "./data/history.json"
# exist_ok=True also creates intermediate directories ("./data" included),
# so the original separate existence check for "./data" was redundant.
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Load the run history if present; otherwise start with an empty dict.
if os.path.exists(HISTORY_FILE):
    with open(HISTORY_FILE, "r", encoding="utf-8") as f:
        history = json.load(f)
else:
    history = {}

# The openai module itself is used as the client object; it reads
# OPENAI_API_KEY from the environment populated by load_dotenv().
import openai
client = openai
def run_openai_inference(prompt: str, passage: str, model: str):
    """Score a single passage with an OpenAI chat model.

    Args:
        prompt: Task instructions describing how to score the passage.
        passage: Text to analyze; embedded in <passage> tags in the system prompt.
        model: OpenAI model name, passed straight through to the API.

    Returns:
        (score, reason) where score is an int and reason a short string,
        or (None, None) when the model reply could not be parsed.
    """
    passage_prompt = f"""
Here is the passage you need to analyze:
<passage>
{passage}
</passage>
"""
    system_prompt = f"{prompt}\n\n{passage_prompt}"
    # User turn: tells the model the exact JSON shape to emit.
    # (Renamed from `format`, which shadowed the builtin.)
    output_format = """
Based on the identified type, extract and return the following data:
- score
**Output format:**
{ "score": "return numeric score here", "reason": "return a short one liner reason for your score here" }
"""
    completion = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": output_format},
        ],
    )
    return _parse_score_response(completion.choices[0].message.content)


def _parse_score_response(content: str):
    """Parse the model reply into (score, reason); (None, None) on failure.

    The reply is expected to be a JSON object with "score" and "reason"
    keys. Parsing uses json.loads with an ast.literal_eval fallback
    instead of the original eval(), which would execute arbitrary code
    contained in the model's output, and it parses the reply only once
    instead of twice.
    """
    import ast
    try:
        try:
            data = json.loads(content)
        except ValueError:
            # Models sometimes emit a Python-style dict literal instead of
            # strict JSON; literal_eval is a safe, non-executing fallback.
            data = ast.literal_eval(content)
        return int(data['score']), data['reason']
    except (ValueError, SyntaxError, KeyError, TypeError):
        # Preserve original behavior: any unparseable reply yields (None, None).
        return None, None
def process_dataframe(df, prompt: str, model: str, max_iterations: int = 5):
    """Score up to max_iterations rows of df via run_openai_inference.

    Builds a 'passage' column from LeftContext/Keyword/RightContext,
    visits rows in random order, scores each labeled row, and writes
    'Prediction' / 'Prediction Reason' columns in place.

    Args:
        df: DataFrame with 'LeftContext', 'Keyword', 'RightContext' and
            'Category' columns. Mutated in place (columns are added).
        prompt: System prompt forwarded to run_openai_inference.
        model: OpenAI model name.
        max_iterations: Maximum number of rows to score.

    Returns:
        df restricted to rows that received a non-null prediction.
    """
    print("Starting process_dataframe function...")
    print("Creating 'passage' column...")
    # Wrap the keyword in <expression> tags so the model can locate it
    # inside its surrounding context.
    df['passage'] = (
        df['LeftContext'].astype(str) +
        " <expression>" + df['Keyword'].astype(str) + "</expression> " +
        df['RightContext'].astype(str)
    )

    # Pre-create the output columns so the final dropna cannot raise
    # KeyError when zero rows qualify for scoring (e.g. all Category NaN).
    if 'Prediction' not in df.columns:
        df['Prediction'] = None
    if 'Prediction Reason' not in df.columns:
        df['Prediction Reason'] = None

    results = []  # (row_index, score, reason) for each scored row
    print(f"Iterating over rows in random order...")
    # Random order so repeated small runs cover different samples.
    for idx, row in df.sample(frac=1.0).iterrows():
        if len(results) >= max_iterations:
            print(f"Max iterations reached: {max_iterations}")
            break
        if pd.isna(row['Category']):
            print(f"Skipping row {idx} due to missing 'Category'")
            continue
        print(f"Scheduling task for row {idx} with passage: {row['passage']}")
        # Inference runs synchronously right here; the original kept a
        # separate tasks list that was then copied element-by-element into
        # results — a no-op indirection that has been removed.
        score, reason = run_openai_inference(prompt, row['passage'], model)
        results.append((idx, score, reason))

    print(f"Running inference for {len(results)} tasks...")
    print(f"Assigning results to corresponding rows...")
    for idx, score, reason in results:
        print(f"Row {idx}: Assigned score: {score}, reason: {reason}")
        df.at[idx, 'Prediction'] = score
        df.at[idx, 'Prediction Reason'] = reason

    # Rows without a prediction (skipped, or inference failed) are dropped.
    print(f"Dropping rows with missing predictions...")
    df_out = df.dropna(subset=['Prediction'])
    print("Finished processing dataframe.")
    return df_out
def evaluate_dataframe(y_true, y_pred):
    """Compute classification metrics for true vs. predicted labels.

    Each metric is computed independently; if sklearn raises for one
    (e.g. mismatched lengths, non-binary labels for the binary-averaged
    metrics), that metric is reported as None instead of aborting.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels.

    Returns:
        Dict with keys accuracy, precision, recall, f1 and conf_matrix
        (the confusion matrix as nested lists for JSON serialization);
        any metric that could not be computed is None.
    """
    def _safe(fn):
        # Narrows the original bare `except:` clauses, which would also
        # have swallowed KeyboardInterrupt and SystemExit.
        try:
            return fn()
        except Exception:
            return None

    cm = _safe(lambda: confusion_matrix(y_true, y_pred))
    return {
        "accuracy": _safe(lambda: accuracy_score(y_true, y_pred)),
        "precision": _safe(lambda: precision_score(y_true, y_pred, average='binary')),
        "recall": _safe(lambda: recall_score(y_true, y_pred, average='binary')),
        "f1": _safe(lambda: f1_score(y_true, y_pred, average='binary')),
        # ndarray -> nested lists so the result is JSON-serializable.
        "conf_matrix": cm.tolist() if cm is not None else None,
    }
def save_results(df_out, prompt, model):
    """Write results to a timestamped Excel file and record the run.

    Writes df_out to an "Outputs" sheet and the prompt/model pair to an
    "Inputs" sheet, auto-sizes the columns, then appends a summary entry
    to the module-level history dict and persists it to HISTORY_FILE.

    Args:
        df_out: Processed DataFrame; must contain a 'Prediction' column.
        prompt: Prompt used for this run (stored for provenance).
        model: Model name used for this run.

    Returns:
        Path of the written .xlsx file.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_file = os.path.join(OUTPUT_DIR, f"{timestamp}_processed.xlsx")

    # Record the run inputs alongside the outputs in the same workbook.
    df_prompt = pd.DataFrame({"Prompt": [prompt], "Model": [model]})
    with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
        df_out.to_excel(writer, sheet_name="Outputs", index=False)
        df_prompt.to_excel(writer, sheet_name="Inputs", index=False)
        # The context manager saves the workbook on exit.

    # Re-open with openpyxl to widen each column to fit its longest cell.
    wb = openpyxl.load_workbook(output_file)
    for sheet_name, df in [("Outputs", df_out), ("Inputs", df_prompt)]:
        ws = wb[sheet_name]
        for col_idx, col in enumerate(df.columns, 1):
            max_length = max((len(str(cell)) for cell in df[col].values), default=0)
            max_length = max(max_length, len(col)) + 2
            ws.column_dimensions[get_column_letter(col_idx)].width = max_length
    wb.save(output_file)

    # Summarize the run. The mean is cast to a plain float because pandas
    # returns numpy.float64, which json.dump cannot serialize (TypeError).
    mean_score = None
    if not df_out['Prediction'].empty:
        mean_score = float(df_out['Prediction'].mean())
    history[timestamp] = {
        "file": output_file,
        "prompt": prompt,
        "model": model,
        "score": mean_score,
        "samples": len(df_out),
    }
    with open(HISTORY_FILE, "w", encoding="utf-8") as f:
        json.dump(history, f, indent=4)
    return output_file
def list_previous_files():
    # Return the in-memory history of processed runs, keyed by timestamp;
    # each entry holds the output file path, prompt, model, mean score and
    # sample count written by save_results.
    # NOTE(review): this returns the live module-level dict, not a copy —
    # callers that mutate it will alter what the next save_results persists.
    return history