# src/summary/utils.py
import re
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Callable, List

from tiktoken import Encoding, encoding_for_model
from transformers import AutoTokenizer
# Markers that may introduce a scene heading in a Korean screenplay
# ('씬' = "scene"). NOTE: the last two entries are regex fragments; raw
# strings are used so `\d` is a regex escape, not an (invalid, deprecated)
# string escape sequence. The runtime values are unchanged.
SCENE_INDICATORS = ['씬/', '씬', 'SS##', 'S#', 's#', 'S', 's', r'#\d+.', r'\d+.']
def delete_special(pre_text, character_list):
    """Return *pre_text* with every occurrence of each string in
    *character_list* removed.

    Entries may be multi-character strings, so chained ``str.replace``
    is required (``str.translate`` only handles single characters).
    """
    cleaned = pre_text
    for token in character_list:
        cleaned = cleaned.replace(token, "")
    return cleaned
def preprocess_script(script: str) -> str:
    """Drop script lines that carry no meaningful content.

    Each line first has a fixed set of noise characters removed; the line
    is then reduced to an "emptiness probe" (only Korean/Latin letters,
    digits and a few punctuation marks survive). Lines whose probe is
    empty are discarded; surviving lines are kept in their noise-stripped
    form and joined with newlines, with outer whitespace trimmed.
    """
    # Control characters and mojibake glyphs seen in scraped scripts.
    noise_tokens = ["\n", "\t", "\xa0", '၀', 'ᝰ', 'ศ', 'ನ', 'tุ', '\x00Ā\x00\x00\x00']
    kept_lines = []
    for raw_line in script.split("\n"):
        stripped_line = raw_line
        for token in noise_tokens:
            stripped_line = stripped_line.replace(token, "")
        # Probe used only to decide whether the line has real content.
        probe = re.sub('[^가-힣a-zA-Z0-9\s,.!?/#]', ' ', stripped_line).strip()
        probe = probe.replace(" ", "").strip()
        probe = probe.replace("<|start|>", "").replace("<|end|>", "")
        if probe:
            # Keep the original (noise-stripped) line, not the probe.
            kept_lines.append(stripped_line)
    return "\n".join(kept_lines).strip()
def preprocess_scripts(scripts: List[str]) -> List[str]:
    """Run preprocess_script over every entry and return the cleaned list."""
    return [preprocess_script(entry) for entry in scripts]
def break_down2scenes(text: str):
    """Split *text* into scenes delimited by ``s#<number>`` markers.

    Returns a list of dicts, each with:
      - 'detected_scene_number': the integer parsed from the marker
      - 'text': the marker plus its scene body

    Raises:
        ValueError: if scene numbers are not strictly consecutive
            (n, n+1, n+2, ...).
    """
    # Capturing group keeps the markers in the split output; drop
    # whitespace-only fragments.
    parts = [part for part in re.split(r'(s#\d+)', text) if part.strip()]
    # Skip any preamble text before the first marker so pairing below
    # stays aligned (previously this crashed on int() of non-marker text).
    if parts and not re.fullmatch(r's#\d+', parts[0].strip()):
        parts = parts[1:]
    scenes_list = []
    current_scene_number = None
    # Markers and their scene bodies alternate: process them as pairs.
    for i in range(0, len(parts), 2):
        scene_marker = parts[i].strip()
        scene_number = int(scene_marker.split('#')[1])  # extract the number
        scene_text = parts[i + 1].strip() if i + 1 < len(parts) else ""
        # Verify that scene numbers are consecutive.
        if current_scene_number is not None:
            expected_scene_number = current_scene_number + 1
            if scene_number != expected_scene_number:
                raise ValueError(f"Unexpected scene number: {scene_number}, expected {expected_scene_number}")
        # BUG FIX: track the last seen number so the check above can fire
        # (it was previously never updated and thus dead code).
        current_scene_number = scene_number
        # Save the scene number and text together.
        scenes_list.append({
            'detected_scene_number': scene_number,
            'text': f"{scene_marker}\n{scene_text}".strip()
        })
    return scenes_list
def chunk_script_gpt(script: str,
                     model_id: str,
                     chunk_size: int = -1) -> List[str]:
    """Split *script* into chunks of at most *chunk_size* tokens.

    Args:
        script: full script text.
        model_id: OpenAI model name passed to tiktoken's
            encoding_for_model to pick the tokenizer.
        chunk_size: token budget per chunk; -1 disables chunking and
            returns the whole script as a single chunk.

    Returns:
        List of text chunks. When more than 10 ``s#N`` scenes are
        detected, chunks are built from whole scenes; otherwise the
        script is cut into fixed-size token windows.
    """
    if chunk_size == -1:
        print("Single Inference Mode")
        return [script]

    encoding = encoding_for_model(model_id)
    scenes = break_down2scenes(script)
    len_scenes = len(scenes)
    chunks: List[str] = []
    if len_scenes > 10:
        # Scene-aware chunking: pack whole scenes until the budget is hit.
        print(f"Num of detected scenes : {len_scenes}")
        chunk = ""
        token_len_chunk = 0
        for i, scene_data in enumerate(scenes):
            scene = scene_data["text"].strip()
            token_len_scene = len(encoding.encode_ordinary(scene))
            if token_len_chunk + token_len_scene > chunk_size:
                if token_len_chunk == 0:
                    # A single scene larger than chunk_size: keep it whole
                    # rather than splitting mid-scene.
                    chunk = scene
                    token_len_chunk = token_len_scene
                else:
                    chunks.append(chunk)
                    chunk = scene
                    token_len_chunk = token_len_scene
            else:
                # BUG FIX: join scenes with a newline — plain concatenation
                # fused the last line of one scene with the next marker.
                chunk = f"{chunk}\n{scene}" if chunk else scene
                token_len_chunk += token_len_scene
            if i == len_scenes - 1:
                # Flush the final (possibly partial) chunk.
                chunks.append(chunk)
    else:
        # Too few scene markers: fall back to fixed-size token windows.
        print(f"No Detected Scenes ({len_scenes})")
        tokenized_script = encoding.encode_ordinary(script)
        token_len_script = len(tokenized_script)
        for start in range(0, token_len_script, chunk_size):
            # Clamp instead of the previous token_len_script+1 off-by-one.
            end = min(start + chunk_size, token_len_script)
            chunks.append(encoding.decode(tokenized_script[start:end]))
    print(f"Num of chunks : {len(chunks)}")
    return chunks