setu / tests /test_hitl_workflow.py
khagu's picture
chore: finally untrack large database files
3998131
"""
Test script for Human-in-the-Loop Bias Detection workflow
Run this to verify the HITL implementation is working correctly
"""
import requests
import json
import sys
from pathlib import Path
# Configuration
API_BASE = "http://localhost:8000/api/v1/bias-detection-hitl"
AUTH_TOKEN = "YOUR_AUTH_TOKEN_HERE" # Replace with actual token
# Sample PDF path (update this to your test PDF)
TEST_PDF_PATH = "path/to/test/document.pdf"
def print_section(title):
"""Print a formatted section header"""
print("\n" + "="*60)
print(f" {title}")
print("="*60)
def print_json(data, indent=2):
"""Pretty print JSON data"""
print(json.dumps(data, indent=indent, ensure_ascii=False))
def test_health_check():
"""Test 1: Health Check"""
print_section("TEST 1: Health Check")
try:
response = requests.get(f"{API_BASE}/health")
response.raise_for_status()
print("βœ“ Health check passed")
print_json(response.json())
return True
except Exception as e:
print(f"βœ— Health check failed: {e}")
return False
def test_start_review(pdf_path):
"""Test 2: Start Review Session"""
print_section("TEST 2: Start Review Session")
if not Path(pdf_path).exists():
print(f"βœ— PDF file not found: {pdf_path}")
print("Please update TEST_PDF_PATH in the script")
return None
try:
with open(pdf_path, 'rb') as f:
files = {'file': f}
data = {
'refine_with_llm': 'true',
'confidence_threshold': '0.7'
}
headers = {'Authorization': f'Bearer {AUTH_TOKEN}'}
response = requests.post(
f"{API_BASE}/start-review",
files=files,
data=data,
headers=headers
)
response.raise_for_status()
result = response.json()
print("βœ“ Review session started successfully")
print(f"\nSession ID: {result['session_id']}")
print(f"Total Sentences: {result['total_sentences']}")
print(f"Biased: {result['biased_count']}")
print(f"Neutral: {result['neutral_count']}")
# Show first biased sentence
biased_sentences = [s for s in result['sentences'] if s['is_biased']]
if biased_sentences:
print("\nFirst Biased Sentence:")
sentence = biased_sentences[0]
print(f" Original: {sentence['original_sentence']}")
print(f" Category: {sentence['category']}")
print(f" Confidence: {sentence['confidence']:.2f}")
print(f" Suggestion: {sentence['suggestion']}")
return result
except requests.exceptions.HTTPError as e:
print(f"βœ— HTTP Error: {e}")
print(f"Response: {e.response.text}")
return None
except Exception as e:
print(f"βœ— Failed to start review: {e}")
return None
def test_get_session_status(session_id):
"""Test 3: Get Session Status"""
print_section("TEST 3: Get Session Status")
try:
headers = {'Authorization': f'Bearer {AUTH_TOKEN}'}
response = requests.get(
f"{API_BASE}/session/{session_id}",
headers=headers
)
response.raise_for_status()
result = response.json()
print("βœ“ Session status retrieved")
print(f"\nStatus: {result['status']}")
print(f"Pending: {result['pending_count']}")
print(f"Approved: {result['approved_count']}")
print(f"Needs Regeneration: {result['needs_regeneration_count']}")
return result
except Exception as e:
print(f"βœ— Failed to get session status: {e}")
return None
def test_approve_suggestion(session_id, sentence_id, suggestion):
"""Test 4: Approve Suggestion"""
print_section("TEST 4: Approve Suggestion")
try:
headers = {
'Authorization': f'Bearer {AUTH_TOKEN}',
'Content-Type': 'application/json'
}
data = {
'session_id': session_id,
'sentence_id': sentence_id,
'action': 'approve',
'approved_suggestion': suggestion
}
response = requests.post(
f"{API_BASE}/approve-suggestion",
json=data,
headers=headers
)
response.raise_for_status()
result = response.json()
print(f"βœ“ {result['message']}")
return True
except Exception as e:
print(f"βœ— Failed to approve suggestion: {e}")
return False
def test_reject_and_regenerate(session_id, sentence_id):
"""Test 5: Reject and Regenerate Suggestion"""
print_section("TEST 5: Reject and Regenerate")
try:
headers = {
'Authorization': f'Bearer {AUTH_TOKEN}',
'Content-Type': 'application/json'
}
# Step 1: Reject
print("Rejecting suggestion...")
reject_data = {
'session_id': session_id,
'sentence_id': sentence_id,
'action': 'reject'
}
response = requests.post(
f"{API_BASE}/approve-suggestion",
json=reject_data,
headers=headers
)
response.raise_for_status()
print("βœ“ Suggestion rejected")
# Step 2: Regenerate
print("\nRegenerating new suggestion...")
regen_data = {
'session_id': session_id,
'sentence_id': sentence_id
}
response = requests.post(
f"{API_BASE}/regenerate-suggestion",
json=regen_data,
headers=headers
)
response.raise_for_status()
result = response.json()
print("βœ“ New suggestion generated")
print(f"New Suggestion: {result['new_suggestion']}")
return result['new_suggestion']
except Exception as e:
print(f"βœ— Failed to reject/regenerate: {e}")
return None
def test_generate_pdf(session_id, output_path="test_debiased.pdf"):
"""Test 6: Generate PDF"""
print_section("TEST 6: Generate PDF")
try:
headers = {
'Authorization': f'Bearer {AUTH_TOKEN}',
'Content-Type': 'application/json'
}
data = {'session_id': session_id}
response = requests.post(
f"{API_BASE}/generate-pdf",
json=data,
headers=headers
)
response.raise_for_status()
# Save PDF
with open(output_path, 'wb') as f:
f.write(response.content)
changes = response.headers.get('X-Changes-Applied', 'unknown')
print(f"βœ“ PDF generated successfully")
print(f"Changes Applied: {changes}")
print(f"Saved to: {output_path}")
return True
except requests.exceptions.HTTPError as e:
print(f"βœ— HTTP Error: {e}")
if e.response.text:
print(f"Error Details: {e.response.text}")
return False
except Exception as e:
print(f"βœ— Failed to generate PDF: {e}")
return False
def run_complete_workflow():
"""Run the complete HITL workflow test"""
print("\n" + "β–ˆ"*60)
print(" HITL BIAS DETECTION - WORKFLOW TEST")
print("β–ˆ"*60)
# Test 1: Health Check
if not test_health_check():
print("\n⚠ Health check failed. Is the server running?")
return False
# Test 2: Start Review
review_data = test_start_review(TEST_PDF_PATH)
if not review_data:
print("\n⚠ Could not start review session")
return False
session_id = review_data['session_id']
biased_sentences = [s for s in review_data['sentences'] if s['is_biased']]
if not biased_sentences:
print("\n⚠ No biased sentences found. Test with a PDF containing bias.")
return False
# Test 3: Get Session Status
test_get_session_status(session_id)
# Test 4: Approve first biased sentence
first_sentence = biased_sentences[0]
test_approve_suggestion(
session_id,
first_sentence['sentence_id'],
first_sentence['suggestion']
)
# Test 5: Reject and regenerate (if there's a second biased sentence)
if len(biased_sentences) > 1:
second_sentence = biased_sentences[1]
new_suggestion = test_reject_and_regenerate(
session_id,
second_sentence['sentence_id']
)
# Approve the new suggestion
if new_suggestion:
test_approve_suggestion(
session_id,
second_sentence['sentence_id'],
new_suggestion
)
# Auto-approve remaining sentences for testing
print_section("Auto-approving remaining sentences")
for sentence in biased_sentences[2:]:
test_approve_suggestion(
session_id,
sentence['sentence_id'],
sentence['suggestion']
)
# Test 6: Generate PDF
test_generate_pdf(session_id)
print_section("TEST SUMMARY")
print("βœ“ All tests completed successfully!")
print(f"\nSession ID: {session_id}")
print("Check 'test_debiased.pdf' for the generated output")
return True
def main():
"""Main entry point"""
print("\nπŸ”§ HITL Bias Detection Test Script")
print("="*60)
# Check configuration
if AUTH_TOKEN == "YOUR_AUTH_TOKEN_HERE":
print("⚠ Please set AUTH_TOKEN in the script")
print("Get token from Supabase authentication")
sys.exit(1)
if TEST_PDF_PATH == "path/to/test/document.pdf":
print("⚠ Please set TEST_PDF_PATH in the script")
print("Use a Nepali PDF with potentially biased content")
sys.exit(1)
# Run workflow
success = run_complete_workflow()
if success:
print("\nβœ… All tests passed!")
sys.exit(0)
else:
print("\n❌ Some tests failed")
sys.exit(1)
if __name__ == "__main__":
main()