# Shastradoc/utils/file_readers.py
# Readers that extract text from PDF, DOCX, EML, HTML, RTF and plain-text
# sources (local paths or URLs) for downstream clause chunking.
import io
import subprocess
import tempfile
import xml.etree.ElementTree as ET
from email import policy
from email.parser import BytesParser
from pathlib import Path
from typing import Optional
from urllib.parse import urlparse
from zipfile import ZipFile

import html2text
import requests
from lxml import etree
from striprtf.striprtf import rtf_to_text
from unstructured.partition.text import partition_text
def extract_pdf_poppler(pdf_path) -> str:
    """Extract plain text from a PDF using poppler's ``pdftotext`` CLI.

    Runs ``pdftotext -layout`` so column/table layout is preserved as
    whitespace runs, which ``annotate_tables`` relies on later.

    Args:
        pdf_path: Path to the PDF file (str or Path).

    Returns:
        The extracted text, or "" on failure (missing binary, unreadable
        PDF). Failures are reported via print rather than raised,
        matching the module's best-effort style.
    """
    pdf_path = str(pdf_path)
    try:
        # "-" sends output to stdout: the original wrote a sibling .txt
        # file and never deleted it, leaking one file per extraction.
        result = subprocess.run(
            ["pdftotext", "-layout", pdf_path, "-"],
            capture_output=True,
            text=True,
        )
    except FileNotFoundError:
        # pdftotext not installed — original crashed here.
        print("PDF extraction failed: pdftotext binary not found")
        return ""
    if result.returncode != 0:
        print("PDF extraction failed:", result.stderr)
        return ""
    return result.stdout
def annotate_tables(text: str) -> str:
    """Wrap runs of table-looking lines in <|TABLE|> / <|ENDTABLE|> markers.

    A line is treated as tabular when it contains at least two wide
    whitespace gaps, as produced by ``pdftotext -layout`` for columns.
    """
    out = []
    in_table = False
    for row in text.splitlines():
        looks_tabular = row.count("   ") >= 2
        # Emit a marker only when the tabular/non-tabular state flips.
        if looks_tabular != in_table:
            out.append("<|TABLE|>" if looks_tabular else "<|ENDTABLE|>")
            in_table = looks_tabular
        out.append(row)
    if in_table:
        # Close a table that runs to the end of the text.
        out.append("<|ENDTABLE|>")
    return "\n".join(out)
def extract_docx(docx_input) -> str:
    """Extract text from a .docx: body paragraphs, tables, and textboxes.

    Args:
        docx_input: Path (str/Path) to a .docx file, or an io.BytesIO
            holding its bytes.

    Returns:
        Extracted blocks joined by blank lines; each table row is
        rendered as " | "-separated cells.

    Raises:
        ValueError: If docx_input is neither a path nor a BytesIO.
    """
    if not isinstance(docx_input, (str, Path, io.BytesIO)):
        raise ValueError("Unsupported input type for extract_docx")
    # Context manager closes the archive — the original leaked the handle.
    with ZipFile(docx_input) as zipf:
        xml_content = zipf.read("word/document.xml")
    # stdlib ElementTree replaces lxml: the limited path subset used here
    # (.//prefix:tag with a namespace map) is fully supported.
    tree = ET.fromstring(xml_content)
    ns = {
        "w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main",
        "a": "http://schemas.openxmlformats.org/drawingml/2006/main",
        "wps": "http://schemas.microsoft.com/office/word/2010/wordprocessingShape"
    }

    def _runs_text(node) -> str:
        # Concatenate all w:t text runs below *node*.
        return "".join(t.text for t in node.findall(".//w:t", ns) if t.text)

    text_blocks = []
    # Top-level body paragraphs (paragraphs inside table cells are not
    # direct children of w:body, so they are handled by the table pass).
    for p in tree.findall(".//w:body/w:p", ns):
        para_text = _runs_text(p).strip()
        if para_text:
            text_blocks.append(para_text)
    # Table rows: one block per row, cells separated by " | ".
    for tbl in tree.findall(".//w:tbl", ns):
        for row in tbl.findall(".//w:tr", ns):
            cells = [_runs_text(cell).strip() for cell in row.findall(".//w:tc", ns)]
            if cells:
                text_blocks.append(" | ".join(cells))
    # Text boxes (shapes) embedded anywhere in the document.
    for tb in tree.findall(".//w:txbxContent", ns):
        tb_text = _runs_text(tb).strip()
        if tb_text:
            text_blocks.append(tb_text)
    return "\n\n".join(text_blocks)
def extract_pdf(pdf_input) -> str:
    """Extract text from a PDF (path or BytesIO), annotating table regions.

    Args:
        pdf_input: Path (str/Path) to a PDF, or an io.BytesIO with PDF bytes.

    Returns:
        pdftotext output with <|TABLE|>/<|ENDTABLE|> markers added.

    Raises:
        ValueError: If pdf_input is neither a path nor a BytesIO.
    """
    tmp_path = None
    if isinstance(pdf_input, (str, Path)):
        file_path = str(pdf_input)
    elif isinstance(pdf_input, io.BytesIO):
        # pdftotext needs an on-disk file; spill the buffer to a temp file.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
            tmp.write(pdf_input.read())
            tmp_path = file_path = tmp.name
    else:
        raise ValueError("Unsupported input type for extract_pdf")
    try:
        text = extract_pdf_poppler(file_path)
    finally:
        # The original leaked the temp file; remove it once converted.
        if tmp_path is not None:
            Path(tmp_path).unlink(missing_ok=True)
    return annotate_tables(text)
def extract_eml(eml_input) -> str:
    """Return the plain-text content of an .eml message.

    Accepts a path (str/Path) or an io.BytesIO of the raw message; for
    multipart mail, only text/plain parts are kept, joined by newlines.

    Raises:
        ValueError: If eml_input is neither a path nor a BytesIO.
    """
    parser = BytesParser(policy=policy.default)
    if isinstance(eml_input, (str, Path)):
        with open(eml_input, 'rb') as fh:
            message = parser.parse(fh)
    elif isinstance(eml_input, io.BytesIO):
        message = parser.parse(eml_input)
    else:
        raise ValueError("Unsupported input type for extract_eml")
    if not message.is_multipart():
        return message.get_content()
    bodies = [
        part.get_content()
        for part in message.walk()
        if part.get_content_type() == 'text/plain'
    ]
    return "\n".join(bodies)
def extract_html(html_input) -> str:
    """Convert an HTML document (path or BytesIO) to markdown-style text.

    Raises:
        ValueError: If html_input is neither a path nor a BytesIO.
    """
    if isinstance(html_input, io.BytesIO):
        markup = html_input.read().decode("utf-8", errors="ignore")
    elif isinstance(html_input, (str, Path)):
        with open(html_input, 'r', encoding='utf-8') as fh:
            markup = fh.read()
    else:
        raise ValueError("Unsupported input type for extract_html")
    return html2text.html2text(markup)
def extract_rtf(rtf_input) -> str:
    """Convert an RTF document (path or BytesIO) to plain text.

    Raises:
        ValueError: If rtf_input is neither a path nor a BytesIO.
    """
    if isinstance(rtf_input, io.BytesIO):
        payload = rtf_input.read().decode("utf-8", errors="ignore")
    elif isinstance(rtf_input, (str, Path)):
        with open(rtf_input, 'r', encoding='utf-8') as fh:
            payload = fh.read()
    else:
        raise ValueError("Unsupported input type for extract_rtf")
    return rtf_to_text(payload)
def convert_google_docs_url(url: str) -> str:
    """Rewrite a Google Docs URL to its direct PDF-export form.

    Non-Google URLs, and Google URLs whose document id cannot be
    located, are returned unchanged.
    """
    if "docs.google.com" not in url:
        return url
    if "/document/d/" in url:
        doc_id = url.partition("/document/d/")[2].partition("/")[0]
    elif "id=" in url:
        doc_id = url.partition("id=")[2].partition("&")[0]
    elif ("?usp=drive_link" in url or "rtpof=true" in url) and "/d/" in url:
        # Share links with extra query parameters still carry /d/<id>/.
        doc_id = url.partition("/d/")[2].partition("/")[0]
    else:
        return url
    return f"https://docs.google.com/document/d/{doc_id}/export?format=pdf"
def detect_file_type_from_bytes(content: bytes) -> Optional[str]:
    """Best-effort sniffing of a downloaded file's type from raw bytes.

    Args:
        content: The file's raw bytes.

    Returns:
        One of "pdf", "docx", "eml", "html", "rtf", "txt", or None when
        the type cannot be determined. (The original annotated ``-> str``
        but returned None on failure.)
    """
    head = content[:1000]  # hoisted: header probes only look at the start
    if content.startswith(b'%PDF'):
        return "pdf"
    # .docx is a ZIP archive ("PK" magic) containing a word/ directory.
    if content.startswith(b'PK') and b'word/' in content:
        return "docx"
    if b"Subject:" in head or b"From:" in head:
        return "eml"
    if b"<html" in head.lower() or b"<!doctype html" in head.lower():
        return "html"
    if content.strip().startswith(b'{\\rtf'):
        return "rtf"
    # Fully printable/whitespace prefix -> assume plain text.
    # NOTE: empty content also lands here (all() over nothing is True),
    # matching the original behavior.
    if all(chr(b).isprintable() or chr(b).isspace() for b in content[:100]):
        return "txt"
    return None
def extract(file_path_or_url: str):
    """Read a document from a local path or URL and chunk it into clauses.

    URLs are fetched over HTTP(S) (Google Docs links are first rewritten
    to their PDF-export form) and the type is sniffed from the bytes;
    local files are dispatched on their extension.

    Args:
        file_path_or_url: Local file path or http(s) URL.

    Returns:
        List of dicts with keys clause_id, section_title, raw_text,
        source_file and position_in_doc.

    Raises:
        ValueError: On download failure or unsupported/undetectable type.
    """
    is_url = urlparse(file_path_or_url).scheme in ("http", "https")
    if is_url:
        file_path_or_url = convert_google_docs_url(file_path_or_url)
        try:
            # Bound the request: the original had no timeout and could
            # hang forever on an unresponsive host.
            response = requests.get(file_path_or_url, timeout=30)
            response.raise_for_status()
            content = response.content
            file_type = detect_file_type_from_bytes(content)
            source = io.BytesIO(content)
        except Exception as e:
            raise ValueError(f"Failed to fetch file: {e}")
    else:
        file_type = Path(file_path_or_url).suffix.lower().lstrip(".")
        source = file_path_or_url  # readers accept a path for local files

    if file_type == "pdf":
        text = extract_pdf(source)
    elif file_type == "docx":
        text = extract_docx(source)
    elif file_type == "txt":
        if is_url:
            text = content.decode("utf-8", errors="ignore")
        else:
            with open(file_path_or_url, 'r', encoding='utf-8') as f:
                text = f.read()
    elif file_type == "eml":
        text = extract_eml(source)
    elif file_type == "html":
        text = extract_html(source)
    elif file_type == "rtf":
        text = extract_rtf(source)
    else:
        raise ValueError("Unsupported or undetectable file type.")

    # Hoisted out of the loop: constant for the whole document.
    source_name = (
        file_path_or_url.split("/")[-1] if is_url else Path(file_path_or_url).name
    )
    elements = partition_text(text=text)
    chunks = []
    section = "Unknown"  # carries the most recent Title over its clauses
    for i, el in enumerate(elements):
        if el.category == "Title":
            section = el.text.strip()
        elif el.category in ["NarrativeText", "ListItem"]:
            chunks.append({
                "clause_id": f"auto_{i}",
                "section_title": section,
                "raw_text": el.text.strip(),
                "source_file": source_name,
                "position_in_doc": i,
            })
    return chunks