import asyncio
import logging
from typing import Any, Dict, List, Optional
from urllib.parse import urldefrag, urljoin

import aiohttp
import faiss
from bs4 import BeautifulSoup

from config.config import settings
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
class FAQService:
    """Crawl FAQ pages under a base URL and serve vector search over them.

    The service fetches every page reachable below ``base_url``, extracts
    question/answer pairs from the ``faq-item`` markup, embeds them with the
    embedder taken from ``model_service`` and stores the vectors in a FAISS
    ``IndexFlatL2`` index.
    """

    DEFAULT_BASE_URL = "https://www.bofrost.de/faq/"

    def __init__(self, model_service, base_url: str = DEFAULT_BASE_URL):
        """Store the embedder and initialise an empty index.

        Args:
            model_service: Object exposing an ``embedder`` attribute whose
                ``encode(texts, convert_to_tensor=True)`` result supports
                ``.cpu().detach().numpy()``.
            base_url: Root URL of the FAQ section; only URLs starting with
                this prefix are crawled.
        """
        self.embedder = model_service.embedder
        self.faiss_index = None
        self.faq_data = []
        self.visited_urls = set()
        self.base_url = base_url

    async def fetch_faq_pages(self) -> List[Dict[str, Any]]:
        """Crawl the FAQ section from ``base_url`` and return the parsed pages.

        Returns:
            A list of ``{"url": ..., "faqs": [...]}`` dicts, or an empty list
            if the crawl failed.
        """
        # Every crawl starts from a clean slate. Without this, a second call
        # finds the base URL already marked as visited and returns nothing.
        self.visited_urls.clear()

        async with aiohttp.ClientSession() as session:
            try:
                pages = await self.crawl_faq_pages(self.base_url, session)
            except Exception as e:
                logger.error(f"Error fetching FAQ pages: {e}")
                return []
        return [page for page in pages if page]

    async def crawl_faq_pages(self, url: str, session: aiohttp.ClientSession) -> List[Dict[str, Any]]:
        """Fetch ``url``, parse its FAQs and recurse into in-scope links.

        Args:
            url: Page to fetch. Any ``#fragment`` is ignored, and URLs that
                are outside ``base_url`` or already visited are skipped.
            session: Open aiohttp session used for all requests.

        Returns:
            Parsed pages for ``url`` and everything reachable from it. Errors
            are logged and yield whatever was collected up to that point.
        """
        # A fragment only addresses a position inside the same document, so
        # "page#a" and "page#b" must count as one page.
        url, _fragment = urldefrag(url)
        if url in self.visited_urls or not url.startswith(self.base_url):
            return []

        # Mark before the first await so concurrent sibling tasks skip it.
        self.visited_urls.add(url)
        pages = []

        try:
            async with session.get(url, timeout=settings.TIMEOUT) as response:
                if response.status != 200:
                    logger.warning(f"Skipping FAQ page {url}: HTTP {response.status}")
                    return pages
                content = await response.text()

            soup = BeautifulSoup(content, 'html.parser')

            page_content = await self.parse_faq_content(soup, url)
            if page_content:
                pages.append(page_content)

            child_urls = self._collect_child_urls(soup, url)
            if child_urls:
                results = await asyncio.gather(
                    *(self.crawl_faq_pages(child, session) for child in child_urls)
                )
                for result in results:
                    pages.extend(result)

        except Exception as e:
            logger.error(f"Error crawling FAQ page {url}: {e}")

        return pages

    def _collect_child_urls(self, soup: BeautifulSoup, page_url: str) -> List[str]:
        """Return unique, fragment-free, unvisited in-scope links found on a page."""
        child_urls = []
        queued = set()
        for link in soup.find_all('a', href=True):
            target, _fragment = urldefrag(urljoin(page_url, link['href']))
            if target in queued or target in self.visited_urls:
                continue
            if not target.startswith(self.base_url):
                continue
            queued.add(target)
            child_urls.append(target)
        return child_urls

    async def parse_faq_content(self, soup: BeautifulSoup, url: str) -> Optional[Dict[str, Any]]:
        """Extract all question/answer pairs from a parsed FAQ page.

        Args:
            soup: Parsed HTML of the page.
            url: Address of the page, stored alongside the extracted FAQs.

        Returns:
            ``{"url": url, "faqs": [{"question": ..., "answer": ...}, ...]}``
            or ``None`` when the page has no usable FAQ entries or parsing
            raised an error (which is logged).
        """
        try:
            faqs = []
            for item in soup.find_all('div', class_='faq-item'):
                faq = self._parse_faq_item(item)
                if faq is not None:
                    faqs.append(faq)

            if faqs:
                return {
                    "url": url,
                    "faqs": faqs
                }

        except Exception as e:
            logger.error(f"Error parsing FAQ content from {url}: {e}")

        return None

    def _parse_faq_item(self, item) -> Optional[Dict[str, str]]:
        """Turn one ``faq-item`` element into a question/answer dict, or None."""
        question_elem = item.find('a', class_='headline-collapse')
        question = question_elem.find('span') if question_elem is not None else None
        if question is None:
            return None

        content_elem = item.find('div', class_='content-collapse')
        wysiwyg = (
            content_elem.find('div', class_='wysiwyg-content')
            if content_elem is not None else None
        )
        if wysiwyg is None:
            return None

        question_text = self._clean_text(question)
        answer_text = self._extract_answer(wysiwyg)
        if not question_text or not answer_text:
            return None
        return {
            "question": question_text,
            "answer": answer_text
        }

    def _extract_answer(self, wysiwyg) -> str:
        """Join the text of the paragraphs and list items inside an answer."""
        blocks = wysiwyg.find_all(['p', 'li'])
        # Tags compare by content, so identity is tracked via id().
        block_ids = {id(block) for block in blocks}

        parts = []
        for block in blocks:
            # A <p> inside an <li> (or similar nesting) is already covered by
            # the text of the enclosing block; taking both would duplicate it.
            if any(id(parent) in block_ids for parent in block.parents):
                continue
            text = self._clean_text(block)
            if text:
                parts.append(text)

        if not parts:
            # Answer written directly into the container without <p>/<li>.
            fallback = self._clean_text(wysiwyg)
            if fallback:
                parts.append(fallback)

        return ' '.join(parts)

    @staticmethod
    def _clean_text(node) -> str:
        """Return the node's text with all whitespace runs collapsed.

        ``get_text(strip=True)`` strips every text node and concatenates them
        without a separator, which glues words together across inline tags;
        collapsing whitespace on the raw text keeps the original spacing.
        """
        return " ".join(node.get_text().split())

    def _embed(self, texts: List[str]):
        """Encode texts into a float32 array, the dtype FAISS works with.

        NOTE: ``encode`` is called synchronously, so it blocks the event loop
        for the duration of the embedding.
        """
        tensor = self.embedder.encode(texts, convert_to_tensor=True)
        return tensor.cpu().detach().numpy().astype("float32", copy=False)

    async def index_faqs(self):
        """Crawl the FAQ pages and (re)build the search index.

        The new data and index are swapped in together only after embedding
        succeeded. If the crawl yields nothing, a warning is logged and any
        previously built index is left untouched.
        """
        faq_pages = await self.fetch_faq_pages()

        faq_data = []
        all_texts = []
        for faq_page in faq_pages:
            for item in faq_page['faqs']:
                # Question and answer are embedded together as one text.
                all_texts.append(f"{item['question']} {item['answer']}")
                faq_data.append({
                    "question": item['question'],
                    "answer": item['answer'],
                    "source": faq_page['url']
                })

        if not all_texts:
            logger.warning("No FAQ content found to index")
            return

        embeddings = self._embed(all_texts)
        index = faiss.IndexFlatL2(embeddings.shape[1])
        index.add(embeddings)

        # Publish both together so row i of the index always maps to faq_data[i].
        self.faq_data = faq_data
        self.faiss_index = index

    async def search_faqs(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """Return the FAQ entries closest to ``query``.

        Builds the index on first use.

        Args:
            query: Free-text search query.
            top_k: Maximum number of results; values <= 0 yield no results.

        Returns:
            Copies of the matching FAQ dicts, each with an added ``"score"``
            holding the distance reported by the index (smaller is closer).
        """
        if self.faiss_index is None:
            await self.index_faqs()

        if self.faiss_index is None or not self.faq_data:
            logger.warning("No FAQ data available for search")
            return []

        # Asking for more neighbours than there are vectors only produces
        # padding entries, so cap the request at the number of indexed FAQs.
        k = min(top_k, len(self.faq_data))
        if k <= 0:
            return []

        query_embedding = self._embed([query])
        distances, indices = self.faiss_index.search(query_embedding, k)

        results = []
        for distance, idx in zip(distances[0], indices[0]):
            idx = int(idx)
            # FAISS marks missing neighbours with label -1; without the lower
            # bound that would silently select the last FAQ entry.
            if 0 <= idx < len(self.faq_data):
                result = self.faq_data[idx].copy()
                result["score"] = float(distance)
                results.append(result)

        return results