| import os
|
| import logging
|
| import re
|
| import shutil
|
| import tempfile
|
| import zipfile
|
| import requests
|
| from typing import Optional
|
| from bs4 import BeautifulSoup
|
|
|
| import gdown
|
| from pypdf import PdfReader
|
| import docx as python_docx
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
def extract_text_from_file(file_path: str, file_type: str) -> Optional[str]:
    """Extract plain text from a PDF, DOCX, or TXT file.

    Args:
        file_path: Path to the file on disk.
        file_type: One of 'pdf', 'docx', 'txt' (lowercase).

    Returns:
        The stripped text content, or None when the type is unsupported,
        nothing could be extracted, or an error occurred (errors are logged,
        never raised).
    """
    logger.info(f"[TEXT_EXTRACTION] Starting extraction from {file_type.upper()} file: {file_path}")
    text_content = None
    try:
        if file_type == 'pdf':
            reader = PdfReader(file_path)
            # BUG FIX: the original called page.extract_text() twice per page
            # (once to filter, once to join) — extraction is expensive and not
            # guaranteed to be stable across calls. Extract once per page.
            page_texts = (page.extract_text() for page in reader.pages)
            text_content = "".join(t + "\n" for t in page_texts if t)
        elif file_type == 'docx':
            doc = python_docx.Document(file_path)
            text_content = "\n".join(para.text for para in doc.paragraphs if para.text)
        elif file_type == 'txt':
            # errors='ignore' drops undecodable bytes rather than failing.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                text_content = f.read()
        else:
            logger.warning(f"[TEXT_EXTRACTION] Unsupported file type: {file_type}")
            return None

        if not text_content or not text_content.strip():
            logger.warning(f"[TEXT_EXTRACTION] No text content extracted from {file_path}")
            return None

        return text_content.strip()
    except Exception as e:
        logger.error(f"[TEXT_EXTRACTION] Error extracting text: {e}", exc_info=True)
        return None
|
|
|
# Extensions accepted for FAISS RAG ingestion, mapped to a one-argument
# extractor callable (file path -> extracted text or None).
# The default-arg binding (_ftype=ext) freezes each extension at
# definition time, avoiding the late-binding closure pitfall.
FAISS_RAG_SUPPORTED_EXTENSIONS = {
    ext: (lambda path, _ftype=ext: extract_text_from_file(path, _ftype))
    for ext in ('pdf', 'docx', 'txt')
}
|
|
|
def fetch_and_clean_rentry(url: str, output_txt_path: str) -> bool:
    """Fetches HTML from a URL (specifically rentry.co), cleans it, and saves it as pure text.

    Strips scripts/styles/navigation chrome, prefers rentry's
    'entry-text' container when present, normalizes blank lines, and
    starts a new paragraph before question markers ("Q:", "Question 1:", ...).

    Args:
        url: Page to fetch.
        output_txt_path: Destination path for the cleaned text.

    Returns:
        True on success, False on any error (logged, never raised).
    """
    try:
        logger.info(f"[URL_FETCH] Fetching and cleaning data from: {url}")
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}
        response = requests.get(url, headers=headers, timeout=15)
        response.raise_for_status()

        soup = BeautifulSoup(response.text, 'html.parser')

        # Remove non-content chrome before extracting text.
        for tag in soup(["script", "style", "nav", "footer", "header", "noscript"]):
            tag.decompose()

        # rentry.co renders the document inside div.entry-text; fall back
        # to the whole body (or whole document) for other pages.
        entry = soup.find('div', class_='entry-text')
        if entry:
            text = entry.get_text(separator='\n', strip=True)
        else:
            text = soup.body.get_text(separator='\n', strip=True) if soup.body else soup.get_text(separator='\n', strip=True)

        # Collapse whitespace-only runs into a single blank line.
        text = re.sub(r'\n\s*\n', '\n\n', text)

        # Insert a paragraph break before any question marker not already
        # at the start of a line, then re-collapse excess blank lines.
        text = re.sub(r'(?<!\n)([Qq](?:uestion|UESTION)?\s*\d*\s*:)', r'\n\n\1', text)
        text = re.sub(r'\n{3,}', '\n\n', text)

        # BUG FIX: os.makedirs('') raises FileNotFoundError when the output
        # path has no directory component; only create a directory when
        # one actually exists in the path.
        out_dir = os.path.dirname(output_txt_path)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)

        with open(output_txt_path, 'w', encoding='utf-8') as f:
            f.write(text.strip())

        logger.info(f"[URL_FETCH] Success! Text saved to {output_txt_path}")
        return True
    except Exception as e:
        logger.error(f"[URL_FETCH] Error fetching/cleaning URL {url}: {e}", exc_info=True)
        return False
|
|
|
def get_id_from_gdrive_input(url_or_id: str) -> Optional[str]:
    """Resolve a Google Drive URL or bare ID string to the underlying ID.

    Recognized URL shapes, tried in order: folder links (/folders/<id>),
    file links (/d/<id>), and uc-style links (id=<id>). Anything else is
    assumed to already be an ID, provided it is plausibly long enough.

    Returns None for empty input or input too short to be an ID.
    """
    if not url_or_id:
        return None

    for pattern in (r"/folders/([a-zA-Z0-9_-]+)",
                    r"/d/([a-zA-Z0-9_-]+)",
                    r"id=([a-zA-Z0-9_-]+)"):
        match = re.search(pattern, url_or_id)
        if match:
            return match.group(1)

    # No known URL shape matched; treat sufficiently long input as a raw ID.
    return url_or_id if len(url_or_id) > 10 else None
|
|
|
def download_gdrive_file(file_id_or_url: str, target_path: str) -> bool:
    """Download a single Google Drive file to target_path.

    Args:
        file_id_or_url: Drive file ID or any URL shape accepted by
            get_id_from_gdrive_input().
        target_path: Destination path for the downloaded file.

    Returns:
        True only when the download produced a non-empty file at
        target_path; False on bad input or any error (logged).
    """
    logger.info(f"[GDRIVE_SINGLE] Downloading file. Input: {file_id_or_url}")
    file_id = get_id_from_gdrive_input(file_id_or_url)
    if not file_id:
        return False

    try:
        # BUG FIX: os.makedirs('') raises FileNotFoundError when target_path
        # has no directory component; only create a directory when present.
        target_dir = os.path.dirname(target_path)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)
        gdown.download(id=file_id, output=target_path, quiet=False, fuzzy=True)

        # An empty or missing output file counts as a failed download.
        if os.path.exists(target_path) and os.path.getsize(target_path) > 0:
            return True
        return False
    except Exception as e:
        logger.error(f"[GDRIVE_SINGLE] Error: {e}", exc_info=True)
        return False
|
|
|
def download_and_unzip_gdrive_folder(folder_id_or_url: str, target_dir_for_contents: str) -> bool:
    """Download a Google Drive folder and move its contents into a target dir.

    Downloads into a temporary directory first, then moves every entry
    into target_dir_for_contents. The temp directory is always removed.

    Returns True on success, False on bad input or any error (logged).
    """
    logger.info(f"[GDRIVE] Downloading folder. Input: {folder_id_or_url}")
    folder_id = get_id_from_gdrive_input(folder_id_or_url)
    if not folder_id:
        return False

    temp_dir = tempfile.mkdtemp()
    try:
        gdown.download_folder(id=folder_id, output=temp_dir, quiet=False, use_cookies=False)

        if not os.path.exists(target_dir_for_contents):
            os.makedirs(target_dir_for_contents)

        # gdown may place everything inside one top-level folder; if so,
        # treat that folder as the source root so its contents (not the
        # wrapper folder itself) land in the target directory.
        entries = os.listdir(temp_dir)
        src_root = temp_dir
        if len(entries) == 1:
            sole_entry = os.path.join(temp_dir, entries[0])
            if os.path.isdir(sole_entry):
                src_root = sole_entry

        for name in os.listdir(src_root):
            shutil.move(os.path.join(src_root, name), os.path.join(target_dir_for_contents, name))
        return True
    except Exception as e:
        logger.error(f"[GDRIVE] Error: {e}", exc_info=True)
        return False
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)
|
|
|
def download_and_unzip_gdrive_file(file_id_or_url: str, target_extraction_dir: str) -> bool:
    """Download a ZIP file from Google Drive and extract it to a directory.

    Args:
        file_id_or_url: Drive file ID or any URL shape accepted by
            get_id_from_gdrive_input().
        target_extraction_dir: Directory the archive is extracted into.

    Returns:
        True on success, False on bad input or any error (logged).
    """
    logger.info(f"[GDRIVE_ZIP] Downloading ZIP. Input: {file_id_or_url}")
    file_id = get_id_from_gdrive_input(file_id_or_url)
    if not file_id:
        return False

    # BUG FIX: the original used a fixed name ("temp_download.zip") in the
    # shared temp dir — concurrent downloads would clobber each other — and
    # never deleted the archive afterwards. Use a unique temp file and
    # always remove it. The fd is closed immediately; gdown opens the path.
    fd, temp_zip = tempfile.mkstemp(suffix=".zip")
    os.close(fd)
    try:
        gdown.download(id=file_id, output=temp_zip, quiet=False)
        with zipfile.ZipFile(temp_zip, 'r') as zip_ref:
            zip_ref.extractall(target_extraction_dir)
        return True
    except Exception as e:
        logger.error(f"[GDRIVE_ZIP] Error: {e}", exc_info=True)
        return False
    finally:
        try:
            os.remove(temp_zip)
        except OSError:
            pass