dasco / convert_data.py
toilachuoituyet's picture
Upload folder using huggingface_hub
8a91a08 verified
#!/usr/bin/env python3
"""
Convert JSON data to DASCO pkl format for training.
Splits data into train/dev/test and converts to required format.
"""
import json
import pickle
import os
import numpy as np
import stanza
import argparse
import torch
from tqdm import tqdm
from sklearn.model_selection import train_test_split
import warnings
warnings.filterwarnings("ignore")
# Translation table from the sentiment labels used in the input JSON to the
# short polarity tags stored in the pkl files. The tags must match the format
# ParseData expects. "Unknown" is folded into the neutral tag.
POLARITY_MAP = dict(
    Positive="POS",
    Negative="NEG",
    Neutral="NEU",
    Unknown="NEU",
)
def run_dependency_parsing(text: str, nlp) -> dict:
    """Parse *text* with a Stanza pipeline and flatten the result.

    Args:
        text: Raw review text.
        nlp: Callable (a Stanza pipeline) returning a document whose
            ``sentences`` hold ``words`` with ``text``, ``upos``, ``head``
            and ``deprel`` attributes.

    Returns:
        A dict with four parallel lists covering the whole text:
        ``token``, ``postag``, ``edges`` and ``deprels``. ``edges[i]`` is
        the 1-based index of token *i*'s head within the flattened token
        list, or 0 for a sentence root.

    If parsing raises, the error is printed and a whitespace-tokenized
    fallback is returned in which every token depends on the previous one.
    """
    try:
        doc = nlp(text)
        tokens = []
        postag = []
        edges = []
        deprels = []
        for sent in doc.sentences:
            # Stanza head indices are 1-based *within the sentence*. Tokens
            # of all sentences are concatenated here, so non-root heads must
            # be shifted by the number of tokens that precede this sentence.
            offset = len(tokens)
            for word in sent.words:
                tokens.append(word.text)
                postag.append(word.upos)
                edges.append(word.head + offset if word.head > 0 else 0)
                deprels.append(word.deprel)
        return {
            "token": tokens,
            "postag": postag,
            "edges": edges,
            "deprels": deprels,
        }
    except Exception as e:
        print(f"Parsing error: {e}")
        # Fallback: simple whitespace tokenization with a chain dependency
        # (first token is the root, every other token hangs off the one
        # before it).
        tokens = text.split()
        n = len(tokens)
        return {
            "token": tokens,
            "postag": ["NOUN"] * n,
            # range(n) yields [0, 1, ..., n-1] and, unlike a hard-coded
            # leading 0, stays empty when there are no tokens.
            "edges": list(range(n)),
            "deprels": ["root"] + ["dep"] * (n - 1) if n > 0 else [],
        }
def create_scope_offsets(aspect_from: int, aspect_to: int, text_len: int, window: int = 3) -> list:
    """
    Return the scope window around an aspect as [start_pos, end_pos].

    The window extends at most `window` positions to the left of
    `aspect_from` and at most `window` positions to the right of
    `aspect_to`. The left side never goes below 0 and the right side is
    limited by the last index `text_len - 1`. When `text_len` is 0 nothing
    is added on the right.

    NOTE(review): the original author states that ParseData later derives
    offsets from this as [id_b - s_b, s_e - id_e]; that consumer is not
    visible here - confirm against ParseData.
    """
    if text_len > 0:
        room_right = min(window, text_len - 1 - aspect_to)
    else:
        room_right = 0
    room_left = min(window, aspect_from)
    return [aspect_from - room_left, aspect_to + room_right]
def convert_sample(sample: dict, image_features: dict, nlp) -> dict:
    """Turn one JSON sample into a record in the DASCO pkl layout.

    Args:
        sample: Input record. Keys read here: "photo_url", "photo_caption",
            "review", "review_aspects" (items with "term", "from", "to")
            and "review_opinion_categories".
        image_features: Cache keyed by photo URL; each entry carries the
            image features under "features".
        nlp: Pipeline forwarded to run_dependency_parsing.

    Returns:
        Dict with "image_feature", "query_input", "scene_graph",
        "text_input", "target", "nouns" and "parse_info".
    """
    # Image features: fall back to an all-zero array when the URL is not cached.
    photo_url = sample.get("photo_url", "")
    if photo_url not in image_features:
        print(f"Warning: Image not found in cache: {photo_url[:50]}...")
        image_feature = np.zeros((257, 1408), dtype=np.float32)
    else:
        image_feature = image_features[photo_url]["features"]

    # The photo caption is stored as the scene graph.
    caption = sample.get("photo_caption", "")
    review_text = sample.get("review", "")

    # Parse first: aspect positions are validated against the parsed token
    # count, not against a plain whitespace split.
    parsed = run_dependency_parsing(review_text, nlp)
    n_tokens = len(parsed["token"])

    def clamp(position):
        # Force a position into [0, n_tokens - 1]; 0 when there are no tokens.
        return max(0, min(position, n_tokens - 1)) if n_tokens > 0 else 0

    labels = sample.get("review_opinion_categories", [])
    aspects = []
    for idx, raw in enumerate(sample.get("review_aspects", [])):
        # Aspects without a matching category are treated as "Unknown".
        label = labels[idx] if idx < len(labels) else "Unknown"
        polarity = POLARITY_MAP.get(label, "NEU")
        raw_from, raw_to = raw["from"], raw["to"]
        start, end = clamp(raw_from), clamp(raw_to)
        window = create_scope_offsets(start, end, n_tokens)
        aspects.append({
            "term": raw["term"],
            "from": start,
            "to": end,
            "polarity": polarity,
            "scope": window,
        })

    # Every aspect term is also emitted as a noun (same span, same scope).
    nouns = [
        {"term": a["term"], "from": a["from"], "to": a["to"], "scope": a["scope"]}
        for a in aspects
    ]

    # Parse info - critical structure for the ParseData function.
    parse_info = {key: parsed[key] for key in ("token", "postag", "edges", "deprels")}
    parse_info["aspects"] = [dict(a) for a in aspects]

    return {
        "image_feature": image_feature,
        "query_input": "Extract aspect terms and their sentiment polarity",
        "scene_graph": caption,
        "text_input": review_text,
        # Target is the list of aspect terms, or the raw text when there are none.
        "target": [a["term"] for a in aspects] or [review_text],
        "nouns": nouns,
        "parse_info": parse_info,
    }
def save_as_pkl(data: list, output_dir: str, batch_size: int = 100):
    """Write *data* to *output_dir* as consecutive pickle files.

    The directory is created if needed. Each file holds up to `batch_size`
    items and is named data_<k>.pkl, with k counting from 0. A summary line
    is printed at the end.
    """
    os.makedirs(output_dir, exist_ok=True)
    total = len(data)
    # Ceiling division: number of files needed to hold every item.
    num_batches = (total + batch_size - 1) // batch_size
    for batch_idx, start in enumerate(range(0, total, batch_size)):
        chunk = data[start:start + batch_size]
        target_file = os.path.join(output_dir, f"data_{batch_idx}.pkl")
        with open(target_file, 'wb') as f:
            pickle.dump(chunk, f)
    print(f"Saved {len(data)} samples to {output_dir} ({num_batches} files)")
def main(args):
    """Run the whole JSON -> DASCO pkl conversion.

    Steps: load the pickled image-feature cache, start a Stanza English
    pipeline, load the input JSON, keep only samples whose "photo_url" is in
    the cache, split them into train/dev/test, convert every sample with
    convert_sample and write each split with save_as_pkl under
    <args.output>/<split>.

    Args:
        args: Namespace with ``input``, ``image_cache``, ``output``,
            ``val_ratio``, ``test_ratio`` and ``batch_size``. ``train_ratio``
            is optional and only checked for consistency: the train share is
            always 1 - val_ratio - test_ratio.

    Raises:
        ValueError: If val_ratio or test_ratio is not positive, or if they
            sum to 1 or more (train_test_split would reject these later).
    """
    # Fail fast on unusable ratios instead of after the expensive loading.
    holdout_ratio = args.val_ratio + args.test_ratio
    if args.val_ratio <= 0 or args.test_ratio <= 0 or holdout_ratio >= 1:
        raise ValueError(
            "val_ratio and test_ratio must both be > 0 and sum to less than 1 "
            f"(got val_ratio={args.val_ratio}, test_ratio={args.test_ratio})"
        )
    train_ratio = getattr(args, "train_ratio", None)
    if train_ratio is not None and abs(train_ratio + holdout_ratio - 1) > 1e-6:
        print(
            f"Warning: train_ratio={train_ratio} is ignored; "
            f"using 1 - val_ratio - test_ratio = {1 - holdout_ratio:.4f}"
        )

    # Load image cache
    # NOTE: pickle.load can execute arbitrary code; only point --image_cache
    # at files produced by a trusted source.
    print(f"Loading image cache from {args.image_cache}...")
    with open(args.image_cache, 'rb') as f:
        image_features = pickle.load(f)
    print(f"✓ Loaded {len(image_features)} cached images")

    # Initialize Stanza with GPU if available
    print("Initializing Stanza NLP pipeline...")
    use_gpu = torch.cuda.is_available()
    try:
        stanza.download('en', verbose=False)
    except Exception as e:
        # Best effort: the models may already be on disk (e.g. offline run),
        # in which case the Pipeline below still works. Report the reason
        # instead of hiding it, and let Ctrl-C propagate.
        print(f"Warning: Stanza model download failed: {e}")
    nlp = stanza.Pipeline(
        'en',
        processors='tokenize,pos,lemma,depparse',
        verbose=False,
        use_gpu=use_gpu
    )
    print(f"✓ Stanza initialized (GPU: {use_gpu})")

    # Load input data
    print(f"Loading {args.input}...")
    with open(args.input, 'r', encoding='utf-8') as f:
        data = json.load(f)
    print(f"✓ Loaded {len(data)} samples")

    # Filter samples with valid images
    print("Filtering samples with cached images...")
    valid_data = [s for s in tqdm(data, desc="Filtering") if s.get("photo_url") in image_features]
    print(f"✓ Valid samples: {len(valid_data)} / {len(data)}")
    if len(valid_data) == 0:
        print("ERROR: No valid samples found! Check image cache.")
        return

    # Split data: first carve off val+test, then divide that remainder.
    train_data, temp_data = train_test_split(
        valid_data,
        test_size=holdout_ratio,
        random_state=42
    )
    val_ratio_adjusted = args.val_ratio / holdout_ratio
    val_data, test_data = train_test_split(
        temp_data,
        test_size=(1 - val_ratio_adjusted),
        random_state=42
    )
    print(f"Split: Train={len(train_data)}, Val={len(val_data)}, Test={len(test_data)}")

    # Convert and save each split
    max_reported_errors = 5  # keep the log readable when many samples fail
    for split_name, split_data in [("train", train_data), ("dev", val_data), ("test", test_data)]:
        print(f"\n{'='*50}")
        print(f"Converting {split_name} set ({len(split_data)} samples)...")
        converted = []
        errors = 0
        pbar = tqdm(split_data, desc=f"Converting {split_name}", unit="sample")
        for sample in pbar:
            try:
                result = convert_sample(sample, image_features, nlp)
            except Exception as e:
                # A bad sample is skipped, but the first few failures are
                # shown so systematic problems do not go unnoticed.
                errors += 1
                if errors <= max_reported_errors:
                    tqdm.write(f"[{split_name}] conversion error: {type(e).__name__}: {e}")
            else:
                converted.append(result)
            pbar.set_postfix({"converted": len(converted), "errors": errors})
        output_dir = os.path.join(args.output, split_name)
        save_as_pkl(converted, output_dir, args.batch_size)
        print(f"✓ {split_name}: {len(converted)} samples saved ({errors} errors)")

    print("\n=== Conversion Complete ===")
    print(f"Output directory: {args.output}")
if __name__ == "__main__":
    # Script entry point: declare the command-line options, parse them and
    # hand the resulting namespace to main().
    cli = argparse.ArgumentParser(description="Convert JSON to DASCO pkl format")
    option_specs = (
        ("--input", {"type": str, "default": "./data/text_image_dataset.json",
                     "help": "Input JSON file"}),
        ("--image_cache", {"type": str, "default": "./image_cache/features.pkl",
                           "help": "Image features cache file"}),
        ("--output", {"type": str, "default": "./finetune_dataset/custom",
                      "help": "Output directory"}),
        ("--train_ratio", {"type": float, "default": 0.8}),
        ("--val_ratio", {"type": float, "default": 0.1}),
        ("--test_ratio", {"type": float, "default": 0.1}),
        ("--batch_size", {"type": int, "default": 100,
                          "help": "Samples per pkl file"}),
    )
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)
    args = cli.parse_args()
    main(args)