# Tracking Spider — a Gradio dashboard that aggregates RSS news items into a
# scrolling feed and streams an LLM-generated market summary beside it.
#
# NOTE(review): this file was recovered from a whitespace-collapsed copy in
# which the HTML fragments embedded in string literals had been stripped.
# The HTML templates below were reconstructed to match the CSS selectors
# defined in `css` (#all-news-items, .news-item, .news-info, .news-title,
# .news-summary) — confirm against the original rendering.

from datetime import datetime, timezone, timedelta
from dateutil.tz import tzoffset, tzutc
from urllib.parse import urlparse
from bs4 import BeautifulSoup
import gradio as gr
import humanize
import html
import json
import time
import re
import os

from _vendor import vendor_llm_endpoint, vendor_llm_model
from _spider import rss_spider

# ====================================================================================================

def get_main_domain(url):
    """Return the registrable ("main") domain of *url*, e.g. "example.com".

    Two Vietnamese news sites are special-cased because their article URLs
    can appear on deep subdomains. Returns None when *url* has no parseable
    hostname (e.g. a relative or malformed URL).
    """
    if "nld.com.vn" in url:
        return "nld.com.vn"
    if "sggp.org.vn" in url:
        return "sggp.org.vn"
    hostname = urlparse(url).hostname
    if hostname is None:
        return None
    parts = hostname.split('.')
    # "localhost" or "example.com" — already as short as it gets.
    if len(parts) <= 2:
        return hostname
    # NOTE: naive two-label heuristic — "news.bbc.co.uk" yields "co.uk".
    # Acceptable for the feeds used here.
    return '.'.join(parts[-2:])


def humanize_ago(dt):
    """Return a human-readable relative time string, e.g. "5 minutes ago".

    NOTE(review): assumes *dt* is timezone-aware; a naive datetime would be
    interpreted as local time by astimezone() — confirm the spider always
    yields aware datetimes.
    """
    now = datetime.now(timezone.utc)
    dt_utc = dt.astimezone(timezone.utc)
    return humanize.naturaltime(now - dt_utc).capitalize()


def clean_text(str_html):
    """Sanitise an HTML fragment from an RSS feed.

    Repairs numeric character references that lost their leading "&"
    (some feeds emit "#123;" instead of "&#123;"), decodes entities, and
    strips all <img> tags. Returns the cleaned HTML as a string.
    """
    # Fix missing the leading & so html.unescape() can decode the entity.
    str_html = html.unescape(re.sub(r'#(\d+);', r'&#\1;', str_html))
    # Remove all img tags
    soup = BeautifulSoup(str_html, "html.parser")
    for img in soup.find_all("img"):
        img.decompose()
    return str(soup)


def clean_text_2(text):
    """Reduce an HTML fragment to single-line plain text.

    Drops <a> elements entirely (including their text, e.g. "read more"
    links), strips the remaining markup, and collapses all whitespace.
    """
    # Remove all a tags, text content included.
    soup = BeautifulSoup(text, "html.parser")
    for e in soup.find_all("a"):
        e.decompose()
    text = str(soup)
    # Strip the remaining HTML tags, keeping only text content.
    soup = BeautifulSoup(text, "html.parser")
    text = soup.get_text(separator=" ", strip=True)
    # BUGFIX: the original chain of .replace() calls ended in a corrupted
    # string literal (a raw newline inside the quotes). Collapsing every
    # whitespace run subsumes the intended "\n"/"\t" removal and guarantees
    # a clean single-line result.
    return re.sub(r"\s+", " ", text)

# ====================================================================================================

theme = gr.themes.Base(
    primary_hue="neutral",
    secondary_hue="neutral",
    neutral_hue="neutral",
    text_size="lg",
    font=[gr.themes.GoogleFont('Inter')],
    font_mono=[gr.themes.GoogleFont('Ubuntu Mono')],
)

# NOTE(review): the original <head> markup was lost during recovery —
# restore any meta/script tags here if needed.
head = """ """

# * { -ms-overflow-style: none; scrollbar-width: none; }
# *::-webkit-scrollbar { display: none; }
css = """
#huggingface-space-header { display: none !important; }
footer { display: none !important; }
main { padding: 0 !important; max-width: 100% !important; }
textarea { padding-top: 5px !important; padding-bottom: 6px !important; }
.row { gap: 0 !important; }
.gr_Markdown { background: transparent !important; border: none !important; padding: 14px 16px 16px 16px !important; text-align: justify; }
.gr_Button { margin: 32px !important; width: initial !important; font-size: 16px !important; padding: 10px 12px 12px 12px !important; }

/* ---------- Scrollbar ---------- */
::-webkit-scrollbar { background: transparent; width: 8px; border-radius: 999px; }
::-webkit-scrollbar-track { background: transparent; border-radius: 999px; }
::-webkit-scrollbar-thumb { background: hsla(0, 0%, 50%, 0.5); border-radius: 999px; }
::-webkit-scrollbar-thumb:hover { background: hsla(0, 0%, 50%, 0.9); }

/* ---------- Desktop/Mobile Only ---------- */
.desktop-only { display: block; }
@media only screen and (max-width: 1000px) { .desktop-only { display: none; } }
.mobile-only { display: block; }
@media only screen and (min-width: 1000px) { .mobile-only { display: none; } }

/* ---------- ---------- */
#all-news-items { display: flex; flex-direction: column; gap: 16px; height: 100svh; overflow-y: scroll; border-left: solid 1px hsla(0, 0%, 50%, .2); border-right: solid 1px hsla(0, 0%, 50%, .2); padding: 32px 24px 32px 32px; }
.news-item { border-radius: 8px; background: hsla(0, 0%, 100%, 0.05); border: solid 1px hsla(0, 0%, 100%, 0.05); padding: 16px 18px; }
.news-item a { padding: 0 !important; text-align: left !important; }
.news-item a .news-title { font-size: 20px !important; font-weight: 600 !important; line-height: 1.3 !important; margin: 0 !important; color: white !important; }
.news-info, .news-info * { font-size: 14px !important; color: grey !important; }
.news-summary, .news-summary * { font-size: 14px !important; color: grey !important; margin: 0 !important; line-height: 1.5 !important; text-align: justify; }
.news-info { margin-bottom: 4px !important; }
.news-summary { margin-top: 8px !important; }
"""

offspellcheck = gr.InputHTMLAttributes(autocorrect="off", spellcheck=False)

# ====================================================================================================

def fetch_all_rss():
    """Crawl every configured RSS feed (delegates to _spider.rss_spider)."""
    print(f"> Fetching new RSS...")
    # time.sleep(5)
    return rss_spider()

# ====================================================================================================

# Module-level cache shared by every session of this app.
NEWS_ALL_ENTRIES = []                    # list of entry dicts: time / link / title / summary
NEWS_LAST_UPDATE = datetime(2001, 1, 1)  # naive local time of the last feed fetch
SUMMARY_LLM = ""                         # last streamed LLM summary (markdown text)
SUMMARY_LAST_UPDATE = datetime(2001, 1, 1)


def fn_btn_manual_fetch():
    """Click handler for the manual-refresh button.

    Re-fetches the feeds at most once every 10 minutes; otherwise sleeps
    briefly so the UI still shows a short progress indication, then
    re-renders the (possibly cached) feed.
    """
    # ----------------------------------------------------------------------------------------------------
    global NEWS_ALL_ENTRIES
    global NEWS_LAST_UPDATE
    if datetime.now() - NEWS_LAST_UPDATE > timedelta(minutes=10):
        NEWS_LAST_UPDATE = datetime.now()
        NEWS_ALL_ENTRIES = fetch_all_rss()
    else:
        time.sleep(2)
    # ----------------------------------------------------------------------------------------------------
    return display_all_entries()


def display_all_entries():
    """Render the cached RSS entries as the HTML feed column.

    Automatically re-fetches when the cache is older than 60 minutes.
    Returns a single HTML string for the gr.HTML component.
    """
    # ----------------------------------------------------------------------------------------------------
    global NEWS_ALL_ENTRIES
    global NEWS_LAST_UPDATE
    if datetime.now() - NEWS_LAST_UPDATE > timedelta(minutes=60):
        NEWS_LAST_UPDATE = datetime.now()
        NEWS_ALL_ENTRIES = fetch_all_rss()
    # ----------------------------------------------------------------------------------------------------
    # NOTE(review): the markup below was reconstructed from the CSS selectors
    # above — the original template was lost during recovery.
    html_content = ""
    html_content += '<div id="all-news-items">'
    for e in NEWS_ALL_ENTRIES:
        html_content += f"""
<div class="news-item">
    <div class="news-info">{humanize_ago(e['time'])} • {get_main_domain(e['link'])}</div>
    <a href="{e['link']}" target="_blank">
        <div class="news-title">{clean_text(e['title'])} ↗</div>
    </a>
    <div class="news-summary">{clean_text(e['summary'])}</div>
</div>
"""
    html_content += "</div>"
    return html_content


def fn_llm_summarize():
    """Generator: stream an LLM market summary of the latest headlines.

    Regenerates at most once every 5 minutes; otherwise re-yields the cached
    summary after a short delay. Yields the accumulated markdown text after
    each streamed chunk so the UI updates incrementally.
    """
    # ----------------------------------------------------------------------------------------------------
    global NEWS_ALL_ENTRIES
    global SUMMARY_LLM
    global SUMMARY_LAST_UPDATE
    if datetime.now() - SUMMARY_LAST_UPDATE > timedelta(minutes=5):
        SUMMARY_LAST_UPDATE = datetime.now()
        # ---------- Build one line per headline (cap at 50 to bound prompt size).
        inputtext_news = ""
        for e in NEWS_ALL_ENTRIES[:50]:
            inputtext_news += f"""{humanize_ago(e['time'])} - {get_main_domain(e['link'])} - "{clean_text(e['title'])}" ({clean_text_2(clean_text(e['summary']))})\n"""
        # ---------- Vietnamese prompt: "latest headlines below; summarise and
        # analyse the market professionally; no greetings or small talk".
        my_prompt = f"""\
Dưới đây là những tiêu đề báo mới nhất. Tóm tắt và phân tích tình hình thị trường một cách chuyên nghiệp. Không chào hỏi, không giới thiệu, không tương tác với người dùng; chỉ tập trung vào việc tóm tắt và phân tích.
-----
{inputtext_news}\
-----\
"""
        # print("--------------------------------------------------")
        # print(my_prompt)
        # print("--------------------------------------------------")
        # ----------
        SUMMARY_LLM = ""
        llm_res_stream = vendor_llm_endpoint.chat.completions.create(
            model=vendor_llm_model,
            messages=[{"role": "user", "content": my_prompt}],
            stream=True,
        )
        for event in llm_res_stream:
            # BUGFIX: streaming chunks may carry delta.content == None
            # (e.g. the role-only first chunk or the final chunk) — guard
            # against "str += None" raising TypeError mid-stream.
            SUMMARY_LLM += event.choices[0].delta.content or ""
            yield SUMMARY_LLM
        # ----------
    else:
        time.sleep(2)
    # ----------------------------------------------------------------------------------------------------
    yield SUMMARY_LLM

# ====================================================================================================

with gr.Blocks(title="Tracking Spider") as demo:
    with gr.Row():
        with gr.Column(scale=2):
            btn_manual_fetch = gr.Button("🕷 Tracking Spider", elem_classes="gr_Button desktop-only")
            # NOTE(review): the original inline HTML here was lost during
            # recovery — presumably the mobile variant of the header.
            gr.HTML(f""" """)
        with gr.Column(scale=4):
            display_all_news = gr.HTML(container=False)
        with gr.Column(scale=2):
            display_llm_summary = gr.Markdown(container=True, height="100svh", elem_classes="gr_Markdown")

    # ----- On page load: render the feed, then stream the summary.
    demo.load(
        fn=lambda: display_all_entries(),
        inputs=[],
        outputs=[display_all_news],
        show_progress="full",
    ).then(
        fn=fn_llm_summarize,
        inputs=[],
        outputs=[display_llm_summary],
        show_progress="full",
    )

    # ----- Manual refresh: re-fetch the feed, then re-stream the summary.
    gr.on(
        triggers=btn_manual_fetch.click,
        fn=fn_btn_manual_fetch,
        inputs=[],
        outputs=[display_all_news],
        show_progress="full",
    ).then(
        fn=fn_llm_summarize,
        inputs=[],
        outputs=[display_llm_summary],
        show_progress="full",
    )

demo.launch(head=head, css=css, theme=theme)