Spaces:
Runtime error
Runtime error
| import os | |
| import shutil | |
| import subprocess | |
| import tempfile | |
| import traceback | |
| from time import sleep, time | |
| from types import SimpleNamespace | |
| import json_repair | |
| import Levenshtein | |
| from lxml import etree | |
| from pdf2image import convert_from_path | |
| from pptx.dml.color import RGBColor | |
| from pptx.oxml import parse_xml | |
| from pptx.shapes.base import BaseShape | |
| from pptx.shapes.group import GroupShape | |
| from pptx.text.text import _Paragraph, _Run | |
| from pptx.util import Length, Pt | |
| from rich import print | |
| from tenacity import RetryCallState, retry, stop_after_attempt, wait_fixed | |
| from playwright.sync_api import sync_playwright | |
# File extensions (lowercase, without the leading dot) accepted by is_image_path.
IMAGE_EXTENSIONS = {
    "bmp",
    "jpg",
    "jpeg",
    "pgm",
    "png",
    "ppm",
    "tif",
    "tiff",
    "webp",
}

# Named RGB colours.
BLACK = RGBColor(0, 0, 0)
YELLOW = RGBColor(255, 255, 0)
BLUE = RGBColor(0, 0, 255)

# Lengths built with pptx.util.Pt (i.e. expressed in points).
# NOTE(review): their consumers are not visible in this file section.
BORDER_LEN = Pt(2)
BORDER_OFFSET = Pt(2)
LABEL_LEN = Pt(24)
FONT_LEN = Pt(20)
def is_image_path(file: str):
    """Tell whether *file* looks like an image, judged by its extension.

    The text after the last "." is lower-cased and looked up in
    IMAGE_EXTENSIONS. A name without any "." is compared as a whole.
    """
    suffix = file.rsplit(".", 1)[-1]
    return suffix.lower() in IMAGE_EXTENSIONS
def get_font_pptcstyle(font: dict):
    """Format a font description dict as a single human-readable line.

    *font* must provide the keys ``bold``, ``italic``, ``underline``,
    ``size``, ``color`` and ``name``; a missing key raises AttributeError.
    The returned string ends with a newline.
    """
    attrs = SimpleNamespace(**font)
    return (
        f"Font Style: bold={attrs.bold}, italic={attrs.italic}, "
        f"underline={attrs.underline}, size={attrs.size}pt, "
        f"color={attrs.color}, font style={attrs.name}\n"
    )
def run_sync_screenshots(webpage_url: str, output_path: str, wait_seconds: int = 5) -> str:
    """Open *webpage_url* in headless Chromium and save a full-page screenshot.

    Args:
        webpage_url: Address to load.
        output_path: File the screenshot is written to.
        wait_seconds: Pause after navigation before the page is captured.

    Returns:
        ``output_path``. It is returned even when both screenshot attempts
        failed; failures are only printed.
    """
    chromium_flags = [
        "--no-sandbox",
        "--disable-setuid-sandbox",
        "--disable-dev-shm-usage",
        "--disable-gpu",
        "--disable-web-security",
    ]
    viewport_size = {"width": 1920, "height": 1080}

    with sync_playwright() as playwright:
        browser = playwright.chromium.launch(headless=True, args=chromium_flags)
        context = browser.new_context(
            viewport=viewport_size,
            ignore_https_errors=True,
        )
        page = context.new_page()
        # A default timeout of 0 is passed so later page operations do not
        # inherit a time limit; goto() below still sets its own.
        page.set_default_timeout(0)

        def capture():
            page.screenshot(path=output_path, full_page=True, timeout=0)

        try:
            page.goto(webpage_url, timeout=60000, wait_until="domcontentloaded")
            page.wait_for_timeout(wait_seconds * 1000)
            # Reference document.fonts.ready when available, ignoring rejection.
            page.evaluate("""
                if (document.fonts && document.fonts.ready) {
                    document.fonts.ready.catch(() => {});
                }
            """)
            # Scroll to the bottom and back to the top before capturing.
            page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            page.wait_for_timeout(1500)
            page.evaluate("window.scrollTo(0, 0)")
            page.wait_for_timeout(500)
            capture()
        except Exception as e:
            print(f"[WARN] Screenshot exception: {e}")
            # Best effort: capture whatever is currently rendered.
            try:
                capture()
            except Exception as ee:
                print(f"[ERROR] Fallback screenshot failed: {ee}")
        finally:
            browser.close()
    return output_path
def get_font_style(font: dict):
    """Build an inline CSS declaration string from a font description dict.

    *font* must provide ``size``, ``color``, ``bold`` and ``italic``; only
    truthy entries contribute a declaration. Declarations are joined by
    ``"; "`` and an empty string is returned when none apply.
    """
    attrs = SimpleNamespace(**font)
    css = {}
    if attrs.size:
        css["font-size"] = f"{attrs.size}pt"
    if attrs.color:
        css["color"] = f"#{attrs.color}"
    if attrs.bold:
        css["font-weight"] = "bold"
    if attrs.italic:
        css["font-style"] = "italic"
    return "; ".join(f"{prop}: {value}" for prop, value in css.items())
def runs_merge(paragraph: _Paragraph):
    """Collapse all runs of *paragraph* into one run carrying the full text.

    Returns the surviving run, or None when the paragraph has no runs even
    after the field fallback below.
    """
    runs = paragraph.runs
    if not runs:
        # No runs reported: re-parse the paragraph XML with every "fld"
        # replaced by "r" and wrap whatever run elements that yields.
        patched_xml = paragraph._element.xml.replace("fld", "r")
        runs = [_Run(r, paragraph) for r in parse_xml(patched_xml).r_lst]
    if not runs:
        return None
    if len(runs) == 1:
        return runs[0]

    # Keep the run holding the most text and give it the whole paragraph text.
    keeper = max(runs, key=lambda candidate: len(candidate.text))
    keeper.text = paragraph.text
    for other in runs:
        if other != keeper:
            other._r.getparent().remove(other._r)
    return keeper
def older_than(filepath, seconds: int = 10, wait: bool = False):
    """Report whether *filepath* is more than *seconds* old.

    Age is measured from ``os.path.getctime``. When the file is missing:
    with ``wait`` false the result is False; with ``wait`` true the function
    polls once per second until the file appears, sleeps a further *seconds*
    and returns True.
    """
    if os.path.exists(filepath):
        created_at = os.path.getctime(filepath)
        return seconds < (time() - created_at)

    if not wait:
        return False

    while True:
        print("waiting for:", filepath)
        sleep(1)
        if os.path.exists(filepath):
            sleep(seconds)
            return True
def edit_distance(text1: str, text2: str):
    """Return a normalised similarity between two strings.

    Despite the name, the result is a similarity: ``1 - distance / longest``
    where ``distance`` is the Levenshtein distance and ``longest`` is the
    length of the longer input. Identical strings score 1.

    Two empty strings are identical, so they score 1.0 instead of raising
    ZeroDivisionError.
    """
    longest = max(len(text1), len(text2))
    if longest == 0:
        return 1.0
    return 1 - Levenshtein.distance(text1, text2) / longest
def get_slide_content(doc_json: dict, slide_title: str, slide: dict):
    """Assemble the text describing one slide and its source material.

    Args:
        doc_json: Document dict; ``doc_json["sections"]`` is iterated and each
            section may carry ``"subsections"``.
        slide_title: Written after "Slide Purpose:".
        slide: Slide dict; its optional ``"description"`` and ``"subsections"``
            (an iterable of subsection titles) are used.

    Returns:
        The header lines followed by, for every requested subsection title, a
        "Slide Content Source: " marker and the content of each document
        subsection whose title matches with similarity above 0.9.
    """
    slide_desc = slide.get("description", "")
    parts = [f"Slide Purpose: {slide_title}\nSlide Description: {slide_desc}\n"]
    for key in slide.get("subsections", []):
        parts.append("Slide Content Source: ")
        for section in doc_json["sections"]:
            subsections = section.get("subsections", [])
            # A single-entry mapping {title: content} is normalised to the
            # list-of-dicts shape used below.
            if isinstance(subsections, dict) and len(subsections) == 1:
                subsections = [
                    {"title": k, "content": v} for k, v in subsections.items()
                ]
            for subsection in subsections:
                try:
                    if edit_distance(key, subsection["title"]) > 0.9:
                        parts.append(f"# {key} \n{subsection['content']}\n")
                except Exception:
                    # Best effort: skip malformed subsections, but let
                    # KeyboardInterrupt / SystemExit propagate.
                    continue
    return "".join(parts)
def tenacity_log(retry_state: RetryCallState):
    """Print the retry state and the traceback of the exception it holds.

    Used as the ``after`` hook of the module's retry decorator.
    """
    print(retry_state)
    failure = retry_state.outcome.exception()
    traceback.print_tb(failure.__traceback__)
def get_json_from_response(raw_response: str):
    """Parse JSON out of a raw text response.

    If the response contains a ```json fence, the text of the last such block
    is parsed; otherwise the whole stripped response is parsed. Parsing uses
    ``json_repair.loads``.

    Raises:
        RuntimeError: if parsing raises; the original exception is attached
            both as the second argument and as ``__cause__``.
    """
    response = raw_response.strip()
    fence = "```json"
    start = response.rfind(fence)
    try:
        if start == -1:
            return json_repair.loads(response)
        body_start = start + len(fence)
        end = response.rfind("```")
        # rfind("```") also matches the opening fence itself. When no closing
        # fence follows (e.g. a truncated response), parse everything after
        # the opening fence instead of an empty slice.
        payload = response[body_start:end] if end > start else response[body_start:]
        return json_repair.loads(payload.strip())
    except Exception as e:
        raise RuntimeError("Failed to parse JSON from response", e) from e
def extract_html_code_block(raw_response):
    """Return the body of the last ```html fenced block in *raw_response*.

    Returns None when there is no ```html fence or no closing fence after it.
    The returned text is stripped of surrounding whitespace.
    """
    text = raw_response.strip()
    opening = "```html"
    start = text.rfind(opening)
    if start == -1:
        return None
    end = text.rfind("```")
    if end <= start:
        # No valid HTML code block found (the only fence is the opening one).
        return None
    return text[start + len(opening):end].strip()
# Shared retry decorator: fixed 3-unit wait between attempts, stop after 5
# attempts, call tenacity_log via the `after` hook, and re-raise the original
# exception once attempts are exhausted.
tenacity = retry(
    stop=stop_after_attempt(5),
    wait=wait_fixed(3),
    after=tenacity_log,
    reraise=True,
)
def ppt_to_images(file: str, output_dir: str, warning: bool = False, dpi=72, output_type='png'):
    """Render a presentation file to an image via LibreOffice and pdf2image.

    The file is converted to PDF with ``soffice`` in a temporary directory,
    the PDF is rasterised, and each page is saved into *output_dir* as
    ``poster.png`` (or ``poster.jpg`` for any other *output_type*).

    Args:
        file: Path of the presentation; must exist.
        output_dir: Directory for the image; created if missing.
        warning: Print a notice when *output_dir* already exists.
        dpi: Rasterisation resolution passed to ``convert_from_path``.
            (Previously ignored in favour of a hard-coded 72.)
        output_type: ``'png'`` for PNG output, anything else for JPEG.

    Raises:
        AssertionError: if *file* does not exist.
        subprocess.CalledProcessError: if ``soffice`` exits non-zero.
        RuntimeError: if the conversion produced no PDF.
    """
    assert pexists(file), f"File {file} does not exist"
    if pexists(output_dir) and warning:
        print(f"ppt2images: {output_dir} already exists")
    os.makedirs(output_dir, exist_ok=True)

    if output_type == 'png':
        target_name, image_format = "poster.png", 'PNG'
    else:
        target_name, image_format = "poster.jpg", 'JPEG'

    with tempfile.TemporaryDirectory() as temp_dir:
        command_list = [
            "soffice",
            "--headless",
            "--convert-to",
            "pdf",
            file,
            "--outdir",
            temp_dir,
        ]
        subprocess.run(command_list, check=True, stdout=subprocess.DEVNULL)

        for f in os.listdir(temp_dir):
            if not f.endswith(".pdf"):
                continue
            temp_pdf = pjoin(temp_dir, f)
            images = convert_from_path(temp_pdf, dpi=dpi)
            # Every page is written to the same file name, so for a
            # multi-page document the last page is the one that remains.
            for img in images:
                img.save(pjoin(output_dir, target_name), image_format)
            return

    raise RuntimeError("No PDF file was created in the temporary directory", file)
def wmf_to_images(blob: bytes, filepath: str):
    """Convert WMF image bytes to a JPEG at *filepath* using ``soffice``.

    The bytes are written to a temporary ``.wmf`` file sharing the target's
    base name, then converted with the output directory set to the directory
    of *filepath*.

    Raises:
        ValueError: if *filepath* does not end with ``.jpg``.
        subprocess.CalledProcessError: if ``soffice`` exits non-zero.
        AssertionError: if the expected output file is absent afterwards.
    """
    if not filepath.endswith(".jpg"):
        raise ValueError("filepath must end with .jpg")

    out_dir, file_name = os.path.split(filepath)
    stem = file_name.removesuffix(".jpg")

    with tempfile.TemporaryDirectory() as temp_dir:
        wmf_path = pjoin(temp_dir, f"{stem}.wmf")
        with open(wmf_path, "wb") as handle:
            handle.write(blob)
        subprocess.run(
            ["soffice", "--headless", "--convert-to", "jpg", wmf_path, "--outdir", out_dir],
            check=True,
            stdout=subprocess.DEVNULL,
        )

    assert pexists(filepath), f"File {filepath} does not exist"
def extract_fill(shape: BaseShape):
    """Return the XML string of *shape*'s fill properties element.

    Returns None when ``dir(shape)`` lists no ``fill`` attribute.
    """
    if "fill" in dir(shape):
        return shape.fill._xPr.xml
    return None
def apply_fill(shape: BaseShape, fill_xml: str):
    """Replace *shape*'s fill properties element with one parsed from *fill_xml*.

    Does nothing when *fill_xml* is None (the value extract_fill returns for
    shapes without a fill).
    """
    if fill_xml is not None:
        replacement = etree.fromstring(fill_xml)
        current = shape.fill._xPr
        current.getparent().replace(current, replacement)
def parse_groupshape(groupshape: GroupShape):
    """Map each child of a group shape into the group's own frame.

    The bounding box of the children (in their own coordinates) is scaled and
    translated onto the group's left/top/width/height. Returns one dict per
    child, in child order, with ``left``, ``top``, ``width`` and ``height``
    as ``Length`` values.
    """
    assert isinstance(groupshape, GroupShape)

    frame_left, frame_top = groupshape.left, groupshape.top
    frame_width, frame_height = groupshape.width, groupshape.height

    # Bounding box of the children in the group's internal coordinates.
    inner_left = min(child.left for child in groupshape.shapes)
    inner_top = min(child.top for child in groupshape.shapes)
    inner_width = (
        max(child.left + child.width for child in groupshape.shapes) - inner_left
    )
    inner_height = (
        max(child.top + child.height for child in groupshape.shapes) - inner_top
    )

    return [
        {
            "left": Length(
                (child.left - inner_left) * frame_width / inner_width + frame_left
            ),
            "top": Length(
                (child.top - inner_top) * frame_height / inner_height + frame_top
            ),
            "width": Length(child.width * frame_width / inner_width),
            "height": Length(child.height * frame_height / inner_height),
        }
        for child in groupshape.shapes
    ]
def is_primitive(obj):
    """Return True for plain scalar values and containers made only of them.

    Scalars are int, float, complex, bool, str, bytes, bytearray and None.
    Lists, tuples, sets and frozensets qualify when every element does
    (checked recursively); an empty container qualifies.
    """
    containers = (list, tuple, set, frozenset)
    scalars = (int, float, complex, bool, str, bytes, bytearray, type(None))
    if not isinstance(obj, containers):
        return isinstance(obj, scalars)
    for item in obj:
        if not is_primitive(item):
            return False
    return True
# Attribute names that object_to_dict always skips.
DEFAULT_EXCLUDE = {"element", "language_id", "ln", "placeholder_format"}


def object_to_dict(obj, result=None, exclude=None):
    """Collect an object's public, non-callable, primitive attributes.

    Args:
        obj: Object whose attributes (as listed by ``dir``) are inspected.
        result: Optional dict to fill; a new one is created when None.
        exclude: Extra attribute names to skip on top of DEFAULT_EXCLUDE.

    Returns:
        *result*, with one entry per attribute that passes is_primitive.
        Values exposing ``.real`` are replaced by it, and an integer ``size``
        is converted to points via ``Length(...).pt``.

    Attributes whose access or conversion raises an ordinary exception are
    skipped; KeyboardInterrupt and SystemExit are no longer swallowed.
    """
    if result is None:
        result = {}
    exclude = DEFAULT_EXCLUDE.union(exclude or set())
    for attr in dir(obj):
        if attr in exclude or attr.startswith("_"):
            continue
        try:
            # Read the attribute once; properties may be costly or raise.
            attr_value = getattr(obj, attr)
            if callable(attr_value):
                continue
            # Numeric values (including int subclasses and bools) expose
            # `.real`; taking it yields the plain number.
            if "real" in dir(attr_value):
                attr_value = attr_value.real
            if attr == "size" and isinstance(attr_value, int):
                attr_value = Length(attr_value).pt
            if is_primitive(attr_value):
                result[attr] = attr_value
        except Exception:
            # Best effort: an unreadable attribute is simply left out.
            continue
    return result
def merge_dict(d1: dict, d2: list[dict]):
    """Hoist values shared by every dict in *d2* up into *d1*, in place.

    For each key of *d1*: gather that key's value from every dict in *d2*
    (plus *d1*'s own value when it is not None and *d2* has more than one
    entry). If the first gathered value is not None and all gathered values
    are equal, it is stored in *d1* and the key is set to None in every dict
    of *d2*. Returns *d1*; an empty *d2* leaves it untouched.
    """
    if not d2:
        return d1
    for key in list(d1):
        candidates = [child[key] for child in d2]
        parent_value = d1[key]
        if parent_value is not None and len(candidates) != 1:
            candidates.append(parent_value)
        head = candidates[0]
        if head is None or any(not (value == head) for value in candidates):
            continue
        d1[key] = head
        for child in d2:
            child[key] = None
    return d1
def dict_to_object(dict: dict, obj: object, exclude=None):
    """Copy the entries of *dict* onto *obj* as attributes.

    Keys listed in *exclude* are skipped; nothing is excluded when it is None.
    """
    skipped = set() if exclude is None else exclude
    for key, value in dict.items():
        if key in skipped:
            continue
        setattr(obj, key, value)
class Config:
    """Run configuration: a debug flag plus run and image directories."""

    def __init__(self, rundir=None, session_id=None, debug=True):
        """Store *debug* and optionally set up directories.

        *session_id* is applied first, then *rundir*, so an explicit
        *rundir* takes precedence when both are given. With neither, no
        directory attributes are set.
        """
        self.DEBUG = debug
        if session_id is not None:
            self.set_session(session_id)
        if rundir is not None:
            self.set_rundir(rundir)

    def set_session(self, session_id):
        """Record the session id and use ``./runs/<session_id>`` as run dir."""
        self.session_id = session_id
        self.set_rundir(f"./runs/{session_id}")

    def set_rundir(self, rundir: str):
        """Set RUN_DIR and IMAGE_DIR (``<rundir>/images``), creating both."""
        self.RUN_DIR = rundir
        self.IMAGE_DIR = pjoin(self.RUN_DIR, "images")
        os.makedirs(self.RUN_DIR, exist_ok=True)
        os.makedirs(self.IMAGE_DIR, exist_ok=True)

    def set_debug(self, debug: bool):
        """Update the debug flag."""
        self.DEBUG = debug

    def remove_rundir(self):
        """Delete RUN_DIR, then IMAGE_DIR, skipping whichever is absent."""
        for attr_name in ("RUN_DIR", "IMAGE_DIR"):
            target = getattr(self, attr_name)
            if pexists(target):
                shutil.rmtree(target)
# Short aliases for the os.path helpers used throughout this module.
pjoin, pexists, pbasename = os.path.join, os.path.exists, os.path.basename


if __name__ == "__main__":
    # Running the module directly builds a default Config and prints it.
    config = Config()
    print(config)