import os import logging import re import shutil import tempfile import zipfile import requests from typing import Optional from bs4 import BeautifulSoup import gdown from pypdf import PdfReader import docx as python_docx logger = logging.getLogger(__name__) def extract_text_from_file(file_path: str, file_type: str) -> Optional[str]: logger.info(f"[TEXT_EXTRACTION] Starting extraction from {file_type.upper()} file: {file_path}") text_content = None try: if file_type == 'pdf': reader = PdfReader(file_path) text_content = "".join(page.extract_text() + "\n" for page in reader.pages if page.extract_text()) elif file_type == 'docx': doc = python_docx.Document(file_path) text_content = "\n".join(para.text for para in doc.paragraphs if para.text) elif file_type == 'txt': with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: text_content = f.read() else: logger.warning(f"[TEXT_EXTRACTION] Unsupported file type: {file_type}") return None if not text_content or not text_content.strip(): logger.warning(f"[TEXT_EXTRACTION] No text content extracted from {file_path}") return None return text_content.strip() except Exception as e: logger.error(f"[TEXT_EXTRACTION] Error extracting text: {e}", exc_info=True) return None FAISS_RAG_SUPPORTED_EXTENSIONS = { 'pdf': lambda path: extract_text_from_file(path, 'pdf'), 'docx': lambda path: extract_text_from_file(path, 'docx'), 'txt': lambda path: extract_text_from_file(path, 'txt'), 'csv': lambda path: "CSV_HANDLED_NATIVELY" # Bypassed directly in components to allow row-by-row chunks } def fetch_and_clean_url(url: str, output_txt_path: str) -> bool: """Fetches HTML from a URL, cleans it, and saves it as pure text.""" try: logger.info(f"[URL_FETCH] Fetching and cleaning data from: {url}") headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'} response = requests.get(url, headers=headers, timeout=15) response.raise_for_status() soup = BeautifulSoup(response.text, 'html.parser') # Remove non-content tags completely for tag in soup(["script", "style", "nav", "footer", "header", "noscript"]): tag.decompose() # Attempt to find specific content divs if they exist (generalizes rentry and others) entry = soup.find('div', class_='entry-text') if entry: text = entry.get_text(separator='\n', strip=True) else: # Fallback for other sites or layout changes text = soup.body.get_text(separator='\n', strip=True) if soup.body else soup.get_text(separator='\n', strip=True) # Clean up excessive blank lines text = re.sub(r'\n\s*\n', '\n\n', text) # Ensure directory exists os.makedirs(os.path.dirname(output_txt_path), exist_ok=True) with open(output_txt_path, 'w', encoding='utf-8') as f: f.write(text) logger.info(f"[URL_FETCH] Success! Text saved to {output_txt_path}") return True except Exception as e: logger.error(f"[URL_FETCH] Error fetching/cleaning URL {url}: {e}", exc_info=True) return False def get_id_from_gdrive_input(url_or_id: str) -> Optional[str]: if not url_or_id: return None match_folder = re.search(r"/folders/([a-zA-Z0-9_-]+)", url_or_id) if match_folder: return match_folder.group(1) match_file_d = re.search(r"/d/([a-zA-Z0-9_-]+)", url_or_id) if match_file_d: return match_file_d.group(1) match_uc = re.search(r"id=([a-zA-Z0-9_-]+)", url_or_id) if match_uc: return match_uc.group(1) return url_or_id if len(url_or_id) > 10 else None def download_gdrive_file(file_id_or_url: str, target_path: str) -> bool: logger.info(f"[GDRIVE_SINGLE] Downloading file. Input: {file_id_or_url}") file_id = get_id_from_gdrive_input(file_id_or_url) if not file_id: return False try: os.makedirs(os.path.dirname(target_path), exist_ok=True) gdown.download(id=file_id, output=target_path, quiet=False, fuzzy=True) if os.path.exists(target_path) and os.path.getsize(target_path) > 0: return True return False except Exception as e: logger.error(f"[GDRIVE_SINGLE] Error: {e}", exc_info=True) return False def download_and_unzip_gdrive_folder(folder_id_or_url: str, target_dir_for_contents: str) -> bool: logger.info(f"[GDRIVE] Downloading folder. Input: {folder_id_or_url}") folder_id = get_id_from_gdrive_input(folder_id_or_url) if not folder_id: return False temp_dir = tempfile.mkdtemp() try: gdown.download_folder(id=folder_id, output=temp_dir, quiet=False, use_cookies=False) if not os.path.exists(target_dir_for_contents): os.makedirs(target_dir_for_contents) src_root = temp_dir if len(os.listdir(temp_dir)) == 1 and os.path.isdir(os.path.join(temp_dir, os.listdir(temp_dir)[0])): src_root = os.path.join(temp_dir, os.listdir(temp_dir)[0]) for item in os.listdir(src_root): shutil.move(os.path.join(src_root, item), os.path.join(target_dir_for_contents, item)) return True except Exception as e: logger.error(f"[GDRIVE] Error: {e}", exc_info=True) return False finally: shutil.rmtree(temp_dir, ignore_errors=True) def download_and_unzip_gdrive_file(file_id_or_url: str, target_extraction_dir: str) -> bool: logger.info(f"[GDRIVE_ZIP] Downloading ZIP. Input: {file_id_or_url}") file_id = get_id_from_gdrive_input(file_id_or_url) if not file_id: return False temp_zip = os.path.join(tempfile.gettempdir(), "temp_download.zip") try: gdown.download(id=file_id, output=temp_zip, quiet=False) with zipfile.ZipFile(temp_zip, 'r') as zip_ref: zip_ref.extractall(target_extraction_dir) return True except Exception as e: logger.error(f"[GDRIVE_ZIP] Error: {e}", exc_info=True) return False