gakrchat1 / backend /file_processor.py
extraplus's picture
Upload 2 files
5bf401b verified
"""
File processing module for documents, code, data files, and images.
"""
import csv
import io
import json
import mimetypes
import os
import time
from pathlib import Path
from typing import Dict, Any, List, Tuple
from uuid import uuid4
from PyPDF2 import PdfReader
from docx import Document
from PIL import Image, ExifTags
try:
import openpyxl
OPENPYXL_AVAILABLE = True
except Exception:
OPENPYXL_AVAILABLE = False
try:
import xlrd
XLRD_AVAILABLE = True
except Exception:
XLRD_AVAILABLE = False
try:
import pytesseract
PYTESSERACT_AVAILABLE = True
except Exception:
PYTESSERACT_AVAILABLE = False
from config import settings
# Ensure upload directory exists (with error handling)
try:
settings.UPLOAD_DIR.mkdir(exist_ok=True, parents=True)
except (OSError, PermissionError):
# Directory might already exist or parent is read-only
pass
TEXT_FILE_EXTENSIONS = {
".txt", ".md", ".markdown", ".rst",
".py", ".java", ".js", ".jsx", ".ts", ".tsx",
".html", ".htm", ".css", ".scss", ".sass",
".yaml", ".yml", ".xml", ".ini", ".cfg", ".conf", ".toml",
".sql", ".log", ".sh", ".bat", ".ps1"
}
JSON_EXTENSIONS = {".json"}
CSV_EXTENSIONS = {".csv"}
DOCX_EXTENSIONS = {".docx"}
EXCEL_EXTENSIONS = {".xlsx", ".xlsm", ".xltx", ".xltm"}
LEGACY_EXCEL_EXTENSIONS = {".xls"}
PDF_EXTENSIONS = {".pdf"}
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".bmp", ".gif"}
def get_file_extension(filename: str) -> str:
"""Get file extension from filename"""
return Path(filename).suffix.lower()
def is_allowed_file(filename: str) -> bool:
"""Check if file type is allowed"""
ext = get_file_extension(filename)
return ext in settings.ALLOWED_EXTENSIONS
def detect_mime_type(file_content: bytes, filename: str) -> str:
"""Detect MIME type from file content and filename"""
mime_type, _ = mimetypes.guess_type(filename)
if mime_type:
return mime_type
if file_content.startswith(b"%PDF"):
return "application/pdf"
if file_content.startswith(b"\x89PNG"):
return "image/png"
if file_content.startswith(b"\xff\xd8"):
return "image/jpeg"
if file_content.startswith(b"GIF8"):
return "image/gif"
if file_content.startswith(b"PK"):
return "application/zip"
return "application/octet-stream"
def decode_text_bytes(file_content: bytes) -> str:
"""Decode text bytes with tolerant fallbacks."""
for encoding in ("utf-8", "utf-8-sig", "utf-16", "latin-1"):
try:
return file_content.decode(encoding)
except UnicodeDecodeError:
continue
return ""
def extract_text_from_txt(file_content: bytes) -> str:
"""Extract text from plain text/code/config files"""
content = decode_text_bytes(file_content)
if content == "":
return "[Error: Could not decode text file]"
return content
def extract_text_from_json(file_content: bytes) -> str:
"""Extract pretty JSON text when possible"""
raw = decode_text_bytes(file_content)
if not raw.strip():
return "[No content found in JSON file]"
try:
parsed = json.loads(raw)
return json.dumps(parsed, indent=2, ensure_ascii=False)
except Exception:
# Keep raw content if JSON is not perfectly valid.
return raw
def extract_text_from_csv(file_content: bytes, max_rows: int = 250) -> str:
"""Extract CSV as plain tabular text"""
raw = decode_text_bytes(file_content)
if not raw.strip():
return "[No content found in CSV file]"
try:
reader = csv.reader(io.StringIO(raw))
lines = []
for idx, row in enumerate(reader):
if idx >= max_rows:
lines.append(f"[CSV truncated to first {max_rows} rows]")
break
lines.append(" | ".join(str(cell).strip() for cell in row))
return "\n".join(lines) if lines else "[No rows found in CSV file]"
except Exception as exc:
return f"[Error reading CSV: {exc}]"
def extract_text_from_pdf(file_content: bytes) -> str:
"""Extract text from PDF file"""
try:
reader = PdfReader(io.BytesIO(file_content))
text_parts = []
for i, page in enumerate(reader.pages):
try:
page_text = page.extract_text() or ""
page_text = page_text.strip()
if page_text:
text_parts.append(f"--- Page {i + 1} ---\n{page_text}")
except Exception as exc:
text_parts.append(f"[Error extracting page {i + 1}: {exc}]")
if text_parts:
return "\n\n".join(text_parts)
return (
"[No extractable text found in PDF. "
"This PDF may be scanned/image-based, so OCR may be required.]"
)
except Exception as exc:
return f"[Error reading PDF: {exc}]"
def extract_text_from_docx(file_content: bytes) -> str:
"""Extract text from DOCX file"""
try:
doc = Document(io.BytesIO(file_content))
text_parts = []
for para in doc.paragraphs:
if para.text.strip():
text_parts.append(para.text.strip())
for table in doc.tables:
for row in table.rows:
row_cells = [cell.text.strip() for cell in row.cells if cell.text.strip()]
if row_cells:
text_parts.append(" | ".join(row_cells))
return "\n\n".join(text_parts) if text_parts else "[No text content found in DOCX]"
except Exception as exc:
return f"[Error reading DOCX: {exc}]"
def extract_text_from_excel_xlsx(file_content: bytes, max_rows_per_sheet: int = 250) -> str:
"""Extract text from modern Excel files (.xlsx/.xlsm)"""
if not OPENPYXL_AVAILABLE:
return "[Error: openpyxl not installed. Install openpyxl to read .xlsx files.]"
try:
workbook = openpyxl.load_workbook(io.BytesIO(file_content), data_only=True, read_only=True)
parts = []
for sheet in workbook.worksheets:
parts.append(f"--- Sheet: {sheet.title} ---")
row_count = 0
for row in sheet.iter_rows(values_only=True):
row_count += 1
if row_count > max_rows_per_sheet:
parts.append(f"[Sheet truncated to first {max_rows_per_sheet} rows]")
break
values = ["" if value is None else str(value) for value in row]
if any(v.strip() for v in values):
parts.append(" | ".join(values))
return "\n".join(parts) if parts else "[No content found in Excel workbook]"
except Exception as exc:
return f"[Error reading Excel workbook: {exc}]"
def extract_text_from_excel_xls(file_content: bytes, max_rows_per_sheet: int = 250) -> str:
"""Extract text from legacy Excel .xls files"""
if not XLRD_AVAILABLE:
return (
"[Error: xlrd not installed. Legacy .xls is not available in this setup. "
"Please convert .xls to .xlsx for best support.]"
)
try:
workbook = xlrd.open_workbook(file_contents=file_content)
parts = []
for sheet_idx in range(workbook.nsheets):
sheet = workbook.sheet_by_index(sheet_idx)
parts.append(f"--- Sheet: {sheet.name} ---")
for row_idx in range(min(sheet.nrows, max_rows_per_sheet)):
values = [str(sheet.cell_value(row_idx, col_idx)) for col_idx in range(sheet.ncols)]
if any(v.strip() for v in values):
parts.append(" | ".join(values))
if sheet.nrows > max_rows_per_sheet:
parts.append(f"[Sheet truncated to first {max_rows_per_sheet} rows]")
return "\n".join(parts) if parts else "[No content found in legacy Excel file]"
except Exception as exc:
return f"[Error reading legacy Excel file: {exc}]"
def extract_image_info(file_content: bytes, filename: str) -> str:
"""Extract image metadata and OCR text (if OCR is available)."""
try:
image_file = io.BytesIO(file_content)
img = Image.open(image_file)
info_parts = [
f"Image: {filename}",
f"Format: {img.format}",
f"Size: {img.size[0]}x{img.size[1]} pixels",
f"Mode: {img.mode}",
]
# EXIF metadata
try:
exif = img._getexif() # pylint: disable=protected-access
if exif:
exif_data = []
for tag_id, value in exif.items():
tag = ExifTags.TAGS.get(tag_id, tag_id)
if tag in {"DateTime", "Make", "Model", "LensModel", "Software"}:
exif_data.append(f"{tag}: {value}")
if exif_data:
info_parts.append("EXIF: " + ", ".join(exif_data))
except Exception:
pass
ocr_text = ""
if PYTESSERACT_AVAILABLE:
try:
ocr_text = pytesseract.image_to_string(img).strip()
except Exception as exc:
info_parts.append(f"[OCR unavailable at runtime: {exc}]")
else:
info_parts.append("[OCR library not installed. Install pytesseract + Tesseract OCR for image text extraction.]")
if ocr_text:
info_parts.append("\nOCR Text:\n" + ocr_text)
else:
info_parts.append("\n[No OCR text extracted from image.]")
return "\n".join(info_parts)
except Exception as exc:
return f"[Error reading image: {exc}]"
def process_file(file_content: bytes, filename: str) -> Dict[str, Any]:
"""Process a file and extract usable text content."""
mime_type = detect_mime_type(file_content, filename)
ext = get_file_extension(filename)
if len(file_content) > settings.MAX_FILE_SIZE:
return {
"filename": filename,
"mime_type": mime_type,
"size": len(file_content),
"content": f"[Error: File exceeds maximum size of {settings.MAX_FILE_SIZE / (1024 * 1024):.1f}MB]",
"success": False,
}
if not is_allowed_file(filename):
return {
"filename": filename,
"mime_type": mime_type,
"size": len(file_content),
"content": f"[Error: File type '{ext}' is not supported. Allowed types: {', '.join(sorted(settings.ALLOWED_EXTENSIONS))}]",
"success": False,
}
success = True
if ext in TEXT_FILE_EXTENSIONS:
content = extract_text_from_txt(file_content)
elif ext in JSON_EXTENSIONS:
content = extract_text_from_json(file_content)
elif ext in CSV_EXTENSIONS:
content = extract_text_from_csv(file_content)
elif ext in PDF_EXTENSIONS:
content = extract_text_from_pdf(file_content)
elif ext in DOCX_EXTENSIONS:
content = extract_text_from_docx(file_content)
elif ext in EXCEL_EXTENSIONS:
content = extract_text_from_excel_xlsx(file_content)
elif ext in LEGACY_EXCEL_EXTENSIONS:
content = extract_text_from_excel_xls(file_content)
elif ext in IMAGE_EXTENSIONS:
content = extract_image_info(file_content, filename)
elif mime_type.startswith("text/"):
content = extract_text_from_txt(file_content)
else:
content = f"[Error: Unsupported file type '{mime_type}' ({ext})]"
success = False
if content.startswith("[Error:"):
success = False
max_chars = 80000
if len(content) > max_chars:
original_len = len(content)
content = content[:max_chars] + f"\n\n[Content truncated. Original length: {original_len} characters]"
return {
"filename": filename,
"mime_type": mime_type,
"size": len(file_content),
"content": content,
"success": success,
}
def process_multiple_files(files: List[Tuple[bytes, str]]) -> List[Dict[str, Any]]:
"""Process multiple files"""
return [process_file(file_content, filename) for file_content, filename in files]
def format_files_for_prompt(file_results: List[Dict[str, Any]]) -> str:
"""Format processed files for inclusion in the model prompt"""
if not file_results:
return ""
formatted_parts = [
"=" * 50,
"ATTACHED FILES",
"=" * 50,
]
for i, file_result in enumerate(file_results, 1):
formatted_parts.append(f"\n--- File {i}: {file_result['filename']} ---")
formatted_parts.append(f"Type: {file_result['mime_type']}")
formatted_parts.append(f"Size: {file_result['size']} bytes")
formatted_parts.append(f"Status: {'Success' if file_result['success'] else 'Failed'}")
formatted_parts.append("\nContent:")
formatted_parts.append(file_result["content"])
formatted_parts.append("-" * 40)
return "\n".join(formatted_parts)
def save_uploaded_file(file_content: bytes, filename: str) -> Path:
"""Save uploaded file to disk with a unique, stable filename."""
original_name = Path(filename).name
stem = Path(original_name).stem or "upload"
suffix = Path(original_name).suffix
timestamp = int(time.time() * 1000)
unique_id = uuid4().hex[:8]
safe_filename = f"{timestamp}_{unique_id}_{stem}{suffix}"
file_path = settings.UPLOAD_DIR / safe_filename
with open(file_path, "wb") as file_obj:
file_obj.write(file_content)
return file_path
def cleanup_old_uploads(max_age_hours: int = 24):
"""Clean up old uploaded files"""
import time
current_time = time.time()
max_age_seconds = max_age_hours * 3600
for file_path in settings.UPLOAD_DIR.iterdir():
if not file_path.is_file():
continue
file_age = current_time - os.path.getmtime(file_path)
if file_age > max_age_seconds:
try:
file_path.unlink()
print(f"Deleted old upload: {file_path.name}")
except Exception as exc:
print(f"Error deleting {file_path}: {exc}")