Case-Studies-Agent / files_process.py
abjasrees's picture
Rename file_process.py to files_process.py
c1f0ab8 verified
# files_process.py
import pathlib
from typing import Union
from pypdf import PdfReader
from docx import Document
def _read_file_by_ext(p: pathlib.Path) -> str:
    """Return the text content of *p*, choosing a reader from its suffix.

    The suffix comparison is case-insensitive.

    - ``.txt``: decoded as UTF-8; undecodable bytes are dropped.
    - ``.docx``: the text of every paragraph, joined with newlines.
    - ``.pdf``: the extracted text of every page that yields any, joined
      with newlines; pages with no extractable text are skipped.

    Raises:
        ValueError: if the suffix is not one of the three above.
    """
    ext = p.suffix.lower()

    if ext == ".pdf":
        page_texts = (page.extract_text() for page in PdfReader(str(p)).pages)
        # extract_text() may give back nothing for a page; keep only real text.
        return "\n".join(chunk for chunk in page_texts if chunk)

    if ext == ".docx":
        paragraph_texts = [para.text for para in Document(str(p)).paragraphs]
        return "\n".join(paragraph_texts)

    if ext == ".txt":
        return p.read_text(encoding="utf-8", errors="ignore")

    raise ValueError(f"Unsupported file extension: {ext}. Use .txt / .docx / .pdf.")
def load_input_text(input_arg: Union[str, pathlib.Path]) -> str:
    """Return the text denoted by *input_arg*: literal text or a file's content.

    Resolution rules:

    - A ``str`` that contains a newline / carriage return, or is longer than
      512 characters, is taken to be literal text and returned unchanged
      without touching the filesystem.
    - Otherwise (short ``str`` or any ``pathlib.Path``), if the value names an
      existing regular file, that file is read according to its extension
      (.txt, .docx, .pdf).
    - In every other case the value is returned as a string. This includes
      paths that do not exist, paths that name a directory (so ``""`` and
      ``"."`` are plain text, not the current directory), and any ``OSError``
      raised while probing or reading (e.g. Errno 36, name too long).

    Raises:
        ValueError: if *input_arg* is ``None``, or if it names an existing
            file whose extension is not supported.
    """
    if input_arg is None:
        raise ValueError("input_arg is required")

    raw = str(input_arg)

    if isinstance(input_arg, pathlib.Path):
        candidate = input_arg
    else:
        # Multi-line or long strings cannot sensibly be a file name.
        if ("\n" in raw) or ("\r" in raw) or (len(raw) > 512):
            return raw
        candidate = pathlib.Path(raw)

    try:
        # is_file() rather than exists(): a directory is not readable input,
        # and Path("") resolves to the current directory.
        if candidate.is_file():
            return _read_file_by_ext(candidate)
    except OSError:
        # Best effort by design: an unusable path is treated as raw text.
        return raw
    return raw
def prepare_input_arg(text_value: str | None, file_obj) -> str:
"""
Combine textbox text and a single uploaded file (.txt/.docx/.pdf).
If both present, concatenate into a temp text file and return its path.
Compatible with Gradio/Scripts where file_obj may have a .name attribute or be a dict.
"""
text = (text_value or "").strip()
if file_obj is None and not text:
raise ValueError("Provide either text or upload a .txt/.docx/.pdf")
# If only text
if file_obj is None:
return text
# Best-effort path extraction
if hasattr(file_obj, "name") and isinstance(file_obj.name, str):
up_path = pathlib.Path(file_obj.name)
elif isinstance(file_obj, dict) and "name" in file_obj:
up_path = pathlib.Path(file_obj["name"])
else:
# As a fallback, write bytes if available
data = getattr(file_obj, "read", None)
if callable(data):
content = file_obj.read()
up_path = pathlib.Path("/tmp/upload.bin")
up_path.write_bytes(content)
else:
raise ValueError("Unsupported uploaded file object; missing .name or .read()")
if text:
tmp = pathlib.Path("/tmp/_concat_input.txt")
tmp.write_text(text + "\n\n", encoding="utf-8")
appended = load_input_text(str(up_path))
tmp.write_text(tmp.read_text(encoding="utf-8") + appended, encoding="utf-8")
return str(tmp)
return str(up_path)