Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Test script for horoscope system integration with astroastayogini.in | |
| """ | |
| import requests | |
| import json | |
| from datetime import date | |
| import sys | |
| def test_astrology_scraper(): | |
| """Test direct scraping from astrology.com""" | |
| print("Testing Astrology.com scraper...") | |
| try: | |
| from scrapers.astrology_com_scraper import AstrologyComScraper | |
| scraper = AstrologyComScraper() | |
| result = scraper.scrape_sign(scraper.base_url, "aries") | |
| if result.get('success'): | |
| print(f"β Successfully scraped Aries horoscope from astrology.com") | |
| print(f"Prediction: {result['prediction'][:100]}...") | |
| return True | |
| else: | |
| print(f"β Failed to scrape: {result.get('error')}") | |
| return False | |
| except Exception as e: | |
| print(f"β Error testing astrology.com: {str(e)}") | |
| return False | |
| def test_horoscope_scraper(): | |
| """Test direct scraping from horoscope.com""" | |
| print("\nTesting Horoscope.com scraper...") | |
| try: | |
| from scrapers.horoscope_com_scraper import HoroscopeComScraper | |
| scraper = HoroscopeComScraper() | |
| result = scraper.scrape_sign(scraper.base_url, "leo") | |
| if result.get('success'): | |
| print(f"β Successfully scraped Leo horoscope from horoscope.com") | |
| print(f"Prediction: {result['prediction'][:100]}...") | |
| return True | |
| else: | |
| print(f"β Failed to scrape: {result.get('error')}") | |
| return False | |
| except Exception as e: | |
| print(f"β Error testing horoscope.com: {str(e)}") | |
| return False | |
| def test_wordpress_integration(): | |
| """Test WordPress integration readiness for astroastayogini.in""" | |
| print("\nTesting WordPress integration readiness...") | |
| try: | |
| from services.wordpress_service import WordPressService | |
| # Test with mock astroastayogini.in configuration | |
| wp_service = WordPressService() | |
| if wp_service.is_configured: | |
| print("β WordPress service is configured") | |
| test_result = wp_service.test_connection() | |
| if test_result.get('success'): | |
| print("β WordPress connection test successful") | |
| else: | |
| print(f"β οΈ WordPress connection test failed: {test_result.get('error')}") | |
| else: | |
| print("β οΈ WordPress not configured yet (expected - needs astroastayogini.in credentials)") | |
| return True | |
| except Exception as e: | |
| print(f"β Error testing WordPress integration: {str(e)}") | |
| return False | |
| def test_openai_integration(): | |
| """Test OpenAI integration for horoscope consolidation""" | |
| print("\nTesting OpenAI integration...") | |
| import os | |
| api_key = os.environ.get("OPENAI_API_KEY") | |
| if not api_key: | |
| print("β οΈ OpenAI API key not configured (provide OPENAI_API_KEY for AI consolidation)") | |
| return False | |
| try: | |
| from openai import OpenAI | |
| client = OpenAI(api_key=api_key) | |
| # Test with sample horoscope data | |
| test_predictions = [ | |
| { | |
| "source": "astrology.com", | |
| "prediction": "Today brings positive energy for new beginnings." | |
| }, | |
| { | |
| "source": "horoscope.com", | |
| "prediction": "Financial opportunities may present themselves today." | |
| } | |
| ] | |
| # Create consolidation prompt | |
| sources_text = "" | |
| for i, pred in enumerate(test_predictions, 1): | |
| sources_text += f"SOURCE {i} ({pred['source']}):\n{pred['prediction']}\n\n" | |
| prompt = f""" | |
| Please analyze and consolidate these daily horoscope predictions for ARIES. | |
| {sources_text} | |
| Create a single, coherent daily horoscope prediction that synthesizes the information from all sources. | |
| Focus on the common themes and advice while maintaining the mystical and guiding tone typical of horoscopes. | |
| The response should be 2-3 paragraphs long and should NOT mention the sources or that it's a consolidation. | |
| Respond with JSON in this format: | |
| {{ | |
| "consolidated_prediction": "The consolidated horoscope text..." | |
| }} | |
| """ | |
| response = client.chat.completions.create( | |
| model="gpt-4o", | |
| messages=[ | |
| {"role": "system", "content": "You are an expert astrologer specializing in synthesizing horoscope predictions."}, | |
| {"role": "user", "content": prompt} | |
| ], | |
| response_format={"type": "json_object"}, | |
| temperature=0.7 | |
| ) | |
| content = response.choices[0].message.content | |
| if content: | |
| result = json.loads(content) | |
| consolidated = result.get("consolidated_prediction", "") | |
| if consolidated: | |
| print("β OpenAI consolidation test successful") | |
| print(f"Consolidated prediction: {consolidated[:100]}...") | |
| return True | |
| print("β OpenAI consolidation test failed") | |
| return False | |
| except Exception as e: | |
| print(f"β Error testing OpenAI: {str(e)}") | |
| return False | |
| def main(): | |
| """Run all tests for astroastayogini.in integration""" | |
| print("π Testing Horoscope System for astroastayogini.in Integration") | |
| print("=" * 60) | |
| tests = [ | |
| test_astrology_scraper, | |
| test_horoscope_scraper, | |
| test_wordpress_integration, | |
| test_openai_integration | |
| ] | |
| results = [] | |
| for test in tests: | |
| try: | |
| result = test() | |
| results.append(result) | |
| except Exception as e: | |
| print(f"β Test failed with exception: {str(e)}") | |
| results.append(False) | |
| print("\n" + "=" * 60) | |
| print("INTEGRATION READINESS SUMMARY:") | |
| print("=" * 60) | |
| if results[0] and results[1]: | |
| print("β Horoscope Scraping: READY") | |
| else: | |
| print("β Horoscope Scraping: NEEDS ATTENTION") | |
| if results[2]: | |
| print("β WordPress Integration: READY (needs astroastayogini.in credentials)") | |
| else: | |
| print("β WordPress Integration: NEEDS ATTENTION") | |
| if results[3]: | |
| print("β AI Consolidation: READY") | |
| else: | |
| print("β οΈ AI Consolidation: NEEDS OPENAI_API_KEY") | |
| success_rate = sum(results) / len(results) | |
| print(f"\nOverall Readiness: {success_rate*100:.0f}%") | |
| if success_rate >= 0.5: | |
| print("\nπ System is ready for astroastayogini.in integration!") | |
| print("\nNext steps:") | |
| print("1. Provide WordPress credentials for astroastayogini.in") | |
| print("2. Provide OpenAI API key for enhanced horoscopes") | |
| print("3. Configure automated daily publishing") | |
| else: | |
| print("\nβ οΈ System needs configuration before integration") | |
| if __name__ == "__main__": | |
| main() |