import json
import os
import time
from typing import Any, Dict, List, Optional, Union
from urllib.parse import urljoin

import html2text
import requests
from bs4 import BeautifulSoup

from ..core.module import BaseModule


class RequestBase(BaseModule):
    """
    Base class for handling HTTP requests, parsing content, and saving data.

    This class provides common functionality for web scraping and HTTP operations.
    """
    def __init__(self, timeout: int = 30, max_retries: int = 3, delay_between_requests: float = 1.0):
        """
        Initialize the RequestBase with configuration options.

        Args:
            timeout: Request timeout in seconds
            max_retries: Maximum number of retry attempts
            delay_between_requests: Delay between requests in seconds
        """
        super().__init__()
        self.timeout = timeout
        self.max_retries = max_retries
        self.delay_between_requests = delay_between_requests
        self.session = requests.Session()

        # Configure the HTML-to-text converter; body_width=0 disables
        # hard line wrapping in the converted output.
        self.html_converter = html2text.HTML2Text()
        self.html_converter.ignore_links = False
        self.html_converter.ignore_images = False
        self.html_converter.body_width = 0

        # Send a browser-like User-Agent so servers that reject unknown
        # clients still respond.
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })
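    # Construction sketch (illustrative; the values are hypothetical): a
    # caller inside the package might tune the client like this.
    #
    #   client = RequestBase(timeout=10, max_retries=2, delay_between_requests=0.5)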
    def request(self, url: str, method: str = 'GET', headers: Optional[Dict[str, str]] = None,
                params: Optional[Dict[str, Any]] = None, data: Optional[Dict[str, Any]] = None,
                json_data: Optional[Dict[str, Any]] = None) -> requests.Response:
        """
        Make an HTTP request with retry logic and error handling.

        Args:
            url: The URL to request
            method: HTTP method (GET, POST, PUT, DELETE, etc.)
            headers: Additional headers to include
            params: URL parameters
            data: Form data to send
            json_data: JSON data to send

        Returns:
            requests.Response object

        Raises:
            requests.RequestException: If the request fails after all retries
        """
        # Merge per-request headers over the session defaults without
        # mutating the session itself.
        if headers:
            request_headers = {**self.session.headers, **headers}
        else:
            request_headers = dict(self.session.headers)

        for attempt in range(self.max_retries):
            try:
                response = self.session.request(
                    method=method.upper(),
                    url=url,
                    headers=request_headers,
                    params=params,
                    data=data,
                    json=json_data,
                    timeout=self.timeout
                )
                response.raise_for_status()

                # Throttle consecutive requests so repeated calls respect
                # delay_between_requests.
                time.sleep(self.delay_between_requests)
                return response

            except requests.RequestException:
                if attempt == self.max_retries - 1:
                    # Out of retries: let the last error propagate.
                    raise
                # Back off linearly before the next attempt.
                time.sleep(self.delay_between_requests * (attempt + 1))

        # Only reachable when max_retries < 1.
        raise requests.RequestException(f"Request to {url} failed: no attempts were made")
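    # Usage sketch (hypothetical URL): a GET with query parameters. Failed
    # attempts back off linearly; the final error propagates to the caller.
    #
    #   response = client.request("https://example.com/search", params={"q": "news"})
    #   print(response.status_code)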
    def parse_html(self, html_content: str) -> BeautifulSoup:
        """
        Parse HTML content using BeautifulSoup.

        Args:
            html_content: Raw HTML content

        Returns:
            BeautifulSoup object for parsing
        """
        return BeautifulSoup(html_content, 'html.parser')
    def parse_json(self, json_content: str) -> Dict[str, Any]:
        """
        Parse JSON content.

        Args:
            json_content: Raw JSON content

        Returns:
            Parsed JSON as a dictionary

        Raises:
            json.JSONDecodeError: If the content is not valid JSON
        """
        return json.loads(json_content)
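    # Both parsers are thin wrappers; a sketch with hypothetical input:
    #
    #   soup = client.parse_html("<p>hello</p>")      # -> BeautifulSoup tree
    #   data = client.parse_json('{"status": "ok"}')  # -> {'status': 'ok'}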
    def extract_text(self, html_content: str, selector: Optional[str] = None) -> str:
        """
        Extract text content from HTML using html2text.

        Args:
            html_content: Raw HTML content
            selector: CSS selector to extract specific elements (optional)

        Returns:
            Extracted text content
        """
        if selector:
            # Convert only the elements matched by the CSS selector.
            soup = self.parse_html(html_content)
            elements = soup.select(selector)
            combined_html = '\n'.join(str(elem) for elem in elements)
            return self.html_converter.handle(combined_html)
        else:
            return self.html_converter.handle(html_content)
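    # Sketch (page_html and the selector are hypothetical): restrict the
    # conversion to matched elements.
    #
    #   text = client.extract_text(page_html, selector="div.article-body")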
    def extract_links(self, html_content: str, base_url: Optional[str] = None) -> List[str]:
        """
        Extract all links from HTML content.

        Args:
            html_content: Raw HTML content
            base_url: Base URL to resolve relative links

        Returns:
            List of extracted URLs
        """
        soup = self.parse_html(html_content)
        links = []

        for link in soup.find_all('a', href=True):
            href = link['href']
            # Resolve relative links against the base URL; leave absolute
            # URLs and mailto:/tel: schemes untouched.
            if base_url and not href.startswith(('http://', 'https://', 'mailto:', 'tel:')):
                href = urljoin(base_url, href)
            links.append(href)

        return links
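    # Sketch (hypothetical markup): relative hrefs resolve against base_url.
    #
    #   client.extract_links('<a href="/about">About</a>', base_url="https://example.com")
    #   # -> ['https://example.com/about']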
    def save_content(self, content: Union[str, Dict[str, Any], bytes], file_path: str,
                     content_type: str = 'text') -> bool:
        """
        Save content to a file.

        Args:
            content: Content to save (string, dictionary, or bytes)
            file_path: Path where the file should be saved
            content_type: Type of content ('text', 'json', 'html', 'pdf', 'binary')

        Returns:
            True if successful, False otherwise
        """
        try:
            # Create parent directories only when the path has any;
            # os.makedirs('') raises, so guard against a bare filename.
            directory = os.path.dirname(file_path)
            if directory:
                os.makedirs(directory, exist_ok=True)

            if content_type.lower() == 'json':
                with open(file_path, 'w', encoding='utf-8') as f:
                    json.dump(content, f, indent=2, ensure_ascii=False)
            elif content_type.lower() in ('pdf', 'binary') or isinstance(content, bytes):
                # Write bytes as-is; encode anything else to UTF-8.
                with open(file_path, 'wb') as f:
                    if isinstance(content, bytes):
                        f.write(content)
                    else:
                        f.write(str(content).encode('utf-8'))
            else:
                with open(file_path, 'w', encoding='utf-8') as f:
                    f.write(str(content))

            return True

        except Exception as e:
            print(f"Error saving content to {file_path}: {e}")
            return False
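    # Sketch (hypothetical path): persisting a parsed payload as JSON.
    #
    #   client.save_content({"status": "ok"}, "output/result.json", content_type="json")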
    def get_page_info(self, url: str) -> Dict[str, Any]:
        """
        Get basic information about a webpage.

        Args:
            url: URL to analyze

        Returns:
            Dictionary containing page information
        """
        try:
            response = self.request(url)
            soup = self.parse_html(response.text)

            info = {
                'url': url,
                'status_code': response.status_code,
                # soup.title.string can be None for an empty <title>.
                'title': (soup.title.string or '') if soup.title else '',
                'content_type': response.headers.get('content-type', ''),
                'content_length': len(response.text),
                'links_count': len(soup.find_all('a', href=True)),
                'images_count': len(soup.find_all('img')),
            }

            # Include the meta description when the page provides one.
            meta_desc = soup.find('meta', attrs={'name': 'description'})
            if meta_desc:
                info['description'] = meta_desc.get('content', '')

            return info

        except Exception as e:
            return {'error': str(e), 'url': url}
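    # Sketch (hypothetical URL):
    #
    #   info = client.get_page_info("https://example.com")
    #   # -> {'url': ..., 'status_code': 200, 'title': ..., 'links_count': ..., ...}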
    def request_and_process(self, url: str, method: str = 'GET', headers: Optional[Dict[str, str]] = None,
                            params: Optional[Dict[str, Any]] = None, data: Optional[Dict[str, Any]] = None,
                            json_data: Optional[Dict[str, Any]] = None, return_raw: bool = False,
                            save_file_path: Optional[str] = None) -> Dict[str, Any]:
        """
        Make a request and process the response with comprehensive error handling.

        Args:
            url: The URL to request
            method: HTTP method (GET, POST, PUT, DELETE, etc.)
            headers: Additional headers to include
            params: URL parameters
            data: Form data to send
            json_data: JSON data to send
            return_raw: If True, return the raw response body; otherwise processed text
            save_file_path: Optional path to save the content

        Returns:
            Dictionary containing processed response data
        """
        try:
            response = self.request(
                url=url,
                method=method,
                headers=headers,
                params=params,
                data=data,
                json_data=json_data
            )

            content_type = response.headers.get('content-type', '').lower()

            result = {
                'url': url,
                'method': method.upper(),
                'status_code': response.status_code,
                'success': True,
                'content_type': content_type,
                'content_length': len(response.text),
                'headers': dict(response.headers)
            }

            if return_raw:
                result['content'] = response.text
            else:
                if 'json' in content_type:
                    try:
                        result['content'] = response.json()
                    except json.JSONDecodeError:
                        result['content'] = response.text
                        result['warning'] = 'Content-Type indicates JSON but parsing failed'
                else:
                    # Fall back to converting HTML (or plain text) to readable text.
                    result['content'] = self.extract_text(response.text)

            if save_file_path:
                save_success = self._save_response_content(response, save_file_path, content_type)
                result['saved_to_file'] = save_file_path if save_success else None
                if not save_success:
                    result['save_warning'] = f'Failed to save content to {save_file_path}'

            return result

        except Exception as e:
            return {
                'url': url,
                'method': method.upper(),
                'error': str(e),
                'success': False
            }
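    # Sketch (hypothetical endpoint and path): fetch, convert to readable
    # text, and archive in one call.
    #
    #   result = client.request_and_process(
    #       "https://example.com/articles/1",
    #       save_file_path="archive/article_1.txt",
    #   )
    #   if result['success']:
    #       print(result['content'][:200])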
    def _save_response_content(self, response: requests.Response, file_path: str, content_type: str) -> bool:
        """
        Save response content to a file in an appropriate format.

        Args:
            response: The response object
            file_path: Path to save the file
            content_type: Content type of the response

        Returns:
            True if successful, False otherwise
        """
        try:
            # save_content creates parent directories, so just pick a
            # format from the Content-Type and delegate.
            if 'json' in content_type:
                try:
                    json_content = response.json()
                    return self.save_content(json_content, file_path, 'json')
                except json.JSONDecodeError:
                    # Fall back to plain text if the body is not valid JSON.
                    return self.save_content(response.text, file_path, 'text')
            elif 'html' in content_type:
                return self.save_content(response.text, file_path, 'html')
            else:
                return self.save_content(response.text, file_path, 'text')

        except Exception as e:
            print(f"Error saving response content: {e}")
            return False
    def close(self):
        """Close the session."""
        self.session.close()
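
# End-to-end usage sketch (illustrative; the URL and path are hypothetical).
# Subclasses built on RequestBase inherit all of the behaviour above:
#
#   client = RequestBase()
#   try:
#       info = client.get_page_info("https://example.com")
#       result = client.request_and_process("https://example.com",
#                                           save_file_path="out/page.txt")
#   finally:
#       client.close()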