Spaces:
Sleeping
Sleeping
| """ | |
| Generate USA IPM Information using GPT-4o | |
| This script queries GPT-4o for each species to: | |
| 1. Verify if it's present in the United States | |
| 2. If yes, generate IPM/management info as a single text block | |
| Uses the parallel processor for batch processing. | |
| """ | |
| import os | |
| import json | |
| import pandas as pd | |
| from dotenv import load_dotenv | |
| import asyncio | |
| from datetime import datetime | |
# Load environment variables (expects OPENAI_API_KEY in .env or the environment)
load_dotenv()
# --- Configuration ---
# Source workbook: read for the input sheets and rewritten with the USA sheet.
EXCEL_FILE_PATH = "species-organized/PestID Species - Organized.xlsx"
# JSONL of pending chat-completion requests consumed by the parallel processor.
REQUESTS_FILEPATH = "usa_ipm_requests.jsonl"
# JSONL of [request, response, metadata] triples appended by the processor.
RESULTS_FILEPATH = "usa_ipm_results.jsonl"
# Tracks which species already have results so reruns can skip them.
CHECKPOINT_FILEPATH = "usa_ipm_checkpoint.json"
def get_all_unique_species() -> list[dict]:
    """Load all unique species from the Midwest USA + Africa + India sheets.

    Species are deduplicated case-insensitively on the scientific name,
    keeping the first occurrence encountered.

    Returns:
        list[dict]: one entry per unique species with keys ``species``,
        ``common_name``, ``tag`` and ``source_region``.
    """
    df_midwest = pd.read_excel(EXCEL_FILE_PATH, sheet_name="Midwest USA")
    df_africa = pd.read_excel(EXCEL_FILE_PATH, sheet_name="Africa")
    df_india = pd.read_excel(EXCEL_FILE_PATH, sheet_name="India")

    def _clean_cell(row, column):
        # row.get(col, "") only defaults when the COLUMN is missing; an empty
        # cell in an existing column comes back as NaN (a float), which would
        # later render as the literal string "nan" in the GPT prompt.
        value = row.get(column, "")
        return "" if pd.isna(value) else value

    species_list = []
    seen_species = set()
    for df, source in [(df_midwest, "Midwest USA"), (df_africa, "Africa"), (df_india, "India")]:
        for _, row in df.iterrows():
            species = row.get("Species")
            if pd.isna(species):
                continue
            key = str(species).strip().lower()
            if key in seen_species:
                continue
            seen_species.add(key)
            species_list.append({
                "species": species,
                "common_name": _clean_cell(row, "Common Name"),
                "tag": _clean_cell(row, "Tag"),
                "source_region": source,
            })
    print(f"Found {len(species_list)} unique species")
    return species_list
def create_gpt_prompt(species: str, common_name: str, tag: str) -> str:
    """Build the user-message prompt asking GPT whether *species* occurs in
    the USA and, if so, for a consolidated IPM/management write-up."""
    return f"""You are an agricultural expert. Analyze the following species and determine if it is present in the United States.
Species: {species}
Common Name: {common_name}
Type: {tag}
Please provide:
1. Whether this species is present/found in the United States (yes/no)
2. Your confidence level (high, medium, low)
3. Brief reasoning
If it IS present in the USA, provide comprehensive management/IPM information including:
- Role in agriculture (pest, beneficial, weed, etc.)
- Damage caused (if applicable) or benefits provided
- Cultural control/management practices
- Biological control options or natural enemies
- Chemical control options (if applicable)
- Any other relevant management information
Combine all the management information into a single comprehensive text block. Format with clear sections and bullet points for readability."""
def create_requests_jsonl(species_list: list[dict], output_path: str, checkpoint: dict = None):
    """Write one chat-completion request per unprocessed species to JSONL.

    Args:
        species_list: entries shaped like get_all_unique_species() output
            (keys: species, common_name, tag, source_region).
        output_path: JSONL file to (over)write, one request object per line.
        checkpoint: optional checkpoint dict; species listed under its
            "processed" key are skipped.

    Returns:
        int: number of requests written.
    """
    # Species already handled in a previous run (exact-name match, mirroring
    # what main() records in the checkpoint).
    processed_species = set((checkpoint or {}).get("processed", []))
    # Simplified JSON schema - just need presence check and single info field
    json_schema = {
        "name": "usa_species_response",
        "strict": True,
        "schema": {
            "type": "object",
            "properties": {
                "is_present_in_usa": {
                    "type": "boolean",
                    "description": "Whether this species is present/found in the United States"
                },
                "confidence": {
                    "type": "string",
                    "enum": ["high", "medium", "low"],
                    "description": "Confidence level of the determination"
                },
                "reasoning": {
                    "type": "string",
                    "description": "Brief explanation for the determination"
                },
                "ipm_info": {
                    "type": "string",
                    "description": "Comprehensive IPM/management information as a single text block. Empty string if not present in USA."
                }
            },
            "required": ["is_present_in_usa", "confidence", "reasoning", "ipm_info"],
            "additionalProperties": False
        }
    }
    requests = []
    for item in species_list:
        species = item["species"]
        if species in processed_species:
            print(f"Skipping already processed: {species}")
            continue
        prompt = create_gpt_prompt(species, item["common_name"], item["tag"])
        request = {
            "model": "gpt-4o-2024-08-06",
            "messages": [
                {
                    "role": "system",
                    "content": "You are an expert agricultural scientist with deep knowledge of species found in the United States and their management strategies."
                },
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            "response_format": {
                "type": "json_schema",
                "json_schema": json_schema
            },
            "temperature": 0.3,
            "max_tokens": 2000,
            # Carried through by the parallel processor so results can be
            # mapped back to their species without re-parsing the prompt.
            "metadata": {
                "species": species,
                "common_name": item["common_name"],
                "tag": item["tag"],
                "source_region": item["source_region"]
            }
        }
        requests.append(request)
    # Explicit encoding keeps the output stable across platforms (the default
    # is locale-dependent, e.g. cp1252 on Windows).
    with open(output_path, "w", encoding="utf-8") as f:
        for request in requests:
            f.write(json.dumps(request) + "\n")
    print(f"Created {len(requests)} requests in {output_path}")
    return len(requests)
def load_checkpoint() -> dict:
    """Return the checkpoint from disk, or a fresh empty one if absent."""
    try:
        with open(CHECKPOINT_FILEPATH, "r") as f:
            return json.load(f)
    except FileNotFoundError:
        # No previous run recorded yet.
        return {"processed": [], "last_run": None}
def save_checkpoint(checkpoint: dict):
    """Persist *checkpoint* to CHECKPOINT_FILEPATH, stamping the run time."""
    checkpoint["last_run"] = datetime.now().isoformat()
    serialized = json.dumps(checkpoint, indent=2)
    with open(CHECKPOINT_FILEPATH, "w") as f:
        f.write(serialized)
def parse_results_and_create_excel(results_path: str, output_excel_path: str):
    """Parse the results JSONL and write the USA sheet into the workbook.

    Fixes two defects in the previous version: ``output_excel_path`` was
    accepted but ignored (the function always wrote EXCEL_FILE_PATH), and an
    empty results file crashed the dedup step with a KeyError on 'Species'.

    Args:
        results_path: JSONL of [request, response, metadata?] triples.
        output_excel_path: workbook to update; the file is read, the "USA"
            sheet added/replaced, and all sheets written back.

    Returns:
        pd.DataFrame: the deduplicated USA sheet.
    """
    columns = ["Common Name", "Species", "Tag", "Accuracy", "IPM Info"]
    results = []
    with open(results_path, "r") as f:
        for line in f:
            try:
                data = json.loads(line)
                request = data[0]
                response = data[1]
                metadata = data[2] if len(data) > 2 else request.get("metadata", {})
                # Extract the structured-output content from the response
                if "choices" in response:
                    content = response["choices"][0]["message"]["content"]
                    parsed = json.loads(content)
                    # Include if present in USA (regardless of pest/beneficial status)
                    if parsed.get("is_present_in_usa", False):
                        ipm_info = parsed.get("ipm_info", "")
                        results.append({
                            "Common Name": metadata.get("common_name", ""),
                            "Species": metadata.get("species", ""),
                            "Tag": metadata.get("tag", ""),
                            "Accuracy": f"LLM-generated ({parsed.get('confidence', 'medium')})",
                            "IPM Info": ipm_info
                        })
                    else:
                        print(f"Excluded (not in USA): {metadata.get('species', 'unknown')} - {parsed.get('reasoning', '')[:100]}")
            except (json.JSONDecodeError, KeyError, IndexError) as e:
                print(f"Error parsing result: {e}")
                continue
    # Create DataFrame — deduplicate by species name, case-insensitively
    # (results JSONL may accumulate across runs AND across capitalization
    # variants). Passing columns= guarantees 'Species' exists even when
    # results is empty.
    df = pd.DataFrame(results, columns=columns)
    before = len(df)
    df['_species_lower'] = df['Species'].str.strip().str.lower()
    df = df.drop_duplicates(subset=['_species_lower'], keep='last')
    df = df.drop(columns=['_species_lower'])
    if before != len(df):
        print(f"Deduplicated {before - len(df)} entries (multiple runs + capitalization variants)")
    print(f"\nCreated DataFrame with {len(df)} US species")
    print(df.head())
    # Read every existing sheet, then rewrite the whole workbook with the
    # USA sheet added/replaced (openpyxl cannot update a sheet in place).
    with pd.ExcelFile(output_excel_path) as xls:
        existing_sheets = {sheet: pd.read_excel(xls, sheet_name=sheet) for sheet in xls.sheet_names}
    existing_sheets["USA"] = df
    with pd.ExcelWriter(output_excel_path, engine='openpyxl') as writer:
        for sheet_name, sheet_df in existing_sheets.items():
            sheet_df.to_excel(writer, sheet_name=sheet_name, index=False)
    print(f"\nUpdated Excel file with USA sheet: {output_excel_path}")
    return df
async def run_parallel_processor():
    """Feed the prepared request JSONL through the parallel API processor,
    appending responses to RESULTS_FILEPATH."""
    from api_request_parallel_processor_universal import process_api_requests_from_file

    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise ValueError("OPENAI_API_KEY environment variable not set")

    processor_config = dict(
        vendor_name="openai",
        requests_filepath=REQUESTS_FILEPATH,
        save_filepath=RESULTS_FILEPATH,
        request_url="https://api.openai.com/v1/chat/completions",
        api_key=api_key,
        max_requests_per_minute=500,  # Conservative rate
        max_tokens_per_minute=150000,
        token_encoding_name="cl100k_base",
        max_attempts=5,
        logging_level=20,  # INFO level
    )
    await process_api_requests_from_file(**processor_config)
def main():
    """Main execution flow: prepare requests, call the API, parse to Excel.

    Steps are selectable via --step; --force ignores the checkpoint so every
    species is re-queried.
    """
    import argparse
    parser = argparse.ArgumentParser(description="Generate USA IPM info using GPT-4o")
    parser.add_argument("--step", choices=["prepare", "process", "parse", "all"], default="all",
                        help="Which step to run: prepare (create requests), process (call API), parse (create Excel), or all")
    parser.add_argument("--force", action="store_true", help="Force regeneration, ignore checkpoint")
    args = parser.parse_args()
    checkpoint = {} if args.force else load_checkpoint()
    if args.step in ["prepare", "all"]:
        print("\n=== Step 1: Preparing requests ===")
        species_list = get_all_unique_species()
        num_requests = create_requests_jsonl(species_list, REQUESTS_FILEPATH, checkpoint)
        if num_requests == 0:
            print("No new species to process!")
            if args.step == "all":
                args.step = "parse"  # Skip to parsing if no new requests
    if args.step in ["process", "all"]:
        print("\n=== Step 2: Processing API requests ===")
        # Clear results file before a forced run to prevent duplicate accumulation
        if os.path.exists(RESULTS_FILEPATH) and args.force:
            os.remove(RESULTS_FILEPATH)
            print(f"Cleared {RESULTS_FILEPATH} (--force run)")
        if os.path.exists(REQUESTS_FILEPATH):
            # Check if there are requests to process
            with open(REQUESTS_FILEPATH, "r") as f:
                has_requests = bool(f.read().strip())
            if has_requests:
                asyncio.run(run_parallel_processor())
                # Update checkpoint with processed species. Best-effort: a
                # malformed result line is skipped, but only for the error
                # types a bad line can actually raise (a bare `except:` would
                # also swallow KeyboardInterrupt/SystemExit).
                with open(RESULTS_FILEPATH, "r") as rf:
                    for line in rf:
                        try:
                            data = json.loads(line)
                            metadata = data[2] if len(data) > 2 else data[0].get("metadata", {})
                            species = metadata.get("species")
                            if species:
                                checkpoint.setdefault("processed", []).append(species)
                        except (json.JSONDecodeError, KeyError, IndexError, AttributeError, TypeError):
                            pass
                # Dedupe while preserving order so the checkpoint doesn't
                # grow unboundedly across repeated runs.
                if "processed" in checkpoint:
                    checkpoint["processed"] = list(dict.fromkeys(checkpoint["processed"]))
                save_checkpoint(checkpoint)
            else:
                print("No requests to process")
        else:
            print("No requests file found. Run with --step prepare first.")
    if args.step in ["parse", "all"]:
        print("\n=== Step 3: Parsing results and creating Excel ===")
        if os.path.exists(RESULTS_FILEPATH):
            df = parse_results_and_create_excel(RESULTS_FILEPATH, EXCEL_FILE_PATH)
            print(f"\nSuccess! Created USA sheet with {len(df)} species.")
        else:
            print("No results file found. Run with --step process first.")


if __name__ == "__main__":
    main()