# studyrag/app/utils/document_processor.py
# beerohan
# Flatten directory structure for deployment
# 5ac3946
from __future__ import annotations
import re
from pathlib import Path
import fitz
import httpx
from bs4 import BeautifulSoup
from docx import Document as DocxDocument
from app.config import settings
# File suffixes (lower-case, with the leading dot) that
# DocumentProcessor.validate_file_type treats as acceptable.
ALLOWED_EXTENSIONS = {
    ".pdf",
    ".docx",
    ".txt",
    ".md",
}

# Value sent as the User-Agent header by DocumentProcessor.scrape_url.
USER_AGENT = (
    "Mozilla/5.0 (compatible; StudysonBot/1.0; +https://github.com/)"
)
class DocumentProcessor:
    """Stateless helpers that turn files and web pages into plain text.

    Every method is a ``@staticmethod``; the class serves only as a namespace.
    """

    @staticmethod
    def validate_file_type(filename: str) -> bool:
        """Return True when *filename* has a suffix in ``ALLOWED_EXTENSIONS``.

        The suffix is lower-cased before the lookup, so ``"A.PDF"`` and
        ``"a.pdf"`` are treated alike. Only the name is inspected; the file
        content is never read.
        """
        return Path(filename).suffix.lower() in ALLOWED_EXTENSIONS

    @staticmethod
    async def extract_text(file_path: Path) -> str:
        """Extract the text content of the file at *file_path*.

        The extractor is chosen from the lower-cased file suffix:
        ``.pdf`` and ``.docx`` go through their dedicated helpers, ``.txt``
        and ``.md`` are read as UTF-8 with undecodable bytes replaced.

        NOTE: although this is a coroutine, it awaits nothing; the chosen
        extractor runs synchronously in the calling task.

        Raises:
            ValueError: If the suffix is not one of the supported types.
        """
        suffix = file_path.suffix.lower()
        if suffix == ".pdf":
            return DocumentProcessor._extract_pdf(file_path)
        if suffix == ".docx":
            return DocumentProcessor._extract_docx(file_path)
        if suffix in {".txt", ".md"}:
            return file_path.read_text(encoding="utf-8", errors="replace")
        raise ValueError(f"Unsupported file type: {suffix}")

    @staticmethod
    def _extract_pdf(file_path: Path) -> str:
        """Return the text of every PDF page, pages separated by a blank line."""
        with fitz.open(str(file_path)) as doc:
            return "\n\n".join(page.get_text() for page in doc)

    @staticmethod
    def _extract_docx(file_path: Path) -> str:
        """Return the non-empty paragraphs of a DOCX file, one per line."""
        document = DocxDocument(str(file_path))
        return "\n".join(p.text for p in document.paragraphs if p.text)

    @staticmethod
    def _is_text_content_type(content_type: str) -> bool:
        """Return True when a Content-Type header value denotes text or HTML.

        The match is case-insensitive, so a value such as ``Text/HTML`` is
        accepted as well.
        """
        lowered = content_type.lower()
        return "html" in lowered or "text" in lowered

    @staticmethod
    def _decode_body(raw: bytes, encoding: str) -> str:
        """Decode *raw* with *encoding*, replacing undecodable bytes.

        The encoding name originates from the remote server; if Python has
        no codec by that name the body is decoded as UTF-8 instead of
        letting ``LookupError`` escape.
        """
        try:
            return raw.decode(encoding, errors="replace")
        except LookupError:
            return raw.decode("utf-8", errors="replace")

    @staticmethod
    def _parse_html(html: str) -> tuple[str, str]:
        """Split an HTML document into ``(title, visible_text)``.

        Script, style and page-chrome elements are removed before the text
        is collected. A missing or empty ``<title>`` yields the fallback
        title ``"Web Document"``.
        """
        soup = BeautifulSoup(html, "html.parser")
        for tag in soup(
            ["script", "style", "nav", "footer", "header", "aside", "noscript"]
        ):
            tag.decompose()
        title_tag = soup.find("title")
        title = title_tag.get_text(strip=True) if title_tag else ""
        text = soup.get_text(separator="\n", strip=True)
        return title or "Web Document", text

    @staticmethod
    async def scrape_url(url: str) -> tuple[str, str]:
        """Download *url* and return ``(title, text)`` extracted from it.

        The body is streamed so that the download can be aborted as soon as
        it grows past ``settings.max_scrape_bytes``. Up to five redirects
        are followed.

        NOTE(review): the URL is requested exactly as given; nothing here
        restricts the scheme or target host. Confirm that callers validate
        untrusted URLs before passing them in.

        Raises:
            ValueError: If the response Content-Type is neither text nor
                HTML, or if the body exceeds the configured byte limit.
            httpx.HTTPStatusError: If the response status is an error
                (raised by ``raise_for_status``). Other httpx errors are
                not caught here.
        """
        timeout = httpx.Timeout(settings.scrape_timeout_seconds)
        headers = {"User-Agent": USER_AGENT, "Accept": "text/html,*/*"}
        async with httpx.AsyncClient(
            timeout=timeout,
            headers=headers,
            follow_redirects=True,
            max_redirects=5,
        ) as client:
            async with client.stream("GET", url) as response:
                response.raise_for_status()
                content_type = response.headers.get("content-type", "")
                if not DocumentProcessor._is_text_content_type(content_type):
                    raise ValueError(f"Unsupported content-type: {content_type}")
                chunks: list[bytes] = []
                total = 0
                async for chunk in response.aiter_bytes():
                    total += len(chunk)
                    # Enforce the cap while streaming so an oversized page
                    # is rejected without buffering all of it.
                    if total > settings.max_scrape_bytes:
                        raise ValueError(
                            f"Page exceeds {settings.max_scrape_bytes} byte limit"
                        )
                    chunks.append(chunk)
                html = DocumentProcessor._decode_body(
                    b"".join(chunks), response.encoding or "utf-8"
                )
        return DocumentProcessor._parse_html(html)

    @staticmethod
    def clean_text(text: str) -> str:
        """Normalise whitespace in *text*.

        Runs of spaces and tabs collapse to a single space, each line is
        stripped, and empty lines are dropped. The remaining lines are
        joined with ``"\\n"``.
        """
        text = re.sub(r"[ \t]+", " ", text)
        lines = (line.strip() for line in text.splitlines())
        return "\n".join(line for line in lines if line)