# file: app.py
import gradio as gr
import requests
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
from langchain_community.document_loaders import WebBaseLoader
from langdetect import detect_langs
from PyPDF2 import PdfReader
from io import BytesIO
import logging
from dotenv import load_dotenv
import os

load_dotenv()
seen = set()  # links already processed, shared across filtering runs
main_url = "https://similar-products-api.vercel.app/search/all"
main_product = "Samsung Galaxy"
API_URL = "https://api-inference.huggingface.co/models/google/flan-t5-xxl"
headers = {"Authorization": f"Bearer {os.getenv('HUGGINGFACE_API_TOKEN')}"}

logging.basicConfig(level=logging.INFO)
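# Pipeline overview: get_links() fetches candidate URLs for products similar
# to main_product, filtering() downloads each document and keeps English text
# that the flan-t5 model judges relevant, and wikipedia_url() appends the
# matching Wikipedia pages.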
def get_links(product):
    """Fetch candidate links for products similar to `product`."""
    params = {
        "API_KEY": "12345",
        "product": product,
    }
    response = requests.get(main_url, params=params)
    if response.status_code == 200:
        return response.json()
    return {}

def language_preprocess(text):
    """Return True if the text is detected as English."""
    try:
        return detect_langs(text)[0].lang == 'en'
    except Exception as e:
        logging.error(f"Language detection error: {e}")
        return False

def relevant(product, similar_product, content):
    """Ask the hosted flan-t5 model whether `content` relates to both products."""
    try:
        payload = {"inputs": f'''Do you think that the given content is similar to {similar_product} and {product}, just Respond True or False \nContent for similar product: {content[:700]}'''}
        response = requests.post(API_URL, headers=headers, json=payload)
        output = response.json()
        # bool() on any non-empty string (including "False") is True, so
        # compare the generated text itself.
        return output[0]['generated_text'].strip().lower() == 'true'
    except Exception as e:
        logging.error(f"Relevance checking error: {e}")
        return False

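# The Inference API returns a list such as [{"generated_text": "True"}] for
# text2text-generation models; an error payload or malformed response raises
# inside the try block above and is treated as not relevant.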
def download_pdf(url, timeout=10):
    """Download a PDF and return it as an in-memory file, or None on failure."""
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        return BytesIO(response.content)
    except requests.RequestException as e:
        logging.error(f"PDF download error: {e}")
        return None

def extract_text_from_pdf(pdf_file, pages):
    """Extract text from the given page indices of a PDF."""
    try:
        reader = PdfReader(pdf_file)
        extracted_text = ""
        for page_num in pages:
            if page_num < len(reader.pages):
                page = reader.pages[page_num]
                # extract_text() can return None for image-only pages.
                extracted_text += (page.extract_text() or "") + "\n"
            else:
                logging.warning(f"Page {page_num} does not exist in the document.")
        return extracted_text
    except Exception as e:
        logging.error(f"PDF text extraction error: {e}")
        return ""

def extract_text_online(link):
    """Load a web page and return the text of its first three chunks."""
    loader = WebBaseLoader(link)
    pages = loader.load_and_split()
    text = ''
    for page in pages[:3]:
        text += page.page_content
    return text

def process_link(link, similar_product):
    """Extract text from a link and return it if English and relevant, else None."""
    if link in seen:
        return None
    seen.add(link)
    try:
        if link.endswith('.md'):
            text = extract_text_online(link)
        else:
            pdf_file = download_pdf(link)
            if pdf_file is None:
                return None
            text = extract_text_from_pdf(pdf_file, [0, 2, 4])
        if language_preprocess(text):
            if relevant(main_product, similar_product, text):
                return link
    except Exception as e:
        logging.error(f"Link processing error for {link}: {e}")
    return None

def filtering(urls, similar_product):
    """Process URLs concurrently, keeping only English, relevant links."""
    res = []
    with ThreadPoolExecutor() as executor:
        futures = {executor.submit(process_link, link, similar_product): link for link in urls}
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            if result is not None:
                res.append(result)
    return res

def wikipedia_url(product):
    """Look up Wikipedia page URLs for `product` via the opensearch API."""
    api_url = "https://en.wikipedia.org/w/api.php"
    params = {
        "action": "opensearch",
        "search": product,
        "limit": 5,
        "namespace": 0,
        "format": "json"
    }
    try:
        response = requests.get(api_url, params=params)
        response.raise_for_status()
        data = response.json()
        # opensearch returns [query, titles, descriptions, urls];
        # index 3 holds the page URLs.
        if data and len(data) > 3 and len(data[3]) > 0:
            return data[3]
        return []
    except requests.RequestException as e:
        logging.error(f"Error fetching Wikipedia URLs: {e}")
        return []

def preprocess_initial(product):
    return get_links(product)

def preprocess_filter(product, data):
    """Filter each similar product's link list and append Wikipedia URLs."""
    sources_to_filter = {'duckduckgo', 'google', 'archive'}
    for similar_product in data:
        # Entries are either a list of per-source dicts or a flat URL list.
        if next(iter(data[similar_product][0])) == 'duckduckgo':
            temp = []
            for item in data[similar_product]:
                source = next(iter(item))
                if source in sources_to_filter:
                    temp += filtering(item[source], similar_product)
                else:
                    temp += item[source]
            data[similar_product] = temp + wikipedia_url(similar_product)
        else:
            data[similar_product] = filtering(data[similar_product], similar_product) + wikipedia_url(similar_product)
    logging.info('Filtering completed')
    return data

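# Illustrative input shape for preprocess_filter (hypothetical data, names
# invented for this example):
#   {"Galaxy S24": [{"duckduckgo": ["https://example.com/s24-manual.pdf"]},
#                   {"github": ["https://github.com/example/s24-docs"]}]}
# Links from 'duckduckgo', 'google', or 'archive' go through the relevance
# pipeline; links from any other source pass through unchanged.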
def main(product_name):
    return preprocess_initial(product_name)

def filter_links(product_name, initial_data):
    return preprocess_filter(product_name, initial_data)

with gr.Blocks() as demo:
    product_name = gr.Textbox(label="Product Name")
    get_links_btn = gr.Button("Get Links")
    initial_links_output = gr.JSON()
    filter_btn = gr.Button("Filter Links")
    filtered_links_output = gr.JSON()
    get_links_btn.click(fn=main, inputs=product_name, outputs=initial_links_output)
    filter_btn.click(fn=filter_links, inputs=[product_name, initial_links_output], outputs=filtered_links_output)

if __name__ == "__main__":
    demo.launch()
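# To try this locally (assuming a valid token): put HUGGINGFACE_API_TOKEN in a
# .env file next to app.py, run `python app.py`, and open the local URL Gradio
# prints. On Hugging Face Spaces the token would come from the Space secrets.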