Spaces:
Sleeping
Sleeping
| import os | |
| import io | |
| import requests | |
| from pathlib import Path | |
| from typing import List, Dict | |
| import PyPDF2 | |
| from docx import Document | |
| import docx2txt | |
class BulkFileUploader:
    """Handle bulk file uploads from a local folder path or a Google Drive folder.

    Every loader returns a list of dicts with the keys ``'name'`` (file
    name), ``'content'`` (raw bytes) and ``'type'`` (lower-cased extension
    including the leading dot, e.g. ``'.pdf'``).  All methods are static, so
    they can be called on the class or on an instance.
    """

    # Glob patterns of the document kinds this uploader accepts.
    SUPPORTED_EXTENSIONS = ['*.pdf', '*.docx', '*.doc']

    # Extension -> MIME type for each supported document kind.  Insertion
    # order is the order used when building the Drive search query.
    _MIME_TYPES = {
        '.pdf': 'application/pdf',
        '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        '.doc': 'application/msword',
    }

    _DRIVE_FILES_URL = "https://www.googleapis.com/drive/v3/files"

    # Seconds to wait on each HTTP request; without a timeout a stalled
    # connection would block the caller forever.
    _REQUEST_TIMEOUT = 30

    class FileObject:
        """In-memory, read-only, file-like wrapper around one loaded document.

        Attributes:
            name: File name.
            content: ``io.BytesIO`` positioned at the current read offset.
            file_type: Extension as supplied by the caller (e.g. ``'.pdf'``).
            type: MIME type derived from ``file_type``
                (``application/octet-stream`` when unknown).
            size: Length of the content in bytes.
        """

        def __init__(self, name, content, file_type):
            self.name = name
            self._content = content
            self.content = io.BytesIO(content)
            self.file_type = file_type
            self.size = len(content)
            # MIME lookup is case-insensitive so '.PDF' is still a PDF.
            lookup_key = file_type.lower() if isinstance(file_type, str) else ''
            self.type = BulkFileUploader._MIME_TYPES.get(
                lookup_key, "application/octet-stream"
            )

        def read(self, size=-1):
            """Read up to ``size`` bytes (all remaining bytes when negative)."""
            return self.content.read(size)

        def seek(self, position, whence=0):
            """Move the read offset and return the new absolute position."""
            return self.content.seek(position, whence)

        def tell(self):
            """Return the current read offset."""
            return self.content.tell()

        def readable(self):
            """Report that the object supports ``read``."""
            return True

        def seekable(self):
            """Report that the object supports ``seek``/``tell``.

            ``zipfile`` (used by .docx readers) looks this method up on the
            stream it is given, so it must exist.
            """
            return True

        def writable(self):
            """Report that the object is read-only."""
            return False

        def getvalue(self):
            """Return the entire content, regardless of the read offset."""
            return self._content

        def getbuffer(self):
            """Return a zero-copy view over the entire content."""
            return memoryview(self._content)

        def __enter__(self):
            return self

        def __exit__(self, *args):
            # Deliberately does not close the buffer: the object stays
            # readable after a ``with`` block.
            pass

    @staticmethod
    def load_from_folder(folder_path: str) -> List[Dict]:
        """Load all supported document files from a local folder.

        Matching against ``SUPPORTED_EXTENSIONS`` is case-insensitive, only
        regular files are considered, and the search is not recursive.
        Results are grouped in ``SUPPORTED_EXTENSIONS`` order and sorted by
        path within each group.

        Args:
            folder_path: Path of the directory to scan.

        Returns:
            One dict per loaded file (``name``, ``content``, ``type``).  An
            empty list when the path is not a directory or cannot be listed.
            A file that cannot be read is reported and skipped; it does not
            discard the files that were read successfully.
        """
        try:
            folder = Path(folder_path)
            if not folder.is_dir():
                return []
            # Skip directories that merely look like documents ("x.pdf/").
            candidates = sorted(p for p in folder.iterdir() if p.is_file())
        except Exception as e:
            print(f"Error loading files from folder: {str(e)}")
            return []

        files = []
        for pattern in BulkFileUploader.SUPPORTED_EXTENSIONS:
            lowered_pattern = pattern.lower()
            for path in candidates:
                # Lower-case both sides so '*.pdf' also matches 'REPORT.PDF'
                # on case-sensitive filesystems.
                if not Path(path.name.lower()).match(lowered_pattern):
                    continue
                try:
                    content = path.read_bytes()
                except OSError as e:
                    print(f"Error reading file {path.name}: {str(e)}")
                    continue
                files.append({
                    'name': path.name,
                    'content': content,
                    'type': path.suffix.lower(),
                })
        return files

    @staticmethod
    def extract_drive_folder_id(drive_link: str) -> str:
        """Extract the folder ID from a Google Drive link.

        Handles ``.../folders/<id>`` links (with or without a query string,
        fragment, trailing slash or further path segments) and
        ``...?id=<id>`` links.  Anything else is assumed to already be a
        bare folder ID and is returned with surrounding whitespace removed.

        Args:
            drive_link: A Drive folder URL or a bare folder ID.

        Returns:
            The folder ID.
        """
        link = drive_link.strip()

        if '/folders/' in link:
            folder_id = link.split('/folders/')[-1]
            # Cut at whichever URL delimiter follows the ID.
            for separator in ('?', '#', '/'):
                folder_id = folder_id.split(separator, 1)[0]
            return folder_id

        # Fall back to an "id" query parameter, e.g. ".../open?id=<id>".
        query = link.partition('?')[2].partition('#')[0]
        for pair in query.split('&'):
            key, _, value = pair.partition('=')
            if key == 'id' and value:
                return value

        return link

    @staticmethod
    def load_from_google_drive(drive_link: str) -> List[Dict]:
        """Load supported documents from a Google Drive folder.

        Note: This requires the folder to be publicly accessible and the
        ``GOOGLE_DRIVE_API_KEY`` environment variable to hold an API key.
        This is a simplified version; for production, use proper OAuth2.

        Args:
            drive_link: A Drive folder URL or a bare folder ID.

        Returns:
            One dict per downloaded file (``name``, ``content``, ``type``).
            A file whose download fails is reported and skipped.  An empty
            list is returned when the folder cannot be listed.
        """
        files = []
        try:
            folder_id = BulkFileUploader.extract_drive_folder_id(drive_link)
            if not folder_id:
                return []

            api_key = os.getenv('GOOGLE_DRIVE_API_KEY', '')
            mime_types = BulkFileUploader._MIME_TYPES
            mime_query = ' or '.join(
                f"mimeType='{mime}'" for mime in mime_types.values()
            )
            # The ID comes from user input: escape it so it cannot break out
            # of the quoted string in the search query.
            safe_id = folder_id.replace('\\', '\\\\').replace("'", "\\'")
            params = {
                'q': f"'{safe_id}' in parents and ({mime_query}) and trashed = false",
                'fields': 'nextPageToken, files(id, name, mimeType)',
                'pageSize': 1000,
                'key': api_key,
            }

            # The listing is paginated; follow nextPageToken until exhausted
            # so large folders are not silently truncated.
            file_list = []
            while True:
                response = requests.get(
                    BulkFileUploader._DRIVE_FILES_URL,
                    params=params,
                    timeout=BulkFileUploader._REQUEST_TIMEOUT,
                )
                if response.status_code != 200:
                    print(
                        "Error listing Google Drive folder: "
                        f"HTTP {response.status_code}"
                    )
                    break
                payload = response.json()
                file_list.extend(payload.get('files', []))
                page_token = payload.get('nextPageToken')
                if not page_token:
                    break
                params['pageToken'] = page_token

            extension_by_mime = {mime: ext for ext, mime in mime_types.items()}
            for file_info in file_list:
                file_id = file_info['id']
                file_name = file_info['name']
                try:
                    file_response = requests.get(
                        f"{BulkFileUploader._DRIVE_FILES_URL}/{file_id}",
                        params={'alt': 'media', 'key': api_key},
                        timeout=BulkFileUploader._REQUEST_TIMEOUT,
                    )
                except requests.RequestException as e:
                    print(f"Error downloading {file_name} from Google Drive: {str(e)}")
                    continue
                if file_response.status_code != 200:
                    print(
                        f"Error downloading {file_name} from Google Drive: "
                        f"HTTP {file_response.status_code}"
                    )
                    continue

                file_ext = os.path.splitext(file_name)[1].lower()
                if not file_ext:
                    # Drive names need not carry an extension; fall back to
                    # the MIME type reported by the listing.
                    file_ext = extension_by_mime.get(file_info.get('mimeType'), '')
                files.append({
                    'name': file_name,
                    'content': file_response.content,
                    'type': file_ext,
                })
            return files
        except Exception as e:
            print(f"Error loading files from Google Drive: {str(e)}")
            return []

    @staticmethod
    def create_file_object(file_data: Dict):
        """Create a file-like object from file data for Streamlit compatibility.

        Args:
            file_data: Dict with ``'name'`` and ``'content'`` (bytes) and an
                optional ``'type'`` (extension such as ``'.pdf'``).  When
                ``'type'`` is missing it is taken from the name's extension,
                falling back to ``'.pdf'`` for names without one.

        Returns:
            A ``BulkFileUploader.FileObject`` wrapping the content.
        """
        name = file_data['name']
        file_type = file_data.get('type')
        if file_type is None:
            file_type = os.path.splitext(name)[1].lower() or '.pdf'
        return BulkFileUploader.FileObject(name, file_data['content'], file_type)