Spaces:
Sleeping
Sleeping
File size: 4,049 Bytes
068aa4e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
import os
import json
from pathlib import Path
from datetime import datetime
class DocumentProcessor:
    """Collect Markdown/plain-text files from a folder tree, clean their
    text, and accumulate them as JSON-serializable document records.

    Typical use: process_all_documents() then save_documents().
    """

    def __init__(self, input_folder, output_folder):
        self.input_folder = input_folder    # root folder scanned recursively
        self.output_folder = output_folder  # destination for the JSON dump
        self.documents = []                 # accumulated document dicts

    def extract_text(self, file_path):
        """Return the file's text, or None if it cannot be read.

        Undecodable bytes are dropped (errors='ignore') so a stray
        non-UTF-8 file does not abort the whole run.
        """
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as file:
                return file.read()
        except OSError as e:  # narrowed from Exception: only I/O errors expected here
            print(f"  [ERROR] Error reading {file_path}: {e}")
            return None

    def clean_text(self, text):
        """Collapse all whitespace runs to single spaces and drop NUL bytes."""
        # split()/join collapses every whitespace run (including newlines),
        # so no separate newline normalization is needed afterwards.
        text = ' '.join(text.split())
        text = text.replace('\x00', '')
        return text.strip()

    def process_all_documents(self):
        """Walk input_folder, process every .md/.txt file, and return the
        list of document dicts (also kept on self.documents)."""
        doc_id = 1
        for root, _dirs, files in os.walk(self.input_folder):
            for filename in files:
                # Only process markdown and text files
                if not filename.endswith(('.md', '.txt')):
                    continue
                filepath = os.path.join(root, filename)
                print(f"Processing: {filepath}")
                text = self.extract_text(filepath)
                if not text:
                    print(f"  [FAIL] Failed to extract: {filepath}")
                    continue
                cleaned = self.clean_text(text)
                if len(cleaned) < 50:  # skip stubs / near-empty files
                    print("  [SKIP] Too short, skipping")
                    continue
                document = {
                    "doc_id": f"doc_{doc_id}",
                    # stem strips only the final extension, unlike the old
                    # replace() which removed '.md'/'.txt' anywhere in the name
                    "title": Path(filename).stem,
                    "content": cleaned,
                    "word_count": len(cleaned.split()),
                    "character_count": len(cleaned),
                    "processed_date": datetime.now().isoformat(),
                    "source_file": filename,
                    "source_path": filepath,
                }
                self.documents.append(document)
                doc_id += 1
                print(f"  [OK] Processed ({len(cleaned)} chars)")
        return self.documents

    def save_documents(self):
        """Write all collected documents to processed_documents.json in
        output_folder (created if missing) and print summary statistics."""
        os.makedirs(self.output_folder, exist_ok=True)
        output_path = os.path.join(self.output_folder, "processed_documents.json")
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(self.documents, f, ensure_ascii=False, indent=2)
        print(f"\n[OK] Saved {len(self.documents)} documents to {output_path}")
        # Summary statistics
        total_words = sum(doc['word_count'] for doc in self.documents)
        total_chars = sum(doc['character_count'] for doc in self.documents)
        avg_words = total_words // len(self.documents) if self.documents else 0
        print("\nSTATISTICS:")
        print(f"  Total documents: {len(self.documents)}")
        print(f"  Total words: {total_words:,}")
        print(f"  Total characters: {total_chars:,}")
        print(f"  Average words per document: {avg_words}")
# Script entry point: process everything under data/raw into data/processed.
if __name__ == "__main__":
    processor = DocumentProcessor(
        input_folder="data/raw",
        output_folder="data/processed",
    )
    print("Starting document processing...\n")
    processor.process_all_documents()
    processor.save_documents()
    print("\nDocument processing complete!")
|