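"""Document text extraction utilities.

Extracts plain text from PDF, DOCX, EML, HTML, RTF, and TXT sources (local
paths or URLs, including Google Docs links) and chunks the result with
``unstructured`` for downstream processing.
"""
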
import io
import re
import subprocess
import tempfile
from email import policy
from email.parser import BytesParser
from pathlib import Path
from typing import Optional
from urllib.parse import urlparse
from zipfile import ZipFile

import html2text
import requests
from lxml import etree
from striprtf.striprtf import rtf_to_text
from unstructured.partition.text import partition_text


def extract_pdf_poppler(pdf_path) -> str:
    """Extract text from a PDF using poppler's ``pdftotext`` CLI (must be on PATH)."""
    pdf_path = str(pdf_path)
    output_path = str(Path(pdf_path).with_suffix(".txt"))
    result = subprocess.run(
        ["pdftotext", "-layout", pdf_path, output_path],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        print("PDF extraction failed:", result.stderr)
        return ""
    with open(output_path, "r", encoding="utf-8") as f:
        return f.read()


def annotate_tables(text: str) -> str:
    """Wrap table-like regions (as produced by ``pdftotext -layout``) in markers."""
    lines = text.splitlines()
    annotated = []
    inside_table = False
    for line in lines:
        # Heuristic: lines with two or more multi-space gaps look like table columns.
        is_table_line = len(re.findall(r" {2,}", line)) >= 2
        if is_table_line and not inside_table:
            annotated.append("<|TABLE|>")
            inside_table = True
        if not is_table_line and inside_table:
            annotated.append("<|ENDTABLE|>")
            inside_table = False
        annotated.append(line)
    if inside_table:
        annotated.append("<|ENDTABLE|>")
    return "\n".join(annotated)


def extract_docx(docx_input) -> str:
    """Extract paragraphs, table rows, and textbox text from a .docx file or stream."""
    if not isinstance(docx_input, (str, Path, io.BytesIO)):
        raise ValueError("Unsupported input type for extract_docx")
    # ZipFile accepts both paths and file-like objects.
    with ZipFile(docx_input) as zipf:
        xml_content = zipf.read("word/document.xml")
    tree = etree.fromstring(xml_content)
    ns = {
        "w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main",
        "a": "http://schemas.openxmlformats.org/drawingml/2006/main",
        "wps": "http://schemas.microsoft.com/office/word/2010/wordprocessingShape",
    }
    text_blocks = []
    # Extract normal paragraphs (direct children of the body, so table cells are skipped)
    paragraphs = tree.xpath("//w:body/w:p", namespaces=ns)
    for p in paragraphs:
        texts = p.xpath(".//w:t", namespaces=ns)
        para_text = "".join(t.text for t in texts if t.text)
        if para_text.strip():
            text_blocks.append(para_text.strip())
    # Extract text from tables, one "cell | cell" line per row
    tables = tree.xpath("//w:tbl", namespaces=ns)
    for tbl in tables:
        for row in tbl.xpath(".//w:tr", namespaces=ns):
            row_text = []
            for cell in row.xpath(".//w:tc", namespaces=ns):
                texts = cell.xpath(".//w:t", namespaces=ns)
                cell_text = "".join(t.text for t in texts if t.text)
                row_text.append(cell_text.strip())
            if row_text:
                text_blocks.append(" | ".join(row_text))
    # Extract text from textboxes
    tb_contents = tree.xpath("//w:txbxContent", namespaces=ns)
    for tb in tb_contents:
        texts = tb.xpath(".//w:t", namespaces=ns)
        tb_text = "".join(t.text for t in texts if t.text)
        if tb_text.strip():
            text_blocks.append(tb_text.strip())
    return "\n\n".join(text_blocks)


def extract_pdf(pdf_input) -> str:
    """Extract text from a PDF path or BytesIO stream, annotating table-like regions."""
    if isinstance(pdf_input, (str, Path)):
        file_path = str(pdf_input)
        is_temp = False
    elif isinstance(pdf_input, io.BytesIO):
        # pdftotext needs a real file, so spill the stream to a temporary PDF.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
            tmp.write(pdf_input.read())
            file_path = tmp.name
        is_temp = True
    else:
        raise ValueError("Unsupported input type for extract_pdf")
    try:
        text = extract_pdf_poppler(file_path)
    finally:
        if is_temp:
            # Clean up the temporary PDF and the .txt that pdftotext wrote next to it.
            Path(file_path).unlink(missing_ok=True)
            Path(file_path).with_suffix(".txt").unlink(missing_ok=True)
    return annotate_tables(text)


def extract_eml(eml_input) -> str:
    """Extract the plain-text body parts from an .eml file or stream."""
    if isinstance(eml_input, (str, Path)):
        with open(eml_input, "rb") as f:
            msg = BytesParser(policy=policy.default).parse(f)
    elif isinstance(eml_input, io.BytesIO):
        msg = BytesParser(policy=policy.default).parse(eml_input)
    else:
        raise ValueError("Unsupported input type for extract_eml")
    parts = []
    if msg.is_multipart():
        for part in msg.walk():
            if part.get_content_type() == "text/plain":
                parts.append(part.get_content())
    else:
        parts.append(msg.get_content())
    return "\n".join(parts)


def extract_html(html_input) -> str:
    if isinstance(html_input, (str, Path)):
        with open(html_input, "r", encoding="utf-8") as f:
            content = f.read()
    elif isinstance(html_input, io.BytesIO):
        content = html_input.read().decode("utf-8", errors="ignore")
    else:
        raise ValueError("Unsupported input type for extract_html")
    return html2text.html2text(content)


def extract_rtf(rtf_input) -> str:
    if isinstance(rtf_input, (str, Path)):
        with open(rtf_input, "r", encoding="utf-8") as f:
            content = f.read()
    elif isinstance(rtf_input, io.BytesIO):
        content = rtf_input.read().decode("utf-8", errors="ignore")
    else:
        raise ValueError("Unsupported input type for extract_rtf")
    return rtf_to_text(content)


def convert_google_docs_url(url: str) -> str:
    """Rewrite a Google Docs URL into its PDF-export form, if recognized."""
    if "docs.google.com" in url:
        # Extract the document ID from the common Google Docs URL formats
        if "/document/d/" in url:
            doc_id = url.split("/document/d/")[1].split("/")[0]
            return f"https://docs.google.com/document/d/{doc_id}/export?format=pdf"
        elif "id=" in url:
            doc_id = url.split("id=")[1].split("&")[0]
            return f"https://docs.google.com/document/d/{doc_id}/export?format=pdf"
        # Handle share links with extra parameters (e.g. ?usp=drive_link, rtpof=true)
        elif "?usp=drive_link" in url or "rtpof=true" in url:
            if "/d/" in url:
                doc_id = url.split("/d/")[1].split("/")[0]
                return f"https://docs.google.com/document/d/{doc_id}/export?format=pdf"
    return url
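# Example: a standard share link is rewritten to its PDF export endpoint.
#     convert_google_docs_url("https://docs.google.com/document/d/ABC123/edit")
#     -> "https://docs.google.com/document/d/ABC123/export?format=pdf"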


def detect_file_type_from_bytes(content: bytes) -> Optional[str]:
    """Guess a file type from magic bytes and cheap content heuristics."""
    if content.startswith(b"%PDF"):
        return "pdf"
    # .docx is a zip archive ("PK") containing a word/ directory
    if content[0:2] == b"PK" and b"word/" in content:
        return "docx"
    if b"Subject:" in content[:1000] or b"From:" in content[:1000]:
        return "eml"
    if b"<html" in content[:1000].lower() or b"<!doctype html" in content[:1000].lower():
        return "html"
    if content.strip().startswith(b"{\\rtf"):
        return "rtf"
    # Fall back to plain text if the first bytes all look printable
    if all(chr(b).isprintable() or chr(b).isspace() for b in content[:100]):
        return "txt"
    return None
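# Quick sanity checks for the detector (magic bytes only):
#     detect_file_type_from_bytes(b"%PDF-1.7 ...")  -> "pdf"
#     detect_file_type_from_bytes(b"{\\rtf1 ...")    -> "rtf"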


def extract(file_path_or_url: str) -> list:
    is_url = urlparse(file_path_or_url).scheme in ("http", "https")
    if is_url:
        file_path_or_url = convert_google_docs_url(file_path_or_url)
        try:
            response = requests.get(file_path_or_url, timeout=30)
            response.raise_for_status()
            content = response.content
            file_type = detect_file_type_from_bytes(content)
            file_like = io.BytesIO(content)
        except Exception as e:
            raise ValueError(f"Failed to fetch file: {e}")
    else:
        file_type = Path(file_path_or_url).suffix.lower().lstrip(".")
        file_like = file_path_or_url  # keep as path for local files
    # Each extractor accepts either a local path or a BytesIO stream.
    if file_type == "pdf":
        text = extract_pdf(file_like)
    elif file_type == "docx":
        text = extract_docx(file_like)
    elif file_type == "txt":
        if is_url:
            text = content.decode("utf-8", errors="ignore")
        else:
            with open(file_path_or_url, "r", encoding="utf-8") as f:
                text = f.read()
    elif file_type == "eml":
        text = extract_eml(file_like)
    elif file_type == "html":
        text = extract_html(file_like)
    elif file_type == "rtf":
        text = extract_rtf(file_like)
    else:
        raise ValueError("Unsupported or undetectable file type.")
    # Split the extracted text into elements and keep the narrative chunks,
    # tracking the most recent Title element as the section heading.
    elements = partition_text(text=text)
    chunks = []
    section = "Unknown"
    source_name = (
        Path(urlparse(file_path_or_url).path).name if is_url
        else Path(file_path_or_url).name
    )
    for i, el in enumerate(elements):
        if el.category == "Title":
            section = el.text.strip()
        elif el.category in ["NarrativeText", "ListItem"]:
            chunks.append({
                "clause_id": f"auto_{i}",
                "section_title": section,
                "raw_text": el.text.strip(),
                "source_file": source_name,
                "position_in_doc": i,
            })
    return chunks
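

if __name__ == "__main__":
    # Minimal usage sketch; "sample.pdf" is a hypothetical local file and the
    # Docs URL a hypothetical share link -- substitute real inputs to try it.
    for chunk in extract("sample.pdf"):
        print(chunk["section_title"], "::", chunk["raw_text"][:80])
    # URLs work too, including Google Docs links, which are fetched as PDF:
    # extract("https://docs.google.com/document/d/<doc-id>/edit")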