"""
File processing utilities for the AI Web Visualization Generator.
This module handles the processing of various file types (text, PDF, CSV, Excel)
uploaded by users and converts them into text descriptions suitable for LLM prompts.
"""
import io
from pathlib import Path
from typing import List
import pandas as pd
from fastapi import UploadFile
from pypdf import PdfReader
class FileProcessor:
    """
    Processes uploaded files and extracts their content for LLM prompts.

    Supports multiple file formats including text files, PDFs, CSVs, and
    Excel files. Binary files (images, audio, etc.) are identified but not
    processed.
    """

    # Maximum content length to include in prompt (to avoid huge prompts)
    MAX_CONTENT_LENGTH = 4000

    # Supported text-based file extensions
    TEXT_EXTENSIONS = {'.txt', '.md', '.py', '.js', '.html', '.css', '.json'}

    # Supported spreadsheet extensions
    EXCEL_EXTENSIONS = {'.xlsx', '.xls'}

    # Fallback description for anything we cannot render as text. Defined
    # once so _process_single_file and _extract_content cannot drift apart.
    BINARY_FILE_MESSAGE = (
        "Content: This is a binary file (e.g., image, audio). "
        "It cannot be displayed as text but should be referenced in the "
        "code by its filename."
    )

    async def process_uploaded_files(self, files: List[UploadFile]) -> str:
        """
        Process multiple uploaded files and create a comprehensive text
        description.

        Args:
            files: List of uploaded files to process

        Returns:
            str: Formatted text description of all file contents
        """
        if not files:
            return "No files were provided."
        file_contexts = []
        for file in files:
            context = await self._process_single_file(file)
            file_contexts.append(context)
        return (
            "The user has provided the following files. "
            "Use their content as context for your response:\n\n"
            + "\n\n".join(file_contexts)
        )

    async def _process_single_file(self, file: UploadFile) -> str:
        """
        Process a single uploaded file.

        Args:
            file: The file to process

        Returns:
            str: Formatted description of the file content, bracketed by
                START/END markers carrying the filename
        """
        # UploadFile.filename is Optional; guard so a missing name cannot
        # crash the request (Path(None) raises TypeError).
        filename = file.filename or "unknown"
        file_description = f"--- START OF FILE: {filename} ---"
        content_summary = self.BINARY_FILE_MESSAGE
        file_extension = Path(filename).suffix.lower()
        try:
            content_bytes = await file.read()
            content_summary = await self._extract_content(
                content_bytes,
                file_extension,
                filename
            )
        except Exception as e:
            # Best-effort: keep the default binary-file message rather than
            # failing the whole request over one unreadable file.
            print(f"Could not process file {filename}: {e}")
        finally:
            await file.seek(0)  # Reset file pointer for potential reuse
        return (
            f"{file_description}\n"
            f"{content_summary}\n"
            f"--- END OF FILE: {filename} ---"
        )

    async def _extract_content(
        self,
        content_bytes: bytes,
        file_extension: str,
        filename: str
    ) -> str:
        """
        Extract text content from file bytes based on file type.

        Args:
            content_bytes: Raw file content
            file_extension: File extension (e.g., '.pdf', '.csv')
            filename: Original filename (kept for interface stability)

        Returns:
            str: Extracted and possibly truncated content, or the generic
                binary-file message for unsupported extensions
        """
        content = None
        if file_extension in self.TEXT_EXTENSIONS:
            # errors='replace' keeps malformed bytes from raising
            content = content_bytes.decode('utf-8', errors='replace')
        elif file_extension == '.csv':
            content = self._process_csv(content_bytes)
        elif file_extension == '.pdf':
            content = self._process_pdf(content_bytes)
        elif file_extension in self.EXCEL_EXTENSIONS:
            content = self._process_excel(content_bytes)
        # No specific handler matched: report the file as binary.
        if content is None:
            return self.BINARY_FILE_MESSAGE
        # Truncate to keep the final prompt bounded.
        if len(content) > self.MAX_CONTENT_LENGTH:
            content = content[:self.MAX_CONTENT_LENGTH] + "\n... (content truncated)"
        return content

    def _process_csv(self, content_bytes: bytes) -> str:
        """
        Process CSV file content.

        Args:
            content_bytes: Raw CSV file bytes

        Returns:
            str: CSV content as text
        """
        df = pd.read_csv(io.BytesIO(content_bytes))
        return "File content represented as CSV:\n" + df.to_csv(index=False)

    def _process_pdf(self, content_bytes: bytes) -> str:
        """
        Process PDF file content and extract text.

        Args:
            content_bytes: Raw PDF file bytes

        Returns:
            str: Extracted text from all PDF pages
        """
        reader = PdfReader(io.BytesIO(content_bytes))
        # Call extract_text() once per page (it is expensive) and skip
        # pages that yield no text, e.g. scanned images.
        text_parts = []
        for page in reader.pages:
            text = page.extract_text()
            if text:
                text_parts.append(text)
        return "Extracted text from PDF:\n" + "\n".join(text_parts)

    def _process_excel(self, content_bytes: bytes) -> str:
        """
        Process Excel file content.

        Args:
            content_bytes: Raw Excel file bytes

        Returns:
            str: Content from all sheets, each rendered as CSV and labeled
                with its sheet name
        """
        xls = pd.ExcelFile(io.BytesIO(content_bytes))
        text_parts = []
        for sheet_name in xls.sheet_names:
            df = pd.read_excel(xls, sheet_name=sheet_name)
            text_parts.append(
                f"Sheet: '{sheet_name}'\n{df.to_csv(index=False)}"
            )
        return (
            "File content represented as CSV for each sheet:\n"
            + "\n\n".join(text_parts)
        )