File size: 8,299 Bytes
88678e4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
"""
Functional tests for the scraper architecture changes.
Run: python3 tests/test_architecture_changes.py
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import asyncio

results = []

def test(name, func):
    """Run one test callable and record its outcome.

    Calls ``func`` with no arguments, appends a ``(name, passed, error_text)``
    tuple to the module-level ``results`` list and prints a one-line status.

    Args:
        name: Human-readable label used in the status line and in ``results``.
        func: Zero-argument callable; any ``Exception`` it raises (including
            ``AssertionError``) is recorded as a failure.
    """
    try:
        func()
    except Exception as e:
        # A bare ``assert`` carries no message, so str(e) would be empty;
        # fall back to the exception type so the failure line is never blank.
        detail = str(e) or type(e).__name__
        results.append((name, False, detail))
        print(f"  ✗ {name}: {detail}")
    else:
        # Success bookkeeping lives outside the ``try`` so an error raised
        # while recording/printing a pass is not misreported as a test
        # failure (which previously produced two entries for one test).
        results.append((name, True, None))
        print(f"  ✓ {name}")


# === Test 1: StrategyMetrics ===
def test_strategy_metrics():
    """Record three airbnb attempts and check the per-strategy statistics."""
    from scraper_health import StrategyMetrics

    metrics = StrategyMetrics(storage_file='/tmp/test_sm.json')
    metrics.records = []  # start from an empty in-memory history
    metrics.record('airbnb', 'curl-cffi', True, 3.5, result_count=5)
    metrics.record('airbnb', 'curl-cffi', False, 1.2, error='429')
    metrics.record('airbnb', 'patchright', True, 15.3, result_count=8)

    # curl-cffi: one success and one failure were recorded above.
    curl_rate, curl_attempts = metrics.get_success_rate('airbnb', 'curl-cffi')
    assert curl_attempts == 2, f"Expected 2 attempts, got {curl_attempts}"
    assert curl_rate == 0.5, f"Expected 50% rate, got {curl_rate}"

    # patchright: a single successful attempt.
    pw_rate, pw_attempts = metrics.get_success_rate('airbnb', 'patchright')
    assert pw_attempts == 1
    assert pw_rate == 1.0

    # The most recent record is the patchright success, so no failure streak.
    streak = metrics.get_consecutive_failures('airbnb')
    assert streak == 0

test("StrategyMetrics basic", test_strategy_metrics)


# === Test 2: AdaptiveRouter ===
def test_adaptive_router():
    """Check the router's strategy order for a seeded success/failure history.

    curl-cffi is seeded with two failures and patchright with two successes;
    the test expects 'patchright' first and 'fallback' last.
    """
    from scraper_health import StrategyMetrics, AdaptiveRouter

    metrics = StrategyMetrics(storage_file='/tmp/test_ar.json')
    metrics.records = []

    # (strategy, success, duration, extra keyword arguments) in recording order.
    history = (
        ('curl-cffi', False, 1.0, {'error': '429'}),
        ('curl-cffi', False, 1.0, {'error': '429'}),
        ('patchright', True, 15.0, {'result_count': 5}),
        ('patchright', True, 12.0, {'result_count': 3}),
    )
    for strategy, succeeded, duration, extra in history:
        metrics.record('airbnb', strategy, succeeded, duration, **extra)

    order = AdaptiveRouter(metrics).get_strategy_order('airbnb')
    last, first = order[-1], order[0]
    assert last == 'fallback', f"Fallback should be last: {order}"
    assert first == 'patchright', f"Patchright should be first (100%): {order}"

test("AdaptiveRouter ordering", test_adaptive_router)


# === Test 3: RequestDelayer pressure ===
def test_delayer_pressure():
    """Check that each ``notify_pressure()`` call raises the level and widens the delays."""
    from rate_limit_bypass import RequestDelayer

    delayer = RequestDelayer(min_delay=2, max_delay=5)
    assert delayer._pressure_level == 0

    # Expected (level, min_delay, max_delay) after each successive call:
    # the base 2/5 delays scaled by 1.5, then by 2.0.
    expected_steps = (
        (1, 3.0, 7.5),
        (2, 4.0, 10.0),
    )
    for level, low, high in expected_steps:
        delayer.notify_pressure()
        assert delayer._pressure_level == level
        assert delayer.min_delay == low
        assert delayer.max_delay == high

test("RequestDelayer adaptive pressure", test_delayer_pressure)


# === Test 4: Cache with disk persistence ===
def test_cache():
    """Verify that a cached value is readable from memory and written to disk.

    The disk file lives in a fresh temporary directory so that a file left
    behind by an earlier run cannot satisfy the persistence assertions, and so
    the test does not depend on a ``/tmp`` directory existing.
    """
    from rate_limit_bypass import Cache
    import json
    import tempfile

    with tempfile.TemporaryDirectory() as tmp_dir:
        disk_file = os.path.join(tmp_dir, 'test_cache.json')
        c = Cache(ttl_minutes=30, disk_file=disk_file)
        key = c.make_key('airbnb', region='Amsterdam', checkin='2026-02-15')
        c.set(key, [{"name": "Test Deal", "price": 100}])

        # Read back from memory
        result = c.get(key)
        assert result is not None, "Cache miss"
        assert result[0]["name"] == "Test Deal"

        # The file cannot pre-exist in a fresh directory, so finding it (and
        # the key inside it) shows this run's ``set`` wrote it.
        assert os.path.exists(disk_file), "Disk file not created"
        with open(disk_file) as f:
            disk_data = json.load(f)
        assert key in disk_data, f"Key {key} not in disk data"

test("Cache with disk persistence", test_cache)


# === Test 5: HealthReport format ===
def test_health_report():
    """Generate a report from two recorded successes and check its key strings."""
    from scraper_health import StrategyMetrics, HealthReport

    metrics = StrategyMetrics(storage_file='/tmp/test_hr.json')
    metrics.records = []
    metrics.record('airbnb', 'curl-cffi', True, 3.0, result_count=5)
    metrics.record('booking', 'curl-cffi', True, 2.0, result_count=3)

    text = HealthReport(metrics).generate()

    # The report must contain its title and both recorded sources in upper case.
    for marker in ('SCRAPER HEALTH REPORT', 'AIRBNB', 'BOOKING'):
        assert marker in text

test("HealthReport generation", test_health_report)


# === Test 6: ExponentialBackoff cap ===
def test_backoff_cap():
    """Check that the delay at a high attempt number stays within the cap."""
    from rate_limit_bypass import ExponentialBackoff

    backoff = ExponentialBackoff(max_retries=10, base_delay=10)

    # Uncapped, attempt 5 would give 2^5 * 10 = 320; the test expects a cap
    # of 120 plus at most 10 of jitter, hence the 130 bound.
    backoff.attempt = 5
    upper_bound = 130
    delay = backoff.get_delay()
    assert delay <= upper_bound, f"Delay {delay} exceeds cap"

test("ExponentialBackoff 120s cap", test_backoff_cap)


# === Test 7: SessionWarmer cold/warm check ===
def test_session_warmer():
    """Check that a host reads as cold until a warm-up timestamp is stored for it."""
    from rate_limit_bypass import SessionWarmer
    import time

    host = 'www.airbnb.com'
    warmer = SessionWarmer()
    assert not warmer.is_warm(host), "Should be cold initially"

    # Mark the host as warmed right now by writing the timestamp directly.
    warmer._warmed_sessions[host] = time.time()
    assert warmer.is_warm(host), "Should be warm after marking"

test("SessionWarmer state tracking", test_session_warmer)


# === Test 8: SmartAirbnbScraper fallback ===
def test_smart_scraper_fallback():
    """Check that the fallback path returns at least one deal tagged as fallback."""
    from airbnb_scraper_enhanced import SmartAirbnbScraper

    # Positional arguments passed straight through to _get_fallback.
    query = ('Amsterdam', '2026-02-15', '2026-02-22', 4)
    deals = SmartAirbnbScraper()._get_fallback(*query)

    assert len(deals) > 0, "Fallback should return deals"
    first_deal = deals[0]
    assert first_deal['source'] == 'airbnb (fallback)'

test("SmartAirbnbScraper fallback", test_smart_scraper_fallback)


# === Test 9: EnhancedAirbnbScraper JSON parsing ===
def test_json_parsing():
    """Feed ``_parse_html`` a page with one embedded JSON search result.

    The fixture is wrapped in a ``<script type="application/json">`` tag. The
    test expects exactly one deal, named after the listing title and priced
    at the discounted price.
    """
    import json as json_mod
    from bs4 import BeautifulSoup
    from airbnb_scraper_enhanced import EnhancedAirbnbScraper

    # Build the fixture inside-out; key order matches the serialized payload.
    listing = {
        "id": "123",
        "title": "Spacious Family Home",
        "avgRatingLocalized": "4.8",
        "avgRatingA11yLabel": "4.8 out of 5 stars, 50 reviews",
        "personCapacity": 4,
        "listingUrl": "/rooms/123?adults=4",
    }
    display_price = {
        "primaryLine": {
            "price": "€200",
            "discountedPrice": "€150",
        },
    }
    search_result = {
        "__typename": "StaySearchResult",
        "id": "123",
        "listing": listing,
        "structuredDisplayPrice": display_price,
    }
    stays_search = {"results": {"searchResults": [search_result]}}
    root_query = {"data": {"presentation": {"staysSearch": stays_search}}}
    mock_data = {"niobeClientData": [["ROOT_QUERY", root_query]]}

    payload = json_mod.dumps(mock_data)
    html = f'<html><body><script type="application/json">{payload}</script></body></html>'
    soup = BeautifulSoup(html, 'html.parser')

    deals = EnhancedAirbnbScraper()._parse_html(
        soup, "Amsterdam", "2026-02-15", "2026-02-22", 4
    )
    assert len(deals) == 1, f"Expected 1 deal, got {len(deals)}"

    deal = deals[0]
    assert deal['name'] == 'Spacious Family Home'
    assert deal['price_per_night'] == 150  # Should use discounted price

test("EnhancedAirbnbScraper JSON parsing", test_json_parsing)


# === Test 10: VacationAgent instantiation ===
def test_agent_init():
    """Check that a new agent keeps its budget bounds and has both scrapers set."""
    from holland_agent import VacationAgent

    agent = VacationAgent(budget_min=40, budget_max=200)

    # Budget bounds must be stored exactly as passed in.
    for attr, expected in (('budget_min', 40), ('budget_max', 200)):
        assert getattr(agent, attr) == expected

    # Both scraper attributes must be populated.
    for attr in ('airbnb_scraper', 'booking_scraper'):
        assert getattr(agent, attr) is not None

test("VacationAgent instantiation", test_agent_init)


# === Summary ===
# Tally the recorded outcomes, list any failures, and exit non-zero if a
# test failed.
print()
failures = [(name, err) for name, ok, err in results if not ok]
failed = len(failures)
passed = len(results) - failed
print(f"Results: {passed}/{len(results)} passed, {failed} failed")
if not failures:
    print("ALL TESTS PASSED ✓")
else:
    print("\nFailed tests:")
    for name, err in failures:
        print(f"  ✗ {name}: {err}")
    sys.exit(1)