Spaces:
Sleeping
Sleeping
| import os | |
| import logging | |
| import re | |
| import shutil | |
| import tempfile | |
| import time | |
| from typing import Optional | |
| import zipfile | |
| import gdown | |
| from pypdf import PdfReader | |
| import docx as python_docx | |
| logger = logging.getLogger(__name__) | |
| def extract_text_from_file(file_path: str, file_type: str) -> Optional[str]: | |
| logger.info(f"[TEXT_EXTRACTION] Starting extraction from {file_type.upper()} file: {file_path}") | |
| text_content = None | |
| try: | |
| if file_type == 'pdf': | |
| reader = PdfReader(file_path) | |
| text_content = "".join(page.extract_text() + "\n" for page in reader.pages if page.extract_text()) | |
| elif file_type == 'docx': | |
| doc = python_docx.Document(file_path) | |
| text_content = "\n".join(para.text for para in doc.paragraphs if para.text) | |
| elif file_type == 'txt': | |
| with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: | |
| text_content = f.read() | |
| else: | |
| logger.warning(f"[TEXT_EXTRACTION] Unsupported file type: {file_type}") | |
| return None | |
| if not text_content or not text_content.strip(): | |
| logger.warning(f"[TEXT_EXTRACTION] No text content extracted from {file_path}") | |
| return None | |
| return text_content.strip() | |
| except Exception as e: | |
| logger.error(f"[TEXT_EXTRACTION] Error extracting text: {e}", exc_info=True) | |
| return None | |
| FAISS_RAG_SUPPORTED_EXTENSIONS = { | |
| 'pdf': lambda path: extract_text_from_file(path, 'pdf'), | |
| 'docx': lambda path: extract_text_from_file(path, 'docx'), | |
| 'txt': lambda path: extract_text_from_file(path, 'txt'), | |
| } | |
| def get_id_from_gdrive_input(url_or_id: str) -> Optional[str]: | |
| if not url_or_id: return None | |
| match_folder = re.search(r"/folders/([a-zA-Z0-9_-]+)", url_or_id) | |
| if match_folder: return match_folder.group(1) | |
| match_file_d = re.search(r"/d/([a-zA-Z0-9_-]+)", url_or_id) | |
| if match_file_d: return match_file_d.group(1) | |
| match_uc = re.search(r"id=([a-zA-Z0-9_-]+)", url_or_id) | |
| if match_uc: return match_uc.group(1) | |
| return url_or_id if len(url_or_id) > 10 else None | |
| def download_gdrive_file(file_id_or_url: str, target_path: str) -> bool: | |
| """ | |
| Downloads a single file (like users.csv) from GDrive to a specific path. | |
| """ | |
| logger.info(f"[GDRIVE_SINGLE] Downloading file. Input: {file_id_or_url}") | |
| file_id = get_id_from_gdrive_input(file_id_or_url) | |
| if not file_id: | |
| logger.error("[GDRIVE_SINGLE] Invalid ID") | |
| return False | |
| try: | |
| # Ensure dir exists | |
| os.makedirs(os.path.dirname(target_path), exist_ok=True) | |
| # fuzzy=True allows gdown to handle permissions more gracefully | |
| gdown.download(id=file_id, output=target_path, quiet=False, fuzzy=True) | |
| if os.path.exists(target_path) and os.path.getsize(target_path) > 0: | |
| logger.info("[GDRIVE_SINGLE] Success.") | |
| return True | |
| else: | |
| logger.error("[GDRIVE_SINGLE] Downloaded file is empty or missing.") | |
| return False | |
| except Exception as e: | |
| logger.error(f"[GDRIVE_SINGLE] Error: {e}", exc_info=True) | |
| return False | |
| def download_and_unzip_gdrive_folder(folder_id_or_url: str, target_dir_for_contents: str) -> bool: | |
| logger.info(f"[GDRIVE] Downloading folder. Input: {folder_id_or_url}") | |
| folder_id = get_id_from_gdrive_input(folder_id_or_url) | |
| if not folder_id: return False | |
| temp_dir = tempfile.mkdtemp() | |
| try: | |
| gdown.download_folder(id=folder_id, output=temp_dir, quiet=False, use_cookies=False) | |
| if not os.path.exists(target_dir_for_contents): | |
| os.makedirs(target_dir_for_contents) | |
| src_root = temp_dir | |
| if len(os.listdir(temp_dir)) == 1 and os.path.isdir(os.path.join(temp_dir, os.listdir(temp_dir)[0])): | |
| src_root = os.path.join(temp_dir, os.listdir(temp_dir)[0]) | |
| for item in os.listdir(src_root): | |
| shutil.move(os.path.join(src_root, item), os.path.join(target_dir_for_contents, item)) | |
| logger.info(f"[GDRIVE] Download complete.") | |
| return True | |
| except Exception as e: | |
| logger.error(f"[GDRIVE] Error: {e}", exc_info=True) | |
| return False | |
| finally: | |
| shutil.rmtree(temp_dir, ignore_errors=True) | |
| def download_and_unzip_gdrive_file(file_id_or_url: str, target_extraction_dir: str) -> bool: | |
| logger.info(f"[GDRIVE_ZIP] Downloading ZIP. Input: {file_id_or_url}") | |
| file_id = get_id_from_gdrive_input(file_id_or_url) | |
| if not file_id: return False | |
| temp_zip = os.path.join(tempfile.gettempdir(), "temp_download.zip") | |
| try: | |
| gdown.download(id=file_id, output=temp_zip, quiet=False) | |
| with zipfile.ZipFile(temp_zip, 'r') as zip_ref: | |
| zip_ref.extractall(target_extraction_dir) | |
| return True | |
| except Exception as e: | |
| logger.error(f"[GDRIVE_ZIP] Error: {e}", exc_info=True) | |
| return False |