| import os |
| import time |
| import base64 |
| import io |
| import traceback |
| import threading |
| import atexit |
| from datetime import datetime |
| from typing import Optional, Tuple |
|
|
| import gradio as gr |
| import anthropic |
| from selenium import webdriver |
| from selenium.webdriver.chrome.options import Options |
| from selenium.webdriver.chrome.service import Service |
| from selenium.webdriver.common.by import By |
| from selenium.webdriver.support.ui import WebDriverWait, Select |
| from selenium.webdriver.support import expected_conditions as EC |
| from selenium.common.exceptions import TimeoutException, WebDriverException |
| from webdriver_manager.chrome import ChromeDriverManager |
| from PIL import Image |
| import requests |
| from dotenv import load_dotenv |
|
|
| |
# Load environment variables through python-dotenv before anything reads them.
load_dotenv()

# Selenium session state shared by every function below.
# `driver` holds the webdriver instance, or None while no browser exists.
driver = None
# Callbacks hold this lock while they issue commands to `driver`.
driver_lock = threading.Lock()
# Set to True by init_browser() once the Trends page has loaded.
is_browser_ready = False
|
|
| |
def get_claude_client():
    """Create an Anthropic client from the ``ANTHROPIC_API_KEY`` environment variable.

    The client is first built with explicit retry and timeout settings. If
    that constructor call raises, the error is printed and a client built
    with only the API key is returned instead.

    Returns:
        An ``anthropic.Anthropic`` instance.

    Raises:
        ValueError: If ``ANTHROPIC_API_KEY`` is unset or empty.
    """
    key = os.getenv("ANTHROPIC_API_KEY")
    if not key:
        raise ValueError("ANTHROPIC_API_KEY ํ๊ฒฝ๋ณ์๊ฐ ์ค์ ๋์ง ์์์ต๋๋ค.")

    tuned_settings = {"api_key": key, "max_retries": 2, "timeout": 60.0}
    try:
        return anthropic.Anthropic(**tuned_settings)
    except Exception as e:
        print(f"Claude ํด๋ผ์ด์ธํธ ์ด๊ธฐํ ์ค๋ฅ: {e}")

    # Fallback: retry with nothing but the API key.
    return anthropic.Anthropic(api_key=key)
|
|
def init_browser():
    """Start headless Chrome and open the Google Trends "trending" page for KR.

    On success the module-level ``driver`` holds the new webdriver and
    ``is_browser_ready`` is set to True. On any failure the error and
    traceback are printed, a partially created driver is shut down,
    ``driver`` is reset to None and ``is_browser_ready`` to False. The
    function never raises for ordinary errors.
    """
    global driver, is_browser_ready

    # Chromium honours only the LAST occurrence of a given switch, so the two
    # feature names that used to be passed via separate --disable-features
    # arguments are combined into one comma-separated switch.
    chrome_flags = (
        "--headless",
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--disable-gpu",
        "--disable-extensions",
        "--disable-web-security",
        "--allow-running-insecure-content",
        "--disable-blink-features=AutomationControlled",
        "--disable-features=VizDisplayCompositor,TranslateUI",
        "--window-size=1920,1080",
        "--disable-background-timer-throttling",
        "--disable-backgrounding-occluded-windows",
        "--disable-renderer-backgrounding",
        "--disable-ipc-flooding-protection",
        "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
    )

    try:
        print("Chrome ๋ธ๋ผ์ฐ์ ์ด๊ธฐํ ์ค...")

        chrome_options = Options()
        for flag in chrome_flags:
            chrome_options.add_argument(flag)

        # webdriver-manager supplies the chromedriver binary path.
        service = Service(ChromeDriverManager().install())

        driver = webdriver.Chrome(service=service, options=chrome_options)
        driver.set_page_load_timeout(30)
        driver.implicitly_wait(10)

        print("Google Trends ํ์ด์ง๋ก ์ด๋ ์ค...")
        driver.get("https://trends.google.com/trending?geo=KR")

        # Wait for <body> to exist, then pause a fixed 5 seconds before
        # declaring the browser ready.
        WebDriverWait(driver, 20).until(
            EC.presence_of_element_located((By.TAG_NAME, "body"))
        )
        time.sleep(5)

        is_browser_ready = True
        print("๋ธ๋ผ์ฐ์ ์ด๊ธฐํ ์๋ฃ!")

    except Exception as e:
        print(f"๋ธ๋ผ์ฐ์ ์ด๊ธฐํ ์คํจ: {str(e)}")
        print(traceback.format_exc())
        is_browser_ready = False
        if driver:
            try:
                driver.quit()
            except Exception:
                # Best-effort shutdown; the original failure was already
                # reported. Narrowed from a bare `except:` so that
                # KeyboardInterrupt/SystemExit still propagate.
                pass
            driver = None
|
|
def cleanup_browser():
    """Shut down the shared webdriver, if any, and clear the browser state.

    Quitting is best-effort: an ordinary exception from ``driver.quit()`` is
    ignored. Afterwards ``driver`` is None and ``is_browser_ready`` is False,
    so the readiness flag cannot stay True once the driver is gone.
    """
    global driver, is_browser_ready

    if driver:
        try:
            driver.quit()
            print("๋ธ๋ผ์ฐ์ ์ ๋ฆฌ ์๋ฃ")
        except Exception:
            # Deliberately silent (this also runs at interpreter exit), but
            # no longer a bare `except:` that would swallow
            # KeyboardInterrupt/SystemExit.
            pass

    driver = None
    is_browser_ready = False
|
|
def _xpath_literal(value: str) -> str:
    """Return *value* as an XPath 1.0 string literal.

    XPath 1.0 has no escape character inside string literals, so a value
    containing both quote kinds is assembled with ``concat()``.
    """
    if "'" not in value:
        return f"'{value}'"
    if '"' not in value:
        return f'"{value}"'
    pieces = value.split("'")
    return "concat(" + ", \"'\", ".join(f"'{piece}'" for piece in pieces) + ")"


def _click_first_div_containing(label: str) -> None:
    """Click, via JavaScript, the first ``<div>`` whose text contains *label*.

    Does nothing when no such element is found. The caller must hold
    ``driver_lock``.
    """
    matches = driver.find_elements(
        By.XPATH, f"//div[contains(text(), {_xpath_literal(label)})]"
    )
    if matches:
        driver.execute_script("arguments[0].click();", matches[0])
        time.sleep(1)


def apply_filters(geo: str, time_range: str, category: str) -> bool:
    """Try to apply region, period and category filters on the loaded page.

    A filter is skipped when its value equals the default ("KR" and the
    default period and category labels). For any other value the first
    ``<div>`` containing that text is clicked.

    Args:
        geo: Region code selected in the UI.
        time_range: Period label selected in the UI.
        category: Category label selected in the UI.

    Returns:
        False when the browser is not ready or when acquiring the lock or
        another outer step fails. True otherwise, including when clicking a
        filter raised: that error is printed and processing continues, as
        in the original best-effort behaviour.
    """
    global driver

    if not driver or not is_browser_ready:
        return False

    # (selected value, default value that needs no click)
    selections = (
        (geo, "KR"),
        (time_range, "24์๊ฐ"),
        (category, "๋ชจ๋ ์นดํ๊ณ ๋ฆฌ"),
    )

    try:
        with driver_lock:
            try:
                for chosen, default in selections:
                    if chosen != default:
                        _click_first_div_containing(chosen)

                # Fixed pause after the clicks before the caller proceeds.
                time.sleep(3)
                return True

            except Exception as filter_error:
                print(f"ํํฐ ์ ์ฉ ์ค ์ค๋ฅ: {filter_error}")
                return True

    except Exception as e:
        print(f"ํํฐ ์ ์ฉ ์คํจ: {str(e)}")
        return False
|
|
def capture_screenshot() -> Optional[Image.Image]:
    """Take a screenshot of the page currently shown by the shared driver.

    Returns:
        The screenshot as a PIL image, or None when the browser is not
        ready or the capture raised (the error is printed).
    """
    global driver

    browser_usable = bool(driver) and is_browser_ready
    if not browser_usable:
        return None

    try:
        with driver_lock:
            # Fixed 2-second pause before grabbing the frame.
            time.sleep(2)
            png_bytes = driver.get_screenshot_as_png()
            return Image.open(io.BytesIO(png_bytes))
    except Exception as e:
        print(f"์คํฌ๋ฆฐ์ท ์บก์ฒ ์คํจ: {str(e)}")
        return None
|
|
def analyze_with_claude(image: Image.Image, user_prompt: str = "") -> str:
    """Send *image* plus a prompt to the Claude Messages API and return the reply.

    Args:
        image: Screenshot to analyse; it is re-encoded as PNG and sent
            base64-encoded.
        user_prompt: Custom instructions. When blank (empty or whitespace
            only) the built-in default prompt is used instead.

    Returns:
        The text of the first content block of the response, or an error
        message string when anything in the process raises (the message is
        also printed).
    """
    default_prompt = """
์ด Google Trends ์ค์๊ฐ ์ธ๊ธฐ ๊ฒ์์ด ์คํฌ๋ฆฐ์ท์ ๋ถ์ํด์ฃผ์ธ์.

๋ค์ ๋ด์ฉ์ ํฌํจํด์ ๋ถ์ํด์ฃผ์ธ์:
1. ํ์ฌ ๊ฐ์ฅ ์ธ๊ธฐ ์๋ ๊ฒ์์ด๋ค๊ณผ ๊ทธ ์์
2. ๊ฐ ๊ฒ์์ด์ ์ฆ๊ฐ ์ถ์ธ๋ ํน์ด์ฌํญ
3. ๊ฒ์์ด๋ค์์ ๋ฐ๊ฒฌ๋๋ ์ฃผ์ ํธ๋ ๋๋ ํจํด
4. ์์ฌ์ /๋ฌธํ์ ๋งฅ๋ฝ์์์ ํด์

ํ๊ตญ์ด๋ก ์์ธํ๊ณ ๊ตฌ์ฒด์ ์ผ๋ก ๋ถ์ํด์ฃผ์ธ์.
"""

    try:
        client = get_claude_client()

        # Serialise the screenshot to PNG in memory and base64-encode it.
        png_buffer = io.BytesIO()
        image.save(png_buffer, format="PNG")
        encoded_png = base64.b64encode(png_buffer.getvalue()).decode()

        if user_prompt.strip():
            prompt = user_prompt
        else:
            prompt = default_prompt

        text_part = {"type": "text", "text": prompt}
        image_part = {
            "type": "image",
            "source": {
                "type": "base64",
                "media_type": "image/png",
                "data": encoded_png,
            },
        }

        response = client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1500,
            messages=[{"role": "user", "content": [text_part, image_part]}],
        )
        return response.content[0].text

    except Exception as e:
        error_msg = f"Claude API ๋ถ์ ์คํจ: {str(e)}"
        print(error_msg)
        return error_msg
|
|
def reinit_browser():
    """Tear down the current browser and start a fresh one.

    Returns:
        A status message saying whether ``init_browser()`` left the browser
        in the ready state.
    """
    global driver, is_browser_ready

    print("๋ธ๋ผ์ฐ์ ์ฌ์ด๊ธฐํ ์์...")

    # Drop the old session and mark the browser unusable while restarting.
    cleanup_browser()
    is_browser_ready = False

    init_browser()

    return (
        "๋ธ๋ผ์ฐ์ ์ฌ์ด๊ธฐํ ์๋ฃ!"
        if is_browser_ready
        else "๋ธ๋ผ์ฐ์ ์ฌ์ด๊ธฐํ ์คํจ. ๋ค์ ์๋ํด์ฃผ์ธ์."
    )
|
|
def refresh_trends_page():
    """Reload the page in the shared browser and report the outcome.

    Returns:
        A status message: not ready, refreshed, or the refresh error text.
    """
    global driver

    browser_usable = bool(driver) and is_browser_ready
    if not browser_usable:
        return "๋ธ๋ผ์ฐ์ ๊ฐ ์ค๋น๋์ง ์์์ต๋๋ค."

    try:
        with driver_lock:
            driver.refresh()
            # Fixed 3-second pause after the reload.
            time.sleep(3)
    except Exception as e:
        return f"ํ์ด์ง ์๋ก๊ณ ์นจ ์คํจ: {str(e)}"
    else:
        return "ํ์ด์ง๊ฐ ์๋ก๊ณ ์นจ๋์์ต๋๋ค."
|
|
def main_process(geo: str, time_range: str, category: str, user_prompt: str) -> Tuple[Optional[Image.Image], str]:
    """Run the full pipeline: apply filters, capture a screenshot, analyse it.

    Args:
        geo: Region code passed to ``apply_filters``.
        time_range: Period label passed to ``apply_filters``.
        category: Category label passed to ``apply_filters``.
        user_prompt: Optional custom prompt passed to ``analyze_with_claude``.

    Returns:
        ``(screenshot, analysis_text)`` on success, or ``(None, message)``
        when the browser is not ready or a step fails or raises.
    """
    if not is_browser_ready:
        return None, "๋ธ๋ผ์ฐ์ ๊ฐ ์ค๋น๋์ง ์์์ต๋๋ค. ํ์ด์ง๋ฅผ ์๋ก๊ณ ์นจํด์ฃผ์ธ์."

    try:
        # Step 1: filters.
        print(f"ํํฐ ์ ์ฉ ์ค: ์ง์ญ={geo}, ์๊ฐ={time_range}, ์นดํ๊ณ ๋ฆฌ={category}")
        if not apply_filters(geo, time_range, category):
            return None, "ํํฐ ์ ์ฉ์ ์คํจํ์ต๋๋ค."

        # Step 2: screenshot.
        print("์คํฌ๋ฆฐ์ท ์บก์ฒ ์ค...")
        shot = capture_screenshot()
        if shot is None:
            return None, "์คํฌ๋ฆฐ์ท ์บก์ฒ์ ์คํจํ์ต๋๋ค."

        # Step 3: analysis.
        print("Claude API๋ก ๋ถ์ ์ค...")
        report = analyze_with_claude(shot, user_prompt)
        return shot, report

    except Exception as e:
        error_msg = f"์ฒ๋ฆฌ ์ค ์ค๋ฅ ๋ฐ์: {str(e)}"
        print(error_msg)
        print(traceback.format_exc())
        return None, error_msg
|
|
def check_browser_status():
    """Describe the current state of the shared browser as a status string.

    Returns:
        One of four messages: no driver exists, the driver exists but is not
        marked ready, the browser is ready (with its current URL), or the
        driver could not be queried.
    """
    global is_browser_ready, driver

    if not driver:
        return "๋ธ๋ผ์ฐ์ ๊ฐ ์ด๊ธฐํ๋์ง ์์์ต๋๋ค."

    if not is_browser_ready:
        return "๋ธ๋ผ์ฐ์ ์ด๊ธฐํ ์ค์๋๋ค..."

    try:
        with driver_lock:
            current_url = driver.current_url
        return f"๋ธ๋ผ์ฐ์ ์ค๋น ์๋ฃ - ํ์ฌ URL: {current_url}"
    except Exception:
        # Narrowed from a bare `except:` so that KeyboardInterrupt/SystemExit
        # are not reported as a browser connection problem.
        return "๋ธ๋ผ์ฐ์ ์ฐ๊ฒฐ์ ๋ฌธ์ ๊ฐ ์์ต๋๋ค."
|
|
| |
def create_interface():
    """Build the Gradio Blocks UI and wire its buttons to the handlers.

    The left column holds the three filter dropdowns, the optional prompt
    box and four buttons. The right column shows the screenshot, the
    analysis text and a status line.

    Returns:
        The constructed ``gr.Blocks`` object (not yet launched).
    """
    region_choices = ["KR", "US", "JP", "GB", "DE", "FR"]
    period_choices = ["24์๊ฐ", "7์ผ", "30์ผ", "90์ผ", "12๊ฐ์"]
    category_choices = ["๋ชจ๋ ์นดํ๊ณ ๋ฆฌ", "์ํฐํ์ธ๋จผํธ", "์คํฌ์ธ ", "๋น์ฆ๋์ค", "๊ณผํ๊ธฐ์ ", "๊ฑด๊ฐ"]

    with gr.Blocks(title="Google Trends ์ค์๊ฐ ๋ถ์๊ธฐ", theme=gr.themes.Soft()) as interface:
        gr.Markdown("# Google Trends ์ค์๊ฐ ์ธ๊ธฐ ๊ฒ์์ด ๋ถ์๊ธฐ")
        gr.Markdown("Google Trends์ ์ค์๊ฐ ์ธ๊ธฐ ๊ฒ์์ด๋ฅผ ์บก์ฒํ๊ณ Claude AI๋ก ๋ถ์ํฉ๋๋ค.")

        with gr.Row():
            # Left column: inputs and action buttons.
            with gr.Column(scale=1):
                geo_dropdown = gr.Dropdown(
                    choices=region_choices,
                    value="KR",
                    label="์ง์ญ ์ ํ",
                )
                time_dropdown = gr.Dropdown(
                    choices=period_choices,
                    value="24์๊ฐ",
                    label="๊ธฐ๊ฐ ์ ํ",
                )
                category_dropdown = gr.Dropdown(
                    choices=category_choices,
                    value="๋ชจ๋ ์นดํ๊ณ ๋ฆฌ",
                    label="์นดํ๊ณ ๋ฆฌ ์ ํ",
                )
                prompt_box = gr.Textbox(
                    label="๋ถ์ ์์ฒญ์ฌํญ (์ ํ์ฌํญ)",
                    placeholder="ํน๋ณํ ๋ถ์ํ๊ณ ์ถ์ ๋ด์ฉ์ด ์๋ค๋ฉด ์๋ ฅํ์ธ์...",
                    lines=3,
                )

                analyze_btn = gr.Button("๋ถ์ ์์", variant="primary")
                refresh_btn = gr.Button("ํ์ด์ง ์๋ก๊ณ ์นจ", variant="secondary")
                reinit_btn = gr.Button("๋ธ๋ผ์ฐ์ ์ฌ์ด๊ธฐํ", variant="secondary")
                status_btn = gr.Button("๋ธ๋ผ์ฐ์ ์ํ ํ์ธ", variant="secondary")

            # Right column: outputs.
            with gr.Column(scale=2):
                screenshot_output = gr.Image(label="Google Trends ์คํฌ๋ฆฐ์ท")
                analysis_output = gr.Textbox(
                    label="Claude AI ๋ถ์ ๊ฒฐ๊ณผ",
                    lines=10,
                    max_lines=20,
                )
                status_output = gr.Textbox(label="์ํ ๋ฉ์์ง")

        # The analyse button runs the whole pipeline and fills both result widgets.
        analyze_btn.click(
            fn=main_process,
            inputs=[geo_dropdown, time_dropdown, category_dropdown, prompt_box],
            outputs=[screenshot_output, analysis_output],
        )

        # The remaining buttons take no inputs and only update the status line.
        status_actions = (
            (refresh_btn, refresh_trends_page),
            (reinit_btn, reinit_browser),
            (status_btn, check_browser_status),
        )
        for button, handler in status_actions:
            button.click(fn=handler, outputs=status_output)

    return interface
|
|
def main():
    """Entry point: verify the API key, start the browser, launch the web UI.

    Returns early (after printing a message) when ``ANTHROPIC_API_KEY`` is
    not set. Otherwise browser initialisation is started on a daemon thread,
    browser cleanup is registered to run at interpreter exit, and the Gradio
    interface is served on 0.0.0.0:7860.
    """
    banner = "=" * 50
    print(banner)
    print("Google Trends ์ค์๊ฐ ๋ถ์๊ธฐ ์์!")
    print(banner)

    # Guard clause: nothing works without the API key.
    if not os.getenv("ANTHROPIC_API_KEY"):
        print("โ ANTHROPIC_API_KEY ํ๊ฒฝ๋ณ์๊ฐ ์ค์ ๋์ง ์์์ต๋๋ค.")
        return
    print("โ Claude API ํค ํ์ธ๋จ")

    print("โ Gradio ์น ์ธํฐํ์ด์ค ์ค๋น ์ค...")

    # Only announces the environment when SPACE_ID is set.
    if os.getenv("SPACE_ID"):
        print("โ Hugging Face Space ํ๊ฒฝ์์ ์คํ ์ค...")

    # Browser start-up runs on its own thread rather than inline.
    threading.Thread(target=init_browser, daemon=True).start()

    atexit.register(cleanup_browser)

    app = create_interface()
    app.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        show_error=True,
        quiet=False,
    )
|
|
# Run the application only when this file is executed as a script.
if __name__ == "__main__":
    main()