"""
GIS-Coder Evaluation Script
============================
Evaluates a fine-tuned model on GIS code-generation tasks and optionally
compares it against the base model (pass --compare_base).

Usage:
  python evaluate.py --adapter_id RhodWeo/GIS-Coder-7B
  python evaluate.py --adapter_id ./gis-coder-7b-output/final --compare_base
"""

import argparse
import torch
import time
import json
from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import PeftModel

SYSTEM_PROMPT = (
    "You are GIS-Coder, an expert Python programmer specializing in Geographic Information Systems (GIS) "
    "and geospatial analysis. You write clean, efficient, well-documented Python code using libraries like "
    "GeoPandas, Shapely, Rasterio, GDAL, PyProj, OSMNX, Folium, H3, Fiona, xarray, and MovingPandas. "
    "You always explain your reasoning step-by-step before providing code, and you handle edge cases."
)

EVAL_PROMPTS = [
    # OSMNX (Tier 1 — models score 0%)
    {"category": "OSMnx", "difficulty": "medium",
     "prompt": "Write a Python function using OSMnx to download the street network of a city and find all dead-end streets (nodes with degree 1)."},
    {"category": "OSMnx", "difficulty": "hard",
     "prompt": "Use OSMnx to create a 10-minute walking isochrone from a given point, then find all restaurants within that isochrone using OSM POI data."},
    
    # Rasterio (Tier 1)
    {"category": "Rasterio", "difficulty": "medium",
     "prompt": "Write a function to calculate slope and aspect from a DEM raster file using rasterio and numpy."},
    {"category": "Rasterio", "difficulty": "hard",
     "prompt": "Write a function that takes a multispectral satellite image and computes NDVI, NDWI, and NDBI indices, saving each as a separate GeoTIFF with proper nodata handling."},
    
    # MovingPandas (Tier 1 — models score 0%)
    {"category": "MovingPandas", "difficulty": "medium",
     "prompt": "Use MovingPandas to create trajectories from GPS tracking data, compute speed along each trajectory, and detect stops where the object was stationary for more than 5 minutes."},
    
    # GeoPandas (Tier 2)
    {"category": "GeoPandas", "difficulty": "easy",
     "prompt": "Perform a spatial join between a points GeoDataFrame and a polygons GeoDataFrame to count how many points fall within each polygon."},
    {"category": "GeoPandas", "difficulty": "medium",
     "prompt": "Write a function that reads a shapefile, buffers all geometries by 500 meters, dissolves overlapping buffers, and calculates the total area covered in square kilometers."},
    
    # CRS / PyProj (common failure)
    {"category": "PyProj", "difficulty": "medium",
     "prompt": "Write a function that takes two GeoDataFrames with potentially different CRS, automatically detects the mismatch, reprojects them to a common CRS, and computes their spatial intersection."},
    
    # Multi-library workflow
    {"category": "Multi-library", "difficulty": "hard",
     "prompt": "Write a Python script that downloads building footprints from OpenStreetMap using OSMnx, projects them to UTM for accurate area calculation, filters buildings larger than 500 sq meters, and creates an interactive Folium choropleth map colored by building area."},
    {"category": "Multi-library", "difficulty": "hard",
     "prompt": "Create a complete site suitability analysis for wind turbine placement: compute slope from DEM, extract distance to roads, buffer protected areas, and combine with a weighted overlay to produce a suitability score raster."},
    
    # H3
    {"category": "H3", "difficulty": "medium",
     "prompt": "Use H3 hexagonal indexing to aggregate 10,000 crime incident points into resolution-9 hexagonal bins, compute density per square kilometer, and identify the top 10 hotspot hexagons."},
    
    # Folium
    {"category": "Folium", "difficulty": "medium",
     "prompt": "Create an interactive Folium map with three layers: clustered markers for schools, a heatmap for population density, and a choropleth of median income by census tract. Add layer controls."},
]
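

# Optional sketch (hypothetical helper, not wired into main()): the suite
# could be extended from a JSON file holding a list of entries with the same
# schema as EVAL_PROMPTS, i.e. {"category": ..., "difficulty": ..., "prompt": ...}.
def load_extra_prompts(path):
    """Load additional eval prompts (same schema as EVAL_PROMPTS) from JSON."""
    with open(path) as f:
        return json.load(f)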


def load_model(base_model_id, adapter_id=None, quantize=True):
    """Load model with optional LoRA adapter."""
    model_kwargs = {"trust_remote_code": True, "attn_implementation": "eager"}
    
    if quantize and torch.cuda.is_available():
        from transformers import BitsAndBytesConfig
        model_kwargs["quantization_config"] = BitsAndBytesConfig(
            load_in_4bit=True, bnb_4bit_quant_type="nf4",
            bnb_4bit_compute_dtype=torch.bfloat16,
        )
        model_kwargs["dtype"] = torch.bfloat16
    else:
        model_kwargs["dtype"] = torch.float32
    
    model = AutoModelForCausalLM.from_pretrained(
        base_model_id, device_map="auto" if torch.cuda.is_available() else None,
        **model_kwargs,
    )
    tokenizer = AutoTokenizer.from_pretrained(base_model_id, trust_remote_code=True)
    
    if adapter_id:
        model = PeftModel.from_pretrained(model, adapter_id)
        print(f"Loaded adapter: {adapter_id}")
    
    model.eval()
    return model, tokenizer
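

# Optional helper (a sketch, not called above): for runs without 4-bit
# quantization, PEFT's standard merge_and_unload() can fold the LoRA weights
# into the base model for faster inference. Merging is not supported on
# quantized weights, so only use this together with --no_quantize.
def merge_adapter_for_speed(model):
    """Fold LoRA weights into the base model (unquantized runs only)."""
    if isinstance(model, PeftModel):
        return model.merge_and_unload()
    return model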


def generate(model, tokenizer, prompt, max_new_tokens=768):
    """Generate response."""
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": prompt},
    ]
    text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=2048)
    if torch.cuda.is_available():
        inputs = {k: v.cuda() for k, v in inputs.items()}
    
    t0 = time.time()
    with torch.no_grad():
        out = model.generate(
            **inputs, max_new_tokens=max_new_tokens,
            temperature=0.7, top_p=0.9, do_sample=True, repetition_penalty=1.1,
        )
    gen_time = time.time() - t0
    response = tokenizer.decode(out[0][inputs["input_ids"].shape[1]:], skip_special_tokens=True)
    return response, gen_time
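

# Note: generate() samples (do_sample=True), so scores can vary run to run.
# For reproducible comparisons, seed the RNGs before each call; a sketch
# using transformers' set_seed():
#
#   from transformers import set_seed
#   set_seed(42)
#   response, gen_time = generate(model, tokenizer, prompt)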


def score_response(response, category):
    """Score response quality on multiple dimensions."""
    r = response.lower()
    scores = {
        "has_code": 1 if ("```python" in response or "import " in response) else 0,
        "has_function": 1 if "def " in response else 0,
        "has_gis_lib": 1 if any(lib in r for lib in [
            "geopandas", "gpd.", "shapely", "rasterio", "osmnx", "ox.", "gdal",
            "pyproj", "folium", "h3.", "fiona", "xarray", "movingpandas", "mpd."
        ]) else 0,
        "has_cot": 1 if any(p in r for p in [
            "step by step", "first", "then", "i need to", "let me think"
        ]) else 0,
        "has_docstring": 1 if '"""' in response or "'''" in response else 0,
        "has_error_handling": 1 if any(k in r for k in ["try:", "except", "raise", "if.*none", "is_valid"]) else 0,
        "has_crs_handling": 1 if any(k in r for k in ["crs", "epsg", "to_crs", "reproject"]) else 0,
    }
    
    # Category-specific scoring
    cat_libs = {
        "OSMnx": ["osmnx", "ox."],
        "Rasterio": ["rasterio"],
        "MovingPandas": ["movingpandas", "mpd."],
        "GeoPandas": ["geopandas", "gpd."],
        "PyProj": ["pyproj", "crs", "epsg"],
        "H3": ["h3."],
        "Folium": ["folium"],
    }
    if category in cat_libs:
        scores["correct_library"] = 1 if any(lib in r for lib in cat_libs[category]) else 0
    else:
        scores["correct_library"] = scores["has_gis_lib"]
    
    scores["total"] = sum(scores.values())
    scores["max_possible"] = len(scores) - 1  # exclude total
    scores["pct"] = scores["total"] / scores["max_possible"] * 100
    return scores
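

# Example of how the heuristic scoring behaves on a hypothetical response
# (every check is a substring match; the generated code is never executed):
#
#   >>> demo = ('First, load the data.\n```python\nimport geopandas as gpd\n'
#   ...         'def count_points(pts, polys): ...\n```')
#   >>> s = score_response(demo, "GeoPandas")
#   >>> (s["has_code"], s["has_cot"], s["correct_library"])
#   (1, 1, 1)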


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--base_model_id", default="Qwen/Qwen2.5-Coder-7B-Instruct")
    parser.add_argument("--adapter_id", default="RhodWeo/GIS-Coder-7B")
    parser.add_argument("--compare_base", action="store_true", help="Also evaluate base model for comparison")
    parser.add_argument("--max_new_tokens", type=int, default=768)
    parser.add_argument("--output", type=str, default="eval_results.json")
    parser.add_argument("--no_quantize", action="store_true")
    args = parser.parse_args()
    
    results = {"fine_tuned": [], "base": []}
    
    # Evaluate fine-tuned model
    print(f"\n{'='*60}")
    print(f"EVALUATING: {args.base_model_id} + {args.adapter_id}")
    print(f"{'='*60}")
    
    model, tokenizer = load_model(args.base_model_id, args.adapter_id, not args.no_quantize)
    
    for i, item in enumerate(EVAL_PROMPTS):
        print(f"\n[{i+1}/{len(EVAL_PROMPTS)}] {item['category']} ({item['difficulty']})")
        print(f"  Prompt: {item['prompt'][:80]}...")
        
        response, gen_time = generate(model, tokenizer, item["prompt"], args.max_new_tokens)
        scores = score_response(response, item["category"])
        
        print(f"  Time: {gen_time:.1f}s | Score: {scores['pct']:.0f}% | Code: {'✓' if scores['has_code'] else '✗'} | "
              f"Lib: {'✓' if scores['correct_library'] else '✗'} | CoT: {'✓' if scores['has_cot'] else '✗'}")
        
        results["fine_tuned"].append({
            **item, "response": response, "gen_time": gen_time, "scores": scores,
        })
    
    # Summary
    ft_scores = [r["scores"]["pct"] for r in results["fine_tuned"]]
    print(f"\n{'='*60}")
    print(f"FINE-TUNED MODEL SUMMARY")
    print(f"  Avg score: {sum(ft_scores)/len(ft_scores):.1f}%")
    print(f"  Code blocks: {sum(r['scores']['has_code'] for r in results['fine_tuned'])}/{len(EVAL_PROMPTS)}")
    print(f"  Correct library: {sum(r['scores']['correct_library'] for r in results['fine_tuned'])}/{len(EVAL_PROMPTS)}")
    print(f"  Chain-of-thought: {sum(r['scores']['has_cot'] for r in results['fine_tuned'])}/{len(EVAL_PROMPTS)}")
    
    by_cat = {}
    for r in results["fine_tuned"]:
        cat = r["category"]
        by_cat.setdefault(cat, []).append(r["scores"]["pct"])
    print(f"\n  By category:")
    for cat, scores in sorted(by_cat.items()):
        print(f"    {cat}: {sum(scores)/len(scores):.0f}%")
    
    # Optionally compare with base model
    if args.compare_base:
        print(f"\n{'='*60}")
        print(f"EVALUATING BASE: {args.base_model_id}")
        print(f"{'='*60}")
        
        del model
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        
        base_model, _ = load_model(args.base_model_id, None, not args.no_quantize)
        
        for i, item in enumerate(EVAL_PROMPTS):
            response, gen_time = generate(base_model, tokenizer, item["prompt"], args.max_new_tokens)
            scores = score_response(response, item["category"])
            results["base"].append({**item, "response": response, "gen_time": gen_time, "scores": scores})
        
        base_scores = [r["scores"]["pct"] for r in results["base"]]
        print(f"\n  BASE: {sum(base_scores)/len(base_scores):.1f}% vs FINE-TUNED: {sum(ft_scores)/len(ft_scores):.1f}%")
        print(f"  Improvement: +{sum(ft_scores)/len(ft_scores) - sum(base_scores)/len(base_scores):.1f}pp")
    
    # Save results
    with open(args.output, "w") as f:
        json.dump(results, f, indent=2, default=str)
    print(f"\nResults saved to {args.output}")


if __name__ == "__main__":
    main()