Spaces:
Sleeping
Sleeping
| # simple_test_fixed.py - COMPLETELY FIXED TEST SCRIPT | |
| import requests | |
| import asyncio | |
| import sys | |
| import os | |
# Make project-local modules importable when this file is run directly:
# the directory that contains this script goes first on the import path.
_script_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, _script_dir)
def test_fixed_implementation(base_url="http://127.0.0.1:8000", timeout=30.0):
    """Smoke-test the HTTP endpoints of a running server and print the results.

    Five requests are issued in order:

    1. ``GET  /health``
    2. ``GET  /api/webhook/system-status``
    3. ``GET  /api/webhook/test-enhanced-elevenlabs``
    4. ``POST /api/webhook/enhanced-campaign`` (success is HTTP 202)
    5. ``POST /api/webhook/generate-enhanced-contract`` (success is HTTP 200)

    Each step is best-effort: any exception raised while performing or
    decoding a request is caught and printed, and the next step still runs.
    Nothing is asserted and nothing is returned; the output is meant to be
    read by a person.

    Args:
        base_url: Scheme, host and port of the server under test, without a
            trailing slash.
        timeout: Seconds to wait for each HTTP request before giving up.
            Without a timeout ``requests`` waits indefinitely, so an
            unresponsive server would hang the whole script.
    """
    import time  # only needed for the short pause after the campaign request

    print("π§ͺ TESTING FIXED IMPLEMENTATION")
    print("=" * 50)

    # 1. Test Health Check
    print("1. π₯ Testing Health Check...")
    try:
        response = requests.get(f"{base_url}/health", timeout=timeout)
        if response.status_code == 200:
            data = response.json()
            print(f" β Health Status: {data.get('status', 'unknown')}")
            print(f" π Services: {data.get('services', [])}")
        else:
            print(f" β Health check failed: {response.status_code}")
    except Exception as exc:  # best-effort: report and move on to the next step
        print(f" β Health check error: {exc}")

    # 2. Test Enhanced System Status
    print("2. π Testing Enhanced System Status...")
    try:
        response = requests.get(
            f"{base_url}/api/webhook/system-status", timeout=timeout
        )
        if response.status_code == 200:
            data = response.json()
            print(f" β System Status: {data.get('status', 'unknown')}")
            print(f" π§ Enhanced Features: {len(data.get('enhanced_features', {}))}")
            print(f" ποΈ Version: {data.get('version', 'unknown')}")
        else:
            print(f" β System status failed: {response.status_code}")
    except Exception as exc:  # best-effort: report and move on to the next step
        print(f" β System status error: {exc}")

    # 3. Test ElevenLabs Integration
    print("3. π Testing ElevenLabs Integration...")
    try:
        response = requests.get(
            f"{base_url}/api/webhook/test-enhanced-elevenlabs", timeout=timeout
        )
        if response.status_code == 200:
            data = response.json()
            print(f" β ElevenLabs Status: {data.get('status', 'unknown')}")
            print(f" π― API Connected: {data.get('api_connected', False)}")
        else:
            print(f" β ElevenLabs test failed: {response.status_code}")
    except Exception as exc:  # best-effort: report and move on to the next step
        print(f" β ElevenLabs test error: {exc}")

    # 4. Test Enhanced Campaign Workflow (THE CRITICAL TEST)
    print("4. π― Testing Enhanced Campaign Workflow...")
    try:
        campaign_data = {
            "campaign_id": "quick_test_001",
            "product_name": "TestPro Device",
            "brand_name": "TestTech Solutions",
            "product_description": "Revolutionary tech device for modern users",
            "target_audience": "Tech enthusiasts aged 25-40",
            "campaign_goal": "Increase product awareness and drive sales",
            "product_niche": "technology",
            "total_budget": 8000.0,
        }
        response = requests.post(
            f"{base_url}/api/webhook/enhanced-campaign",
            json=campaign_data,
            timeout=timeout,
        )
        # This endpoint is treated as successful on 202, unlike the others.
        if response.status_code == 202:
            data = response.json()
            print(" β Campaign Status: success")
            print(f" π― Task ID: {data.get('task_id', 'unknown')}")
            print(f" β±οΈ Estimated Duration: {data.get('estimated_duration_minutes', 'unknown')} minutes")
            # Brief wait to let orchestration start
            time.sleep(2)
        else:
            print(f" β Campaign test failed: {response.status_code}")
            print(f" π Error: {response.text}")
    except Exception as exc:  # best-effort: report and move on to the next step
        print(f" β Campaign test error: {exc}")

    # 5. Test Contract Generation
    print("5. π Testing Contract Generation...")
    try:
        contract_data = {
            "creator_name": "TestCreator Pro",
            "compensation": 2500.0,
            "campaign_details": {
                "brand": "TestTech Solutions",
                "product": "TestPro Device",
                "deliverables": ["1 Instagram post", "3 Stories"],
                "timeline": "2 weeks",
            },
        }
        response = requests.post(
            f"{base_url}/api/webhook/generate-enhanced-contract",
            json=contract_data,
            timeout=timeout,
        )
        if response.status_code == 200:
            data = response.json()
            print(f" β Contract Status: {data.get('status', 'unknown')}")
            print(f" π° Compensation: ${data.get('contract_data', {}).get('compensation', 0)}")
            print(f" π Contract Generated: {data.get('contract_generated', False)}")
        else:
            print(f" β Contract test failed: {response.status_code}")
    except Exception as exc:  # best-effort: report the failure and finish
        print(f" β Contract test error: {exc}")

    print("=" * 50)
    print("π FIXED IMPLEMENTATION TEST COMPLETED")
    print("π‘ Key Fixes Applied:")
    print(" β’ Added missing ai_strategy field to CampaignOrchestrationState")
    print(" β’ Fixed all field mapping issues in orchestrator")
    print(" β’ Cleaned up OOP design with proper encapsulation")
    print(" β’ Removed legacy code and unnecessary complexity")
    print(" β’ Added comprehensive error handling")
    print("π The 'ai_strategy' field error should now be resolved!")
# Run the smoke test only when this file is executed as a script; importing
# the module must not fire any HTTP requests.
if __name__ == "__main__":
    test_fixed_implementation()