Spaces:
Running
Running
| import os | |
| import logging | |
| import re | |
| import shutil | |
| import tempfile | |
| import zipfile | |
| import requests | |
| from typing import Optional | |
| from bs4 import BeautifulSoup | |
| import gdown | |
| from pypdf import PdfReader | |
| import docx as python_docx | |
| logger = logging.getLogger(__name__) | |
def extract_text_from_file(file_path: str, file_type: str) -> Optional[str]:
    """Extract plain text from a PDF, DOCX, or TXT file.

    Args:
        file_path: Path to the file on disk.
        file_type: One of 'pdf', 'docx' or 'txt' (lowercase).

    Returns:
        The stripped text content, or None when the type is unsupported,
        nothing could be extracted, or any error occurred (errors are
        logged, never raised).
    """
    # getLogger returns the module's singleton logger, so this is the same
    # object as the module-level `logger`; acquiring it here keeps the
    # function self-contained.
    log = logging.getLogger(__name__)
    log.info(f"[TEXT_EXTRACTION] Starting extraction from {file_type.upper()} file: {file_path}")
    text_content = None
    try:
        if file_type == 'pdf':
            reader = PdfReader(file_path)
            # Extract each page exactly once (the original called
            # page.extract_text() twice per page — once in the filter and
            # once in the join — doubling the parsing work).
            page_texts = (page.extract_text() for page in reader.pages)
            text_content = "".join(text + "\n" for text in page_texts if text)
        elif file_type == 'docx':
            doc = python_docx.Document(file_path)
            text_content = "\n".join(para.text for para in doc.paragraphs if para.text)
        elif file_type == 'txt':
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                text_content = f.read()
        else:
            log.warning(f"[TEXT_EXTRACTION] Unsupported file type: {file_type}")
            return None
        if not text_content or not text_content.strip():
            log.warning(f"[TEXT_EXTRACTION] No text content extracted from {file_path}")
            return None
        return text_content.strip()
    except Exception as e:
        log.error(f"[TEXT_EXTRACTION] Error extracting text: {e}", exc_info=True)
        return None
def _make_text_loader(kind: str):
    """Build a loader that extracts text from a file of the given *kind*."""
    def _load(path):
        return extract_text_from_file(path, kind)
    return _load

# Maps a supported extension to a callable that turns a file path into text.
FAISS_RAG_SUPPORTED_EXTENSIONS = {
    ext: _make_text_loader(ext) for ext in ('pdf', 'docx', 'txt')
}
# CSV is bypassed directly in components to allow row-by-row chunks.
FAISS_RAG_SUPPORTED_EXTENSIONS['csv'] = lambda path: "CSV_HANDLED_NATIVELY"
def fetch_and_clean_url(url: str, output_txt_path: str) -> bool:
    """Fetch HTML from *url*, strip boilerplate markup, and save plain text.

    Args:
        url: The page to download.
        output_txt_path: Destination path for the cleaned text file.

    Returns:
        True when the text was written successfully, False on any error
        (errors are logged, never raised).
    """
    try:
        logger.info(f"[URL_FETCH] Fetching and cleaning data from: {url}")
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}
        response = requests.get(url, headers=headers, timeout=15)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        # Remove non-content tags completely.
        for tag in soup(["script", "style", "nav", "footer", "header", "noscript"]):
            tag.decompose()
        # Attempt to find specific content divs if they exist (generalizes rentry and others).
        entry = soup.find('div', class_='entry-text')
        if entry:
            text = entry.get_text(separator='\n', strip=True)
        else:
            # Fallback for other sites or layout changes.
            text = soup.body.get_text(separator='\n', strip=True) if soup.body else soup.get_text(separator='\n', strip=True)
        # Collapse runs of blank lines into a single blank line.
        text = re.sub(r'\n\s*\n', '\n\n', text)
        # Only create the parent directory when there is one; the original
        # called os.makedirs('') — which raises FileNotFoundError — whenever
        # output_txt_path was a bare filename in the current directory.
        parent_dir = os.path.dirname(output_txt_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(output_txt_path, 'w', encoding='utf-8') as f:
            f.write(text)
        logger.info(f"[URL_FETCH] Success! Text saved to {output_txt_path}")
        return True
    except Exception as e:
        logger.error(f"[URL_FETCH] Error fetching/cleaning URL {url}: {e}", exc_info=True)
        return False
def get_id_from_gdrive_input(url_or_id: str) -> Optional[str]:
    """Extract a Google Drive id from a URL, or pass through a bare id.

    Returns None for empty input or a token too short to be a Drive id.
    """
    if not url_or_id:
        return None
    # Ordered patterns: folder links, /d/ file links, then uc?id= links.
    for pattern in (
        r"/folders/([a-zA-Z0-9_-]+)",
        r"/d/([a-zA-Z0-9_-]+)",
        r"id=([a-zA-Z0-9_-]+)",
    ):
        found = re.search(pattern, url_or_id)
        if found:
            return found.group(1)
    # Heuristic: a sufficiently long bare token is assumed to already be an id.
    return url_or_id if len(url_or_id) > 10 else None
def download_gdrive_file(file_id_or_url: str, target_path: str) -> bool:
    """Download a single Google Drive file to *target_path*.

    Args:
        file_id_or_url: A Drive file id or any Drive URL form accepted by
            get_id_from_gdrive_input.
        target_path: Local destination path for the downloaded file.

    Returns:
        True when a non-empty file exists at *target_path* afterwards,
        False otherwise (errors are logged, never raised).
    """
    logger.info(f"[GDRIVE_SINGLE] Downloading file. Input: {file_id_or_url}")
    file_id = get_id_from_gdrive_input(file_id_or_url)
    if not file_id:
        return False
    try:
        # Only create the parent directory when there is one; the original
        # called os.makedirs('') — which raises FileNotFoundError — whenever
        # target_path was a bare filename in the current directory.
        parent_dir = os.path.dirname(target_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        gdown.download(id=file_id, output=target_path, quiet=False, fuzzy=True)
        # gdown can fail without raising; verify a non-empty file was produced.
        return os.path.exists(target_path) and os.path.getsize(target_path) > 0
    except Exception as e:
        logger.error(f"[GDRIVE_SINGLE] Error: {e}", exc_info=True)
        return False
def download_and_unzip_gdrive_folder(folder_id_or_url: str, target_dir_for_contents: str) -> bool:
    """Download a Drive folder and move its contents into *target_dir_for_contents*.

    Returns True on success, False otherwise (errors are logged, never raised).
    """
    logger.info(f"[GDRIVE] Downloading folder. Input: {folder_id_or_url}")
    folder_id = get_id_from_gdrive_input(folder_id_or_url)
    if not folder_id:
        return False
    staging_dir = tempfile.mkdtemp()
    try:
        gdown.download_folder(id=folder_id, output=staging_dir, quiet=False, use_cookies=False)
        os.makedirs(target_dir_for_contents, exist_ok=True)
        # gdown may nest everything inside a single top-level folder; unwrap it.
        source_root = staging_dir
        entries = os.listdir(staging_dir)
        if len(entries) == 1:
            candidate = os.path.join(staging_dir, entries[0])
            if os.path.isdir(candidate):
                source_root = candidate
        for name in os.listdir(source_root):
            shutil.move(os.path.join(source_root, name), os.path.join(target_dir_for_contents, name))
        return True
    except Exception as e:
        logger.error(f"[GDRIVE] Error: {e}", exc_info=True)
        return False
    finally:
        # The staging area is always removed, even on failure.
        shutil.rmtree(staging_dir, ignore_errors=True)
def download_and_unzip_gdrive_file(file_id_or_url: str, target_extraction_dir: str) -> bool:
    """Download a ZIP from Google Drive and extract it into *target_extraction_dir*.

    Args:
        file_id_or_url: A Drive file id or URL pointing at a ZIP archive.
        target_extraction_dir: Directory the archive contents are extracted into.

    Returns:
        True on success, False otherwise (errors are logged, never raised).
    """
    logger.info(f"[GDRIVE_ZIP] Downloading ZIP. Input: {file_id_or_url}")
    file_id = get_id_from_gdrive_input(file_id_or_url)
    if not file_id:
        return False
    # Unique temp path per call: the original's fixed "temp_download.zip" in
    # the shared temp dir could be clobbered by a concurrent download.
    fd, temp_zip = tempfile.mkstemp(suffix=".zip")
    os.close(fd)  # gdown opens the path itself; keep only the name.
    try:
        gdown.download(id=file_id, output=temp_zip, quiet=False)
        with zipfile.ZipFile(temp_zip, 'r') as zip_ref:
            zip_ref.extractall(target_extraction_dir)
        return True
    except Exception as e:
        logger.error(f"[GDRIVE_ZIP] Error: {e}", exc_info=True)
        return False
    finally:
        # Always delete the downloaded archive — the original leaked it in
        # the temp directory after every call.
        if os.path.exists(temp_zip):
            os.remove(temp_zip)