Spaces:
Sleeping
Sleeping
| import urllib.parse | |
| import re | |
| import gradio as gr | |
| from starlette.middleware.base import BaseHTTPMiddleware | |
| from starlette.responses import Response | |
| from starlette.types import ASGIApp | |
| from fastapi import status, FastAPI | |
class BlockFileRedirectMiddleware(BaseHTTPMiddleware):
    """Reject Gradio file-endpoint requests whose file target is a remote URL.

    Gradio exposes several `file=` endpoints; when the value after `file=`
    is an http(s) URL the request is answered with 403 instead of being
    forwarded, blocking open-redirect / SSRF-style abuse.
    """

    # Every path pattern under which Gradio may serve `file=` requests.
    _FILE_MARKERS = (
        "/gradio_api/file=",
        "/gradio/api/file=",
        "/api/file=",
        "/file=",
    )

    def __init__(self, app: ASGIApp):
        super().__init__(app)

    async def dispatch(self, request, call_next):
        # Decode percent-encoding first so obfuscated paths still match.
        decoded_path = urllib.parse.unquote(request.url.path)
        if any(marker in decoded_path for marker in self._FILE_MARKERS):
            # Everything after the first `file=` is the requested target.
            target = decoded_path.split("file=", 1)[1]
            if target.lower().startswith(("http://", "https://")):
                return Response(
                    status_code=status.HTTP_403_FORBIDDEN,
                    content="Direct URL redirects are not allowed",
                )
        return await call_next(request)
| import pandas as pd | |
| import requests | |
| from docx import Document | |
| import os | |
| from openai import OpenAI | |
| from groq import Groq | |
| import uuid | |
| from gtts import gTTS | |
| import math | |
| from pydub import AudioSegment | |
| from youtube_transcript_api import YouTubeTranscriptApi | |
| from youtube_transcript_api.proxies import WebshareProxyConfig | |
| from youtube_transcript_api._errors import NoTranscriptFound | |
| import yt_dlp | |
| from moviepy.editor import VideoFileClip | |
| from pytube import YouTube | |
| import os | |
| import io | |
| import time | |
| import json | |
| from datetime import datetime, timezone, timedelta | |
| from urllib.parse import urlparse, parse_qs | |
| from google.cloud import storage | |
| from google.cloud import bigquery | |
| from google.oauth2 import service_account | |
| from googleapiclient.discovery import build | |
| from googleapiclient.http import MediaFileUpload | |
| from googleapiclient.http import MediaIoBaseDownload | |
| from googleapiclient.http import MediaIoBaseUpload | |
| from educational_material import EducationalMaterial | |
| from storage_service import GoogleCloudStorage | |
| from sheet_service import SheetService | |
| from google.oauth2.service_account import Credentials | |
| import vertexai | |
| from vertexai.generative_models import GenerativeModel, Part | |
| # import boto3 | |
| from chatbot import Chatbot | |
# Runtime environment flag: IS_ENV_LOCAL selects the local-file config branch below.
is_env_local = os.getenv("IS_ENV_LOCAL", "false") == "true"
print(f"is_env_local: {is_env_local}")
print("===gr__version__===")
print(gr.__version__)
# KEY CONFIG
# Local runs read secrets from local_config.json; deployed runs read env vars.
if is_env_local:
    with open("local_config.json") as f:
        config = json.load(f)
    IS_ENV_PROD = "False"  # string flag; compared with == "True" later
    PASSWORD = config["PASSWORD"]
    # The same service-account JSON is reused for GCS, Drive, BigQuery and Sheets.
    GCS_KEY = json.dumps(config["GOOGLE_APPLICATION_CREDENTIALS_JSON"])
    DRIVE_KEY = json.dumps(config["GOOGLE_APPLICATION_CREDENTIALS_JSON"])
    GBQ_KEY = json.dumps(config["GOOGLE_APPLICATION_CREDENTIALS_JSON"])
    SHEET_KEY = json.dumps(config["GOOGLE_APPLICATION_CREDENTIALS_JSON"])
    OPEN_AI_KEY = config["OPEN_AI_KEY"]
    OPEN_AI_ASSISTANT_ID_GPT4_BOT1 = config["OPEN_AI_ASSISTANT_ID_GPT4_BOT1"]
    OPEN_AI_ASSISTANT_ID_GPT3_BOT1 = config["OPEN_AI_ASSISTANT_ID_GPT3_BOT1"]
    GROQ_API_KEY = config["GROQ_API_KEY"]
    PERPLEXITY_API_KEY = config["PERPLEXITY_API_KEY"]
    JUTOR_CHAT_KEY = config["JUTOR_CHAT_KEY"]
    AWS_ACCESS_KEY = config["AWS_ACCESS_KEY"]
    AWS_SECRET_KEY = config["AWS_SECRET_KEY"]
    AWS_REGION_NAME = config["AWS_REGION_NAME"]
    OUTPUT_PATH = config["OUTPUT_PATH"]
    PROXY_USERNAME = config["PROXY_USERNAME"]
    PROXY_PASSWORD = config["PROXY_PASSWORD"]
else:
    IS_ENV_PROD = os.getenv("IS_ENV_PROD", "False")
    PASSWORD = os.getenv("PASSWORD")
    GCS_KEY = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
    DRIVE_KEY = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
    GBQ_KEY = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
    SHEET_KEY = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
    OPEN_AI_KEY = os.getenv("OPEN_AI_KEY")
    OPEN_AI_ASSISTANT_ID_GPT4_BOT1 = os.getenv("OPEN_AI_ASSISTANT_ID_GPT4_BOT1")
    OPEN_AI_ASSISTANT_ID_GPT3_BOT1 = os.getenv("OPEN_AI_ASSISTANT_ID_GPT3_BOT1")
    GROQ_API_KEY = os.getenv("GROQ_API_KEY")
    PERPLEXITY_API_KEY = os.getenv("PERPLEXITY_API_KEY")
    JUTOR_CHAT_KEY = os.getenv("JUTOR_CHAT_KEY")
    AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY")
    AWS_SECRET_KEY = os.getenv("AWS_SECRET_KEY")
    # Region and output dir are hard-coded in the deployed environment.
    AWS_REGION_NAME = 'us-west-2'
    OUTPUT_PATH = 'videos'
    PROXY_USERNAME = os.getenv("PROXY_USERNAME")
    PROXY_PASSWORD = os.getenv("PROXY_PASSWORD")
# Mutable module-level state shared by the chat UI.
TRANSCRIPTS = []
CURRENT_INDEX = 0
CHAT_LIMIT = 5  # max chat turns per session

# Google aiplatform (Vertex AI) bootstrapped from the shared service account.
google_service_account_info_dict = json.loads(GBQ_KEY)
# NOTE(review): name has a typo ("GOOGPE"); kept as-is since other parts of the
# file (not visible here) may reference it.
GOOGPE_SCOPES = ["https://www.googleapis.com/auth/cloud-platform"]
google_creds = Credentials.from_service_account_info(
    google_service_account_info_dict, scopes=GOOGPE_SCOPES
)
vertexai.init(
    project="junyiacademy",
    service_account=google_service_account_info_dict,
    credentials=google_creds,
)
# CLIENTS CONFIG — shared API clients constructed once at import time.
GBQ_CLIENT = bigquery.Client.from_service_account_info(json.loads(GBQ_KEY))
GROQ_CLIENT = Groq(api_key=GROQ_API_KEY)
GCS_SERVICE = GoogleCloudStorage(GCS_KEY)
GCS_CLIENT = GCS_SERVICE.client
SHEET_SERVICE = SheetService(SHEET_KEY)
# Perplexity serves an OpenAI-compatible API, so the OpenAI client is reused.
PERPLEXITY_CLIENT = OpenAI(api_key=PERPLEXITY_API_KEY, base_url="https://api.perplexity.ai")
| # check open ai access | |
def check_open_ai_access(open_ai_api_key):
    """Return True when the given OpenAI API key can complete a chat request.

    Sends a minimal gpt-4o chat completion; any exception or an empty
    response content counts as a failed check.
    """
    client = OpenAI(api_key=open_ai_api_key)
    try:
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "user", "content": "This is a test."},
            ],
        )
        # A non-empty message body proves the key is usable.
        return bool(response.choices[0].message.content)
    except Exception as e:
        print(f"Error: {str(e)}")
        return False
# Candidate OpenAI accounts; the first key that passes the access check wins.
# NOTE(review): if no key passes, OPEN_AI_CLIENT / OPEN_AI_ASSISTANT_ID_GPT4 /
# OPEN_AI_ASSISTANT_ID_GPT3 are never defined and later references will raise
# NameError — confirm this is intended fail-fast behavior.
open_ai_api_key_assistant_id_list = [
    {
        "account":"bot1",
        "open_ai_api_key": OPEN_AI_KEY,
        "assistant_gpt4_id": OPEN_AI_ASSISTANT_ID_GPT4_BOT1,
        "assistant_gpt3_id": OPEN_AI_ASSISTANT_ID_GPT3_BOT1,
    },
]
for open_ai_api_key_assistant_id in open_ai_api_key_assistant_id_list:
    account = open_ai_api_key_assistant_id["account"]
    open_ai_api_key = open_ai_api_key_assistant_id["open_ai_api_key"]
    if check_open_ai_access(open_ai_api_key):
        OPEN_AI_CLIENT = OpenAI(api_key=open_ai_api_key)
        OPEN_AI_ASSISTANT_ID_GPT4 = open_ai_api_key_assistant_id["assistant_gpt4_id"]
        OPEN_AI_ASSISTANT_ID_GPT3 = open_ai_api_key_assistant_id["assistant_gpt3_id"]
        print(f"OpenAI access is OK, account: {account}")
        break
# Verify the access password
def verify_password(password):
    """Return True when *password* matches PASSWORD; raise gr.Error otherwise."""
    if password != PASSWORD:
        raise gr.Error("ๅฏ็ขผ้ฏ่ชค")
    return True
# ==== Google Drive service initialization ====
def init_drive_service():
    """Build a Google Drive v3 API service from the DRIVE_KEY service account."""
    scopes = ['https://www.googleapis.com/auth/drive']
    credentials = service_account.Credentials.from_service_account_info(
        json.loads(DRIVE_KEY), scopes=scopes)
    return build('drive', 'v3', credentials=credentials)
def create_folder_if_not_exists(service, folder_name, parent_id):
    """Return the ID of *folder_name* under *parent_id*, creating it if absent."""
    print("ๆฃๆฅๆฏๅฆๅญๅจ็นๅฎๅ็งฐ็ๆไปถๅคน๏ผๅฆๆไธๅญๅจๅๅๅปบ")
    query = (
        f"mimeType='application/vnd.google-apps.folder' and "
        f"name='{folder_name}' and '{parent_id}' in parents and trashed=false"
    )
    response = service.files().list(q=query, spaces='drive', fields="files(id, name)").execute()
    existing = response.get('files', [])
    if existing:
        # The folder already exists — reuse it.
        return existing[0]['id']
    # No match: create a fresh folder under the parent.
    metadata = {
        'name': folder_name,
        'mimeType': 'application/vnd.google-apps.folder',
        'parents': [parent_id]
    }
    created = service.files().create(body=metadata, fields='id').execute()
    return created.get('id')
# Check whether a file already exists on Google Drive
def check_file_exists(service, folder_name, file_name):
    """Return (exists, file_id) for *file_name* inside the given parent.

    NOTE(review): *folder_name* is interpolated into an `in parents` clause,
    which expects a folder ID — presumably callers pass an ID here; confirm.
    """
    query = f"name = '{file_name}' and '{folder_name}' in parents and trashed = false"
    found = service.files().list(q=query).execute().get('files', [])
    if found:
        return True, found[0]['id']
    return False, None
def upload_content_directly(service, file_name, folder_id, content):
    """
    Upload *content* (a text string) directly as a new file on Google Drive.

    Returns the new file's ID. Raises ValueError when file_name/folder_id is
    missing or content is None, and re-raises any Drive API error after
    logging it.
    """
    if not file_name:
        raise ValueError("ๆไปถๅไธ่ฝไธบ็ฉบ")
    if not folder_id:
        raise ValueError("ๆไปถๅคนIDไธ่ฝไธบ็ฉบ")
    if content is None:  # empty-string uploads are allowed, but not None
        raise ValueError("ๅ ๅฎนไธ่ฝไธบ็ฉบ")
    file_metadata = {'name': file_name, 'parents': [folder_id]}
    # Wrap the UTF-8 encoded text in an in-memory file object for the media upload.
    try:
        with io.BytesIO(content.encode('utf-8')) as fh:
            media = MediaIoBaseUpload(fh, mimetype='text/plain', resumable=True)
            print("==content==")
            print(content)
            print("==content==")
            print("==media==")
            print(media)
            print("==media==")
            # Perform the upload.
            file = service.files().create(body=file_metadata, media_body=media, fields='id').execute()
            return file.get('id')
    except Exception as e:
        print(f"ไธไผ ๆไปถๆถๅ็้่ฏฏ: {e}")
        raise  # re-raise so the caller can decide how to handle or ignore it
def upload_file_directly(service, file_name, folder_id, file_path):
    """Upload a local JSON file into the given Drive folder; return True on success."""
    metadata = {'name': file_name, 'parents': [folder_id]}
    media = MediaFileUpload(file_path, mimetype='application/json')
    # The created file's ID is not needed by callers, so it is discarded.
    service.files().create(body=metadata, media_body=media, fields='id').execute()
    return True
def upload_img_directly(service, file_name, folder_id, file_path):
    """Upload a local JPEG into the given Drive folder and return the file ID."""
    metadata = {'name': file_name, 'parents': [folder_id]}
    media = MediaFileUpload(file_path, mimetype='image/jpeg')
    created = service.files().create(body=metadata, media_body=media, fields='id').execute()
    return created.get('id')
def download_file_as_string(service, file_id):
    """Download a Drive file and return its content decoded as UTF-8 text."""
    request = service.files().get_media(fileId=file_id)
    buffer = io.BytesIO()
    downloader = MediaIoBaseDownload(buffer, request)
    finished = False
    # Pull chunks until the downloader reports completion.
    while not finished:
        _, finished = downloader.next_chunk()
    buffer.seek(0)
    return buffer.read().decode('utf-8')
def set_public_permission(service, file_id):
    """Grant anyone-with-the-link read access to the given Drive file."""
    permission_body = {"type": "anyone", "role": "reader"}
    service.permissions().create(
        fileId=file_id,
        body=permission_body,
        fields='id',
    ).execute()
def update_file_on_drive(service, file_id, file_content):
    """
    Update the content of an existing Google Drive file.

    Args:
        service: Google Drive API service instance.
        file_id: ID of the file to update.
        file_content: new file content as a string (JSON text).
    """
    # Convert the new content into an in-memory byte stream.
    fh = io.BytesIO(file_content.encode('utf-8'))
    media = MediaIoBaseUpload(fh, mimetype='application/json', resumable=True)
    # Replace the file's media body in place.
    updated_file = service.files().update(
        fileId=file_id,
        media_body=media
    ).execute()
    print(f"ๆไปถๅทฒๆดๆฐ๏ผๆไปถID: {updated_file['id']}")
| # ---- Text file ---- | |
def process_file(password, file):
    """Convert an uploaded csv/xlsx/docx file into questions and a summary.

    Verifies the password, extracts the file's text, strips the `@XX@`
    placeholder (replaced with `|`), then generates up to three questions
    plus a summary from the text.

    Returns:
        (question1, question2, question3, summary, text) — missing
        questions come back as empty strings.

    Raises:
        ValueError: for unsupported file extensions.
    """
    verify_password(password)
    # Read the file into a plain-text representation.
    # BUG FIX: the original referenced `df` unconditionally afterwards, which
    # raised NameError for .docx uploads; all branches now produce `df_string`.
    if file.name.endswith('.csv'):
        df = pd.read_csv(file)
        df_string = df_to_text(df)
    elif file.name.endswith('.xlsx'):
        df = pd.read_excel(file)
        df_string = df_to_text(df)
    elif file.name.endswith('.docx'):
        df_string = docx_to_text(file)
    else:
        raise ValueError("Unsupported file type")
    # ๅฎ่ญ: remove @XX@ placeholder, replacing it with |
    df_string = df_string.replace("@XX@", "|")
    # Generate questions and a summary from the uploaded content.
    questions = generate_questions(df_string)
    summary = generate_summarise(df_string)
    # Return the question texts and the extracted content string.
    return questions[0] if len(questions) > 0 else "", \
        questions[1] if len(questions) > 1 else "", \
        questions[2] if len(questions) > 2 else "", \
        summary, \
        df_string
def df_to_text(df):
    """Render a DataFrame as its plain-text string representation."""
    text = df.to_string()
    return text
def docx_to_text(file):
    """Concatenate all paragraph texts of a Word document, one per line."""
    document = Document(file)
    return "\n".join(paragraph.text for paragraph in document.paragraphs)
| # ---- YouTube link ---- | |
def parse_time(time_str):
    """Convert an 'HH:MM:SS' or 'MM:SS' duration string into a timedelta."""
    fields = [int(part) for part in time_str.split(':')]
    if len(fields) == 2:
        # No hours component — treat as 0 hours.
        fields = [0] + fields
    if len(fields) != 3:
        raise ValueError("ๆ้ๆ ผๅผไธๆญฃ็ขบ๏ผๆ็บ 'HH:MM:SS' ๆ 'MM:SS'")
    hours, minutes, seconds = fields
    return timedelta(hours=hours, minutes=minutes, seconds=seconds)
def format_seconds_to_time(seconds):
    """Format a (non-negative) seconds count as zero-padded HH:MM:SS."""
    total = int(seconds)
    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{hours:02}:{minutes:02}:{secs:02}"
def extract_youtube_id(url):
    """Extract the video ID from a youtube.com or youtu.be URL; None otherwise."""
    parsed = urlparse(url)
    host = parsed.netloc
    if "youtube.com" in host:
        # Standard links carry the ID in the 'v' query parameter.
        params = parse_qs(parsed.query)
        if "v" not in params:
            return None
        return params.get("v")[0]
    if "youtu.be" in host:
        # Short links carry the ID as the path component.
        return parsed.path.lstrip('/')
    return None
def try_get_transcript(video_id, use_proxy=False):
    """Fetch a YouTube caption track and convert it to the JSON transcript format.

    Tries the available caption languages in priority order (en, zh-TW,
    zh-CN, ja) and returns the first successful conversion. When *use_proxy*
    is True, requests go through the configured Webshare proxy.
    """
    if use_proxy:
        proxy_config = WebshareProxyConfig(
            proxy_username=PROXY_USERNAME,
            proxy_password=PROXY_PASSWORD
        )
        # NOTE(review): recent youtube-transcript-api versions take a
        # `proxy_config=` argument on the client rather than `proxies=` here —
        # confirm the pinned version accepts this call.
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id, proxies=proxy_config)
    else:
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
    # Preferred language order.
    language_priority = ["en", "zh-TW", "zh-CN", "ja"]
    # Keep only the priority languages actually available for this video.
    available_languages = [t.language_code for t in transcript_list]
    languages = [lang for lang in language_priority if lang in available_languages]
    for language in languages:
        print("===language===")
        print(f"use language: {language}")
        print("===language===")
        try:
            if use_proxy:
                yt_api_transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=[language], proxies=proxy_config)
            else:
                yt_api_transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=[language])
            print("===transcript===")
            print(yt_api_transcript)
            # Flatten entries into "Ns: text" lines for the LLM converter.
            original_transcript = ""
            for entry in yt_api_transcript:
                transcript_part = (f"{entry['start']:.0f}s: {entry['text']}")
                print(transcript_part)
                original_transcript += f"{transcript_part} \n"
            print("===transcript===")
            transcript = convert_transcription_to_json(original_transcript)
            return transcript
        except NoTranscriptFound:
            # Try the next priority language.
            continue
    # NOTE(review): NoTranscriptFound's constructor expects
    # (video_id, requested_language_codes, transcript_data); calling it with a
    # single message string likely raises TypeError instead — confirm and fix.
    raise NoTranscriptFound("No transcript found for supported languages.")
def get_transcript_by_yt_api(video_id):
    """Fetch a transcript, first directly, then retrying through the proxy."""
    print("====get_transcript_by_yt_api====")
    # First attempt: no proxy.
    try:
        print("====try_get_transcript without proxy====")
        return try_get_transcript(video_id, use_proxy=False)
    except Exception as direct_error:
        print(f"No proxy transcript error: {direct_error}")
        # Second attempt: through the Webshare proxy.
        try:
            print("====try_get_transcript with proxy====")
            return try_get_transcript(video_id, use_proxy=True)
        except Exception as proxy_error:
            print(f"With proxy transcript error: {proxy_error}")
            raise proxy_error
def generate_transcription_by_gemini(video_id):
    """Generate a timed transcript for a YouTube video with Google Gemini.

    Feeds the video URL directly to Gemini as a multimodal input, then
    converts the raw text into the structured JSON transcript. Returns the
    transcript list, or None on any failure.
    """
    print("====generate_transcription_by_gemini====")
    # Build the YouTube URL used as the video input.
    video_url = f"https://www.youtube.com/watch?v={video_id}"
    model = vertexai.generative_models.GenerativeModel("gemini-2.5-flash")
    # Video part: Gemini fetches the media from the URI itself.
    video_part = Part.from_uri(
        uri=video_url,
        mime_type="video/*"
    )
    # Prompt: full transcript with timestamps, one sentence per line.
    prompt = "็ตฆๆๅ ๅซๆ้่ปธ็ๅฎๆด้ๅญ็จฟ๏ผๅ ๅซๆ้่ปธ่ทๅๆๅ งๅฎน๏ผไธๅฅ่ฉฑไธ่ก"
    original_transcription = ""  # pre-initialize to avoid "referenced before assignment" in the except path
    try:
        response = model.generate_content(
            contents=[video_part, prompt],
            generation_config=vertexai.generative_models.GenerationConfig(
                temperature=1.0,
                top_p=0.95,
                max_output_tokens=65535,
                candidate_count=1
            ),
            stream=False
        )
        original_transcription = response.candidates[0].content.parts[0].text
        print("===original_transcription===")
        print(original_transcription)
        print("===original_transcription===")
        # Convert the raw text into the structured JSON transcript format.
        transcript_json = convert_transcription_to_json(original_transcription)
        if transcript_json:
            return transcript_json
        else:
            raise Exception("็กๆณ่ฝๆ้ๅญ็จฟๆ ผๅผ")
    except Exception as e:
        print(f"็ๆ้ๅญ็จฟๆ็ผ็้ฏ่ชค๏ผ{str(e)}")
        return None
def convert_transcription_to_json(original_transcription):
    """
    Convert a raw timestamped transcript into the structured JSON format,
    chunking long texts so each piece fits within Gemini's output limits.

    Args:
        original_transcription (str): raw transcript text.

    Returns:
        list | None: transcript segments, each a dict with text, start,
        end, duration — or None when nothing could be converted.
    """
    if not original_transcription:
        print("ๅๅง้ๅญ็จฟ็บ็ฉบ")
        return None
    # Use Vertex AI (Gemini) to perform the conversion.
    model = vertexai.generative_models.GenerativeModel("gemini-2.5-flash")
    # Maximum characters per chunk, considering:
    # 1. Gemini's 65,535-token output limit
    # 2. room reserved for the prompt itself
    # 3. JSON formatting adds extra characters
    # 4. one Chinese character is roughly 2-3 tokens
    MAX_CHUNK_SIZE = 15000
    # Split the transcript into chunks.
    chunks = []
    current_chunk = []
    current_size = 0
    # Process the text line by line.
    lines = original_transcription.split('\n')
    for line in lines:
        line = line.strip()
        if not line:
            continue
        # If adding this line would exceed the limit, close the current chunk first.
        if current_size + len(line) > MAX_CHUNK_SIZE and current_chunk:
            chunks.append('\n'.join(current_chunk))
            current_chunk = []
            current_size = 0
        current_chunk.append(line)
        current_size += len(line)
    # Flush the final chunk.
    if current_chunk:
        chunks.append('\n'.join(current_chunk))
    # Accumulates converted segments across all chunks.
    all_results = []
    # Convert each chunk independently.
    for i, chunk in enumerate(chunks):
        print(f"===chunk: {i+1}===")
        prompt = f"""
        ่ซๅฐไปฅไธ้ๅญ็จฟ่ฝๆๆ JSON ๆ ผๅผ:
        {chunk}
        ่ฝๆ่ฆๅ:
        1. ๆฏๅๆฎต่ฝ้ๅ ๅซ text, start, end, duration, ้ๅพ้่ฆ๏ผ
        2. ๆ้ๆ ผๅผ้่ฝๆ็บ็งๆธ(ไพๅฆ 1:02 ่ฝ็บ 62 ็ง)๏ผstart ๅ end ็ๆ ผๅผ่ฆไธๆจฃ๏ผ้ฝ่ฆๆดๆธ๏ผไธ่ฆๆๅฐๆธ้ป
        3. duration ็บ end - start ็ๅทฎๅผ
        4. ๅๅณๆ ผๅผ็บ JSON array
        5. ๅ็็ๅไฝตๅฅๅญ๏ผไธ่ฆๆไธๅ็็ๆทๅฅ๏ผไธๅฅ่ฉฑ่ณๅฐ่ฆๆๅฎๆด็ไธป่ฉใ่ฌ่ฉ
        6. ๆฏๅฅ่ฉฑ็ก้ๅจ 10~15 ๅๅญๅทฆๅณ๏ผไฝ่ฆไปฅๅฎๆด่ชๆ็บไธป
        7. ๅฆๆ้ๅฐ [Music] [Laughter] [Crowd] [Cheering] [Applause]้้ก็ๆจ่จ๏ผๅฏไปฅ็ดๆฅๅฟฝ็ฅไธ่จ
        8. ้ๆฏ็ฌฌ {i+1}/{len(chunks)} ๆฎต๏ผ่ซ็ขบไฟๆ้่ปธ็้ฃ็บๆง
        ่ซ็ดๆฅ่ฟๅ JSON ๆ ผๅผ๏ผไธ่ฆๅ ๅ ฅไปปไฝ่ชชๆๆๅญๆ markdown ๆจ่จใ
        """
        try:
            response = model.generate_content(prompt)
            json_str = response.text
            print(f"===json_str for chunk {i+1}===")
            print(json_str)
            print(f"===json_str for chunk {i+1}===")
            # Strip possible markdown code fences from the model output.
            json_str = json_str.replace("```json", "").replace("```", "").strip()
            # Parse the JSON.
            chunk_result = json.loads(json_str)
            # Validate that every entry carries the required fields.
            for entry in chunk_result:
                if not all(k in entry for k in ["text", "start", "end", "duration"]):
                    raise ValueError(f"JSON ๆ ผๅผ้ฏ่ชค๏ผ็ผบๅฐๅฟ ่ฆๆฌไฝ๏ผๅจ็ฌฌ {i+1} ๆฎต")
            all_results.extend(chunk_result)
        except Exception as e:
            # A failed chunk is skipped; the remaining chunks still convert.
            print(f"่็็ฌฌ {i+1} ๆฎตๆ็ผ็้ฏ่ชค๏ผ{str(e)}")
            continue
    # Nothing converted successfully.
    if not all_results:
        return None
    # Sort by start time to restore chronological order across chunks.
    all_results.sort(key=lambda x: x["start"])
    return all_results
def generate_transcription_by_whisper(video_id):
    """Download a video's audio and transcribe it with OpenAI Whisper.

    The audio is extracted as mp3 via yt-dlp, split into <=10-minute chunks
    (to stay under Whisper's file-size limit), and each chunk's segment
    timestamps are shifted back into whole-video time. Returns a list of
    {text, start, end, duration} dicts.
    """
    print("====generate_transcription_by_whisper====")
    youtube_url = f'https://www.youtube.com/watch?v={video_id}'
    codec_name = "mp3"
    outtmpl = f"{OUTPUT_PATH}/{video_id}.%(ext)s"
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': codec_name,
            'preferredquality': '192'
        }],
        'outtmpl': outtmpl,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([youtube_url])
    audio_path = f"{OUTPUT_PATH}/{video_id}.{codec_name}"
    full_audio = AudioSegment.from_mp3(audio_path)
    max_part_duration = 10 * 60 * 1000  # 10 minutes, in milliseconds
    full_duration = len(full_audio)  # in milliseconds
    parts = math.ceil(full_duration / max_part_duration)
    print(f"parts: {parts}")
    transcription = []
    for i in range(parts):
        print(f"== i: {i}==")
        start_time = i * max_part_duration
        end_time = min((i + 1) * max_part_duration, full_duration)
        print(f"time: {start_time/1000} - {end_time/1000}")
        chunk = full_audio[start_time:end_time]
        chunk_path = f"{OUTPUT_PATH}/{video_id}_part_{i}.{codec_name}"
        chunk.export(chunk_path, format=codec_name)
        try:
            with open(chunk_path, "rb") as chunk_file:
                response = OPEN_AI_CLIENT.audio.transcriptions.create(
                    model="whisper-1",
                    file=chunk_file,
                    response_format="verbose_json",
                    timestamp_granularities=["segment"],
                    prompt="Transcribe the following audio file. if content is chinese, please using 'language: zh-TW' ",
                )
            # Shift chunk-relative timestamps by this chunk's offset in the full audio.
            adjusted_segments = [{
                'text': segment.text,  # attribute access (API object), not dict indexing
                'start': math.ceil(segment.start + start_time / 1000.0),  # ms offset converted to seconds
                'end': math.ceil(segment.end + start_time / 1000.0),
                'duration': math.ceil(segment.end - segment.start)
            } for segment in response.segments]
            transcription.extend(adjusted_segments)
        except Exception as e:
            # A failed chunk is logged and skipped; remaining chunks still run.
            print(f"Error processing chunk {i}: {str(e)}")
        # Remove the temporary chunk file after processing.
        os.remove(chunk_path)
    return transcription
def get_video_duration(video_id):
    """Return the YouTube video length in seconds via pytube, or None on failure."""
    yt = YouTube(f'https://www.youtube.com/watch?v={video_id}')
    try:
        video_duration = yt.length
    except Exception as e:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # are no longer swallowed; pytube raises many exception types here.
        print(f"get_video_duration error: {e}")
        video_duration = None
    print(f"video_duration: {video_duration}")
    return video_duration
def process_transcript_and_screenshots_on_gcs(video_id):
    """Ensure a transcript (with per-segment screenshots) exists on GCS.

    Builds the transcript if missing — YouTube captions first, Gemini as a
    fallback — then fills in a screenshot URL for every segment that lacks
    one, re-uploading the transcript when anything changed.
    """
    print("====process_transcript_and_screenshots_on_gcs====")
    transcript, exists = get_transcript_from_gcs(video_id)
    if not exists:
        print("==== video transcript is not exists ====")
        try:
            transcript = get_transcript_by_yt_api(video_id)
            # transcript = generate_transcription_by_gemini(video_id)
        except Exception as e:
            print(f" Error generating transcription: {str(e)}")
            # Fall back to Gemini when caption download fails.
            transcript = generate_transcription_by_gemini(video_id)
            # transcript = generate_transcription_by_whisper(video_id)
        upload_transcript_to_gcs(video_id, transcript)
    # Screenshot handling: fill img_file_id for segments missing it.
    print("====่็ๆชๅ====")
    is_new_transcript = False
    has_tried_download_video = False
    for entry in transcript:
        if 'img_file_id' not in entry:
            # Check whether OUTPUT_PATH already holds video_id.mp4.
            video_path = f'{OUTPUT_PATH}/{video_id}.mp4'
            # No local video yet and no failed download attempt so far.
            if not os.path.exists(video_path) and not has_tried_download_video:
                try:
                    download_youtube_video(video_id)
                except Exception as e:
                    # Only try the download once; remember the failure.
                    has_tried_download_video = True
                    print(f"ไธ่ฝฝ่ง้ขๅคฑ่ดฅ: {str(e)}")
            if os.path.exists(video_path):
                try:
                    screenshot_path = screenshot_youtube_video(video_id, entry['start'])
                    screenshot_blob_name = f"{video_id}/{video_id}_{entry['start']}.jpg"
                    img_file_id = GCS_SERVICE.upload_image_and_get_public_url('video_ai_assistant', screenshot_blob_name, screenshot_path)
                    entry['img_file_id'] = img_file_id
                    print(f"ๆชๅพๅทฒไธไผ ๅฐGCS: {img_file_id}")
                    is_new_transcript = True
                except Exception as e:
                    print(f"Error processing screenshot: {str(e)}")
                    # Video downloaded but screenshot failed: store an empty ID.
                    entry['img_file_id'] = ""
                    print(f"ๆชๅ็ฉบ็ฝ")
                    is_new_transcript = True
            else:
                # No local video available: store an empty ID.
                entry['img_file_id'] = ""
                print(f"ๆชๅ็ฉบ็ฝ")
                is_new_transcript = True
    if is_new_transcript:
        print("===ๆดๆฐ้ๅญ็จฟๆไปถ===")
        upload_transcript_to_gcs(video_id, transcript)
    return transcript
def get_transcript(video_id):
    """Load a transcript from GCS, requiring it to exist and include screenshots."""
    print("====get_transcript====")
    transcript, exists = get_transcript_from_gcs(video_id)
    if not exists:
        raise gr.Error("้ๅญ็จฟๆไปถไธๅญๅจๆผGCSไธญใ")
    missing_image = any('img_file_id' not in entry for entry in transcript)
    if missing_image:
        raise gr.Error("Some entries in the transcript do not have an associated img_file_id.")
    print("Transcript is verified with all necessary images.")
    return transcript
def get_transcript_from_gcs(video_id):
    """Return (transcript, True) when the GCS transcript JSON exists, else (None, False)."""
    print("Checking for transcript in GCS...")
    bucket_name = 'video_ai_assistant'
    blob_name = f"{video_id}/{video_id}_transcript.json"
    # Probe first, then download only when present.
    if not GCS_SERVICE.check_file_exists(bucket_name, blob_name):
        print("No transcript found for video ID:", video_id)
        return None, False
    transcript_text = GCS_SERVICE.download_as_string(bucket_name, blob_name)
    return json.loads(transcript_text), True
def upload_transcript_to_gcs(video_id, transcript):
    """Serialize *transcript* as pretty-printed JSON and store it on GCS."""
    print("Uploading updated transcript to GCS...")
    bucket_name = 'video_ai_assistant'
    blob_name = f"{video_id}/{video_id}_transcript.json"
    payload = json.dumps(transcript, ensure_ascii=False, indent=2)
    GCS_SERVICE.upload_json_string(bucket_name, blob_name, payload)
    print("Transcript uploaded successfully.")
def process_youtube_link(password, link, LLM_model=None):
    """End-to-end processing of a YouTube link into all Gradio UI outputs.

    Verifies the password, loads (or builds) the per-video transcript, then
    derives Q&A, summary, key moments, mind map, reading passage and
    metadata. Returns the tuple of values the UI components expect.

    Raises:
        gr.Error: when the transcript cannot be obtained.
    """
    verify_password(password)
    video_id = extract_youtube_id(link)
    try:
        # Production trusts GCS content only; other envs may build it on demand.
        if IS_ENV_PROD == "True":
            transcript = get_transcript(video_id)
        else:
            transcript = process_transcript_and_screenshots_on_gcs(video_id)
    except Exception as e:
        error_msg = f" {video_id} ้ๅญ็จฟ้ฏ่ชค: {str(e)}"
        print("===process_youtube_link error===")
        print(error_msg)
        raise gr.Error(error_msg)
    original_transcript = json.dumps(transcript, ensure_ascii=False, indent=2)
    # Reuse the shared helpers instead of duplicating their loops inline
    # (the original re-implemented both loops in this function body).
    formatted_transcript = create_formatted_transcript(video_id, transcript)
    formatted_simple_transcript = create_formatted_simple_transcript(transcript)
    # Derived teaching materials, all cached per-video on GCS.
    source = "gcs"
    questions_answers = get_questions_answers(video_id, formatted_simple_transcript, source, LLM_model)
    questions_answers_json = json.dumps(questions_answers, ensure_ascii=False, indent=2)
    summary_json = get_video_id_summary(video_id, formatted_simple_transcript, source, LLM_model)
    summary_text = summary_json["summary"]
    summary = summary_json["summary"]
    key_moments_json = get_key_moments(video_id, formatted_simple_transcript, formatted_transcript, source, LLM_model)
    key_moments = key_moments_json["key_moments"]
    key_moments_text = json.dumps(key_moments, ensure_ascii=False, indent=2)
    key_moments_html = get_key_moments_html(key_moments)
    html_content = format_transcript_to_html(formatted_transcript)
    simple_html_content = format_simple_transcript_to_html(formatted_simple_transcript)
    mind_map_json = get_mind_map(video_id, formatted_simple_transcript, source, LLM_model)
    mind_map = mind_map_json["mind_map"]
    mind_map_html = get_mind_map_html(mind_map)
    reading_passage_json = get_reading_passage(video_id, formatted_simple_transcript, source, LLM_model)
    reading_passage_text = reading_passage_json["reading_passage"]
    reading_passage = reading_passage_json["reading_passage"]
    meta_data = get_meta_data(video_id)
    subject = meta_data["subject"]
    grade = meta_data["grade"]
    # Order must match the Gradio output components.
    return video_id, \
        questions_answers_json, \
        original_transcript, \
        summary_text, \
        summary, \
        key_moments_text, \
        key_moments_html, \
        mind_map, \
        mind_map_html, \
        html_content, \
        simple_html_content, \
        reading_passage_text, \
        reading_passage, \
        subject, \
        grade
def process_junyi_link_to_youtube_link(junyi_link):
    """Convert a Junyi Academy '/v/{youtube_id}' link into a YouTube watch URL.

    Returns the input unchanged when no '/v/{id}' path segment is present.
    (The redundant function-local `import re` was removed; `re` is already
    imported at module level.)
    """
    print("====process_junyi_link_to_youtube_link====")
    print(junyi_link)
    # Junyi video pages embed the YouTube ID as /v/{youtube_id}.
    match = re.search(r"/v/([\w-]+)", junyi_link)
    if match:
        youtube_id = match.group(1)
        return f"https://www.youtube.com/watch?v={youtube_id}"
    return junyi_link
def create_formatted_simple_transcript(transcript):
    """Build [{start_time, end_time, text}] entries with HH:MM:SS timestamps."""
    return [
        {
            "start_time": format_seconds_to_time(entry['start']),
            "end_time": format_seconds_to_time(entry['start'] + entry['duration']),
            "text": entry['text'],
        }
        for entry in transcript
    ]
def create_formatted_transcript(video_id, transcript):
    """Build full transcript entries: times, text, embed URL and screenshot path."""
    formatted = []
    for entry in transcript:
        # img_file_id doubles as the screenshot path (GCS public URL).
        formatted.append({
            "start_time": format_seconds_to_time(entry['start']),
            "end_time": format_seconds_to_time(entry['start'] + entry['duration']),
            "text": entry['text'],
            "embed_url": get_embedded_youtube_link(video_id, entry['start']),
            "screenshot_path": entry['img_file_id'],
        })
    return formatted
def format_transcript_to_html(formatted_transcript):
    """Render transcript entries as HTML: time heading, text, 500px screenshot."""
    pieces = []
    for entry in formatted_transcript:
        pieces.append(f"<h3>{entry['start_time']} - {entry['end_time']}</h3>")
        pieces.append(f"<p>{entry['text']}</p>")
        pieces.append(f"<img src='{entry['screenshot_path']}' width='500px' />")
    # join() avoids quadratic string concatenation for long transcripts.
    return "".join(pieces)
def format_simple_transcript_to_html(formatted_transcript):
    """Render transcript entries (text only, no screenshots) as HTML."""
    return "".join(
        f"<h3>{seg['start_time']} - {seg['end_time']}</h3><p>{seg['text']}</p>"
        for seg in formatted_transcript
    )
def get_embedded_youtube_link(video_id, start_time):
    """Build an autoplaying YouTube embed URL starting at the given second.

    Fractional seconds are truncated to an int, as the embed API expects.
    """
    seconds = int(start_time)
    return f"https://www.youtube.com/embed/{video_id}?start={seconds}&autoplay=1"
def download_youtube_video(youtube_id, output_path=OUTPUT_PATH):
    """Download a YouTube video to ``<output_path>/<youtube_id>.mp4``.

    Tries pytube first; on any failure falls back to yt_dlp (best mp4 up to
    720p). The output directory is created if missing.

    Args:
        youtube_id: The YouTube video id.
        output_path: Target directory (defaults to the module OUTPUT_PATH).
    """
    youtube_url = f'https://www.youtube.com/watch?v={youtube_id}'
    # exist_ok avoids a race between the existence check and the mkdir
    os.makedirs(output_path, exist_ok=True)
    try:
        yt = YouTube(youtube_url)
        video_stream = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first()
        video_stream.download(output_path=output_path, filename=youtube_id + ".mp4")
        print(f"[Pytube] Video downloaded successfully: {output_path}/{youtube_id}.mp4")
    except Exception as e:
        # pytube breaks whenever YouTube changes its page layout; log the
        # reason (previously swallowed silently) and retry with yt_dlp.
        print(f"[Pytube] download failed ({e}); falling back to yt_dlp")
        ydl_opts = {
            'format': "bestvideo[height<=720][ext=mp4]",
            'outtmpl': os.path.join(output_path, f'{youtube_id}.mp4'),  # output filename template
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([youtube_url])
        print(f"[yt_dlp] Video downloaded successfully: {output_path}/{youtube_id}.mp4")
def screenshot_youtube_video(youtube_id, snapshot_sec):
    """Save a frame of the downloaded video at snapshot_sec; return the jpg path."""
    source_video = f'{OUTPUT_PATH}/{youtube_id}.mp4'
    screenshot_path = f'{OUTPUT_PATH}/{youtube_id}_{snapshot_sec}.jpg'
    with VideoFileClip(source_video) as clip:
        clip.save_frame(screenshot_path, snapshot_sec)
    return screenshot_path
| # ---- Web ---- | |
| # def process_web_link(link): | |
| # # ๆๅๅ่งฃๆ็ฝ้กตๅ ๅฎน | |
| # response = requests.get(link) | |
| # soup = BeautifulSoup(response.content, 'html.parser') | |
| # return soup.get_text() | |
| # ---- LLM Generator ---- | |
def split_data(df_string, word_base=100000):
    """Split transcript data into roughly equal JSON-string segments.

    The number of parts is derived from the serialized length divided by
    ``word_base``; the parsed list is then chunked and each chunk re-serialized.

    Bug fixes vs. the previous version:
    - ``part_size`` now uses ceiling division, so the trailing elements are no
      longer silently dropped when len(data) is not divisible by n_parts.
    - ``json.dumps(..., ensure_ascii=False)`` replaces the
      ``encode('utf-8').decode('unicode_escape')`` round-trip, which corrupted
      literal backslashes and non-latin-1 characters in the text.

    Args:
        df_string: JSON string or an already-parsed list.
        word_base: Approximate character budget per segment.

    Returns:
        list[str]: JSON-serialized segments covering all input elements.
    """
    if isinstance(df_string, str):
        data_str_cnt = len(df_string)
        data = json.loads(df_string)
    else:
        data_str_cnt = len(str(df_string))
        data = df_string
    n_parts = data_str_cnt // word_base + (1 if data_str_cnt % word_base != 0 else 0)
    print(f"Number of Parts: {n_parts}")
    # Ceiling division so the last partial chunk is included
    part_size = -(-len(data) // n_parts) if n_parts > 0 else len(data)
    segments = []
    for i in range(n_parts):
        start_idx = i * part_size
        if start_idx >= len(data):
            break  # degenerate case: more parts than elements
        end_idx = min((i + 1) * part_size, len(data))
        segments.append(json.dumps(data[start_idx:end_idx], ensure_ascii=False))
    return segments
def generate_content_by_open_ai(sys_content, user_content, response_format=None, model_name=None):
    """Call the OpenAI chat-completions API and return the stripped reply text.

    Only "gpt-4-turbo" is honoured as an explicit model choice; every other
    value falls back to "gpt-4o". ``response_format`` is forwarded when given.
    """
    print("generate_content_by_open_ai")
    model = "gpt-4-turbo" if model_name == "gpt-4-turbo" else "gpt-4o"
    print(f"LLM model: {model}")
    request_payload = {
        "model": model,
        "messages": [
            {"role": "system", "content": sys_content},
            {"role": "user", "content": user_content},
        ],
        "max_tokens": 4000,
    }
    if response_format is not None:
        request_payload["response_format"] = response_format
    response = OPEN_AI_CLIENT.chat.completions.create(**request_payload)
    return response.choices[0].message.content.strip()
| # def generate_content_by_bedrock(sys_content, user_content): | |
| # print("LLM using REDROCK") | |
| # messages = [ | |
| # {"role": "user", "content": user_content +"(ๅฆๆๆฏ JSON ๆ ผๅผ๏ผvalue ็ๅผ่๏ผ่ซ็จๅฎๅผ่๏ผๆๆฏ็จๅๆ็ท๏ผ้ๅผ่๏ผ้ฟๅ JSON Decoder error )"} | |
| # ] | |
| # model_id = "anthropic.claude-3-sonnet-20240229-v1:0" | |
| # print(f"model_id: {model_id}") | |
| # # model_id = "anthropic.claude-3-haiku-20240307-v1:0" | |
| # kwargs = { | |
| # "modelId": model_id, | |
| # "contentType": "application/json", | |
| # "accept": "application/json", | |
| # "body": json.dumps({ | |
| # "anthropic_version": "bedrock-2023-05-31", | |
| # "max_tokens": 4000, | |
| # "system": sys_content, | |
| # "messages": messages | |
| # }) | |
| # } | |
| # response = BEDROCK_CLIENT.invoke_model(**kwargs) | |
| # response_body = json.loads(response.get('body').read()) | |
| # content = response_body.get('content')[0].get('text') | |
| # return content | |
def generate_content_by_gemini(sys_content, user_content, response_format=None, model_name=None):
    """Call a Gemini model with the concatenated system+user prompt.

    Note: ``response_format`` is accepted for signature parity with the
    OpenAI path but is not used by this backend.
    """
    print("generate_content_by_gemini")
    print(f"LLM using: {model_name}")
    model = GenerativeModel(model_name=model_name)
    reply = model.generate_content(f"{sys_content}, {user_content}")
    return reply.candidates[0].content.parts[0].text
def generate_content_by_LLM(sys_content, user_content, response_format=None, LLM_model=None, model_name=None):
    """Dispatch a generation request to Gemini or OpenAI based on LLM_model.

    Gemini handles the "gemini-1.5-*" selectors; everything else goes to the
    OpenAI path with the supplied model_name. (A bedrock/claude path existed
    historically and was disabled.)
    """
    if LLM_model in ["gemini-1.5-pro", "gemini-1.5-flash"]:
        print(f"LLM: {LLM_model}")
        content = generate_content_by_gemini(sys_content, user_content, response_format, model_name=LLM_model)
    else:
        print(f"LLM: {LLM_model}")
        print(f"model_name: {model_name}")
        content = generate_content_by_open_ai(sys_content, user_content, response_format, model_name=model_name)
    print("=====content=====")
    print(content)
    print("=====content=====")
    return content
def get_reading_passage(video_id, df_string, source, LLM_model=None):
    """Return {"reading_passage": ...} for a video, cached on GCS or Drive.

    On a cache miss the passage is generated with the LLM and uploaded; on a
    hit the cached JSON is downloaded and parsed.

    Args:
        video_id: Video identifier used as the cache key.
        df_string: Transcript content handed to the generator.
        source: "gcs" or "drive".
        LLM_model: Optional model selector forwarded to generate_reading_passage.

    Returns:
        dict: {"reading_passage": <text>}.
    """
    if source == "gcs":
        print("===get_reading_passage on gcs===")
        bucket_name = 'video_ai_assistant'
        file_name = f'{video_id}_reading_passage_latex.json'
        blob_name = f"{video_id}/{file_name}"
        # Check whether the reading passage is already cached
        is_file_exists = GCS_SERVICE.check_file_exists(bucket_name, blob_name)
        if not is_file_exists:
            reading_passage = generate_reading_passage(df_string, LLM_model)
            reading_passage_json = {"reading_passage": str(reading_passage)}
            reading_passage_text = json.dumps(reading_passage_json, ensure_ascii=False, indent=2)
            GCS_SERVICE.upload_json_string(bucket_name, blob_name, reading_passage_text)
            print("reading_passageๅทฒไธไผ ๅฐGCS")
        else:
            print("reading_passageๅทฒๅญๅจไบGCSไธญ")
            reading_passage_text = GCS_SERVICE.download_as_string(bucket_name, blob_name)
            reading_passage_json = json.loads(reading_passage_text)
    elif source == "drive":
        print("===get_reading_passage on drive===")
        service = init_drive_service()
        parent_folder_id = '1GgI4YVs0KckwStVQkLa1NZ8IpaEMurkL'
        folder_id = create_folder_if_not_exists(service, video_id, parent_folder_id)
        file_name = f'{video_id}_reading_passage.json'
        # Check whether the reading passage is already cached
        exists, file_id = check_file_exists(service, folder_id, file_name)
        if not exists:
            # BUG FIX: LLM_model was previously dropped here, silently falling
            # back to the default model; now consistent with the GCS branch.
            reading_passage = generate_reading_passage(df_string, LLM_model)
            reading_passage_json = {"reading_passage": str(reading_passage)}
            reading_passage_text = json.dumps(reading_passage_json, ensure_ascii=False, indent=2)
            upload_content_directly(service, file_name, folder_id, reading_passage_text)
            print("reading_passageๅทฒไธๅณๅฐGoogle Drive")
        else:
            print("reading_passageๅทฒๅญๅจไบGoogle Driveไธญ")
            reading_passage_text = download_file_as_string(service, file_id)
            # BUG FIX: the cached text was downloaded but never parsed, so this
            # branch raised NameError on return; parse it like the GCS branch.
            reading_passage_json = json.loads(reading_passage_text)
    return reading_passage_json
def generate_reading_passage(df_string, LLM_model=None):
    """Generate a ~500-character Traditional-Chinese reading passage via the LLM.

    The input is split into <=100k-character segments, one passage is
    generated per segment, and the pieces are joined with newlines.

    Args:
        df_string: Transcript content (JSON string or parsed list).
        LLM_model: Optional model selector forwarded to generate_content_by_LLM.

    Returns:
        str: The concatenated reading passage.
    """
    print("===generate_reading_passage 0===")
    print(df_string)
    segments = split_data(df_string, word_base=100000)
    all_content = []
    # NOTE(review): gpt-4-turbo is pinned here even when the OpenAI path is
    # taken via LLM_model — confirm this override is intentional.
    model_name = "gpt-4-turbo"
    # model_name = "gpt-4o"
    for segment in segments:
        sys_content = "ไฝ ๆฏไธๅๆ ้ท่ณๆๅๆ่ทๅฝฑ็ๆๅญธ็่ๅธซ๏ผuser ็บๅญธ็๏ผ่ซ็ฒพ่ฎ่ณๆๆๆฌ๏ผ่ช่กๅคๆท่ณๆ็็จฎ้ก๏ผไฝฟ็จ zh-TW"
        user_content = f"""
        # ๆๆฌ {segment}
        # rules:
        - ๆ นๆๆๆฌ๏ผๆๅ้้ป
        - ๅป้คไบบ้ก่ฌ่ชฒๆๅฃ่ช็ๅ็ญๅฅ๏ผ้ๆฐๆ่งฃๆๆ็ซ ๏ผๅปบ็ซ้ฉๅ้ฑ่ฎ่ชๅฅ้้ ็ Reading Passage
        - ๅช้่ฆๅฐๆณจๆไพ Reading Passage๏ผๅญๆธๅจ 500 ๅญไปฅๅ ง
        - ๆ่ฟฐไธญ๏ผ่ซๆๆธๅญธๆๆฏๅฐๆฅญ่ก่ช๏ผ็จ Latex ๅ ่ฆ๏ผ$...$๏ผ
        - ๅ ๆธไน้คใๆ น่ใๆฌกๆน็ญ็ญ็้็ฎๅผๅฃ่ชไนๆๆ LATEX ๆธๅญธ็ฌฆ่
        # restrictions:
        - ่ซไธๅฎ่ฆไฝฟ็จ็น้ซไธญๆ zh-TW๏ผ้ๅพ้่ฆ
        - ็ข็็็ตๆไธ่ฆๅๅพๆ่งฃ้๏ผไนไธ่ฆๆ่ฟฐ้็ฏๆ็ซ ๆ้บผ็ข็็
        - ่ซ็ดๆฅ็ตฆๅบๆ็ซ ๏ผไธ็จไป็ดนๆ้บผ่็็ๆๆฏๆ็ซ ๅญๆธ็ญ็ญ
        - ๅญๆธๅจ 500 ๅญไปฅๅ ง
        """
        print("======user_content 0 ===")
        print(user_content)
        content = generate_content_by_LLM(sys_content, user_content, response_format=None, LLM_model=LLM_model, model_name=model_name)
        all_content.append(content + "\n")
    # Merge the per-segment passages into one final article
    final_content = "\n".join(all_content)
    return final_content
def text_to_speech(video_id, text):
    """Synthesize `text` to English speech; save and return the mp3 filename."""
    output_file = f'{video_id}_reading_passage.mp3'
    gTTS(text, lang='en').save(output_file)
    return output_file
def get_mind_map(video_id, df_string, source, LLM_model=None):
    """Return {"mind_map": ...} markdown for a video, cached on GCS or Drive.

    Generates the mind map with the LLM on a cache miss and uploads it; on a
    hit, downloads and parses the cached JSON.

    Args:
        video_id: Video identifier used as the cache key.
        df_string: Transcript content handed to the generator.
        source: "gcs" or "drive".
        LLM_model: Optional model selector forwarded to generate_mind_map.

    Returns:
        dict: {"mind_map": <markdown text>}.
    """
    if source == "gcs":
        print("===get_mind_map on gcs===")
        # (removed an unused `gcs_client = GCS_CLIENT` binding; all access
        # goes through GCS_SERVICE)
        bucket_name = 'video_ai_assistant'
        file_name = f'{video_id}_mind_map.json'
        blob_name = f"{video_id}/{file_name}"
        # Check whether the file already exists
        is_file_exists = GCS_SERVICE.check_file_exists(bucket_name, blob_name)
        if not is_file_exists:
            mind_map = generate_mind_map(df_string, LLM_model)
            mind_map_json = {"mind_map": str(mind_map)}
            mind_map_text = json.dumps(mind_map_json, ensure_ascii=False, indent=2)
            GCS_SERVICE.upload_json_string(bucket_name, blob_name, mind_map_text)
            print("mind_mapๅทฒไธๅณๅฐGCS")
        else:
            # Mind map already cached; download it
            print("mind_mapๅทฒๅญๅจไบGCSไธญ")
            mind_map_text = GCS_SERVICE.download_as_string(bucket_name, blob_name)
            mind_map_json = json.loads(mind_map_text)
    elif source == "drive":
        print("===get_mind_map on drive===")
        service = init_drive_service()
        parent_folder_id = '1GgI4YVs0KckwStVQkLa1NZ8IpaEMurkL'
        folder_id = create_folder_if_not_exists(service, video_id, parent_folder_id)
        file_name = f'{video_id}_mind_map.json'
        # Check whether the file already exists
        exists, file_id = check_file_exists(service, folder_id, file_name)
        if not exists:
            mind_map = generate_mind_map(df_string, LLM_model)
            mind_map_json = {"mind_map": str(mind_map)}
            mind_map_text = json.dumps(mind_map_json, ensure_ascii=False, indent=2)
            upload_content_directly(service, file_name, folder_id, mind_map_text)
            print("mind_mapๅทฒไธๅณๅฐGoogle Drive")
        else:
            # Mind map already cached; download it
            print("mind_mapๅทฒๅญๅจไบGoogle Driveไธญ")
            mind_map_text = download_file_as_string(service, file_id)
            mind_map_json = json.loads(mind_map_text)
    return mind_map_json
def generate_mind_map(df_string, LLM_model=None):
    """Generate a markdown mind map from transcript text via the LLM.

    The input is split into <=100k-character segments; one markdown fragment
    is generated per segment and the fragments are joined with newlines.

    Args:
        df_string: Transcript content (JSON string or parsed list).
        LLM_model: Optional model selector forwarded to generate_content_by_LLM.

    Returns:
        str: Concatenated markdown mind-map text.
    """
    print("===generate_mind_map===")
    segments = split_data(df_string, word_base=100000)
    all_content = []
    for segment in segments:
        sys_content = "ไฝ ๆฏไธๅๆ ้ท่ณๆๅๆ่ทๅฝฑ็ๆๅญธ็่ๅธซ๏ผuser ็บๅญธ็๏ผ่ซ็ฒพ่ฎ่ณๆๆๆฌ๏ผ่ช่กๅคๆท่ณๆ็็จฎ้ก๏ผไฝฟ็จ zh-TW"
        user_content = f"""
        ่ซๆ นๆ {segment} ๆๆฌๅปบ็ซ markdown ๅฟๆบๅ
        ๆณจๆ๏ผไธ้่ฆๅๅพๆๆ่ฟฐ๏ผ็ดๆฅ็ตฆๅบ markdown ๆๆฌๅณๅฏ
        ้ๅฐๆๅพ้่ฆ
        """
        content = generate_content_by_LLM(sys_content, user_content, response_format=None, LLM_model=LLM_model, model_name=None)
        all_content.append(content + "\n")
    # Merge the per-segment fragments into one mind map
    final_content = "\n".join(all_content)
    return final_content
def get_mind_map_html(mind_map):
    """Strip markdown code fences and wrap the mind map in a markmap template."""
    markdown_body = mind_map.replace("```markdown", "").replace("```", "")
    return f"""
    <div class="markmap">
        <script type="text/template">
            {markdown_body}
        </script>
    </div>
    """
def get_video_id_summary(video_id, df_string, source, LLM_model=None):
    """Return {"summary": ...} for a video, cached on GCS or Google Drive.

    On a cache miss the summary is generated from the transcript plus the
    video's metadata and uploaded; on a hit the cached JSON is downloaded.

    Args:
        video_id: Video identifier used as the cache key.
        df_string: Transcript content handed to the summarizer.
        source: "gcs" or "drive".
        LLM_model: Optional model selector forwarded to generate_summarise.

    Returns:
        dict: {"summary": <markdown text>}.
    """
    if source == "gcs":
        print("===get_video_id_summary on gcs===")
        bucket_name = 'video_ai_assistant'
        # NOTE(review): the GCS cache file name differs from the Drive one
        # (_summary_markdown.json vs _summary.json) — confirm this is intended.
        file_name = f'{video_id}_summary_markdown.json'
        summary_file_blob_name = f"{video_id}/{file_name}"
        # Check whether the summary file already exists
        is_summary_file_exists = GCS_SERVICE.check_file_exists(bucket_name, summary_file_blob_name)
        if not is_summary_file_exists:
            meta_data = get_meta_data(video_id)
            summary = generate_summarise(df_string, meta_data, LLM_model)
            summary_json = {"summary": str(summary)}
            summary_text = json.dumps(summary_json, ensure_ascii=False, indent=2)
            GCS_SERVICE.upload_json_string(bucket_name, summary_file_blob_name, summary_text)
            print("summaryๅทฒไธไผ ๅฐGCS")
        else:
            # Summary already cached; download it
            print("summaryๅทฒๅญๅจไบGCSไธญ")
            summary_text = GCS_SERVICE.download_as_string(bucket_name, summary_file_blob_name)
            summary_json = json.loads(summary_text)
    elif source == "drive":
        print("===get_video_id_summary===")
        service = init_drive_service()
        parent_folder_id = '1GgI4YVs0KckwStVQkLa1NZ8IpaEMurkL'
        folder_id = create_folder_if_not_exists(service, video_id, parent_folder_id)
        file_name = f'{video_id}_summary.json'
        # Check whether the summary already exists on Drive
        exists, file_id = check_file_exists(service, folder_id, file_name)
        if not exists:
            meta_data = get_meta_data(video_id)
            summary = generate_summarise(df_string, meta_data, LLM_model)
            summary_json = {"summary": str(summary)}
            summary_text = json.dumps(summary_json, ensure_ascii=False, indent=2)
            try:
                upload_content_directly(service, file_name, folder_id, summary_text)
                print("summaryๅทฒไธๅณๅฐGoogle Drive")
            except Exception as e:
                # Best-effort upload: a Drive failure is logged but the freshly
                # generated summary is still returned to the caller.
                error_msg = f" {video_id} ๆ่ฆ้ฏ่ชค: {str(e)}"
                print("===get_video_id_summary error===")
                print(error_msg)
                print("===get_video_id_summary error===")
        else:
            # Summary already cached on Drive; download it
            print("summaryๅทฒๅญๅจGoogle Driveไธญ")
            summary_text = download_file_as_string(service, file_id)
            summary_json = json.loads(summary_text)
    return summary_json
def generate_summarise(df_string, metadata=None, LLM_model=None):
    """Generate a structured markdown summary of a transcript via the LLM.

    The transcript is split into <=100k-character segments and summarized per
    segment; if more than one segment was produced, a second LLM pass merges
    the partial summaries into a single article.

    Args:
        df_string: Transcript content (JSON string or parsed list).
        metadata: Optional dict with "title", "subject", "grade" to steer the
            summary toward the course's learning objectives.
        LLM_model: Optional model selector forwarded to generate_content_by_LLM.

    Returns:
        str: Markdown summary following the template in the prompt.
    """
    print("===generate_summarise===")
    # Pull course context out of the metadata (empty strings when absent)
    if metadata:
        title = metadata.get("title", "")
        subject = metadata.get("subject", "")
        grade = metadata.get("grade", "")
    else:
        title = ""
        subject = ""
        grade = ""
    segments = split_data(df_string, word_base=100000)
    all_content = []
    for segment in segments:
        sys_content = "ไฝ ๆฏไธๅๆ ้ท่ณๆๅๆ่ทๅฝฑ็ๆๅญธ็่ๅธซ๏ผuser ็บๅญธ็๏ผ่ซ็ฒพ่ฎ่ณๆๆๆฌ๏ผ่ช่กๅคๆท่ณๆ็็จฎ้ก๏ผไฝฟ็จ zh-TW"
        user_content = f"""
        ่ชฒ็จๅ็จฑ๏ผ{title}
        ็ง็ฎ๏ผ{subject}
        ๅนด็ด๏ผ{grade}
        ่ซๆ นๆๅ งๆ๏ผ {segment}
        ๆ ผๅผ็บ Markdown
        ๅฆๆๆ่ชฒ็จๅ็จฑ๏ผ่ซๅ็นใ่ชฒ็จๅ็จฑใ็บๅญธ็ฟ้้ป๏ผ้ฒ่ก้้ปๆด็๏ผไธ่ฆๆด็่ทๆ ๅขๆ ไบ็ธ้็ๅ้ก
        ๆด้ซๆ่ฆๅจไธ็พๅญไปฅๅ ง
        ้้ปๆฆๅฟตๅๅบ bullet points๏ผ่ณๅฐไธๅ๏ผๆๅคไบๅ
        ไปฅๅๅฏ่ฝ็็ต่ซ่็ตๅฐพๅปถไผธๅฐๅ้กๆไพๅญธ็ไฝๅๆ
        ๆ่ฟฐไธญ๏ผ่ซๆๆธๅญธๆๆฏๅฐๆฅญ่ก่ช๏ผ็จ Latex ๅ ่ฆ๏ผ$...$๏ผ
        ๅ ๆธไน้คใๆ น่ใๆฌกๆน็ญ็ญ็้็ฎๅผๅฃ่ชไนๆๆ LATEX ๆธๅญธ็ฌฆ่
        ๆด้ซๆ ผๅผ็บ๏ผ
        ## ๐ ไธป้ก๏ผ{{title}} (ๅฆๆๆฒๆ title ๅฐฑ็็ฅ)
        ## ๐ ๆด้ซๆ่ฆ
        - (ไธๅ bullet point....)
        ## ๐ ้้ปๆฆๅฟต
        - xxx
        - xxx
        - xxx
        ## ๐ก ็บไป้บผๆๅ่ฆๅญธ้ๅ๏ผ
        - (ไธๅ bullet point....)
        ## โ ๅปถไผธๅฐๅ้ก
        - (ไธๅ bullet point....่ซๅ็นใ่ชฒ็จๅ็จฑใ็บๅญธ็ฟ้้ป๏ผ้ฒ่ก้้ปๆด็๏ผไธ่ฆๆด็่ทๆ ๅขๆ ไบ็ธ้็ๅ้ก)
        """
        content = generate_content_by_LLM(sys_content, user_content, response_format=None, LLM_model=LLM_model, model_name=None)
        all_content.append(content + "\n")
    # Multiple segments: ask the LLM for one merged summary over all parts
    if len(all_content) > 1:
        all_content_cnt = len(all_content)
        all_content_str = json.dumps(all_content)
        sys_content = "ไฝ ๆฏไธๅๆ ้ท่ณๆๅๆ่ทๅฝฑ็ๆๅญธ็่ๅธซ๏ผuser ็บๅญธ็๏ผ่ซ็ฒพ่ฎ่ณๆๆๆฌ๏ผ่ช่กๅคๆท่ณๆ็็จฎ้ก๏ผไฝฟ็จ zh-TW"
        user_content = f"""
        ่ชฒ็จๅ็จฑ๏ผ{title}
        ็ง็ฎ๏ผ{subject}
        ๅนด็ด๏ผ{grade}
        ่ซๆ นๆๅ งๆ๏ผ {all_content_str}
        ๅ ฑๆ {all_content_cnt} ๆฎต๏ผ่ซ็ธฑๆดๆไธ็ฏๆ่ฆ
        ๆ ผๅผ็บ Markdown
        ๅฆๆๆ่ชฒ็จๅ็จฑ๏ผ่ซๅ็นใ่ชฒ็จๅ็จฑใ็บๅญธ็ฟ้้ป๏ผ้ฒ่ก้้ปๆด็๏ผไธ่ฆๆด็่ทๆ ๅขๆ ไบ็ธ้็ๅ้ก
        ๆด้ซๆ่ฆๅจ {all_content_cnt} ็พๅญไปฅๅ ง
        ้้ปๆฆๅฟตๅๅบ bullet points๏ผ่ณๅฐไธๅ๏ผๆๅคๅๅ
        ไปฅๅๅฏ่ฝ็็ต่ซ่็ตๅฐพๅปถไผธๅฐๅ้กๆไพๅญธ็ไฝๅๆ
        ๆ่ฟฐไธญ๏ผ่ซๆๆธๅญธๆๆฏๅฐๆฅญ่ก่ช๏ผ็จ Latex ๅ ่ฆ๏ผ$...$๏ผ
        ๅ ๆธไน้คใๆ น่ใๆฌกๆน็ญ็ญ็้็ฎๅผๅฃ่ชไนๆๆ LATEX ๆธๅญธ็ฌฆ่
        ๆด้ซๆ ผๅผ็บ๏ผ
        ## ๐ ไธป้ก๏ผ{{title}} (ๅฆๆๆฒๆ title ๅฐฑ็็ฅ)
        ## ๐ ๆด้ซๆ่ฆ
        - ( {all_content_cnt} ๅ bullet point....)
        ## ๐ ้้ปๆฆๅฟต
        - xxx
        - xxx
        - xxx
        ## ๐ก ็บไป้บผๆๅ่ฆๅญธ้ๅ๏ผ
        - ( {all_content_cnt} ๅ bullet point....)
        ## โ ๅปถไผธๅฐๅ้ก
        - ( {all_content_cnt} ๅ bullet point....่ซๅ็นใ่ชฒ็จๅ็จฑใ็บๅญธ็ฟ้้ป๏ผ้ฒ่ก้้ปๆด็๏ผไธ่ฆๆด็่ทๆ ๅขๆ ไบ็ธ้็ๅ้ก)
        """
        final_content = generate_content_by_LLM(sys_content, user_content, response_format=None, LLM_model=LLM_model, model_name=None)
    else:
        final_content = all_content[0]
    return final_content
def get_questions(video_id, df_string, source="gcs", LLM_model=None):
    """Return three suggested questions for a video, cached on GCS or Drive.

    Generates the questions with the LLM on a cache miss; missing slots are
    padded with empty strings.

    Args:
        video_id: Video identifier used as the cache key.
        df_string: Transcript content handed to the generator.
        source: "gcs" (default) or "drive".
        LLM_model: Optional model selector forwarded to generate_questions.

    Returns:
        tuple[str, str, str]: (q1, q2, q3), "" for any missing question.
    """
    if source == "gcs":
        # Check GCS for an existing {video_id}_questions.json
        print("===get_questions on gcs===")
        gcs_client = GCS_CLIENT  # NOTE(review): bound but unused; all access goes through GCS_SERVICE
        bucket_name = 'video_ai_assistant'
        file_name = f'{video_id}_questions.json'
        blob_name = f"{video_id}/{file_name}"
        # Check whether the file already exists
        is_questions_exists = GCS_SERVICE.check_file_exists(bucket_name, blob_name)
        if not is_questions_exists:
            questions = generate_questions(df_string, LLM_model)
            questions_text = json.dumps(questions, ensure_ascii=False, indent=2)
            GCS_SERVICE.upload_json_string(bucket_name, blob_name, questions_text)
            print("questionsๅทฒไธๅณๅฐGCS")
        else:
            # Questions already cached; download them
            print("questionsๅทฒๅญๅจไบGCSไธญ")
            questions_text = GCS_SERVICE.download_as_string(bucket_name, blob_name)
            questions = json.loads(questions_text)
    elif source == "drive":
        # Check Google Drive for an existing {video_id}_questions.json
        print("===get_questions===")
        service = init_drive_service()
        parent_folder_id = '1GgI4YVs0KckwStVQkLa1NZ8IpaEMurkL'
        folder_id = create_folder_if_not_exists(service, video_id, parent_folder_id)
        file_name = f'{video_id}_questions.json'
        # Check whether the file already exists
        exists, file_id = check_file_exists(service, folder_id, file_name)
        if not exists:
            questions = generate_questions(df_string, LLM_model)
            questions_text = json.dumps(questions, ensure_ascii=False, indent=2)
            upload_content_directly(service, file_name, folder_id, questions_text)
            print("questionsๅทฒไธๅณๅฐGoogle Drive")
        else:
            # Questions already cached; download them
            print("questionsๅทฒๅญๅจไบGoogle Driveไธญ")
            questions_text = download_file_as_string(service, file_id)
            questions = json.loads(questions_text)
    # Pad to exactly three questions with empty strings
    q1 = questions[0] if len(questions) > 0 else ""
    q2 = questions[1] if len(questions) > 1 else ""
    q3 = questions[2] if len(questions) > 2 else ""
    print("=====get_questions=====")
    print(f"q1: {q1}")
    print(f"q2: {q2}")
    print(f"q3: {q3}")
    print("=====get_questions=====")
    return q1, q2, q3
def generate_questions(df_string, LLM_model=None):
    """Ask the LLM for three student-perspective questions about the transcript.

    Accepts either a JSON string or an already-parsed list of transcript
    entries; returns the questions as a list of strings.
    """
    print("===generate_questions===")
    # Normalize the input to a list of transcript entries
    if isinstance(df_string, str):
        entries = json.loads(df_string)
    else:
        entries = df_string
    content_text = "".join(entry["text"] + "๏ผ" for entry in entries)
    sys_content = "ไฝ ๆฏไธๅๆ ้ท่ณๆๅๆ่ทๅฝฑ็ๆๅญธ็่ๅธซ๏ผuser ็บๅญธ็๏ผ่ซ็ฒพ่ฎ่ณๆๆๆฌ๏ผ่ช่กๅคๆท่ณๆ็็จฎ้ก๏ผไธฆ็จๆขๆ่ณๆ็บๆฌ่ณช็ๆธฌ็จๆถๅฏ่ฝๆๅ็ๅ้ก๏ผไฝฟ็จ zh-TW"
    user_content = f"""
    ่ซๆ นๆ {content_text} ็ๆไธๅๅ้ก๏ผไธฆ็จ JSON ๆ ผๅผ่ฟๅ
    ไธๅฎ่ฆไฝฟ็จ zh-TW๏ผ้้ๅธธ้่ฆ๏ผ
    EXAMPLE:
    {{
        questions:
        [q1็ๆ่ฟฐtext, q2็ๆ่ฟฐtext, q3็ๆ่ฟฐtext]
    }}
    """
    response_format = { "type": "json_object" }
    raw_reply = generate_content_by_LLM(sys_content, user_content, response_format, LLM_model, model_name=None)
    questions_list = json.loads(raw_reply)["questions"]
    print("=====json_response=====")
    print(questions_list)
    print("=====json_response=====")
    return questions_list
def get_questions_answers(video_id, df_string, source="gcs", LLM_model=None):
    """Return a list of {"question", "answer"} dicts for a video, cached on GCS.

    On any failure (generation, upload, download, or parse) it falls back to
    plain questions from get_questions with empty answers.

    NOTE(review): only source == "gcs" is handled; any other value reaches the
    return with `questions_answers` unbound — confirm callers never pass
    another source.

    Args:
        video_id: Video identifier used as the cache key.
        df_string: Transcript content handed to the generator.
        source: Storage backend; only "gcs" is implemented.
        LLM_model: Optional model selector forwarded to the generators.

    Returns:
        list[dict]: [{"question": ..., "answer": ...}, ...].
    """
    if source == "gcs":
        try:
            print("===get_questions_answers on gcs===")
            bucket_name = 'video_ai_assistant'
            file_name = f'{video_id}_questions_answers.json'
            blob_name = f"{video_id}/{file_name}"
            # Check whether the file already exists
            is_questions_answers_exists = GCS_SERVICE.check_file_exists(bucket_name, blob_name)
            if not is_questions_answers_exists:
                questions_answers = generate_questions_answers(df_string, LLM_model)
                questions_answers_text = json.dumps(questions_answers, ensure_ascii=False, indent=2)
                GCS_SERVICE.upload_json_string(bucket_name, blob_name, questions_answers_text)
                print("questions_answersๅทฒไธๅณๅฐGCS")
            else:
                # questions_answers already cached; download them
                print("questions_answersๅทฒๅญๅจไบGCSไธญ")
                questions_answers_text = GCS_SERVICE.download_as_string(bucket_name, blob_name)
                questions_answers = json.loads(questions_answers_text)
        except Exception as e:
            print(f"Error getting questions_answers: {str(e)}")
            # get_questions returns a (q1, q2, q3) tuple; iterating it yields
            # the three question strings, each paired with an empty answer.
            questions_list = get_questions(video_id, df_string, source, LLM_model)
            questions_answers = [{"question": q, "answer": ""} for q in questions_list]
    return questions_answers
def generate_questions_answers(df_string, LLM_model=None):
    """Generate Q&A pairs (with timestamp references) from a transcript via the LLM.

    The input is split into <=100k-character segments; each segment yields
    three question/answer dicts which are concatenated into one list.

    Args:
        df_string: Transcript content (JSON string or parsed list).
        LLM_model: Optional model selector forwarded to generate_content_by_LLM.

    Returns:
        list[dict]: [{"question": ..., "answer": ...}, ...] across all segments.
    """
    print("===generate_questions_answers===")
    segments = split_data(df_string, word_base=100000)
    all_content = []
    for segment in segments:
        sys_content = "ไฝ ๆฏไธๅๆ ้ท่ณๆๅๆ่ทๅฝฑ็ๆๅญธ็่ๅธซ๏ผuser ็บๅญธ็๏ผ่ซ็ฒพ่ฎ่ณๆๆๆฌ๏ผ่ช่กๅคๆท่ณๆ็็จฎ้ก๏ผไฝฟ็จ zh-TW"
        user_content = f"""
        ่ซๆ นๆ {segment} ็ๆไธๅๅ้ก่ท็ญๆก๏ผไธป่ฆ่ๅญธ็งๆ้๏ผไธ่ฆๅ่ทๆ ็ฏๆ ไบ็ธ้็ๅ้ก
        ็ญๆก่ฆๅจๆๅพๆจ็คบๅบ่ใๅ่๏ผ00:01:05ใ๏ผ่ซๆ นๆๆ้่ปธ start_time ไพๆจ็คบ
        ่ซ็ขบไฟๅ้ก่ท็ญๆก้ฝๆฏ็น้ซไธญๆ zh-TW
        ็ญๆกไธ็จๆฏๆจๆบ็ญๆก๏ผ่ๆฏๅธถๆๅ็ผๆง็่ๆ ผๆๅบๅผๅ็ญ๏ผ่ฎๅญธ็ๆ่ๆฌไพ็ๅ้ก๏ผไปฅๅ่ฉฒๅปๅ่็ๆ้้ป
        ไธฆ็จ JSON ๆ ผๅผ่ฟๅ list ๏ผ่ซไธๅฎ่ฆ็ตฆไธๅๅ้ก่ท็ญๆก๏ผไธ่ฆ่ฃๅจไธๅ list ่ฃก้ข
        k-v pair ็ key ๆฏ question, value ๆฏ answer
        EXAMPLE:
        {{
            "questions_answers":
            [
                {{question: q1็ๆ่ฟฐtext, answer: q1็็ญๆกtextใๅ่๏ผ00:01:05ใ}},
                {{question: q2็ๆ่ฟฐtext, answer: q2็็ญๆกtextใๅ่๏ผ00:32:05ใ}},
                {{question: q3็ๆ่ฟฐtext, answer: q3็็ญๆกtextใๅ่๏ผ01:03:35ใ}}
            ]
        }}
        """
        response_format = { "type": "json_object" }
        content = generate_content_by_LLM(sys_content, user_content, response_format, LLM_model, model_name=None)
        content_json = json.loads(content)["questions_answers"]
        all_content += content_json
    print("=====all_content=====")
    print(all_content)
    print("=====all_content=====")
    return all_content
def change_questions(password, df_string):
    """Regenerate the three suggested questions after verifying the password.

    Missing slots are padded with empty strings, mirroring get_questions.
    """
    verify_password(password)
    generated = generate_questions(df_string)
    padded = list(generated[:3]) + [""] * (3 - len(generated[:3]))
    q1, q2, q3 = padded
    print("=====get_questions=====")
    print(f"q1: {q1}")
    print(f"q2: {q2}")
    print(f"q3: {q3}")
    print("=====get_questions=====")
    return q1, q2, q3
def get_key_moments(video_id, formatted_simple_transcript, formatted_transcript, source, LLM_model=None):
    """Return {"key_moments": [...]} for a video, cached on GCS or Google Drive.

    On the GCS path the cached entry is additionally migrated in place: any
    moment missing "keywords" or "suggested_images" gets them generated, the
    enriched JSON is re-uploaded, and the stored copy is re-downloaded so the
    in-memory result matches storage.

    Args:
        video_id: Video identifier used as the cache key.
        formatted_simple_transcript: Timed text entries (no screenshots).
        formatted_transcript: Timed entries including screenshot paths.
        source: "gcs" or "drive".
        LLM_model: Optional model selector forwarded to the generators.

    Returns:
        dict: {"key_moments": [...]}.
    """
    if source == "gcs":
        print("===get_key_moments on gcs===")
        bucket_name = 'video_ai_assistant'
        file_name = f'{video_id}_key_moments.json'
        blob_name = f"{video_id}/{file_name}"
        # Check whether the file already exists
        is_key_moments_exists = GCS_SERVICE.check_file_exists(bucket_name, blob_name)
        if not is_key_moments_exists:
            key_moments = generate_key_moments(formatted_simple_transcript, formatted_transcript, LLM_model)
            key_moments_json = {"key_moments": key_moments}
            key_moments_text = json.dumps(key_moments_json, ensure_ascii=False, indent=2)
            GCS_SERVICE.upload_json_string(bucket_name, blob_name, key_moments_text)
            print("key_momentsๅทฒไธๅณๅฐGCS")
        else:
            # key_moments already cached; download them
            print("key_momentsๅทฒๅญๅจไบGCSไธญ")
            key_moments_text = GCS_SERVICE.download_as_string(bucket_name, blob_name)
            key_moments_json = json.loads(key_moments_text)
        # Backfill "keywords" for moments that predate the field (no-op for
        # freshly generated moments, which already include it)
        print("===ๆชขๆฅ key_moments ๆฏๅฆๆ keywords===")
        has_keywords_added = False
        for key_moment in key_moments_json["key_moments"]:
            if "keywords" not in key_moment:
                transcript = key_moment["transcript"]
                key_moment["keywords"] = generate_key_moments_keywords(transcript, LLM_model)
                print("===keywords===")
                print(key_moment["keywords"])
                print("===keywords===")
                has_keywords_added = True
        if has_keywords_added:
            key_moments_text = json.dumps(key_moments_json, ensure_ascii=False, indent=2)
            GCS_SERVICE.upload_json_string(bucket_name, blob_name, key_moments_text)
        # Re-download so the in-memory copy matches what is stored
        key_moments_text = GCS_SERVICE.download_as_string(bucket_name, blob_name)
        key_moments_json = json.loads(key_moments_text)
        # Backfill "suggested_images" the same way
        print("===ๆชขๆฅ key_moments ๆฏๅฆๆ suggested_images===")
        has_suggested_images_added = False
        for key_moment in key_moments_json["key_moments"]:
            if "suggested_images" not in key_moment:
                key_moment["suggested_images"] = generate_key_moments_suggested_images(key_moment)
                has_suggested_images_added = True
        if has_suggested_images_added:
            key_moments_text = json.dumps(key_moments_json, ensure_ascii=False, indent=2)
            GCS_SERVICE.upload_json_string(bucket_name, blob_name, key_moments_text)
        key_moments_text = GCS_SERVICE.download_as_string(bucket_name, blob_name)
        key_moments_json = json.loads(key_moments_text)
    elif source == "drive":
        print("===get_key_moments on drive===")
        service = init_drive_service()
        parent_folder_id = '1GgI4YVs0KckwStVQkLa1NZ8IpaEMurkL'
        folder_id = create_folder_if_not_exists(service, video_id, parent_folder_id)
        file_name = f'{video_id}_key_moments.json'
        # Check whether the file already exists
        exists, file_id = check_file_exists(service, folder_id, file_name)
        if not exists:
            key_moments = generate_key_moments(formatted_simple_transcript, formatted_transcript, LLM_model)
            key_moments_json = {"key_moments": key_moments}
            key_moments_text = json.dumps(key_moments_json, ensure_ascii=False, indent=2)
            upload_content_directly(service, file_name, folder_id, key_moments_text)
            print("key_momentsๅทฒไธๅณๅฐGoogle Drive")
        else:
            # key_moments already cached; download them (no keyword/image
            # migration on the Drive path)
            print("key_momentsๅทฒๅญๅจไบGoogle Driveไธญ")
            key_moments_text = download_file_as_string(service, file_id)
            key_moments_json = json.loads(key_moments_text)
    return key_moments_json
def generate_key_moments(formatted_simple_transcript, formatted_transcript, LLM_model=None):
    """Generate timestamped key-moment segments from a transcript via the LLM.

    For each <=100k-character chunk the LLM proposes 5-8 moments with
    start/end/text/keywords; each moment is then enriched with the raw
    transcript text inside its time range, the screenshots whose timestamps
    fall inside the range, and LLM-suggested images.

    Args:
        formatted_simple_transcript: Timed text entries (start_time/end_time/text).
        formatted_transcript: Timed entries including screenshot_path.
        LLM_model: Optional model selector forwarded to generate_content_by_LLM.

    Returns:
        list[dict]: Moments with keys start, end, text, keywords, transcript,
        images, suggested_images.
    """
    print("===generate_key_moments===")
    segments = split_data(formatted_simple_transcript, word_base=100000)
    all_content = []
    for segment in segments:
        sys_content = "ไฝ ๆฏไธๅๆ ้ท่ณๆๅๆ่ทๅฝฑ็ๆๅญธ็่ๅธซ๏ผuser ็บๅญธ็๏ผ่ซ็ฒพ่ฎ่ณๆๆๆฌ๏ผ่ช่กๅคๆท่ณๆ็็จฎ้ก๏ผไฝฟ็จ zh-TW"
        user_content = f"""
        # ๆๆฌ๏ผ{segment}
        # Rule
        1. ่ซๆ นๆๆๆฌ๏ผๆๅๅบ 5~8 ๆฎต้้ปๆ่ฆ๏ผไธฆ็ตฆๅบๅฐๆ็ๆ้่ปธ๏ผๆฏไธๆฎต้้ป็ๆ้่ปธ็ฏๅๅคงๆผ1ๅ้๏ผไฝๅฐๆผ 1/3 ็ธฝ้ๅญ็จฟ้ทๅบฆ
        2. ๅ งๅฎน็ถไธญ๏ผๅฆๆๆๅ่ๆนๆณใๆจกๅผๆๆฏๅทฅๅ ท๏ผๅฐฑ็จ bulletpoint ๆๆฏ ็ทจ่ๆนๅผ ๅๅบ๏ผไธฆๅจๅ่้จๅ็้ ญๅฐพ็จ[]ๅกๅ๏ผexample: FAANG ๆฏไปฅไธไบ้ๅ ฌๅธ๏ผ [1. Aๅ ฌๅธ 2.Bๅ ฌๅธ 3.Cๅ ฌๅธ 4.Dๅ ฌๅธ 5.Eๅ ฌๅธ ]๏ผ...๏ผ
        3. ๆณจๆไธ่ฆ้บๆผไปปไฝไธๆฎตๆ้่ปธ็ๅ งๅฎน ๅพ้ถ็ง้ๅง๏ผไปฅ้็จฎๆนๅผๅๆๆดๅๆๆฌ๏ผๅพ้ถ็ง้ๅงๅๆ๏ผ็ดๅฐ็ตๆใ้ๅพ้่ฆ
        4. ็ตๅฐพ็ๆ้ๅฆๆๆ็ธฝ็ตๆง็่ฉฑ๏ผไน่ฆๆทๅ
        5. ๅฆๆ้ ญๅฐพ็ๆ ็ฏไธๆฏ้้ป๏ผ็นๅฅๆฏๆๆๅผๆๆฏไป็ดน่ชๅทฑๆฏ่ชฐใๆๆฏfinally say goodbye ๅฐฑๆฏไธ้่ฆ็ๆ ็ฏ๏ผๅฐฑไธ็จๆทๅ
        6. ้้ตๅญๅพtranscript extract to keyword๏ผไฟ็ๅฐๅฎถๅๅญใๅฐๆฅญ่ก่ชใๅนดไปฝใๆธๅญใๆๅๅ็จฑใๅฐๅใๆธๅญธๅ ฌๅผ
        7. ๆๅพๅๆชขๆฅไธ้๏ผtext, keywords please use or transfer to zh-TW, it's very important
        # restrictions
        1. ่ซไธๅฎ่ฆ็จ zh-TW๏ผ้้ๅธธ้่ฆ๏ผ
        2. ๅฆๆๆฏ็ไผผไธปๆญใไธปๆไบบ็ๅ็ๅ ดๆฏ๏ผไธๆฒๆไปปไฝๆ็จ็่ณ่จ๏ผ่ซไธ่ฆ้ธๅ
        3. ๅฆๆ้ ญๅฐพ็ๆ ็ฏไธๆฏ้้ป๏ผ็นๅฅๆฏๆๆๅผๆๆฏไป็ดน่ชๅทฑๆฏ่ชฐใๆๆฏfinally say goodbye ๅฐฑๆฏไธ้่ฆ็ๆ ็ฏ๏ผๅฐฑไธ็จๆทๅ
        4. ๆ้่ปธ่ซๅๅฐ็งๆธ๏ผไธ่ฆๅชๅๅฐๅ้ๆธ๏ผ้ๅพ้่ฆ
        Example: retrun JSON
        {{key_moments:[{{
            "start": "00:00",
            "end": "01:35",
            "text": "้ๅญ็จฟ็้้ปๆ่ฆ",
            "keywords": ["้้ตๅญ", "้้ตๅญ"]
        }}]
        }}
        """
        response_format = { "type": "json_object" }
        content = generate_content_by_LLM(sys_content, user_content, response_format, LLM_model, model_name=None)
        key_moments = json.loads(content)["key_moments"]
        # "transcript": attach the raw text that falls inside each moment's range
        for moment in key_moments:
            start_time = parse_time(moment['start'])
            end_time = parse_time(moment['end'])
            # Compare using the parsed timedelta objects
            moment['transcript'] = "๏ผ".join([entry['text'] for entry in formatted_simple_transcript
                                          if start_time <= parse_time(entry['start_time']) <= end_time])
        print("=====key_moments=====")
        print(key_moments)
        print("=====key_moments=====")
        # Map screenshot paths by their start_time for range lookups
        image_links = {entry['start_time']: entry['screenshot_path'] for entry in formatted_transcript}
        for moment in key_moments:
            start_time = parse_time(moment['start'])
            end_time = parse_time(moment['end'])
            # Collect screenshots whose timestamps fall inside the moment
            moment_images = [image_links[time] for time in image_links
                             if start_time <= parse_time(time) <= end_time]
            moment['images'] = moment_images
            # Backfill suggested_images when the LLM response lacked them
            if "suggested_images" not in moment:
                moment["suggested_images"] = generate_key_moments_suggested_images(moment, LLM_model)
                print("===moment_suggested_images===")
                print(moment["suggested_images"])
                print("===moment_suggested_images===")
        all_content += key_moments
    return all_content
def generate_key_moments_keywords(transcript, LLM_model=None):
    """Extract zh-TW keywords from a transcript chunk via the LLM.

    The transcript is split into <=100k-character segments; each LLM reply is
    expected to be a comma-separated keyword string, which is split and
    accumulated into one flat list.

    Args:
        transcript: Transcript text (string or parsed structure for split_data).
        LLM_model: Optional model selector forwarded to generate_content_by_LLM.

    Returns:
        list[str]: Keywords across all segments (split on ","; items may
        carry surrounding whitespace from the LLM reply).
    """
    print("===generate_key_moments_keywords===")
    segments = split_data(transcript, word_base=100000)
    all_content = []
    for segment in segments:
        sys_content = "ไฝ ๆฏไธๅๆ ้ท่ณๆๅๆ่ทๅฝฑ็ๆๅญธ็่ๅธซ๏ผuser ็บๅญธ็๏ผ่ซ็ฒพ่ฎ่ณๆๆๆฌ๏ผ่ช่กๅคๆท่ณๆ็็จฎ้ก๏ผไฝฟ็จ zh-TW"
        user_content = f"""
        transcript extract to keyword
        ไฟ็ๅฐๅฎถๅๅญใๅฐๆฅญ่ก่ชใๅนดไปฝใๆธๅญใๆๅๅ็จฑใๅฐๅใๆธๅญธๅ ฌๅผใๆธๅญธ่กจ็คบๅผใ็ฉ็ๅๅญธ็ฌฆ่๏ผ
        ไธ็จ็ตฆไธไธๆ๏ผ็ดๆฅ็ตฆๅบ้้ตๅญ๏ผไฝฟ็จ zh-TW๏ผ็จ้่ๅ้๏ผ example: ้้ตๅญ1, ้้ตๅญ2
        transcript๏ผ{segment}
        """
        content = generate_content_by_LLM(sys_content, user_content, response_format=None, LLM_model=LLM_model, model_name=None)
        keywords = content.strip().split(",")
        all_content += keywords
    return all_content
def generate_key_moments_suggested_images(key_moment, LLM_model=None):
    """Ask the LLM to pick the most suitable screenshots for one key moment.

    Builds a numbered list of the moment's candidate screenshot URLs,
    sends it together with the moment's summary text and keywords, and
    returns the ``suggested_images`` URL list parsed from the model's
    JSON reply.
    """
    moment_text = key_moment["text"]
    joined_keywords = ', '.join(key_moment["keywords"])
    candidate_images = key_moment["images"]

    # Numbered "ๅ็ N: <url>" lines, one per candidate screenshot.
    images_list_prompt = "".join(
        f"\nๅ็ {index + 1}: {image_url}"
        for index, image_url in enumerate(candidate_images)
    )

    sys_content = "ไฝ ๆฏไธๅๆ ้ท่ณๆๅๆ่ทๅฝฑ็ๆๅญธ็่ๅธซ๏ผuser ็บๅญธ็๏ผ่ซ็ฒพ่ฎ่ณๆๆๆฌ๏ผ่ช่กๅคๆท่ณๆ็็จฎ้ก๏ผไฝฟ็จ zh-TW"
    user_content = f"""
        - ๆๆฌ: {moment_text}
        - ้้ตๅญ: {joined_keywords}
        # Rule:
        1. ไฟ็ๆๅ่กจๆๆฏๆธๆ็ๅ็
        2. ๆ นๆๆๆฌๅ้้ตๅญ๏ผ้ธๆๅบๆๅ้ฉ็ๅ็ใ
        3. ็ธฝๆฏไฟ็ๆๅพไธๅผต๏ผ้ค้ไปๆฏไธๅผต็ฉบ็ฝๅ็๏ผๆๆฏไธๅผตๆฒๆไปปไฝๅ งๅฎน็ๅ็
        # Restrictions:
        1. ๅฆๆๆฏ็ไผผไธปๆญใไธปๆไบบ็ๅ็ๅ ดๆฏ๏ผไธๆฒๆไปปไฝๆ็จ็่ณ่จ๏ผ่ซไธ่ฆ้ธๅ๏ผ้ๅพ้่ฆ
        2. ไธ่ฆๆ็ธไผผๆๆฏๆฆๅฟต้่ค็ๅ็
        3. ็งป้คๆดๅผตๅ็ๆฏ้ป่ฒใ่่ฒๆๆฏ็ฝ่ฒ็ๅ็
        4. ็งป้คๆฒๆไปปไฝๅ งๅฎน็ๅ็
        5. ไธ้่ฆ็ๆๅญๅน็ๅทฎ็๏ผๅช้่ฆ็ๅ็็ๅ งๅฎน
        ่ซๆ นๆ้ไบไฟกๆฏ๏ผๅ็ๅ่กจๅฆไธ: 
        {images_list_prompt}
        ๅๅณ JSON LIST ๅฐฑๅฅฝ๏ผไธ็จๅๅณไปปไฝๆ่ฟฐ่็ตก๏ผไนไธ่ฆ ```json ๅ ่ฆ
        EXAMPLE:
        {{
            "suggested_images": ["ๅ็1็ image_url", "ๅ็2 ็ image_url", "ๅ็3็ image_url"]
        }}
    """
    response_format = { "type": "json_object" }
    raw_response = generate_content_by_LLM(sys_content, user_content, response_format, LLM_model, model_name=None)
    print("===generate_key_moments_suggested_images===")
    print(raw_response)
    print("===generate_key_moments_suggested_images===")
    return json.loads(raw_response)["suggested_images"]
def get_key_moments_html(key_moments):
    """Render key moments as a self-contained HTML fragment.

    Emits an inline ``<style>`` block followed by one section per moment
    showing the time range, summary, a collapsible transcript, and the
    keywords as labelled chips.

    Args:
        key_moments: list of dicts with keys ``start``, ``end``, ``text``,
            ``transcript`` and ``keywords`` (list of strings).

    Returns:
        str: HTML string (CSS + all moment sections concatenated).

    NOTE(review): moment values are interpolated into the HTML without
    escaping; content is assumed to be trusted LLM output — confirm before
    rendering user-supplied data.
    """
    # The gallery CSS is kept even though the image carousel markup is not
    # currently emitted, so re-enabling the gallery needs no style changes.
    css = """
    <style>
        #gallery-main {
            display: flex;
            align-items: center;
            margin-bottom: 20px;
        }
        #gallery {
            position: relative;
            width: 50%;
            flex: 1;
        }
        #text-content {
            flex: 2;
            margin-left: 20px;
        }
        #gallery #gallery-container{
            position: relative;
            width: 100%;
            height: 0px;
            padding-bottom: 56.7%; /* 16/9 ratio */
            background-color: blue;
        }
        #gallery #gallery-container #gallery-content{
            position: absolute;
            top: 0px;
            right: 0px;
            bottom: 0px;
            left: 0px;
            height: 100%;
            display: flex;
            scroll-snap-type: x mandatory;
            overflow-x: scroll;
            scroll-behavior: smooth;
        }
        #gallery #gallery-container #gallery-content .gallery__item{
            width: 100%;
            height: 100%;
            flex-shrink: 0;
            scroll-snap-align: start;
            scroll-snap-stop: always;
            position: relative;
        }
        #gallery #gallery-container #gallery-content .gallery__item img{
            display: block;
            width: 100%;
            height: 100%;
            object-fit: contain;
            background-color: white;
        }
        .click-zone{
            position: absolute;
            width: 20%;
            height: 100%;
            z-index: 3;
        }
        .click-zone.click-zone-prev{
            left: 0px;
        }
        .click-zone.click-zone-next{
            right: 0px;
        }
        #gallery:not(:hover) .arrow{
            opacity: 0.8;
        }
        .arrow{
            text-align: center;
            z-index: 3;
            position: absolute;
            display: block;
            width: 25px;
            height: 25px;
            line-height: 25px;
            background-color: black;
            border-radius: 50%;
            text-decoration: none;
            color: white !important;
            opacity: 0.8;
            transition: opacity 200ms ease;
        }
        .arrow:hover{
            opacity: 1;
        }
        .arrow span{
            position: relative;
            top: 2px;
        }
        .arrow.arrow-prev{
            top: 50%;
            left: 5px;
        }
        .arrow.arrow-next{
            top: 50%;
            right: 5px;
        }
        .arrow.arrow-disabled{
            opacity:0.8;
        }
        #text-content {
            padding: 0px 36px;
        }
        #text-content p {
            margin-top: 10px;
        }
        body{
            font-family: sans-serif;
            margin: 0px;
            padding: 0px;
        }
        main{
            padding: 0px;
            margin: 0px;
            max-width: 900px;
            margin: auto;
        }
        .hidden{
            border: 0;
            clip: rect(0 0 0 0);
            height: 1px;
            margin: -1px;
            overflow: hidden;
            padding: 0;
            position: absolute;
            width: 1px;
        }
        .keyword-label {
            display: inline-block;
            padding: 5px 10px;
            margin: 2px;
            border: 2px solid black;
            border-radius: 5px;
            font-size: 0.9em;
        }
        details {
            border-radius: 5px;
            padding: 10px;
            border: 2px solid black;
        }
        summary {
            font-weight: bold;
            cursor: pointer;
            outline: none;
        }
        summary::-webkit-details-marker {
            display: none;
        }
        @media (max-width: 768px) {
            #gallery-main {
                flex-direction: column; /* ๅจๅฐๅฑๅนไธๅ ๅ ๅ ็ด */
            }
            #gallery {
                width: 100%; /* ่ฎฉ็ปๅปๅ ๆปกๆดไธชๅฎนๅจๅฎฝๅบฆ */
            }
            #text-content {
                margin-left: 0; /* ็งป้คๅทฆ่พน่ท๏ผ่ฎฉๆๆฌๅ ๅฎนๅ ๆปกๅฎฝๅบฆ */
                margin-top: 20px; /* ไธบๆๆฌๅ ๅฎนๆทปๅ ้กถ้จ้ด่ท */
            }
            #gallery #gallery-container {
                height: 350px; /* ๆ่ ไฝ ๅฏไปฅ่ฎพ็ฝฎไธไธชๅบๅฎ็้ซๅบฆ๏ผ่ไธๆฏ็จ padding-bottom */
                padding-bottom: 0; /* ็งป้คๅบ้จๅกซๅ */
            }
        }
    </style>
    """

    key_moments_html = css
    # The old image-carousel markup generation (commented out here for a
    # long time) has been removed; only the text content is rendered.
    for moment in key_moments:
        keywords_html = ' '.join(
            f'<span class="keyword-label">{keyword}</span>'
            for keyword in moment['keywords']
        )
        key_moments_html += f"""
        <div class="gallery-container" id="gallery-main">
            <div id="text-content">
                <h3>{moment['start']} - {moment['end']}</h3>
                <p><strong>ๆ่ฆ: {moment['text']} </strong></p>
                <details>
                    <summary>้ๅญ็จฟ</summary>
                    <p><strong>ๅ งๅฎน: </strong> {moment['transcript']} </p>
                </details>
                <p><strong>้้ตๅญ:</strong> {keywords_html}</p>
            </div>
        </div>
        """
    return key_moments_html
| # ---- LLM CRUD ---- | |
def get_LLM_content(video_id, kind):
    """Load a previously generated LLM artifact for a video from GCS.

    Args:
        video_id: YouTube video id; used as GCS folder and file prefix.
        kind: Artifact kind, e.g. "reading_passage_latex",
            "summary_markdown", "key_moments", "transcript", ...

    Returns:
        str: the raw passage/summary string for reading_passage_latex /
        summary_markdown, pretty-printed JSON for every other kind, or
        "" when the file does not exist yet.
    """
    print(f"===get_{kind}===")
    # (Removed an unused `gcs_client = GCS_CLIENT` local; all access goes
    # through GCS_SERVICE.)
    bucket_name = 'video_ai_assistant'
    file_name = f'{video_id}_{kind}.json'
    blob_name = f"{video_id}/{file_name}"
    # Nothing generated yet -> empty textbox content.
    if not GCS_SERVICE.check_file_exists(bucket_name, blob_name):
        return ""
    content = GCS_SERVICE.download_as_string(bucket_name, blob_name)
    content_json = json.loads(content)
    if kind == "reading_passage_latex":
        return content_json["reading_passage"]
    if kind == "summary_markdown":
        return content_json["summary"]
    if kind == "key_moments":
        # Stored as {"key_moments": [...]}: show only the list, prettified.
        return json.dumps(content_json["key_moments"], ensure_ascii=False, indent=2)
    return json.dumps(content_json, ensure_ascii=False, indent=2)
def enable_edit_mode():
    """Gradio callback: return an update that unlocks the bound component
    so the operator can edit its content."""
    return gr.update(interactive=True)
def delete_LLM_content(video_id, kind):
    """Delete the stored artifact of the given kind for a video from GCS.

    No-op when the blob does not exist.  Always returns a Gradio update
    that clears the bound textbox and locks it again.
    """
    print(f"===delete_{kind}===")
    # (Removed an unused `gcs_client = GCS_CLIENT` local; all access goes
    # through GCS_SERVICE.)
    bucket_name = 'video_ai_assistant'
    file_name = f'{video_id}_{kind}.json'
    blob_name = f"{video_id}/{file_name}"
    # Only attempt the delete when the blob actually exists.
    if GCS_SERVICE.check_file_exists(bucket_name, blob_name):
        GCS_SERVICE.delete_blob(bucket_name, blob_name)
        print(f"{file_name}ๅทฒไปGCSไธญๅ ้ค")
    return gr.update(value="", interactive=False)
def _parse_json_if_str(content):
    """Parse `content` as JSON when it arrives as a string (editor/update
    button flow); pass already-parsed objects straight through
    (create_LLM_content flow)."""
    if isinstance(content, str):
        return json.loads(content)
    return content


def update_LLM_content(video_id, new_content, kind):
    """Persist one kind of LLM-generated artifact for a video to GCS.

    Args:
        video_id: YouTube video id; used as GCS folder and file prefix.
        new_content: JSON string or already-parsed object/str, depending
            on kind and calling flow.
        kind: one of reading_passage_latex, summary_markdown, mind_map,
            key_moments, transcript, questions, questions_answers,
            ai_content_list.

    Returns:
        gr.update carrying the stored display text, interactive=False.

    Raises:
        ValueError: for an unrecognized kind.  (Previously an unknown kind
        fell through to a NameError on `updated_content`.)
    """
    # Fixed log line: was the broken no-op f-string "===upfdate kind on gcs===".
    print(f"===update {kind} on gcs===")
    bucket_name = 'video_ai_assistant'
    file_name = f'{video_id}_{kind}.json'
    blob_name = f"{video_id}/{file_name}"
    if kind == "reading_passage_latex":
        print("=========reading_passage=======")
        print(new_content)
        reading_passage_text = json.dumps(
            {"reading_passage": str(new_content)}, ensure_ascii=False, indent=2)
        GCS_SERVICE.upload_json_string(bucket_name, blob_name, reading_passage_text)
        updated_content = new_content
    elif kind == "summary_markdown":
        summary_text = json.dumps({"summary": str(new_content)}, ensure_ascii=False, indent=2)
        GCS_SERVICE.upload_json_string(bucket_name, blob_name, summary_text)
        updated_content = new_content
    elif kind == "mind_map":
        mind_map_text = json.dumps({"mind_map": str(new_content)}, ensure_ascii=False, indent=2)
        GCS_SERVICE.upload_json_string(bucket_name, blob_name, mind_map_text)
        # NOTE: historically the *wrapped JSON text* (not the raw content)
        # is returned for mind maps; preserved for caller compatibility.
        updated_content = mind_map_text
    elif kind == "key_moments":
        # Stored wrapped as {"key_moments": [...]}; displayed as the bare list.
        key_moments_list = _parse_json_if_str(new_content)
        key_moments_json_text = json.dumps(
            {"key_moments": key_moments_list}, ensure_ascii=False, indent=2)
        GCS_SERVICE.upload_json_string(bucket_name, blob_name, key_moments_json_text)
        updated_content = json.dumps(key_moments_list, ensure_ascii=False, indent=2)
    elif kind in ("transcript", "questions", "questions_answers", "ai_content_list"):
        # These kinds are stored as-is (no wrapping key); the four original
        # branches were byte-identical and are deduplicated here.
        content_obj = _parse_json_if_str(new_content)
        content_text = json.dumps(content_obj, ensure_ascii=False, indent=2)
        GCS_SERVICE.upload_json_string(bucket_name, blob_name, content_text)
        updated_content = content_text
    else:
        raise ValueError(f"unknown content kind: {kind}")
    print(f"{kind} ๅทฒๆดๆฐๅฐGCS")
    return gr.update(value=updated_content, interactive=False)
def create_LLM_content(video_id, df_string, kind, LLM_model=None):
    """Generate one kind of LLM artifact for a video, persist it via
    update_LLM_content, and return a Gradio update with the display text.

    Args:
        video_id: YouTube video id.
        df_string: Source transcript — either a JSON string or an
            already-parsed transcript object, depending on the caller.
        kind: Artifact kind (see update_LLM_content).
        LLM_model: Optional model selector forwarded to the generators.

    Returns:
        gr.update with the generated content, interactive=False.

    Raises:
        ValueError: for an unrecognized kind.  (Previously an unknown kind
        fell through to a NameError on `content` at the return statement.)
    """
    print(f"===create_{kind}===")
    print(f"video_id: {video_id}")
    if kind == "reading_passage_latex":
        content = generate_reading_passage(df_string, LLM_model)
        update_LLM_content(video_id, content, kind)
    elif kind == "summary_markdown":
        meta_data = get_meta_data(video_id)
        content = generate_summarise(df_string, meta_data, LLM_model)
        update_LLM_content(video_id, content, kind)
    elif kind == "mind_map":
        # NOTE(review): generate_mind_map does not receive LLM_model, unlike
        # the other generators — confirm whether that is intentional.
        content = generate_mind_map(df_string)
        update_LLM_content(video_id, content, kind)
    elif kind == "key_moments":
        transcript = json.loads(df_string) if isinstance(df_string, str) else df_string
        formatted_simple_transcript = create_formatted_simple_transcript(transcript)
        formatted_transcript = create_formatted_transcript(video_id, transcript)
        gen_content = generate_key_moments(formatted_simple_transcript, formatted_transcript, LLM_model)
        update_LLM_content(video_id, gen_content, kind)
        content = json.dumps(gen_content, ensure_ascii=False, indent=2)
    elif kind == "transcript":
        gen_content = process_transcript_and_screenshots_on_gcs(video_id)
        update_LLM_content(video_id, gen_content, kind)
        content = json.dumps(gen_content, ensure_ascii=False, indent=2)
    elif kind == "questions":
        gen_content = generate_questions(df_string, LLM_model)
        update_LLM_content(video_id, gen_content, kind)
        content = json.dumps(gen_content, ensure_ascii=False, indent=2)
    elif kind == "questions_answers":
        transcript = json.loads(df_string) if isinstance(df_string, str) else df_string
        formatted_simple_transcript = create_formatted_simple_transcript(transcript)
        gen_content = generate_questions_answers(formatted_simple_transcript, LLM_model)
        update_LLM_content(video_id, gen_content, kind)
        content = json.dumps(gen_content, ensure_ascii=False, indent=2)
    else:
        raise ValueError(f"unknown content kind: {kind}")
    return gr.update(value=content, interactive=False)
| # ---- LLM refresh CRUD ---- | |
def reading_passage_add_latex_version(video_id):
    """Convert a stored reading passage into a LaTeX-annotated version.

    Downloads ``<video_id>_reading_passage.json`` from GCS, asks GPT-4o to
    wrap math expressions and technical terms in ``$...$``, and uploads the
    result as ``<video_id>_reading_passage_latex.json``.

    Raises:
        gr.Error: when the source reading_passage file does not exist.

    Returns:
        str: the LaTeX-annotated reading passage text.
    """
    print("===reading_passage_convert_to_latex===")
    bucket = 'video_ai_assistant'
    source_blob = f"{video_id}/{video_id}_reading_passage.json"
    print(f"blob_name: {source_blob}")
    # Abort early when there is nothing to convert.
    if not GCS_SERVICE.check_file_exists(bucket, source_blob):
        raise gr.Error("reading_passage ไธๅญๅจ!")
    print("reading_passage ๅทฒๅญๅจไบGCSไธญ๏ผ่ฝๆ Latex ๆจกๅผ")
    passage_payload = json.loads(GCS_SERVICE.download_as_string(bucket, source_blob))
    original_reading_passage = passage_payload["reading_passage"]
    sys_content = "ไฝ ๆฏไธๅๆ ้ท่ณๆๅๆ่ทๅฝฑ็ๆๅญธ็่ๅธซ๏ผuser ็บๅญธ็๏ผ่ซ็ฒพ่ฎ่ณๆๆๆฌ๏ผ่ช่กๅคๆท่ณๆ็็จฎ้ก๏ผไฝฟ็จ zh-TW"
    user_content = f"""
        ่ซๆ นๆ {original_reading_passage}
        ๆ่ฟฐไธญ๏ผ่ซๆๆธๅญธๆๆฏๅฐๆฅญ่ก่ช๏ผ็จ Latex ๅ ่ฆ๏ผ$...$๏ผ๏ผ็ก้ไธ่ฆๅปๆนๅๆฌ็ๆ็ซ
        ๅ ๆธไน้คใๆ น่ใๆฌกๆนใๅๅญธ็ฌฆ่ใ็ฉ็็ฌฆ่็ญ็ญ็้็ฎๅผๅฃ่ชไนๆๆ LATEX ็ฌฆ่
        ่ซไธๅฎ่ฆไฝฟ็จ็น้ซไธญๆ zh-TW๏ผไธฆ็จๅฐ็ฃไบบ็ๅฃ่ช
        ็ข็็็ตๆไธ่ฆๅๅพๆ่งฃ้๏ผไนไธ่ฆๆ่ฟฐ้็ฏๆ็ซ ๆ้บผ็ข็็
        ๅช้่ฆๅฐๆณจๆไพ Reading Passage๏ผๅญๆธๅจ 200~500 ๅญไปฅๅ ง
    """
    response = OPEN_AI_CLIENT.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": sys_content},
            {"role": "user", "content": user_content},
        ],
        max_tokens=4000,
    )
    new_reading_passage = response.choices[0].message.content.strip()
    print("=====new_reading_passage=====")
    print(new_reading_passage)
    print("=====new_reading_passage=====")
    # Re-wrap and store alongside the original under the _latex suffix.
    passage_payload["reading_passage"] = new_reading_passage
    latex_blob = f"{video_id}/{video_id}_reading_passage_latex.json"
    GCS_SERVICE.upload_json_string(
        bucket, latex_blob, json.dumps(passage_payload, ensure_ascii=False, indent=2))
    return new_reading_passage
def summary_add_markdown_version(video_id):
    """Convert a stored plain summary into a Markdown-formatted version.

    Downloads ``<video_id>_summary.json`` from GCS, asks GPT-4o to reformat
    it into a fixed four-section Markdown template, and saves the result as
    ``<video_id>_summary_markdown.json``.

    Raises:
        gr.Error: when the source summary file does not exist.

    Returns:
        str: the Markdown-formatted summary text.
    """
    # Confirm GCS has the source summary before converting.
    print("===summary_convert_to_markdown===")
    bucket_name = 'video_ai_assistant'
    file_name = f'{video_id}_summary.json'
    blob_name = f"{video_id}/{file_name}"
    print(f"blob_name: {blob_name}")
    # Check whether the source file exists.
    is_file_exists = GCS_SERVICE.check_file_exists(bucket_name, blob_name)
    if not is_file_exists:
        raise gr.Error("summary ไธๅญๅจ!")
    # Summary exists; download its content for conversion.
    print("summary ๅทฒๅญๅจไบGCSไธญ๏ผ่ฝๆ Markdown ๆจกๅผ")
    summary_text = GCS_SERVICE.download_as_string(bucket_name, blob_name)
    summary_json = json.loads(summary_text)
    original_summary = summary_json["summary"]
    sys_content = "ไฝ ๆฏไธๅๆ ้ท่ณๆๅๆ่ทๅฝฑ็ๆๅญธ็่ๅธซ๏ผuser ็บๅญธ็๏ผ่ซ็ฒพ่ฎ่ณๆๆๆฌ๏ผ่ช่กๅคๆท่ณๆ็็จฎ้ก๏ผไฝฟ็จ zh-TW"
    user_content = f"""
        ่ซๆ นๆ {original_summary}
        ่ฝๆๆ ผๅผ็บ Markdown
        ๅชไฟ็๏ผ๐ ๆด้ซๆ่ฆใ๐ ้้ปๆฆๅฟตใ๐ก ็บไป้บผๆๅ่ฆๅญธ้ๅใโ ๅปถไผธๅฐๅ้ก
        ๅ ถไป็ไธ่ฆไฟ็
        ๆด้ซๆ่ฆๅจไธ็พๅญไปฅๅ ง
        ้้ปๆฆๅฟต่ฝๆ bullet points
        ไปฅๅๅฏ่ฝ็็ต่ซ่็ตๅฐพๅปถไผธๅฐๅ้กๆไพๅญธ็ไฝๅๆ
        ๆ่ฟฐไธญ๏ผ่ซๆๆธๅญธๆๆฏๅฐๆฅญ่ก่ช๏ผ็จ Latex ๅ ่ฆ๏ผ$...$๏ผ
        ๅ ๆธไน้คใๆ น่ใๆฌกๆน็ญ็ญ็้็ฎๅผๅฃ่ชไนๆๆ LATEX ๆธๅญธ็ฌฆ่
        ๆด้ซๆ ผๅผ็บ๏ผ
        ## ๐ ๆด้ซๆ่ฆ
        - (ไธๅ bullet point....)
        ## ๐ ้้ปๆฆๅฟต
        - xxx
        - xxx
        - xxx
        ## ๐ก ็บไป้บผๆๅ่ฆๅญธ้ๅ๏ผ
        - (ไธๅ bullet point....)
        ## โ ๅปถไผธๅฐๅ้ก
        - (ไธๅ bullet point....)
    """
    messages = [
        {"role": "system", "content": sys_content},
        {"role": "user", "content": user_content}
    ]
    request_payload = {
        "model": "gpt-4o",
        "messages": messages,
        "max_tokens": 4000,
    }
    response = OPEN_AI_CLIENT.chat.completions.create(**request_payload)
    new_summary = response.choices[0].message.content.strip()
    print("=====new_summary=====")
    print(new_summary)
    print("=====new_summary=====")
    summary_json["summary"] = new_summary
    summary_text = json.dumps(summary_json, ensure_ascii=False, indent=2)
    # Save alongside the original as summary_markdown.json.
    new_file_name = f'{video_id}_summary_markdown.json'
    new_blob_name = f"{video_id}/{new_file_name}"
    GCS_SERVICE.upload_json_string(bucket_name, new_blob_name, summary_text)
    return new_summary
| # LLM ๅผทๅถ้ๅท | |
def get_sheet_data(sheet_url, range_name):
    """Read a sheet range and flatten it into one comma-joined string."""
    raw_rows = SHEET_SERVICE.get_sheet_data_by_url(sheet_url, range_name)
    values = SHEET_SERVICE.flatten_column_data(raw_rows)
    return ', '.join(values)
def update_sheet_data(sheet_url, qa_result, video_id):
    """Write `qa_result` into the QA column of the worksheet row whose
    'ๅไธๅนณๅฐ YT Readable ID' column equals `video_id`.

    All failure modes (missing headers, missing id, short row) are reported
    via print and the function returns None in every case.
    """
    # Locate the sheet via its URL, then find the row number holding video_id.
    sheet_data = SHEET_SERVICE.get_sheet_data_by_url(sheet_url)
    # print(f"sheet_data: {sheet_data}")
    if not sheet_data or len(sheet_data) < 1:
        print("้ฏ่ชค๏ผๅทฅไฝ่กจ่ณๆ็บ็ฉบๆ็ผบๅฐๆจ้ ญๅใ")
        return
    header_row = sheet_data[0]
    data_rows = sheet_data[1:]  # data rows (header skipped)
    # Column headers to look up.
    target_column_name = 'ๅไธๅนณๅฐ YT Readable ID'
    target_qa_column_name = "QA"  # assumes the QA column header is literally "QA"
    try:
        # 1. Column index of the video-id column.
        video_id_col_index = header_row.index(target_column_name)
    except ValueError:
        # Header not found: report and bail out.
        print(f"้ฏ่ชค๏ผๅจๆจ้ ญๅ {header_row} ไธญๆพไธๅฐๆๅฎ็ๆฌไฝๅ็จฑ '{target_column_name}'")
        return
    # 2. Scan the data rows for a matching video_id.
    target_row_index = -1  # -1 means "not found"
    for i, row in enumerate(data_rows):
        # Row must be long enough and the id cell must match exactly.
        if len(row) > video_id_col_index and row[video_id_col_index] == video_id:
            target_row_index = i + 1  # index within sheet_data (data_rows starts at sheet_data[1])
            break  # stop at the first match
    if target_row_index != -1:
        # target_row_index is the 0-based index into sheet_data (incl. header);
        # e.g. the first data row has target_row_index 1.
        actual_target_row_number = target_row_index + 1  # 1-based worksheet row (e.g. 2)
        print(f"ๆพๅฐ video_id '{video_id}' ๆผๆฌไฝ '{target_column_name}' ็็ฌฌ {actual_target_row_number} ๅ (ๅทฅไฝ่กจๅ่)")
        # --- update logic ---
        try:
            # Index of the QA column.
            qa_col_index = header_row.index(target_qa_column_name)
            # The actual data of the target row.
            target_row_data = sheet_data[target_row_index]
            # Make sure the row has enough cells to update.
            if len(target_row_data) > qa_col_index:
                # *** Workaround: update_sheet_cell(N) was observed to update
                # row N+1, so pass N-1 and let it add 1 internally.
                # NOTE(review): this off-by-one compensation couples to
                # SHEET_SERVICE.update_sheet_cell's internal indexing —
                # confirm against its implementation before changing either.
                row_index_to_pass = actual_target_row_number - 1  # 1-based row minus 1
                # Log the 1-based target row for readability.
                print(f"ๆบๅๆดๆฐ ็ฌฌ {actual_target_row_number} ๅ, ๆฌไฝ '{target_qa_column_name}' (็ดขๅผ {qa_col_index}) ็บ '{qa_result}'")
                # Extra log line to confirm the value actually passed down.
                print(f"  (ๅณ้็ตฆ update_sheet_cell ็ row_index: {row_index_to_pass})")
                # --- the actual Google Sheet update call ---
                # Pass the adjusted index row_index_to_pass.
                SHEET_SERVICE.update_sheet_cell(sheet_url, row_index_to_pass, qa_col_index, qa_result)
            else:
                # The target row is too short to contain the QA column.
                print(f"้ฏ่ชค๏ผ็ฌฌ {actual_target_row_number} ๅ (่ณๆ้ทๅบฆ {len(target_row_data)}) ๆฒๆ่ถณๅค ็ๆฌไฝไพๆดๆฐ QA (้่ฆ็ดขๅผ {qa_col_index})ใ")
        except ValueError:
            print(f"้ฏ่ชค๏ผๅจๆจ้ ญๅ {header_row} ไธญๆพไธๅฐ QA ๆฌไฝๅ็จฑ '{target_qa_column_name}'")
        # --- end of update logic ---
    else:
        # video_id not found anywhere in the id column.
        print(f"้ฏ่ชค๏ผๅจๆฌไฝ '{target_column_name}' ไธญๆพไธๅฐ video_id '{video_id}'ใ")
def refresh_video_LLM_all_content_by_sheet(sheet_url, sheet_qa_column, video_ids_input):  # renamed to video_ids_input to avoid confusion
    """Batch-refresh LLM content for many videos, mirroring status to a sheet.

    For each id in `video_ids_input` (comma- and/or newline-separated):
    if the transcript already exists on GCS, mark the sheet's QA cell "OO";
    otherwise regenerate everything via refresh_video_LLM_all_content_by_id
    and write "OO"/"XX" per outcome.

    NOTE(review): the `sheet_qa_column` parameter is never read — the QA
    column is located by its header name instead; confirm whether callers
    still pass anything meaningful here.

    Returns:
        dict with "success_video_ids" and "failed_video_ids" lists.
    """
    print("=== ้ๅงๆนๆฌก่็ๅทฅไฝ่กจๅทๆฐ ===")
    target_id_column_name = 'ๅไธๅนณๅฐ YT Readable ID'
    target_qa_column_name = "QA"
    sheet_qa_success_tag = "OO"
    sheet_qa_failed_tag = "XX"
    bucket_name = 'video_ai_assistant'

    # 1. Parse the input video id string (commas and/or newlines).
    video_ids_list = video_ids_input.replace('\n', ',').split(',')
    video_ids_list = [vid.strip() for vid in video_ids_list if vid.strip() and vid.strip() != target_id_column_name]
    print(f"ๆบๅ่็็ Video IDs: {video_ids_list}")
    if not video_ids_list:
        print("ๆฒๆๆๆ็ Video ID ้่ฆ่็ใ")
        return {"success_video_ids": [], "failed_video_ids": []}

    # 2. Read the whole worksheet in one call.
    try:
        print(f"ๆญฃๅจๅพ {sheet_url} ่ฎๅๅทฅไฝ่กจ่ณๆ...")
        sheet_data = SHEET_SERVICE.get_sheet_data_by_url(sheet_url)
        if not sheet_data or len(sheet_data) < 1:
            print("้ฏ่ชค๏ผๅทฅไฝ่กจ่ณๆ็บ็ฉบๆ็ผบๅฐๆจ้ ญๅใ")
            return {"success_video_ids": [], "failed_video_ids": []}
        print(f"ๆๅ่ฎๅ {len(sheet_data)} ๅ่ณๆใ")
        header_row = sheet_data[0]
        data_rows = sheet_data[1:]
    except Exception as e:
        print(f"่ฎๅๅทฅไฝ่กจๆ็ผ็้ฏ่ชค: {e}")
        return {"success_video_ids": [], "failed_video_ids": []}

    # 3. Locate target column indices and build a video_id -> row-number map.
    try:
        video_id_col_index = header_row.index(target_id_column_name)
        qa_col_index = header_row.index(target_qa_column_name)
        # Sheet column letter (A=0, B=1, ...).
        # NOTE(review): chr(ord('A') + index) only works for the first 26
        # columns — confirm the QA column never sits past column Z.
        qa_col_letter = chr(ord('A') + qa_col_index)  # <--- used by the batch update below
        print(f"'{target_id_column_name}' ๆฌไฝ็ดขๅผ: {video_id_col_index}")
        print(f"'{target_qa_column_name}' ๆฌไฝ็ดขๅผ: {qa_col_index} (ๆฌไฝ {qa_col_letter})")
        # *** New: try to fetch the worksheet name (needed for A1 ranges). ***
        sheet_name = SHEET_SERVICE.get_sheet_name_by_url(sheet_url)  # assumes SHEET_SERVICE provides this method
        if not sheet_name:
            print("้ฏ่ชค๏ผ็กๆณๅพ URL ็ฒๅๅทฅไฝ่กจๅ็จฑใๅฐ็กๆณๅท่กๅณๆๆดๆฐใ")
            # We could return here, or set a flag to skip sheet updates.
            # return {"success_video_ids": [], "failed_video_ids": list(set(video_ids_list))}  # example: fail outright
            can_update_sheet = False
        else:
            print(f"ๅๅพๅทฅไฝ่กจๅ็จฑ: {sheet_name}")
            can_update_sheet = True
    except ValueError as e:
        # A required header column is missing entirely.
        print(f"้ฏ่ชค๏ผๅจๆจ้ ญๅ {header_row} ไธญๆพไธๅฐๅฟ ่ฆ็ๆฌไฝ: {e}")
        return {"success_video_ids": [], "failed_video_ids": []}
    except AttributeError:
        print("้ฏ่ชค๏ผSHEET_SERVICE ๅฏ่ฝๆฒๆ get_sheet_name_by_url ๆนๆณใ่ซ็ขบ่ชๆๅฏฆไฝใ")
        can_update_sheet = False
    except Exception as e:  # catch anything else unexpected
        print(f"ๅๅพๅทฅไฝ่กจๅ็จฑๆ็ผ็ๆช้ ๆ้ฏ่ชค: {e}")
        can_update_sheet = False

    video_id_to_row_map = {}
    for i, row in enumerate(data_rows):
        if len(row) > video_id_col_index and row[video_id_col_index]:
            # Store the 1-based worksheet row number.
            video_id_to_row_map[row[video_id_col_index]] = i + 2  # data rows start at sheet row 2 (1-based)

    # 4. Check GCS for each id and classify into "already done" vs "refresh".
    ids_to_set_oo_phase1 = []  # (video_id, row_number)
    ids_to_refresh = []  # video_id
    processed_ids = set()  # track processed ids to avoid duplicates
    successfully_refreshed_ids = []  # <--- initialised up front so step 7 is safe even if nothing refreshes
    failed_refresh_ids = []  # <--- same
    print("้ๅงๆชขๆฅ GCS ๆชๆก็ๆ ...")
    for video_id in video_ids_list:
        if video_id in processed_ids:
            continue
        processed_ids.add(video_id)
        if video_id not in video_id_to_row_map:
            print(f"่ญฆๅ๏ผ่ผธๅ ฅ็ video_id '{video_id}' ๅจๅทฅไฝ่กจ็ '{target_id_column_name}' ๆฌไฝไธญๆพไธๅฐ๏ผๅฐ่ทณ้ใ")
            continue
        row_number = video_id_to_row_map[video_id]
        file_name = f'{video_id}_transcript.json'
        blob_name = f"{video_id}/{file_name}"
        try:
            is_file_exists = GCS_SERVICE.check_file_exists(bucket_name, blob_name)
            if is_file_exists:
                print(f"  - {video_id} (็ฌฌ {row_number} ๅ): GCS ๆชๆกๅญๅจ๏ผๆจ่จ็บ OOใ")
                ids_to_set_oo_phase1.append((video_id, row_number))
            else:
                print(f"  - {video_id} (็ฌฌ {row_number} ๅ): GCS ๆชๆกไธๅญๅจ๏ผ้่ฆๅทๆฐใ")
                ids_to_refresh.append(video_id)
        except Exception as e:
            print(f"ๆชขๆฅ GCS ๆชๆก {blob_name} ๆ็ผ็้ฏ่ชค: {e}๏ผๅฐๅ่ฉฆๅทๆฐใ")
            ids_to_refresh.append(video_id)  # on check failure, treat as needing refresh

    # 5. Batch-set "OO" (phase 1: GCS file already exists).
    if ids_to_set_oo_phase1 and can_update_sheet:  # <--- guarded by can_update_sheet
        print(f"\nๆบๅๆนๆฌกๆดๆฐ {len(ids_to_set_oo_phase1)} ๅ Video ID ็ QA ็บ '{sheet_qa_success_tag}' (GCS ๅทฒๅญๅจ)...")
        update_data_oo1 = []
        # sheet_name = SHEET_SERVICE.get_sheet_name_by_url(sheet_url) # already fetched above
        # if not sheet_name:
        #     print("้ฏ่ชค๏ผ็กๆณ็ฒๅๅทฅไฝ่กจๅ็จฑ๏ผ็กๆณๅท่กๆนๆฌกๆดๆฐใ")
        # else:
        for vid, row_num in ids_to_set_oo_phase1:
            update_data_oo1.append({
                'range': f"{sheet_name}!{qa_col_letter}{row_num}",  # <--- uses sheet_name and qa_col_letter
                'values': [[sheet_qa_success_tag]]  # batchUpdate expects a 2-D list of values
            })
        if update_data_oo1:  # skip the API call when there is nothing to update
            try:
                # *** Assumes SHEET_SERVICE.batch_update_cells exists. ***
                # *** Confirm batch_update_cells expects values shaped [[value]]. ***
                SHEET_SERVICE.batch_update_cells(sheet_url, update_data_oo1)
                print(f"ๆๅๆนๆฌกๆดๆฐ {len(update_data_oo1)} ๅๅฒๅญๆ ผ็บ '{sheet_qa_success_tag}'ใ")
            except Exception as e:
                print(f"ๆนๆฌกๆดๆฐ QA ็บ '{sheet_qa_success_tag}' ๆ็ผ็้ฏ่ชค: {e}")
                # Consider moving these ids to the failed list on error.
    elif ids_to_set_oo_phase1 and not can_update_sheet:
        print(f"\n่ญฆๅ๏ผ็กๆณๅๅพๅทฅไฝ่กจๅ็จฑ๏ผ่ทณ้ๆนๆฌกๆดๆฐ {len(ids_to_set_oo_phase1)} ๅ GCS ๅทฒๅญๅจ็็ๆ ใ")

    # 6. Refresh each remaining id one by one, updating the sheet right after each.
    if ids_to_refresh:
        print(f"\n้ๅง่็ {len(ids_to_refresh)} ๅ้่ฆๅทๆฐ็ Video ID (ๅณๆๆดๆฐ Sheet)...")
        # successfully_refreshed_ids = []  # <--- moved to the top of the function
        # failed_refresh_ids = []  # <--- moved to the top of the function
        # This loop performs the refresh and writes the per-row status immediately.
        for video_id in ids_to_refresh:
            row_number = video_id_to_row_map[video_id]  # guaranteed present (checked above); 1-based sheet row
            print(f"  ๆญฃๅจๅทๆฐ {video_id} (็ฌฌ {row_number} ๅ)...")
            current_status_tag = sheet_qa_failed_tag  # default to failure
            try:
                refresh_video_LLM_all_content_by_id(video_id)
                print(f"  - {video_id} ๅทๆฐๆๅใ")
                successfully_refreshed_ids.append((video_id, row_number))  # recorded for the final summary
                current_status_tag = sheet_qa_success_tag  # mark success
                time.sleep(1)  # brief pause to avoid hammering downstream APIs (e.g. GCS delete/upload)
            except Exception as e:
                print(f"  - {video_id} ๅทๆฐๅคฑๆ: {str(e)}")
                failed_refresh_ids.append((video_id, row_number))  # recorded for the final summary
                current_status_tag = sheet_qa_failed_tag  # confirm failure
                time.sleep(5)  # longer pause after a failure
            finally:
                # Success or failure, try to record the status in the sheet (when possible).
                if can_update_sheet:  # <--- guarded by can_update_sheet
                    try:
                        # *** Update a single cell via update_sheet_cell. ***
                        # row_number is the actual 1-based sheet row;
                        # update_sheet_cell expects a 0-based index, so pass row_number - 1.
                        target_row_index = row_number - 1
                        print(f"  - ๆดๆฐ Sheet ็ฌฌ {row_number} ๅ, ๆฌไฝ็ดขๅผ {qa_col_index} ็ๆ ็บ '{current_status_tag}'...")
                        # Call update_sheet_cell with 0-based indices.
                        update_success = SHEET_SERVICE.update_sheet_cell(
                            sheet_url=sheet_url,
                            target_row_index_in_data=target_row_index,  # 0-based index
                            qa_col_index=qa_col_index,  # 0-based index
                            qa_result=current_status_tag  # value to write
                        )
                        if update_success:
                            print(f"  - ็ฌฌ {row_number} ๅ็ๆ ๆดๆฐๆๅใ")
                        else:
                            # update_sheet_cell already logged the error; note it here too.
                            print(f"  - !! ๆดๆฐ Sheet ็ฌฌ {row_number} ๅ็ๆ ๅคฑๆ (่ฉณ่ฆๅ ๅๆฅ่ช)ใ")
                    except Exception as update_e:
                        # Catch unexpected errors raised by update_sheet_cell itself.
                        print(f"  - !! ๅผๅซ update_sheet_cell ๆ็ผ็ๆช้ ๆ้ฏ่ชค: {str(update_e)}")
                        # Even when the sheet update fails, keep going with the batch.
                else:
                    print(f"  - ็กๆณๅๅพๅทฅไฝ่กจๅ็จฑ๏ผ่ทณ้ๆดๆฐ Sheet ็ฌฌ {row_number} ๅ็ๆ ใ")

    # 7. Assemble the final result.
    print("\n=== ๆนๆฌก่็ๅฎๆ ===")
    # These are always safe now: both refresh-result lists are initialised up front.
    final_success_ids = [item[0] for item in ids_to_set_oo_phase1] + [item[0] for item in successfully_refreshed_ids]
    final_failed_ids = [item[0] for item in failed_refresh_ids]
    # Ids missing from the sheet also count as failures.
    initial_not_found = [vid for vid in video_ids_list if vid not in video_id_to_row_map]
    final_failed_ids.extend(initial_not_found)
    print(f"ๆๅ่็ (ๆ GCS ๅทฒๅญๅจ): {len(final_success_ids)} ๅ")
    print(f"่็ๅคฑๆ (ๆๆชๆพๅฐ/ๅทๆฐๅคฑๆ): {len(final_failed_ids)} ๅ")
    result = {
        "success_video_ids": final_success_ids,
        "failed_video_ids": list(set(final_failed_ids))  # de-duplicate
    }
    return result
def refresh_video_LLM_all_content_by_id(video_id):
    """Purge all cached GCS artifacts for one video, then rebuild them.

    Deletes every blob under the video's folder in the GCS bucket and re-runs
    the full YouTube processing pipeline for that video id.
    """
    print(f"===refresh_all_LLM_content===")
    print(f"video_id: {video_id}")
    print(f"===delete_blobs_by_folder_name: {video_id}===")
    target_bucket = 'video_ai_assistant'
    GCS_SERVICE.delete_blobs_by_folder_name(target_bucket, video_id)
    print(f"ๆๆไปฅ {video_id} ้้ ญ็ๆชๆกๅทฒๅช้ค")
    # Regenerate all derived content from scratch via the ingestion pipeline.
    youtube_url = f"https://www.youtube.com/watch?v={video_id}"
    process_youtube_link(PASSWORD, youtube_url)
def refresh_video_LLM_all_content_by_ids(video_ids):
    """Refresh LLM content for a batch of videos.

    ``video_ids`` is a string of ids separated by commas and/or newlines.
    Each id is refreshed independently; one failure does not stop the rest.

    :return: dict with ``success_video_ids`` and ``failed_video_ids`` lists.
    """
    # Normalize newlines to commas, then keep the non-empty trimmed tokens.
    tokens = video_ids.replace('\n', ',').split(',')
    id_list = [token.strip() for token in tokens if token.strip()]
    succeeded = []
    failed = []
    for vid in id_list:
        try:
            refresh_video_LLM_all_content_by_id(vid)
        except Exception as e:
            print(f"===refresh_all_LLM_content error===")
            print(f"video_id: {vid}")
            print(f"error: {str(e)}")
            print(f"===refresh_all_LLM_content error===")
            failed.append(vid)
        else:
            succeeded.append(vid)
    return {
        "success_video_ids": succeeded,
        "failed_video_ids": failed
    }
| # AI ็ๆๆๅญธ็ด ๆ | |
def get_meta_data(video_id, source="gcs"):
    """Load a video's meta data (subject / grade) from GCS.

    Reads ``{video_id}/{video_id}_meta_data.json``; when the file does not
    exist yet, an empty skeleton is returned. The numeric grade stored in the
    JSON (1-12) is converted to its Chinese ordinal text (e.g. 1 -> "ไธๅนด็ด");
    unknown grades map to "".

    :param video_id: YouTube video id / GCS folder name.
    :param source: only "gcs" is supported.
    :raises ValueError: for an unsupported source (the original fell through
        and crashed with UnboundLocalError instead).
    """
    if source != "gcs":
        raise ValueError(f"Unsupported meta_data source: {source}")
    print("===get_meta_data on gcs===")
    # Fixed: removed an unused local (`gcs_client = GCS_CLIENT`) — all access
    # goes through GCS_SERVICE.
    bucket_name = 'video_ai_assistant'
    file_name = f'{video_id}_meta_data.json'
    blob_name = f"{video_id}/{file_name}"
    if not GCS_SERVICE.check_file_exists(bucket_name, blob_name):
        # No stored meta data yet: return an empty skeleton.
        meta_data_json = {
            "subject": "",
            "grade": "",
        }
        print("meta_data empty return")
    else:
        print("meta_dataๅทฒๅญๅจไบGCSไธญ")
        meta_data_text = GCS_SERVICE.download_as_string(bucket_name, blob_name)
        meta_data_json = json.loads(meta_data_text)
    # Convert the numeric grade to Chinese ordinal text for display.
    grade_names = {
        1: "ไธๅนด็ด",
        2: "ไบๅนด็ด",
        3: "ไธๅนด็ด",
        4: "ๅๅนด็ด",
        5: "ไบๅนด็ด",
        6: "ๅ ญๅนด็ด",
        7: "ไธๅนด็ด",
        8: "ๅ ซๅนด็ด",
        9: "ไนๅนด็ด",
        10: "ๅๅนด็ด",
        11: "ๅไธๅนด็ด",
        12: "ๅไบๅนด็ด",
    }
    meta_data_json["grade"] = grade_names.get(meta_data_json["grade"], "")
    return meta_data_json
def get_ai_content(password, user_data, video_id, df_string, topic, grade, level, specific_feature, content_type, source="gcs"):
    """Fetch (or generate and cache) one AI teaching material for a video.

    Looks up ``{video_id}_ai_content_list.json`` in GCS for an entry matching
    (video_id, level, specific_feature, content_type). On a cache miss the
    content is generated, appended to the list and uploaded back. The request
    is logged to BigQuery either way.

    :return: (ai_content, ai_content, prompt, prompt) — values are duplicated
        because the Gradio UI binds the same outputs twice.
    """
    verify_password(password)
    if source == "gcs":
        print("===get_ai_content on gcs===")
        bucket_name = 'video_ai_assistant'
        file_name = f'{video_id}_ai_content_list.json'
        blob_name = f"{video_id}/{file_name}"
        # Create an empty cache file on first use.
        if not GCS_SERVICE.check_file_exists(bucket_name, blob_name):
            ai_content_list = []
            ai_content_text = json.dumps(ai_content_list, ensure_ascii=False, indent=2)
            GCS_SERVICE.upload_json_string(bucket_name, blob_name, ai_content_text)
            print("ai_content_list [] ๅทฒไธๅณๅฐGCS")
        # The cache list is guaranteed to exist at this point.
        ai_content_list_string = GCS_SERVICE.download_as_string(bucket_name, blob_name)
        ai_content_list = json.loads(ai_content_list_string)
        # Find cached entries matching every key field.
        target_kvs = {
            "video_id": video_id,
            "level": level,
            "specific_feature": specific_feature,
            "content_type": content_type
        }
        matches = [
            item for item in ai_content_list
            if all(item[k] == v for k, v in target_kvs.items())
        ]
        if len(matches) == 0:
            # Cache miss: generate, then persist the updated list.
            ai_content, prompt = generate_ai_content(password, df_string, topic, grade, level, specific_feature, content_type)
            ai_content_json = {
                "video_id": video_id,
                "content": str(ai_content),
                "prompt": prompt,
                "level": level,
                "specific_feature": specific_feature,
                "content_type": content_type
            }
            ai_content_list.append(ai_content_json)
            ai_content_text = json.dumps(ai_content_list, ensure_ascii=False, indent=2)
            GCS_SERVICE.upload_json_string(bucket_name, blob_name, ai_content_text)
            print("ai_contentๅทฒไธๅณๅฐGCS")
            # insert_log_to_bigquery usage
            data_endpoint = "chat_completions"
        else:
            # Cache hit: use the most recent matching entry.
            ai_content_json = matches[-1]
            ai_content = ai_content_json["content"]
            prompt = ai_content_json["prompt"]
            # insert_log_to_bigquery usage
            data_endpoint = "gcs"
    # Log usage to BigQuery. BUG FIX: the original reassigned `prompt` to the
    # whole cache entry here, so the function returned the dict instead of the
    # prompt string — log the entry under a separate name instead.
    event_response = {"event_response": str(ai_content)}
    event_response_json = json.dumps(event_response)
    log_prompt_json = json.dumps(ai_content_json)
    insert_log_to_bigquery(user_data, "get_ai_content", data_endpoint, event_response_json, log_prompt_json, content_type)
    return ai_content, ai_content, prompt, prompt
def generate_ai_content(password, df_string, topic, grade, level, specific_feature, content_type):
    """Generate teaching material text, falling back from OpenAI to Bedrock.

    :return: (ai_content, prompt) where prompt is the generation prompt used.
    """
    verify_password(password)
    material = EducationalMaterial(df_string, topic, grade, level, specific_feature, content_type)
    prompt = material.generate_content_prompt()
    try:
        ai_content = material.get_ai_content(OPEN_AI_CLIENT, ai_type="openai")
    except Exception as e:
        # BUG FIX: the original message interpolated an undefined `video_id`,
        # raising a NameError that masked the real OpenAI error.
        error_msg = f"OPEN AI ็ๆๆๅญธ็ด ๆ้ฏ่ชค: {str(e)}"
        print("===generate_ai_content error===")
        print(error_msg)
        print("===generate_ai_content error===")
        # Fall back to Bedrock when OpenAI fails.
        ai_content = material.get_ai_content(BEDROCK_CLIENT, ai_type="bedrock")
    return ai_content, prompt
def generate_ai_content_fine_tune_result(password, user_data, exam_result_prompt, df_string_output, exam_result, exam_result_fine_tune_prompt, content_type):
    """Fine-tune previously generated exam content, with Bedrock fallback.

    Logs the request/response to BigQuery and returns the fine-tuned text.
    """
    verify_password(password)
    material = EducationalMaterial(df_string_output, "", "", "", "", "")
    try:
        fine_tuned_ai_content = material.get_fine_tuned_ai_content(OPEN_AI_CLIENT, "openai", exam_result_prompt, exam_result, exam_result_fine_tune_prompt)
    except Exception as e:
        # FIX: was a bare `except:` that silently swallowed every exception
        # (including KeyboardInterrupt); log the OpenAI failure, then fall
        # back to Bedrock.
        print(f"===generate_ai_content_fine_tune_result openai error: {str(e)}===")
        fine_tuned_ai_content = material.get_fine_tuned_ai_content(BEDROCK_CLIENT, "bedrock", exam_result_prompt, exam_result, exam_result_fine_tune_prompt)
    # send data to GBQ
    user_id = user_data
    route = "generate_ai_content_fine_tune_result"
    endpoint = "chat_completions"
    event_response = {"event_response": str(fine_tuned_ai_content)}
    event_response_json = json.dumps(event_response)
    prompt = {
        "exam_result_prompt": exam_result_prompt,
        "exam_result_fine_tune_prompt": exam_result_fine_tune_prompt
    }
    prompt_json = json.dumps(prompt)
    feature = content_type
    insert_log_to_bigquery(user_id, route, endpoint, event_response_json, prompt_json, feature)
    return fine_tuned_ai_content
def return_original_exam_result(exam_result_original):
    """Identity pass-through the UI uses to restore the original exam result."""
    return exam_result_original
def create_word(content):
    """Write *content* as a single paragraph into a new .docx under /tmp.

    :return: the path of the created file (random UUID filename).
    """
    doc_path = f"/tmp/{uuid.uuid4()}.docx"
    document = Document()
    document.add_paragraph(content)
    document.save(doc_path)
    return doc_path
def download_exam_result(content):
    """Return the path of a Word document containing *content* (for download)."""
    return create_word(content)
| # ---- Chatbot ---- | |
def get_instructions(content_subject, content_grade, transcript_text, key_moments, socratic_mode=True):
    """Build the system-instruction prompt for the tutoring chatbot.

    Embeds the subject, grade, key moments and transcript into a large fixed
    prompt that sets the assistant's persona, teaching strategies, response
    format (Traditional Chinese, LaTeX math, timestamp hints) and vocabulary
    restrictions (TW terms over simplified-Chinese terms).

    :param content_subject: subject name inserted into the prompt.
    :param content_grade: student grade inserted into the prompt.
    :param transcript_text: flattened transcript used as context.
    :param key_moments: key-moment text used as context.
    :param socratic_mode: True -> guide thinking without direct answers;
        False -> direct answers allowed.
    :return: the formatted instruction string.
    """
    # Select the tutoring-method sentence substituted into the prompt below.
    if socratic_mode:
        method = "Socratic style, guide thinking, no direct answers. this is very important, please be seriously following."
    else:
        method = "direct answers, but encourage user to think more."
    instructions = f"""
    subject: {content_subject}
    grade: {content_grade}
    context: {key_moments}
    transcript_text: {transcript_text}
    Assistant Role: you are a {content_subject} assistant. you can call yourself as {content_subject} ๅญธไผด and your name if you know
    User Role: {content_grade} th-grade student.
    Method: {method}
    Language: Traditional Chinese ZH-TW (it's very important), suitable for {content_grade} th-grade level.
    Strategy:
    - You are a professional tutor, and you will use the following teaching strategies based on the textbook content.
    # General Strategies
    Needs Analysis:
    The tutor/assistant teacher should be able to conduct dynamic needs analysis based on the student's responses. Use questions to understand the student's needs and difficulties.
    Example questions: "What do you want to learn today?" or "What difficulties are you encountering in this part of the content?"
    Dynamic Goal Setting:
    Set learning goals based on student feedback, which can be short-term or long-term. The tutor/assistant teacher can adjust the plan automatically according to the student's progress.
    Example questions: "What is our goal for this week?" or "What tasks do you hope to complete today?"
    Flexible Teaching Methods:
    Provide different teaching methods and resources based on the student's age and learning style. The tutor/assistant teacher can adjust teaching strategies based on student feedback.
    Example questions: "Do you prefer learning through videos or reading materials?" or "We can understand this problem through examples, what do you think?"
    Patience and Encouragement:
    Provide positive feedback and encouragement, especially when students encounter difficulties. The tutor/assistant teacher should be able to detect the student's emotions and provide appropriate support.
    Example questions: "Don't worry, let's try again." or "You did well, keep it up!"
    Regular Feedback and Evaluation:
    Regularly evaluate the student's learning progress and provide feedback. The tutor/assistant teacher can use tests and practice questions to assess the student's understanding.
    Example questions: "Let's check your progress." or "How do you feel about your learning progress during this period?"
    Good Communication Skills:
    Maintain good communication with students, responding to their questions and needs in a timely manner. The tutor/assistant teacher should be able to identify and solve students' problems.
    Example questions: "Is there any problem that you need my help with?" or "Is this part clear to you?"
    Maintaining Professionalism:
    Continue learning and improving teaching skills, and maintain punctuality and responsibility. The tutor/assistant teacher should provide accurate and up-to-date information.
    Example questions: "What is our learning goal for today?" or "Remember to study a little bit every day, and gradually accumulate knowledge."
    Creating a Positive Learning Environment:
    Create a positive, supportive, and motivating learning atmosphere. The tutor/assistant teacher should suggest students take breaks and relax at appropriate times.
    Example questions: "Let's take a break and continue studying afterward." or "How do you feel about this learning environment? Do we need any adjustments?"
    # Specific Applications
    The tutor/assistant teacher can automatically adjust the depth and complexity of the questions based on these general strategies by grade. For example:
    - Kindergarten and Elementary School Students: Use simple vocabulary and concrete examples, with more pictures and gamified content.
    - Middle School Students: Use interactive and practical methods, such as quizzes and group discussions.
    - High School Students: Use deep learning and critical thinking exercises, such as project research and discussions.
    - Adult Learners: Emphasize practical applications and work-related content, such as case studies and workshops.
    Response:
    - if user say hi or hello or any greeting, just say hi back and introduce yourself. Then tell user to ask question in context.
    - include math symbols (use LaTeX $ to cover before and after, ex: $x^2$)
    - hint with video timestamp which format ใๅ่๏ผ00:00:00ใ.
    - Sometimes encourage user with relaxing atmosphere.
    - if user ask questions not include in context, just tell them to ask the question in context and give them example question.
    Restrictions:
    - Answer within video content, no external references
    - don't repeat user's question, guide them to think more.
    - don't use simple-chinese words, use ZH-TW words. such as below:
    - intead of ่ฆ้ ป, use ๅฝฑ็.
    - instead of ๅฎ่ชๅก, use ๅคช็ฉบไบบ
    - instead of ่จ็ฎๆฉ, use ้ป่ ฆ
    - instead of ้ผ ๆจ, use ๆป้ผ
    - instead of ๅ้ต, use ๆท้
    - instead of ๅฑๅน, use ่ขๅน
    - instead of ๅไธญ, use ๅไธญ
    - instead of ้ ๅฐ, use ้ทๅฎ
    - instead of ่ปไปถ, use ่ป้ซ
    - instead of ็กฌไปถ, use ็กฌ้ซ
    - instead of ๅ ฌๅฎ, use ่ญฆๅฏ
    - instead of ๆธ ้, use ้่ทฏ
    - instead of ไฟกๆฏ, use ่ณ่จ
    - instead of ็ฝ็ป, use ็ถฒ่ทฏ
    - instead of ็ฝ็ซ, use ็ถฒ็ซ
    - instead of ็ต่ง, use ้ป่ฆ
    - instead of ็ตๅฝฑ, use ้ปๅฝฑ
    - instead of ็ต่, use ้ป่ ฆ
    - instead of ็ต่ฏ, use ้ป่ฉฑ
    - instead of ๆๆฌ, use ๆไปถ
    - instead of ่กไธ, use ็ขๆฅญ
    - instead of ไผไธ, use ๅ ฌๅธ
    - instead of ไบงๅ, use ็ขๅ
    - instead of ๆๅก, use ๆๅ
    """
    return instructions
def get_chat_moderation(user_content):
    """Run the OpenAI moderation endpoint on *user_content*.

    :return: (is_flagged, full_response_dict) — is_flagged is the `flagged`
        bool of the first moderation result.
    """
    moderation = OPEN_AI_CLIENT.moderations.create(input=user_content)
    response_dict = moderation.model_dump()
    is_flagged = response_dict['results'][0]['flagged']
    print("========get_chat_moderation==========")
    print(f"is_flagged: {is_flagged}")
    print(response_dict)
    print("========get_chat_moderation==========")
    return is_flagged, response_dict
def chat_with_any_ai(ai_type, password, video_id, user_data, transcript_state, key_moments, user_message, chat_history, content_subject, content_grade, questions_answers_json, socratic_mode=False, thread_id=None, ai_name=None):
    """Route one user chat message to the selected AI backend.

    Flow: validate password/length/limit -> serve preset Q&A verbatim when the
    message matches one -> moderate the message -> answer via either the
    chat-completions Chatbot ("chat_completions") or the OpenAI Assistants API
    ("assistant") -> append the round to history and log to BigQuery.

    NOTE(review): if ai_type is neither "chat_completions" nor "assistant",
    `response_text`/`metadata` stay unbound and the function raises — confirm
    callers only pass these two values.

    :return: ("", chat_history, send_btn_update, send_feedback_btn_update,
        thread_id) — the leading "" clears the input textbox.
    """
    print(f"ai_type: {ai_type}")
    print(f"user_data: {user_data}")
    print(f"===thread_id:{thread_id}===")
    verify_password(password)
    verify_message_length(user_message, max_length=1500)
    # Preset Q&A short-circuit: matching questions are answered from the
    # stored answer without calling any model.
    is_questions_answers_exists, question_message, answer_message = check_questions_answers(user_message, questions_answers_json)
    if is_questions_answers_exists:
        chat_history = update_chat_history(question_message, answer_message, chat_history)
        send_btn_update, send_feedback_btn_update = update_send_and_feedback_buttons(chat_history, CHAT_LIMIT)
        # Small delay so the canned answer does not appear instantaneous.
        time.sleep(3)
        return "", chat_history, send_btn_update, send_feedback_btn_update, thread_id
    verify_chat_limit(chat_history, CHAT_LIMIT)
    is_flagged, response_dict = get_chat_moderation(user_message)
    if ai_type == "chat_completions":
        if is_flagged:
            response_text = "ๆจ็็่จๅทฒ่ขซๆจ่จ็บไธ็ถๅ งๅฎน๏ผ่ซ้ๆฐ็ผ้ใ"
        else:
            chatbot_config = get_chatbot_config(ai_name, transcript_state, key_moments, content_subject, content_grade, video_id, socratic_mode)
            chatbot = Chatbot(chatbot_config)
            response_text = chatbot.chat(user_message, chat_history)
        # if thread_id is none, create random thread_id + timestamp
        if thread_id is None or thread_id == "":
            thread_id = "thread_" + str(uuid.uuid4()) + str(int(time.time()))
            print(f"===thread_id:{thread_id}===")
        metadata = {
            "video_id": video_id,
            "user_data": user_data,
            "content_subject": content_subject,
            "content_grade": content_grade,
            "socratic_mode": str(socratic_mode),
            "assistant_id": ai_name,
            "is_streaming": "false",
            "moderation_is_flagged": str(is_flagged),
            # "moderation_response_dict": str(response_dict)
        }
    elif ai_type == "assistant":
        client = OPEN_AI_CLIENT
        assistant_id = OPEN_AI_ASSISTANT_ID_GPT4
        metadata={
            "video_id": video_id,
            "user_data": user_data,
            "content_subject": content_subject,
            "content_grade": content_grade,
            "socratic_mode": str(socratic_mode),
            "assistant_id": assistant_id,
            "is_streaming": "false",
            "moderation_is_flagged": str(is_flagged),
            # "moderation_response_dict": str(response_dict)
        }
        if is_flagged:
            response_text = "ๆจ็็่จๅทฒ่ขซๆจ่จ็บไธ็ถๅ งๅฎน๏ผ่ซ้ๆฐ็ผ้ใ"
        else:
            if isinstance(key_moments, str):
                key_moments_json = json.loads(key_moments)
            else:
                key_moments_json = key_moments
            # Drop bulky key-moment fields before embedding into instructions.
            # NOTE(review): when key_moments arrives pre-parsed this mutates
            # the caller's object in place.
            for moment in key_moments_json:
                moment.pop('images', None)
                moment.pop('end', None)
                moment.pop('transcript', None)
                moment.pop('suggested_images', None)
            if isinstance(transcript_state, str):
                transcript_state_json = json.loads(transcript_state)
            else:
                transcript_state_json = transcript_state
            # Keep only the text of each transcript segment.
            transcript_text = ""
            for content in transcript_state_json:
                transcript_text += content["text"] + ","
            key_moments_text = json.dumps(key_moments_json, ensure_ascii=False)
            instructions = get_instructions(content_subject, content_grade, transcript_text, key_moments_text, socratic_mode)
            print(f"=== instructions:{instructions} ===")
            user_message_note = "/n ่ซๅดๆ ผ้ตๅพชinstructions๏ผๆไปปไธไฝ่ๆ ผๆๅบๅฎถๆ๏ผ็ตๅฐไธ่ฆ้่ค user ็ๅๅฅ๏ผ่ซ็จๅผๅฐ็ๆนๅผๆๅผๆนๅ๏ผ่ซไธๅฎ่ฆ็จ็น้ซไธญๆๅ็ญ zh-TW๏ผไธฆ็จๅฐ็ฃไบบ็็ฆฎ่ฒๅฃ่ช่กจ้๏ผๅ็ญๆไธ่ฆ็นๅฅ่ชชๆ้ๆฏๅฐ็ฃไบบ็่ชๆฐฃ๏ผ่ซๅจๅ็ญ็ๆๅพๆจ่จปใๅ่๏ผ๏ผๆ๏ผ:๏ผๅ๏ผ:๏ผ็ง๏ผใ๏ผ๏ผๅฆๆๆฏๅๅๅญธ็๏ผๅฐฑๅชๅไธๅๅ้ก๏ผ่ซๅนซๅฉๅญธ็ๆดๅฅฝ็็่งฃ่ณๆ๏ผๅญๆธๅจ100ๅญไปฅๅ ง๏ผๅ็ญๆๅฆๆ่ฌๅฐๆธๅญธๅฐๆๅ่ฉ๏ผ่ซ็จๆธๅญธ็ฌฆ่ไปฃๆฟๆๅญ๏ผLatex ็จ $ ๅญ่ render, ex: $x^2$)"
            user_content = user_message + user_message_note
            response_text, thread_id = handle_conversation_by_open_ai_assistant(client, user_content, instructions, assistant_id, thread_id, metadata, fallback=True)
    # Update chat history and refresh button states.
    chat_history = update_chat_history(user_message, response_text, chat_history)
    send_btn_update, send_feedback_btn_update = update_send_and_feedback_buttons(chat_history, CHAT_LIMIT)
    user_id = user_data
    route = "chat_with_any_ai"
    endpoint = ai_type #chat_completions or assistant
    event_response = {
        "event_response": str(response_text),
    }
    event_response_json = json.dumps(event_response)
    prompt = {
        "thread_id": thread_id,
        "metadata": metadata,
        "user_message": user_message
    }
    prompt_json = json.dumps(prompt)
    feature = "vaitor_chatbot"
    insert_log_to_bigquery(user_id, route, endpoint, event_response_json, prompt_json, feature)
    # Return updated history and "" to clear the input textbox.
    return "", chat_history, send_btn_update, send_feedback_btn_update, thread_id
def get_chatbot_config(ai_name, transcript_state, key_moments, content_subject, content_grade, video_id, socratic_mode=True):
    """Assemble the configuration dict consumed by ``Chatbot``.

    Unknown ``ai_name`` values fall back to "foxcat". ``transcript_state`` and
    ``key_moments`` may arrive either as JSON strings or as parsed objects.

    :return: dict with transcript, key moments, subject/grade, model client
        and the generated system instructions.
    """
    # Persona -> LLM client/model mapping.
    ai_personas = {
        "foxcat": {
            "ai_name": "foxcat",
            "ai_client": PERPLEXITY_CLIENT,
            "ai_model_name": "perplexity_sonar",
        },
        "lili": {
            "ai_name": "lili",
            "ai_client": PERPLEXITY_CLIENT,
            "ai_model_name": "perplexity_r1_1776",
        },
        "maimai": {
            "ai_name": "maimai",
            "ai_client": PERPLEXITY_CLIENT,
            "ai_model_name": "perplexity_r1_1776",
        }
    }
    # FIX: the original used `.get(ai_name, "foxcat")["ai_client"]`, which
    # would crash on the string default if the earlier membership check were
    # ever bypassed; select the persona dict once, with a safe fallback.
    persona = ai_personas.get(ai_name) or ai_personas["foxcat"]
    ai_client = persona["ai_client"]
    ai_model_name = persona["ai_model_name"]
    # Accept both JSON strings and already-parsed objects (parse once; the
    # original parsed transcript_state twice).
    if isinstance(transcript_state, str):
        simple_transcript = json.loads(transcript_state)
    else:
        simple_transcript = transcript_state
    if isinstance(key_moments, str):
        key_moments_json = json.loads(key_moments)
    else:
        key_moments_json = key_moments
    # Strip bulky fields before embedding key moments into the instructions.
    # NOTE(review): when key_moments arrives pre-parsed this mutates the
    # caller's object in place — behavior preserved from the original.
    for moment in key_moments_json:
        moment.pop('images', None)
        moment.pop('end', None)
        moment.pop('transcript', None)
        moment.pop('suggested_images', None)
    key_moments_text = json.dumps(key_moments_json, ensure_ascii=False)
    # Flatten the transcript to comma-joined plain text.
    transcript_text = ""
    for content in simple_transcript:
        transcript_text += content["text"] + ","
    instructions = get_instructions(content_subject, content_grade, transcript_text, key_moments_text, socratic_mode)
    return {
        "video_id": video_id,
        "transcript": simple_transcript,
        "key_moments": key_moments,
        "content_subject": content_subject,
        "content_grade": content_grade,
        "jutor_chat_key": JUTOR_CHAT_KEY,
        "ai_model_name": ai_model_name,
        "ai_client": ai_client,
        "instructions": instructions
    }
def feedback_with_ai(user_data, ai_type, chat_history, thread_id=None):
    """Summarize the user's question-asking skill over the conversation.

    Sends the chat history to OpenAI (chat completion or Assistants API,
    depending on *ai_type*) with a rubric prompt, appends the feedback as a
    new round, disables the feedback button, and logs the call to BigQuery.

    :return: (updated chat_history, gr.update for the feedback button).
    """
    # Prompt idea: from the conversation (chat_history), summarize the user's
    # "question asking" and give feedback on whether the questions were good.
    system_content = """
    ไฝ ๆฏไธๅๆ ้ทๅผๅฐๅ็ญ็ด ้ค็่ๅธซ๏ผuser ็บๅญธ็็ๆๅ่ทๅ็ญ๏ผ่ซ็ฒพ่ฎๅฐ่ฉฑ้็จ๏ผ้ๅฐ user ็ตฆไบๅ้ฅๅฐฑๅฅฝ๏ผๆ นๆไปฅไธ Rule:
    - ่ซไฝฟ็จ็น้ซไธญๆ zh-TW ็ธฝ็ต user ็ๆๅๅ๏ผไธฆ็ตฆไบๆฏๅฆๆๅๅฐๅ้ก็ๅ้ฅๅๅปบ่ญฐ
    - ไธๆก่จใ้ ่จญๆๅใ็ๅ้ก๏ผๅฆๆ user ็ๆๅ้ฝไพ่ชใ้ ่จญๆๅใ๏ผ่กจ้็จๆถๅๆผไฝฟ็จ็ณป็ตฑ๏ผ่ซ็ตฆไบๅ้ฅไธฆ้ผๅต user ่ฆช่ชๆๅๆดๅ ท้ซ็ๅ้ก
    - ๅฆๆ็จๆถๆๅ้ฝ็ธ็ถ็ฐก็ญ๏ผ็่ณๅฐฑๆฏไธๅๅญๆ้ฝๆฏไธๅๆธๅญ๏ผๅๆฏ user: 1, user:2๏ผ๏ผ่ซ็ตฆไบๅ้ฅไธฆๅปบ่ญฐ user ๆๅๆดๅ ท้ซ็ๅ้ก
    - ๅฆๆ็จๆถๆๅๅ งๅฎนๅชๆ็ฌฆ่ๆๆฏไบ็ขผ๏ผๅๆฏ๏ผ,๏ผ, ..., 3bhwbqhfw2vve2 ็ญ๏ผ่ซ็ตฆไบๅ้ฅไธฆๅปบ่ญฐ user ๆๅๆดๅ ท้ซ็ๅ้ก
    - ๅฆๆ็จๆถๆๅๅ งๅฎนๆ่ฒๆ ใๆดๅใไปๆจใไธ็ถ่จ่ซ็ญ๏ผ่ซ็ตฆไบๅดๅฒ็ๅ้ฅไธฆๅปบ่ญฐ user ๆๅๆดๅ ท้ซ็ๅ้ก
    - ไธฆ็จ็ฌฌไบไบบ็จฑใไฝ ใไพไปฃ่กจ user
    - ่ซ็ฆฎ่ฒ๏ผไธฆ็ตฆไบ้ผๅต
    """
    chat_history_conversation = ""
    # Serialize each round as "User: ... / Assistant: ..." text.
    # The first pair of chat_history is skipped.
    for chat in chat_history[1:]:
        user_message = chat[0]
        assistant_message = chat[1]
        chat_history_conversation += f"User: {user_message}\nAssistant: {assistant_message}\n"
    feedback_request_message = "่ซไพๆไปฅไธ็ๅฐ่ฉฑ๏ผ็ธฝ็ตๆ็ใๆๅๅใ๏ผไธฆ็ตฆไบๆๆฏๅฆๆใๅๅฐๅ้กใ็ๅ้ฅๅๅปบ่ญฐ"
    user_content = f"""conversation: {chat_history_conversation}
    {feedback_request_message}
    ๆๅพๆ นๆๆๅๅ่กจ็พ๏ผ็ตฆไบๆๅๅปบ่ญฐใๆๅ่กจ็พ๏ผไธฆ็จ emoji ไพ่กจ็คบ่ฉๅ๏ผ
    ๐ข๏ผ๏ผ่กจ็พๅพๅฅฝ็ๅ้ฅ๏ผ็ตฆไบๆญฃๅ่ฏๅฎ๏ผ
    ๐ก๏ผ๏ผ้ๅฏไปฅๅ ๆฒน็็ๅ้ฅ๏ผ็ตฆไบๆ็ขบ็ๅปบ่ญฐ๏ผ
    ๐ด๏ผ๏ผ้ๅธธไธๆๆๅ็ๅ้ฅ๏ผ็ตฆไบ้ผๅตไธฆ็ตฆๅบๆ็ขบ็คบ็ฏ๏ผ
    example:
    ๅฆไธๆน้ข๏ผไฝ ่กจ้ใๆไธๆณๅญธไบใ้ๅๆ ๆ๏ผๅ ถๅฏฆไนๆฏไธ็จฎ้่ฆ็ๅ้ฅใ้้กฏ็คบไฝ ๅฏ่ฝๆๅฐๆซๆๆ็ฒๅฆใๅจ้็จฎๆ ๆณไธ๏ผ่กจ้ๅบไฝ ็ๆๅๆฏๅฅฝ็๏ผไฝๅฆๆ่ฝๅ ท้ซ่ชชๆๆฏไป้บผ่ฎไฝ ๆๅฐ้ๆจฃ๏ผๆๆฏๆไป้บผๅ ท้ซ็ๅญธ็ฟ้็ค๏ผๆๆดๆๅฉๆผๆพๅฐ่งฃๆฑบๆนๆกใ
    ็ตฆไบไฝ ็ๅปบ่ญฐๆฏ๏ผๅ่ฉฆๅจๆๅๆๆดๆ็ขบไธไบ๏ผ้ๆจฃไธๅ ่ฝๅนซๅฉไฝ ็ฒๅพๆดๅฅฝ็ๅญธ็ฟๆฏๆ๏ผไน่ฝๆ้ซไฝ ็ๅ้ก่งฃๆฑบๆๅทงใ
    ......
    ๆๅๅปบ่ญฐ๏ผๅจๆๅๆ๏ผ่ฉฆ่ๅ ท้ซไธฆๆธ ๆฐๅฐ่กจ้ไฝ ็้ๆฑๅ็ๆ๏ผ้ๆจฃ่ฝๆดๆๆๅฐๅพๅฐๅนซๅฉใ
    ๆๅ่กจ็พ๏ผใ๐กใๅ ๆฒน๏ผๆ็บ็ทด็ฟ๏ผไฝ ็ๆๅๅๆ่ถไพ่ถๅฅฝ๏ผ
    """
    client = OPEN_AI_CLIENT
    if ai_type == "chat_completions":
        model_name = "gpt-4o"
        response_text = handle_conversation_by_open_ai_chat_completions(client, model_name, user_content, system_content)
    elif ai_type == "assistant":
        assistant_id = OPEN_AI_ASSISTANT_ID_GPT4 #GPT 4 turbo
        # assistant_id = OPEN_AI_ASSISTANT_ID_GPT3 #GPT 3.5 turbo
        response_text, thread_id = handle_conversation_by_open_ai_assistant(client, user_content, system_content, assistant_id, thread_id, metadata=None, fallback=True)
    # Append the feedback round and disable the feedback button.
    chat_history = update_chat_history(feedback_request_message, response_text, chat_history)
    feedback_btn_update = gr.update(value="ๅทฒๅ้ฅ", interactive=False, variant="secondary")
    user_id = user_data
    route = "feedback_with_ai"
    endpoint = ai_type #chat_completions or assistant
    event_response = {
        "event_response": str(response_text),
    }
    event_response_json = json.dumps(event_response)
    prompt = {
        "thread_id": thread_id,
        "metadata": None,
        "user_message": user_content
    }
    prompt_json = json.dumps(prompt)
    feature = "vaitor_chatbot"
    insert_log_to_bigquery(user_id, route, endpoint, event_response_json, prompt_json, feature)
    return chat_history, feedback_btn_update
def handle_conversation_by_open_ai_chat_completions(client, model_name, user_content, system_content):
    """One-shot chat completion: system + user message, stripped reply text."""
    messages = [
        {"role": "system", "content": system_content},
        {"role": "user", "content": user_content},
    ]
    completion = client.chat.completions.create(
        model=model_name,
        messages=messages,
        max_tokens=4000,
    )
    return completion.choices[0].message.content.strip()
def handle_conversation_by_open_ai_assistant(client, user_message, instructions, assistant_id, thread_id=None, metadata=None, fallback=False):
    """
    Handles the creation and management of a conversation thread.
    :param client: The OpenAI client object.
    :param user_message: The message from the user.
    :param instructions: System instructions for the assistant.
    :param assistant_id: ID of the assistant to use.
    :param thread_id: The existing thread ID, if any.
    :param metadata: Additional metadata to add to the thread.
    :param fallback: Whether to fall back to a plain gpt-4o chat completion
        when the Assistants API path fails.
    :return: (response_text, thread_id).
    """
    try:
        # Reuse the existing thread when we have one; otherwise create it.
        if not thread_id:
            thread = client.beta.threads.create()
            thread_id = thread.id
        else:
            thread = client.beta.threads.retrieve(thread_id)
        if metadata:
            client.beta.threads.update(thread_id=thread.id, metadata=metadata)
        # Send the user message to the thread
        client.beta.threads.messages.create(thread_id=thread.id, role="user", content=user_message)
        # Run the assistant
        run = client.beta.threads.runs.create(thread_id=thread.id, assistant_id=assistant_id, instructions=instructions)
        # Poll until the run finishes; give up after 30 seconds.
        run_status = poll_run_status(run.id, thread.id, timeout=30)
        if run_status == "completed":
            # The newest message is first in the returned list.
            # (Removed an unused `response = messages` local from the original.)
            messages = client.beta.threads.messages.list(thread_id=thread.id)
            response_text = messages.data[0].content[0].text.value
        else:
            response_text = "ๅญธ็ฟ็ฒพ้ๆ้ป็ดฏ๏ผ่ซ็จๅพๅ่ฉฆ๏ผ"
    except Exception as e:
        if fallback:
            # Assistants API failed: degrade to a one-shot chat completion.
            completion = client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": instructions},
                    {"role": "user", "content": user_message}
                ],
                max_tokens=4000,
            )
            response_text = completion.choices[0].message.content.strip()
        else:
            print(f"Error: {e}")
            raise gr.Error(f"Error: {e}")
    return response_text, thread_id
def verify_message_length(user_message, max_length=500):
    """Raise gr.Error when *user_message* exceeds *max_length* characters."""
    if len(user_message) > max_length:
        # BUG FIX: the old message hard-coded "ไบ็พๅญ" (500) even though callers
        # pass other limits (e.g. 1500); report the actual limit instead.
        error_msg = f"ไฝ ็่จๆฏๅคช้ทไบ๏ผ่ซ็ธฎ็ญ่จๆฏ้ทๅบฆ่ณ {max_length} ๅญไปฅๅ ง"
        raise gr.Error(error_msg)
def check_questions_answers(user_message, questions_answers_json):
    """Check whether *user_message* exactly matches a preset question.

    ``questions_answers_json`` may be a JSON string or an already-parsed list
    of ``{"question": ..., "answer": ...}`` dicts. Entries with an empty
    answer are ignored. (Removed an unused ``answer = ""`` local from the
    original.)

    :return: (found, question_message, answer_message) — on a match the
        question is prefixed with "ใ้ ่จญๅ้กใ"; otherwise ("", "").
    """
    is_questions_answers_exists = False
    question_message = ""
    answer_message = ""
    # Accept both JSON strings and parsed objects.
    if isinstance(questions_answers_json, str):
        qa_data = json.loads(questions_answers_json)
    else:
        qa_data = questions_answers_json
    for qa in qa_data:
        if user_message == qa["question"] and qa["answer"]:
            is_questions_answers_exists = True
            question_message = f"ใ้ ่จญๅ้กใ{user_message}"
            answer_message = qa["answer"]
            print("=== in questions_answers_json==")
            print(f"question: {qa['question']}")
            print(f"answer: {answer_message}")
            break  # Stop at the first matching preset answer.
    return is_questions_answers_exists, question_message, answer_message
def verify_chat_limit(chat_history, chat_limit):
    """Raise gr.Error once the conversation exceeds *chat_limit* rounds."""
    over_limit = chat_history is not None and len(chat_history) > chat_limit
    if over_limit:
        raise gr.Error("ๆญคๆฌกๅฐ่ฉฑ่ถ ้ไธ้๏ผๅฐ่ฉฑไธ่ผช10ๆฌก๏ผ")
def update_chat_history(user_message, response, chat_history):
    """Append one (user, assistant) round; create the list when history is None."""
    round_pair = (user_message, response)
    if chat_history is None:
        return [round_pair]
    chat_history.append(round_pair)
    return chat_history
def update_send_and_feedback_buttons(chat_history, chat_limit):
    """Refresh the send/feedback button states from the history length.

    The send count excludes the first history pair; once the history length
    exceeds the limit, sending is disabled and the feedback button appears.
    """
    send_count = len(chat_history) - 1
    limit_reached = len(chat_history) > chat_limit
    if limit_reached:
        send_btn_update = gr.update(value=f"ๅฐ่ฉฑไธ้ ({send_count}/{chat_limit})", interactive=False)
    else:
        send_btn_update = gr.update(value=f"็ผ้ ({send_count}/{chat_limit})", interactive=True)
    send_feedback_btn_update = gr.update(visible=limit_reached)
    return send_btn_update, send_feedback_btn_update
def process_open_ai_audio_to_chatbot(password, audio_url):
    """Transcribe an uploaded audio file with Whisper, then normalize any math
    in the transcript to inline LaTeX via GPT-4o.

    :param audio_url: local path of the recorded audio; falsy -> returns "".
    :raises gr.Error: for files larger than 2 MB (roughly > 60 seconds).
    :return: the LaTeX-normalized transcript, or "" when there is no input.
    """
    verify_password(password)
    # BUG FIX: `response` was only assigned inside the `if audio_url` branch,
    # so a falsy audio_url crashed with UnboundLocalError at the return.
    response = ""
    if audio_url:
        with open(audio_url, "rb") as audio_file:
            file_size = os.path.getsize(audio_url)
            if file_size > 2000000:
                raise gr.Error("ๆชๆกๅคงๅฐ่ถ ้๏ผ่ซไธ่ฆ่ถ ้ 60็ง")
            transcription = OPEN_AI_CLIENT.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
                response_format="text"
            )
        print("=== transcription ===")
        print(transcription)
        print("=== transcription ===")
        # Ask GPT-4o to wrap math in the transcript with LaTeX $...$ markers.
        if transcription:
            system_message = """ไฝ ๆฏๅฐๆฅญ็ LATEX ่ฝๆๅธซ๏ผๆ ้ทๅฐๆธๅญธ็ฌฆ่ใๅ ฌๅผ่ฝๆๆ LATEX ๆ ผๅผ๏ผไธฆ็จ LATEX ็ฌฆ่ $...$ ๅ ่ฃน๏ผex: $x^2$
            ็ฏไพ๏ผ
            transcription: x็ๅนณๆนๅ 2x ๅ 1 ็ญๆผ 0
            ่ฝๆ LATEX ๆ ผๅผ๏ผ$x^2 + 2x + 1 = 0$
            """
            user_message = f"""transcription: {transcription}
            ่ซๅฐ transcription ๅ ง็ๆธๅญธใๅ ฌๅผใ้็ฎๅผใๅๅญธๅผใ็ฉ็ formula ๅ งๅฎน่ฝๆๆ LATEX ๆ ผๅผ
            ๅ ถไปๆๅญ้ฝไฟ็ๅๆจฃ
            ไนไธ่ฆ็ตฆๅบๅค้ค็ๆ่ฟฐ
            """
            request = OPEN_AI_CLIENT.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": system_message},
                    {"role": "user", "content": user_message}
                ],
                max_tokens=4000,
            )
            response = request.choices[0].message.content.strip()
    return response
def poll_run_status(run_id, thread_id, timeout=600, poll_interval=5):
    """
    Polls the status of a Run and handles different statuses appropriately.
    :param run_id: The ID of the Run to poll.
    :param thread_id: The ID of the Thread associated with the Run.
    :param timeout: Maximum time to wait for the Run to complete, in seconds.
    :param poll_interval: Time to wait between each poll, in seconds.
    :return: the final Run status string (e.g. "completed", "failed",
        "expired", or whatever status was current when the timeout hit).

    NOTE(review): if timeout <= 0 the loop body never runs and the final
    `run.status` access raises UnboundLocalError — confirm callers always
    pass a positive timeout.
    """
    client = OPEN_AI_CLIENT
    start_time = time.time()
    while time.time() - start_time < timeout:
        run = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run_id)
        if run.status in ["completed", "cancelled", "failed"]:
            print(f"Run completed with status: {run.status}")
            break
        elif run.status == "requires_action":
            print("Run requires action. Performing required action...")
            # Here, you would perform the required action, e.g., running functions
            # and then submitting the outputs. This is simplified for this example.
            # After performing the required action, you'd complete the action:
            # OPEN_AI_CLIENT.beta.threads.runs.complete_required_action(...)
            # NOTE(review): no action is actually submitted here, so a run in
            # requires_action just keeps polling until timeout/expiry.
        elif run.status == "expired":
            print("Run expired. Exiting...")
            break
        else:
            print(f"Run status is {run.status}. Waiting for updates...")
        time.sleep(poll_interval)
    else:
        # while-else: the loop ran to timeout without hitting a `break`.
        print("Timeout reached. Run did not complete in the expected time.")
    # Once the Run is completed, handle the result accordingly
    if run.status == "completed":
        # Retrieve and handle messages or run steps as needed
        messages = client.beta.threads.messages.list(thread_id=thread_id)
        for message in messages.data:
            if message.role == "assistant":
                print(f"Assistant response: {message.content}")
    elif run.status in ["cancelled", "failed"]:
        # Handle cancellation or failure
        print(f"Run ended with status: {run.status}")
    elif run.status == "expired":
        # Handle expired run
        print("Run expired without completion.")
    return run.status
def chat_with_opan_ai_assistant_streaming(user_message, chat_history, password, video_id, user_data, thread_id, transcript_state, key_moments, content_subject, content_grade, socratic_mode=True):
    """Stream an OpenAI Assistants reply for the tutoring chatbot.

    Generator used by the Gradio ChatInterface: validates the request, runs
    moderation, creates or resumes an Assistants thread, streams partial
    replies via ``yield``, and finally logs the exchange to BigQuery.

    :param user_message: Text typed by the learner (rejected above 1500 chars).
    :param chat_history: Prior turns from the ChatInterface; only used to
        enforce the CHAT_LIMIT turn cap.
    :param password: App password; verify_password raises on mismatch.
    :param video_id: YouTube id stored in the thread metadata.
    :param user_data: Caller-supplied user identifier (also the log user_id).
    :param thread_id: Existing Assistants thread id, or falsy to create one.
    :param transcript_state: Transcript entries — JSON string or list of
        dicts with a "text" key.
    :param key_moments: Key-moment entries — JSON string or list of dicts.
    :param content_subject: Subject label passed to get_instructions.
    :param content_grade: Grade label passed to get_instructions.
    :param socratic_mode: Toggles the Socratic-tutor instruction variant.
    :yields: The accumulated assistant reply so far (for live display).
    :raises gr.Error: On oversized input, turn-cap breach, or any API error.
    """
    verify_password(password)
    print("=====user_data=====")
    print(f"user_data: {user_data}")
    print("===chat_with_opan_ai_assistant_streaming===")
    print(thread_id)
    # Pre-check the message length; the code enforces 1500 characters.
    # NOTE(review): the user-facing error text states a five-hundred-character
    # limit while the check below uses 1500 — confirm which is intended.
    if len(user_message) > 1500:
        error_msg = "ไฝ ็่จๆฏๅคช้ทไบ๏ผ่ซ็ธฎ็ญ่จๆฏ้ทๅบฆ่ณไบ็พๅญไปฅๅ ง"
        raise gr.Error(error_msg)
    # Reject the turn once the conversation exceeds the CHAT_LIMIT cap.
    if chat_history is not None and len(chat_history) > CHAT_LIMIT:
        error_msg = f"ๆญคๆฌกๅฐ่ฉฑ่ถ ้ไธ้๏ผๅฐ่ฉฑไธ่ผช{CHAT_LIMIT}ๆฌก๏ผ"
        raise gr.Error(error_msg)
    print("===chat_with_opan_ai_assistant_streaming===")
    print(user_message)
    is_flagged, response_dict = get_chat_moderation(user_message)
    assistant_id = OPEN_AI_ASSISTANT_ID_GPT4 #GPT 4 turbo
    # assistant_id = OPEN_AI_ASSISTANT_ID_GPT3 #GPT 3.5 turbo
    client = OPEN_AI_CLIENT
    # Metadata attached to the Assistants thread and echoed into the log row.
    metadata = {
        "youtube_id": video_id,
        "user_data": user_data,
        "content_subject": content_subject,
        "content_grade": content_grade,
        "assistant_id": assistant_id,
        "is_streaming": "true",
        "moderation_is_flagged": str(is_flagged),
        # "moderation_response_dict": str(response_dict)
    }
    if is_flagged:
        # Moderation rejected the message; reply with a fixed notice instead
        # of contacting the assistant.
        partial_messages = "ๆจ็็่จๅทฒ่ขซๆจ่จ็บไธ็ถๅ งๅฎน๏ผ่ซ้ๆฐ็ผ้ใ"
        yield partial_messages
    else:
        try:
            # Accept key_moments either as a JSON string or an already-parsed list.
            if isinstance(key_moments, str):
                key_moments_json = json.loads(key_moments)
            else:
                key_moments_json = key_moments
            # Strip bulky fields from each key moment so the prompt stays small.
            for moment in key_moments_json:
                moment.pop('images', None)
                moment.pop('end', None)
                moment.pop('transcript', None)
                moment.pop('suggested_images', None)
            key_moments_text = json.dumps(key_moments_json, ensure_ascii=False)
            # Accept the transcript either as a JSON string or a parsed list.
            if isinstance(transcript_state, str):
                transcript_state_json = json.loads(transcript_state)
            else:
                transcript_state_json = transcript_state
            # Keep only the transcript text, comma-joined into one string.
            transcript_text = ""
            for content in transcript_state_json:
                transcript_text += content["text"] + ","
            instructions = get_instructions(content_subject, content_grade, transcript_text, key_moments_text, socratic_mode)
            # Create a fresh Assistants thread, or resume the existing one.
            if not thread_id:
                thread = client.beta.threads.create()
                thread_id = thread.id
                print(f"new thread_id: {thread_id}")
            else:
                thread = client.beta.threads.retrieve(thread_id)
                print(f"old thread_id: {thread_id}")
            client.beta.threads.update(
                thread_id=thread_id,
                metadata=metadata
            )
            # Append the user's message (plus standing style/formatting
            # directives) to the thread.
            client.beta.threads.messages.create(
                thread_id=thread.id,
                role="user",
                content=user_message + "/n ่ซๅดๆ ผ้ตๅพชinstructions๏ผๆไปปไธไฝ่ๆ ผๆๅบๅฎถๆ๏ผ่ซไธๅฎ่ฆ็จ็น้ซไธญๆๅ็ญ zh-TW๏ผไธฆ็จๅฐ็ฃไบบ็็ฆฎ่ฒๅฃ่ช่กจ้๏ผๅ็ญๆไธ่ฆ็นๅฅ่ชชๆ้ๆฏๅฐ็ฃไบบ็่ชๆฐฃ๏ผไธ็จๆๅฐใ้ๅญ็จฟใ้ๅ่ฉ๏ผ็จใๅ งๅฎนใไปฃๆฟ))๏ผ่ซๅจๅ็ญ็ๆๅพๆจ่จปใๅ่่ณๆ๏ผ๏ผๆ๏ผ:๏ผๅ๏ผ:๏ผ็ง๏ผใ๏ผ๏ผๅฆๆๆฏๅๅๅญธ็๏ผๅฐฑๅชๅไธๅๅ้ก๏ผ่ซๅนซๅฉๅญธ็ๆดๅฅฝ็็่งฃ่ณๆ๏ผๅญๆธๅจ100ๅญไปฅๅ ง๏ผ"
            )
            # Stream the run; yield the accumulated text after each delta so
            # the UI updates progressively.
            with client.beta.threads.runs.stream(
                thread_id=thread.id,
                assistant_id=assistant_id,
                instructions=instructions,
            ) as stream:
                partial_messages = ""
                for event in stream:
                    if event.data and event.data.object == "thread.message.delta":
                        message = event.data.delta.content[0].text.value
                        partial_messages += message
                        yield partial_messages
        except Exception as e:
            print(f"Error: {e}")
            raise gr.Error(f"Error: {e}")
    # Audit logging: runs after streaming finishes (and for flagged turns),
    # but is skipped when an exception was raised above.
    user_id = user_data
    route = "chat_with_opan_ai_assistant_streaming"
    endpoint = "assistant_streaming"
    event_response = {
        "event_response": partial_messages
    }
    event_response_json = json.dumps(event_response)
    prompt = {
        "thread_id": thread_id,
        "metadata": metadata,
        "user_message": user_message
    }
    prompt_json = json.dumps(prompt)
    feature = "vaitor_chatbot"
    insert_log_to_bigquery(user_id, route, endpoint, event_response_json, prompt_json, feature)
def create_thread_id():
    """Create a fresh OpenAI Assistants thread and return its id."""
    new_thread = OPEN_AI_CLIENT.beta.threads.create()
    print(f"create new thread_id: {new_thread.id}")
    return new_thread.id
def chatbot_select(chatbot_name):
    """Switch the visible chat panel to match the selected assistant.

    Returns component updates, in order: selection accordion visibility,
    re-select button visibility, streaming panel visibility, generic AI
    panel visibility, ai_name value, ai_type value, thread id reset.
    """
    is_streaming = chatbot_name == "chatbot_open_ai_streaming"
    is_assistant = chatbot_name == "chatbot_open_ai"

    # Collapse the picker and expose the "choose again" button.
    accordion_update = gr.update(visible=False)
    select_btn_update = gr.update(visible=True)
    # Exactly one of the two chat panels is shown; the streaming panel only
    # for the streaming assistant, the generic panel for everything else.
    streaming_panel_update = gr.update(visible=is_streaming)
    ai_panel_update = gr.update(visible=not is_streaming)
    # Every switch starts a fresh conversation thread.
    thread_id_update = gr.update(value="")

    if is_assistant:
        ai_type_value = "assistant"
    elif is_streaming:
        ai_type_value = "assistant_streaming"
    else:
        ai_type_value = "chat_completions"
    ai_type_update = gr.update(value=ai_type_value)

    # Non-assistant personas use their own name; assistants fall back to the
    # default "foxcat" persona for the dropdown value.
    persona = chatbot_name if not (is_assistant or is_streaming) else "foxcat"
    name_update = gr.update(value=persona)

    return (accordion_update, select_btn_update,
            streaming_panel_update, ai_panel_update,
            name_update, ai_type_update, thread_id_update)
def update_avatar_images(avatar_images, chatbot_description_value):
    """Swap the chatbot's avatar pair and seed it with an intro exchange.

    The chat history is reset to a single turn where the bot introduces
    itself with its description text.
    """
    intro_history = [[
        "่ซๅไฝ ๆฏ่ชฐ๏ผ",
        chatbot_description_value
    ]]
    return gr.update(avatar_images=avatar_images, value=intro_history)
def show_all_chatbot_accordion():
    """Re-open the chatbot picker accordion and hide the re-select button."""
    return gr.update(visible=True), gr.update(visible=False)
def insert_log_to_bigquery(user_id, route, endpoint, event_response_json, prompt_json, feature):
    """Append one usage-log row to the BigQuery streaming log table.

    Args:
        user_id: Identifier of the acting user.
        route: Application route name that produced the event.
        endpoint: Logical endpoint label (e.g. "assistant_streaming").
        event_response_json: JSON-encoded response payload.
        prompt_json: JSON-encoded prompt/request payload.
        feature: Feature tag for the event (e.g. "vaitor_chatbot").
    """
    table_id = "junyiacademy.streaming_log.log_video_ai_usage"
    log_row = {
        "user_id": user_id,
        "route": route,
        "endpoint": endpoint,
        "event_response": event_response_json,
        # Timestamp recorded in UTC, ISO-8601 formatted.
        "event_timestamp": datetime.now(timezone.utc).isoformat(),
        "prompt": prompt_json,
        "feature": feature,
    }
    insert_errors = GBQ_CLIENT.insert_rows_json(table_id, [log_row])
    if insert_errors:
        print(f"Encountered errors while inserting rows: {insert_errors}")
    else:
        print("Rows have been successfully inserted.")
# --- Init params ---
def init_params(text, request: gr.Request):
    """Compute the initial UI state from the incoming HTTP request.

    Admin-only panels are shown by default and hidden when the request
    originates from an allowed (embedding) domain; in that case the app
    password is also pre-filled so embedded users skip authentication.

    :param text: Unused placeholder input from the Gradio load event.
    :param request: The Gradio request wrapper for the incoming HTTP call.
    :return: Component updates for admin panels, accordions, password,
        youtube link, ready flag, chat panels, and the is_env_prod checkbox.
    """
    if request:
        print("Request headers dictionary:", request.headers)
        print("IP address:", request.client.host)
        print("Query parameters:", dict(request.query_params))
        # url = request.url
        print("Request URL:", request.url)
    youtube_link = ""
    password_text = ""
    # Signals to the embedding page that Blocks.load has finished.
    block_ready_flag = "READY"
    # Default state: show every admin/debug panel (standalone usage).
    admin = gr.update(visible=True)
    reading_passage_admin = gr.update(visible=True)
    summary_admin = gr.update(visible=True)
    see_detail = gr.update(visible=True)
    worksheet_accordion = gr.update(visible=True)
    lesson_plan_accordion = gr.update(visible=True)
    exit_ticket_accordion = gr.update(visible=True)
    chatbot_open_ai_streaming = gr.update(visible=False)
    chatbot_ai = gr.update(visible=False)
    ai_chatbot_params = gr.update(visible=True)
    is_env_prod = gr.update(value=False)
    # Pre-fill the YouTube link if a youtube_id query parameter is present.
    if "youtube_id" in request.query_params:
        youtube_id = request.query_params["youtube_id"]
        youtube_link = f"https://www.youtube.com/watch?v={youtube_id}"
        print(f"youtube_link: {youtube_link}")
    # Embedded mode: requests whose Origin matches an allowed domain get the
    # password pre-filled and all admin panels hidden.
    origin = request.headers.get("origin", "")
    allowed_domains = [
        "junyiacademy.org",
        "junyiacademy.appspot.com",
        "rev-proxy-dg4bspkswq-de.a.run.app"
    ]
    # NOTE(review): this is a substring match, so an origin such as
    # "junyiacademy.org.evil.com" would also pass — consider parsing the
    # Origin host and comparing exactly / by suffix.
    if any(domain in origin for domain in allowed_domains):
        password_text = PASSWORD
        admin = gr.update(visible=False)
        reading_passage_admin = gr.update(visible=False)
        summary_admin = gr.update(visible=False)
        see_detail = gr.update(visible=False)
        worksheet_accordion = gr.update(visible=False)
        lesson_plan_accordion = gr.update(visible=False)
        exit_ticket_accordion = gr.update(visible=False)
        ai_chatbot_params = gr.update(visible=False)
    # IS_ENV_PROD is an environment-derived string, hence the "True" compare.
    if IS_ENV_PROD == "True":
        is_env_prod = gr.update(value=True)
    return admin, reading_passage_admin, summary_admin, see_detail, \
        worksheet_accordion, lesson_plan_accordion, exit_ticket_accordion, \
        password_text, youtube_link, block_ready_flag, \
        chatbot_open_ai_streaming, chatbot_ai, ai_chatbot_params, \
        is_env_prod
def update_state(content_subject, content_grade, trascript, key_moments, questions_answers):
    """Refresh per-session Gradio state after new video content is loaded.

    :param content_subject: Subject label, stored as-is.
    :param content_grade: Grade label, stored as-is.
    :param trascript: JSON string of transcript entries (parameter name kept
        misspelled for caller compatibility).
    :param key_moments: Key-moments payload, stored untouched.
    :param questions_answers: JSON string of objects with a "question" key.
    :return: Tuple of (subject, grade, formatted transcript, key moments,
        reset streaming-chat thread id, question labels 1-3).
    :raises json.JSONDecodeError: If trascript or questions_answers is not
        valid JSON.
    """
    content_subject_state = content_subject
    content_grade_state = content_grade
    trascript_json = json.loads(trascript)
    # Collapse raw transcript entries into the plain-text form used by prompts.
    trascript_state = create_formatted_simple_transcript(trascript_json)
    key_moments_state = key_moments
    # New content always starts a fresh streaming-chat thread.
    streaming_chat_thread_id_state = ""
    questions_answers_json = json.loads(questions_answers)
    # Robustness fix: tolerate fewer than three generated questions instead
    # of raising IndexError; missing slots become empty button labels.
    question_texts = [qa.get("question", "") for qa in questions_answers_json[:3]]
    question_texts += [""] * (3 - len(question_texts))
    ai_chatbot_question_1, ai_chatbot_question_2, ai_chatbot_question_3 = question_texts
    return content_subject_state, content_grade_state, trascript_state, key_moments_state, \
        streaming_chat_thread_id_state, \
        ai_chatbot_question_1, ai_chatbot_question_2, ai_chatbot_question_3
# --- Static front-end assets and shared widgets ---

# Extra <head> markup injected into the Gradio page: loads the markmap
# auto-loader and re-renders the mind map when its tab button is clicked.
HEAD = """
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<script src="https://cdn.jsdelivr.net/npm/markmap-autoloader@0.15.2"></script>
<script>
    let mind_map_tab_button = document.querySelector("#mind_map_tab-button");
    if (mind_map_tab_button) {
        mind_map_tab_button.addEventListener('click', function() {
            const mind_map_markdown = document.querySelector("#mind_map_markdown > label > textarea");
            if (mind_map_markdown) {
                // ๅฝๆ้ฎ่ขซ็นๅปๆถ๏ผๆๅฐๅฝๅ็textarea็ๅผ
                console.log('Value changed to: ' + mind_map_markdown.value);
                markmap.autoLoader.renderAll();
            }
        });
    }
</script>
"""

# Gradio `js` load hook: wires the same markmap re-render listener at app
# start (duplicates the HEAD script for robustness across load paths).
JS = """
function createGradioAnimation() {
    let mind_map_tab_button = document.querySelector("#mind_map_tab-button");
    if (mind_map_tab_button) {
        mind_map_tab_button.addEventListener('click', function() {
            const mind_map_markdown = document.querySelector("#mind_map_markdown > label > textarea");
            if (mind_map_markdown) {
                // ๅฝๆ้ฎ่ขซ็นๅปๆถ๏ผๆๅฐๅฝๅ็textarea็ๅผ
                console.log('Value changed to: ' + mind_map_markdown.value);
                markmap.autoLoader.renderAll();
            }
        });
    }
    return 'Animation created';
}
"""

# Page CSS: fixes the mind-map tab height and sizes the rendered markmap SVG.
CSS = """
#mind_map_tab {
    height: 500px;
}
.markmap {
    position: relative;
}
.markmap > svg {
    width: 100%;
    height: 600px;
}
"""

# Greeting text shown above the streaming ChatInterface.
streaming_chat_greeting = """
Hi๏ผๆๆฏใ้ฃ็น้ณ้ใ๏ผ่ชช่ฉฑๆฏ่ผๅฟซ๏ผไฝๆไป้บผๅ้ก้ฝๅฏไปฅๅๆๅ๏ผ \n
๐ ๆๆฒๆ้ ่จญๅ้กใไนๆฒๆ่ช้ณ่ผธๅ ฅ๏ผ้ฉๅๅฟซๅๅฟซ็ญ็ไฝ \n
๐ ้ต็ค่ผธๅ ฅไฝ ็ๅ้ก๏ผๆๆ็กๅๅ็ญไฝ ็ๅ้กๅ๏ผ\n
๐ค ๆ้ๅจๆ้ท๏ผ้ซๅๆ้๏ผๆฏไธๆฌกๅญธ็ฟๅช่ฝๅ็ญๅๅๅ้ก๏ผ่ซ่ฎๆไผๆฏไธไธๅๅๅ้กๅ๏ผ
"""

# Inline LaTeX between single dollar signs, rendered in non-display style.
latex_delimiters = [{"left": "$", "right": "$", "display": False}]

# Shared Chatbot widget for the streaming assistant.
# NOTE(review): constructed at module import time, before create_app() —
# confirm this is intentional rather than being created inside the Blocks.
streaming_ai_chatbot = gr.Chatbot(
    show_share_button=False,
    latex_delimiters=latex_delimiters,
    show_copy_button=True,
)
| def create_app(): | |
| app = FastAPI() | |
| app.add_middleware(BlockFileRedirectMiddleware) | |
| with gr.Blocks(theme=gr.themes.Base(primary_hue=gr.themes.colors.orange, secondary_hue=gr.themes.colors.amber, text_size = gr.themes.sizes.text_lg), head=HEAD, js=JS, css=CSS) as demo: | |
| with gr.Row() as admin: | |
| password = gr.Textbox(label="Password", type="password", elem_id="password_input", visible=True) | |
| junyi_link = gr.Textbox(label="Junyi Link", elem_id="junyi_link_input", visible=True) | |
| youtube_link = gr.Textbox(label="Enter YouTube Link", elem_id="youtube_link_input", visible=True) | |
| video_id = gr.Textbox(label="video_id", visible=True) | |
| # file_upload = gr.File(label="Upload your CSV or Word file", visible=False) | |
| # web_link = gr.Textbox(label="Enter Web Page Link", visible=False) | |
| user_data = gr.Textbox(label="User Data", elem_id="user_data_input", visible=True) | |
| # block_ready_flag: ่ฎไธป็ซ็ Vaitor component ็ฅ้ Blocks.load ๅทฒ็ถๅท่กๅฎๆ๏ผ็ถ block_ready_flag = "READY" ๆ๏ผ | |
| block_ready_flag = gr.Textbox(label="Block Ready Flag", elem_id="block_ready_flag", visible=False, value="LOADING") | |
| youtube_link_btn = gr.Button("Submit_YouTube_Link", elem_id="youtube_link_btn", visible=True) | |
| with gr.Row() as data_state: | |
| content_subject_state = gr.State() # ไฝฟ็จ gr.State ๅญๅจ content_subject | |
| content_grade_state = gr.State() # ไฝฟ็จ gr.State ๅญๅจ content_grade | |
| trascript_state = gr.State() # ไฝฟ็จ gr.State ๅญๅจ trascript | |
| key_moments_state = gr.State() # ไฝฟ็จ gr.State ๅญๅจ key_moments | |
| streaming_chat_thread_id_state = gr.State() # ไฝฟ็จ gr.State ๅญๅจ streaming_chat_thread_id | |
| with gr.Tab("AIๅฐ็ฒพ้"): | |
| with gr.Row(): | |
| all_chatbot_select_btn = gr.Button("้ธๆ AI ๅฐ็ฒพ้ ๐", elem_id="all_chatbot_select_btn", visible=False, variant="secondary", size="sm") | |
| with gr.Row() as ai_chatbot_params: | |
| ai_name = gr.Dropdown( | |
| label="้ธๆ AI ๅฉ็", | |
| choices=[ | |
| ("้ฃ็น็ฒพ้","chatbot_open_ai"), | |
| ("้ฃ็น้ณ้","chatbot_open_ai_streaming"), | |
| ("ๆขจๆขจ","lili"), | |
| ("้บฅ้บฅ","maimai"), | |
| ("็็ธ่ฒ","foxcat") | |
| ], | |
| value="foxcat", | |
| visible=True | |
| ) | |
| ai_chatbot_ai_type = gr.Textbox(value="chat_completions", visible=True) | |
| ai_chatbot_thread_id = gr.Textbox(label="thread_id", visible=True) | |
| ai_chatbot_socratic_mode_btn = gr.Checkbox(label="่ๆ ผๆๅบๅฎถๆๅฉ็ๆจกๅผ", value=False, visible=True) | |
| latex_delimiters = [{"left": "$", "right": "$", "display": False}] | |
| with gr.Accordion("้ธๆ AI ๅฐ็ฒพ้", elem_id="chatbot_select_accordion") as chatbot_select_accordion: | |
| with gr.Row(): | |
| # ้ฃ็น้ณ้ | |
| with gr.Column(scale=1, variant="panel", visible=True): | |
| streaming_chatbot_avatar_url = "https://storage.googleapis.com/wpassets.junyiacademy.org/1/2020/11/1-%E6%98%9F%E7%A9%BA%E9%A0%AD%E8%B2%BC-%E5%A4%AA%E7%A9%BA%E7%8B%90%E7%8B%B8%E8%B2%93-150x150.png" | |
| streaming_chatbot_description = """Hi๏ผๆๆฏใ้ฃ็น้ณ้ใ๏ผ \n | |
| ่ชช่ฉฑๆฏ่ผๅฟซ๏ผไฝๆไป้บผๅ้ก้ฝๅฏไปฅๅๆๅ๏ผ \n | |
| ๐ ๆๆฒๆ้ ่จญๅ้กใไนๆฒๆ่ช้ณ่ผธๅ ฅ๏ผ้ฉๅๅฟซๅๅฟซ็ญ๏ผไธ่ตท็ทด็ฟๅๅบๅฅฝๅ้กๅง \n | |
| ๐ ๆ ้ท็จๆๅญ่กจ้็ไฝ ๏ผๅฏไปฅ็จ้ต็ค่ผธๅ ฅไฝ ็ๅ้ก๏ผๆๆ็กๅๅ็ญไฝ ็ๅ้กๅ\n | |
| ๐ค ๆ้ๅจๆ้ท๏ผ้ซๅๆ้๏ผๆฏไธๆฌกๅญธ็ฟๅช่ฝๅ็ญๅๅๅ้ก๏ผ่ซ่ฎๆไผๆฏไธไธๅๅๅ้กๅ๏ฝ | |
| """ | |
| chatbot_open_ai_streaming_name = gr.State("chatbot_open_ai_streaming") | |
| gr.Image(value=streaming_chatbot_avatar_url, height=100, width=100, show_label=False, show_download_button=False, show_share_button=False, show_fullscreen_button=False) | |
| chatbot_open_ai_streaming_select_btn = gr.Button("๐้ธๆใ้ฃ็น้ณ้ใ", elem_id="streaming_chatbot_btn", visible=True, variant="primary") | |
| with gr.Accordion("๐ ้ฃ็น้ณ้ ๆ่ฟฐ", open=False): | |
| gr.Markdown(value=streaming_chatbot_description, visible=True) | |
| user_avatar = "https://em-content.zobj.net/source/google/263/flushed-face_1f633.png" | |
| # ้ฃ็น็ฒพ้ | |
| with gr.Column(scale=1, variant="panel", visible=True): | |
| vaitor_chatbot_avatar_url = "https://junyitopicimg.s3.amazonaws.com/s4byy--icon.jpe" | |
| vaitor_chatbot_avatar_images = gr.State([user_avatar, vaitor_chatbot_avatar_url]) | |
| vaitor_chatbot_description = """Hi๏ผๆๆฏไฝ ็AIๅญธไผดใ้ฃ็น็ฒพ้ใ๏ผ\n | |
| ๆๅฏไปฅ้ชไฝ ไธ่ตทๅญธ็ฟๆฌๆฌก็ๅ งๅฎน๏ผๆไป้บผๅ้ก้ฝๅฏไปฅๅๆๅ๏ผ\n | |
| ๐ค ๅฆๆไฝ ไธ็ฅ้ๆ้บผ็ผๅ๏ผๅฏไปฅ้ปๆๅทฆไธๆน็ๅ้กไธใๅ้กไบใๅ้กไธ๏ผๆๆๅนซไฝ ็ๆๅ้ก๏ผ\n | |
| ๐ฃ๏ธ ไนๅฏไปฅ้ปๆๅณไธๆน็จ่ช้ณ่ผธๅ ฅ๏ผๆๆๅนซไฝ ่ฝๆๆๆๅญ๏ผๅฒๅฎณๅง๏ผ\n | |
| ๐ ๆๆฏ็ดๆฅ้ต็ค่ผธๅ ฅไฝ ็ๅ้ก๏ผๆๆ็กๅๅ็ญไฝ ็ๅ้กๅ๏ผ\n | |
| ๐ค ไฝๆ้ๅจๆ้ท๏ผ้ซๅๆ้๏ผๆฏไธๆฌกๅญธ็ฟๅช่ฝๅ็ญๅๅๅ้ก๏ผ่ซ่ฎๆไผๆฏไธไธๅๅๅ้กๅ๏ผ\n | |
| ๐ฆ ๅฆๆ้ๅฐไธ้๏ผๆๆฏ้ๅฐ็ฒพ้ๅพ็ดฏ๏ผ่ซๅๅๅ ถไปๆๅ๏ผๅๆฏ้ฃ็น้ณ้่ชช่ฉฑ็้ๅบฆๆฏ่ผๅฟซ๏ผไฝ ๆฏๅฆ่ทๅพไธๅข๏ผไฝ ไนๅฏไปฅๅๅ ถไป็ฒพ้ไบๅ็็ๅ๏ผ\n | |
| """ | |
| chatbot_open_ai_name = gr.State("chatbot_open_ai") | |
| gr.Image(value=vaitor_chatbot_avatar_url, height=100, width=100, show_label=False, show_download_button=False, show_share_button=False, show_fullscreen_button=False) | |
| vaitor_chatbot_select_btn = gr.Button("๐้ธๆใ้ฃ็น็ฒพ้ใ", elem_id="chatbot_btn", visible=True, variant="primary") | |
| with gr.Accordion("๐ฆ ้ฃ็น็ฒพ้ ๆ่ฟฐ", open=False): | |
| vaitor_chatbot_description_value = gr.Markdown(value=vaitor_chatbot_description, visible=True) | |
| # ็็ธ่ฒ | |
| with gr.Column(scale=1, variant="panel", visible=True): | |
| foxcat_chatbot_avatar_url = "https://storage.googleapis.com/wpassets.junyiacademy.org/1/2020/06/%E7%A7%91%E5%AD%B8%E5%BE%BD%E7%AB%A0-2-150x150.png" | |
| foxcat_avatar_images = gr.State([user_avatar, foxcat_chatbot_avatar_url]) | |
| foxcat_chatbot_description = """Hi๏ผๆๆฏใ็็ธ่ฒใ๏ผๅฏไปฅ้ชไฝ ไธ่ตทๅญธ็ฟๆฌๆฌก็ๅ งๅฎน๏ผๆไป้บผๅ้ก้ฝๅฏไปฅๅๆๅ๏ผ\n | |
| ๐ค ไธๅนด็ดๅญธ็๏ฝ10 ๆญฒ๏ฝ็ท\n | |
| ๐ฃ๏ธ ๅฃ้ ญ็ฆช๏ผใๆ่ฆบๅฅฝๅฅฝ็ฉๅ๏ผใใๅฆ๏ผๆฏ้ๆจฃๅ๏ผใ\n | |
| ๐ ่่ถฃ๏ผ็็ฅ่ญๅๆธ็ฑใ็ฑ่ก็ๅๆผซๅก้ใๆ็ใ็ฌๅฑฑใ้จ่ ณ่ธ่ปใๅ ็บๅคชๅๆญกๅ้ญไบ๏ผๆญฃๅชๅๅ็ธ็ธๅญธ็ฟ้ฃ้ญใๆ็้ญๅๅ็จฎๆ้้ญ็็ฅ่ญ๏ผๆ่จๅญ็้ฃ็ฉๆฏ้ๆคใ\n | |
| ๐ค ๅๆง๏ผๅๆญกๅญธ็ฟๆฐ็ฅ๏ผๆๆๆๆบ็็ๅฅฝๅฅๅฟ๏ผๅฎถ่ฃกๅ ๆปฟ็พ็งๅ จๆธ๏ผไพๅฆ๏ผๅๅฎถๅฐ็้ ป้ๅบ็็ใ็ตๆฅต้ญ็พ็งใ๏ผ้้ฝๆฒๆ็ๅฎ๏ผๅธธๅธธ่ขซๆขจๆขจๅธๆฏไธๅ้็ฑๅบฆ๏ผไฝๆฏไนไธ้ปไธ้ปๅญธ็ฟๅฐไธๅ้ ๅ็็ฅ่ญใ้็ถๆๆๆๅฟๆฑๅฟ่ฅฟ๏ผไฝ่ช็่ตทไพไนๆฏๅพๅฏ้ ๏ผ็ญๆ็ไบ็ตๅฐไฝฟๅฝๅฟ ้ใ้ๅฐๆๆฐๆ๏ผๅๆผ่ทณๅบ่้ฉๅ๏ผ่ฟฝๆฑ่ชๆๆน่ฎ๏ผ่ฆๅฐ้ฃ็บๆ้ท็ๆฉๆใ | |
| """ | |
| foxcat_chatbot_name = gr.State("foxcat") | |
| gr.Image(value=foxcat_chatbot_avatar_url, height=100, width=100, show_label=False, show_download_button=False, show_share_button=False, show_fullscreen_button=False) | |
| foxcat_chatbot_select_btn = gr.Button("๐้ธๆใ็็ธ่ฒใ", visible=True, variant="primary", elem_classes="chatbot_select_btn") | |
| with gr.Accordion("๐ ็็ธ่ฒ ๆ่ฟฐ", open=False): | |
| foxcat_chatbot_description_value = gr.Markdown(value=foxcat_chatbot_description, visible=True) | |
| # ๆขจๆขจ | |
| with gr.Column(scale=1, variant="panel", visible=True): | |
| lili_chatbot_avatar_url = "https://junyitopicimg.s3.amazonaws.com/live/v1283-new-topic-44-icon.png" | |
| lili_avatar_images = gr.State([user_avatar, lili_chatbot_avatar_url]) | |
| lili_chatbot_description = """ไฝ ๅฅฝ๏ผๆๆฏๆบซๆ็ใๆขจๆขจใ๏ผๅพ้ซ่ๅฏไปฅๅจ้่ฃก้ชไผดไฝ ๅญธ็ฟใๅฆๆไฝ ๆไปปไฝ็ๅ๏ผ่ซ้จๆๅๆๆๅบๅฆ๏ผ \n | |
| ๐ค ไธๅนด็ดๅญธ็๏ฝ10 ๆญฒ๏ฝๅฅณ\n | |
| ๐ฃ๏ธ ๅฃ้ ญ็ฆช๏ผใ็็ๅ็๏ผ๏ผใใ่ฎๆๆณไธๆณๅใใไฝ ็ๅง๏ผๅคงๅ้กๆ่งฃๆๅฐๅ้ก๏ผๅฐฑ่ฎๅพ็ฐกๅฎๅฆ๏ผใใๆททๆททๅฉๅฉ็็ๆดปไธๅผๅพ้ใ\n | |
| ๐ ่่ถฃ๏ผ็็้ค ไนพ๏ผ็ถๆฏ้็ณ้ค ๅบ๏ผใ็ซ็ซใ่ฝๆต่ก้ณๆจใๆถ็ดใ\n | |
| ๐ค ๅๆง๏ผ | |
| - ๅ งๅๅฎณ็พ๏ผๆฏ่ตทๅบๅป็ฉๆดๅๆญกๅพ ๅจๅฎถ๏ผ้ค้ๆฏ่ท็็ธ่ฒๅบๅป็ฉ๏ผ | |
| - ๆธ็้่ผฏๅพๅฅฝ๏ผๅ ถๅฏฆ่ฆบๅพ้บฅ้บฅ้ฃ็ ็ฎ็ๆๅๆ้ป็ ฉ๏ผไฝ้ๆฏๆ่ๅฟๅฐๅ็ญ | |
| - ๆ้ฉไบบ็็ผๅ๏ผ็ธฝ่ฝ่งๅฏๅฐๅ ถไปไบบๆฒๆๅฏ่ฆบ็็ดฐ็ฏ | |
| - ๅๆญกๆดๆด้ฝ้ฝ็็ฐๅข๏ผๆไปฅไธๅฐ้บฅ้บฅๅฎถๅฐฑๅไธไบ | |
| """ | |
| lili_chatbot_name = gr.State("lili") | |
| gr.Image(value=lili_chatbot_avatar_url, height=100, width=100, show_label=False, show_download_button=False, show_share_button=False, show_fullscreen_button=False) | |
| lili_chatbot_select_btn = gr.Button("๐้ธๆใๆขจๆขจใ", visible=True, variant="primary", elem_classes="chatbot_select_btn") | |
| with gr.Accordion("๐งก ๆขจๆขจ ๆ่ฟฐ", open=False): | |
| lili_chatbot_description_value = gr.Markdown(value=lili_chatbot_description, visible=True) | |
| # ้บฅ้บฅ | |
| with gr.Column(scale=1, variant="panel", visible=True): | |
| maimai_chatbot_avatar_url = "https://storage.googleapis.com/wpassets.junyiacademy.org/1/2020/07/%E6%80%9D%E8%80%83%E5%8A%9B%E8%B6%85%E4%BA%BA%E5%BE%BD%E7%AB%A0_%E5%B7%A5%E4%BD%9C%E5%8D%80%E5%9F%9F-1-%E8%A4%87%E6%9C%AC-150x150.png" | |
| maimai_avatar_images = gr.State([user_avatar, maimai_chatbot_avatar_url]) | |
| maimai_chatbot_description = """Hi๏ผๆๆฏ่ฟทไบบ็ใ้บฅ้บฅใ๏ผๆๅจ้่ฃก็ญ่ๅไฝ ไธ่ตทๆข็ดขๆฐ็ฅ๏ผไปปไฝ็ๅ้ฝๅฏไปฅๅๆๆๅบ๏ผ\n | |
| ๐ค ไธๅนด็ดๅญธ็๏ฝ10 ๆญฒ๏ฝ็ท\n | |
| ๐ฃ๏ธ ๅฃ้ ญ็ฆช๏ผใOh My God!ใใๅฅฝๅฅๆชๅ๏ผใใๅ๏ผๅไพๆฏ้ๆจฃๅ๏ผใ\n | |
| ๐ ่่ถฃ๏ผๆๆๅป้ๅค็ฉ่๏ผๅฟๆ ๅฅฝๆๆ้ ไพฟๆ้ญ้็ตฆ็็ธ่ฒ๏ผ๏ผๅๆญก่ฌๅท็ฌ่ฉฑใๆกไฝๅใๅ ็บๅคชๅๆญก็ฉๅ ท๏ผ่้ๅง่ชๅทฑๅ็ฉๅ ท๏ผๅฎถ่ฃกๅฐฑๅฅฝๅไป็้ๆจๅ ดใ\n | |
| ๐ค ๅๆง๏ผๅๆญกๅๅ้ก๏ผๅฐฑ็ฎ่ขซๆขจๆขจใใ๏ผไน้ๆฏ็ งๅ๏ฝๆจๅ๏ผๅคๅๅฅฝๅ๏ผๆจๅคฉ้ๆ๏ผไธๆ่ขซ้ฃ้กๆๆ๏ฝๅๆญกๆถ้ๅๅผๅๆจฃ็ๆฑ่ฅฟ๏ผๆฟ้ๅชๆๅจๆด็็้ฃไธๅคฉๆไนพๆทจ | |
| """ | |
| maimai_chatbot_name = gr.State("maimai") | |
| gr.Image(value=maimai_chatbot_avatar_url, height=100, width=100, show_label=False, show_download_button=False, show_share_button=False, show_fullscreen_button=False) | |
| maimai_chatbot_select_btn = gr.Button("๐้ธๆใ้บฅ้บฅใ", visible=True, variant="primary", elem_classes="chatbot_select_btn") | |
| with gr.Accordion("๐ ้บฅ้บฅ ๆ่ฟฐ", open=False): | |
| maimai_chatbot_description_value = gr.Markdown(value=maimai_chatbot_description, visible=True) | |
| # ๅฐๆช้ๆพ | |
| with gr.Column(scale=1, variant="panel"): | |
| gr.Markdown(value="### ๅฐๆช้ๆพ", visible=True) | |
| with gr.Row("้ฃ็น้ณ้") as chatbot_open_ai_streaming: | |
| with gr.Column(): | |
| streaming_chat_greeting = """ | |
| Hi๏ผๆๆฏใ้ฃ็น้ณ้ใ๏ผ่ชช่ฉฑๆฏ่ผๅฟซ๏ผไฝๆไป้บผๅ้ก้ฝๅฏไปฅๅๆๅ๏ผ \n | |
| ๐ ๆๆฒๆ้ ่จญๅ้กใไนๆฒๆ่ช้ณ่ผธๅ ฅ๏ผ้ฉๅๅฟซๅๅฟซ็ญ็ไฝ \n | |
| ๐ ้ต็ค่ผธๅ ฅไฝ ็ๅ้ก๏ผๆๆ็กๅๅ็ญไฝ ็ๅ้กๅ๏ผ\n | |
| ๐ค ๆ้ๅจๆ้ท๏ผ้ซๅๆ้๏ผๆฏไธๆฌกๅญธ็ฟๅช่ฝๅ็ญๅๅๅ้ก๏ผ่ซ่ฎๆไผๆฏไธไธๅๅๅ้กๅ๏ผ | |
| """ | |
| additional_inputs = [password, video_id, user_data, streaming_chat_thread_id_state, trascript_state, key_moments_state, content_subject_state, content_grade_state, ai_chatbot_socratic_mode_btn] | |
| streaming_chat = gr.ChatInterface( | |
| fn=chat_with_opan_ai_assistant_streaming, | |
| chatbot=streaming_ai_chatbot, | |
| additional_inputs=additional_inputs, | |
| submit_btn="้ๅบ", | |
| stop_btn=None, | |
| description=streaming_chat_greeting | |
| ) | |
| with gr.Row("ไธ่ฌ็ฒพ้") as chatbot_ai: | |
| with gr.Column(): | |
| ai_chatbot_greeting = [[ | |
| "่ซๅไฝ ๆฏ่ชฐ๏ผ", | |
| """Hi๏ผๆๆฏ้ฃ็น็ฒพ้็ๆๅๅใๆขจๆขจใ้บฅ้บฅใ็็ธ่ฒใ๏ผไนๅฏไปฅ้ชไฝ ไธ่ตทๅญธ็ฟๆฌๆฌก็ๅ งๅฎน๏ผๆไป้บผๅ้ก้ฝๅฏไปฅๅๆๅ๏ผ | |
| ๐ค ๅฆๆไฝ ไธ็ฅ้ๆ้บผ็ผๅ๏ผๅฏไปฅ้ปๆๅทฆไธๆน็ๅ้กไธใๅ้กไบใๅ้กไธ๏ผๆๆๅนซไฝ ็ๆๅ้ก๏ผ | |
| ๐ฃ๏ธ ไนๅฏไปฅ้ปๆๅณไธๆน็จ่ช้ณ่ผธๅ ฅ๏ผๆๆๅนซไฝ ่ฝๆๆๆๅญ๏ผๅฒๅฎณๅง๏ผ | |
| ๐ ๆๆฏ็ดๆฅ้ต็ค่ผธๅ ฅไฝ ็ๅ้ก๏ผๆๆ็กๅๅ็ญไฝ ็ๅ้กๅ๏ผ | |
| ๐ค ็ฒพ้ๅ้ซๅ้ฝๆ้๏ผๆฏไธๆฌกๅญธ็ฟๅช่ฝๅ็ญๅๅๅ้ก๏ผ่ซ่ฎๆไผๆฏไธไธๅๅๅ้กๅ๏ผ | |
| """, | |
| ]] | |
| with gr.Row(): | |
| ai_chatbot = gr.Chatbot(label="ai_chatbot", show_share_button=False, show_label=False, latex_delimiters=latex_delimiters, value=ai_chatbot_greeting) | |
| with gr.Row(): | |
| with gr.Accordion("ไฝ ไนๆ้กไผผ็ๅ้กๆณๅๅ๏ผ ่ซๆไธ โ๏ธ", open=False) as ask_questions_accordion_2: | |
| ai_chatbot_question_1 = gr.Button("ๅ้กไธ") | |
| ai_chatbot_question_2 = gr.Button("ๅ้กไธ") | |
| ai_chatbot_question_3 = gr.Button("ๅ้กไธ") | |
| create_questions_btn = gr.Button("็ๆๅ้ก", variant="primary") | |
| ai_chatbot_audio_input = gr.Audio(sources=["microphone"], type="filepath", max_length=60, label="่ช้ณ่ผธๅ ฅ") | |
| with gr.Row(): | |
| ai_msg = gr.Textbox(label="่จๆฏ่ผธๅ ฅ",scale=3) | |
| ai_send_button = gr.Button("้ๅบ", variant="primary",scale=1) | |
| ai_send_feedback_btn = gr.Button("ๆๅๅๅ้ฅ", variant="primary", scale=1, visible=False) | |
| with gr.Tab("ๆ็ซ ๆจกๅผ"): | |
| with gr.Row(): | |
| reading_passage = gr.Markdown(show_label=False, latex_delimiters = [{"left": "$", "right": "$", "display": False}]) | |
| reading_passage_speak_button = gr.Button("Speak", visible=False) | |
| reading_passage_audio_output = gr.Audio(label="Audio Output", visible=False) | |
| with gr.Tab("้้ปๆ่ฆ"): | |
| with gr.Row(): | |
| df_summarise = gr.Markdown(show_label=False, latex_delimiters = [{"left": "$", "right": "$", "display": False}]) | |
| mind_map_tab = gr.Tab("ๅฟๆบๅ", elem_id="mind_map_tab") | |
| with mind_map_tab: | |
| with gr.Row(): | |
| mind_map_html = gr.HTML() | |
| def render_mind_map(): | |
| # ้ๅๅฝๆธๆ่ฟๅไธๅ JavaScript ไปฃ็ขผ๏ผ่ฉฒไปฃ็ขผๅฐๅจๅ็ซฏๅท่ก | |
| js_code = """ | |
| function() { | |
| if (document.querySelector('.markmap svg') === null) { | |
| console.log('No markmap SVG found, rendering now...'); | |
| markmap.autoLoader.renderAll(); | |
| } else { | |
| console.log('Markmap SVG already exists, skipping render'); | |
| } | |
| return true; // ๅฏไปฅ่ฟๅไปปไฝๅผ๏ผ้่ฃก่ฟๅ true ่กจ็คบๆไฝๆๅ | |
| } | |
| """ | |
| return js_code | |
| mind_map_tab.select(fn=None, inputs=None, outputs=None, js=render_mind_map()) | |
| with gr.Tab("้้ตๆๅป"): | |
| with gr.Row(): | |
| key_moments_html = gr.HTML(value="") | |
| with gr.Tab("ๆๅญธๅ่ชฒ"): | |
| with gr.Row(): | |
| content_subject = gr.Textbox(label="้ธๆไธป้ก", value="", visible=False) | |
| content_grade = gr.Textbox(label="้ธๆๅนด็ด", value="", visible=False) | |
| content_level = gr.Dropdown(label="ๅทฎ็ฐๅๆๅญธ", choices=["ๅบ็ค", "ไธญ็ด", "้ฒ้"], value="ๅบ็ค") | |
| with gr.Row(): | |
| with gr.Tab("ๅญธ็ฟๅฎ"): | |
| with gr.Row(): | |
| worksheet_content_type_name = gr.Textbox(value="worksheet", visible=False) | |
| worksheet_algorithm = gr.Dropdown(label="้ธๆๆๅญธ็ญ็ฅๆ็่ซ", choices=["Bloom่ช็ฅ้ๅฑค็่ซ", "Polyaๆธๅญธ่งฃ้กๆณ", "CRAๆๅญธๆณ"], value="Bloom่ช็ฅ้ๅฑค็่ซ", visible=False) | |
| worksheet_content_btn = gr.Button("็ๆๅญธ็ฟๅฎ ๐", variant="primary", visible=True) | |
| with gr.Accordion("ๅพฎ่ชฟ", open=False): | |
| worksheet_result_fine_tune_prompt = gr.Textbox(label="ๆ นๆ็ตๆ๏ผ่ผธๅ ฅไฝ ๆณๆดๆน็ๆณๆณ") | |
| worksheet_result_fine_tune_btn = gr.Button("ๅพฎ่ชฟ็ตๆ", variant="primary") | |
| worksheet_result_retrun_original = gr.Button("่ฟๅๅๅง็ตๆ") | |
| with gr.Accordion("prompt", open=False) as worksheet_accordion: | |
| worksheet_prompt = gr.Textbox(label="worksheet_prompt", show_copy_button=True, lines=40) | |
| with gr.Column(scale=2): | |
| # ็ๆๅฐๆไธๅๆจกๅผ็็ตๆ | |
| worksheet_result_prompt = gr.Textbox(visible=False) | |
| worksheet_result_original = gr.Textbox(visible=False) | |
| worksheet_result = gr.Markdown(label="ๅๆฌก็ๆ็ตๆ", latex_delimiters = [{"left": "$", "right": "$", "display": False}]) | |
| worksheet_download_button = gr.Button("่ฝๆ word๏ผๅฎๆๅพ่ซ้ปๆๅณไธ่ง download ๆ้", variant="primary") | |
| worksheet_result_word_link = gr.File(label="Download Word") | |
| with gr.Tab("ๆๆก"): | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| with gr.Row(): | |
| lesson_plan_content_type_name = gr.Textbox(value="lesson_plan", visible=False) | |
| lesson_plan_time = gr.Slider(label="้ธๆ่ชฒ็จๆ้(ๅ้)", minimum=10, maximum=120, step=5, value=40) | |
| lesson_plan_btn = gr.Button("็ๆๆๆก ๐", variant="primary", visible=True) | |
| with gr.Accordion("ๅพฎ่ชฟ", open=False): | |
| lesson_plan_result_fine_tune_prompt = gr.Textbox(label="ๆ นๆ็ตๆ๏ผ่ผธๅ ฅไฝ ๆณๆดๆน็ๆณๆณ") | |
| lesson_plan_result_fine_tune_btn = gr.Button("ๅพฎ่ชฟ็ตๆ", variant="primary") | |
| lesson_plan_result_retrun_original = gr.Button("่ฟๅๅๅง็ตๆ") | |
| with gr.Accordion("prompt", open=False) as lesson_plan_accordion: | |
| lesson_plan_prompt = gr.Textbox(label="worksheet_prompt", show_copy_button=True, lines=40) | |
| with gr.Column(scale=2): | |
| # ็ๆๅฐๆไธๅๆจกๅผ็็ตๆ | |
| lesson_plan_result_prompt = gr.Textbox(visible=False) | |
| lesson_plan_result_original = gr.Textbox(visible=False) | |
| lesson_plan_result = gr.Markdown(label="ๅๆฌก็ๆ็ตๆ", latex_delimiters = [{"left": "$", "right": "$", "display": False}]) | |
| lesson_plan_download_button = gr.Button("่ฝๆ word๏ผๅฎๆๅพ่ซ้ปๆๅณไธ่ง download ๆ้", variant="primary") | |
| lesson_plan_result_word_link = gr.File(label="Download Word") | |
| with gr.Tab("ๅบๅ ดๅธ"): | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| with gr.Row(): | |
| exit_ticket_content_type_name = gr.Textbox(value="exit_ticket", visible=False) | |
| exit_ticket_time = gr.Slider(label="้ธๆๅบๅ ดๅธๆ้(ๅ้)", minimum=5, maximum=10, step=1, value=8) | |
| exit_ticket_btn = gr.Button("็ๆๅบๅ ดๅธ ๐๏ธ", variant="primary", visible=True) | |
| with gr.Accordion("ๅพฎ่ชฟ", open=False): | |
| exit_ticket_result_fine_tune_prompt = gr.Textbox(label="ๆ นๆ็ตๆ๏ผ่ผธๅ ฅไฝ ๆณๆดๆน็ๆณๆณ") | |
| exit_ticket_result_fine_tune_btn = gr.Button("ๅพฎ่ชฟ็ตๆ", variant="primary") | |
| exit_ticket_result_retrun_original = gr.Button("่ฟๅๅๅง็ตๆ") | |
| with gr.Accordion("prompt", open=False) as exit_ticket_accordion: | |
| exit_ticket_prompt = gr.Textbox(label="worksheet_prompt", show_copy_button=True, lines=40) | |
| with gr.Column(scale=2): | |
| # ็ๆๅฐๆไธๅๆจกๅผ็็ตๆ | |
| exit_ticket_result_prompt = gr.Textbox(visible=False) | |
| exit_ticket_result_original = gr.Textbox(visible=False) | |
| exit_ticket_result = gr.Markdown(label="ๅๆฌก็ๆ็ตๆ", latex_delimiters = [{"left": "$", "right": "$", "display": False}]) | |
| exit_ticket_download_button = gr.Button("่ฝๆ word๏ผๅฎๆๅพ่ซ้ปๆๅณไธ่ง download ๆ้", variant="primary") | |
| exit_ticket_result_word_link = gr.File(label="Download Word") | |
| # with gr.Tab("็ด ้คๅฐๅ้ฑ่ฎ้ก็ต"): | |
| # literacy_oriented_reading_content = gr.Textbox(label="่ผธๅ ฅ้ฑ่ฎๆๆ") | |
| # literacy_oriented_reading_content_btn = gr.Button("็ๆ้ฑ่ฎ็่งฃ้ก") | |
| # with gr.Tab("่ชๆ่ฉไผฐ"): | |
| # self_assessment_content = gr.Textbox(label="่ผธๅ ฅ่ช่ฉๅๅทๆๆชขๆฅ่กจ") | |
| # self_assessment_content_btn = gr.Button("็ๆ่ช่ฉๅๅท") | |
| # with gr.Tab("่ชๆๅๆ่ฉ้"): | |
| # self_reflection_content = gr.Textbox(label="่ผธๅ ฅ่ชๆๅๆๆดปๅ") | |
| # self_reflection_content_btn = gr.Button("็ๆ่ชๆๅๆๆดปๅ") | |
| # with gr.Tab("ๅพ่จญ่ช็ฅ"): | |
| # metacognition_content = gr.Textbox(label="่ผธๅ ฅๅพ่จญ่ช็ฅ็ธ้ๅ้ก") | |
| # metacognition_content_btn = gr.Button("็ๆๅพ่จญ่ช็ฅๅ้ก") | |
| with gr.Accordion("ๅ ่ฒฌ่ฒๆ", open=True): | |
| gr.Markdown(""" | |
| ๆฌๅ งๅฎน็ฑAI่งฃๆไธฆ่ชๅ็ๆ๏ผๅไธๅนณๅฐ่ๅฝฑ็ไฝ่ ไธๅฐๅ งๅฎน็ๆบ็ขบๆงๅๅบไฟ่ญใๅปบ่ญฐๅจๅญธ็ฟๆๆ็จๅ๏ผๅ ่กๆฅ่ญไธฆ็ขบ่ช็ธ้่ณ่จ็ๆญฃ็ขบๆงใ | |
| ็ธ้่ฆ็ฏ่ซๅ่๏ผhttps://www.junyiacademy.org/event/jutor-policy/ | |
| """) | |
| with gr.Accordion("See Details", open=False) as see_details: | |
| with gr.Row(): | |
| is_env_prod = gr.Checkbox(value=False, label="is_env_prod") | |
| LLM_model = gr.Dropdown(label="LLM Model", choices=["open-ai-gpt-4o", "anthropic-claude-3-sonnet", "gemini-1.5-pro", "gemini-1.5-flash"], value="open-ai-gpt-4o", visible=True, interactive=True) | |
# --- Admin CRUD tabs ---
# Each tab pairs a fixed "kind" textbox (the storage key for that content
# type) with get / edit / save / delete / rebuild buttons and a read-only
# textbox showing the stored content. The buttons are wired to handlers in
# the content_buttons_config block further down.
# NOTE(review): "admmin" is a pre-existing typo in two row names; they are
# referenced by init_outputs below, so the spelling must stay as-is here.
with gr.Tab("้ๅญ็จฟๆฌๆ"):
with gr.Row() as transcript_admmin:
transcript_kind = gr.Textbox(value="transcript", show_label=False)
transcript_get_button = gr.Button("ๅๅพ", size="sm", variant="primary")
transcript_edit_button = gr.Button("็ทจ่ผฏ", size="sm", variant="primary")
transcript_update_button = gr.Button("ๅฒๅญ", size="sm", variant="primary")
transcript_delete_button = gr.Button("ๅช้ค", size="sm", variant="primary")
transcript_create_button = gr.Button("้ๅปบ", size="sm", variant="primary")
with gr.Row():
df_string_output = gr.Textbox(lines=40, label="Data Text", interactive=False, show_copy_button=True)
# Reading-passage (LaTeX) admin tab.
with gr.Tab("ๆ็ซ ๆฌๆ"):
with gr.Row() as reading_passage_admin:
with gr.Column():
with gr.Row():
reading_passage_kind = gr.Textbox(value="reading_passage_latex", show_label=False)
with gr.Row():
# reading_passage_text_to_latex = gr.Button("ๆฐๅข LaTeX", size="sm", variant="primary")
reading_passage_get_button = gr.Button("ๅๅพ", size="sm", variant="primary")
reading_passage_edit_button = gr.Button("็ทจ่ผฏ", size="sm", variant="primary")
reading_passage_update_button = gr.Button("ๅฒๅญ", size="sm", variant="primary")
reading_passage_delete_button = gr.Button("ๅช้ค", size="sm", variant="primary")
reading_passage_create_button = gr.Button("้ๅปบ", size="sm", variant="primary")
with gr.Row():
reading_passage_text = gr.Textbox(label="reading_passage_latex", lines=40, interactive=False, show_copy_button=True)
# Summary (markdown) admin tab.
with gr.Tab("้้ปๆ่ฆๆฌๆ"):
with gr.Row() as summary_admmin:
with gr.Column():
with gr.Row():
summary_kind = gr.Textbox(value="summary_markdown", show_label=False)
with gr.Row():
# summary_to_markdown = gr.Button("ๆฐๅข Markdown", size="sm", variant="primary")
summary_get_button = gr.Button("ๅๅพ", size="sm", variant="primary")
summary_edit_button = gr.Button("็ทจ่ผฏ", size="sm", variant="primary")
summary_update_button = gr.Button("ๅฒๅญ", size="sm", variant="primary")
summary_delete_button = gr.Button("ๅช้ค", size="sm", variant="primary")
summary_create_button = gr.Button("้ๅปบ", size="sm", variant="primary")
with gr.Row():
summary_text = gr.Textbox(label="summary_markdown", lines=40, interactive=False, show_copy_button=True)
# Key-moments admin tab.
with gr.Tab("้้ตๆๅปๆฌๆ"):
with gr.Row() as key_moments_admin:
key_moments_kind = gr.Textbox(value="key_moments", show_label=False)
key_moments_get_button = gr.Button("ๅๅพ", size="sm", variant="primary")
key_moments_edit_button = gr.Button("็ทจ่ผฏ", size="sm", variant="primary")
key_moments_update_button = gr.Button("ๅฒๅญ", size="sm", variant="primary")
key_moments_delete_button = gr.Button("ๅช้ค", size="sm", variant="primary")
key_moments_create_button = gr.Button("้ๅปบ", size="sm", variant="primary")
with gr.Row():
key_moments = gr.Textbox(label="Key Moments", lines=40, interactive=False, show_copy_button=True)
# Questions admin tab.
with gr.Tab("ๅ้กๆฌๆ"):
with gr.Row() as question_list_admin:
questions_kind = gr.Textbox(value="questions", show_label=False)
questions_get_button = gr.Button("ๅๅพ", size="sm", variant="primary")
questions_edit_button = gr.Button("็ทจ่ผฏ", size="sm", variant="primary")
questions_update_button = gr.Button("ๅฒๅญ", size="sm", variant="primary")
questions_delete_button = gr.Button("ๅช้ค", size="sm", variant="primary")
questions_create_button = gr.Button("้ๅปบ", size="sm", variant="primary")
with gr.Row():
questions_json = gr.Textbox(label="Questions", lines=40, interactive=False, show_copy_button=True)
# Question-answers admin tab.
with gr.Tab("ๅ้ก็ญๆกๆฌๆ"):
with gr.Row() as questions_answers_admin:
questions_answers_kind = gr.Textbox(value="questions_answers", show_label=False)
questions_answers_get_button = gr.Button("ๅๅพ", size="sm", variant="primary")
questions_answers_edit_button = gr.Button("็ทจ่ผฏ", size="sm", variant="primary")
questions_answers_update_button = gr.Button("ๅฒๅญ", size="sm", variant="primary")
questions_answers_delete_button = gr.Button("ๅช้ค", size="sm", variant="primary")
questions_answers_create_button = gr.Button("้ๅปบ", size="sm", variant="primary")
with gr.Row():
questions_answers_json = gr.Textbox(label="Questions Answers", lines=40, interactive=False, show_copy_button=True)
# Worksheet admin tab; "rebuild" is deliberately disabled (interactive=False).
with gr.Tab("ๆๅญธๅ่ชฒ"):
with gr.Row() as worksheet_admin:
worksheet_kind = gr.Textbox(value="ai_content_list", show_label=False)
worksheet_get_button = gr.Button("ๅๅพ", size="sm", variant="primary")
worksheet_edit_button = gr.Button("็ทจ่ผฏ", size="sm", variant="primary")
worksheet_update_button = gr.Button("ๅฒๅญ", size="sm", variant="primary")
worksheet_delete_button = gr.Button("ๅช้ค", size="sm", variant="primary")
worksheet_create_button = gr.Button("้ๅปบ(X)", size="sm", variant="primary", interactive=False)
with gr.Row():
worksheet_json = gr.Textbox(label="worksheet", lines=40, interactive=False, show_copy_button=True)
# Read-only transcript / combined-video views and the mind-map markdown tab.
with gr.Tab("้ๅญ็จฟ"):
simple_html_content = gr.HTML(label="Simple Transcript")
with gr.Tab("ๅๆ"):
transcript_html = gr.HTML(label="YouTube Transcript and Video")
with gr.Tab("markdown"):
gr.Markdown("## ่ซ่ค่ฃฝไปฅไธ markdown ไธฆ่ฒผๅฐไฝ ็ๅฟๆบๅๅทฅๅ ทไธญ๏ผๅปบ่ญฐไฝฟ็จ๏ผhttps://markmap.js.org/repl")
mind_map = gr.Textbox(container=True, show_copy_button=True, lines=40, elem_id="mind_map_markdown")
# --- Bulk refresh: regenerate all AI content for a list of videos ---
with gr.Accordion("refresh all", open=False):
with gr.Row():
gr.Markdown("## ๆธ ๅฎๅฝฑ็๏ผ้ๆฐ็ๆๆๆๅ งๅฎน")
with gr.Row():
# Two ways to supply video ids: a comma-separated textbox, or a Google
# Sheet whose columns are read by A1-style range (e.g. "D2:D").
with gr.Tab("refresh_video_ids"):
refresh_video_ids = gr.Textbox(label="่ผธๅ ฅๅฝฑ็ id๏ผไปฅ , ้่ๅ้")
refresh_btn = gr.Button("refresh", variant="primary")
with gr.Tab("by sheets"):
sheet_url = gr.Textbox(label="่ผธๅ ฅ Google Sheets ็ URL")
sheet_video_column = gr.Textbox(label="่ผธๅ ฅ่ฆ่ฎๅ็ youtube_id ๆฌไฝ", value="D2:D")
sheet_QA_column = gr.Textbox(label="่ผธๅ ฅ่ฆ่ฎๅ็ QA ๆฌไฝ", value="F2:F")
sheet_get_value_btn = gr.Button("ๅๅพ ids", variant="primary")
sheet_get_value_result = gr.Textbox(label="ids", interactive=False)
sheet_refresh_btn = gr.Button("refresh by sheets", variant="primary")
with gr.Row():
refresh_result = gr.JSON()
# Refresh wiring. Each refresh button is disabled while its (long-running)
# batch job executes, then re-enabled when the job finishes.
# FIX: the original chain disabled the button but never restored it, so one
# click permanently locked the feature until a full page reload.
refresh_btn.click(
    lambda: gr.update(interactive=False),
    inputs=[],
    outputs=[refresh_btn]
).then(
    refresh_video_LLM_all_content_by_ids,
    inputs=[refresh_video_ids],
    outputs=[refresh_result]
).then(
    # Re-enable so the admin can run another batch without reloading.
    lambda: gr.update(interactive=True),
    inputs=[],
    outputs=[refresh_btn]
)
# Read the youtube_id column from the given Google Sheet into the ids box.
sheet_get_value_btn.click(
    get_sheet_data,
    inputs=[sheet_url, sheet_video_column],
    outputs=[sheet_get_value_result]
)
sheet_refresh_btn.click(
    lambda: gr.update(interactive=False),
    inputs=[],
    outputs=[sheet_refresh_btn]
).then(
    refresh_video_LLM_all_content_by_sheet,
    inputs=[sheet_url, sheet_QA_column, sheet_get_value_result],
    outputs=[refresh_result]
).then(
    lambda: gr.update(interactive=True),
    inputs=[],
    outputs=[sheet_refresh_btn]
)
| # OPEN AI CHATBOT SELECT | |
| chatbot_select_outputs=[ | |
| chatbot_select_accordion, | |
| all_chatbot_select_btn, | |
| chatbot_open_ai_streaming, | |
| chatbot_ai, | |
| ai_name, | |
| ai_chatbot_ai_type, | |
| ai_chatbot_thread_id | |
| ] | |
| # ่ๅคฉๆบๅจไบบ็้ ็ฝฎๆฐๆฎ | |
| chatbots = [ | |
| { | |
| "button": vaitor_chatbot_select_btn, | |
| "name_state": chatbot_open_ai_name, | |
| "avatar_images": vaitor_chatbot_avatar_images, | |
| "description_value": vaitor_chatbot_description_value, | |
| "chatbot_select_outputs": chatbot_select_outputs, | |
| "chatbot_output": ai_chatbot | |
| }, | |
| { | |
| "button": foxcat_chatbot_select_btn, | |
| "name_state": foxcat_chatbot_name, | |
| "avatar_images": foxcat_avatar_images, | |
| "description_value": foxcat_chatbot_description_value, | |
| "chatbot_select_outputs": chatbot_select_outputs, | |
| "chatbot_output": ai_chatbot | |
| }, | |
| { | |
| "button": lili_chatbot_select_btn, | |
| "name_state": lili_chatbot_name, | |
| "avatar_images": lili_avatar_images, | |
| "description_value": lili_chatbot_description_value, | |
| "chatbot_select_outputs": chatbot_select_outputs, | |
| "chatbot_output": ai_chatbot | |
| }, | |
| { | |
| "button": maimai_chatbot_select_btn, | |
| "name_state": maimai_chatbot_name, | |
| "avatar_images": maimai_avatar_images, | |
| "description_value": maimai_chatbot_description_value, | |
| "chatbot_select_outputs": chatbot_select_outputs, | |
| "chatbot_output": ai_chatbot | |
| } | |
| ] | |
def setup_chatbot_select_button(chatbot_dict):
    """Wire one persona-select button from a `chatbots` config entry.

    Clicking the button switches the shared chat UI to this persona
    (via ``chatbot_select``), then refreshes the chat window's avatar
    and description (via ``update_avatar_images``).
    """
    target_chat_window = chatbot_dict["chatbot_output"]
    chatbot_dict["button"].click(
        chatbot_select,
        inputs=[chatbot_dict["name_state"]],
        outputs=chatbot_dict["chatbot_select_outputs"]
    ).then(
        update_avatar_images,
        inputs=[chatbot_dict["avatar_images"], chatbot_dict["description_value"]],
        outputs=[target_chat_window],
        scroll_to_output=True
    )
# Bind every configured persona button.
for chatbot_dict in chatbots:
setup_chatbot_select_button(chatbot_dict)
# STREAMING CHATBOT SELECT: switching to the streaming bot also starts a
# fresh conversation thread.
chatbot_open_ai_streaming_select_btn.click(
chatbot_select,
inputs=[chatbot_open_ai_streaming_name],
outputs=chatbot_select_outputs
).then(
create_thread_id,
inputs=[],
outputs=[streaming_chat_thread_id_state]
)
# ALL CHATBOT SELECT LIST: reopen the persona-picker accordion.
all_chatbot_select_btn.click(
show_all_chatbot_accordion,
inputs=[],
outputs=[chatbot_select_accordion, all_chatbot_select_btn]
)
# OPENAI ASSISTANT CHATBOT: wire question-button click events.
def setup_question_button_click(button, inputs_list, outputs_list, chat_func, scroll_to_output=True):
    """Bind *chat_func* to *button* with the given Gradio inputs/outputs.

    Thin helper so every preset-question button is wired identically;
    *scroll_to_output* defaults to True to keep the answer in view.
    """
    event_kwargs = {
        "inputs": inputs_list,
        "outputs": outputs_list,
        "scroll_to_output": scroll_to_output,
    }
    button.click(chat_func, **event_kwargs)
# Generic "fine-tuned" ai_chatbot send flow: the full lesson context
# (transcript, key moments, Q&A, subject/grade, persona, thread id) is
# passed so any backend type can answer.
# NOTE(review): "trascript_state" is a pre-existing typo in a state name
# defined elsewhere in this file; it must keep its current spelling here.
ai_send_button.click(
chat_with_any_ai,
inputs=[ai_chatbot_ai_type, password, video_id, user_data, trascript_state, key_moments, ai_msg, ai_chatbot, content_subject, content_grade, questions_answers_json, ai_chatbot_socratic_mode_btn, ai_chatbot_thread_id, ai_name],
outputs=[ai_msg, ai_chatbot, ai_send_button, ai_send_feedback_btn, ai_chatbot_thread_id],
scroll_to_output=True
)
ai_send_feedback_btn.click(
feedback_with_ai,
inputs=[user_data, ai_chatbot_ai_type, ai_chatbot, ai_chatbot_thread_id],
outputs=[ai_chatbot, ai_send_feedback_btn],
scroll_to_output=True
)
# Preset QA question buttons reuse the same chat handler, passing the
# button itself in place of the typed message.
ai_chatbot_question_buttons = [ai_chatbot_question_1, ai_chatbot_question_2, ai_chatbot_question_3]
for question_btn in ai_chatbot_question_buttons:
inputs_list = [ai_chatbot_ai_type, password, video_id, user_data, trascript_state, key_moments, question_btn, ai_chatbot, content_subject, content_grade, questions_answers_json, ai_chatbot_socratic_mode_btn, ai_chatbot_thread_id, ai_name]
outputs_list = [ai_msg, ai_chatbot, ai_send_button, ai_send_feedback_btn, ai_chatbot_thread_id]
setup_question_button_click(question_btn, inputs_list, outputs_list, chat_with_any_ai)
# Regenerating the three preset questions rewrites all three button labels.
question_buttons = [
ai_chatbot_question_1,
ai_chatbot_question_2,
ai_chatbot_question_3
]
create_questions_btn.click(
change_questions,
inputs=[password, df_string_output],
outputs=question_buttons
)
# Voice input: transcribe the recorded audio into the message box.
ai_chatbot_audio_input.change(
process_open_ai_audio_to_chatbot,
inputs=[password, ai_chatbot_audio_input],
outputs=[ai_msg]
)
# Triggered when a YouTube link is entered: fetch/derive every piece of AI
# content for the video, then cache the results into session state and
# refresh the three preset question buttons.
process_youtube_link_inputs = [password, youtube_link, LLM_model]
process_youtube_link_outputs = [
video_id,
questions_answers_json,
df_string_output,
summary_text,
df_summarise,
key_moments,
key_moments_html,
mind_map,
mind_map_html,
transcript_html,
simple_html_content,
reading_passage_text,
reading_passage,
content_subject,
content_grade,
]
# Second step: copy the freshly generated content into gr.State holders and
# prime the preset question buttons.
update_state_inputs = [
content_subject,
content_grade,
df_string_output,
key_moments,
questions_answers_json,
]
update_state_outputs = [
content_subject_state,
content_grade_state,
trascript_state,
key_moments_state,
streaming_chat_thread_id_state,
ai_chatbot_question_1,
ai_chatbot_question_2,
ai_chatbot_question_3
]
# A Junyi Academy link is first translated into its YouTube equivalent,
# then flows through the same processing pipeline.
junyi_link.input(
process_junyi_link_to_youtube_link,
inputs=[junyi_link],
outputs=[youtube_link]
).then(
process_youtube_link,
inputs=process_youtube_link_inputs,
outputs=process_youtube_link_outputs
).then(
update_state,
inputs=update_state_inputs,
outputs=update_state_outputs
)
# Direct YouTube link entry and the explicit submit button share the same
# two-step pipeline.
youtube_link.input(
process_youtube_link,
inputs=process_youtube_link_inputs,
outputs=process_youtube_link_outputs
).then(
update_state,
inputs=update_state_inputs,
outputs=update_state_outputs
)
youtube_link_btn.click(
process_youtube_link,
inputs=process_youtube_link_inputs,
outputs=process_youtube_link_outputs
).then(
update_state,
inputs=update_state_inputs,
outputs=update_state_outputs
)
# --- CRUD admin ---
def setup_content_buttons(buttons_config):
    """Register one click handler per entry of *buttons_config*.

    Each entry is a dict mapping 'button' to a Gradio button and
    'action' / 'inputs' / 'outputs' to the handler function and its
    input/output components.
    """
    for entry in buttons_config:
        entry['button'].click(
            fn=entry['action'],
            inputs=entry['inputs'],
            outputs=entry['outputs']
        )
# --- Admin CRUD wiring ---
# Every admin tab exposes the same five actions (get / create / delete /
# edit / update) against one content kind and one display textbox, so the
# per-button config is generated from a compact spec instead of being
# spelled out 35 times by hand (behavior-identical dedup of the original
# hand-written list).
# Spec tuple: (get_btn, create_btn, delete_btn, edit_btn, update_btn,
#              kind_textbox, content_box).
# NOTE(review): every "create" action sources its text from
# df_string_output (the transcript box), exactly as in the original
# hand-written config — confirm this is intentional for non-transcript
# kinds.
_crud_button_specs = [
    (transcript_get_button, transcript_create_button, transcript_delete_button,
     transcript_edit_button, transcript_update_button, transcript_kind, df_string_output),
    (reading_passage_get_button, reading_passage_create_button, reading_passage_delete_button,
     reading_passage_edit_button, reading_passage_update_button, reading_passage_kind, reading_passage_text),
    (summary_get_button, summary_create_button, summary_delete_button,
     summary_edit_button, summary_update_button, summary_kind, summary_text),
    (key_moments_get_button, key_moments_create_button, key_moments_delete_button,
     key_moments_edit_button, key_moments_update_button, key_moments_kind, key_moments),
    (questions_get_button, questions_create_button, questions_delete_button,
     questions_edit_button, questions_update_button, questions_kind, questions_json),
    (questions_answers_get_button, questions_answers_create_button, questions_answers_delete_button,
     questions_answers_edit_button, questions_answers_update_button, questions_answers_kind, questions_answers_json),
    (worksheet_get_button, worksheet_create_button, worksheet_delete_button,
     worksheet_edit_button, worksheet_update_button, worksheet_kind, worksheet_json),
]
content_buttons_config = []
for _get_btn, _create_btn, _delete_btn, _edit_btn, _update_btn, _kind, _box in _crud_button_specs:
    content_buttons_config.extend([
        {'button': _get_btn, 'action': get_LLM_content,
         'inputs': [video_id, _kind], 'outputs': [_box]},
        {'button': _create_btn, 'action': create_LLM_content,
         'inputs': [video_id, df_string_output, _kind, LLM_model], 'outputs': [_box]},
        {'button': _delete_btn, 'action': delete_LLM_content,
         'inputs': [video_id, _kind], 'outputs': [_box]},
        # "edit" only flips the textbox into editable mode; no content inputs.
        {'button': _edit_btn, 'action': enable_edit_mode,
         'inputs': [], 'outputs': [_box]},
        # "update" saves the (possibly edited) box content for this kind.
        {'button': _update_btn, 'action': update_LLM_content,
         'inputs': [video_id, _box, _kind], 'outputs': [_box]},
    ])
setup_content_buttons(content_buttons_config)
| # --- Education Material --- | |
def setup_education_buttons(buttons_config):
    """Wire each education-material button described in *buttons_config*.

    Each entry is a dict with keys "button" (a Gradio button), "action"
    (the handler), and "inputs" / "outputs" (component lists).
    """
    for entry in buttons_config:
        entry["button"].click(
            fn=entry["action"],
            inputs=entry["inputs"],
            outputs=entry["outputs"]
        )
# --- Education material wiring ---
# The three material types (worksheet / lesson plan / exit ticket) share
# the same four actions (generate, fine-tune, download as Word, restore
# original), so the config is generated from one spec per type instead of
# twelve hand-written dicts (behavior-identical dedup).
# Spec tuple: (generate_btn, extra_input, type_name, result_original,
#              result, result_prompt, fine_tune_btn, fine_tune_prompt,
#              download_btn, word_link, return_original_btn).
# "extra_input" is the type-specific parameter: worksheet uses an
# algorithm choice, lesson plan and exit ticket use a duration.
# NOTE(review): "retrun" in the return-original button names is a
# pre-existing typo in components defined above; kept as-is.
_education_specs = [
    (worksheet_content_btn, worksheet_algorithm, worksheet_content_type_name,
     worksheet_result_original, worksheet_result, worksheet_result_prompt,
     worksheet_result_fine_tune_btn, worksheet_result_fine_tune_prompt,
     worksheet_download_button, worksheet_result_word_link,
     worksheet_result_retrun_original, worksheet_prompt),
    (lesson_plan_btn, lesson_plan_time, lesson_plan_content_type_name,
     lesson_plan_result_original, lesson_plan_result, lesson_plan_result_prompt,
     lesson_plan_result_fine_tune_btn, lesson_plan_result_fine_tune_prompt,
     lesson_plan_download_button, lesson_plan_result_word_link,
     lesson_plan_result_retrun_original, lesson_plan_prompt),
    (exit_ticket_btn, exit_ticket_time, exit_ticket_content_type_name,
     exit_ticket_result_original, exit_ticket_result, exit_ticket_result_prompt,
     exit_ticket_result_fine_tune_btn, exit_ticket_result_fine_tune_prompt,
     exit_ticket_download_button, exit_ticket_result_word_link,
     exit_ticket_result_retrun_original, exit_ticket_prompt),
]
education_buttons_config = []
for (_gen_btn, _extra, _type_name, _orig, _result, _result_prompt,
     _ft_btn, _ft_prompt, _dl_btn, _word_link, _ret_btn, _prompt_box) in _education_specs:
    education_buttons_config.extend([
        # Generate: produces the original result, rendered result, the
        # prompt used, and the hidden result-prompt holder.
        {"button": _gen_btn, "action": get_ai_content,
         "inputs": [password, user_data, video_id, df_string_output, content_subject,
                    content_grade, content_level, _extra, _type_name],
         "outputs": [_orig, _result, _prompt_box, _result_prompt]},
        # Fine-tune: rewrites the current result using the user's tweak prompt.
        {"button": _ft_btn, "action": generate_ai_content_fine_tune_result,
         "inputs": [password, user_data, _result_prompt, df_string_output, _result,
                    _ft_prompt, _type_name],
         "outputs": [_result]},
        # Export the rendered result to a downloadable Word file.
        {"button": _dl_btn, "action": download_exam_result,
         "inputs": [_result], "outputs": [_word_link]},
        # Restore the first generated result, discarding fine-tune edits.
        {"button": _ret_btn, "action": return_original_exam_result,
         "inputs": [_orig], "outputs": [_result]},
    ])
setup_education_buttons(education_buttons_config)
# init_params: populate the UI from the page's query parameters on load
# (panel visibility for admins, password, preset video link, chatbot
# params, environment flag).
init_outputs = [
admin,
reading_passage_admin,
summary_admmin,
see_details,
worksheet_accordion,
lesson_plan_accordion,
exit_ticket_accordion,
password,
youtube_link,
block_ready_flag,
chatbot_open_ai_streaming,
chatbot_ai,
ai_chatbot_params,
is_env_prod,
]
demo.load(
init_params,
inputs =[youtube_link],
outputs = init_outputs
)
# Serve the Gradio UI from the FastAPI app's root path.
# NOTE(review): gr.mount_gradio_app's documented signature is
# (app, blocks, path, ...); server_name/server_port look like gr.launch()
# arguments — confirm the pinned Gradio version accepts (or ignores) them.
app = gr.mount_gradio_app(
app, demo, "/", server_name="0.0.0.0", server_port=7860, show_error=True
)
return app
if __name__ == "__main__":
    # Local entry point: build the ASGI app and serve it with uvicorn.
    # PORT may be injected by the hosting platform; default to 7860.
    import uvicorn

    serve_port = int(os.environ.get("PORT", 7860))
    uvicorn.run(create_app(), host="0.0.0.0", port=serve_port)