import os
import logging
import re
import shutil
import tempfile
import time
from typing import Optional
import zipfile

import gdown
from pypdf import PdfReader
import docx as python_docx

logger = logging.getLogger(__name__)


def extract_text_from_file(file_path: str, file_type: str) -> Optional[str]:
    """Extract plain text from a PDF, DOCX or TXT file.

    Args:
        file_path: Path of the file to read.
        file_type: One of ``'pdf'``, ``'docx'`` or ``'txt'`` (exact match).

    Returns:
        The extracted text with surrounding whitespace stripped, or ``None``
        when the type is unsupported, no text could be extracted, or any
        error occurred while reading (the error is logged, not raised).
    """
    logger.info(f"[TEXT_EXTRACTION] Starting extraction from {file_type.upper()} file: {file_path}")
    text_content = None
    try:
        if file_type == 'pdf':
            reader = PdfReader(file_path)
            # Extract each page exactly once; pages that yield no text are
            # skipped, every kept page is terminated with a newline.
            page_texts = (page.extract_text() for page in reader.pages)
            text_content = "".join(text + "\n" for text in page_texts if text)
        elif file_type == 'docx':
            doc = python_docx.Document(file_path)
            text_content = "\n".join(para.text for para in doc.paragraphs if para.text)
        elif file_type == 'txt':
            # Undecodable bytes are dropped rather than failing the whole file.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                text_content = f.read()
        else:
            logger.warning(f"[TEXT_EXTRACTION] Unsupported file type: {file_type}")
            return None

        if not text_content or not text_content.strip():
            logger.warning(f"[TEXT_EXTRACTION] No text content extracted from {file_path}")
            return None
        return text_content.strip()
    except Exception as e:
        # Best-effort extraction: callers get None instead of an exception.
        logger.error(f"[TEXT_EXTRACTION] Error extracting text: {e}", exc_info=True)
        return None


# Maps a file extension (without the dot) to a callable that takes a file
# path and returns the extracted text, or None on failure.
FAISS_RAG_SUPPORTED_EXTENSIONS = {
    'pdf': lambda path: extract_text_from_file(path, 'pdf'),
    'docx': lambda path: extract_text_from_file(path, 'docx'),
    'txt': lambda path: extract_text_from_file(path, 'txt'),
}


def get_id_from_gdrive_input(url_or_id: str) -> Optional[str]:
    """Return the Google Drive ID contained in a URL, or the input itself.

    The following patterns are tried in order: ``/folders/<id>``,
    ``/d/<id>`` and ``id=<id>``. If none matches, the input is assumed to
    already be a bare ID and is returned unchanged when it is longer than
    10 characters.

    Args:
        url_or_id: A Google Drive URL or a bare file/folder ID.

    Returns:
        The extracted ID, or ``None`` for empty input or for unmatched input
        of 10 characters or fewer.
    """
    if not url_or_id:
        return None

    match_folder = re.search(r"/folders/([a-zA-Z0-9_-]+)", url_or_id)
    if match_folder:
        return match_folder.group(1)

    match_file_d = re.search(r"/d/([a-zA-Z0-9_-]+)", url_or_id)
    if match_file_d:
        return match_file_d.group(1)

    match_uc = re.search(r"id=([a-zA-Z0-9_-]+)", url_or_id)
    if match_uc:
        return match_uc.group(1)

    # Heuristic: anything longer than 10 characters is treated as a bare ID.
    return url_or_id if len(url_or_id) > 10 else None


def download_gdrive_file(file_id_or_url: str, target_path: str) -> bool:
    """Download a single file (like users.csv) from GDrive to a specific path.

    Args:
        file_id_or_url: Google Drive file URL or bare file ID.
        target_path: Destination path of the downloaded file. Missing parent
            directories are created.

    Returns:
        ``True`` if the download succeeded and the file at ``target_path``
        is non-empty, ``False`` otherwise (errors are logged, not raised).
    """
    logger.info(f"[GDRIVE_SINGLE] Downloading file. Input: {file_id_or_url}")
    file_id = get_id_from_gdrive_input(file_id_or_url)
    if not file_id:
        logger.error("[GDRIVE_SINGLE] Invalid ID")
        return False

    try:
        # Ensure dir exists. A bare filename has an empty dirname, and
        # os.makedirs("") raises, so only create it when there is one.
        parent_dir = os.path.dirname(target_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        # fuzzy=True allows gdown to handle permissions more gracefully
        downloaded = gdown.download(id=file_id, output=target_path, quiet=False, fuzzy=True)

        # gdown returns None when the download failed without raising; without
        # this check a stale file from an earlier run would count as success.
        if downloaded is None:
            logger.error("[GDRIVE_SINGLE] gdown reported a failed download.")
            return False

        if os.path.exists(target_path) and os.path.getsize(target_path) > 0:
            logger.info("[GDRIVE_SINGLE] Success.")
            return True

        logger.error("[GDRIVE_SINGLE] Downloaded file is empty or missing.")
        return False
    except Exception as e:
        logger.error(f"[GDRIVE_SINGLE] Error: {e}", exc_info=True)
        return False


def download_and_unzip_gdrive_folder(folder_id_or_url: str, target_dir_for_contents: str) -> bool:
    """Download a Google Drive folder and move its contents into a directory.

    The folder is downloaded into a temporary directory first. If the
    download produced exactly one top-level directory, the contents of that
    directory are moved; otherwise everything at the top level is moved.

    Args:
        folder_id_or_url: Google Drive folder URL or bare folder ID.
        target_dir_for_contents: Directory receiving the downloaded items.
            It is created if it does not exist.

    Returns:
        ``True`` on success, ``False`` on an invalid ID, a failed download
        or any error (errors are logged, not raised).
    """
    logger.info(f"[GDRIVE] Downloading folder. Input: {folder_id_or_url}")
    folder_id = get_id_from_gdrive_input(folder_id_or_url)
    if not folder_id:
        logger.error("[GDRIVE] Invalid ID")
        return False

    temp_dir = tempfile.mkdtemp()
    try:
        downloaded = gdown.download_folder(id=folder_id, output=temp_dir, quiet=False, use_cookies=False)
        # download_folder returns None when it failed without raising; do not
        # report an empty or partial download as success.
        if downloaded is None:
            logger.error("[GDRIVE] gdown reported a failed folder download.")
            return False

        os.makedirs(target_dir_for_contents, exist_ok=True)

        # Unwrap a single top-level directory so that its contents, not the
        # wrapper directory itself, end up in the target directory.
        src_root = temp_dir
        top_level = os.listdir(temp_dir)
        if len(top_level) == 1:
            only_entry = os.path.join(temp_dir, top_level[0])
            if os.path.isdir(only_entry):
                src_root = only_entry

        for item in os.listdir(src_root):
            shutil.move(os.path.join(src_root, item), os.path.join(target_dir_for_contents, item))

        logger.info("[GDRIVE] Download complete.")
        return True
    except Exception as e:
        logger.error(f"[GDRIVE] Error: {e}", exc_info=True)
        return False
    finally:
        # Always remove the scratch directory, whatever happened above.
        shutil.rmtree(temp_dir, ignore_errors=True)


def download_and_unzip_gdrive_file(file_id_or_url: str, target_extraction_dir: str) -> bool:
    """Download a ZIP archive from Google Drive and extract it.

    Args:
        file_id_or_url: Google Drive file URL or bare file ID of the archive.
        target_extraction_dir: Directory the archive is extracted into.

    Returns:
        ``True`` if the archive was downloaded and extracted, ``False`` on an
        invalid ID, a failed download or any error (errors are logged, not
        raised).
    """
    logger.info(f"[GDRIVE_ZIP] Downloading ZIP. Input: {file_id_or_url}")
    file_id = get_id_from_gdrive_input(file_id_or_url)
    if not file_id:
        logger.error("[GDRIVE_ZIP] Invalid ID")
        return False

    try:
        # A private per-call directory keeps concurrent calls from overwriting
        # each other's archive and guarantees the archive is deleted afterwards.
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_zip = os.path.join(temp_dir, "temp_download.zip")
            downloaded = gdown.download(id=file_id, output=temp_zip, quiet=False)
            if downloaded is None or not os.path.exists(temp_zip):
                logger.error("[GDRIVE_ZIP] Downloaded file is empty or missing.")
                return False

            with zipfile.ZipFile(temp_zip, 'r') as zip_ref:
                zip_ref.extractall(target_extraction_dir)
        return True
    except Exception as e:
        logger.error(f"[GDRIVE_ZIP] Error: {e}", exc_info=True)
        return False