"""Text extraction from uploaded documents and scraped web pages."""

from __future__ import annotations

import re
from pathlib import Path

import fitz
import httpx
from bs4 import BeautifulSoup
from docx import Document as DocxDocument

from app.config import settings

ALLOWED_EXTENSIONS = {".pdf", ".docx", ".txt", ".md"}
USER_AGENT = "Mozilla/5.0 (compatible; StudysonBot/1.0; +https://github.com/)"


class DocumentProcessor:
    """Namespace of static helpers that turn files and URLs into plain text."""

    @staticmethod
    def validate_file_type(filename: str) -> bool:
        """Return True if *filename* has an extension in ``ALLOWED_EXTENSIONS``.

        The comparison is case-insensitive; only the final suffix is checked.
        """
        return Path(filename).suffix.lower() in ALLOWED_EXTENSIONS

    @staticmethod
    async def extract_text(file_path: Path) -> str:
        """Extract the text content of *file_path* based on its extension.

        Note that although this is a coroutine, the extraction itself is
        performed with synchronous calls.

        Args:
            file_path: Path to a ``.pdf``, ``.docx``, ``.txt`` or ``.md`` file.

        Returns:
            The extracted text.

        Raises:
            ValueError: If the file extension is not one of the supported types.
        """
        suffix = file_path.suffix.lower()
        if suffix == ".pdf":
            return DocumentProcessor._extract_pdf(file_path)
        if suffix == ".docx":
            return DocumentProcessor._extract_docx(file_path)
        if suffix in {".txt", ".md"}:
            # Undecodable bytes are replaced rather than aborting the read.
            return file_path.read_text(encoding="utf-8", errors="replace")
        raise ValueError(f"Unsupported file type: {suffix}")

    @staticmethod
    def _extract_pdf(file_path: Path) -> str:
        """Return the text of every page in the PDF, separated by blank lines."""
        with fitz.open(str(file_path)) as doc:
            return "\n\n".join(page.get_text() for page in doc)

    @staticmethod
    def _extract_docx(file_path: Path) -> str:
        """Return the non-empty paragraphs of the DOCX file, one per line."""
        document = DocxDocument(str(file_path))
        return "\n".join(p.text for p in document.paragraphs if p.text)

    @staticmethod
    async def scrape_url(url: str) -> tuple[str, str]:
        """Download *url* and return its ``(title, text)``.

        The response is streamed and the download is aborted as soon as the
        bytes received exceed ``settings.max_scrape_bytes``.

        Args:
            url: Address of the page to fetch.

        Returns:
            A ``(title, text)`` tuple; the title is ``"Web Document"`` when the
            page has no usable ``<title>``.

        Raises:
            ValueError: If the Content-Type mentions neither ``html`` nor
                ``text``, or if the body exceeds the configured size limit.
            httpx.HTTPStatusError: If ``raise_for_status()`` rejects the
                response status.
        """
        # NOTE(review): if `url` can come from end users, nothing here stops a
        # request to internal/private hosts (SSRF) - confirm callers validate it.
        timeout = httpx.Timeout(settings.scrape_timeout_seconds)
        headers = {"User-Agent": USER_AGENT, "Accept": "text/html,*/*"}
        async with httpx.AsyncClient(
            timeout=timeout,
            headers=headers,
            follow_redirects=True,
            max_redirects=5,
        ) as client:
            async with client.stream("GET", url) as response:
                response.raise_for_status()

                content_type = response.headers.get("content-type", "")
                # Media types are case-insensitive, so compare in lower case;
                # the error message keeps the header exactly as received.
                media_type = content_type.lower()
                if "html" not in media_type and "text" not in media_type:
                    raise ValueError(f"Unsupported content-type: {content_type}")

                chunks: list[bytes] = []
                total = 0
                async for chunk in response.aiter_bytes():
                    total += len(chunk)
                    if total > settings.max_scrape_bytes:
                        raise ValueError(
                            f"Page exceeds {settings.max_scrape_bytes} byte limit"
                        )
                    chunks.append(chunk)

                raw = b"".join(chunks)
                try:
                    html = raw.decode(response.encoding or "utf-8", errors="replace")
                except LookupError:
                    # The declared charset names a codec Python does not know;
                    # fall back to UTF-8 instead of failing the whole scrape.
                    html = raw.decode("utf-8", errors="replace")

        # Parsing needs no network, so it runs after the client is closed.
        return DocumentProcessor._parse_html(html)

    @staticmethod
    def _parse_html(html: str) -> tuple[str, str]:
        """Strip non-content tags from *html* and return ``(title, text)``."""
        soup = BeautifulSoup(html, "html.parser")
        for tag in soup(
            ["script", "style", "nav", "footer", "header", "aside", "noscript"]
        ):
            tag.decompose()

        title_tag = soup.find("title")
        title = title_tag.get_text(strip=True) if title_tag else ""
        if not title:
            # Covers both a missing <title> and an empty/whitespace-only one.
            title = "Web Document"

        text = soup.get_text(separator="\n", strip=True)
        return title, text

    @staticmethod
    def clean_text(text: str) -> str:
        """Collapse runs of spaces/tabs and drop blank lines from *text*.

        Each remaining line is stripped of leading and trailing whitespace and
        the lines are re-joined with single newlines.
        """
        text = re.sub(r"[ \t]+", " ", text)
        lines = (line.strip() for line in text.splitlines())
        return "\n".join(line for line in lines if line)