import logging
import os
import re
import shutil
import tempfile
import zipfile
from typing import Optional

import docx as python_docx
import gdown
import requests
from bs4 import BeautifulSoup
from pypdf import PdfReader

logger = logging.getLogger(__name__)


def extract_text_from_file(file_path: str, file_type: str) -> Optional[str]:
    """Extract plain text from a PDF, DOCX or TXT file.

    Args:
        file_path: Path of the file to read.
        file_type: One of ``'pdf'``, ``'docx'`` or ``'txt'``. Matching is
            case-insensitive and tolerates a leading dot (``'.PDF'``).

    Returns:
        The extracted text with surrounding whitespace stripped, or ``None``
        when the type is unsupported, no text was found, or extraction failed
        (failures are logged, not raised).
    """
    logger.info(f"[TEXT_EXTRACTION] Starting extraction from {file_type.upper()} file: {file_path}")
    kind = file_type.lower().lstrip('.')
    try:
        if kind == 'pdf':
            reader = PdfReader(file_path)
            # extract_text() re-parses the page on every call, so call it once per page.
            page_texts = (page.extract_text() for page in reader.pages)
            text_content = "".join(page_text + "\n" for page_text in page_texts if page_text)
        elif kind == 'docx':
            doc = python_docx.Document(file_path)
            text_content = "\n".join(para.text for para in doc.paragraphs if para.text)
        elif kind == 'txt':
            # Undecodable bytes are dropped rather than failing the whole file.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                text_content = f.read()
        else:
            logger.warning(f"[TEXT_EXTRACTION] Unsupported file type: {file_type}")
            return None

        if not text_content or not text_content.strip():
            logger.warning(f"[TEXT_EXTRACTION] No text content extracted from {file_path}")
            return None
        return text_content.strip()
    except Exception as e:
        logger.error(f"[TEXT_EXTRACTION] Error extracting text: {e}", exc_info=True)
        return None


# Maps a file extension (without the dot) to a one-argument extractor taking a path.
FAISS_RAG_SUPPORTED_EXTENSIONS = {
    'pdf': lambda path: extract_text_from_file(path, 'pdf'),
    'docx': lambda path: extract_text_from_file(path, 'docx'),
    'txt': lambda path: extract_text_from_file(path, 'txt'),
}


# NOTE(review): the original substitution pattern is NOT recoverable from the
# copy of the file under review -- everything after "(?" was truncated. Only
# the opening "(?" and the comment describing its purpose survive. The pattern
# below is a placeholder guess (identifiers such as "Q12:" / "Q12.") and MUST
# be replaced with the real one from version control before merging.
_QA_IDENTIFIER_RE = re.compile(r'(?<!\n)(Q\d+[:.])')


def fetch_and_clean_rentry(url: str, output_txt_path: str) -> bool:
    """Fetches HTML from a URL (specifically rentry.co), cleans it, and saves it as pure text.

    Args:
        url: Page to download.
        output_txt_path: Destination of the cleaned text.

    Returns:
        ``True`` on success, ``False`` if any step failed (presumed from the
        ``bool`` annotation; the original return statements were truncated).
    """
    try:
        logger.info(f"[URL_FETCH] Fetching and cleaning data from: {url}")
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}
        response = requests.get(url, headers=headers, timeout=15)
        response.raise_for_status()

        soup = BeautifulSoup(response.text, 'html.parser')

        # Remove non-content tags completely
        for tag in soup(["script", "style", "nav", "footer", "header", "noscript"]):
            tag.decompose()

        # Attempt to find the specific rentry content div
        entry = soup.find('div', class_='entry-text')
        if entry:
            text = entry.get_text(separator='\n', strip=True)
        elif soup.body:
            # Fallback for other sites or layout changes
            text = soup.body.get_text(separator='\n', strip=True)
        else:
            text = soup.get_text(separator='\n', strip=True)

        # Clean up excessive blank lines
        text = re.sub(r'\n\s*\n', '\n\n', text)

        # Ensure Q&A identifiers always start on a new line to act as strong chunk boundaries
        text = _QA_IDENTIFIER_RE.sub(r'\n\1', text)

        # NOTE(review): everything from here to the end of this function is a
        # reconstruction based on the signature and docstring ("saves it as
        # pure text", "-> bool"); the original statements and log text were
        # truncated. Verify against version control.
        with open(output_txt_path, 'w', encoding='utf-8') as f:
            f.write(text)
        return True
    except Exception as e:
        logger.error(f"[URL_FETCH] Error: {e}", exc_info=True)
        return False


# Tried in order; the first pattern that matches supplies the ID.
_GDRIVE_ID_PATTERNS = (
    re.compile(r"/folders/([a-zA-Z0-9_-]+)"),
    re.compile(r"/d/([a-zA-Z0-9_-]+)"),
    re.compile(r"id=([a-zA-Z0-9_-]+)"),
)


def get_id_from_gdrive_input(url_or_id: str) -> Optional[str]:
    """Extract a Google Drive file/folder ID from a URL, or accept a bare ID.

    Recognises ``/folders/<id>``, ``/d/<id>`` and ``id=<id>`` forms. Input that
    matches none of them is returned unchanged when it is longer than 10
    characters (treated as a bare ID); otherwise ``None`` is returned.

    NOTE(review): the ``def`` line was truncated in the reviewed copy; the name
    comes from the call sites and the parameter name from the body. The ``str``
    annotation on the parameter is assumed.
    """
    if not url_or_id:
        return None
    for pattern in _GDRIVE_ID_PATTERNS:
        match = pattern.search(url_or_id)
        if match:
            return match.group(1)
    return url_or_id if len(url_or_id) > 10 else None


def download_gdrive_file(file_id_or_url: str, target_path: str) -> bool:
    """Download a single Google Drive file to ``target_path``.

    Returns:
        ``True`` only if a non-empty file exists at ``target_path`` afterwards;
        ``False`` on an unparseable ID or any error (errors are logged).
    """
    logger.info(f"[GDRIVE_SINGLE] Downloading file. Input: {file_id_or_url}")
    file_id = get_id_from_gdrive_input(file_id_or_url)
    if not file_id:
        return False
    try:
        parent_dir = os.path.dirname(target_path)
        # A bare filename has no directory part; os.makedirs('') would raise.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        gdown.download(id=file_id, output=target_path, quiet=False, fuzzy=True)
        return os.path.exists(target_path) and os.path.getsize(target_path) > 0
    except Exception as e:
        logger.error(f"[GDRIVE_SINGLE] Error: {e}", exc_info=True)
        return False


def download_and_unzip_gdrive_folder(folder_id_or_url: str, target_dir_for_contents: str) -> bool:
    """Download a Google Drive folder and move its contents into a target directory.

    The folder is fetched into a private temporary directory. If the download
    produced exactly one top-level directory, that directory's contents are
    moved rather than the directory itself. Despite the name, nothing is
    unzipped here.

    Returns:
        ``True`` when the download succeeded and all items were moved,
        ``False`` otherwise (errors are logged).
    """
    logger.info(f"[GDRIVE] Downloading folder. Input: {folder_id_or_url}")
    folder_id = get_id_from_gdrive_input(folder_id_or_url)
    if not folder_id:
        return False

    temp_dir = tempfile.mkdtemp()
    try:
        downloaded = gdown.download_folder(id=folder_id, output=temp_dir, quiet=False, use_cookies=False)
        # gdown signals failure by returning None instead of raising.
        if downloaded is None:
            logger.error("[GDRIVE] Error: folder download failed")
            return False

        os.makedirs(target_dir_for_contents, exist_ok=True)

        # Unwrap a single wrapping directory so the target gets the files directly.
        src_root = temp_dir
        top_level = os.listdir(temp_dir)
        if len(top_level) == 1:
            only_entry = os.path.join(temp_dir, top_level[0])
            if os.path.isdir(only_entry):
                src_root = only_entry

        for item in os.listdir(src_root):
            shutil.move(os.path.join(src_root, item), os.path.join(target_dir_for_contents, item))
        return True
    except Exception as e:
        logger.error(f"[GDRIVE] Error: {e}", exc_info=True)
        return False
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)


def download_and_unzip_gdrive_file(file_id_or_url: str, target_extraction_dir: str) -> bool:
    """Download a ZIP archive from Google Drive and extract it.

    Returns:
        ``True`` if the archive was downloaded and extracted into
        ``target_extraction_dir``; ``False`` otherwise (errors are logged).
    """
    logger.info(f"[GDRIVE_ZIP] Downloading ZIP. Input: {file_id_or_url}")
    file_id = get_id_from_gdrive_input(file_id_or_url)
    if not file_id:
        return False

    # A private directory per call avoids clobbering a concurrent download and
    # lets the archive be removed afterwards.
    temp_dir = tempfile.mkdtemp()
    temp_zip = os.path.join(temp_dir, "temp_download.zip")
    try:
        gdown.download(id=file_id, output=temp_zip, quiet=False)
        with zipfile.ZipFile(temp_zip, 'r') as zip_ref:
            zip_ref.extractall(target_extraction_dir)
        return True
    except Exception as e:
        logger.error(f"[GDRIVE_ZIP] Error: {e}", exc_info=True)
        return False
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)