| import gradio as gr |
| import requests |
| import re |
| import os |
| import json |
| import time |
| import threading |
| from googleapiclient.discovery import build |
| from huggingface_hub import InferenceClient |
| from pytube import YouTube |
| import whisper |
| import logging |
|
|
| |
logging.basicConfig(level=logging.INFO)


# Load the Whisper "base" model once at import time (speed/quality tradeoff);
# reused by every transcription request.
model = whisper.load_model("base")


# SECURITY(review): Google API key is hard-coded and committed to source.
# Rotate this key and load it from an environment variable instead.
API_KEY = 'AIzaSyDUz3wkGal0ewRtPlzeMit88bV4hS4ZIVY'


# YouTube Data API v3 client used for reading comment threads.
youtube = build('youtube', 'v3', developerKey=API_KEY)


# Hugging Face Inference client for reply generation; token comes from the
# HF_TOKEN environment variable.
client = InferenceClient(model="meta-llama/Meta-Llama-3-70B-Instruct", token=os.getenv("HF_TOKEN"))


# Pabbly webhook that receives each generated reply as JSON.
WEBHOOK_URL = "https://connect.pabbly.com/workflow/sendwebhookdata/IjU3NjUwNTZhMDYzMDA0MzA1MjZhNTUzMzUxM2Ii_pc"
# Local JSON file persisting which comments were already answered.
COMMENTS_FILE = 'comments.json'


# Default system prompt passed to the LLM (Korean; instructs the bot to answer
# in Korean within 250 tokens as "GPTube").
DEFAULT_SYSTEM_PROMPT = "λνμ λ°λμ λμ μ΄λ¦ 'GPTube'λ₯Ό λ°νλ©° νκΈλ‘ μΈμ¬λ₯ΌνλΌ. λ°λμ 'νκΈ'(νκ΅μ΄)λ‘ 250 ν ν° μ΄λ΄λ‘ λ΅λ³μ μμ±νκ³ μΆλ ₯νλΌ. Respond to the following YouTube comment in a friendly and helpful manner:"


# Cooperative shutdown flag for the background comment-polling thread.
stop_event = threading.Event()
|
|
def load_existing_comments():
    """Return the list of previously processed comments from COMMENTS_FILE.

    Returns an empty list when the file is missing, unreadable, or contains
    invalid JSON, so callers can always iterate the result safely instead of
    crashing on a corrupt state file.
    """
    if not os.path.exists(COMMENTS_FILE):
        return []
    try:
        with open(COMMENTS_FILE, 'r', encoding='utf-8') as file:
            return json.load(file)
    except (json.JSONDecodeError, OSError):
        # A corrupt or unreadable history file should not kill the app;
        # treat it the same as "no history yet".
        logging.error("comments.json could not be read/parsed; starting with an empty history.")
        return []
|
|
def save_comments(comments):
    """Persist the processed-comment list to COMMENTS_FILE as JSON.

    Writes UTF-8 with ensure_ascii=False so Korean comment text stays
    human-readable in the file instead of \\uXXXX escapes; json.load reads
    both forms, so previously written files remain compatible.
    """
    with open(COMMENTS_FILE, 'w', encoding='utf-8') as file:
        json.dump(comments, file, ensure_ascii=False)
|
|
def download_audio(video_url):
    """Download the audio-only stream of a YouTube video to the CWD.

    Returns the path of the downloaded file renamed with an .mp3 extension,
    or None on any failure (no audio stream, file too large, pytube error).
    """
    try:
        yt = YouTube(video_url)
        # First audio-only stream pytube offers; None when the video has none.
        audio = yt.streams.filter(only_audio=True).first()
        if audio is None:
            logging.error('μ€λμ€ μ€νΈλ¦Όμ μ°Ύμ μ μμ΅λλ€.')
            return None
        audio_path = audio.download(output_path=".")

        file_stats = os.stat(audio_path)
        logging.info(f'μ€λμ€ νμΌ ν¬κΈ°(Bytes): {file_stats.st_size}')

        # 30 MB cap — log message says this corresponds to roughly 1.5 hours
        # of audio; larger downloads are discarded.
        if file_stats.st_size <= 30000000:
            base, ext = os.path.splitext(audio_path)
            # NOTE(review): this only renames the file; the container is not
            # re-encoded, so the ".mp3" extension may not match the codec.
            new_file = base + '.mp3'
            os.rename(audio_path, new_file)
            return new_file
        else:
            logging.error('νμΌ ν¬κΈ°κ° λ무 ν½λλ€. 1.5μκ° μ΄νμ λΉλμ€λ§ μ§μλ©λλ€.')
            return None
    except Exception as e:
        logging.error(f"μ€λμ€ λ€μ΄λ‘λ μ€ μ€λ₯ λ°μ: {str(e)}")
        return None
|
|
def generate_transcript(audio_path):
    """Transcribe the audio file at *audio_path* with the module-level Whisper model.

    Returns the stripped transcript text, or a Korean error-message string
    when the path is invalid or transcription fails.
    """
    try:
        # Guard clause: reject a missing/empty path before touching the model.
        path_is_valid = bool(audio_path) and os.path.exists(audio_path)
        if not path_is_valid:
            raise ValueError("μ ν¨ν μ€λμ€ νμΌ κ²½λ‘κ° μλλλ€.")
        transcription = model.transcribe(audio_path)
        text = transcription['text']
        return text.strip()
    except Exception as e:
        logging.error(f"μ μ¬ μ€ μ€λ₯ λ°μ: {str(e)}")
        return f"μ μ¬ μ€ μ€λ₯κ° λ°μνμ΅λλ€: {str(e)}"
|
|
def generate_reply(comment_text, system_prompt):
    """Generate an LLM reply to *comment_text* via the HF Inference client.

    The system prompt is prepended verbatim. Depending on the client version
    the API returns either a plain string or a dict with 'generated_text';
    both shapes are handled.
    """
    full_prompt = f"{system_prompt}\n\nComment: {comment_text}\n\nReply:"
    generated = client.text_generation(
        prompt=full_prompt,
        max_new_tokens=250,
        temperature=0.7,
        top_p=0.9,
    )
    if isinstance(generated, dict):
        # Fall back to the raw dict when the expected key is absent,
        # mirroring the original pass-through behavior.
        return generated.get('generated_text', generated)
    return generated
|
|
def send_webhook(data, timeout=10):
    """POST *data* as JSON to the configured Pabbly webhook.

    Args:
        data: JSON-serializable payload (comment + generated reply).
        timeout: seconds before the request is aborted. requests has no
            default timeout, so the original call could block the background
            polling thread indefinitely on a hung endpoint.

    Returns:
        (status_code, response_body_text) tuple.
    """
    response = requests.post(WEBHOOK_URL, json=data, timeout=timeout)
    return response.status_code, response.text
|
|
def get_video_comments(video_id):
    """Fetch every top-level comment thread of a video via the YouTube Data API.

    Paginates through all result pages (100 threads per page). Returns a list
    of comment dicts; on any API error returns a one-element list containing
    an 'error' dict with the message.
    """
    try:
        collected = []
        page_token = None
        while True:
            # Build the page request; pageToken is only sent after page one.
            params = {
                'part': 'snippet',
                'videoId': video_id,
                'maxResults': 100,
                'textFormat': 'plainText',
            }
            if page_token:
                params['pageToken'] = page_token
            response = youtube.commentThreads().list(**params).execute()

            for item in response['items']:
                top_level = item['snippet']['topLevelComment']
                details = top_level['snippet']
                collected.append({
                    'comment_id': top_level['id'],
                    'author': details['authorDisplayName'],
                    'published_at': details['publishedAt'],
                    'text': details['textDisplay'],
                    'reply_count': item['snippet']['totalReplyCount'],
                })

            page_token = response.get('nextPageToken')
            if not page_token:
                return collected
    except Exception as e:
        return [{'error': str(e)}]
|
|
def fetch_comments(video_url, system_prompt):
    """Reply to every unanswered new comment on *video_url*.

    Pipeline: extract the video id, download and transcribe the audio,
    fetch all top-level comments, and for each comment that is not yet in
    the local history and has no replies, generate an LLM reply (with the
    transcript prepended to the system prompt), push it to the webhook,
    and persist the comment to the history file.

    Returns a human-readable Korean log string describing what happened.
    """
    log_entries = []
    video_id_match = re.search(r'(?:v=|\/)([0-9A-Za-z_-]{11}).*', video_url)
    if video_id_match:
        video_id = video_id_match.group(1)
        audio_path = download_audio(video_url)
        if not audio_path:
            return "μ€λμ€λ₯Ό λ€μ΄λ‘λν μ μμ΅λλ€."

        # The transcript gives the model context about the video content.
        transcript = generate_transcript(audio_path)

        existing_comments = load_existing_comments()
        new_comments = get_video_comments(video_id)

        if not new_comments or 'error' in new_comments[0]:
            return "λκΈμ μ°Ύμ μ μκ±°λ μ€λ₯κ° λ°μνμ΅λλ€."

        # Hoisted out of the filter below: the original rebuilt this id set
        # for every candidate comment, making the filter O(n*m) in the size
        # of the history.
        existing_ids = {c['comment_id'] for c in existing_comments}
        recent_new_comments = [
            c for c in new_comments
            if c['comment_id'] not in existing_ids and c['reply_count'] == 0
        ]

        if recent_new_comments:
            for most_recent_comment in recent_new_comments:
                combined_prompt = f"{transcript}\n\n{system_prompt}"
                reply_text = generate_reply(most_recent_comment['text'], combined_prompt)
                webhook_data = {
                    "comment_id": most_recent_comment['comment_id'],
                    "author": most_recent_comment['author'],
                    "published_at": most_recent_comment['published_at'],
                    "text": most_recent_comment['text'],
                    "reply_text": reply_text
                }
                webhook_status, webhook_response = send_webhook(webhook_data)
                log_entries.append(f"μ΅κ·Ό λκΈ: {most_recent_comment['text']}\n\nλ΅λ³ μμ±: {reply_text}\n\nμΉν μλ΅: {webhook_status} - {webhook_response}")
                # Persist after every comment so a crash mid-run does not
                # cause already-answered comments to be re-replied.
                existing_comments.append(most_recent_comment)
                save_comments(existing_comments)
        else:
            log_entries.append("μλ‘μ΄ λκΈμ΄ μμ΅λλ€.")
    else:
        log_entries.append("μ ν¨νμ§ μμ YouTube URLμλλ€.")
    return "\n\n".join(log_entries)
|
|
def background_fetch_comments():
    """Poll the hard-coded demo video for new comments until stop_event is set.

    Uses stop_event.wait(10) instead of time.sleep(10) so a call to
    stop_background_fetch() takes effect within the wait interval instead of
    only after a full 10-second sleep completes.
    """
    while not stop_event.is_set():
        result = fetch_comments("https://www.youtube.com/watch?v=dQw4w9WgXcQ", DEFAULT_SYSTEM_PROMPT)
        print(result)
        # Returns early (True) as soon as the event is set; otherwise times
        # out after 10 seconds like the original sleep.
        stop_event.wait(10)
|
|
def start_background_fetch():
    """Launch the comment-polling loop in a background daemon thread.

    daemon=True so the poller cannot keep the interpreter alive after the
    Gradio app shuts down (the original non-daemon thread blocked process
    exit). NOTE(review): repeated calls start additional pollers; callers
    should stop the previous one first.
    """
    threading.Thread(target=background_fetch_comments, daemon=True).start()
|
|
def stop_background_fetch():
    """Signal the background polling loop to exit by setting the shared stop_event."""
    stop_event.set()
|
|
def get_text(video_url):
    """Download the audio of *video_url* and return its Whisper transcript.

    Returns a Korean error-message string when the audio cannot be downloaded.
    """
    downloaded_path = download_audio(video_url)
    if not downloaded_path:
        return "μ€λμ€λ₯Ό λ€μ΄λ‘λν μ μμ΅λλ€."
    return generate_transcript(downloaded_path)
|
|
| |
| demo = gr.Blocks() |
|
|
| with demo: |
| gr.Markdown("<h1><center>GPTube</center></h1>") |
| |
| with gr.Row(): |
| input_text_url = gr.Textbox(placeholder='YouTube video URL', label='YouTube URL') |
| input_text_prompt = gr.Textbox(placeholder='μμ€ν
ν둬ννΈ', label='μμ€ν
ν둬ννΈ', value=DEFAULT_SYSTEM_PROMPT, lines=5) |
| |
| with gr.Row(): |
| result_button_transcribe = gr.Button('Transcribe') |
| result_button_comments = gr.Button('Fetch Comments and Generate Reply') |
| |
| with gr.Row(): |
| output_text_transcribe = gr.Textbox(placeholder='Transcript of the YouTube video.', label='Transcript', lines=20) |
| output_text_prompt = gr.Textbox(placeholder='μλ΅ ν
μ€νΈ', label='μλ΅ ν
μ€νΈ', lines=20) |
| |
| result_button_transcribe.click(get_text, inputs=input_text_url, outputs=output_text_transcribe, api_name="transcribe_api") |
| result_button_comments.click(fetch_comments, inputs=[input_text_url, input_text_prompt], outputs=output_text_prompt, api_name="fetch_comments_api") |
|
|
| |
| demo.launch() |
|
|
|
|