"""
Script Collector - collects all JS scripts from pages during registration.

Usage:
    from debugger.collectors.script_collector import ScriptCollector
    collector = ScriptCollector()
    collector.collect(page)  # Call after each page load
    collector.save()         # Save at the end
"""
import base64
import hashlib
import json
from datetime import datetime
from pathlib import Path
from typing import Optional
from urllib.parse import urlparse
class ScriptCollector:
    """Collects all JS scripts loaded by browser pages.

    Scripts are fetched via the Chrome DevTools Protocol (with a plain
    in-page ``fetch`` fallback) and saved under ``output_dir``, one
    sub-directory per visited page.  ``save()`` writes an ``index.json``
    mapping each script URL to the file it was stored in.
    """

    # Shared singleton used by the module-level helper functions.
    _instance: Optional['ScriptCollector'] = None

    def __init__(self, output_dir: Optional[Path] = None):
        """Create a collector writing into *output_dir*.

        When *output_dir* is omitted, a timestamped directory is created
        under the project's debug-sessions path (requires ``core.paths``).
        """
        if output_dir is None:
            from core.paths import get_paths
            paths = get_paths()
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            output_dir = paths.debug_sessions_dir / 'aws_scripts' / timestamp
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.collected = {}  # js_url -> filepath of the saved script
        self.page_count = 0  # number of pages passed to collect()
        self.enabled = True  # collect() is a no-op while False
        print(f"[ScriptCollector] Output: {self.output_dir}")

    # BUG FIX: get_instance/reset took `cls` but lacked @classmethod, so
    # ScriptCollector.get_instance() raised TypeError (missing argument).
    @classmethod
    def get_instance(cls) -> 'ScriptCollector':
        """Get singleton instance."""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    @classmethod
    def reset(cls):
        """Reset singleton (for new registration session).

        Saves the current index first so nothing collected is lost.
        """
        if cls._instance:
            cls._instance.save()
        cls._instance = None

    @staticmethod
    def _script_filename(js_url: str) -> str:
        """Build a safe, unique filename for *js_url*.

        Uses the last path segment (query string stripped, ``.js``
        appended if missing) prefixed with an 8-char md5 of the full URL
        so different URLs with the same basename don't collide.
        """
        parsed = urlparse(js_url)
        filename = parsed.path.split('/')[-1] or 'script.js'
        filename = filename.split('?')[0]
        if not filename.endswith('.js'):
            filename += '.js'
        url_hash = hashlib.md5(js_url.encode()).hexdigest()[:8]
        return f"{url_hash}_{filename}"

    def collect(self, page, page_name: Optional[str] = None) -> int:
        """
        Collect all JS scripts from current page.

        Args:
            page: DrissionPage ChromiumPage instance
            page_name: Optional name for this page (auto-generated if not provided)

        Returns:
            Number of new scripts collected
        """
        if not self.enabled:
            return 0
        self.page_count += 1
        url = page.url
        if not page_name:
            parsed = urlparse(url)
            page_name = f"{self.page_count:02d}_{parsed.netloc.replace('.', '_')}"
        print(f"[ScriptCollector] Page {self.page_count}: {url[:60]}...")
        # Enumerate script URLs: <script src> tags plus the Performance
        # API, which also catches dynamically injected scripts.
        try:
            js_urls = page.run_js('''
                const urls = new Set();
                // Script tags with src
                document.querySelectorAll('script[src]').forEach(s => {
                    if (s.src) urls.add(s.src);
                });
                // Performance API - catches dynamically loaded scripts
                performance.getEntriesByType('resource').forEach(r => {
                    if (r.name.includes('.js') || r.initiatorType === 'script') {
                        urls.add(r.name);
                    }
                });
                return Array.from(urls);
            ''') or []
        except Exception as e:
            print(f"[ScriptCollector] Error getting scripts: {e}")
            js_urls = []
        print(f"[ScriptCollector] Found {len(js_urls)} script URLs")
        # Create page directory
        page_dir = self.output_dir / page_name
        page_dir.mkdir(exist_ok=True)
        new_count = 0
        for js_url in js_urls:
            if js_url in self.collected:
                continue  # already downloaded on an earlier page
            # json.dumps produces a properly quoted/escaped JS string
            # literal, so URLs containing quotes can't break the script.
            js_url_literal = json.dumps(js_url)
            try:
                # Use CDP to fetch script content (bypasses CORS)
                frame_id = page.run_cdp('Page.getFrameTree')['frameTree']['frame']['id']
                result = page.run_cdp(
                    'Network.loadNetworkResource',
                    frameId=frame_id,
                    url=js_url,
                    options={'disableCache': False, 'includeCredentials': False},
                )
                content = None
                if result.get('resource', {}).get('success'):
                    stream = result['resource'].get('stream')
                    if stream:
                        # Read the IO stream to completion.  BUG FIX: the
                        # old code issued a single IO.read and ignored the
                        # `eof` flag, truncating large scripts.
                        chunks = []
                        while True:
                            data = page.run_cdp('IO.read', handle=stream)
                            chunk = data.get('data', '')
                            if data.get('base64Encoded'):
                                chunk = base64.b64decode(chunk).decode('utf-8', errors='ignore')
                            chunks.append(chunk)
                            if data.get('eof'):
                                break
                        page.run_cdp('IO.close', handle=stream)
                        content = ''.join(chunks)
                    else:
                        content = result['resource'].get('content', '')
                if not content or len(content) < 50:
                    # Fallback: plain in-page fetch (subject to CORS)
                    content = page.run_js(
                        f'return await fetch({js_url_literal}).then(r => r.text()).catch(() => null);'
                    )
                if content and len(content) > 50:
                    filename = self._script_filename(js_url)
                    filepath = page_dir / filename
                    filepath.write_text(content, encoding='utf-8')
                    self.collected[js_url] = str(filepath)
                    new_count += 1
                    size_kb = len(content) / 1024
                    print(f"[ScriptCollector] ✓ {filename} ({size_kb:.1f}KB)")
            except Exception:
                # Last resort: simple fetch; deliberately best-effort, a
                # URL that still fails is skipped.
                try:
                    content = page.run_js(
                        f'return await fetch({js_url_literal}).then(r=>r.text()).catch(()=>null)'
                    )
                    if content and len(content) > 50:
                        filename = self._script_filename(js_url)
                        filepath = page_dir / filename
                        filepath.write_text(content, encoding='utf-8')
                        self.collected[js_url] = str(filepath)
                        new_count += 1
                        print(f"[ScriptCollector] ✓ {filename} (fallback)")
                except Exception:
                    pass
        print(f"[ScriptCollector] Downloaded {new_count} new scripts")
        return new_count

    def save(self) -> Path:
        """Save index of all collected scripts.

        Returns:
            Path to the written ``index.json``.
        """
        index = {
            'timestamp': datetime.now().isoformat(),
            'pages': self.page_count,
            'total_scripts': len(self.collected),
            'scripts': self.collected
        }
        index_file = self.output_dir / 'index.json'
        with open(index_file, 'w', encoding='utf-8') as f:
            json.dump(index, f, indent=2, ensure_ascii=False)
        print(f"[ScriptCollector] Saved index: {index_file}")
        print(f"[ScriptCollector] Total: {len(self.collected)} scripts from {self.page_count} pages")
        return index_file

    def disable(self):
        """Disable collection."""
        self.enabled = False

    def enable(self):
        """Enable collection."""
        self.enabled = True
| # Global function for easy access | |
def collect_scripts(page, page_name: str = None) -> int:
    """Collect scripts from *page* using the shared singleton collector."""
    collector = ScriptCollector.get_instance()
    return collector.collect(page, page_name)
def save_collected_scripts() -> Path:
    """Write the index of every script collected so far; return its path."""
    collector = ScriptCollector.get_instance()
    return collector.save()