import base64
from io import BytesIO
import re
import subprocess
import sys

# 1) Install spaCy's en_core_web_sm model before spaCy is imported.
# --quiet reduces the installer's output (optional).
# sys.executable makes sure the model is installed for the interpreter that is
# actually running this app rather than whichever "python" is first on PATH.
subprocess.run([sys.executable, "-m", "spacy", "download", "en_core_web_sm", "--quiet"])

import gradio as gr
import cv2
import numpy as np
import spacy
import shutil
from PIL.Image import Image
from bs4 import BeautifulSoup
import requests
import pandas as pd
import threading
import time
import os
import sys
import logging
from logging.handlers import TimedRotatingFileHandler
import psutil
from SEM.run_single_sem import run_single_pp
from CDM.run_single import run_single_img
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
# from dotenv import load_dotenv
import os
from PIL import Image

# Load environment variables from the .env file
# load_dotenv()

title = "Cpp4App_test"
description = "Automated Contextual Privacy Policies Generation for Mobile Apps"

# log
drive_folder_id = '1Ukx6pQcbUQssV7tyWLGZE0QAzmSjBKUK'  # ID of the folder on Google Drive
credentials_url = 'google_drive_credentials/cpp4app-logs-9abbc749fda1.json'


# create Google Drive service
def authenticate_gdrive():
    """Build a Google Drive v3 client from the service-account file at ``credentials_url``."""
    SCOPES = ['https://www.googleapis.com/auth/drive.file']
    creds = service_account.Credentials.from_service_account_file(credentials_url, scopes=SCOPES)
    return build('drive', 'v3', credentials=creds)


service = authenticate_gdrive()


# upload the log file to Google Drive
def upload_to_gdrive(file_path):
    """Upload ``file_path`` into the Drive folder ``drive_folder_id`` and print the new file ID."""
    file_metadata = {
        'name': os.path.basename(file_path),
        'parents': [drive_folder_id]
    }
    media = MediaFileUpload(file_path, resumable=True)
    uploaded = service.files().create(body=file_metadata, media_body=media, fields='id').execute()
    print(f"Uploaded {file_path} to Google Drive with file ID: {uploaded.get('id')}")


def delete_old_logs(max_files=14):
    """
    Delete old log files from the Google Drive folder, keeping only the newest
    ``max_files`` files (``max_files=0`` removes every listed file).
    """
    # List every non-folder entry of the log folder, oldest first.
    query = f"'{drive_folder_id}' in parents and mimeType != 'application/vnd.google-apps.folder'"
    results = service.files().list(q=query, fields="files(id, name, createdTime)",
                                   orderBy="createdTime").execute()
    items = results.get('files', [])

    # Slice by an explicit count: ``items[:-max_files]`` is empty for
    # max_files == 0, which would silently keep everything.
    surplus = len(items) - max(max_files, 0)
    for entry in items[:max(surplus, 0)]:
        service.files().delete(fileId=entry['id']).execute()
        print(f"Deleted old log file: {entry['name']}")


# Custom TimedRotatingFileHandler that overrides doRollover
class CustomTimedRotatingFileHandler(TimedRotatingFileHandler):
    """Timed rotating handler that uploads each rotated file to Google Drive."""

    def doRollover(self):
        """Rotate as usual, then upload the newest rotated file and prune old Drive logs."""
        super().doRollover()  # let the parent class perform the rotation

        # Derive directory and prefix from the handler itself instead of a
        # module global that is only defined after this class.
        log_dir = os.path.dirname(self.baseFilename)
        prefix = os.path.basename(self.baseFilename) + '.'
        rotated = [os.path.join(log_dir, name)
                   for name in os.listdir(log_dir) if name.startswith(prefix)]
        if not rotated:
            return

        newest = max(rotated, key=os.path.getmtime)
        if not os.path.exists(newest):
            return
        try:
            upload_to_gdrive(newest)
            delete_old_logs()
        except Exception as exc:
            # An exception escaping doRollover would make logging drop the
            # record that triggered the rotation; report it and carry on.
            # print (not logger) is used to avoid re-entering this handler.
            print(f"Failed to sync rotated log {newest} to Google Drive: {exc}", file=sys.stderr)


log_directory = './logs'
log_file_path = os.path.join(log_directory, 'app.log')
os.makedirs(log_directory, exist_ok=True)

# set log handler (one log file per day; backupCount keeps the 5 newest rotated files locally)
handler = CustomTimedRotatingFileHandler(log_file_path, when='midnight', interval=1, backupCount=5, utc=False)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
formatter.converter = time.localtime
handler.setLevel(logging.INFO)
handler.setFormatter(formatter)

# set logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.info('Application started')


def schedule_monitoring(interval_hours):
    """
    Logging system resource usage every 'interval_hours' hours
    """
    while True:
        time.sleep(interval_hours * 3600)  # wait the requested number of hours

        # record the current system resource usage
        cpu_usage = psutil.cpu_percent(interval=1)
        memory_info = psutil.virtual_memory()
        disk_usage = psutil.disk_usage('/')
        logger.info(f"CPU usage: {cpu_usage}%")
        logger.info(
            f"Memory usage: {memory_info.percent}% (Total: {memory_info.total}, Used: {memory_info.used}, Free: {memory_info.free})")
        logger.info(
            f"Disk usage: {disk_usage.percent}% (Total: {disk_usage.total}, Used: {disk_usage.used}, Free: {disk_usage.free})")


def schedule_restarts(interval_hours):
    """
    Auto-restart every 'interval_hours' hours
    """
    while True:
        time.sleep(interval_hours * 3600)  # convert hour to second
        python = sys.executable
        # Replace the current process with a fresh run of the same command line.
        os.execl(python, python, *sys.argv)


def write_and_read():
    """Write 'Hello, World!' to myfile.txt, read it back and return the text."""
    # Write
    with open('myfile.txt', 'w') as f:
        f.write('Hello, World!')

    # Read
    with open('myfile.txt', 'r') as f:
        data = f.read()

    return data


def run_demo(img_root, output_root, segment_root, file):
    """Run ``run_single_pp`` on ``file``, then return the result of ``run_single_img``."""
    run_single_pp(file)
    return run_single_img(img_root, output_root, segment_root)


# Module-level gallery of result images and the index currently displayed.
# NOTE(review): being module globals, these are shared by every caller.
output_boards = []
current_image_index = 0


def _policy_cache_path(url, folder):
    """Return the cache file path for ``url`` using only filesystem-safe characters."""
    stem = re.sub(r'[^A-Za-z0-9._-]', '_', url.replace("://", "_"))
    return os.path.join(folder, stem + ".html")


def _load_policy_html(url, folder="./html_files"):
    """Return the HTML for ``url`` from the local cache, downloading it on a miss.

    Returns an empty string when the download fails; failures are not cached.
    """
    os.makedirs(folder, exist_ok=True)
    cache_path = _policy_cache_path(url, folder)

    # Cache hit: read the stored copy.
    if os.path.exists(cache_path):
        with open(cache_path, 'r', encoding='utf-8') as cached:
            return cached.read()

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
    }
    try:
        # The timeout keeps an unresponsive host from blocking the request forever.
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()  # raises for 4xx/5xx responses
    except requests.RequestException:
        # Covers HTTP errors as well as invalid URLs, timeouts and connection failures.
        logger.warning("Could not download privacy policy from %s", url, exc_info=True)
        return ""

    text = response.text
    with open(cache_path, 'w', encoding='utf-8') as cached:
        cached.write(text)
    return text


def inference(img, html):
    """Generate the result images for one screenshot and privacy-policy URL.

    Args:
        img: screenshot supplied by the UI (converted with ``np.array``), or None.
        html: URL of the privacy policy page, or None.

    Returns:
        The first generated image, or None when an input is missing or no
        image was produced. The full gallery is stored in ``output_boards``.
    """
    start = time.time()
    global output_boards, current_image_index

    write_and_read()

    # The UI binds exactly one output component, so return a single value.
    if img is None or html is None or not html.strip():
        return None

    output_root = "./CDM/result_classification"
    segment_root = './SEM/txt'
    img_root = "./CDM/input_examples/1-1-write.jpg"
    pp_root = "1.txt"

    # NOTE(review): np.array on a PIL image is presumably RGB while cv2.imwrite
    # expects BGR — confirm whether the downstream pipeline relies on this.
    img_array = np.array(img)
    cv2.imwrite(img_root, img_array)

    # Fetch (or reuse the cached copy of) the policy page named by the URL.
    input_text = _load_policy_html(html.strip())
    with open(pp_root, 'w', encoding='utf-8') as policy_file:
        policy_file.write(input_text)

    boards = run_demo(img_root, output_root, segment_root, pp_root)

    # make sure the save directory exists
    save_folder = "output_images"
    os.makedirs(save_folder, exist_ok=True)

    # Tuples are converted to NumPy arrays so every entry is an image object.
    output_boards = [np.array(board) if isinstance(board, tuple) else board
                     for board in (boards or [])]
    current_image_index = 0  # reset the gallery position after every run

    total = time.time() - start
    print(f"total processing time of this screenshot: {total:.2f}s")

    # first image of the gallery, if any
    return output_boards[0] if output_boards else None


# switch to the previous image
def previous_image():
    """Step the gallery one image back and return it (None when the gallery is empty)."""
    global current_image_index
    if not output_boards:
        return None
    if current_image_index > 0:
        current_image_index -= 1
    return output_boards[current_image_index]


# switch to the next image
def next_image():
    """Step the gallery one image forward and return it (None when the gallery is empty)."""
    global current_image_index
    if not output_boards:
        return None
    if current_image_index < len(output_boards) - 1:
        current_image_index += 1
    return output_boards[current_image_index]


# inputs = [
#     gr.inputs.Image(type="pil", label="Image Upload"),
#     # gr.inputs.File(label="HTML File Upload"),
#     gr.inputs.Textbox(label="Text Input")
#     # gr.inputs.Textbox(lines=True, label="HTML Text")
# ]
# output = [
#     gr.outputs.Image(type="pil", label="Result Image"),
#     gr.outputs.Dataframe(type="pandas", label="Result Excel")
# ]

# gr.Interface(
#     inference,
#     # inputs,
#     # output,
#     inputs=[image_input_row, textbox_input_row],
#     outputs=[image_output_row, dataframe_output_row],
#     title=title,
#     description=description,
#     # examples=[['examples/6-8.jpg', 'examples/6.txt'], ['examples/11-9.jpg', 'examples/11.html']],
#     # examples=[['examples/6-8.jpg', example_file_content_1], ['examples/11-9.jpg', example_file_content_2]],
#     examples=[['examples/6-8.jpg', 'html content 1'], ['examples/11-9.jpg', 'html content 2']],
#     enable_queue=True,
#     capture_session=True,
#     layout='vertical'
# ).launch(debug=False)

# def example_inference():
#     image_input_bgr = cv2.imread('examples/6-8.jpg')
#     image_input = cv2.cvtColor(image_input_bgr, cv2.COLOR_BGR2RGB)
#     # text_input = 'html content 1'  # example string
#     text_input = 'https://www.whatsapp.com/legal/privacy-policy'
#
#     out_result, out_segment = inference(image_input, text_input)
#
#     return image_input, text_input, out_result, out_segment


def _read_rgb(path):
    """Load the image at ``path`` with OpenCV and return it converted from BGR to RGB.

    Raises:
        FileNotFoundError: if OpenCV cannot read the file (``cv2.imread`` returned None).
    """
    image_bgr = cv2.imread(path)
    if image_bgr is None:
        raise FileNotFoundError(f"Could not read image: {path}")
    return cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)


def _run_live_example(image_path, policy_url):
    """Run ``inference`` on an example and return the legacy 5-tuple.

    ``inference`` returns a single image, so the two table slots of the
    5-tuple are filled with None instead of unpacking three values from it.
    """
    image_input = _read_rgb(image_path)
    out_result = inference(image_input, policy_url)
    return image_input, policy_url, out_result, None, None


def example_inference_1():
    """Run the live pipeline on the WhatsApp example."""
    return _run_live_example("examples/6-8.jpg", 'https://www.whatsapp.com/legal/privacy-policy')


def example_inference_2():
    """Run the live pipeline on the Snap example."""
    return _run_live_example("examples/11-9.jpg", 'https://values.snap.com/privacy/privacy-policy')


def example_inference_3():
    """Run the live pipeline on the McDonald's example."""
    return _run_live_example("examples/1-1.jpg", 'https://mcdonalds.com.au/privacy-policy')


def _load_precomputed_example(image_path, policy_url, folder_path, segment_xlsx, complete_xlsx):
    """Load a pre-computed example and publish its images as the current gallery.

    Reads every non-hidden ``.png`` in ``folder_path`` (sorted by file name)
    into the module-level ``output_boards``, resets ``current_image_index``
    and reads the two Excel result tables.

    Returns:
        (input image, policy URL, first result image or None,
         segment DataFrame, complete DataFrame)
    """
    global output_boards, current_image_index

    image_input = _read_rgb(image_path)

    # keep only PNG files, ordered by file name
    png_files = sorted(
        name for name in os.listdir(folder_path)
        if name.endswith('.png') and not name.startswith('.')
    )
    output_boards = [_read_rgb(os.path.join(folder_path, name)) for name in png_files]
    current_image_index = 0  # reset the gallery position

    out_result = output_boards[0] if output_boards else None
    out_segment = pd.read_excel(segment_xlsx)
    complete_result = pd.read_excel(complete_xlsx)

    return image_input, policy_url, out_result, out_segment, complete_result


def new_example_inference_1():
    """Show the pre-computed WhatsApp example."""
    return _load_precomputed_example(
        "examples/6-8.jpg",
        'https://www.whatsapp.com/legal/privacy-policy',
        "results/result1",
        "results/result_1_S.xlsx",
        "results/result_1_C.xlsx",
    )


def new_example_inference_2():
    """Show the pre-computed Snap example."""
    return _load_precomputed_example(
        "examples/11-9.jpg",
        'https://values.snap.com/privacy/privacy-policy',
        "results/result2",
        "results/result_2_S.xlsx",
        "results/result_2_C.xlsx",
    )


def new_example_inference_3():
    """Show the pre-computed McDonald's example."""
    return _load_precomputed_example(
        "examples/1-1.jpg",
        'https://mcdonalds.com.au/privacy-policy',
        "results/result3",
        "results/result_3_S.xlsx",
        "results/result_3_C.xlsx",
    )


def _as_ui_handler(example_fn):
    """Wrap a 5-tuple example function so it yields only the three values the UI binds."""
    def handler():
        return example_fn()[:3]
    return handler


# def toggle_dataframe_callback():
#     complete_result_dataframe.visible = not complete_result_dataframe.visible

# The missing ';' after "width: 100%" in .btn-container has been added; without
# it the browser discards both that declaration and "display: flex".
with gr.Blocks(css='''
    button[class*="btn"] {
        all: unset; /* 移除所有样式 */
        display: inline-block; /* 确保按钮仍然是块元素 */
        padding: 10px 20px; /* 添加一些基本的填充 */
        border: none; /* 去掉边框 */
        background: none; /* 去掉背景 */
        color: inherit; /* 继承颜色 */
        cursor: pointer; /* 使按钮有点击手感 */
        width: 150px;
        height: 30px;
        border-radius: 5px !important;
        text-align: center;
    }
    .icon-buttons {
        display: none;
    }
    body {
        display: flex;
        justify-content: center;
        margin: 0; /* 去除默认的 body 外边距 */
    }
    h1, h2{
        width: 100%;
    }
    .example_imgs {
        width: 100%;
        display: flex;
        justify-content: flex-start; /* 图片居左对齐 */
        align-items: flex-start; /* 垂直居中对齐 */
        gap: 20px; /* 控制图片之间的间距 */
    }
    .eg_img {
        width: 40%; /* 调整图片大小 */
        padding: 0;
        display: block;
        margin: 0 auto; /* 图片居中 */
        border: 2px solid rgb(234, 234, 234) !important;
        transition: border-color 0.5s ease, border-width 0.5s ease;
        background-size: cover !important;
        background-position: center !important;
        background-repeat: no-repeat !important;
        padding-bottom: 80% !important;
    }
    .eg_img:hover {
        border-color: rgb(117, 209, 255) !important; /* 悬停时边框颜色 */
        border-width: 2px !important; /* 悬停时边框粗细 */
    }
    .eg1 {
        background-image: url('https://buyanghc.github.io/eg1.jpg') !important;
    }
    .eg2 {
        background-image: url('https://buyanghc.github.io/eg2.jpg') !important;
    }
    .eg3 {
        background-image: url('https://buyanghc.github.io/eg3.jpg') !important;
    }
    .eg_img_msg {
        text-align: center; /* 文本居中 */
    }
    .col{
        width: 20% !important;
    }
    .btn-container {
        width: 100%;
        display: flex;
        justify-content: center; /* 按钮容器中的内容右对齐 */
        gap: 45px; /* 按钮之间的间距 */
        margin-top: 15px;
        margin-bottom: 15px;
    }
    .btn {
        width: 150px;
        height: 45px;
        border: 2px solid rgb(40, 40, 40);
        transition: border-color 0.3s ease, color 0.3s ease, background-color 0.3s ease !important;
        border-radius: 5px;
    }
    .btn_run {
        color: white !important;
        background-color: rgb(40, 40, 40) !important;
    }
    .btn_reset {
        background-color: rgb(234, 234, 234) !important;
    }
    .btn:hover{
        color: white !important;
        background-color: rgb(117, 209, 255) !important;
        border: white !important;
    }
    .change_btn{
        width: 55px !important;
        height: 25px !important;
        border-radius: 50% !important;
        background-color: rgb(234, 234, 234) !important;
        padding: 0 !important;
        margin: 0 !important;
    }
    .btn-container2 {
        width: 40%;
        display: flex;
        justify-content: flex-end; /* 按钮容器中的内容右对齐 */
        gap: 20px; /* 按钮之间的间距 */
    }
''') as demo:
    # NOTE(review): the nesting of rows/columns below was reconstructed from
    # statement order — confirm against the rendered layout.
    # NOTE(review): the h1/h2 CSS rule suggests the Markdown strings may once
    # have carried heading markup — confirm; the text is reproduced as found.
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("\n\nCpp4App\n\n", elem_classes="title")
            gr.Markdown(
                "\n\nAutomated Contextual Privacy Policies Generation for Mobile Apps.\n\n",
                elem_classes="title_msg1")
            gr.Markdown(
                "\n\n[Usage] Upload the screenshot and privacy policy URL link, then click 'RUN' button.\n\n",
                elem_classes="title_msg1")
        # with gr.Column(scale=1):
        #     gr.Image("images/head.jpg", elem_classes="title_img img")

    with gr.Row(elem_classes="btn-container"):
        run_btn = gr.Button("RUN", elem_classes="btn btn_run")
        reset_btn = gr.Button("RESET", elem_classes="btn btn_reset")

    with gr.Column():
        with gr.Row(elem_classes="work2"):
            with gr.Column(elem_classes="col_big", scale=1):
                gr.Markdown("\n\nStep 1 : Input privacy policy URL ⬇️\n\n", elem_classes="work_msg")
                text_input = gr.Textbox(label="URL Input for the Privacy Policy of the App",
                                        elem_classes="url")
            with gr.Column(elem_classes="col_big", scale=1):
                gr.Markdown("\n\nStep 2 : Upload the screenshot ⬇️\n\n", elem_classes="work_msg")
                image_input = gr.Image(type="pil", label="Screenshot Upload", elem_classes="image_show")
            with gr.Column(elem_classes="col_big", scale=1):
                with gr.Row():
                    gr.Markdown("\n\nOutput 🌟:\n\n", elem_classes="work_msg")
                    with gr.Row(elem_classes="btn-container2"):
                        previous_button = gr.Button("<", elem_classes="change_btn")
                        next_button = gr.Button(">", elem_classes="change_btn")
                result_image = gr.Image(type="pil", label="Result Screenshot", elem_classes="image_show")

    with gr.Column():
        gr.Markdown(
            "\n\nYou can try with three examples we provided:\n\n",
            elem_classes="eg_title")
        gr.Markdown("\n\n· Click the picture to run.\n\n", elem_classes="eg_msg")
        with gr.Row(elem_classes="example_imgs"):
            with gr.Column(elem_classes="col"):
                eg1_img = gr.Button("", elem_classes="eg_img eg1")
                gr.Markdown("\n\n· WhatsApp\n\n", elem_classes="eg_img_msg")
            with gr.Column(elem_classes="col"):
                eg2_img = gr.Button("", elem_classes="eg_img eg2")
                gr.Markdown("\n\n· Snap\n\n", elem_classes="eg_img_msg")
            with gr.Column(elem_classes="col"):
                eg3_img = gr.Button("", elem_classes="eg_img eg3")
                gr.Markdown("\n\n· Mcdonald's\n\n", elem_classes="eg_img_msg")

    run_btn.click(inference, inputs=[image_input, text_input], outputs=[result_image])
    # One None per bound output component (three), not six.
    reset_btn.click(lambda: (None, None, None), inputs=[],
                    outputs=[image_input, text_input, result_image])

    # The arrow buttons step through the result gallery.
    previous_button.click(previous_image, outputs=result_image)
    next_button.click(next_image, outputs=result_image)

    # example_button.click(example_inference, inputs=[], outputs=[image_input, text_input, result_image, result_dataframe])
    # The example functions return five values; only the first three are bound here.
    eg1_img.click(_as_ui_handler(new_example_inference_1), inputs=[],
                  outputs=[image_input, text_input, result_image])
    eg2_img.click(_as_ui_handler(new_example_inference_2), inputs=[],
                  outputs=[image_input, text_input, result_image])
    eg3_img.click(_as_ui_handler(new_example_inference_3), inputs=[],
                  outputs=[image_input, text_input, result_image])

    # # Create a unique CSS ID for the dataframe output
    # dataframe_id = id(complete_result_dataframe)
    #
    # # Define CSS styles for hiding/showing the dataframe
    # hide_style = f"#{dataframe_id} {{ display: none; }}"
    # show_style = f"#{dataframe_id} {{ display: block; }}"
    #
    #
    # def toggle_dataframe_callback():
    #     if toggle_dataframe_button.label == "Show Complete Result Excel":
    #         toggle_dataframe_button.label = "Hide Complete Result Excel"
    #         gr.Html(style=show_style).show()
    #     else:
    #         toggle_dataframe_button.label = "Show Complete Result Excel"
    #         gr.Html(style=hide_style).show()

threading.Thread(target=schedule_restarts, args=(24,)).start()  # restart per 24 hours
threading.Thread(target=schedule_monitoring, args=(12,)).start()  # monitor resources using per 12 hours

try:
    demo.launch()
    logger.info('Gradio app launched successfully')
except Exception:
    # logger.exception logs at ERROR level and attaches the traceback.
    logger.exception('Error occurred while launching Gradio app')