| """ |
| MASH Inference & Evaluation |
| |
| - Load trained model (SFT or DPO) |
| - Humanize AI-generated text |
| - Optionally apply Stage 4 refinement |
| - Evaluate with GPTZero |
| """ |
|
|
| import os |
| import sys |
| import json |
| import argparse |
| import time |
| import torch |
|
|
| sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) |
| from model import StyleBART |
|
|
|
|
def humanize_text(model, text: str, essay_type: str = 'ps',
                  device: str = 'cuda', max_length: int = 512,
                  num_beams: int = 4, max_input_length: int = 512) -> str:
    """
    Humanize a single AI-generated text.

    Args:
        model: trained StyleBART model
        text: AI-generated text to humanize
        essay_type: 'ps' or 'supp'
        device: device string
        max_length: max generation length
        num_beams: beam search width
        max_input_length: max input tokens kept before truncation
            (previously hard-coded to 512; default preserves old behavior)

    Returns:
        Humanized text string
    """
    model.eval()
    # Style key selects the target style embedding inside StyleBART
    # (presumably 'human_ps' / 'human_supp' — confirm against model.py).
    style_key = f'human_{essay_type}'

    inputs = model.tokenizer(
        text,
        max_length=max_input_length,
        truncation=True,
        return_tensors='pt',
    ).to(device)

    # Generation is inference-only; no gradients needed.
    with torch.no_grad():
        generated = model.generate_text(
            inputs['input_ids'],
            inputs['attention_mask'],
            style_keys=[style_key],
            max_length=max_length,
            num_beams=num_beams,
        )

    return model.tokenizer.decode(generated[0], skip_special_tokens=True)
|
|
|
|
def humanize_batch(model, texts: list, essay_types: list,
                   device: str = 'cuda', batch_size: int = 8,
                   max_length: int = 512, num_beams: int = 4,
                   max_input_length: int = 512) -> list:
    """Humanize a batch of texts.

    Args:
        model: trained StyleBART model
        texts: AI-generated texts to humanize
        essay_types: per-text essay type ('ps' or 'supp'), same length as texts
        device: device string
        batch_size: number of texts tokenized/generated per forward pass
        max_length: max generation length
        num_beams: beam search width
        max_input_length: max input tokens kept before truncation
            (previously hard-coded to 512; default preserves old behavior)

    Returns:
        List of humanized text strings, aligned with `texts`.
    """
    model.eval()
    results = []

    for i in range(0, len(texts), batch_size):
        batch_texts = texts[i:i + batch_size]
        batch_types = essay_types[i:i + batch_size]
        style_keys = [f'human_{t}' for t in batch_types]

        inputs = model.tokenizer(
            batch_texts,
            max_length=max_input_length,
            truncation=True,
            padding=True,  # pad within the batch so tensors are rectangular
            return_tensors='pt',
        ).to(device)

        with torch.no_grad():
            generated = model.generate_text(
                inputs['input_ids'],
                inputs['attention_mask'],
                style_keys=style_keys,
                max_length=max_length,
                num_beams=num_beams,
            )

        # Decode each generated sequence in the batch back to text.
        results.extend(
            model.tokenizer.decode(generated[j], skip_special_tokens=True)
            for j in range(len(batch_texts))
        )

    return results
|
|
|
|
def evaluate_with_gptzero(texts: list, api_key: str = None) -> list:
    """Evaluate texts with the GPTZero detector API (best-effort).

    Args:
        texts: strings to score.
        api_key: GPTZero API key; falls back to the GPTZERO_API_KEY
            environment variable when None.

    Returns:
        One dict per text with keys 'ai_prob', 'human_prob', 'class'.
        Failed requests yield {'ai_prob': -1, 'human_prob': -1, 'class': 'error'}
        so the output stays aligned with `texts`.
    """
    import requests

    if api_key is None:
        api_key = os.environ.get('GPTZERO_API_KEY', '')
    if not api_key:
        # Requests will almost certainly fail with 401; warn but keep the
        # best-effort behavior instead of raising.
        print("  WARNING: no GPTZero API key set (GPTZERO_API_KEY); requests may fail")

    results = []
    for i, text in enumerate(texts):
        try:
            resp = requests.post(
                'https://api.gptzero.me/v2/predict/text',
                json={'document': text, 'version': '2024-04-04'},
                headers={'x-api-key': api_key, 'Content-Type': 'application/json'},
                timeout=30,
            )
            resp.raise_for_status()
            doc = resp.json().get('documents', [{}])[0]
            # Look the probability up once so ai_prob/human_prob stay consistent.
            ai_prob = doc.get('completely_generated_prob', 0)
            results.append({
                'ai_prob': ai_prob,
                'human_prob': 1 - ai_prob,
                'class': doc.get('predicted_class', 'unknown'),
            })
        except Exception as e:
            # Best-effort: record the failure and keep scoring the rest.
            print(f"  GPTZero error for text {i}: {e}")
            results.append({'ai_prob': -1, 'human_prob': -1, 'class': 'error'})

        # Rate-limit between calls; no need to sleep after the final request.
        if i < len(texts) - 1:
            time.sleep(0.5)

    return results
|
|
|
|
def main():
    """CLI entry point: load a trained model, humanize input text(s),
    optionally score the outputs with GPTZero, and write JSONL results."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--model_path', required=True, help='Path to trained model')
    parser.add_argument('--input', required=True, help='Input JSONL file or single text')
    parser.add_argument('--output', default='results.jsonl', help='Output JSONL file')
    parser.add_argument('--essay_type', default='ps', choices=['ps', 'supp'])
    parser.add_argument('--eval_gptzero', action='store_true', help='Evaluate with GPTZero')
    parser.add_argument('--batch_size', type=int, default=8)
    parser.add_argument('--num_beams', type=int, default=4)
    parser.add_argument('--max_length', type=int, default=512)
    args = parser.parse_args()

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Device: {device}")

    print(f"Loading model from {args.model_path}...")
    model = StyleBART.load_pretrained(args.model_path, device=str(device))
    model = model.to(device)
    model.eval()

    # --input is either a JSONL file (one record per line) or a raw text string.
    if os.path.isfile(args.input) and args.input.endswith('.jsonl'):
        data = []
        with open(args.input, encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if line:  # tolerate blank/trailing lines in the JSONL file
                    data.append(json.loads(line))
        texts = [d.get('input_text', d.get('ai_text', '')) for d in data]
        essay_types = [d.get('essay_type', d.get('type', args.essay_type)) for d in data]
    else:
        texts = [args.input]
        essay_types = [args.essay_type]

    # Guard against an empty input file (avoids ZeroDivisionError below).
    if not texts:
        print("No input texts found; nothing to do.")
        return

    print(f"Processing {len(texts)} texts...")

    t0 = time.time()
    humanized = humanize_batch(
        model, texts, essay_types,
        device=str(device),
        batch_size=args.batch_size,
        max_length=args.max_length,
        num_beams=args.num_beams,
    )
    elapsed = time.time() - t0
    print(f"Humanization complete in {elapsed:.1f}s ({elapsed/len(texts):.2f}s/text)")

    gptzero_results = None
    if args.eval_gptzero:
        print("Evaluating with GPTZero...")
        gptzero_results = evaluate_with_gptzero(humanized)

        # Summary stats over successful requests only (ai_prob == -1 means error).
        ai_probs = [r['ai_prob'] for r in gptzero_results if r['ai_prob'] >= 0]
        if ai_probs:
            avg_ai = sum(ai_probs) / len(ai_probs)
            n_pass = sum(1 for p in ai_probs if p < 0.5)
            print(f"  Average AI prob: {avg_ai:.2%}")
            print(f"  Pass rate (<50% AI): {n_pass}/{len(ai_probs)} ({n_pass/len(ai_probs):.0%})")

    with open(args.output, 'w', encoding='utf-8') as f:
        for i in range(len(texts)):
            result = {
                # Input is truncated to keep the results file compact.
                'input_text': texts[i][:500],
                'humanized_text': humanized[i],
                'essay_type': essay_types[i],
                'input_words': len(texts[i].split()),
                'output_words': len(humanized[i].split()),
            }
            if gptzero_results:
                result['gptzero'] = gptzero_results[i]
            f.write(json.dumps(result, ensure_ascii=False) + '\n')

    print(f"Results saved to {args.output}")
|
|
|
|
# Run the CLI only when executed as a script (not on import).
if __name__ == '__main__':
    main()
|
|