# KManager/debugger/collectors/script_collector.py
"""
Script Collector - collects all JS scripts from pages during registration.
Usage:
from debugger.collectors.script_collector import ScriptCollector
collector = ScriptCollector()
collector.collect(page) # Call after each page load
collector.save() # Save at the end
"""
import json
import hashlib
import base64
from pathlib import Path
from datetime import datetime
from urllib.parse import urlparse
from typing import Optional
class ScriptCollector:
    """Collects all JS scripts from browser pages.

    Scripts are fetched via the Chrome DevTools Protocol (bypassing CORS),
    written into one subdirectory per visited page, and indexed by
    save() into an ``index.json`` mapping script URL -> saved file path.
    """

    # Bodies at or below this many characters are treated as error/empty
    # responses and retried via the in-page fetch fallback, then discarded.
    _MIN_CONTENT_LEN = 50

    _instance: Optional['ScriptCollector'] = None

    def __init__(self, output_dir: Optional[Path] = None):
        """Create a collector writing under *output_dir*.

        When *output_dir* is None, a timestamped directory is created
        under the project's debug-sessions directory (requires the
        project-local ``core.paths`` module).
        """
        if output_dir is None:
            from core.paths import get_paths
            paths = get_paths()
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            output_dir = paths.debug_sessions_dir / 'aws_scripts' / timestamp
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.collected = {}  # script URL -> saved filepath (str)
        self.page_count = 0
        self.enabled = True
        print(f"[ScriptCollector] Output: {self.output_dir}")

    @classmethod
    def get_instance(cls) -> 'ScriptCollector':
        """Get singleton instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    @classmethod
    def reset(cls):
        """Reset singleton (for new registration session).

        Saves the current instance's index before discarding it.
        """
        if cls._instance:
            cls._instance.save()
        cls._instance = None

    def collect(self, page, page_name: Optional[str] = None) -> int:
        """
        Collect all JS scripts from current page.

        Args:
            page: DrissionPage ChromiumPage instance
            page_name: Optional name for this page (auto-generated if not provided)

        Returns:
            Number of new scripts collected (0 when collection is disabled).
        """
        if not self.enabled:
            return 0
        self.page_count += 1
        url = page.url
        if not page_name:
            parsed = urlparse(url)
            page_name = f"{self.page_count:02d}_{parsed.netloc.replace('.', '_')}"
        print(f"[ScriptCollector] Page {self.page_count}: {url[:60]}...")

        js_urls = self._get_script_urls(page)
        print(f"[ScriptCollector] Found {len(js_urls)} script URLs")

        page_dir = self.output_dir / page_name
        page_dir.mkdir(exist_ok=True)

        new_count = 0
        for js_url in js_urls:
            if js_url in self.collected:
                continue  # already downloaded on an earlier page
            try:
                content = self._fetch_via_cdp(page, js_url)
                if not content or len(content) < self._MIN_CONTENT_LEN:
                    # CDP gave nothing useful; try a regular in-page fetch.
                    content = self._fetch_via_js(page, js_url)
                if content and len(content) > self._MIN_CONTENT_LEN:
                    self._save_script(page_dir, js_url, content)
                    new_count += 1
            except Exception:
                # CDP path failed entirely; best-effort in-page fetch as a
                # last resort. Deliberately swallow failures — collection
                # must never break the surrounding registration flow.
                try:
                    content = self._fetch_via_js(page, js_url)
                    if content and len(content) > self._MIN_CONTENT_LEN:
                        self._save_script(page_dir, js_url, content,
                                          fallback=True)
                        new_count += 1
                except Exception:
                    pass
        print(f"[ScriptCollector] Downloaded {new_count} new scripts")
        return new_count

    @staticmethod
    def _get_script_urls(page) -> list:
        """Return script URLs from <script src> tags plus the Performance
        API (which also catches dynamically loaded scripts)."""
        try:
            return page.run_js('''
                const urls = new Set();
                // Script tags with src
                document.querySelectorAll('script[src]').forEach(s => {
                    if (s.src) urls.add(s.src);
                });
                // Performance API - catches dynamically loaded scripts
                performance.getEntriesByType('resource').forEach(r => {
                    if (r.name.includes('.js') || r.initiatorType === 'script') {
                        urls.add(r.name);
                    }
                });
                return Array.from(urls);
            ''') or []
        except Exception as e:
            print(f"[ScriptCollector] Error getting scripts: {e}")
            return []

    @staticmethod
    def _fetch_via_cdp(page, js_url: str) -> Optional[str]:
        """Fetch a script body via CDP Network.loadNetworkResource.

        Runs in the browser process, so it bypasses page CORS rules.
        Returns None when the resource could not be loaded.
        """
        frame_id = page.run_cdp('Page.getFrameTree')['frameTree']['frame']['id']
        result = page.run_cdp('Network.loadNetworkResource',
            frameId=frame_id,
            url=js_url,
            options={'disableCache': False, 'includeCredentials': False}
        )
        resource = result.get('resource', {})
        if not resource.get('success'):
            return None
        stream = resource.get('stream')
        if not stream:
            return resource.get('content', '')
        # Drain the IO stream fully — a single IO.read may return only a
        # chunk of a large script.
        parts = []
        try:
            while True:
                data = page.run_cdp('IO.read', handle=stream)
                chunk = data.get('data', '')
                if data.get('base64Encoded'):
                    chunk = base64.b64decode(chunk).decode('utf-8', errors='ignore')
                parts.append(chunk)
                if data.get('eof'):
                    break
        finally:
            page.run_cdp('IO.close', handle=stream)
        return ''.join(parts)

    @staticmethod
    def _fetch_via_js(page, js_url: str) -> Optional[str]:
        """Fallback: fetch the script from inside the page (subject to CORS)."""
        return page.run_js(
            f'return await fetch("{js_url}").then(r => r.text()).catch(() => null);'
        )

    @staticmethod
    def _safe_filename(js_url: str) -> str:
        """Derive a filesystem-safe, unique ``.js`` filename from a URL."""
        parsed = urlparse(js_url)
        filename = parsed.path.split('/')[-1] or 'script.js'
        filename = filename.split('?')[0]
        if not filename.endswith('.js'):
            filename += '.js'
        # Hash prefix keeps same-named scripts from different paths/origins
        # from colliding in one page directory.
        url_hash = hashlib.md5(js_url.encode()).hexdigest()[:8]
        return f"{url_hash}_{filename}"

    def _save_script(self, page_dir: Path, js_url: str, content: str,
                     fallback: bool = False) -> None:
        """Write *content* under *page_dir* and record it in self.collected."""
        filename = self._safe_filename(js_url)
        filepath = page_dir / filename
        filepath.write_text(content, encoding='utf-8')
        self.collected[js_url] = str(filepath)
        if fallback:
            print(f"[ScriptCollector] ✓ {filename} (fallback)")
        else:
            size_kb = len(content) / 1024
            print(f"[ScriptCollector] ✓ {filename} ({size_kb:.1f}KB)")

    def save(self) -> Path:
        """Save index of all collected scripts.

        Returns:
            Path to the written ``index.json``.
        """
        index = {
            'timestamp': datetime.now().isoformat(),
            'pages': self.page_count,
            'total_scripts': len(self.collected),
            'scripts': self.collected
        }
        index_file = self.output_dir / 'index.json'
        with open(index_file, 'w', encoding='utf-8') as f:
            json.dump(index, f, indent=2, ensure_ascii=False)
        print(f"[ScriptCollector] Saved index: {index_file}")
        print(f"[ScriptCollector] Total: {len(self.collected)} scripts from {self.page_count} pages")
        return index_file

    def disable(self):
        """Disable collection (collect() becomes a no-op)."""
        self.enabled = False

    def enable(self):
        """Enable collection."""
        self.enabled = True
# Global function for easy access
def collect_scripts(page, page_name: Optional[str] = None) -> int:
    """Collect scripts from *page* using the singleton collector.

    Args:
        page: DrissionPage ChromiumPage instance.
        page_name: Optional label for the page's output subdirectory.

    Returns:
        Number of new scripts collected.
    """
    return ScriptCollector.get_instance().collect(page, page_name)
def save_collected_scripts() -> Path:
    """Save the singleton collector's index of collected scripts.

    Returns:
        Path to the written ``index.json``.
    """
    return ScriptCollector.get_instance().save()