|
|
""" |
|
|
Test script for Human-in-the-Loop Bias Detection workflow |
|
|
Run this to verify the HITL implementation is working correctly |
|
|
""" |
|
|
|
|
|
import json
import os
import sys
from pathlib import Path

import requests
|
|
|
|
|
|
|
|
API_BASE = "http://localhost:8000/api/v1/bias-detection-hitl" |
|
|
AUTH_TOKEN = "YOUR_AUTH_TOKEN_HERE" |
|
|
|
|
|
|
|
|
TEST_PDF_PATH = "path/to/test/document.pdf" |
|
|
|
|
|
|
|
|
def print_section(title, width=60):
    """Print ``title`` framed by two horizontal rules of ``=`` characters.

    Args:
        title: Text shown between the rules; it is printed after one
            leading space.
        width: Number of ``=`` characters in each rule. Defaults to 60.
    """
    rule = "=" * width
    # The leading newline separates the header from whatever was printed before.
    print(f"\n{rule}")
    print(f" {title}")
    print(rule)
|
|
|
|
|
|
|
|
def print_json(data, indent=2):
    """Pretty-print ``data`` to stdout as JSON.

    Args:
        data: Any object that ``json.dumps`` can serialise.
        indent: Indentation width passed through to ``json.dumps``.

    Raises:
        TypeError: If ``data`` contains a value ``json.dumps`` cannot
            serialise.
    """
    # ensure_ascii=False keeps non-ASCII text readable in the output instead
    # of emitting \uXXXX escapes.
    rendered = json.dumps(data, indent=indent, ensure_ascii=False)
    print(rendered)
|
|
|
|
|
|
|
|
def test_health_check(timeout=10):
    """Test 1: check that the API's ``/health`` endpoint answers.

    Args:
        timeout: Seconds to wait for the server. ``requests`` applies no
            timeout by default, so without one an unresponsive server
            would hang the script.

    Returns:
        True if the endpoint returned a non-error status with a JSON body,
        False otherwise.
    """
    print_section("TEST 1: Health Check")

    try:
        response = requests.get(f"{API_BASE}/health", timeout=timeout)
        response.raise_for_status()
        # Decode before announcing success so a non-JSON body is reported
        # as a failure only, not as "passed" followed by "failed".
        payload = response.json()
    except Exception as e:
        # Broad on purpose: any failure (connection error, timeout, non-2xx
        # status, malformed JSON) is reported and turned into False.
        # NOTE(review): the ✓/✗ markers replace a mojibake "β" in the source;
        # confirm the intended glyphs.
        print(f"✗ Health check failed: {e}")
        return False

    print("✓ Health check passed")
    print_json(payload)
    return True
|
|
|
|
|
|
|
|
def test_start_review(pdf_path, timeout=300):
    """Test 2: upload a PDF to ``/start-review`` and print what was detected.

    Args:
        pdf_path: Path of the PDF file to upload.
        timeout: Seconds to wait for the server's response. The default is
            generous because the request enables ``refine_with_llm``, which
            is presumably slow on the server side; tune as needed.

    Returns:
        The decoded JSON response on success. None if the file is missing,
        the request fails, or the response lacks an expected field.
    """
    print_section("TEST 2: Start Review Session")

    pdf = Path(pdf_path)
    # is_file() rather than exists(): a directory at this path cannot be
    # uploaded and should get the same "not found" guidance.
    # NOTE(review): the ✓/✗ markers replace a mojibake "β" in the source;
    # confirm the intended glyphs.
    if not pdf.is_file():
        print(f"✗ PDF file not found: {pdf_path}")
        print("Please update TEST_PDF_PATH in the script")
        return None

    form_fields = {
        'refine_with_llm': 'true',
        'confidence_threshold': '0.7',
    }
    headers = {'Authorization': f'Bearer {AUTH_TOKEN}'}

    try:
        # The file only needs to stay open while the request is being sent.
        with pdf.open('rb') as f:
            response = requests.post(
                f"{API_BASE}/start-review",
                files={'file': f},
                data=form_fields,
                headers=headers,
                timeout=timeout,
            )
        response.raise_for_status()
        result = response.json()

        print("✓ Review session started successfully")
        print(f"\nSession ID: {result['session_id']}")
        print(f"Total Sentences: {result['total_sentences']}")
        print(f"Biased: {result['biased_count']}")
        print(f"Neutral: {result['neutral_count']}")

        # Show the first flagged sentence as a sample of the detector output.
        biased_sentences = [s for s in result['sentences'] if s['is_biased']]
        if biased_sentences:
            sentence = biased_sentences[0]
            print("\nFirst Biased Sentence:")
            print(f" Original: {sentence['original_sentence']}")
            print(f" Category: {sentence['category']}")
            print(f" Confidence: {sentence['confidence']:.2f}")
            print(f" Suggestion: {sentence['suggestion']}")

        return result

    except requests.exceptions.HTTPError as e:
        # Non-2xx status: the response body usually carries the server's reason.
        print(f"✗ HTTP Error: {e}")
        print(f"Response: {e.response.text}")
        return None
    except Exception as e:
        # Broad on purpose: connection errors, timeouts, malformed JSON and
        # missing response fields are all reported and turned into None.
        print(f"✗ Failed to start review: {e}")
        return None
|
|
|
|
|
|
|
|
def test_get_session_status(session_id, timeout=30):
    """Test 3: fetch ``/session/<session_id>`` and print its counters.

    Args:
        session_id: Identifier of the review session to query.
        timeout: Seconds to wait for the server. ``requests`` applies no
            timeout by default, so without one the call could hang.

    Returns:
        The decoded JSON response on success, or None if the request fails
        or the response lacks an expected field.
    """
    print_section("TEST 3: Get Session Status")

    try:
        response = requests.get(
            f"{API_BASE}/session/{session_id}",
            headers={'Authorization': f'Bearer {AUTH_TOKEN}'},
            timeout=timeout,
        )
        response.raise_for_status()
        result = response.json()

        # NOTE(review): the ✓/✗ markers replace a mojibake "β" in the source;
        # confirm the intended glyphs.
        print("✓ Session status retrieved")
        print(f"\nStatus: {result['status']}")
        print(f"Pending: {result['pending_count']}")
        print(f"Approved: {result['approved_count']}")
        print(f"Needs Regeneration: {result['needs_regeneration_count']}")

        return result

    except Exception as e:
        # Broad on purpose: any failure is reported and turned into None.
        print(f"✗ Failed to get session status: {e}")
        return None
|
|
|
|
|
|
|
|
def test_approve_suggestion(session_id, sentence_id, suggestion, timeout=30):
    """Test 4: approve ``suggestion`` for one sentence via ``/approve-suggestion``.

    Args:
        session_id: Identifier of the review session.
        sentence_id: Identifier of the sentence being reviewed.
        suggestion: Replacement text to record as approved.
        timeout: Seconds to wait for the server. ``requests`` applies no
            timeout by default, so without one the call could hang.

    Returns:
        True if the server accepted the approval and returned a ``message``
        field, False otherwise.
    """
    print_section("TEST 4: Approve Suggestion")

    payload = {
        'session_id': session_id,
        'sentence_id': sentence_id,
        'action': 'approve',
        'approved_suggestion': suggestion,
    }

    try:
        # json= serialises the payload and sets Content-Type:
        # application/json, so the header is not set by hand.
        response = requests.post(
            f"{API_BASE}/approve-suggestion",
            json=payload,
            headers={'Authorization': f'Bearer {AUTH_TOKEN}'},
            timeout=timeout,
        )
        response.raise_for_status()
        result = response.json()

        # NOTE(review): the ✓/✗ markers replace a mojibake "β" in the source;
        # confirm the intended glyphs.
        print(f"✓ {result['message']}")
        return True

    except Exception as e:
        # Broad on purpose: any failure is reported and turned into False.
        print(f"✗ Failed to approve suggestion: {e}")
        return False
|
|
|
|
|
|
|
|
def test_reject_and_regenerate(session_id, sentence_id, timeout=120):
    """Test 5: reject a sentence's suggestion, then request a new one.

    Posts a ``reject`` action to ``/approve-suggestion`` and, if that
    succeeds, posts to ``/regenerate-suggestion`` for the same sentence.

    Args:
        session_id: Identifier of the review session.
        sentence_id: Identifier of the sentence whose suggestion is rejected.
        timeout: Seconds to wait for each of the two requests. ``requests``
            applies no timeout by default, so without one either call
            could hang.

    Returns:
        The ``new_suggestion`` value from the regenerate response, or None
        if either request fails or the response lacks that field.
    """
    print_section("TEST 5: Reject and Regenerate")

    # json= sets Content-Type: application/json on both requests.
    headers = {'Authorization': f'Bearer {AUTH_TOKEN}'}
    sentence_ref = {
        'session_id': session_id,
        'sentence_id': sentence_id,
    }

    try:
        # Step 1: reject the current suggestion.
        print("Rejecting suggestion...")
        response = requests.post(
            f"{API_BASE}/approve-suggestion",
            json={**sentence_ref, 'action': 'reject'},
            headers=headers,
            timeout=timeout,
        )
        response.raise_for_status()
        # NOTE(review): the ✓/✗ markers replace a mojibake "β" in the source;
        # confirm the intended glyphs.
        print("✓ Suggestion rejected")

        # Step 2: ask the server for a replacement suggestion.
        print("\nRegenerating new suggestion...")
        response = requests.post(
            f"{API_BASE}/regenerate-suggestion",
            json=sentence_ref,
            headers=headers,
            timeout=timeout,
        )
        response.raise_for_status()

        new_suggestion = response.json()['new_suggestion']
        print("✓ New suggestion generated")
        print(f"New Suggestion: {new_suggestion}")

        return new_suggestion

    except Exception as e:
        # Broad on purpose: a failure in either step is reported and turned
        # into None.
        print(f"✗ Failed to reject/regenerate: {e}")
        return None
|
|
|
|
|
|
|
|
def test_generate_pdf(session_id, output_path="test_debiased.pdf", timeout=120):
    """Test 6: request the generated PDF and save it to ``output_path``.

    Args:
        session_id: Identifier of the review session to render.
        output_path: Where the response body is written.
        timeout: Seconds to wait for the server. ``requests`` applies no
            timeout by default, so without one the call could hang.

    Returns:
        True if a non-empty body was received and written to disk, False
        otherwise.
    """
    print_section("TEST 6: Generate PDF")

    try:
        # json= sets Content-Type: application/json.
        response = requests.post(
            f"{API_BASE}/generate-pdf",
            json={'session_id': session_id},
            headers={'Authorization': f'Bearer {AUTH_TOKEN}'},
            timeout=timeout,
        )
        response.raise_for_status()

        pdf_bytes = response.content
        # NOTE(review): the ✓/✗ markers replace a mojibake "β" in the source;
        # confirm the intended glyphs.
        if not pdf_bytes:
            # An empty body would otherwise be saved as a zero-byte "PDF"
            # and reported as a success.
            print("✗ Server returned an empty PDF body")
            return False

        with open(output_path, 'wb') as f:
            f.write(pdf_bytes)

        changes = response.headers.get('X-Changes-Applied', 'unknown')
        print("✓ PDF generated successfully")
        print(f"Changes Applied: {changes}")
        print(f"Saved to: {output_path}")

        return True

    except requests.exceptions.HTTPError as e:
        # Non-2xx status: include the response body when there is one.
        print(f"✗ HTTP Error: {e}")
        if e.response.text:
            print(f"Error Details: {e.response.text}")
        return False
    except Exception as e:
        # Broad on purpose: connection errors, timeouts and file-write errors
        # are reported and turned into False.
        print(f"✗ Failed to generate PDF: {e}")
        return False
|
|
|
|
|
|
|
|
def run_complete_workflow():
    """Run the complete HITL workflow test.

    Steps, in order: health check, start a review with ``TEST_PDF_PATH``,
    fetch the session status, approve the first biased sentence, reject and
    regenerate the second (then approve the new suggestion), approve the
    remaining sentences with their original suggestions, and generate the
    PDF.

    The health check, the start-review step and the presence of at least
    one biased sentence are prerequisites: if any fails the run stops.
    Later steps all run even when one fails, and every failure is recorded.

    Returns:
        True only if every step succeeded, False otherwise.
    """
    # NOTE(review): the banner character and the ✓/✗/⚠ markers replace a
    # mojibake "β" in the source; confirm the intended glyphs.
    print("\n" + "█"*60)
    print(" HITL BIAS DETECTION - WORKFLOW TEST")
    print("█"*60)

    # Test 1: nothing else can work if the server is unreachable.
    if not test_health_check():
        print("\n✗ Health check failed. Is the server running?")
        return False

    # Test 2: every later step needs the session this creates.
    review_data = test_start_review(TEST_PDF_PATH)
    if not review_data:
        print("\n✗ Could not start review session")
        return False

    session_id = review_data['session_id']
    biased_sentences = [s for s in review_data['sentences'] if s['is_biased']]

    if not biased_sentences:
        print("\n⚠ No biased sentences found. Test with a PDF containing bias.")
        return False

    # Descriptions of steps that failed; an empty list means a clean run.
    failed_steps = []

    # Test 3
    if test_get_session_status(session_id) is None:
        failed_steps.append("get session status")

    # Test 4: approve the first suggestion as-is.
    first_sentence = biased_sentences[0]
    if not test_approve_suggestion(
        session_id,
        first_sentence['sentence_id'],
        first_sentence['suggestion'],
    ):
        failed_steps.append(
            f"approve suggestion for sentence {first_sentence['sentence_id']}"
        )

    # Test 5: reject/regenerate needs a second biased sentence.
    if len(biased_sentences) > 1:
        second_sentence = biased_sentences[1]
        new_suggestion = test_reject_and_regenerate(
            session_id,
            second_sentence['sentence_id'],
        )

        if new_suggestion:
            # Approve the regenerated text so the sentence is not left pending.
            if not test_approve_suggestion(
                session_id,
                second_sentence['sentence_id'],
                new_suggestion,
            ):
                failed_steps.append(
                    "approve regenerated suggestion for sentence "
                    f"{second_sentence['sentence_id']}"
                )
        else:
            failed_steps.append(
                f"reject/regenerate for sentence {second_sentence['sentence_id']}"
            )

    print_section("Auto-approving remaining sentences")
    for sentence in biased_sentences[2:]:
        if not test_approve_suggestion(
            session_id,
            sentence['sentence_id'],
            sentence['suggestion'],
        ):
            failed_steps.append(
                f"approve suggestion for sentence {sentence['sentence_id']}"
            )

    # Test 6
    pdf_generated = test_generate_pdf(session_id)
    if not pdf_generated:
        failed_steps.append("generate PDF")

    print_section("TEST SUMMARY")
    if failed_steps:
        print(f"✗ {len(failed_steps)} step(s) failed:")
        for step in failed_steps:
            print(f"  - {step}")
    else:
        print("✓ All tests completed successfully!")
    print(f"\nSession ID: {session_id}")
    if pdf_generated:
        print("Check 'test_debiased.pdf' for the generated output")

    return not failed_steps
|
|
|
|
|
|
|
|
def main():
    """Main entry point: validate the configuration, run the workflow, exit.

    Exits with status 1 if ``AUTH_TOKEN`` or ``TEST_PDF_PATH`` still holds
    its placeholder value or if the workflow reports a failure, and with
    status 0 if the workflow succeeds.
    """
    # NOTE(review): the emoji below replace mojibake ("π§", "β") in the
    # source; confirm the intended glyphs.
    print("\n🧪 HITL Bias Detection Test Script")
    print("="*60)

    # Refuse to run against placeholder configuration.
    if AUTH_TOKEN == "YOUR_AUTH_TOKEN_HERE":
        print("❌ Please set AUTH_TOKEN in the script")
        print("Get token from Supabase authentication")
        sys.exit(1)

    if TEST_PDF_PATH == "path/to/test/document.pdf":
        print("❌ Please set TEST_PDF_PATH in the script")
        print("Use a Nepali PDF with potentially biased content")
        sys.exit(1)

    success = run_complete_workflow()

    # The exit status mirrors the workflow result so callers can detect failure.
    if success:
        print("\n✅ All tests passed!")
        sys.exit(0)
    else:
        print("\n❌ Some tests failed")
        sys.exit(1)
|
|
|
|
|
|
|
|
# Run the workflow only when this file is executed as a script, not when it
# is imported.
if __name__ == "__main__":
    main()
|
|
|