Spaces:
Running
Running
| import streamlit as st | |
| import pandas as pd | |
| import xml.etree.ElementTree as ET | |
| import requests | |
| from datetime import datetime, timedelta | |
| from streamlit_autorefresh import st_autorefresh | |
| import json | |
| import hmac | |
| import os | |
| import time | |
| from PIL import Image | |
| from azure.cosmos import CosmosClient | |
| TREND_TOPICS = { | |
| 1: "Autos and Vehicles", | |
| 2: "Beauty and Fashion", | |
| 3: "Business and Finance", | |
| 20: "Climate", | |
| 4: "Entertainment", | |
| 5: "Food and Drink", | |
| 6: "Games", | |
| 7: "Health", | |
| 8: "Hobbies and Leisure", | |
| 9: "Jobs and Education", | |
| 10: "Law and Government", | |
| 11: "Other", | |
| 13: "Pets and Animals", | |
| 14: "Politics", | |
| 15: "Science", | |
| 16: "Shopping", | |
| 17: "Sports", | |
| 18: "Technology", | |
| 19: "Travel and Transportation" | |
| } | |
| client = CosmosClient(os.getenv("COSMOS_ENDPOINT"), os.getenv("COSMOS_KEY")) | |
| database = client.get_database_client(os.getenv("DATABASE_NAME")) | |
| container = database.get_container_client(os.getenv("CONTAINER_NAME")) | |
| def parse_url(url): | |
| response = requests.get(url) | |
| root = ET.fromstring(response.content) | |
| return root | |
| def get_latest_trend(country: str): | |
| """ | |
| Gibt das neueste Dokument für das angegebene Land zurück, | |
| basierend auf dem timestamp-Feld. | |
| """ | |
| query = f"SELECT * FROM c WHERE c.country = '{country}' ORDER BY c.timestamp DESC" | |
| items = list(container.query_items(query=query, enable_cross_partition_query=True)) | |
| return items[0] if items else None | |
| def convert_into_dict(req_json): | |
| result = {} | |
| # Iterate over each category in the JSON data | |
| for category, entries in req_json.items(): | |
| # Initialize the category if not already in result | |
| if category not in result: | |
| result[category] = {} | |
| for entry in entries: | |
| # Extract 'entityName' and 'searchQueries' from 'static_data' | |
| static_data = entry.get("static_data", []) | |
| if static_data and len(static_data[0]) >= 4: | |
| entity_name = static_data[0][0] # First element | |
| search_queries = static_data[0][3] # Fourth element | |
| else: | |
| entity_name = None | |
| search_queries = None | |
| # Initialize the entity under the category if not already present | |
| if entity_name: | |
| if entity_name not in result[category]: | |
| result[category][entity_name] = { | |
| "searchQueries": search_queries, | |
| "articles": [] | |
| } | |
| # Extract articles from 'dynamic_data' | |
| articles = entry.get("dynamic_data", {}).get("article", []) | |
| for article in articles: | |
| href = article.get("href") | |
| article_title = article.get("title") | |
| # Append the article information to the corresponding entity's article list | |
| result[category][entity_name]["articles"].append({ | |
| "href": href, | |
| "title": article_title | |
| }) | |
| return result | |
| def find_details(req_json, gewünschter_titel): | |
| gewünschte_details = [] | |
| for trend_info in req_json: | |
| if trend_info['title'] == gewünschter_titel: | |
| for article in trend_info['articles']: | |
| article_details = { | |
| 'url': article['url'], | |
| 'snippet': article['snippet'], | |
| 'articleTitle': article['articleTitle'], | |
| 'time': article['time'], | |
| 'source' : article['source'] | |
| } | |
| gewünschte_details.append(article_details) | |
| return gewünschte_details | |
| def find_details2(req_json): | |
| gewünschte_details = [] | |
| for article in req_json: | |
| article_details = { | |
| 'url': article['url'], | |
| 'snippet': article['snippet'], | |
| 'articleTitle': article['title'], | |
| 'source' : article['source'] | |
| } | |
| gewünschte_details.append(article_details) | |
| return gewünschte_details | |
| if 'reset' not in st.session_state: | |
| st.session_state.reset = False | |
| # Function to display articles for a specific category | |
| def display_articles_for_category(category): | |
| checkbox_statuses = {} | |
| urls = [] | |
| trending_data = st.session_state["real_trending_searches"][selected_country][category] | |
| #st.write(trending_data) | |
| active_string = " | :chart_with_upwards_trend: :green[Aktiver Trend]" | |
| if st.session_state.get("reset", False): | |
| for idx, (topic, data) in enumerate(trending_data.items()): | |
| for article_index, _ in enumerate(data["articles"]): | |
| checkbox_label = f"{category}_{idx}_{article_index + 1}" | |
| st.session_state[checkbox_label] = False | |
| for idx, (topic, data) in enumerate(trending_data.items()): | |
| with st.expander(f"{idx + 1}• {topic}" + (active_string if not data["is_trend_finished"] else "")): | |
| if data["related_queries"]: | |
| st.markdown("**Related Keywords:**") | |
| related_queries_string = f"{', '.join(data['related_queries'])}" | |
| st.markdown(related_queries_string) | |
| for article_index, article in enumerate(data["articles"], start=1): | |
| checkbox_label = f"{category}_{idx}_{article_index}" | |
| current_value = st.session_state.get(checkbox_label, False) | |
| checkbox_statuses[checkbox_label] = current_value | |
| disabled = (not current_value) and (sum(checkbox_statuses.values()) >= MAX_CHECKED) | |
| dt = datetime.fromisoformat(article["time"]) | |
| formatted_dt = dt.strftime("%d.%m.%Y %H:%M") | |
| st.markdown(f"{article_index}• {article['source']} / {formatted_dt}:") | |
| checkbox_statuses[checkbox_label] = st.checkbox( | |
| f"{article['title']} | [Go To →]({article['href']})", | |
| value=current_value, | |
| key=checkbox_label, | |
| disabled=disabled | |
| ) | |
| if checkbox_statuses[checkbox_label]: | |
| urls.append(article["href"]) | |
| base_url = os.getenv("url", "https://example.com/?") | |
| query_params = "&".join([f"article-links[]={u}" for u in urls]) | |
| full_url = f"{base_url}{query_params}" | |
| st.link_button("Open All Links", url=full_url) | |
| country_list = { | |
| "Germamy" : "DE", | |
| "Austria" : "AT" | |
| } | |
| if 'base_load_finished' not in st.session_state: | |
| st.session_state["real_trending_searches"] = {} | |
| st.session_state["base_data"] = {} | |
| st.session_state["pn"] = "AT" | |
| if 'base_load_finished' not in st.session_state or st.session_state.reset: | |
| with st.spinner("Loading Trends"): | |
| st.session_state["today"] = {} | |
| st.session_state["base"] = {} | |
| for country_name, pn_option in country_list.items(): | |
| st.session_state["base_data"][pn_option] = {} | |
| st.session_state["real_trending_searches"][pn_option] = {} | |
| try: | |
| st.session_state["real_trending_searches"][pn_option] = get_latest_trend(pn_option).get("trends") | |
| except: | |
| st.warning("No data found in the Trendcrawl" | |
| ) | |
| st.stop() | |
| st.session_state["_"] = 0 | |
| st.session_state["base_load_finished"]= True | |
| st.session_state["start_time"] = datetime.now() | |
| MAX_CHECKED = 3 | |
| def check_password(): | |
| """Returns `True` if the user had the correct password.""" | |
| def password_entered(): | |
| """Checks whether a password entered by the user is correct.""" | |
| if hmac.compare_digest(st.session_state["password"], os.environ.get("PASSWORD")): | |
| st.session_state["password_correct"] = True | |
| del st.session_state["password"] # Don't store the password. | |
| else: | |
| st.session_state["password_correct"] = False | |
| # Return True if the password is validated. | |
| if st.session_state.get("password_correct", False): | |
| return True | |
| # Show input for password. | |
| st.text_input( | |
| "Password", type="password", on_change=password_entered, key="password" | |
| ) | |
| if "password_correct" in st.session_state: | |
| st.error("😕 Password incorrect") | |
| return False | |
| if not check_password(): | |
| st.stop() # Do not continue if check_password is not True. | |
| fixed_order = [ | |
| "All categories", | |
| "Autos and Vehicles", | |
| "Beauty and Fashion", | |
| "Business and Finance", | |
| "Climate", | |
| "Entertainment", | |
| "Food and Drink", | |
| "Games", | |
| "Health", | |
| "Hobbies and Leisure", | |
| "Jobs and Education", | |
| "Law and Government", | |
| "Other", | |
| "Pets and Animals", | |
| "Politics", | |
| "Science", | |
| "Shopping", | |
| "Sports", | |
| "Technology", | |
| "Travel and Transportation", | |
| ] | |
| if 'selected_option' not in st.session_state: | |
| st.session_state['selected_option'] = "default_value" | |
| img = Image.open(r"heute_tensora.png") | |
| st.sidebar.image(img) | |
| elapsed_time = datetime.now() - st.session_state["start_time"] | |
| if elapsed_time > timedelta(minutes=10): | |
| st.markdown( | |
| """ | |
| <style> | |
| /* Zentriert den Button und vergrößert ihn */ | |
| div.stButton > button { | |
| width: 300px; | |
| height: 70px; | |
| font-size: 20px; | |
| margin: auto; | |
| display: block; | |
| } | |
| </style> | |
| """, | |
| unsafe_allow_html=True | |
| ) | |
| if st.button(f"Letzte Aktualisierung {st.session_state['start_time'].strftime('%H:%M')}\n- Für Update Hier Klicken"): | |
| st.session_state["reset"] = True | |
| st.session_state["base_load_finished"] = False | |
| st.rerun() | |
| # Selectbox to choose a country | |
| selected_country = st.sidebar.selectbox("Choose a Country", ["AT", "DE"]) | |
| st.warning("Die aufgelisteten Keywörter für erhöhte Reichweite in den Überschriften verwenden") | |
| st.warning("Der Hinweis „Aktiver Trend“ signalisiert, dass zurzeit signifikant mehr Suchanfragen zu diesem Thema erfolgen. Bitte beachten Sie, dass auch andere Themen weiterhin von Relevanz sind.") | |
| available_topics = st.session_state["real_trending_searches"][selected_country].keys() | |
| filtered_topics = [topic for topic in TREND_TOPICS.values() if topic in available_topics] | |
| all_topics = ["All categories"] + filtered_topics | |
| auswahl = st.selectbox("Select Ressort", all_topics, index=0) | |
| display_articles_for_category(auswahl) | |
| if st.session_state["base_load_finished"]: | |
| st.session_state["_"] = st_autorefresh(interval=600000, limit=5, key=f"autorefresh_{st.session_state['start_time']}") | |
| st.markdown( | |
| """ | |
| <style> | |
| [data-testid="stToolbar"] { | |
| display: none; | |
| } | |
| </style> | |
| """, | |
| unsafe_allow_html=True | |
| ) | |
| if st.session_state.reset: | |
| st.session_state["reset"] = False |