"""Online research helpers: expand a topic into search keywords, download and
clean the resulting pages, and chunk the text into per-topic JSONL note files."""
import json
import os
import re

import html2text
import requests
import spacy
from bs4 import BeautifulSoup
from duckduckgo_search import DDGS
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage, SystemMessage

from modules import app_constants, file_utils, app_logger

# Load the small English spaCy model once; it provides the sentence
# segmentation used when chunking downloaded text into notes.
nlp = spacy.load("en_core_web_sm")

# Rebind the module name to the shared logger instance.
app_logger = app_logger.app_logger

TMP_DIRECTORY = os.path.join(app_constants.WORKSPACE_DIRECTORY, 'tmp')
DEFAULT_SEARCH_COUNT = app_constants.SEARCH_COUNT


def download_and_clean(url):
    """Download a web page and return its visible text with markup stripped."""
    try:
        headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:99.0) Gecko/20100101 Firefox/99.0"}
        # A timeout keeps one slow host from stalling the whole crawl.
        response = requests.get(url, headers=headers, timeout=15)
        response.raise_for_status()

        # Remove elements that carry no useful prose.
        soup = BeautifulSoup(response.content, 'html.parser')
        for tag in soup(["script", "style", "img", "a"]):
            tag.extract()

        # html2text expects HTML input, so hand it the pruned markup rather
        # than text that has already been extracted.
        h = html2text.HTML2Text()
        h.ignore_links = True
        h.ignore_images = True
        h.ignore_emphasis = True
        h.ignore_tables = True
        clean_text = h.handle(str(soup))

        # Drop stray punctuation, then collapse every whitespace run to a space.
        clean_text = re.sub(r'[^\w\s\n<>/\.]+', '', clean_text)
        clean_text = re.sub(r'\s+', ' ', clean_text).strip()
        return clean_text

    except requests.exceptions.RequestException as e:
        app_logger.error(f"Error while downloading and cleaning URL {url}: {str(e)}")
        return None


def save_notes_to_file(topic, note, source_url):
    """Split `note` into sentence-aligned blocks of roughly 120-240 words and
    append each as a JSON line ({"note": ..., "source_url": ...}) to the
    topic's notes file. Returns the full path of that file."""
    doc = nlp(note)

    # Make sure the workspace tmp directory exists before writing.
    os.makedirs(TMP_DIRECTORY, exist_ok=True)

    sanitized_filename = file_utils.sanitize_filename(topic) + '.jsonl'
    full_path = os.path.join(TMP_DIRECTORY, sanitized_filename)

    text_block = ""
    word_count = 0

    with open(full_path, 'a') as file:
        for sent in doc.sents:
            sentence_word_count = len(sent.text.split())
            if word_count + sentence_word_count > 240:
                # Flush the block once it is long enough to stand as a note.
                if word_count >= 120:
                    data = {
                        "note": text_block,
                        "source_url": source_url
                    }
                    file.write(json.dumps(data) + '\n')
                # Start a fresh block with the current sentence.
                text_block = sent.text
                word_count = sentence_word_count
            else:
                # Keep accumulating sentences into the current block.
                text_block += ' ' + sent.text if text_block else sent.text
                word_count += sentence_word_count

        # Flush whatever remains so the tail of the document is not dropped.
        if text_block:
            data = {
                "note": text_block,
                "source_url": source_url
            }
            file.write(json.dumps(data) + '\n')

    app_logger.info(f"Notes saved to file {full_path}")
    return full_path


def url_list_downloader(url_list, topic):
    """Download and clean each URL, appending usable text to the topic's
    notes file. Returns the notes file path, or None if nothing was saved."""
    notes_file = None
    for url in url_list:
        try:
            text = download_and_clean(url)
            if text:
                notes_file = save_notes_to_file(topic, text, url)
        except Exception as e:
            app_logger.error(f"Error during processing for URL {url}: {e}")
    return notes_file


def search_term_ddg(topic, count=DEFAULT_SEARCH_COUNT):
    """Expand `topic` into keywords with the LLM, then collect result URLs
    from DuckDuckGo for each keyword. Returns a sorted, de-duplicated list."""
    try:
        llm = ChatOpenAI(
            model_name=app_constants.MODEL_NAME,
            openai_api_key=app_constants.openai_api_key,
            base_url=app_constants.local_model_uri,
            streaming=True
        )
        prompt = [
            SystemMessage(content="Generate five plain keywords based on the user input, as a single comma-separated line with no quotes or brackets. For example: cat, bat, monkey, donkey, eagle"),
            HumanMessage(content=topic),
        ]
        response = llm.invoke(prompt)

        if hasattr(response, 'content'):
            search_keywords = response.content
        else:
            raise ValueError("Invalid response format")

        # Split on commas, stripping whitespace plus any stray quotes or
        # brackets the model may add despite the instructions.
        search_keywords = [keyword.strip(" \t\n'\"[]") for keyword in search_keywords.split(',')]

        # Cap the list in case the model returns more keywords than asked for.
        search_keywords = search_keywords[:8]

        urls = []
        with DDGS(timeout=3) as ddgs:
            for term in search_keywords:
                results = ddgs.text(f"{topic} {term}", max_results=count)
                for result in results:
                    url = result['href']
                    # Skip direct document downloads; only HTML pages clean well.
                    if not url.endswith(('.pdf', '.ppt', '.pptx', '.doc', '.docx')):
                        urls.append(url)
        return sorted(set(urls))

    except Exception as e:
        app_logger.error(f"An error occurred while searching for topic {topic}: {e}")
        return []


def explore_url_on_internet(topic, count=DEFAULT_SEARCH_COUNT):
    """Research `topic` online: search for URLs, then download and chunk
    their text into a notes file, reusing an existing file when present."""
    app_logger.info(f"Starting research on topic {topic}")
    sanitized_filename = file_utils.sanitize_filename(topic) + '.jsonl'
    full_path = os.path.join(TMP_DIRECTORY, sanitized_filename)

    # Reuse previously gathered notes rather than re-downloading everything.
    if os.path.exists(full_path):
        app_logger.info(f"File already exists, skipping download: {full_path}")
        note_file = full_path
    else:
        url_list = search_term_ddg(topic, count)
        note_file = url_list_downloader(url_list, topic)
    app_logger.info(f"Research on Internet completed for {topic}, file: {note_file}")
    return note_file
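

# Example usage sketch: assumes app_constants supplies a reachable
# OpenAI-compatible endpoint and a writable WORKSPACE_DIRECTORY; the topic
# and count below are purely illustrative.
if __name__ == "__main__":
    notes_path = explore_url_on_internet("renewable energy storage", count=2)
    if notes_path:
        print(f"Notes saved to: {notes_path}")
    else:
        print("No notes could be gathered for this topic.")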