| """ |
| GIS-Coder Evaluation Script |
| ============================ |
| Tests the fine-tuned model on GIS code generation tasks. |
| Compares fine-tuned model vs base model. |
| |
| Usage: |
| python evaluate.py --adapter_id RhodWeo/GIS-Coder-7B |
| python evaluate.py --adapter_id ./gis-coder-7b-output/final --compare_base |
| """ |
|
|
import argparse
import json
import re
import time

import torch
from peft import PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
|
|
# System message prepended to every evaluation request (see generate()).
# NOTE(review): presumably this mirrors the system prompt used during
# fine-tuning so eval conditions match training — confirm against the
# training script.
SYSTEM_PROMPT = (
    "You are GIS-Coder, an expert Python programmer specializing in Geographic Information Systems (GIS) "
    "and geospatial analysis. You write clean, efficient, well-documented Python code using libraries like "
    "GeoPandas, Shapely, Rasterio, GDAL, PyProj, OSMNX, Folium, H3, Fiona, xarray, and MovingPandas. "
    "You always explain your reasoning step-by-step before providing code, and you handle edge cases."
)
|
|
# Evaluation prompts covering the GIS libraries named in SYSTEM_PROMPT.
# Each entry carries a "category" (used by score_response to verify the
# expected library actually appears in the answer) and a rough "difficulty".
EVAL_PROMPTS = [

    # OSMnx: street-network download and analysis.
    {"category": "OSMnx", "difficulty": "medium",
     "prompt": "Write a Python function using OSMnx to download the street network of a city and find all dead-end streets (nodes with degree 1)."},
    {"category": "OSMnx", "difficulty": "hard",
     "prompt": "Use OSMnx to create a 10-minute walking isochrone from a given point, then find all restaurants within that isochrone using OSM POI data."},

    # Rasterio: raster/DEM processing.
    {"category": "Rasterio", "difficulty": "medium",
     "prompt": "Write a function to calculate slope and aspect from a DEM raster file using rasterio and numpy."},
    {"category": "Rasterio", "difficulty": "hard",
     "prompt": "Write a function that takes a multispectral satellite image and computes NDVI, NDWI, and NDBI indices, saving each as a separate GeoTIFF with proper nodata handling."},

    # MovingPandas: trajectory analysis.
    {"category": "MovingPandas", "difficulty": "medium",
     "prompt": "Use MovingPandas to create trajectories from GPS tracking data, compute speed along each trajectory, and detect stops where the object was stationary for more than 5 minutes."},

    # GeoPandas: vector operations.
    {"category": "GeoPandas", "difficulty": "easy",
     "prompt": "Perform a spatial join between a points GeoDataFrame and a polygons GeoDataFrame to count how many points fall within each polygon."},
    {"category": "GeoPandas", "difficulty": "medium",
     "prompt": "Write a function that reads a shapefile, buffers all geometries by 500 meters, dissolves overlapping buffers, and calculates the total area covered in square kilometers."},

    # PyProj: coordinate reference system handling.
    {"category": "PyProj", "difficulty": "medium",
     "prompt": "Write a function that takes two GeoDataFrames with potentially different CRS, automatically detects the mismatch, reprojects them to a common CRS, and computes their spatial intersection."},

    # Multi-library: end-to-end workflows combining several tools.
    {"category": "Multi-library", "difficulty": "hard",
     "prompt": "Write a Python script that downloads building footprints from OpenStreetMap using OSMnx, projects them to UTM for accurate area calculation, filters buildings larger than 500 sq meters, and creates an interactive Folium choropleth map colored by building area."},
    {"category": "Multi-library", "difficulty": "hard",
     "prompt": "Create a complete site suitability analysis for wind turbine placement: compute slope from DEM, extract distance to roads, buffer protected areas, and combine with a weighted overlay to produce a suitability score raster."},

    # H3: hexagonal spatial indexing.
    {"category": "H3", "difficulty": "medium",
     "prompt": "Use H3 hexagonal indexing to aggregate 10,000 crime incident points into resolution-9 hexagonal bins, compute density per square kilometer, and identify the top 10 hotspot hexagons."},

    # Folium: interactive web mapping.
    {"category": "Folium", "difficulty": "medium",
     "prompt": "Create an interactive Folium map with three layers: clustered markers for schools, a heatmap for population density, and a choropleth of median income by census tract. Add layer controls."},
]
|
|
|
|
def load_model(base_model_id, adapter_id=None, quantize=True):
    """Load a causal LM (optionally 4-bit quantized) plus its tokenizer.

    Args:
        base_model_id: Hub id or local path of the base model.
        adapter_id: Optional LoRA adapter id/path to stack on the base model.
        quantize: When True and CUDA is present, load weights as 4-bit NF4.

    Returns:
        (model, tokenizer) with the model switched to eval mode.
    """
    cuda_ok = torch.cuda.is_available()

    load_kwargs = {"trust_remote_code": True, "attn_implementation": "eager"}
    if quantize and cuda_ok:
        # Only import bitsandbytes config when quantization is actually used.
        from transformers import BitsAndBytesConfig

        load_kwargs["quantization_config"] = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_compute_dtype=torch.bfloat16,
        )
        load_kwargs["dtype"] = torch.bfloat16
    else:
        load_kwargs["dtype"] = torch.float32

    model = AutoModelForCausalLM.from_pretrained(
        base_model_id,
        device_map="auto" if cuda_ok else None,
        **load_kwargs,
    )
    tokenizer = AutoTokenizer.from_pretrained(base_model_id, trust_remote_code=True)

    if adapter_id:
        # Wrap the base model with the LoRA weights.
        model = PeftModel.from_pretrained(model, adapter_id)
        print(f"Loaded adapter: {adapter_id}")

    model.eval()
    return model, tokenizer
|
|
|
|
def generate(model, tokenizer, prompt, max_new_tokens=768):
    """Run one chat-formatted generation.

    Args:
        model: Loaded causal LM.
        tokenizer: Matching tokenizer (must provide a chat template).
        prompt: User message text.
        max_new_tokens: Generation length cap.

    Returns:
        (response_text, seconds_elapsed) where response_text excludes the prompt.
    """
    chat = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": prompt},
    ]
    rendered = tokenizer.apply_chat_template(chat, tokenize=False, add_generation_prompt=True)
    encoded = tokenizer(rendered, return_tensors="pt", truncation=True, max_length=2048)
    if torch.cuda.is_available():
        encoded = {name: tensor.cuda() for name, tensor in encoded.items()}

    started = time.time()
    with torch.no_grad():
        output_ids = model.generate(
            **encoded,
            max_new_tokens=max_new_tokens,
            temperature=0.7,
            top_p=0.9,
            do_sample=True,
            repetition_penalty=1.1,
        )
    elapsed = time.time() - started

    # Decode only the newly generated tokens, skipping the rendered prompt.
    prompt_len = encoded["input_ids"].shape[1]
    response = tokenizer.decode(output_ids[0][prompt_len:], skip_special_tokens=True)
    return response, elapsed
|
|
|
|
def score_response(response, category):
    """Score a generated response on binary quality dimensions.

    Args:
        response: Raw generated text from the model.
        category: Prompt category (e.g. "OSMnx"); used to check that the
            library the prompt asked for actually appears in the answer.

    Returns:
        Dict of 0/1 flags plus "total" (sum of flags), "max_possible"
        (number of flags) and "pct" (total as a percentage, 0-100).
    """
    r = response.lower()
    scores = {
        "has_code": 1 if ("```python" in response or "import " in response) else 0,
        "has_function": 1 if "def " in response else 0,
        "has_gis_lib": 1 if any(lib in r for lib in [
            "geopandas", "gpd.", "shapely", "rasterio", "osmnx", "ox.", "gdal",
            "pyproj", "folium", "h3.", "fiona", "xarray", "movingpandas", "mpd."
        ]) else 0,
        "has_cot": 1 if any(p in r for p in [
            "step by step", "first", "then", "i need to", "let me think"
        ]) else 0,
        "has_docstring": 1 if '"""' in response or "'''" in response else 0,
        # BUG FIX: "if.*none" is a regex pattern, but the original tested it
        # as a literal substring ("if.*none" in r), which could never match
        # real code like "if x is None:". Use re.search for that pattern.
        "has_error_handling": 1 if (
            any(k in r for k in ["try:", "except", "raise", "is_valid"])
            or re.search(r"if\s+.*\bnone\b", r)
        ) else 0,
        "has_crs_handling": 1 if any(k in r for k in ["crs", "epsg", "to_crs", "reproject"]) else 0,
    }

    # Category-specific check: did the answer use the library the prompt asked for?
    cat_libs = {
        "OSMnx": ["osmnx", "ox."],
        "Rasterio": ["rasterio"],
        "MovingPandas": ["movingpandas", "mpd."],
        "GeoPandas": ["geopandas", "gpd."],
        "PyProj": ["pyproj", "crs", "epsg"],
        "H3": ["h3."],
        "Folium": ["folium"],
    }
    if category in cat_libs:
        scores["correct_library"] = 1 if any(lib in r for lib in cat_libs[category]) else 0
    else:
        # Unknown category (e.g. "Multi-library"): any GIS library counts.
        scores["correct_library"] = scores["has_gis_lib"]

    # Compute totals AFTER all flags are set; "total" itself is excluded
    # from max_possible via the len(scores) - 1 adjustment.
    scores["total"] = sum(scores.values())
    scores["max_possible"] = len(scores) - 1
    scores["pct"] = scores["total"] / scores["max_possible"] * 100
    return scores
|
|
|
|
def main():
    """CLI entry point.

    Evaluates the fine-tuned adapter on every prompt in EVAL_PROMPTS, prints
    per-prompt and aggregate scores, optionally repeats the run with the
    un-adapted base model for comparison, and writes all results to JSON.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--base_model_id", default="Qwen/Qwen2.5-Coder-7B-Instruct")
    parser.add_argument("--adapter_id", default="RhodWeo/GIS-Coder-7B")
    parser.add_argument("--compare_base", action="store_true", help="Also evaluate base model for comparison")
    parser.add_argument("--max_new_tokens", type=int, default=768)
    parser.add_argument("--output", type=str, default="eval_results.json")
    parser.add_argument("--no_quantize", action="store_true")
    args = parser.parse_args()

    results = {"fine_tuned": [], "base": []}

    # ---- Pass 1: base model + LoRA adapter ----
    print(f"\n{'='*60}")
    print(f"EVALUATING: {args.base_model_id} + {args.adapter_id}")
    print(f"{'='*60}")

    model, tokenizer = load_model(args.base_model_id, args.adapter_id, not args.no_quantize)

    for i, item in enumerate(EVAL_PROMPTS):
        print(f"\n[{i+1}/{len(EVAL_PROMPTS)}] {item['category']} ({item['difficulty']})")
        print(f" Prompt: {item['prompt'][:80]}...")

        response, gen_time = generate(model, tokenizer, item["prompt"], args.max_new_tokens)
        scores = score_response(response, item["category"])

        print(f" Time: {gen_time:.1f}s | Score: {scores['pct']:.0f}% | Code: {'✓' if scores['has_code'] else '✗'} | "
              f"Lib: {'✓' if scores['correct_library'] else '✗'} | CoT: {'✓' if scores['has_cot'] else '✗'}")

        results["fine_tuned"].append({
            **item, "response": response, "gen_time": gen_time, "scores": scores,
        })

    # ---- Fine-tuned summary ----
    ft_scores = [r["scores"]["pct"] for r in results["fine_tuned"]]
    print(f"\n{'='*60}")
    print("FINE-TUNED MODEL SUMMARY")  # no placeholders: plain string, not f-string
    print(f" Avg score: {sum(ft_scores)/len(ft_scores):.1f}%")
    print(f" Code blocks: {sum(r['scores']['has_code'] for r in results['fine_tuned'])}/{len(EVAL_PROMPTS)}")
    print(f" Correct library: {sum(r['scores']['correct_library'] for r in results['fine_tuned'])}/{len(EVAL_PROMPTS)}")
    print(f" Chain-of-thought: {sum(r['scores']['has_cot'] for r in results['fine_tuned'])}/{len(EVAL_PROMPTS)}")

    # Per-category average scores. Loop variable renamed so it no longer
    # shadows the per-prompt `scores` dict above.
    by_cat = {}
    for r in results["fine_tuned"]:
        by_cat.setdefault(r["category"], []).append(r["scores"]["pct"])
    print("\n By category:")
    for cat, cat_scores in sorted(by_cat.items()):
        print(f" {cat}: {sum(cat_scores)/len(cat_scores):.0f}%")

    # ---- Pass 2 (optional): plain base model for comparison ----
    if args.compare_base:
        print(f"\n{'='*60}")
        print(f"EVALUATING BASE: {args.base_model_id}")
        print(f"{'='*60}")

        # Free the adapter-augmented model before loading a second copy.
        # (Was a conditional expression used as a statement; now a plain if.)
        del model
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        # Tokenizer is reused: both passes share the same base model id.
        base_model, _ = load_model(args.base_model_id, None, not args.no_quantize)

        for item in EVAL_PROMPTS:  # index was unused; iterate items directly
            response, gen_time = generate(base_model, tokenizer, item["prompt"], args.max_new_tokens)
            scores = score_response(response, item["category"])
            results["base"].append({**item, "response": response, "gen_time": gen_time, "scores": scores})

        base_scores = [r["scores"]["pct"] for r in results["base"]]
        ft_avg = sum(ft_scores) / len(ft_scores)
        base_avg = sum(base_scores) / len(base_scores)
        print(f"\n BASE: {base_avg:.1f}% vs FINE-TUNED: {ft_avg:.1f}%")
        print(f" Improvement: +{ft_avg - base_avg:.1f}pp")

    # default=str keeps any non-JSON-serializable values from crashing the dump.
    with open(args.output, "w") as f:
        json.dump(results, f, indent=2, default=str)
    print(f"\nResults saved to {args.output}")
|
|
|
|
# Guarded entry point: lets the module be imported (e.g. for its prompts or
# score_response) without triggering a full evaluation run.
if __name__ == "__main__":
    main()
|
|