import modal
import os
import random

app = modal.App("prepare-finetune-data-parallel")

# Volumes
vol_census = modal.Volume.from_name("census-data")
vol_economy = modal.Volume.from_name("economy-labor-data")
vol_dataset = modal.Volume.from_name("finetune-dataset", create_if_missing=True)

image = modal.Image.debian_slim().pip_install("pandas", "openpyxl")


@app.function(
    image=image,
    volumes={
        "/data/census": vol_census,
        "/data/economy": vol_economy,
    },
)
def list_csv_files() -> list:
    """List all CSV files in both volumes."""
    files = []
    # Census
    for root, _, filenames in os.walk("/data/census"):
        for f in filenames:
            if f.lower().endswith('.csv'):
                files.append({"path": os.path.join(root, f), "source": "Japan Census"})
    # Economy
    for root, _, filenames in os.walk("/data/economy"):
        for f in filenames:
            if f.lower().endswith('.csv'):
                files.append({"path": os.path.join(root, f), "source": "Japan Economy & Labor"})
    return files


@app.function(
    image=image,
    volumes={
        "/data/census": vol_census,
        "/data/economy": vol_economy,
    },
    timeout=1200,  # 20 minutes for complex files
    max_containers=100,
)
def process_file(file_info: dict) -> list:
    """Process a single CSV file with robust parsing logic."""
    import pandas as pd
    import re
    import random

    file_path = file_info["path"]
    source_name = file_info["source"]
    data_points = []

    def clean_value(val):
        """Clean and normalize values."""
        if pd.isna(val):
            return None
        val_str = str(val).strip()
        # Remove leading codes like "13103_"
        val_str = re.sub(r'^\d+_', '', val_str)
        # Remove numpy type wrappers
        val_str = re.sub(r'^np\.(int|float)\d*\((.+)\)$', r'\2', val_str)
        return val_str if val_str and val_str.lower() not in ['nan', 'none', ''] else None

    try:
        # Extract title from filename
        filename = os.path.basename(file_path)
        filename_no_ext = os.path.splitext(filename)[0]
        parts = filename_no_ext.split('_', 1)
        title = parts[1].replace('_', ' ') if len(parts) > 1 else filename_no_ext

        # Strategy 1: Try cross-tabulation parsing (Row 7 + Row 9 headers).
        # This layout is common in census data.
        df = None
        try:
            df_headers = pd.read_csv(file_path, header=None, nrows=15, low_memory=False)
            # Check if Row 7 and Row 9 look like headers
            if len(df_headers) >= 10:
                row7 = df_headers.iloc[7]
                row9 = df_headers.iloc[9]
                # Heuristic: Row 9 has metadata in the first few cols, Row 7 has destinations in later cols
                if pd.notna(row9.iloc[0]) and len(row7) > 4 and pd.notna(row7.iloc[4]):
                    headers = []
                    # Cols 0-3 from Row 9
                    for i in range(min(4, len(row9))):
                        val = clean_value(row9.iloc[i])
                        headers.append(val if val else f"Meta_{i}")
                    # Cols 4+ from Row 7
                    for i in range(4, len(row7)):
                        val = clean_value(row7.iloc[i])
                        headers.append(f"Dest_{val}" if val else f"Col_{i}")
                    # Read data, skipping the metadata rows
                    df = pd.read_csv(file_path, header=None, skiprows=10, low_memory=False)
                    # Adjust header length to match the data
                    if len(df.columns) < len(headers):
                        headers = headers[:len(df.columns)]
                    else:
                        headers += [f"Extra_{i}" for i in range(len(headers), len(df.columns))]
                    df.columns = headers
        except Exception:
            df = None

        # Strategy 2: Fall back to smart header detection if Strategy 1 failed
        if df is None or df.empty:
            df_raw = pd.read_csv(file_path, header=None, low_memory=False)
            # Find the header row
            header_row_idx = None
            data_start_idx = 0
            for i in range(min(20, len(df_raw))):
                row = df_raw.iloc[i]
                non_null_count = row.count()
                if non_null_count < len(df_raw.columns) * 0.3:
                    continue
                # Skip rows with too many "Unnamed" cells
                unnamed_count = sum(1 for val in row if pd.notna(val) and "Unnamed" in str(val))
                if unnamed_count > non_null_count * 0.3:
                    continue
                # Check for string-like headers
                header_like = sum(
                    1 for val in row
                    if pd.notna(val) and not str(val).replace('.', '').isdigit()
                )
                if header_like >= non_null_count * 0.5:
                    header_row_idx = i
                    data_start_idx = i + 1
                    break
            if header_row_idx is not None:
                headers = df_raw.iloc[header_row_idx].tolist()
                df = df_raw.iloc[data_start_idx:].reset_index(drop=True)
                df.columns = headers
            else:
                return []

        # Common cleaning steps
        # Deduplicate headers
        unique_headers = []
        seen_headers = {}
        for h in df.columns:
            h_clean = clean_value(h) or "Unknown"
            if h_clean in seen_headers:
                seen_headers[h_clean] += 1
                unique_headers.append(f"{h_clean}_{seen_headers[h_clean]}")
            else:
                seen_headers[h_clean] = 0
                unique_headers.append(h_clean)
        df.columns = unique_headers

        # Filter valid columns
        valid_cols = [c for c in df.columns if "Unknown" not in c and "Unnamed" not in c]
        if len(valid_cols) < 2:
            return []
        df = df[valid_cols]

        # Clean values
        for col in df.columns:
            if df[col].dtype == 'object':
                df[col] = df[col].apply(clean_value)
        df = df.dropna(how='all')
        if len(df) == 0:
            return []

        # Generate QA pairs: sample up to 200 rows per file
        sample_rows = 200
        if len(df) > sample_rows:
            df_sample = df.sample(sample_rows, random_state=42)
        else:
            df_sample = df

        label_col = df.columns[0]  # Assume the first column is the label (area name)
        value_cols = list(df.columns[1:])

        for _, row in df_sample.iterrows():
            row_label = row[label_col]
            if not row_label or pd.isna(row_label):
                continue
            # Create up to 3 QA pairs per row
            for _ in range(3):
                if len(value_cols) == 0:
                    break
                col = random.choice(value_cols)
                val = row[col]
                if not val or pd.isna(val):
                    continue
                question = f"What is the {col} for {row_label} in the '{title}' dataset?"
                answer = f"According to '{title}', the {col} for {row_label} is {val}."
                entry = {
                    "instruction": question,
                    "input": f"Context: {source_name} data.",
                    "output": answer,
                }
                data_points.append(entry)
    except Exception:
        # Files that cannot be parsed simply contribute no data points
        pass

    return data_points


@app.local_entrypoint()
def main():
    print("Listing files...")
    files = list_csv_files.remote()
    print(f"Found {len(files)} files. Starting parallel processing...")

    # Process in batches
    batch_size = 1000
    total_train = 0
    total_val = 0

    for batch_start in range(0, len(files), batch_size):
        batch_end = min(batch_start + batch_size, len(files))
        batch_files = files[batch_start:batch_end]
        print(f"Processing batch {batch_start // batch_size + 1}/{(len(files) - 1) // batch_size + 1} ({len(batch_files)} files)...")

        batch_data = []
        for result in process_file.map(batch_files):
            batch_data.extend(result)
        print(f"Batch generated {len(batch_data)} data points")
        if not batch_data:
            continue

        # Shuffle and split 90/10 into train/val
        random.shuffle(batch_data)
        split_idx = int(len(batch_data) * 0.9)
        train_batch = batch_data[:split_idx]
        val_batch = batch_data[split_idx:]

        # Save (overwrite on the first batch, append afterwards)
        save_batch.remote(train_batch, val_batch, batch_start == 0)
        total_train += len(train_batch)
        total_val += len(val_batch)
        print(f"Saved {len(train_batch)} train, {len(val_batch)} val. Total: {total_train} train, {total_val} val")

    print(f"✅ Done! Total: {total_train} train, {total_val} val")


@app.function(
    image=image,
    volumes={"/data/dataset": vol_dataset},
    timeout=600,
)
def save_batch(train_data, val_data, is_first_batch):
    import json

    mode = 'w' if is_first_batch else 'a'
    with open("/data/dataset/train.jsonl", mode, encoding='utf-8') as f:
        for entry in train_data:
            json.dump(entry, f, ensure_ascii=False)
            f.write('\n')
    with open("/data/dataset/val.jsonl", mode, encoding='utf-8') as f:
        for entry in val_data:
            json.dump(entry, f, ensure_ascii=False)
            f.write('\n')
    vol_dataset.commit()
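

# Usage sketch (an assumption, not part of the original script): with the Modal
# CLI installed and the "census-data" / "economy-labor-data" Volumes already
# populated with CSVs, the pipeline can be launched from a local machine with
#
#     modal run prepare_finetune_data_parallel.py
#
# where the filename is hypothetical. Each line appended to
# /data/dataset/train.jsonl and val.jsonl is one JSON object in the
# instruction/input/output format built in process_file, e.g.:
#
#     {"instruction": "What is the <column> for <area> in the '<title>' dataset?",
#      "input": "Context: Japan Census data.",
#      "output": "According to '<title>', the <column> for <area> is <value>."}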