Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import json | |
| import glob | |
# Directory of raw .txt pages scraped with Selenium (pipeline input).
RAW_DIR = "d:/NLP_KMH/Chatbot_NIHE_v2/data/raw/raw_selenium"
# Output directory for the cleaned per-page JSON documents.
PROCESSED_DIR = "d:/NLP_KMH/Chatbot_NIHE_v2/data/processed"
def clean_text(text):
    """Strip footer/navigation noise from scraped page text.

    Truncates the text at the first "related articles" / footer marker,
    then drops empty lines, navigation-only lines, and copyright lines.
    Returns the surviving lines joined with newlines.
    """
    # Patterns that mark the start of footer/related-content sections.
    truncate_patterns = [
        r'CÁC BÀI VIẾT LIÊN QUAN',   # "related articles" block
        r'DANH MỤC\s+TIÊM CHỦNG',    # "vaccination category" footer
    ]
    # Literal stop phrases — re.escape so a regex metacharacter in a
    # phrase can never silently change the split behaviour.
    stop_phrases = ["TIN CÙNG CHUYÊN MỤC", "TIN KHÁC", "BÌNH LUẬN", "Tóm lược bài viết"]
    truncate_patterns.extend(re.escape(phrase) for phrase in stop_phrases)
    for pattern in truncate_patterns:
        # Keep only the content before the first marker occurrence.
        text = re.split(pattern, text, flags=re.IGNORECASE)[0]
    # Drop empty, navigation, and copyright lines.
    cleaned_lines = []
    for line in (raw.strip() for raw in text.split('\n')):
        if not line:
            continue
        if line.lower() in ("trang chủ", "giới thiệu", "tin tức", "liên hệ"):
            continue
        if line.startswith("Copyright") or "All rights reserved" in line:
            continue
        cleaned_lines.append(line)
    return '\n'.join(cleaned_lines)
def is_garbage_content(text):
    """Return True when the text looks like procurement/catalog/error-page
    noise rather than real article content."""
    lowered = text.lower()
    garbage_keywords = (
        "mời báo giá", "yêu cầu báo giá", "chào giá", "đấu thầu", "gói thầu", "mua sắm hàng hóa",
        "kết quả lựa chọn nhà thầu", "danh mục", "catalogue", "sitemap",
        "404 not found", "access denied", "đang cập nhật", "danh sách", "file đính kèm",
    )
    # A single keyword hit is enough to classify the page as garbage.
    return any(keyword in lowered for keyword in garbage_keywords)
def process_file(filepath):
    """Parse one raw scraped .txt file and write it to PROCESSED_DIR as JSON.

    Extracts the "Title:/URL:" header when present, cleans the body, and
    skips files that are too short or look like procurement/catalog noise.
    Errors are reported per-file so a batch run keeps going.
    """
    filename = os.path.basename(filepath)
    try:
        with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
            content = f.read()
        # Parse the "Title: ...\nURL: ...\nLength: ...\n\n====" header.
        # NOTE: no re.DOTALL here — with DOTALL the greedy (.*) groups span
        # newlines and can bind to a later "URL:" deep inside the body.
        header_match = re.search(r'Title: (.*)\nURL: (.*)\nLength: .*\n\n=+', content)
        if header_match:
            title = header_match.group(1).strip()
            url = header_match.group(2).strip()
            # Body is everything after the 80-char '=' separator line.
            body = content.split('=' * 80)[-1].strip()
        else:
            # Fallback for manual files or non-conforming files.
            title = os.path.splitext(filename)[0].replace('_', ' ')
            url = f"file://{filename}"
            body = content
        cleaned_body = clean_text(body)
        if len(cleaned_body) < 150:  # too short to carry useful content
            print(f"Skipping short file: {filename}")
            return
        if is_garbage_content(cleaned_body):
            print(f"Skipping garbage content (procurement/catalog): {filename}")
            return
        data = {
            "source_file": filename,
            "title": title,
            "url": url,
            "text": cleaned_body,
        }
        # splitext swaps only the extension; .replace('.txt', '.json')
        # would also corrupt a '.txt' occurring mid-name.
        out_name = os.path.splitext(filename)[0] + '.json'
        with open(os.path.join(PROCESSED_DIR, out_name), 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
    except Exception as e:
        # Batch boundary: report and continue with the remaining files.
        print(f"Error processing {filename}: {e}")
def main():
    """Process every raw .txt file under RAW_DIR into cleaned JSON."""
    # exist_ok avoids the race between an existence check and creation.
    os.makedirs(PROCESSED_DIR, exist_ok=True)
    files = glob.glob(os.path.join(RAW_DIR, "*.txt"))
    print(f"Found {len(files)} files.")
    for filepath in files:
        process_file(filepath)
    print("Cleaning complete.")


if __name__ == "__main__":
    main()