"""
Test script to verify RunPod handwriting service integration.

This script tests the integration between the API and the deployed RunPod
service by sending one batch request and printing a human-readable report.
"""

import asyncio
import sys
import time
import traceback
from pathlib import Path

# Put the directory containing this file at the front of sys.path.
sys.path.insert(0, str(Path(__file__).parent))

# NOTE(review): these are explicit relative imports, so the module has to be
# executed as part of its package (e.g. ``python -m <package>.<module>``);
# the sys.path tweak above does not make them resolvable when the file is
# run directly. Confirm how this script is meant to be launched.
from .utils import call_handwriting_service_batch
from .config import settings


async def test_runpod_integration():
    """Test the RunPod handwriting service integration.

    Prints the handwriting-service settings, sends five sample texts to
    ``call_handwriting_service_batch`` in a single call, and prints a summary
    of the returned results together with a rough cost comparison.

    Returns:
        None. All output goes to stdout. If the service is disabled in the
        settings the function returns early without calling it. Any exception
        raised while calling the service or reporting is caught, printed with
        its traceback, and followed by troubleshooting hints.
    """
    print("=" * 80)
    print("RunPod Handwriting Service Integration Test")
    print("=" * 80)

    # Check configuration
    print("\n1. Configuration:")
    print(f" - HANDWRITING_SERVICE_URL: {settings.HANDWRITING_SERVICE_URL}")
    print(f" - HANDWRITING_SERVICE_ENABLED: {settings.HANDWRITING_SERVICE_ENABLED}")
    print(f" - HANDWRITING_SERVICE_TIMEOUT: {settings.HANDWRITING_SERVICE_TIMEOUT}s")
    print(f" - HANDWRITING_SERVICE_MAX_RETRIES: {settings.HANDWRITING_SERVICE_MAX_RETRIES}")
    print(f" - RUNPOD_API_KEY: {'Set' if settings.RUNPOD_API_KEY else 'Not set (optional)'}")

    if not settings.HANDWRITING_SERVICE_ENABLED:
        print("\n❌ HANDWRITING_SERVICE_ENABLED is false. Please enable it in .env")
        return

    # Prepare test data
    test_texts = [
        {"text": "Hello", "author_id": 42, "hw_id": "test_hw_0"},
        {"text": "World", "author_id": 42, "hw_id": "test_hw_1"},
        {"text": "DocGenie", "author_id": 100, "hw_id": "test_hw_2"},
        {"text": "Batch", "author_id": 150, "hw_id": "test_hw_3"},
        {"text": "Processing", "author_id": 200, "hw_id": "test_hw_4"},
    ]
    total = len(test_texts)

    # Per-worker duration used for the "old parallel method" estimate below.
    # NOTE(review): 18s is carried over from the original script; its origin
    # is not visible here.
    assumed_seconds_per_worker = 18

    print("\n2. Testing TRUE BATCH PROCESSING (cost-efficient):")
    print(f" - {total} texts will be sent in ONE request")
    print(f" - Activates ONLY 1 RunPod worker (instead of {total} workers)")
    print(" - Expected cost savings: ~45% compared to parallel processing")
    for item in test_texts:
        print(f" - '{item['text']}' (author_id: {item['author_id']})")

    # Call the service
    print("\n3. Calling RunPod service with BATCH request...")
    # perf_counter is monotonic, so the interval cannot be skewed by
    # system clock adjustments during the (potentially long) request.
    start_time = time.perf_counter()

    try:
        results = await call_handwriting_service_batch(
            test_texts,
            apply_ink_filter=True,
            num_inference_steps=100,
        )
        elapsed = time.perf_counter() - start_time

        # Treat a falsy return (empty list / None) as "nothing generated" so
        # the summary below never divides by zero and the warning branch at
        # the end stays reachable.
        generated = len(results) if results else 0
        per_text = f" ({elapsed / generated:.1f}s per text)" if generated else ""

        print("\n4. Results:")
        print(f" - Successfully generated: {generated}/{total}")
        print(f" - Total time: {elapsed:.1f}s{per_text}")
        print(f" - Worker activations: 1 (would be {total} with old parallel method)")

        if results:
            print("\n5. Sample result details:")
            for i, result in enumerate(results[:2]):  # Show first 2 results
                # A missing or None image must not crash the report.
                image_b64 = result.get('image_base64') or ''
                print(f"\n Result {i+1}:")
                print(f" - hw_id: {result.get('hw_id')}")
                print(f" - text: {result.get('text')}")
                print(f" - author_id: {result.get('author_id')}")
                print(f" - width: {result.get('width')}px")
                print(f" - height: {result.get('height')}px")
                print(f" - image_base64: {image_b64[:50]}... ({len(image_b64)} chars)")

            old_worker_seconds = total * assumed_seconds_per_worker
            savings_pct = int((1 - elapsed / old_worker_seconds) * 100)

            print("\n" + "=" * 80)
            print("✅ BATCH PROCESSING TEST PASSED!")
            print("=" * 80)
            print("\nCost Analysis:")
            print(f" OLD (parallel): {total} workers × {assumed_seconds_per_worker}s = {old_worker_seconds}s total worker time")
            print(f" NEW (batched): 1 worker × {int(elapsed)}s = {int(elapsed)}s total worker time")
            print(f" Savings: ~{savings_pct}% reduction in worker activation costs")
            # This message needs the f-prefix; without it the placeholder
            # text "{len(test_texts)}" was printed literally.
            print(f"\nThe API now sends all {total} texts in ONE request, activating only 1 worker.")
            print("This significantly reduces RunPod costs while maintaining quality.")
        else:
            print("\n⚠️ No results returned. Check the error messages above.")

    except Exception as e:
        # Top-level boundary of a diagnostic script: report everything and
        # give the operator troubleshooting hints instead of a bare crash.
        print("\n❌ Integration test FAILED!")
        print(f"Error: {e}")
        traceback.print_exc()
        print("\nPossible issues:")
        print("1. Check that HANDWRITING_SERVICE_URL in .env is correct")
        print("2. Verify the RunPod endpoint is deployed with v12 (batch support)")
        print("3. Check if RUNPOD_API_KEY is required and set correctly")
        print("4. Ensure the service handler supports batch input format")


if __name__ == "__main__":
    asyncio.run(test_runpod_integration())