Spaces:
Sleeping
Sleeping
| import logging | |
| import requests | |
| from bs4 import BeautifulSoup | |
| import warnings | |
| logger = logging.getLogger(__name__) | |
| # Suppress SSL warnings for analysis | |
| warnings.filterwarnings('ignore', message='Unverified HTTPS request') | |
class ContentAgent:
    """Fetches and sanitizes visible text content from URLs.

    Applies a basic SSRF guard (blocks localhost / private-network hosts),
    caps the download size, strips <script>/<style> tags, and returns the
    cleaned visible text plus the page title.
    """

    # Seconds to wait for the remote server before giving up.
    METADATA_TIMEOUT = 5
    # Maximum characters of cleaned text returned to callers.
    MAX_TEXT_LENGTH = 2000
    # Raw bytes/chars of HTML to download before truncating (~100 KB).
    MAX_DOWNLOAD_SIZE = 100000
    # Use standard browser UA to avoid being blocked by anti-bot systems
    USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'

    def __init__(self):
        logger.info("ContentAgent: Initialized")

    def fetch_content(self, url):
        """
        Fetches website content and extracts visible text.

        Returns a dict with 'text', 'title', 'status_code' and 'success';
        on failure 'success' is False and 'error' describes the problem.
        Never raises — network errors are an expected outcome for
        dead/fake phishing sites.
        """
        # SSRF Protection: Block local IPs before making any request.
        if self._is_local_ip(url):
            logger.warning(f"ContentAgent: Blocked SSRF attempt to {url}")
            return {'text': "", 'error': "Access to local resources blocked", 'success': False}
        response = None
        try:
            headers = {'User-Agent': self.USER_AGENT}
            response = requests.get(
                url,
                headers=headers,
                timeout=self.METADATA_TIMEOUT,
                verify=False,  # Intentionally skip verify to analyze bad sites
                stream=True
            )
            # Limit size: collect chunks in a list and join once (O(n))
            # instead of repeated string concatenation (O(n^2)).
            pieces = []
            downloaded = 0
            for chunk in response.iter_content(chunk_size=1024, decode_unicode=True):
                if chunk:
                    pieces.append(chunk)
                    downloaded += len(chunk)
                    if downloaded > self.MAX_DOWNLOAD_SIZE:
                        break  # Stop downloading after ~100kb
            content = ''.join(pieces)
            soup = BeautifulSoup(content, 'html.parser')
            # Kill javascript and styles so they don't pollute the text.
            for tag in soup(["script", "style"]):
                tag.extract()
            text = soup.get_text()
            # Clean text: strip each line, break lines on runs of spaces
            # (double-space split per the standard BS4 recipe), and drop
            # the resulting empty fragments.
            lines = (line.strip() for line in text.splitlines())
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            text = '\n'.join(chunk for chunk in chunks if chunk)
            return {
                'text': text[:self.MAX_TEXT_LENGTH],
                'title': soup.title.string.strip() if soup.title and soup.title.string else "",
                'status_code': response.status_code,
                'success': True
            }
        except requests.RequestException as e:
            # Expected behavior for dead/fake phishing sites
            logger.info(f"ContentAgent: Could not reach {url} - {str(e)[:100]}")
            return {
                'text': "",
                'error': str(e),
                'success': False
            }
        except Exception as e:
            logger.error(f"ContentAgent: Unexpected error - {e}")
            return {
                'text': "",
                'error': f"Processing error: {e}",
                'success': False
            }
        finally:
            # stream=True holds the connection open until fully read or
            # explicitly closed — close it to avoid leaking connections.
            if response is not None:
                response.close()

    def _is_local_ip(self, url):
        """Check if URL points to localhost or a private/reserved network.

        Returns True (block) for loopback, private, link-local, reserved
        and unspecified addresses; fails closed (True) on unexpected
        parse errors.
        """
        from urllib.parse import urlparse
        import ipaddress
        try:
            hostname = urlparse(url).hostname
            if not hostname:
                return False
            # Direct text checks for the common loopback spellings.
            if hostname in ('localhost', '127.0.0.1', '::1'):
                return True
            # Classify IP literals with the stdlib instead of string
            # prefixes: the old prefix check wrongly blocked ALL of
            # 172.0.0.0/8 rather than only 172.16.0.0/12.
            try:
                addr = ipaddress.ip_address(hostname)
            except ValueError:
                # Not an IP literal. NOTE(review): DNS names are not
                # resolved here, so a domain pointing at a private IP
                # still passes — confirm whether resolution is required.
                return False
            return (addr.is_private or addr.is_loopback
                    or addr.is_link_local or addr.is_reserved
                    or addr.is_unspecified)
        except Exception:
            return True  # Fail safe: block when the URL cannot be parsed