"""Gradio app that turns question screenshots / PDFs into structured quiz rows.

Pipeline: an image (or the rendered pages of a PDF) is sent to the OpenAI
chat-completions API, the free-text answer is converted to a flat JSON
question record, a Perseus-style JSON item is built from that record, and the
rows are shown in a table and written to a CSV file.

NOTE(review): this module was reconstructed from a copy whose line breaks and
indentation had been collapsed. Line breaks inside the prompt strings and the
nesting of the Gradio layout are a best-effort reconstruction -- please
confirm against the deployed version.
"""
import base64
import csv
import io
import json
import os
import re
import tempfile
import time
import uuid

import fitz  # PyMuPDF
import gradio as gr
import gspread
import numpy as np
from google.cloud import storage  # noqa: F401  (kept: may be used elsewhere)
from google.oauth2 import service_account  # noqa: F401
from googleapiclient.http import MediaIoBaseDownload  # noqa: F401
from openai import OpenAI
from PIL import Image

from storage_service import GoogleCloudStorage

is_env_local = os.getenv("IS_ENV_LOCAL", "false") == "true"
print(f"is_env_local: {is_env_local}")

if is_env_local:
    with open("local_config.json") as f:
        config = json.load(f)
    PASSWORD = config["PASSWORD"]
    OPEN_AI_KEY = config["OPEN_AI_KEY"]
    GCS_KEY = json.dumps(config["GOOGLE_APPLICATION_CREDENTIALS_JSON"])
    GSHEET_KEY = json.dumps(config["GOOGLE_APPLICATION_CREDENTIALS_JSON"])
else:
    PASSWORD = os.getenv("PASSWORD")
    OPEN_AI_KEY = os.getenv("OPEN_AI_KEY")
    GCS_KEY = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
    GSHEET_KEY = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")

OPEN_AI_CLIENT = OpenAI(api_key=OPEN_AI_KEY)

GCS_SERVICE = GoogleCloudStorage(GCS_KEY)
GCS_CLIENT = GCS_SERVICE.client
bucket_name = 'ai_question_to_image'
bucket = GCS_CLIENT.bucket(bucket_name)

GSHEET_KEY_DICT = json.loads(GSHEET_KEY)
sheets_client = gspread.service_account_from_dict(GSHEET_KEY_DICT)

CSV_DATA = []

# Keys of the flat question record requested from the model in text_to_json(),
# in the order of the table/CSV columns that follow "圖片URL" and "文字".
QUESTION_KEYS = (
    "q_id", "question",
    "choice_1", "choice_2", "choice_3", "choice_4",
    "answer",
    "hint_1", "hint_2", "hint_3", "hint_4", "hint_5",
)

# Single source of truth for the result-table / CSV header row. The show_*
# functions look columns up by these exact names.
TABLE_HEADERS = [
    "圖片URL", "文字", "題號", "題目",
    "選項1", "選項2", "選項3", "選項4",
    "答案",
    "提示1", "提示2", "提示3", "提示4", "提示5",
    "Perseus JSON",
]
TABLE_COLUMN_WIDTHS = [10, 10, 5, 20, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 10]


# ====== Helpers ======
def _question_row(question_json):
    """Return the record's values in QUESTION_KEYS order ('' when missing).

    Using a fixed key order keeps the columns aligned with TABLE_HEADERS even
    when the model omits, reorders or adds keys.
    """
    return [question_json.get(key, "") for key in QUESTION_KEYS]


def _collect_hints(question_json):
    """Collect '提示1'..'提示5' from a table row, stopping at the first empty one."""
    hints = []
    for n in range(1, 6):
        hint = question_json.get(f'提示{n}', None)
        if not hint:
            break
        hints.append(hint)
    return hints


def _format_hints(hints):
    """Render hints as a numbered markdown list, one hint per line."""
    return "".join(f"{n}. {hint}\n" for n, hint in enumerate(hints, start=1))


# ====== Function definitions ======
def upload_image_to_gcs(image_data, bucket):
    """Upload an image to the bucket, make it public and return its URL.

    Args:
        image_data: an ``io.BytesIO`` with the image bytes, or a path to an
            image file on disk.
        bucket: bucket object providing ``blob(name)``.

    Returns:
        The public URL of the uploaded blob.
    """
    # A bare timestamp is not unique: several uploads in the same second
    # (e.g. the pages of one PDF) would overwrite each other.
    unique_filename = f"{int(time.time())}_{uuid.uuid4().hex}_image.jpg"
    blob = bucket.blob(unique_filename)

    if isinstance(image_data, io.BytesIO):
        # In-memory image: upload directly.
        blob.upload_from_file(image_data, content_type='image/jpeg')
    else:
        # Otherwise treat it as a file path.
        with open(image_data, "rb") as image_file:
            blob.upload_from_file(image_file)

    blob.make_public()
    print("======upload_image_to_gcs=====")
    print(f"File uploaded to {unique_filename} in GCS.")
    return blob.public_url


def process_image(image_url):
    """Read one question image and return ``(raw_text, question_json)``."""
    print("處理圖片:", image_url)
    text = image_to_text(image_url)
    print("======image_to_text=====")
    print(text)
    print("========================")

    question_json = safe_json_loads(text_to_json(text))
    print("======text_to_json=====")
    print(question_json)
    print("========================")
    return text, question_json


def image_to_text(url):
    """Ask the model to transcribe and solve the question shown at *url*.

    Returns the model's free-text answer (the message content).
    """
    user_prompt = """
    請解讀題目圖片:
    - 圖片請一定要用 zh-TW 解讀
    - [數學用語、題目內的數字、選項上的數字、 數學符號、物理化學符號、英文單字] 請一定要用 LATEX markdown 語法(前後用 $ 包起來),LATEX 這很重要
    輸出為
    1. 題號:
    2. 題目:
    3. 選項:
    4. 答案:(到選項裡面挑選一個最合理的選項) ex: (A) 或 (B) 或 (C) 或 (D)
    5. 解題說明: 1. 步驟一, 2. 步驟二, 3. 步驟三....(最少三個步驟,最多五步驟),最後一個步驟 format 為: 答案選: (A) 或 (B) 或 (C) 或 (D) (選項用LATEX color:fuchsia, mbox)
    """
    response = OPEN_AI_CLIENT.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": user_prompt},
                    {"type": "image_url", "image_url": {"url": url}},
                ],
            }
        ],
        max_tokens=4000,
    )
    return response.choices[0].message.content


def text_to_json(text):
    """Convert a transcribed question into a flat JSON record (as a string).

    The model is asked for the keys listed in QUESTION_KEYS and is called with
    ``response_format={"type": "json_object"}``.
    """
    text = str(text)
    system_prompt = """
    你是專業的轉譯器,看得懂題目,並保留 LATEX 語法($...$),請轉成 json 格式
    """
    user_prompt = """
    將以內容轉成 json,並保留 latex 語法($...$),請一定要用 LATEX markdown 語法(前後用 $ 包起來的形式)
    包含 q_id, question 跟 choice 1~4, answer, hint 1~5
    {
        "q_id" : 1,
        "question": .......,
        "choice_1": ....,
        "choice_2": .... ,
        "choice_3": ....,
        "choice_4": ....,
        "answer": ....,
        "hint_1": ....,
        "hint_2": ....,
        "hint_3": ....,
        "hint_4": ....,
        "hint_5": ....
    }
    內容如下:
    """
    user_prompt += text

    response_to_json = OPEN_AI_CLIENT.chat.completions.create(
        model="gpt-4o",
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        max_tokens=4000,
    )
    return response_to_json.choices[0].message.content


def build_perseus_json(question_json):
    """Build a Perseus-style radio item from a question record.

    Args:
        question_json: dict with ``question``, ``choice_1``..``choice_4`` and
            optional ``hint_1``..``hint_5``. Missing fields become empty
            strings and non-string values are stringified, so one incomplete
            record does not abort a whole batch.

    Returns:
        The item serialised with ``json.dumps``.
    """
    question = str(question_json.get('question', ''))

    # NOTE(review): every choice is emitted with "correct": false, exactly as
    # before; the "answer" field is not used to mark the right choice.
    choices = [
        {
            "content": f"$\\mbox{{({letter})}}$ " + str(question_json.get(f'choice_{n}', '')),
            "correct": False,
        }
        for n, letter in enumerate("ABCD", start=1)
    ]

    hints = []
    for n in range(1, 6):
        hint = question_json.get(f'hint_{n}')
        if not hint:
            # Stop at the first missing/empty hint (same rule the markdown
            # preview uses), so no blank hints end up in the item.
            break
        hints.append({"content": hint, "images": {}, "widgets": {}})

    perseus_json = {
        "correct_nxt_qid": None,
        "wrong_nxt_qid": None,
        "itemDataVersion": {"major": 0, "minor": 1},
        "question": {
            "content": question + "\n\n[[☃ radio 1]]",
            "images": {},
            "widgets": {
                "radio 1": {
                    "version": {"major": 0, "minor": 0},
                    "type": "radio",
                    "graded": True,
                    "options": {
                        "onePerLine": True,
                        "noneOfTheAbove": False,
                        "choices": choices,
                        "displayCount": None,
                        "multipleSelect": False,
                        "randomize": False,
                    },
                }
            },
        },
        "answerArea": {"calculator": False, "type": "multiple", "options": {}},
        "is_start": False,
        "hints": hints,
    }
    return json.dumps(perseus_json)


def create_csv(processed_data):
    """Write the rows to a new CSV file under /tmp/csv_files and return its path."""
    writable_directory = "/tmp/csv_files"
    os.makedirs(writable_directory, exist_ok=True)

    timestamp = int(time.time())
    file_path = os.path.join(writable_directory, f"csv_{timestamp}.csv")

    with open(file_path, mode='w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        writer.writerow(TABLE_HEADERS)
        writer.writerows(processed_data)

    return file_path


def process_single_image(image):
    """Process one image given as URL, file path, NumPy array or PIL image.

    Returns:
        ``(image_url, text, question_json, perseus_json_str)``.
    """
    if isinstance(image, str) and image.startswith("http"):
        # Already a URL: use it directly.
        image_url = image
    elif isinstance(image, str):
        # Local file path.
        image_url = upload_image_to_gcs(image, bucket)
    else:
        # Image object (e.g. from gr.Image).
        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)
        # Per-call temp file: a fixed /tmp name would be shared (and
        # clobbered) by concurrent requests.
        fd, temp_file_path = tempfile.mkstemp(suffix=".png")
        os.close(fd)
        try:
            image.save(temp_file_path)
            image_url = upload_image_to_gcs(temp_file_path, bucket)
        finally:
            os.remove(temp_file_path)

    text, question_json = process_image(image_url)
    perseus_json_str = build_perseus_json(question_json)
    return image_url, text, question_json, perseus_json_str


def process_image_to_data(password, images):
    """Gradio handler: process one image or a list of images.

    Returns:
        ``(rows, status_message, csv_file_path)``.

    Raises:
        gr.Error: wrong password, or nothing was selected.
    """
    if password != PASSWORD:
        raise gr.Error("密码错误,请重新输入")
    if images is None or (isinstance(images, list) and not images):
        raise gr.Error("請先選擇圖片")

    image_list = images if isinstance(images, list) else [images]

    processed_data = []
    for image in image_list:
        image_url, text, question_json, perseus_json_str = process_single_image(image)
        processed_data.append([image_url, text] + _question_row(question_json) + [perseus_json_str])
        print("======process_and_upload=====")
        print("image_url:", image_url)

    question_count = len(processed_data)
    result = f"圖片處理完成總共完成 {question_count} 道題目"
    csv_file_path = create_csv(processed_data)
    return processed_data, result, csv_file_path


def show_single_question_image(data):
    """Return the image URL of the first table row, or None for an empty table."""
    if len(data) == 0:
        # None (not "") clears a gr.Image output without being read as a path.
        return None
    question_json = data.iloc[0].to_dict()  # first DataFrame row as a dict
    return question_json['圖片URL']


def show_single_question_markdown(data):
    """Render the first table row as a markdown preview ('' for an empty table)."""
    if len(data) == 0:
        return ""

    question_json = data.iloc[0].to_dict()  # first DataFrame row as a dict
    lines = [
        "",
        "## 題目",
        f"- {question_json['題目']}",
        "## 選項",
        f"1. {question_json['選項1']}",
        f"2. {question_json['選項2']}",
        f"3. {question_json['選項3']}",
        f"4. {question_json['選項4']}",
        f"## 答案: {question_json['答案']}",
        "## 提示",
        "",
    ]
    return "\n".join(lines) + _format_hints(_collect_hints(question_json))


# ====== PDF processing ======
def convert_pdf_to_images(pdf_path):
    """Render every page of the PDF to a PNG and return the list of file paths."""
    # Private directory per call so concurrent requests cannot overwrite each
    # other's page images.
    output_dir = tempfile.mkdtemp(prefix="pdf_pages_")
    image_paths = []
    with fitz.open(pdf_path) as doc:  # closes the document when done
        for page_num in range(len(doc)):
            page = doc.load_page(page_num)
            pix = page.get_pixmap()
            image_path = os.path.join(output_dir, f"temp_image_{page_num}.png")
            pix.save(image_path)
            image_paths.append(image_path)
    return image_paths


def process_pdf_to_data(password, pdf_file):
    """Gradio handler: extract every question from a PDF.

    Args:
        password: value of the password box.
        pdf_file: path string (``gr.File(type="filepath")``) or a file object
            exposing ``.name``.

    Returns:
        ``(rows, status_message, csv_file_path)``.

    Raises:
        gr.Error: wrong password, or no file was selected.
        json.JSONDecodeError: the model output could not be parsed as JSON.
    """
    if password != PASSWORD:
        raise gr.Error("密碼错误,请重新输入")
    if not pdf_file:
        raise gr.Error("請先選擇 PDF 檔案")

    pdf_path = pdf_file if isinstance(pdf_file, str) else pdf_file.name

    pdf_image_paths = convert_pdf_to_images(pdf_path)
    image_urls = [upload_image_to_gcs(pdf_image_path, bucket) for pdf_image_path in pdf_image_paths]

    text = pdf_image_to_text(image_urls)
    print("======pdf_image_to_text=====")
    print(text)
    print("========================")

    # Strip markdown code fences the model may wrap around the JSON.
    text = text.replace("```json", "").replace("```", "")
    text_json = safe_json_loads(text)
    if isinstance(text_json, dict):
        # A single question returned as an object instead of a list.
        text_json = [text_json]

    processed_data = []
    for text_item in text_json:
        print("======text_to_json=====")
        print(text_item)
        print("========================")
        # Keep this question's own text in its row instead of repeating the
        # whole multi-question dump on every row.
        if isinstance(text_item, str):
            item_text = text_item
        else:
            item_text = json.dumps(text_item, ensure_ascii=False)
        question_json = safe_json_loads(text_to_json(item_text))
        perseus_json_str = build_perseus_json(question_json)
        processed_data.append(["", item_text] + _question_row(question_json) + [perseus_json_str])

    result = f"PDF 處理完成,總共完成 {len(processed_data)} 道題目"
    csv_file_path = create_csv(processed_data)
    return processed_data, result, csv_file_path


def pdf_image_to_text(image_urls):
    """Ask the model to transcribe all questions on the given page images.

    The prompt asks for a JSON list; the raw message content is returned.
    """
    # The example below is valid JSON (single braces, commas between fields)
    # because the reply is parsed with json.loads by the caller.
    user_prompt = """
    請解讀題目圖片:
    - 圖片請一定要用 zh-TW 解讀
    - [數學用語、題目內的數字、選項上的數字、 數學符號、物理化學符號、英文單字] 請一定要用 LATEX markdown 語法(前後用 $ 包起來),LATEX 這很重要
    - 直接給出多題的 JSON 格式,不要有多餘的文字解釋脈絡
    如果有多題輸出為 JSON LIST FORMAT
    [
        {
            "1. 題號": "1",
            "2. 題目": "....",
            "3. 選項": "....",
            "4. 答案": "(到選項裡面挑選一個最合理的選項) ex: (A) 或 (B) 或 (C) 或 (D)",
            "5. 解題說明": "1. 步驟一, 2. 步驟二, 3. 步驟三....(最少三個步驟,最多五步驟),最後一個步驟 format 為: 答案選: (A) 或 (B) 或 (C) 或 (D)"
        },
        {
            "1. 題號": "1",
            "2. 題目": "....",
            "3. 選項": "....",
            "4. 答案": "(到選項裡面挑選一個最合理的選項) ex: (A) 或 (B) 或 (C) 或 (D)",
            "5. 解題說明": "1. 步驟一, 2. 步驟二, 3. 步驟三....(最少三個步驟,最多五步驟),最後一個步驟 format 為: 答案選: (A) 或 (B) 或 (C) 或 (D)"
        }
    ]
    """
    content = [{"type": "text", "text": user_prompt}]
    content.extend(
        {"type": "image_url", "image_url": {"url": image_url}}
        for image_url in image_urls
    )

    response = OPEN_AI_CLIENT.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": content}],
        max_tokens=4000,
    )
    return response.choices[0].message.content


def _escape_invalid_backslashes(json_string):
    """Double every backslash that does not start a valid JSON escape.

    Scans escape pairs left to right, so an already escaped backslash
    (``\\\\``) is consumed as a unit and left alone. Typical target: LaTeX
    commands such as ``\\mbox`` inside model-produced JSON strings.
    """
    def fix(match):
        pair = match.group(0)
        return pair if match.group(1) in '"\\/bfnrtu' else '\\' + pair

    return re.sub(r'\\(.)', fix, json_string, flags=re.DOTALL)


def safe_json_loads(json_string):
    """Parse JSON, repairing common defects of model-generated output.

    Attempts, in order: a strict parse; a parse with invalid backslash escapes
    doubled and raw control characters allowed inside strings; the same with
    single quotes swapped for double quotes.

    Raises:
        json.JSONDecodeError: when every attempt fails.
    """
    try:
        return json.loads(json_string)
    except json.JSONDecodeError as e:
        print(f"Initial JSONDecodeError: {e}")

    # Only touch backslashes; structural whitespace between tokens must stay
    # as is, otherwise pretty-printed JSON becomes unparseable.
    repaired = _escape_invalid_backslashes(json_string)
    try:
        return json.loads(repaired, strict=False)
    except json.JSONDecodeError as e2:
        print(f"Second JSONDecodeError: {e2}")

    # Last resort, kept from the previous implementation: Python-style
    # single-quoted output.
    return json.loads(repaired.replace('\'', '\"'), strict=False)


def show_multiple_questions_markdown(data):
    """Render every table row as a markdown preview ('' for an empty table)."""
    if len(data) == 0:
        return ""

    parts = []
    for _, row in data.iterrows():
        question_json = row.to_dict()
        lines = [
            "",
            "---",
            "## 題目",
            f"- {question_json['題目']}",
            "## 選項",
            f"- (A) {question_json['選項1']}",
            f"- (B) {question_json['選項2']}",
            f"- (C) {question_json['選項3']}",
            f"- (D) {question_json['選項4']}",
            f"## 答案: {question_json['答案']}",
            "## 提示",
            "",
        ]
        parts.append("\n".join(lines))
        parts.append(_format_hints(_collect_hints(question_json)))
    return "".join(parts)


# ====== Junyi Q_ID ======
def process_qid_to_data(password, q_id):
    """Gradio handler: process the rendered screenshot of a Junyi question id.

    Returns:
        ``(rows, status_message, csv_file_path)``.

    Raises:
        gr.Error: wrong password, or an empty question id.
    """
    if password != PASSWORD:
        raise gr.Error("密码错误,请重新输入")

    q_id = str(q_id or "").strip()
    if not q_id:
        raise gr.Error("請輸入 Junyi Q_ID")

    # Screenshot location for a question id.
    # NOTE(review): an older comment gave the pattern as "P-qid.jpg" while the
    # code uses "p_{q_id}.jpg" -- confirm which one the bucket really uses.
    image_url = f"https://storage.googleapis.com/exercise-render-img-junyi/p_{q_id}.jpg"

    text, question_json = process_image(image_url)
    perseus_json_str = build_perseus_json(question_json)

    processed_data = [[image_url, text] + _question_row(question_json) + [perseus_json_str]]
    print("======process_and_upload=====")
    print("image_url:", image_url)

    question_count = len(processed_data)
    result = f"圖片處理完成,總共完成 {question_count} 道題目"
    csv_file_path = create_csv(processed_data)
    return processed_data, result, csv_file_path


# ====== Scratch-paper (calculation sheet) analysis ======
def process_calculation_image(password, image_input, base64_input):
    """Gradio handler: analyse a student's scratch-paper screenshot.

    Args:
        password: value of the password box.
        image_input: image path, or a file object exposing ``.name``.
        base64_input: base64 image, with or without a ``data:...;base64,``
            prefix. Takes precedence over *image_input* when non-empty.

    Returns:
        ``(PIL image, analysis markdown)``, or ``(None, message)`` when no
        input was given.

    Raises:
        gr.Error: wrong password, or the base64 string is not a valid image.
    """
    if password != PASSWORD:
        raise gr.Error("密码错误,请重新输入")

    if base64_input:
        # rpartition keeps bare base64 working (no comma -> whole string).
        payload = base64_input.rpartition(',')[2]
        try:
            image_data = base64.b64decode(payload)
            image = Image.open(io.BytesIO(image_data))
        except (ValueError, OSError) as err:
            raise gr.Error("base64 圖片字串無法解析") from err
    elif image_input:
        if isinstance(image_input, str):
            image = Image.open(image_input)
        else:
            image = Image.open(image_input.name)
    else:
        return None, "請上傳圖片或輸入 base64 圖片字串"

    # Re-encode as PNG/base64 so the model always receives the same format.
    buffered = io.BytesIO()
    image.save(buffered, format="PNG")
    img_str = base64.b64encode(buffered.getvalue()).decode()

    analysis = analyze_calculation_image(img_str)
    return image, analysis


def analyze_calculation_image(image_base64):
    """Ask the model for a markdown analysis of a base64-encoded PNG."""
    # Raw string: the example contains LaTeX such as \times, which a normal
    # string literal would turn into a TAB followed by "imes".
    user_prompt = r"""
    請分析這張學生計算紙的截圖:
    1. 如果有計算式,請解釋學生的解題步驟,並指出可能的錯誤或改進點。如果沒有計算式,請提供這道題目的標準教學步驟
    2. [數學用語、題目內的數字、選項上的數字、 數學符號、物理化學符號、英文單字] 請一定要用 LATEX markdown 語法(前後用 $ 包起來),LATEX 這很重要
    3. 無論哪種情況,都請給出鼓勵性的回饋。
    4. 請使用 Markdown 格式輸出,數學公式請用 $...$ 包裹。
    5. 請使用繁體中文輸出 ZH-TW
    6. 只要輸出 Markdown 格式,不要有多餘的文字解釋脈絡

    Example:
    #### 分析與解釋
    #### 步驟 1. 確認計算式
    在圖片中,我們看到以下的計算式:
    $ 6 \times 6 \times 6 \times 6 = 6^{\Box} $
    #### 步驟 2. 解題步驟教學
    這裡我們需要理解的是指數的意味。當我們看到連續的乘法,例如:
    $ a \times a \times a \times a = a^4 $
    這代表的是底數 \(a\) 被乘了四次。因此,對於 6 乘了四次的這個例子,我們可以用指數表示:
    $ 6 \times 6 \times 6 \times 6 = 6^4 $
    #### 步驟 3. 確認答案與鼓勵
    跟據以上的解題步驟,我們知道應該填入的是 4。
    ### 鼓勵
    你做得很棒!理解指數的概念是非常重要的,尤其是在進階的數學問題中會經常用到。繼續保持這樣的學習態度,我相信你會越來越厲害的!
    """
    response = OPEN_AI_CLIENT.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": user_prompt},
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/png;base64,{image_base64}"},
                    },
                ],
            }
        ],
        max_tokens=1000,
    )
    return response.choices[0].message.content


# ====== Gradio UI ======
def _latex_markdown(**kwargs):
    """Create a gr.Markdown that renders ``$...$`` as inline LaTeX."""
    return gr.Markdown(
        latex_delimiters=[{"left": "$", "right": "$", "display": False}],
        **kwargs,
    )


def _result_table():
    """Create a result gr.Dataframe with the shared headers and widths."""
    return gr.Dataframe(
        headers=list(TABLE_HEADERS),
        column_widths=list(TABLE_COLUMN_WIDTHS),
        wrap=True,
    )


with gr.Blocks() as demo:
    with gr.Row():
        password_input = gr.Textbox(label="密碼", type="password", elem_id="password_input")

    with gr.Tab("計算紙分析"):
        with gr.Row():
            gr.Markdown("## 學生計算紙分析")
        with gr.Accordion(open=False, label="上傳物件"):
            with gr.Row():
                calculation_image_input = gr.Image(label="上傳計算紙截圖", type="filepath")
            with gr.Row():
                calculation_base64_input = gr.Textbox(
                    label="或輸入 base64 圖片字串",
                    lines=3,
                    elem_id="calculation_base64_input",
                )
        with gr.Row():
            calculation_submit_button = gr.Button("分析計算紙", elem_id="calculation_submit_button")
        with gr.Accordion(open=False, label="上傳的圖片"):
            with gr.Row():
                calculation_image_display = gr.Image(label="上傳的圖片")
        with gr.Row():
            calculation_result = _latex_markdown(label="分析結果")

    with gr.Tab("Junyi_Q_ID", elem_id="junyi_q_id_tab"):
        with gr.Row():
            gr.Markdown("## Junyi Q_ID")
        with gr.Row():
            junyi_q_id_input = gr.Textbox(label="Junyi Q_ID", elem_id="junyi_q_id_input")
            junyi_q_id_submit_button = gr.Button("開始處理 Junyi Q_ID", elem_id="junyi_q_id_submit_button")
        with gr.Row():
            junyi_q_id_result_text = gr.Textbox(label="處理結果")
            junyi_q_id_download_csv_output = gr.File(label="下载 CSV")
        with gr.Row():
            junyi_q_id_question_image = gr.Image()
            junyi_q_id_question_markdown = _latex_markdown(show_label=False)
        with gr.Accordion(open=False):
            with gr.Row():
                junyi_q_id_result_table = _result_table()

    with gr.Tab("批量處理"):
        with gr.Row():
            gr.Markdown("## 批量圖片處理 + Perseus JSON 生成")
        with gr.Row():
            image_input = gr.Files(label="選擇圖片", type="filepath")
            submit_button = gr.Button("開始處理圖片")
        with gr.Row():
            result_text = gr.Textbox(label="處理結果")
            download_csv_output = gr.File(label="下载 CSV")
        with gr.Row():
            batch_question_markdown = _latex_markdown(show_label=False)
        with gr.Accordion(open=False):
            with gr.Row():
                result_table = _result_table()

    with gr.Tab("單張處理"):
        with gr.Row():
            gr.Markdown("## 單張圖片處理")
        with gr.Row():
            single_image_input = gr.Files(label="選擇圖片", type="filepath")
            single_submit_button = gr.Button("開始處理圖片")
        with gr.Row():
            single_result_text = gr.Textbox(label="處理結果")
            single_download_csv_output = gr.File(label="下载 CSV")
        with gr.Row():
            single_question_image = gr.Image()
            single_question_markdown = _latex_markdown(show_label=False)
        with gr.Accordion(open=False):
            with gr.Row():
                single_result_table = _result_table()

    with gr.Tab("PDF 處理"):
        with gr.Row():
            gr.Markdown("## PDF 文件處理")
        with gr.Row():
            pdf_input = gr.File(type="filepath")
            pdf_submit_button = gr.Button("開始處理 PDF")
        with gr.Row():
            pdf_result_text = gr.Textbox(label="處理結果")
            pdf_download_csv_output = gr.File(label="下载 CSV")
        with gr.Row():
            pdf_question_markdown = _latex_markdown(show_label=False)
        with gr.Accordion(open=False):
            with gr.Row():
                pdf_result_table = _result_table()

    # ----- Event wiring -----
    submit_button.click(
        fn=process_image_to_data,
        inputs=[password_input, image_input],
        outputs=[result_table, result_text, download_csv_output],
    ).then(
        fn=show_multiple_questions_markdown,
        inputs=[result_table],
        outputs=[batch_question_markdown],
    )

    single_submit_button.click(
        fn=process_image_to_data,
        inputs=[password_input, single_image_input],
        outputs=[single_result_table, single_result_text, single_download_csv_output],
    ).then(
        fn=show_single_question_image,
        inputs=[single_result_table],
        outputs=[single_question_image],
    ).then(
        fn=show_single_question_markdown,
        inputs=[single_result_table],
        outputs=[single_question_markdown],
    )

    pdf_submit_button.click(
        fn=process_pdf_to_data,
        inputs=[password_input, pdf_input],
        outputs=[pdf_result_table, pdf_result_text, pdf_download_csv_output],
    ).then(
        fn=show_multiple_questions_markdown,
        inputs=[pdf_result_table],
        outputs=[pdf_question_markdown],
    )

    junyi_q_id_submit_button.click(
        fn=process_qid_to_data,
        inputs=[password_input, junyi_q_id_input],
        outputs=[junyi_q_id_result_table, junyi_q_id_result_text, junyi_q_id_download_csv_output],
    ).then(
        fn=show_single_question_image,
        inputs=[junyi_q_id_result_table],
        outputs=[junyi_q_id_question_image],
    ).then(
        fn=show_single_question_markdown,
        inputs=[junyi_q_id_result_table],
        outputs=[junyi_q_id_question_markdown],
    )

    calculation_submit_button.click(
        fn=process_calculation_image,
        inputs=[password_input, calculation_image_input, calculation_base64_input],
        outputs=[calculation_image_display, calculation_result],
    )

demo.launch(share=True)