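"""Parse raw translation-quality (TQ) annotation exports into a normalized
format: convert each label record, attach the source/target texts and
timed-text context for every sample, and compute a 32-point quality score
per labeler. Parsed files are written under <base_path>/parsed/."""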
import argparse
import json
import os
import re
import sys
import typing as T

import nflx_copilot as ncp
import pandas as pd
from tqdm import tqdm

sys.path.append("/root/workspace")

from timedtext.adapters.translation.generation.pldl import TimedTextAdapter, ConverterDialogContext
from timedtext.manager import TimedTextManager
from timedtext.handlers import OriginalLanguagePivotLanguageHandler, EnglishTemplateSubtitleHandler
from timedtext.adapters.translation.evaluation import compute_score_delta
from timedprompts.evaluation.pldl_prompt_one.prompt import (
    ReferenceFreeFeedbackTransform,
    ContextFreeFeedbackTransform,
    ReferenceFreeDirectTransform,
    ReferenceBasedFeedbackTransform,
    ReferenceFreeExampleTransform,
)
from timedtune.convert.tq_for_pldl.pldl_train_one import PldlTrainOneReferenceFreeTransform


def compute_32_point_score(response, generation):
    """Sum the two category scores and the issue-based score deltas, then
    scale by 4 to produce the 32-point score. The score is -1 when the
    annotation cannot be parsed."""
    parsed, score = {}, -1
    try:
        score = (
            int(response["Accuracy Score"])
            + int(response["Readability Score"])
            + compute_score_delta(response, "Accuracy Issues", generation)
            + compute_score_delta(response, "Readability Issues", generation)
        )
        score = score * 4
    except Exception:
        # Malformed or incomplete annotations fall back to the sentinel score.
        score = -1
    # `parsed` is currently always empty; kept to preserve the return signature.
    return parsed, score
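

# Both cache adapters below serve timed text from an on-disk cache instead of
# a live service: the PLDL variant matches original-language/pivot-language
# dialog lists, the SUBS variant matches English template subtitles. Each
# joins the matched "curr" events with blank lines and keeps the prev/next
# context of the first and last matched event.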
class TimedTextAdapterFromCache_PLDL(TimedTextAdapter):
    def __init__(
        self,
        data_dir: str,
        cache_size: int = 0,
        ol_dialog_list_version: str = "",
        pl_dialog_list_version: str = "",
        ol_dialog_list_pl_dialog_list_version: str = "",
        num_prev_events: int = 16,
        num_next_events: int = 16,
    ) -> None:
        super().__init__(num_prev_events, num_next_events)
        self.timed_text_manager = TimedTextManager(
            data_dir,
            cache_size=cache_size,
            ol_dialog_list_version=ol_dialog_list_version,
            pl_dialog_list_version=pl_dialog_list_version,
            ol_dialog_list_pl_dialog_list_version=ol_dialog_list_pl_dialog_list_version,
        )

    def _get_timed_text(
        self, movie_id: int, start_frame: int, end_frame: int, src_lang: str, tgt_lang: str
    ) -> T.Dict[str, T.Union[T.Dict, T.List[T.Dict]]]:
        results = self.timed_text_manager.match_and_get_timed_text(
            handler_class=OriginalLanguagePivotLanguageHandler,
            movie_id=movie_id,
            start_frame=start_frame,
            end_frame=end_frame,
            src_lang=src_lang,
            tgt_lang=tgt_lang,
            mid_lang="",
            **self.timed_text_kwargs,
        )

        curr_srcs = [result["curr"]["src"]["txt"] for result in results]
        curr_tgts = [result["curr"]["tgt"]["txt"] for result in results]

        return {
            "curr": {"src": {"txt": "\n\n".join(curr_srcs)}, "tgt": {"txt": "\n\n".join(curr_tgts)}},
            "prev": results[0]["prev"],
            "next": results[-1]["next"],
        }


class TimedTextAdapterFromCache_SUBS(TimedTextAdapter):
    def __init__(
        self,
        data_dir: str,
        cache_size: int = 0,
        num_prev_events: int = 16,
        num_next_events: int = 16,
    ) -> None:
        super().__init__(num_prev_events, num_next_events)
        self.timed_text_manager = TimedTextManager(
            data_dir,
            cache_size=cache_size,
        )

    def _get_timed_text(
        self, movie_id: int, start_frame: int, end_frame: int, src_lang: str, tgt_lang: str
    ) -> T.Dict[str, T.Union[T.Dict, T.List[T.Dict]]]:
        results = self.timed_text_manager.match_and_get_timed_text(
            handler_class=EnglishTemplateSubtitleHandler,
            movie_id=movie_id,
            start_frame=start_frame,
            end_frame=end_frame,
            src_lang=src_lang,
            tgt_lang=tgt_lang,
            mid_lang="",
            **self.timed_text_kwargs,
        )

        curr_srcs = [result["curr"]["src"]["txt"] for result in results]
        curr_tgts = [result["curr"]["tgt"]["txt"] for result in results]

        return {
            "curr": {"src": {"txt": "\n\n".join(curr_srcs)}, "tgt": {"txt": "\n\n".join(curr_tgts)}},
            "prev": results[0]["prev"],
            "next": results[-1]["next"],
        }
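

# Minimal usage sketch (illustrative ids; the cache path mirrors main()).
# get_timed_text is provided by the TimedTextAdapter base class and is
# assumed to wrap _get_timed_text, returning the current texts plus context:
#
#   adapter = TimedTextAdapterFromCache_SUBS(
#       data_dir="/fsx_l10n/l10n_dse_timedtext/cache", num_prev_events=32, num_next_events=32
#   )
#   src_text, tgt_text, prev_context, next_context = adapter.get_timed_text(
#       movie_id=12345, start_frame=100, end_frame=200, src_lang="en", tgt_lang="fr"
#   )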


def fetch_contextual_information(timed_text_adapter, row):
    """
    Fetches the required context information for each sample using timed_text_adapter.

    Args:
        timed_text_adapter (TimedTextAdapterFromCache): Adapter to fetch data from.
        row (dict): Row containing the necessary information to fetch the context.

    Returns:
        dict: Contextual information with keys tt_src_text, tt_tgt_text,
            tt_src_prev, tt_src_next, tt_tgt_prev, tt_tgt_next.
    """
    src_text, tgt_text, prev_context, next_context = timed_text_adapter.get_timed_text(
        movie_id=row["movie_id"],
        start_frame=row["start_frame"],
        end_frame=row["end_frame"],
        src_lang=row["src_lang"],
        tgt_lang=row["tgt_lang"],
    )

    # ConverterDialogContext turns the raw prev/next events into per-language
    # context strings; its fifth return value is unused here.
    timed_text_converter = ConverterDialogContext(timed_text_adapter)
    src_prev, src_next, tgt_prev, tgt_next, _ = timed_text_converter.__context__(
        row["src_lang"], row["tgt_lang"], prev_context, next_context, None
    )

    return {
        "tt_src_text": src_text,
        "tt_tgt_text": tgt_text,
        "tt_src_prev": src_prev,
        "tt_src_next": src_next,
        "tt_tgt_prev": tgt_prev,
        "tt_tgt_next": tgt_next,
    }
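

# transform_json flattens one raw export record (a Labelbox-style payload with
# projects -> labels -> annotations) into the annotation schema used downstream.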
def transform_json(input_json):
    """Convert one raw export record into the internal annotation schema,
    producing one entry per labeler."""
    project_key = list(input_json['projects'].keys())[0]
    project = input_json['projects'][project_key]

    final_output = {"labelers": []}

    for label in project['labels']:
        output = {
            "annotation": {
                "Accuracy Issues": [],
                "Readability Issues": [],
                "Accuracy Score": "",
                "Readability Score": "",
                "Confidence Level": "",
                "Main Vs Alternate": "",
                "Score": "-1"
            },
        }

        if 'objects' in label['annotations']:
            for obj in label['annotations']['objects']:
                issue = {
                    "Error Location": obj['conversational_location']['message_id'],
                    "Error Span": [
                        obj['conversational_location']['location']['start'],
                        obj['conversational_location']['location']['end']
                    ],
                    "Error Explanation": "",
                    "Error Quality Category": obj['name'],
                    "Error Quality Tags": [],
                    "Error Severity": ""
                }

                for classification in obj['classifications']:
                    if classification['name'] == 'Explanation':
                        issue["Error Explanation"] = classification['text_answer']['content']
                    elif classification['name'] == 'Quality Tag':
                        issue["Error Quality Tags"] = [ans['name'].lower() for ans in classification['checklist_answers']]
                    elif classification['name'] == 'Quality SubCategory':
                        # The subcategory name encodes the severity.
                        severity = classification['radio_answer']['name']
                        issue["Error Severity"] = "Major" if 'Major' in severity else "Minor"

                # "Style" objects count as readability issues; everything else as accuracy.
                if obj['name'] == 'Style':
                    output['annotation']['Readability Issues'].append(issue)
                else:
                    output['annotation']['Accuracy Issues'].append(issue)

        for classification in label['annotations']['classifications']:
            if classification['name'] == 'Accuracy Score':
                output['annotation']['Accuracy Score'] = classification['radio_answer']['name'].split(' - ')[0]
            elif classification['name'] == 'Readability Score':
                output['annotation']['Readability Score'] = classification['radio_answer']['name'].split(' - ')[0]
            elif classification['name'] == 'Confidence Level':
                output['annotation']['Confidence Level'] = classification['radio_answer']['value']
            elif classification['name'] == 'Main vs Alternate':
                output['annotation']['Main Vs Alternate'] = classification['radio_answer']['name']

        final_output["labelers"].append(output)
    return final_output
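

# Shape of a transformed record, for reference (values illustrative):
# {"labelers": [{"annotation": {"Accuracy Issues": [...], "Readability Issues": [...],
#                               "Accuracy Score": "3", "Readability Score": "4",
#                               "Confidence Level": "...", "Main Vs Alternate": "Main",
#                               "Score": "-1"}}]}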


def load_meta_json(priority_key, data_row_key, meta_path):
    """
    Loads and validates the metadata JSON for a sample.

    Args:
        priority_key (str): Priority key from the label metadata; names the meta file.
        data_row_key (str): Dot-separated data row key used to cross-check the metadata.
        meta_path (str): Path to the metadata folder.

    Returns:
        dict: Loaded metadata.
    """
    with open(os.path.join(meta_path, f'{priority_key}.json')) as fread:
        meta_dict = json.load(fread)

    # The data row key encodes the movie id and frame range, e.g.
    # "<...>.<movie_id>.<start_frame>_<end_frame>.<...>.<...>.<...>.<...>".
    _, movie_id, start_end_frame, _, _, _, _ = data_row_key.split('.')
    start_frame, end_frame = start_end_frame.split('_')

    if int(meta_dict['movie_id']) != int(movie_id):
        print("Movie Ids didn't match:", int(meta_dict['movie_id']), int(movie_id), os.path.join(meta_path, f'{priority_key}.json'), data_row_key)
        sys.exit(1)  # non-zero exit: the metadata does not match the data row key
    assert int(meta_dict['start_frame']) == int(start_frame)
    assert int(meta_dict['end_frame']) == int(end_frame)

    return meta_dict
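

# process_json stitches together, per sample: the parsed labels (transform_json),
# the conversation file under conv_path (three messages: main target, source,
# alternate target), the metadata under meta_path, and the timed-text context.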
def process_json(timed_text_adapter, example_row, meta_path, conv_path):
    """
    Takes a full input record, converts it to the required format, and adds context using metadata.

    Args:
        timed_text_adapter (TimedTextAdapterFromCache): Adapter to fetch context.
        example_row (dict): One full raw export record.
        meta_path (str): Path to the metadata folder.
        conv_path (str): Path to the conversation folder holding the source/target texts.

    Returns:
        dict: The enriched annotation format with context and annotation data.
    """
    annotation_result = transform_json(example_row)

    data_row_key = example_row['data_row']['global_key']
    priority_key = example_row['projects'][list(example_row["projects"].keys())[0]]['project_details']['priority']

    annotation_result["Data_Row_Key"] = data_row_key

    # The conversation file stores three messages: main target, source, alternate target.
    key = ".".join(data_row_key.split(".")[:3])
    with open(conv_path + "/" + key + ".json") as file:
        data = json.load(file)
    annotation_result["main_tgt_text"] = data["messages"][0]["content"]
    annotation_result["src_text"] = data["messages"][1]["content"]
    annotation_result["alt_tgt_text"] = data["messages"][2]["content"]

    meta_dict = load_meta_json(priority_key, data_row_key, meta_path)

    annotation_result.update({
        "title_id": meta_dict['movie_id'],
        "start_frame": meta_dict['start_frame'],
        "end_frame": meta_dict['end_frame'],
        "src_lang": meta_dict['src_lang'],
        "tgt_lang": meta_dict['tgt_lang'],
    })

    context_info = fetch_contextual_information(timed_text_adapter, meta_dict)
    annotation_result.update(context_info)

    # Resolve each issue's character-offset span to the actual annotated text.
    for labeler in annotation_result["labelers"]:
        for issue in labeler["annotation"]["Accuracy Issues"] + labeler["annotation"]["Readability Issues"]:
            error_location = issue["Error Location"]
            start, end = issue["Error Span"]

            if error_location == "src":
                actual_text = annotation_result["src_text"][start:end]
            else:
                actual_text = annotation_result["main_tgt_text"][start:end]

            issue["Error Span"] = actual_text

    return annotation_result
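

# Input file names are parsed positionally (split on "-"): index 1 is the
# language pair (with "_" standing in for "."), index 3 the phase, index 4 the
# date. This layout is inferred from the indexing below; "PLDL"/"SUBS" in the
# name selects the adapter, and "calibration" files are skipped.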
def main():
    base_path = "MT_TQ/Caches/May2025/tquality.annotated.data/"
    json_files = [base_path + "raw/" + f for f in os.listdir(base_path + "raw/") if f.endswith('.json')]

    for json_file in tqdm(json_files):
        if "calibration" in json_file:
            print("Warning: skipping calibration data; remove this check to include it")
            continue

        # Pick the adapter (and output subfolder) from the file name.
        if "PLDL" in json_file:
            folder = "pldl"
            timed_text_adapter = TimedTextAdapterFromCache_PLDL(
                data_dir="/fsx_l10n/l10n_dse_timedtext/cache", num_prev_events=32, num_next_events=32
            )
        elif "SUBS" in json_file:
            folder = "subs"
            timed_text_adapter = TimedTextAdapterFromCache_SUBS(
                data_dir="/fsx_l10n/l10n_dse_timedtext/cache", num_prev_events=32, num_next_events=32
            )
        else:
            # Fail loudly on unrecognized file names.
            raise ValueError(f"invalid json file: {json_file}")

        langs_type = json_file.split("/")[-1].split("-")[1].replace("_", ".")
        phase = json_file.split("/")[-1].split("-")[3]
        phase_number = int(''.join(re.findall(r'\d+', phase))) if re.findall(r'\d+', phase) else None
        phase_date = json_file.split("/")[-1].split("-")[4].replace(".json", "")

        zzmetapath = f"/root/notebooks/MT_TQ/Caches/labelspace/tquality.zzmeta.data/{folder}/{langs_type}/phase {phase_number} - {phase_date}"

        meta_path = zzmetapath + "/meta"
        conv_path = zzmetapath + "/conv"

        with open(json_file) as file:
            data = json.load(file)

        output_data = []
        for data_point in tqdm(data):
            annotation_result = process_json(timed_text_adapter, data_point, meta_path, conv_path)
            for labeler in annotation_result["labelers"]:
                _, score = compute_32_point_score(labeler["annotation"], annotation_result["main_tgt_text"])
                labeler["annotation"]["Score"] = score

            output_data.append(annotation_result)

        # Use a distinct name for the output handle so it does not shadow json_file.
        with open(base_path + "parsed/" + json_file.split("/")[-1], 'w') as outfile:
            json.dump({"data": output_data}, outfile, indent=4)


if __name__ == "__main__":
    main()