|
|
import json |
|
|
from datetime import datetime |
|
|
from typing import Any, Dict, Tuple |
|
|
|
|
|
from scipy.stats import spearmanr |
|
|
|
|
|
# Load the human attribution log: a JSON Lines file, one JSON object per line.
# NOTE: the path is relative to the current working directory, so the script
# must be launched from the directory it was written for.
with open(
    "../../data/human_log_attribution.jsonl", "r", encoding="utf-8"
) as f:
    # An explicit UTF-8 encoding keeps the read independent of the platform's
    # locale; blank lines (e.g. a trailing newline at end of file) are skipped
    # because json.loads would raise on them.
    data = [json.loads(line) for line in f if line.strip()]
|
|
|
|
|
|
|
|
def convert_to_unix_timestamp_milliseconds(date_time: str) -> int:
    """Convert an ISO-8601 timestamp string to Unix epoch milliseconds.

    A trailing ``Z`` marks the value as UTC, and an explicit UTC offset
    (e.g. ``+01:00``) is honoured.  A string with neither is interpreted as
    local time, which is what ``datetime.timestamp()`` does for naive values.

    Args:
        date_time: ISO-8601 formatted timestamp, optionally ending in ``Z``.

    Returns:
        Whole milliseconds elapsed since 1970-01-01T00:00:00 UTC.

    Raises:
        ValueError: If ``date_time`` is not a valid ISO-8601 string.
    """
    is_utc = date_time.endswith("Z")
    # fromisoformat() only accepts the "Z" suffix from Python 3.11 on, so it
    # is stripped here and its meaning (UTC) is applied explicitly below.
    parsed = datetime.fromisoformat(date_time.rstrip("Z"))

    if parsed.tzinfo is None and not is_utc:
        # No zone information at all: attach the local zone.
        parsed = parsed.astimezone()
    if parsed.tzinfo is not None:
        # Normalise an aware value to naive UTC wall-clock time.
        parsed = parsed.replace(tzinfo=None) - parsed.utcoffset()

    # Integer arithmetic on the timedelta avoids the float rounding that
    # int(timestamp() * 1000) can introduce (dropping a millisecond).
    delta = parsed - datetime(1970, 1, 1)
    return (
        delta.days * 86_400_000
        + delta.seconds * 1000
        + delta.microseconds // 1000
    )
|
|
|
|
|
|
|
|
def preprocessing(
    data: list[Dict[str, Any]]
) -> Tuple[list[Any], list[Any], list[Any]]:
    """Collect the utterances that received annotations from the raw log.

    Each record's ``"attributed_utterances"`` mapping is scanned.  An entry
    counts as *single annotated* when it has more than two elements and its
    element at index 2 is non-empty, and as *double annotated* when that
    element holds more than one item.  Only double-annotated entries are
    collected.  Both counts are printed.

    Args:
        data: Parsed log records, each with an ``"attributed_utterances"``
            mapping of utterance key to a list-like entry.

    Returns:
        A tuple ``(result, convs, key_uttrs)``:

        * ``result`` - every double-annotated entry, across all records.
        * ``convs`` - per record, the index-2 elements of its double-annotated
          entries; only records with more than one such entry are included.
        * ``key_uttrs`` - parallel to ``convs``, the index-3 elements of the
          same entries (an empty dict where an entry has no index 3).
    """
    count_single = 0
    count_double = 0
    result = []
    convs = []
    key_uttrs = []
    for item in data:
        conv = []
        key_uttr = []
        for entry in item["attributed_utterances"].values():
            if len(entry) <= 2 or len(entry[2]) == 0:
                continue
            count_single += 1
            if len(entry[2]) <= 1:
                continue
            count_double += 1
            result.append(entry)
            conv.append(entry[2])
            # The length guard above only guarantees index 2 exists; fall
            # back to an empty mapping instead of raising IndexError when an
            # entry carries no index-3 element.
            key_uttr.append(entry[3] if len(entry) > 3 else {})
        if len(conv) > 1:
            convs.append(conv)
            key_uttrs.append(key_uttr)
    print(f"Count of single annotated: {count_single}")
    print(f"Count of double annotated: {count_double}")
    return result, convs, key_uttrs
|
|
|
|
|
|
|
|
def calc_correlation(result: list[Any]) -> None:
    """Print agreement statistics between the two earliest annotations.

    For every entry, the mapping at index 2 (timestamp string -> rating) is
    ordered by timestamp; the earliest rating is attributed to "annotator 0"
    and the second earliest to "annotator 1".  Any further annotations are
    ignored.  The function then prints the Spearman rank correlation, the
    number and fraction of exact matches, and the mean absolute difference.

    Args:
        result: Entries whose index-2 element maps ISO timestamps to numeric
            ratings and holds at least two annotations.

    Returns:
        None.  All statistics are written to stdout.
    """
    ann0 = []
    ann1 = []
    for res in result:
        ordered = sorted(
            (
                (convert_to_unix_timestamp_milliseconds(stamp), rating)
                for stamp, rating in res[2].items()
            ),
            # Sort on the timestamp only so ratings are never compared.
            key=lambda pair: pair[0],
        )
        ann0.append(ordered[0][1])
        ann1.append(ordered[1][1])

    if not ann0:
        # Nothing to compare: the ratios below would divide by zero.
        print("No double-annotated utterances; skipping correlation.")
        return

    corr: Tuple[float, float] = spearmanr(ann0, ann1)
    print(f"spearmanr: {corr}")

    count_exact_match = sum(1 for a, b in zip(ann0, ann1) if a == b)
    average_difference = sum(abs(a - b) for a, b in zip(ann0, ann1)) / len(
        ann0
    )
    print(f"Count of exact match: {count_exact_match}")
    print(f"Percentage of exact match: {count_exact_match / len(ann0)}")
    print(f"Average difference: {average_difference}")
|
|
|
|
|
|
|
|
def calc_3_agreement(convs: list[Any]) -> None:
    """Print how often two annotators place their first rating of 3 alike.

    For each conversation, every annotation mapping (timestamp string ->
    rating) is ordered by timestamp; the earliest rating goes to "annotator
    0" and the second earliest to "annotator 1".  The position of the first
    rating equal to 3 is then compared between the two annotators:

    * strict agreement - same position, or neither annotator gave a 3;
    * lenient agreement - positions differ by at most one, or neither
      annotator gave a 3.

    Args:
        convs: Conversations, each a list of annotation mappings holding at
            least two annotations.

    Returns:
        None.  Both agreement rates are written to stdout.
    """
    agreement_list = []
    agreement_lenient_list = []
    for conv in convs:
        ann0, ann1 = [], []
        for pair in conv:
            ordered = sorted(
                (
                    (convert_to_unix_timestamp_milliseconds(stamp), rating)
                    for stamp, rating in pair.items()
                ),
                key=lambda entry: entry[0],
            )
            ann0.append(ordered[0][1])
            ann1.append(ordered[1][1])

        # None (rather than -1) marks "no rating of 3", so that a missing
        # rating can never look adjacent to a real index 0.
        first_3_by_0 = ann0.index(3) if 3 in ann0 else None
        first_3_by_1 = ann1.index(3) if 3 in ann1 else None
        agreement_list.append(first_3_by_0 == first_3_by_1)
        if first_3_by_0 is None or first_3_by_1 is None:
            lenient = first_3_by_0 is None and first_3_by_1 is None
        else:
            lenient = abs(first_3_by_0 - first_3_by_1) <= 1
        agreement_lenient_list.append(lenient)

    if not agreement_list:
        # Avoid dividing by zero when there is nothing to compare.
        print("No conversations to compare; skipping 3 rating agreement.")
        return

    print(
        "3 Rating Agreement Rate: ", sum(agreement_list) / len(agreement_list)
    )
    print(
        "3 Rating Agreement Linient Rate: ",
        sum(agreement_lenient_list) / len(agreement_lenient_list),
    )
|
|
|
|
|
|
|
|
def calc_key_uttr_agreement(convs: list[Any]) -> None:
    """Print agreement and confusion-matrix statistics for "Yes" labels.

    Every annotation mapping (timestamp string -> label) in every
    conversation is ordered by timestamp.  The earliest label goes to
    "annotator 0" and the second earliest to "annotator 1"; each is reduced
    to 1 when it equals ``"Yes"`` and 0 otherwise.  Mappings with fewer than
    two annotations are skipped.  Annotator 0 is used as ground truth for
    the confusion matrix.

    Any ratio whose denominator is zero is reported as ``nan`` instead of
    raising ``ZeroDivisionError``.

    Args:
        convs: Conversations, each a list of annotation mappings.

    Returns:
        None.  All statistics are written to stdout.

    Raises:
        AssertionError: If the two earliest annotations of a mapping share
            the same timestamp, so their order is undefined.
    """

    def ratio(numerator: float, denominator: float) -> float:
        # Undefined ratios are reported as NaN rather than crashing the run.
        return numerator / denominator if denominator else float("nan")

    ann0, ann1 = [], []
    for conv in convs:
        for pair in conv:
            ordered = sorted(
                (
                    (convert_to_unix_timestamp_milliseconds(stamp), label)
                    for stamp, label in pair.items()
                ),
                key=lambda entry: entry[0],
            )
            if len(ordered) < 2:
                continue
            # Raised explicitly so the check survives `python -O`.
            if ordered[0][0] == ordered[1][0]:
                raise AssertionError(
                    "two annotations share the same timestamp"
                )

            ann0.append(1 if ordered[0][1] == "Yes" else 0)
            ann1.append(1 if ordered[1][1] == "Yes" else 0)

    print(len(ann0))
    agreement_list = [1 if a == b else 0 for a, b in zip(ann0, ann1)]
    # NOTE: labels are binary, so |a - b| <= 1 always holds and this rate is
    # 1.0 whenever there is data; it is kept to preserve the report format.
    agreement_lenient_list = [
        1 if abs(a - b) <= 1 else 0 for a, b in zip(ann0, ann1)
    ]

    print(
        "Key Utterance Agreement Rate: ",
        ratio(sum(agreement_list), len(agreement_list)),
    )
    print(
        "Key Utterance Agreement Linient Rate: ",
        ratio(sum(agreement_lenient_list), len(agreement_lenient_list)),
    )
    print("Confusion Matrix with Annotator 0 as Ground Truth:")

    tp, tn, fp, fn = 0, 0, 0, 0
    for truth, guess in zip(ann0, ann1):
        if truth == 1 and guess == 1:
            tp += 1
        elif truth == 1:
            fn += 1
        elif guess == 1:
            fp += 1
        else:
            tn += 1
    print(f"True Positive: {tp}")
    print(f"True Negative: {tn}")
    print(f"False Positive: {fp}")
    print(f"False Negative: {fn}")

    print(f"Precision: {ratio(tp, tp + fp)}")
    print(f"Recall: {ratio(tp, tp + fn)}")
    print(f"Specificity: {ratio(tn, tn + fp)}")
    print(f"F1 Score: {ratio(2 * tp, 2 * tp + fp + fn)}")
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Split the loaded log into the three views the reports consume.
    result, convs, key_uttrs = preprocessing(data)

    # Run each report, in order, on the view it expects.
    for report, payload in (
        (calc_correlation, result),
        (calc_3_agreement, convs),
        (calc_key_uttr_agreement, key_uttrs),
    ):
        report(payload)
|
|
|