# deep_research / file_processor.py
# OzanSevindir's picture
# Upload folder using huggingface_hub
# 96ad218 verified
"""
File processor for attachment feature
Supports: txt, md, py, js, json, csv, pdf, docx, xlsx
"""
import os
from typing import Dict, Optional
import datetime
def process_file(file_path: str) -> Optional[Dict]:
    """
    Build an attachment record (metadata plus extracted text) for a file.

    Args:
        file_path: Location of the uploaded file on disk.

    Returns:
        A dict with the keys ``filename``, ``content``, ``size_bytes``,
        ``file_type`` (lower-cased extension without the dot),
        ``uploaded_at`` (ISO timestamp taken at processing time) and
        ``char_count``; or ``None`` when extraction yields ``None`` or any
        exception is raised along the way.
    """
    char_limit = 20000  # only the first 20,000 characters are kept
    try:
        name = os.path.basename(file_path)
        size = os.path.getsize(file_path)
        suffix = os.path.splitext(name)[1].lower()

        # Delegate the type-specific extraction.
        text = extract_content(file_path, suffix)
        if text is None:
            return None

        # Oversized content is cut and a notice carrying both lengths is appended.
        full_length = len(text)
        if full_length > char_limit:
            notice = f"\n\n[πŸ“ Content truncated - original file was {full_length:,} characters, showing first {char_limit:,}]"
            text = text[:char_limit] + notice

        record = {
            "filename": name,
            "content": text,
            "size_bytes": size,
            "file_type": suffix[1:],  # extension without the leading dot
            "uploaded_at": datetime.datetime.now().isoformat(),
            "char_count": len(text),  # counted after truncation, notice included
        }
        return record
    except Exception as err:
        print(f"Error processing file {file_path}: {str(err)}")
        return None
def extract_content(file_path: str, file_ext: str) -> Optional[str]:
    """
    Route a file to the reader that matches its extension.

    Args:
        file_path: Path of the file to read.
        file_ext: Extension including the leading dot. It is compared
            verbatim, so callers are expected to pass it lower-cased.

    Returns:
        Whatever the selected reader returns, or an "unsupported file type"
        marker string when no reader is registered for ``file_ext``.
    """
    plain_text_exts = ('.txt', '.md', '.log', '.json', '.html', '.xml', '.css', '.sql')
    source_code_exts = ('.py', '.js', '.ts', '.jsx', '.tsx', '.java', '.cpp', '.c', '.h',
                        '.cs', '.php', '.rb', '.go', '.rs', '.swift', '.kt', '.sh', '.yml', '.yaml')

    # Plain text and source code share the same reader.
    if file_ext in plain_text_exts or file_ext in source_code_exts:
        return read_text_file(file_path)

    # Comma-separated data
    if file_ext == '.csv':
        return read_csv_file(file_path)

    # PDF documents
    if file_ext == '.pdf':
        return read_pdf_file(file_path)

    # Word documents
    if file_ext in ('.docx', '.doc'):
        return read_docx_file(file_path)

    # Excel workbooks
    if file_ext in ('.xlsx', '.xls'):
        return read_excel_file(file_path)

    return f"[❌ Unsupported file type: {file_ext}]"
def read_text_file(file_path: str) -> Optional[str]:
    """
    Read a text file, picking the decoding by BOM sniffing plus ordered fallbacks.

    Decoding order:
      1. ``utf-16`` - only when the file starts with a UTF-16 byte-order mark.
         Without a BOM almost any even-length byte string "decodes" as UTF-16,
         which would turn legacy single-byte text into garbage.
      2. ``utf-8-sig`` - UTF-8, with a leading BOM stripped if present.
      3. ``cp1252`` - tried before latin-1 so bytes 0x80-0x9F become the usual
         Windows punctuation instead of C1 control characters.
      4. ``latin-1`` - accepts every byte sequence, so it is the last resort.

    Known limitation: UTF-16 text *without* a BOM is not detected.

    Args:
        file_path: Path of the file to read.

    Returns:
        The decoded text (newlines normalised by text-mode reading), or a
        bracketed marker string when the file cannot be opened or decoded.
    """
    failure_marker = "[❌ Could not decode text file - unsupported encoding]"

    # Sniff the first two bytes for a UTF-16 BOM (little- or big-endian).
    try:
        with open(file_path, 'rb') as raw:
            has_utf16_bom = raw.read(2) in (b'\xff\xfe', b'\xfe\xff')
    except (OSError, TypeError, ValueError) as e:
        print(f"Error reading text file: {e}")
        return failure_marker

    encodings = ['utf-8-sig', 'cp1252', 'latin-1']
    if has_utf16_bom:
        encodings.insert(0, 'utf-16')

    for encoding in encodings:
        try:
            with open(file_path, 'r', encoding=encoding) as f:
                return f.read()
        except UnicodeDecodeError:
            continue
        except Exception as e:
            print(f"Error reading text file with {encoding}: {e}")
            continue
    return failure_marker
def read_csv_file(file_path: str) -> Optional[str]:
    """
    Read a CSV file and render it as pipe-separated text.

    The file is decoded with the first encoding that works out of
    ``utf-8-sig`` (UTF-8, BOM stripped so it cannot leak into the first
    header cell), ``cp1252`` and ``latin-1`` (which accepts any bytes).

    Output layout: a title line with the total row count (header included),
    a rule, the first row as header, a rule, then up to 100 data rows and,
    if more exist, a "... N more rows" note.

    Args:
        file_path: Path of the CSV file.

    Returns:
        The formatted text, ``"[Empty CSV file]"`` when the file has no rows,
        or a bracketed error marker if reading fails.
    """
    try:
        import csv

        rows = []
        for encoding in ('utf-8-sig', 'cp1252', 'latin-1'):
            try:
                # newline='' lets the csv module handle embedded line breaks itself.
                with open(file_path, 'r', encoding=encoding, newline='') as f:
                    rows = list(csv.reader(f))
                break
            except UnicodeDecodeError:
                # Decoding errors surface lazily while iterating; try the next encoding.
                continue

        if not rows:
            return "[Empty CSV file]"

        # Format as text with simple separators
        output = [
            f"CSV Data ({len(rows)} rows):\n",
            "=" * 50,
            " | ".join(rows[0]),  # header
            "-" * 50,
        ]

        # Data rows (limit to first 100 rows for context)
        output.extend(" | ".join(str(cell) for cell in row) for row in rows[1:101])

        if len(rows) > 101:
            output.append(f"\n[... {len(rows) - 101} more rows]")
        return "\n".join(output)
    except Exception as e:
        return f"[❌ Error reading CSV: {str(e)}]"
def read_pdf_file(file_path: str) -> Optional[str]:
    """
    Pull the text out of a PDF using pdfplumber.

    Only the first 50 pages are read. Each page that yields text is emitted
    under a ``--- Page N ---`` heading, and a trailing note reports how many
    pages were left out.

    Args:
        file_path: Path of the PDF file.

    Returns:
        The concatenated page text, or a bracketed marker string when the
        result is blank, pdfplumber is missing, or reading fails.
    """
    page_limit = 50
    try:
        import pdfplumber

        with pdfplumber.open(file_path) as pdf:
            sections = []
            for page_number, page in enumerate(pdf.pages[:page_limit], start=1):
                page_text = page.extract_text()
                if not page_text:
                    continue  # no extractable text on this page
                sections.append(f"--- Page {page_number} ---\n{page_text}")

            skipped = len(pdf.pages) - page_limit
            if skipped > 0:
                sections.append(f"\n[... {skipped} more pages not shown]")

            combined = "\n\n".join(sections)
            if combined.strip():
                return combined
            return "[❌ PDF appears to be empty or contains only images]"
    except ImportError:
        return "[❌ pdfplumber not installed - run: pip install pdfplumber]"
    except Exception as err:
        return f"[❌ Error reading PDF: {str(err)}]"
def read_docx_file(file_path: str) -> Optional[str]:
    """
    Pull the text out of a Word document using python-docx.

    Non-blank body paragraphs come first, followed by one line per table row
    with the stripped cell texts joined by `` | ``. Pieces are separated by
    blank lines.

    Args:
        file_path: Path of the Word document.

    Returns:
        The collected text, or a bracketed marker string when the document
        has no text, python-docx is missing, or reading fails.
    """
    try:
        from docx import Document

        document = Document(file_path)

        # Body paragraphs, skipping blank ones
        pieces = [p.text for p in document.paragraphs if p.text.strip()]

        # Table rows are appended after the paragraphs
        for table in document.tables:
            for row in table.rows:
                joined_cells = " | ".join(cell.text.strip() for cell in row.cells)
                if joined_cells.strip():
                    pieces.append(joined_cells)

        combined = "\n\n".join(pieces)
        if combined.strip():
            return combined
        return "[❌ Word document appears to be empty]"
    except ImportError:
        return "[❌ python-docx not installed - run: pip install python-docx]"
    except Exception as err:
        return f"[❌ Error reading Word document: {str(err)}]"
def read_excel_file(file_path: str) -> Optional[str]:
    """
    Render every sheet of an Excel workbook as text using pandas.

    The workbook is opened once through ``pd.ExcelFile`` used as a context
    manager, so the underlying file handle is always closed, and each sheet
    is parsed from that open handle rather than re-opening the path.

    For each sheet the output holds its name, its dimensions and the table
    as produced by ``DataFrame.to_string(index=False)``, limited to the
    first 50 rows with a note about the remainder.

    Args:
        file_path: Path of the workbook.

    Returns:
        The formatted text, or a bracketed marker string when an import
        fails (pandas or its Excel engine) or reading fails.
    """
    try:
        import pandas as pd

        output = []
        # Context manager closes the workbook even if a sheet fails to parse.
        with pd.ExcelFile(file_path) as excel_file:
            output.append(f"Excel File - {len(excel_file.sheet_names)} sheet(s)\n")
            output.append("=" * 50)
            for sheet_name in excel_file.sheet_names:
                # Reuse the already-open workbook instead of re-reading the path.
                df = pd.read_excel(excel_file, sheet_name=sheet_name)
                output.append(f"\nπŸ“Š Sheet: {sheet_name}")
                output.append(f"Dimensions: {df.shape[0]} rows Γ— {df.shape[1]} columns")
                output.append("-" * 50)
                # Convert to string representation (limit rows)
                if len(df) > 50:
                    output.append(df.head(50).to_string(index=False))
                    output.append(f"\n[... {len(df) - 50} more rows]")
                else:
                    output.append(df.to_string(index=False))
                output.append("\n")
        return "\n".join(output)
    except ImportError:
        return "[❌ pandas/openpyxl not installed - run: pip install pandas openpyxl]"
    except Exception as e:
        return f"[❌ Error reading Excel file: {str(e)}]"
def get_file_icon(file_type: str) -> str:
    """
    Look up the icon string associated with a file type.

    Args:
        file_type: Extension without the leading dot (e.g. ``'pdf'``).

    Returns:
        The icon registered for ``file_type``, or the generic fallback icon
        when the type is not listed.
    """
    fallback_icon = 'πŸ“Ž'
    # Each icon is listed once together with every extension that uses it.
    icon_groups = (
        ('πŸ“„', ('txt',)),
        ('πŸ“', ('md',)),
        ('πŸ“•', ('pdf',)),
        ('πŸ“˜', ('doc', 'docx')),
        ('πŸ“Š', ('xls', 'xlsx', 'csv')),
        ('πŸ“‹', ('json', 'xml', 'log')),
        ('🌐', ('html',)),
        ('🐍', ('py',)),
        ('πŸ“œ', ('js', 'ts')),
        ('β˜•', ('java',)),
        ('βš™οΈ', ('cpp', 'yml', 'yaml')),
        ('πŸ—„οΈ', ('sql',)),
    )
    lookup = {
        extension: icon
        for icon, extensions in icon_groups
        for extension in extensions
    }
    return lookup.get(file_type, fallback_icon)
def format_file_size(size_bytes: int) -> str:
    """
    Render a byte count as a short human-readable string.

    Values below 1024 are shown as whole bytes (``B``), values below
    1024 * 1024 as kilobytes (``KB``) and everything else as megabytes
    (``MB``); the latter two use one decimal place.
    """
    bytes_per_kb = 1024
    bytes_per_mb = bytes_per_kb * bytes_per_kb

    if size_bytes < bytes_per_kb:
        return f"{size_bytes} B"
    if size_bytes < bytes_per_mb:
        kilobytes = size_bytes / bytes_per_kb
        return f"{kilobytes:.1f} KB"
    megabytes = size_bytes / bytes_per_mb
    return f"{megabytes:.1f} MB"