# helpers/text_blocks.py
"""Fetch a web page and extract its visible text as cleaned, de-duplicated blocks."""

import re
from typing import List, Dict

import httpx
from bs4 import BeautifulSoup

from helpers.utils import clean_text_block


class TextExtractor:
    """Downloads a URL and returns the readable, unique text blocks it contains."""

    def __init__(self):
        # Browser-like User-Agent so servers don't reject the request as a bot.
        self.headers = {
            'User-Agent': (
                'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                'AppleWebKit/537.36 (KHTML, like Gecko) '
                'Chrome/91.0.4472.124 Safari/537.36'
            )
        }

    async def extract_text_from_url(self, url: str, timeout: int = 10) -> List[Dict[str, str]]:
        """Extract all text blocks from a web page.

        Args:
            url: The page to fetch.
            timeout: Request timeout in seconds.

        Returns:
            A list of ``{"text": ...}`` dicts — cleaned, filtered, and
            de-duplicated while preserving first-seen order.

        Raises:
            httpx.RequestError: On a network/transport failure while fetching.
            Exception: On any other processing error (HTTP error status,
                parsing failure, ...).
        """
        try:
            async with httpx.AsyncClient(timeout=timeout) as client:
                resp = await client.get(url, headers=self.headers)
                resp.raise_for_status()
                html_content = resp.text

            soup = BeautifulSoup(html_content, 'html.parser')

            # Strip non-content elements before extracting text.
            self._clean_html(soup)

            raw_texts = self._extract_texts(soup)

            # Clean each block (clean_text_block also converts Eastern-Arabic
            # digits to Arabic numerals, per the original comment).
            cleaned_texts = [clean_text_block(t)
                             for t in raw_texts
                             if self._is_valid_text(t)]

            return self._remove_duplicates(cleaned_texts)

        except httpx.RequestError as e:
            # Re-raise the same type callers already catch, but chain the
            # original (`from e`) so the real traceback is not lost.
            raise httpx.RequestError(f"Error fetching URL: {str(e)}") from e
        except Exception as e:
            raise Exception(f"Error processing content: {str(e)}") from e

    def _clean_html(self, soup: BeautifulSoup) -> None:
        """Remove non-content elements (scripts, styles, chrome) in place."""
        unwanted_tags = ['script', 'style', 'meta', 'link', 'noscript',
                         'header', 'footer', 'nav']
        for tag in unwanted_tags:
            for element in soup.find_all(tag):
                element.decompose()

    def _extract_texts(self, soup: BeautifulSoup) -> List[str]:
        """Return all non-blank raw text nodes from the parsed HTML."""
        # `string=True` replaces the `text=True` keyword deprecated in bs4 4.4+.
        return [element.strip()
                for element in soup.find_all(string=True)
                if element.strip()]

    def _is_valid_text(self, text: str) -> bool:
        """Return True if *text* is worth keeping (filters noise fragments)."""
        if not text or len(text.strip()) < 2:
            return False
        if text.isspace():
            return False
        # Reject blocks made of symbols only (no word chars, no Arabic letters).
        if re.match(r'^[^\w\u0600-\u06FF]+$', text):
            return False
        return True

    def _remove_duplicates(self, texts: List[str]) -> List[Dict[str, str]]:
        """Drop duplicate blocks, preserving first-seen order."""
        # dict.fromkeys is an O(n) order-preserving dedupe.
        return [{"text": t} for t in dict.fromkeys(texts)]


# Convenience wrapper for one-off use.
async def extract_text_from_url(url: str, timeout: int = 10) -> List[Dict[str, str]]:
    """Module-level shortcut: build a TextExtractor and extract from *url*."""
    extractor = TextExtractor()
    return await extractor.extract_text_from_url(url, timeout)