#!/usr/bin/env python3
"""
Data Cleaning Script - Cleans all data using a simplified regex method and saves the results.
Features:
1. Cleans all cases using a simplified regex method.
2. Saves the cleaned data for each case.
3. Ensures the relative order of dicts remains unchanged.
4. Generates a before-and-after cleaning report.
"""
import datetime
import json
import os
import re
import traceback
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple


@dataclass
class CleanedData:
    """Data structure for cleaned data"""
    case_id: int
    original_type: str  # 'list' or 'str'
    original_length: int
    cleaned_data: List[Dict]
    cleaning_operations: Dict[str, Any]  # Records the cleaning operations performed
    success: bool


class OutputCleaner:
    """Data Cleaner - Based on a simplified regex method"""

    def __init__(self):
        # Simplified regular expression patterns
        self.dict_pattern = re.compile(r'\{[^{}]*?"bbox"\s*:\s*\[[^\]]*?\][^{}]*?\}', re.DOTALL)
        self.bbox_pattern = re.compile(r'"bbox"\s*:\s*\[([^\]]+)\]')
        self.missing_delimiter_pattern = re.compile(r'\}\s*\{(?!")')
        self.cleaned_results: List[CleanedData] = []
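        # Pattern notes (illustrative examples, not from the original source):
        #   dict_pattern matches a flat object with no nested braces, e.g.
        #     {"bbox": [10, 20, 110, 220], "category": "Text", "text": "hi"}
        #   (a "text" value containing { or } would break the match).
        #   missing_delimiter_pattern matches '}' + whitespace + '{' only when
        #   the '{' is NOT immediately followed by a double quote.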

    def clean_list_data(self, data: List[Dict], case_id: int) -> CleanedData:
        """Cleans list-type data"""
        print(f"🔧 Cleaning List data - Case {case_id}")
        print(f"   Original items: {len(data)}")
        cleaned_data = []
        operations = {
            'type': 'list',
            'bbox_fixes': 0,
            'removed_items': 0,
            'original_count': len(data)
        }
        for i, item in enumerate(data):
            if not isinstance(item, dict):
                operations['removed_items'] += 1
                continue
            # Check the bbox field
            if 'bbox' in item:
                bbox = item['bbox']
                # Check bbox length - core logic
                if isinstance(bbox, list) and len(bbox) == 3:
                    print(f"   ⚠️ Item {i}: bbox has only 3 coordinates. Removing bbox, keeping category and text.")
                    # Keep only category and text, ensuring order is preserved
                    new_item = {}
                    if 'category' in item:
                        new_item['category'] = item['category']
                    if 'text' in item:
                        new_item['text'] = item['text']
                    if new_item:  # Add only if there is valid content
                        cleaned_data.append(new_item)
                        operations['bbox_fixes'] += 1
                    else:
                        operations['removed_items'] += 1
                    continue
                elif isinstance(bbox, list) and len(bbox) == 4:
                    # bbox is normal, add directly, preserving original order
                    cleaned_data.append(item.copy())
                    continue
                else:
                    print(f"   ❌ Item {i}: Abnormal bbox format, skipping.")
                    operations['removed_items'] += 1
                    continue
            else:
                # No bbox field, keep if category exists
                if 'category' in item:
                    cleaned_data.append(item.copy())
                    continue
                else:
                    operations['removed_items'] += 1
        operations['final_count'] = len(cleaned_data)
        print(f"   ✅ Cleaning complete: {len(cleaned_data)} items, {operations['bbox_fixes']} bbox fixes, "
              f"{operations['removed_items']} items removed")
        return CleanedData(
            case_id=case_id,
            original_type='list',
            original_length=len(data),
            cleaned_data=cleaned_data,
            cleaning_operations=operations,
            success=True
        )
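
    # Example for clean_list_data (illustrative values): an item such as
    # {"bbox": [10, 20, 30], "category": "Title", "text": "Intro"} has its
    # 3-coordinate bbox dropped and is kept as {"category": "Title",
    # "text": "Intro"}; a 4-coordinate bbox passes through unchanged.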

    def clean_string_data(self, data_str: str, case_id: int) -> CleanedData:
        """Cleans string-type data"""
        print(f"🔧 Cleaning String data - Case {case_id}")
        print(f"   Original length: {len(data_str):,}")
        operations = {
            'type': 'str',
            'original_length': len(data_str),
            'delimiter_fixes': 0,
            'tail_truncated': False,
            'truncated_length': 0,
            'duplicate_dicts_removed': 0,
            'final_objects': 0
        }
        try:
            # Step 1: Detect and fix missing delimiters
            data_str, delimiter_fixes = self._fix_missing_delimiters(data_str)
            operations['delimiter_fixes'] = delimiter_fixes
            # Step 2: Truncate the last incomplete element
            data_str, tail_truncated = self._truncate_last_incomplete_element(data_str)
            operations['tail_truncated'] = tail_truncated
            operations['truncated_length'] = len(data_str)
            # Step 3: Remove duplicate complete dict objects, preserving order
            data_str, duplicate_removes = self._remove_duplicate_complete_dicts_preserve_order(data_str)
            operations['duplicate_dicts_removed'] = duplicate_removes
            # Step 4: Ensure correct JSON format
            data_str = self._ensure_json_format(data_str)
            # Step 5: Try to parse the final result
            final_data = self._parse_final_json(data_str)
            if final_data is not None:
                operations['final_objects'] = len(final_data)
                print(f"   ✅ Cleaning complete: {len(final_data)} objects")
                return CleanedData(
                    case_id=case_id,
                    original_type='str',
                    original_length=operations['original_length'],
                    cleaned_data=final_data,
                    cleaning_operations=operations,
                    success=True
                )
            else:
                raise ValueError("Could not parse the cleaned data")
        except Exception as e:
            print(f"   ❌ Cleaning failed: {e}")
            return CleanedData(
                case_id=case_id,
                original_type='str',
                original_length=operations['original_length'],
                cleaned_data=[],
                cleaning_operations=operations,
                success=False
            )

    def _fix_missing_delimiters(self, text: str) -> Tuple[str, int]:
        """Fixes missing delimiters"""
        fixes = 0

        def replace_delimiter(match):
            nonlocal fixes
            fixes += 1
            return '},{'

        text = self.missing_delimiter_pattern.sub(replace_delimiter, text)
        if fixes > 0:
            print(f"   ✅ Fixed {fixes} missing delimiters")
        return text, fixes

    def _truncate_last_incomplete_element(self, text: str) -> Tuple[str, bool]:
        """Truncates the last incomplete element"""
        # For very long text (>50k) or text not ending with ']', directly truncate at the last '{"bbox":'
        needs_truncation = (
            len(text) > 50000 or
            not text.strip().endswith(']')
        )
        if needs_truncation:
            # Check how many dict objects there are
            bbox_count = text.count('{"bbox":')
            # If there is only one dict object, do not truncate to avoid deleting the only object
            if bbox_count <= 1:
                print(f"   ⚠️ Only {bbox_count} dict objects found, skipping truncation to avoid deleting all content")
                return text, False
            # Find the position of the last '{"bbox":'
            last_bbox_pos = text.rfind('{"bbox":')
            if last_bbox_pos > 0:
                # Truncate before this position
                truncated_text = text[:last_bbox_pos].rstrip()
                # Remove trailing comma
                if truncated_text.endswith(','):
                    truncated_text = truncated_text[:-1]
                print(f"   ✂️ Truncated the last incomplete element, length reduced from {len(text):,} to {len(truncated_text):,}")
                return truncated_text, True
        return text, False
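
    # Example for _truncate_last_incomplete_element (illustrative): given more
    # than one object, a string ending mid-dict such as
    #   '... "text": "a"}, {"bbox": [5, 6'
    # is cut back to just before the final '{"bbox":', dropping the trailing comma.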

    def _remove_duplicate_complete_dicts_preserve_order(self, text: str) -> Tuple[str, int]:
        """Removes duplicate complete dict objects, preserving original order"""
        # Extract all dict objects, preserving order
        dict_matches = list(self.dict_pattern.finditer(text))
        if not dict_matches:
            return text, 0
        print(f"   🔍 Found {len(dict_matches)} dict objects")
        # Deduplication while preserving order: only keep the first occurrence of a dict
        unique_dicts = []
        seen_dict_strings = set()
        total_duplicates = 0
        for match in dict_matches:
            dict_str = match.group()
            if dict_str not in seen_dict_strings:
                unique_dicts.append(dict_str)
                seen_dict_strings.add(dict_str)
            else:
                total_duplicates += 1
        if total_duplicates > 0:
            # Reconstruct the JSON array, preserving the original order
            new_text = '[' + ', '.join(unique_dicts) + ']'
            print(f"   ✅ Removed {total_duplicates} duplicate dicts, keeping {len(unique_dicts)} unique dicts (order preserved)")
            return new_text, total_duplicates
        else:
            print("   ✅ No duplicate dict objects found")
            return text, 0

    def _ensure_json_format(self, text: str) -> str:
        """Ensures correct JSON format"""
        text = text.strip()
        if not text.startswith('['):
            text = '[' + text
        if not text.endswith(']'):
            # Remove trailing comma
            text = text.rstrip(',').rstrip()
            text += ']'
        return text
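
    # Example for _ensure_json_format (illustrative):
    #   '{"bbox": [1, 2, 3, 4]},'  ->  '[{"bbox": [1, 2, 3, 4]}]'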

    def _parse_final_json(self, text: str) -> Optional[List[Dict]]:
        """Tries to parse the final JSON"""
        try:
            data = json.loads(text)
            if isinstance(data, list):
                return data
        except json.JSONDecodeError as e:
            print(f"   ❌ JSON parsing failed: {e}")
            # Fallback 1: Extract valid dict objects
            valid_dicts = []
            for match in self.dict_pattern.finditer(text):
                dict_str = match.group()
                try:
                    dict_obj = json.loads(dict_str)
                    valid_dicts.append(dict_obj)
                except json.JSONDecodeError:
                    continue
            if valid_dicts:
                print(f"   ✅ Extracted {len(valid_dicts)} valid dicts")
                return valid_dicts
            # Fallback 2: Special handling for a single incomplete dict
            return self._handle_single_incomplete_dict(text)
        return None
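
    # Example for _parse_final_json (illustrative): for
    #   '[{"bbox": [1, 2, 3, 4], "category": "Text"}, {"bbo'
    # json.loads fails, but fallback 1 still recovers the one complete dict
    # via dict_pattern.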

    def _handle_single_incomplete_dict(self, text: str) -> Optional[List[Dict]]:
        """Handles the special case of a single incomplete dict"""
        # Check if it's a single incomplete dict case
        if not text.strip().startswith('[{"bbox":'):
            return None
        try:
            # Try to extract bbox coordinates
            bbox_match = self.bbox_pattern.search(text)
            if not bbox_match:
                return None
            bbox_str = bbox_match.group(1)
            bbox_coords = [int(x.strip()) for x in bbox_str.split(',')]
            if len(bbox_coords) != 4:
                return None
            # Try to extract category
            category_match = re.search(r'"category"\s*:\s*"([^"]+)"', text)
            category = category_match.group(1) if category_match else "Text"
            # Try to extract the beginning of the text (first 10000 characters)
            text_match = re.search(r'"text"\s*:\s*"([^"]{0,10000})', text)
            if text_match:
                text_content = text_match.group(1)
            else:
                text_content = ""
            # Construct the fixed dict
            fixed_dict = {
                "bbox": bbox_coords,
                "category": category
            }
            if text_content:
                fixed_dict["text"] = text_content
            print(f"   🔧 Special fix: single incomplete dict → {fixed_dict}")
            return [fixed_dict]
        except Exception as e:
            print(f"   ❌ Special fix failed: {e}")
            return None
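
    # Example for _handle_single_incomplete_dict (illustrative): the truncated
    # output '[{"bbox": [1, 2, 3, 4], "category": "Table", "text": "cell cell'
    # is rebuilt as [{"bbox": [1, 2, 3, 4], "category": "Table",
    # "text": "cell cell"}].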

    def remove_duplicate_category_text_pairs_and_bbox(self, data_list: List[dict], case_id: int) -> List[dict]:
        """Removes duplicate category-text pairs and duplicate bboxes"""
        if not data_list or len(data_list) <= 1:
            print(f"   📏 Data length {len(data_list)} <= 1, skipping deduplication check")
            return data_list
        print(f"   📏 Original data length: {len(data_list)}")
        # 1. Count occurrences and positions of each category-text pair
        category_text_pairs = {}
        for i, item in enumerate(data_list):
            if isinstance(item, dict) and 'category' in item and 'text' in item:
                pair_key = (item.get('category', ''), item.get('text', ''))
                if pair_key not in category_text_pairs:
                    category_text_pairs[pair_key] = []
                category_text_pairs[pair_key].append(i)
        # 2. Count occurrences and positions of each bbox
        bbox_pairs = {}
        for i, item in enumerate(data_list):
            if isinstance(item, dict) and 'bbox' in item:
                bbox = item.get('bbox')
                if isinstance(bbox, list) and len(bbox) > 0:
                    bbox_key = tuple(bbox)  # Convert to tuple to use as a dictionary key
                    if bbox_key not in bbox_pairs:
                        bbox_pairs[bbox_key] = []
                    bbox_pairs[bbox_key].append(i)
        # 3. Identify items to be removed
        duplicates_to_remove = set()
        # 3a. Process category-text pairs that appear 5 or more times
        for pair_key, positions in category_text_pairs.items():
            if len(positions) >= 5:
                category, text = pair_key
                # Keep the first occurrence, remove subsequent duplicates
                positions_to_remove = positions[1:]
                duplicates_to_remove.update(positions_to_remove)
                print(f"   🔍 Found duplicate category-text pair: category='{category}', first 50 chars of text='{text[:50]}...'")
                print(f"      Count: {len(positions)}, removing at positions: {positions_to_remove}")
        # 3b. Process bboxes that appear 2 or more times
        for bbox_key, positions in bbox_pairs.items():
            if len(positions) >= 2:
                # Keep the first occurrence, remove subsequent duplicates
                positions_to_remove = positions[1:]
                duplicates_to_remove.update(positions_to_remove)
                print(f"   🔍 Found duplicate bbox: {list(bbox_key)}")
                print(f"      Count: {len(positions)}, removing at positions: {positions_to_remove}")
        if not duplicates_to_remove:
            print("   ✅ No category-text pairs or bboxes found exceeding the duplication threshold")
            return data_list
        # 4. Remove duplicate items from the original data (preserving order)
        cleaned_data = []
        removed_count = 0
        for i, item in enumerate(data_list):
            if i not in duplicates_to_remove:
                cleaned_data.append(item)
            else:
                removed_count += 1
        print(f"   ✅ Deduplication complete: Removed {removed_count} duplicate items")
        print(f"   📏 Cleaned data length: {len(cleaned_data)}")
        return cleaned_data
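
    # Note on thresholds (as implemented above): a (category, text) pair must
    # repeat 5 or more times before duplicates are pruned, while an identical
    # bbox repeating just twice already triggers removal; in both cases only
    # the first occurrence survives.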

    def clean_model_output(self, model_output: Any):
        """Cleans a single model output (list or string) and returns the cleaned data"""
        try:
            # Select cleaning method based on data type
            if isinstance(model_output, list):
                result = self.clean_list_data(model_output, case_id=0)
            else:
                result = self.clean_string_data(str(model_output), case_id=0)
            # Add deduplication step: remove duplicate category-text pairs and bboxes
            if result and hasattr(result, 'success') and result.success and result.cleaned_data:
                original_data = result.cleaned_data
                deduplicated_data = self.remove_duplicate_category_text_pairs_and_bbox(original_data, case_id=0)
                # Update the cleaned_data in the CleanedData object
                result.cleaned_data = deduplicated_data
            return result.cleaned_data
        except Exception as e:
            print(f"❌ Case cleaning failed: {e}")
            return model_output
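
    # Example usage for clean_model_output (hypothetical input):
    #   cleaner = OutputCleaner()
    #   raw = ('[{"bbox": [1, 2, 3, 4], "category": "Text", "text": "a"}, '
    #          '{"bbox": [1, 2, 3, 4], "category": "Text", "text": "a"}]')
    #   cleaner.clean_model_output(raw)
    #   # -> [{'bbox': [1, 2, 3, 4], 'category': 'Text', 'text': 'a'}]
    #   # (the duplicate dict string is removed during string cleaning)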

    def clean_all_data(self, jsonl_path: str) -> List[CleanedData]:
        """Cleans all data from a JSONL file"""
        print(f"🚀 Starting to clean JSONL file: {jsonl_path}")
        with open(jsonl_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()
        datas = []
        for i, line in enumerate(lines):
            if line.strip():
                try:
                    data = json.loads(line)
                    predict_field = data.get('predict')
                    case_id = i + 1
                    print(f"\n{'='*50}")
                    print(f"🎯 Cleaning Case {case_id}")
                    print(f"{'='*50}")
                    # Select cleaning method based on data type
                    if isinstance(predict_field, list):
                        print("📋 Data type: List")
                        result = self.clean_list_data(predict_field, case_id)
                    else:
                        print("📋 Data type: String")
                        result = self.clean_string_data(str(predict_field), case_id)
                    # Add deduplication step: remove duplicate category-text pairs and bboxes
                    if result and hasattr(result, 'success') and result.success and result.cleaned_data:
                        print("🔍 Checking for and removing duplicate category-text pairs and bboxes...")
                        original_data = result.cleaned_data
                        deduplicated_data = self.remove_duplicate_category_text_pairs_and_bbox(original_data, case_id)
                        # Update the cleaned_data in the CleanedData object
                        result.cleaned_data = deduplicated_data
                    data['predict_resized'] = result.cleaned_data
                    datas.append(data)
                    self.cleaned_results.append(result)
                except Exception as e:
                    print(f"❌ Case {i+1} cleaning failed: {e}")
                    traceback.print_exc()
        save_path = jsonl_path.replace('.jsonl', '_filtered.jsonl')
        with open(save_path, 'w', encoding='utf-8') as w:
            for data in datas:
                w.write(json.dumps(data, ensure_ascii=False) + '\n')
        print(f"✅ Saved cleaned data to: {save_path}")
        return self.cleaned_results

    def save_cleaned_data(self, output_dir: str):
        """Saves the cleaned data"""
        print(f"\n💾 Saving cleaned data to: {output_dir}")
        os.makedirs(output_dir, exist_ok=True)
        # 1. Save cleaned data for each case
        for result in self.cleaned_results:
            case_filename = f"cleaned_case_{result.case_id:02d}.json"
            case_filepath = os.path.join(output_dir, case_filename)
            # Save the cleaned data
            with open(case_filepath, 'w', encoding='utf-8') as f:
                json.dump(result.cleaned_data, f, ensure_ascii=False, indent=2)
            print(f"   ✅ Case {result.case_id}: {len(result.cleaned_data)} objects → {case_filename}")
        # 2. Save all cleaned data to a single file
        all_cleaned_data = []
        for result in self.cleaned_results:
            all_cleaned_data.append({
                'case_id': result.case_id,
                'original_type': result.original_type,
                'original_length': result.original_length,
                'cleaned_objects_count': len(result.cleaned_data),
                'success': result.success,
                'cleaning_operations': result.cleaning_operations,
                'cleaned_data': result.cleaned_data
            })
        all_data_filepath = os.path.join(output_dir, "all_cleaned_data.json")
        with open(all_data_filepath, 'w', encoding='utf-8') as f:
            json.dump(all_cleaned_data, f, ensure_ascii=False, indent=2)
        print(f"   📊 All data: {len(all_cleaned_data)} cases → all_cleaned_data.json")
        # 3. Generate a cleaning report
        self._generate_cleaning_report(output_dir)

    def _generate_cleaning_report(self, output_dir: str):
        """Generates a cleaning report"""
        report = []
        report.append("📊 Data Cleaning Report")
        report.append("=" * 60)
        report.append(f"Processing Time: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append("")
        # Overall statistics
        total_cases = len(self.cleaned_results)
        successful_cases = sum(1 for r in self.cleaned_results if r.success)
        total_objects = sum(len(r.cleaned_data) for r in self.cleaned_results)
        # Guard against division by zero when there are no cases
        success_rate = successful_cases / total_cases * 100 if total_cases else 0.0
        report.append("📊 Overall Statistics:")
        report.append(f"   Total Cases: {total_cases}")
        report.append(f"   Successfully Cleaned: {successful_cases}")
        report.append(f"   Success Rate: {success_rate:.1f}%")
        report.append(f"   Total Recovered Objects: {total_objects}")
        report.append("")
        # Detailed statistics
        list_results = [r for r in self.cleaned_results if r.original_type == 'list']
        str_results = [r for r in self.cleaned_results if r.original_type == 'str']
        if list_results:
            report.append("📋 List Type Cleaning Statistics:")
            for r in list_results:
                ops = r.cleaning_operations
                report.append(f"   Case {r.case_id}: {ops['original_count']} → {ops['final_count']} objects")
                if ops['bbox_fixes'] > 0:
                    report.append(f"      - bbox fixes: {ops['bbox_fixes']}")
                if ops['removed_items'] > 0:
                    report.append(f"      - invalid items removed: {ops['removed_items']}")
            report.append("")
        if str_results:
            report.append("📝 String Type Cleaning Statistics:")
            for r in str_results:
                ops = r.cleaning_operations
                status = "✅" if r.success else "❌"
                report.append(f"   Case {r.case_id} {status}: {ops['original_length']:,} chars → {ops['final_objects']} objects")
                details = []
                if ops['delimiter_fixes'] > 0:
                    details.append(f"Delimiter fixes: {ops['delimiter_fixes']}")
                if ops['tail_truncated']:
                    reduction = ops['original_length'] - ops['truncated_length']
                    details.append(f"Tail truncation: -{reduction:,} chars")
                if ops['duplicate_dicts_removed'] > 0:
                    details.append(f"Duplicates removed: {ops['duplicate_dicts_removed']}")
                if details:
                    report.append(f"      - {', '.join(details)}")
            report.append("")
        # Note on data order
        report.append("🔒 Data Order Guarantee:")
        report.append("   ✅ The relative order of all dict objects is preserved during cleaning.")
        report.append("   ✅ When deduplicating, the first occurrence of a dict is kept, and subsequent duplicates are removed.")
        report.append("   ✅ The order of items in List-type data is fully preserved.")
        # Save the report
        report_filepath = os.path.join(output_dir, "cleaning_report.txt")
        with open(report_filepath, 'w', encoding='utf-8') as f:
            f.write('\n'.join(report))
        print("   📄 Cleaning report: cleaning_report.txt")
        # Also print to console
        print('\n' + '\n'.join(report))


def main():
    """Main function"""
    # Create a data cleaner instance
    cleaner = OutputCleaner()
    # Input file
    jsonl_path = "output_with_failcase.jsonl"
    # Output directory
    output_dir = "output_with_failcase_cleaned"
    # Clean all data
    results = cleaner.clean_all_data(jsonl_path)
    # Save the cleaned data
    cleaner.save_cleaned_data(output_dir)
    print("\n🎉 Data cleaning complete!")
    print(f"📁 Cleaned data saved in: {output_dir}")


if __name__ == "__main__":
    main()