|
|
from pptx import Presentation |
|
|
from pdf2image import convert_from_path |
|
|
import pdfplumber |
|
|
from docx import Document |
|
|
import subprocess |
|
|
import os |
|
|
from typing import Optional, List |
|
|
import string |
|
|
import random |
|
|
import re |
|
|
import requests |
|
|
from bs4 import BeautifulSoup |
|
|
import logging |
|
|
import time |
|
|
from urllib.parse import urlparse |
|
|
|
|
|
|
|
|
class URLTextExtractor:
    """
    A comprehensive utility for extracting text content from web pages with advanced features.

    Features:
    - Rotating User-Agents to mimic different browsers
    - Robust error handling and retry mechanism
    - Section preservation for maintaining document structure
    - Configurable extraction options
    - Logging support

    Attributes:
        USER_AGENTS (list): A comprehensive list of user agent strings to rotate through.
        logger (logging.Logger): Logger for tracking extraction attempts and errors.

    Example:
        >>> extractor = URLTextExtractor()
        >>> text = extractor.extract_text_from_url('https://example.com')
        >>> print(text)
    """

    # Desktop and mobile user agents; one is picked at random per request so
    # repeated fetches do not present an identical client fingerprint.
    USER_AGENTS = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.1 Safari/605.1.15",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:95.0) Gecko/20100101 Firefox/95.0",
        "Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Mobile/15E148 Safari/604.1",
        "Mozilla/5.0 (Linux; Android 10; SM-G970F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.101 Mobile Safari/537.36",
    ]

    def __init__(self, logger=None):
        """
        Initialize the URLTextExtractor.

        Args:
            logger (logging.Logger, optional): Custom logger.
                If not provided, creates a default logger.
        """
        self.logger = logger or self._create_default_logger()

    def _create_default_logger(self):
        """
        Create a default logger for tracking the extraction process.

        Returns:
            logging.Logger: Configured logger instance
        """
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.INFO)
        # BUGFIX: getLogger(__name__) returns the same logger object on every
        # call, so unconditionally adding a StreamHandler here made each new
        # URLTextExtractor instance duplicate every subsequent log line.
        # Attach a handler only when the logger does not have one yet.
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        return logger

    def _process_element_text(self, element):
        """
        Process text within an element, handling anchor tags specially.

        Args:
            element (bs4.element.Tag): BeautifulSoup element to process

        Returns:
            str: Processed text with proper spacing
        """
        # Pad anchor text with spaces so link text does not fuse with the
        # surrounding words once tags are flattened by get_text().
        for a_tag in element.find_all("a"):
            a_tag.replace_with(f" {a_tag.get_text(strip=True)} ")

        return element.get_text(separator=" ", strip=True)

    def extract_text_from_url(
        self,
        url,
        max_retries=3,
        preserve_sections=True,
        min_section_length=30,
        allowed_tags=None,
    ):
        """
        Extract text content from a given URL with advanced configuration.

        Args:
            url (str): The URL of the webpage to extract text from.
            max_retries (int, optional): Maximum number of retry attempts. Defaults to 3.
            preserve_sections (bool, optional): Whether to preserve section separations. Defaults to True.
            min_section_length (int, optional): Minimum length of text sections to include. Defaults to 30.
            allowed_tags (list, optional): Specific HTML tags to extract text from.
                If None, uses a default set of content-rich tags.

        Returns:
            str: Extracted text content from the webpage, or None if the URL
                is syntactically invalid (missing scheme or host).

        Raises:
            ValueError: If URL cannot be fetched after maximum retries
            requests.RequestException: For network-related errors

        Examples:
            >>> extractor = URLTextExtractor()
            >>> text = extractor.extract_text_from_url('https://example.com')
            >>> text = extractor.extract_text_from_url('https://example.com', preserve_sections=False)
        """
        if allowed_tags is None:
            # Default to content-rich block-level tags.
            allowed_tags = [
                "p",
                "div",
                "article",
                "section",
                "main",
                "h1",
                "h2",
                "h3",
                "h4",
                "h5",
                "h6",
            ]

        try:
            parsed_url = urlparse(url)
            if not all([parsed_url.scheme, parsed_url.netloc]):
                # BUGFIX: previously this returned None silently; log the
                # reason so bad inputs are visible during debugging.
                self.logger.warning(f"Invalid URL (missing scheme or host): {url}")
                return None
        except Exception as e:
            self.logger.error(f"URL parsing error: {e}")
            raise

        for attempt in range(max_retries):
            try:
                # Rotate the User-Agent on each attempt.
                headers = {
                    "User-Agent": random.choice(self.USER_AGENTS),
                    "Accept-Language": "en-US,en;q=0.9",
                    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
                }

                response = requests.get(
                    url, headers=headers, timeout=10, allow_redirects=True
                )
                response.raise_for_status()

                self.logger.info(f"Successfully fetched URL: {url}")

                soup = BeautifulSoup(response.text, "html.parser")

                # Strip non-content containers before extracting text.
                for script in soup(
                    ["script", "style", "head", "header", "footer", "nav"]
                ):
                    script.decompose()

                if preserve_sections:
                    # One output line per sufficiently long section.
                    sections = []
                    for tag in allowed_tags:
                        for element in soup.find_all(tag):
                            section_text = self._process_element_text(element)
                            if len(section_text) >= min_section_length:
                                sections.append(section_text)
                    text = "\n".join(sections)
                else:
                    # Flatten everything into a single space-joined string.
                    text = " ".join(
                        self._process_element_text(element)
                        for tag in allowed_tags
                        for element in soup.find_all(tag)
                    )

                # Normalize whitespace: strip each line, drop empty lines.
                text = "\n".join(
                    line.strip() for line in text.split("\n") if line.strip()
                )

                return text

            except (requests.RequestException, ValueError) as e:
                self.logger.warning(f"Attempt {attempt + 1} failed: {e}")

                if attempt == max_retries - 1:
                    self.logger.error(
                        f"Failed to fetch URL after {max_retries} attempts"
                    )
                    raise ValueError(
                        f"Error fetching URL after {max_retries} attempts: {e}"
                    )

                # Exponential backoff: 1s, 2s, 4s, ...
                wait_time = 2**attempt
                self.logger.info(f"Waiting {wait_time} seconds before retry")
                time.sleep(wait_time)

        return None
|
|
|
|
|
|
|
|
def extract_text_from_pptx(file_path):
    """
    Extract all shape text from a .pptx presentation.

    Args:
        file_path: Path (or file-like object) accepted by pptx.Presentation.

    Returns:
        str: One newline-joined text block per slide, slides separated by
             blank lines.
    """
    presentation = Presentation(file_path)

    slide_blocks = []
    for slide in presentation.slides:
        # Only shapes with a text frame expose a .text attribute.
        shape_texts = [
            shape.text for shape in slide.shapes if hasattr(shape, "text")
        ]
        slide_blocks.append("\n".join(shape_texts))

    return "\n\n".join(slide_blocks)
|
|
|
|
|
|
|
|
def is_meaningful_text(text: str) -> bool:
    """
    Heuristically decide whether a text fragment looks like real content.

    Rejects empty/too-short strings, strings with no letters, pure
    punctuation, anything containing non-printable-ASCII characters,
    and strings whose letter ratio falls below 30%.

    Args:
        text (str): Candidate text fragment.

    Returns:
        bool: True if the fragment is worth keeping.
    """
    if not text or len(text) < 3:
        return False

    # Junk filters: no letters at all, only non-word characters,
    # or at most two characters.
    for pattern in (r'^[^a-zA-Z]*$', r'^\W+$', r'^.{1,2}$'):
        if re.match(pattern, text):
            return False

    # Anything outside printable ASCII marks the fragment as binary noise.
    if re.search(r'[^\x20-\x7E]', text):
        return False

    letter_count = sum(c.isalpha() for c in text)
    return letter_count / len(text) >= 0.3
|
|
|
|
|
def extract_using_unicode_search(path: str) -> str:
    """
    Heuristically recover readable text from a binary file by scanning for
    UTF-16LE-style byte pairs (a printable ASCII byte followed by a NUL),
    the encoding used inside legacy .ppt binary streams.

    Args:
        path (str): Path to the binary file to scan.

    Returns:
        str: Up to 30 unique recovered text blocks joined by newlines, or
             "No text found" when nothing meaningful was recovered.
    """
    with open(path, "rb") as file:
        data = file.read()

    text_blocks = []

    def flush(buf: bytes) -> None:
        # Decode accumulated ASCII bytes and keep the block only if it passes
        # the junk filter. The original triplicated this logic inline and
        # wrapped it in dead bare `except:` clauses — decode() with
        # errors="ignore" cannot raise, so the guards were removed.
        text = buf.decode("ascii", errors="ignore").strip()
        if is_meaningful_text(text):
            text_blocks.append(text)

    current_text = b""
    i = 0
    while i < len(data) - 1:
        b1, b2 = data[i], data[i + 1]

        if 32 <= b1 <= 126 and b2 == 0:
            # Printable ASCII byte in little-endian UTF-16 position:
            # extend the current run and skip the NUL.
            current_text += bytes([b1])
            i += 2
        else:
            # Run broken (the original's `b1 == 0` branch and its final
            # `else` branch were behaviorally identical; collapsed here).
            if current_text:
                flush(current_text)
                current_text = b""
            i += 1

    if current_text:
        flush(current_text)

    # De-duplicate on a punctuation-normalized form, preserving first-seen
    # order, and drop blocks that normalize to 5 characters or fewer.
    unique, seen = [], set()
    for block in text_blocks:
        cleaned = re.sub(r"[^\w\s\.,;:!?\-]", "", block)
        if cleaned not in seen and len(cleaned) > 5:
            unique.append(block)
            seen.add(cleaned)

    return "\n".join(unique[:30]) if unique else "No text found"
|
|
|
|
|
|
|
|
def extract_text_from_ppt(file_path: str) -> str:
    """
    Extract text from legacy PowerPoint (.ppt) files using Unicode pattern search.

    Args:
        file_path (str): Path to the .ppt file

    Returns:
        str: Extracted text from the presentation, or None if extraction fails

    Raises:
        FileNotFoundError: If the file doesn't exist
        ValueError: If the file is not a valid .ppt file
    """
    # Validate the input path before attempting any extraction work.
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    has_ppt_extension = file_path.lower().endswith(".ppt")
    if not has_ppt_extension:
        raise ValueError(f"Unsupported file format: {file_path}. Only .ppt files are supported.")

    # Extraction itself is best-effort: report failures and return None.
    try:
        return extract_using_unicode_search(file_path)
    except Exception as exc:
        print(f"Error extracting text from {file_path}: {exc}")
        return None
|
|
|
|
|
|
|
|
def convert_pdf_to_image(file):
    """
    Render each page of a PDF as an image via pdf2image.

    Args:
        file: Path accepted by pdf2image.convert_from_path.

    Returns:
        list: One PIL image per page.
    """
    return convert_from_path(file)
|
|
|
|
|
|
|
|
def extract_text_from_pdf(file):
    """
    Extract the text of every page of a PDF using pdfplumber.

    Args:
        file: Path or file-like object accepted by pdfplumber.open().

    Returns:
        str: Page texts concatenated, one trailing newline per page.
             Pages with no extractable text contribute an empty line.
    """
    text = ""
    with pdfplumber.open(file) as pdf:
        for page in pdf.pages:
            # BUGFIX: extract_text() returns None for pages without a text
            # layer (e.g. scanned images); the old `text + None` crashed
            # with a TypeError.
            text += (page.extract_text() or "") + "\n"
    return text
|
|
|
|
|
|
|
|
def extract_text_from_docx(file_path):
    """
    Extract paragraph text from a .docx file.

    Args:
        file_path: Either a filesystem path string, or a file-like wrapper
            exposing a ``.name`` attribute (e.g. a gradio upload or
            tempfile object, which is what the original code required).

    Returns:
        str: Paragraph texts, each terminated by a newline.
    """
    # GENERALIZATION: accept plain path strings too; the old code read
    # file_path.name unconditionally and raised AttributeError on a str.
    path = getattr(file_path, "name", file_path)

    text = ""
    doc = Document(path)
    for paragraph in doc.paragraphs:
        text += paragraph.text + "\n"
    return text
|
|
|
|
|
|
|
|
def convert_doc_to_text(file_path):
    """
    Convert a legacy .doc file to plain text by shelling out to ``antiword``.

    Args:
        file_path (str): Path to the .doc file.

    Returns:
        str: Extracted text with any leading BOM stripped, or "" when
             conversion fails for any reason.
    """
    try:
        result = subprocess.run(
            ["antiword", file_path],
            capture_output=True,
            text=True,
            check=True,
        )
        # Strip a UTF-8 BOM that antiword may emit at the start of output.
        text = result.stdout.lstrip("\ufeff")
        return text
    except subprocess.CalledProcessError as e:
        print(f"Error converting {file_path} to text: {e}")
        return ""
    except FileNotFoundError as e:
        # BUGFIX: when the antiword binary is not installed / not on PATH,
        # subprocess.run raises FileNotFoundError (not CalledProcessError),
        # which previously escaped instead of the documented "" fallback.
        print(f"antiword is not installed or not on PATH: {e}")
        return ""
|
|
|
|
|
|
|
|
|
|
|
def generate_random_string(length=23):
    """
    Build a random alphanumeric identifier.

    Args:
        length (int, optional): Number of characters. Defaults to 23.

    Returns:
        str: Random string drawn from ASCII letters and digits.
    """
    alphabet = string.ascii_letters + string.digits
    return "".join(random.choice(alphabet) for _ in range(length))
|
|
|
|
|
|
|
|
|
|
|
def handle_json_output(json_list: list):
    """
    Post-process flashcard dicts into the rich-HTML export format.

    Each dict must contain "frontText" and "backText". This adds
    "frontHTML", "backHTML" and "termType" keys in place. Cards whose front
    contains one or two runs of 2+ underscores are treated as cloze
    (fill-in-the-blank) cards: the blanks are replaced by the back text and
    the back face is emptied.

    Args:
        json_list (list): Flashcard dicts to enrich.

    Returns:
        list: The same list, mutated in place.
    """

    def _wrap(element_id: str, inner: str) -> str:
        # Shared rich-text-area wrapper; the original duplicated this
        # template string four times.
        return (
            f'<div id="element-richtextarea-{element_id}" style="position:absolute;left:100px;top:50px;width:800px;height:300px;text-align:center;display:flex;align-items:center;font-size:40px;">'
            f"<p>{inner}</p></div>"
        )

    # Iterate elements directly (the original used range(len(...)) indexing).
    for element in json_list:
        front_id = generate_random_string()
        back_id = generate_random_string()
        front = element["frontText"]
        back = element["backText"]

        element["frontHTML"] = _wrap(front_id, front)
        element["backHTML"] = _wrap(back_id, back)
        element["termType"] = "basic"

        cloze_matches = re.findall(r"_{2,}", front)
        # Treat as cloze only with 1-2 blanks. The original used the bitwise
        # `&` on booleans, which worked but is fragile; `and` is the idiom.
        if cloze_matches and len(cloze_matches) <= 2:
            element["termType"] = "cloze"

            # Replacement via a callable avoids re.sub escape processing of
            # any backslashes in the answer text.
            front_html = re.sub(
                r"_{2,}",
                lambda _m: f'</p><p><span class="closure">{back}</span></p><p>',
                front,
            )
            element["frontHTML"] = _wrap(front_id, front_html)

            # Plain-text front gets the answer inlined; back face is emptied.
            element["frontText"] = re.sub(r"_{2,}", lambda _m: f" {back} ", front)
            element["backText"] = ""
            element["backHTML"] = _wrap(back_id, "<br>")

    return json_list
|
|
|
|
|
|
|
|
def sanitize_list_of_lists(text: str) -> Optional[List[List]]:
    """
    Parse a (possibly noisy) model response containing a Python-style list
    of [front, back] pairs and convert it to flashcard dicts.

    The outermost ``[...]`` span is carved out of *text* and evaluated; each
    pair becomes ``{"frontText": ..., "backText": ...}`` and the result is
    run through handle_json_output(). If only a prefix of the pairs unpacks
    cleanly, the partial result is returned (best-effort behavior).

    Args:
        text (str): Raw text expected to contain a list of 2-element lists.

    Returns:
        Optional[List[List]]: Processed card dicts, or None on failure.
    """
    left = text.find("[")
    right = text.rfind("]")
    text = text[left : right + 1]
    try:
        # SECURITY NOTE: eval() executes arbitrary code and this text comes
        # from untrusted model output. Consider ast.literal_eval() if the
        # input is guaranteed to be a pure literal.
        list_of_lists = eval(text)
        if not isinstance(list_of_lists, list):
            # Guard clause (flattens the original's nested if/else).
            print("The evaluated object is not a list.")
            return None

        out = []
        try:
            # Unpack pairs; a malformed entry aborts the loop but keeps
            # whatever parsed before it.
            for front, back in list_of_lists:
                out.append({"frontText": front, "backText": back})
            return handle_json_output(out)
        except Exception as e:
            print(e)

        # Return the partial result if anything parsed at all.
        return handle_json_output(out) if out else None
    except Exception as e:
        print(f"Error parsing the list of lists: {e}")
        return None
|
|
|
|
|
|
|
|
# Module-level singleton so every parse_url call shares one extractor
# (and one configured logger) instead of rebuilding it per request.
extractor = URLTextExtractor()


def parse_url(url):
    """Extract readable text from *url* via the shared URLTextExtractor.

    Returns the extracted text, or None when the URL is syntactically
    invalid; raises ValueError if all fetch retries fail (see
    URLTextExtractor.extract_text_from_url).
    """
    return extractor.extract_text_from_url(url)
|
|
|