# Source: agllm2-dev / generate_usa_ipm_info.py
# Last commit e96f60e (arbabarshad): "Add SLF reporting sources to knowledge
# base, increase retriever k to 8"
"""
Generate USA IPM Information using GPT-4o
This script queries GPT-4o for each species to:
1. Verify if it's present in the United States
2. If yes, generate IPM/management info as a single text block
Uses the parallel processor for batch processing.
"""
import os
import json
import pandas as pd
from dotenv import load_dotenv
import asyncio
from datetime import datetime
# Load environment variables from .env (OPENAI_API_KEY is read later in
# run_parallel_processor).
load_dotenv()
# --- Configuration ---
# Workbook holding the regional species sheets ("Midwest USA", "Africa",
# "India"); the generated "USA" sheet is written back into this same file.
EXCEL_FILE_PATH = "species-organized/PestID Species - Organized.xlsx"
# JSONL of chat-completion requests consumed by the parallel processor.
REQUESTS_FILEPATH = "usa_ipm_requests.jsonl"
# JSONL of [request, response, metadata] entries written by the processor.
RESULTS_FILEPATH = "usa_ipm_results.jsonl"
# Tracks which species were already processed, so reruns can skip them.
CHECKPOINT_FILEPATH = "usa_ipm_checkpoint.json"
def get_all_unique_species() -> list[dict]:
    """Load all unique species from the Midwest USA, Africa and India sheets.

    Species are de-duplicated case-insensitively across sheets (first
    occurrence wins).

    Returns:
        A list of dicts with keys ``species``, ``common_name``, ``tag`` and
        ``source_region``.
    """

    def _clean(value) -> str:
        # Empty Excel cells come back as NaN, and ``row.get(col, "")`` only
        # substitutes the default for a *missing column*, not a NaN cell.
        # Normalize here so prompts and JSONL metadata never see float('nan').
        return str(value).strip() if pd.notna(value) else ""

    sheet_names = ["Midwest USA", "Africa", "India"]
    frames = [(pd.read_excel(EXCEL_FILE_PATH, sheet_name=name), name)
              for name in sheet_names]

    species_list = []
    seen_species = set()
    for df, source in frames:
        for _, row in df.iterrows():
            species = row.get("Species")
            if pd.isna(species):
                continue
            key = str(species).strip().lower()
            if key in seen_species:
                continue
            seen_species.add(key)
            species_list.append({
                # Keep the original cell value (not stripped) so checkpoint
                # entries recorded by earlier runs still match exactly.
                "species": species,
                "common_name": _clean(row.get("Common Name", "")),
                "tag": _clean(row.get("Tag", "")),
                "source_region": source,
            })
    print(f"Found {len(species_list)} unique species")
    return species_list
def create_gpt_prompt(species: str, common_name: str, tag: str) -> str:
    """Compose the user-role prompt sent to GPT for one species.

    The prompt asks the model to (1) judge USA presence with a confidence
    level and brief reasoning, and (2) if present, produce the full
    IPM/management guidance as one formatted text block.
    """
    return f"""You are an agricultural expert. Analyze the following species and determine if it is present in the United States.
Species: {species}
Common Name: {common_name}
Type: {tag}
Please provide:
1. Whether this species is present/found in the United States (yes/no)
2. Your confidence level (high, medium, low)
3. Brief reasoning
If it IS present in the USA, provide comprehensive management/IPM information including:
- Role in agriculture (pest, beneficial, weed, etc.)
- Damage caused (if applicable) or benefits provided
- Cultural control/management practices
- Biological control options or natural enemies
- Chemical control options (if applicable)
- Any other relevant management information
Combine all the management information into a single comprehensive text block. Format with clear sections and bullet points for readability."""
def create_requests_jsonl(species_list: list[dict], output_path: str, checkpoint: dict | None = None) -> int:
    """Write one chat-completions request per unprocessed species to *output_path*.

    Args:
        species_list: Items with ``species``, ``common_name``, ``tag`` and
            ``source_region`` keys (as built by ``get_all_unique_species``).
        output_path: JSONL file consumed by the parallel processor.
        checkpoint: Optional checkpoint dict; species listed under its
            ``processed`` key are skipped.

    Returns:
        Number of requests written.
    """
    # Species already handled in previous runs are skipped.
    # NOTE(review): this membership test is case/whitespace-sensitive, while
    # get_all_unique_species de-duplicates case-insensitively — verify that
    # checkpoint entries always carry the original cell value.
    processed_species = set()
    if checkpoint:
        processed_species = set(checkpoint.get("processed", []))
    # Simplified JSON schema - just need presence check and single info field.
    # strict=True makes the model's output conform exactly (structured outputs).
    json_schema = {
        "name": "usa_species_response",
        "strict": True,
        "schema": {
            "type": "object",
            "properties": {
                "is_present_in_usa": {
                    "type": "boolean",
                    "description": "Whether this species is present/found in the United States"
                },
                "confidence": {
                    "type": "string",
                    "enum": ["high", "medium", "low"],
                    "description": "Confidence level of the determination"
                },
                "reasoning": {
                    "type": "string",
                    "description": "Brief explanation for the determination"
                },
                "ipm_info": {
                    "type": "string",
                    "description": "Comprehensive IPM/management information as a single text block. Empty string if not present in USA."
                }
            },
            "required": ["is_present_in_usa", "confidence", "reasoning", "ipm_info"],
            "additionalProperties": False
        }
    }
    requests = []
    for item in species_list:
        species = item["species"]
        if species in processed_species:
            print(f"Skipping already processed: {species}")
            continue
        prompt = create_gpt_prompt(species, item["common_name"], item["tag"])
        request = {
            "model": "gpt-4o-2024-08-06",
            "messages": [
                {
                    "role": "system",
                    "content": "You are an expert agricultural scientist with deep knowledge of species found in the United States and their management strategies."
                },
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            "response_format": {
                "type": "json_schema",
                "json_schema": json_schema
            },
            "temperature": 0.3,
            "max_tokens": 2000,
            # Extra field passed through by the parallel processor so results
            # can be traced back to the originating species.
            "metadata": {
                "species": species,
                "common_name": item["common_name"],
                "tag": item["tag"],
                "source_region": item["source_region"]
            }
        }
        requests.append(request)
    # One JSON object per line, as the parallel processor expects.
    with open(output_path, "w") as f:
        for request in requests:
            f.write(json.dumps(request) + "\n")
    print(f"Created {len(requests)} requests in {output_path}")
    return len(requests)
def load_checkpoint() -> dict:
    """Return the persisted checkpoint, or a fresh default when none exists."""
    try:
        with open(CHECKPOINT_FILEPATH, "r") as fh:
            return json.load(fh)
    except FileNotFoundError:
        return {"processed": [], "last_run": None}
def save_checkpoint(checkpoint: dict):
    """Stamp *checkpoint* with the current time and persist it as JSON.

    Note: mutates the caller's dict (sets ``last_run``) as a side effect.
    """
    checkpoint["last_run"] = datetime.now().isoformat()
    serialized = json.dumps(checkpoint, indent=2)
    with open(CHECKPOINT_FILEPATH, "w") as fh:
        fh.write(serialized)
def parse_results_and_create_excel(results_path: str, output_excel_path: str):
    """Parse the results JSONL and write/replace the "USA" sheet in the workbook.

    Each JSONL line is ``[request, response, metadata?]`` as written by the
    parallel processor. Species the model judged absent from the USA are
    excluded (with a console note).

    Args:
        results_path: Path to the results JSONL.
        output_excel_path: Workbook to update. (Bug fix: this parameter was
            previously ignored in favor of the EXCEL_FILE_PATH constant.)

    Returns:
        The deduplicated DataFrame written to the "USA" sheet (possibly empty).
    """
    results = []
    with open(results_path, "r") as f:
        for line in f:
            try:
                data = json.loads(line)
                request = data[0]
                response = data[1]
                # Metadata is the optional third element; otherwise fall back
                # to the copy embedded in the request.
                metadata = data[2] if len(data) > 2 else request.get("metadata", {})
                if "choices" not in response:
                    continue  # processor error entry, not a completion
                content = response["choices"][0]["message"]["content"]
                parsed = json.loads(content)
                # Include if present in USA (regardless of pest/beneficial status)
                if parsed.get("is_present_in_usa", False):
                    results.append({
                        "Common Name": metadata.get("common_name", ""),
                        "Species": metadata.get("species", ""),
                        "Tag": metadata.get("tag", ""),
                        "Accuracy": f"LLM-generated ({parsed.get('confidence', 'medium')})",
                        "IPM Info": parsed.get("ipm_info", "")
                    })
                else:
                    print(f"Excluded (not in USA): {metadata.get('species', 'unknown')} - {parsed.get('reasoning', '')[:100]}")
            except (json.JSONDecodeError, KeyError, IndexError) as e:
                print(f"Error parsing result: {e}")
                continue
    # Create DataFrame — deduplicate by species name, case-insensitively
    # (results JSONL may accumulate across runs AND across capitalization variants)
    df = pd.DataFrame(results)
    if df.empty:
        # Guard: without results the dedup below would KeyError on 'Species',
        # and there is nothing worth writing to the workbook.
        print("No US species found in results; workbook left untouched.")
        return df
    before = len(df)
    df['_species_lower'] = df['Species'].str.strip().str.lower()
    df = df.drop_duplicates(subset=['_species_lower'], keep='last')
    df = df.drop(columns=['_species_lower'])
    if before != len(df):
        print(f"Deduplicated {before - len(df)} entries (multiple runs + capitalization variants)")
    print(f"\nCreated DataFrame with {len(df)} US species")
    print(df.head())
    # Read all existing sheets so the rewrite below preserves them.
    with pd.ExcelFile(output_excel_path) as xls:
        existing_sheets = {sheet: pd.read_excel(xls, sheet_name=sheet) for sheet in xls.sheet_names}
    # Add or update USA sheet.
    existing_sheets["USA"] = df
    # Rewrite the whole workbook (ExcelWriter replaces the file).
    with pd.ExcelWriter(output_excel_path, engine='openpyxl') as writer:
        for sheet_name, sheet_df in existing_sheets.items():
            sheet_df.to_excel(writer, sheet_name=sheet_name, index=False)
    print(f"\nUpdated Excel file with USA sheet: {output_excel_path}")
    return df
async def run_parallel_processor():
    """Drive the universal parallel processor over the prepared request file."""
    from api_request_parallel_processor_universal import process_api_requests_from_file

    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise ValueError("OPENAI_API_KEY environment variable not set")

    # Collect the processor configuration in one place, then fan it out.
    settings = dict(
        vendor_name="openai",
        requests_filepath=REQUESTS_FILEPATH,
        save_filepath=RESULTS_FILEPATH,
        request_url="https://api.openai.com/v1/chat/completions",
        api_key=api_key,
        max_requests_per_minute=500,  # conservative rate
        max_tokens_per_minute=150000,
        token_encoding_name="cl100k_base",
        max_attempts=5,
        logging_level=20,  # logging.INFO
    )
    await process_api_requests_from_file(**settings)
def main():
    """CLI entry point: prepare requests, call the API, and/or build the Excel sheet.

    Steps (selected via ``--step``):
        prepare: build the requests JSONL from the workbook.
        process: run the parallel processor and update the checkpoint.
        parse:   convert results JSONL into the "USA" sheet.
        all:     run every step in order.
    ``--force`` ignores the checkpoint and clears stale results.
    """
    import argparse
    parser = argparse.ArgumentParser(description="Generate USA IPM info using GPT-4o")
    parser.add_argument("--step", choices=["prepare", "process", "parse", "all"], default="all",
                        help="Which step to run: prepare (create requests), process (call API), parse (create Excel), or all")
    parser.add_argument("--force", action="store_true", help="Force regeneration, ignore checkpoint")
    args = parser.parse_args()

    checkpoint = {} if args.force else load_checkpoint()

    if args.step in ["prepare", "all"]:
        print("\n=== Step 1: Preparing requests ===")
        species_list = get_all_unique_species()
        num_requests = create_requests_jsonl(species_list, REQUESTS_FILEPATH, checkpoint)
        if num_requests == 0:
            print("No new species to process!")
            if args.step == "all":
                args.step = "parse"  # nothing new — skip straight to parsing

    if args.step in ["process", "all"]:
        print("\n=== Step 2: Processing API requests ===")
        # Clear results file before a forced run to prevent duplicate accumulation.
        if os.path.exists(RESULTS_FILEPATH) and args.force:
            os.remove(RESULTS_FILEPATH)
            print(f"Cleared {RESULTS_FILEPATH} (--force run)")
        if os.path.exists(REQUESTS_FILEPATH):
            # Check if there are requests to process.
            with open(REQUESTS_FILEPATH, "r") as f:
                has_requests = bool(f.read().strip())
            if has_requests:
                asyncio.run(run_parallel_processor())
                _update_checkpoint_from_results(checkpoint)
            else:
                print("No requests to process")
        else:
            print("No requests file found. Run with --step prepare first.")

    if args.step in ["parse", "all"]:
        print("\n=== Step 3: Parsing results and creating Excel ===")
        if os.path.exists(RESULTS_FILEPATH):
            df = parse_results_and_create_excel(RESULTS_FILEPATH, EXCEL_FILE_PATH)
            print(f"\nSuccess! Created USA sheet with {len(df)} species.")
        else:
            print("No results file found. Run with --step process first.")


def _update_checkpoint_from_results(checkpoint: dict) -> None:
    """Record every species present in the results JSONL in *checkpoint* and save it.

    Fixes two issues in the previous inline version: the results file is now
    checked for existence (the processor may fail before writing anything),
    and the bare ``except:`` — which swallowed KeyboardInterrupt/SystemExit —
    is narrowed to parse-related errors only.
    """
    if not os.path.exists(RESULTS_FILEPATH):
        print(f"Warning: {RESULTS_FILEPATH} not found; checkpoint not updated.")
        return
    with open(RESULTS_FILEPATH, "r") as rf:
        for line in rf:
            try:
                data = json.loads(line)
                metadata = data[2] if len(data) > 2 else data[0].get("metadata", {})
                species = metadata.get("species")
                if species:
                    checkpoint.setdefault("processed", []).append(species)
            except (json.JSONDecodeError, KeyError, IndexError, AttributeError, TypeError):
                continue  # skip malformed lines, keep the rest
    save_checkpoint(checkpoint)


if __name__ == "__main__":
    main()