#!/usr/bin/env python3
# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "en-core-web-sm",
#     "spacy",
# ]
#
# [tool.uv.sources]
# en-core-web-sm = { url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.8.0/en_core_web_sm-3.8.0-py3-none-any.whl" }
# ///
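# The inline metadata above (PEP 723) lets a compatible runner such as `uv run`
# fetch spaCy and the pinned en_core_web_sm wheel automatically, e.g.:
#
#     uv run chunk_text.py corpus.txt -n 100 --lemma -o chunks.csv
#
# (the script filename "chunk_text.py" and the argument values are illustrative;
# the flags match the argparse options defined below)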
import argparse
import csv
import re
from pathlib import Path

import spacy

def main():
    parser = argparse.ArgumentParser(
        description="Tokenize text files and output chunked CSV"
    )
    parser.add_argument("files", nargs="+", help="Input text file(s) to process")
    parser.add_argument(
        "-n",
        "--tokens",
        type=int,
        default=100,
        help="Number of tokens per chunk (default: 100)",
    )
    parser.add_argument(
        "-l",
        "--label",
        type=str,
        help="Custom label for all chunks (defaults to filename)",
    )
    parser.add_argument(
        "-o",
        "--output",
        type=str,
        default="output.csv",
        help="Output CSV filename (default: output.csv)",
    )
    parser.add_argument(
        "-c",
        "--max-chunks",
        type=int,
        help="Maximum number of chunks to output (default: unlimited)",
    )
    parser.add_argument(
        "--lemma",
        action="store_true",
        help="Use lemmatized forms of tokens instead of original text",
    )
    args = parser.parse_args()
    # Load spaCy model
    nlp = spacy.load("en_core_web_sm")

    # Process files and collect chunks
    all_chunks = []
    chunks_created = 0

    for filename in args.files:
        if args.max_chunks and chunks_created >= args.max_chunks:
            break

        filepath = Path(filename)
        if not filepath.exists():
            print(f"Warning: File '{filename}' not found, skipping...")
            continue

        try:
            with open(filepath, "r", encoding="utf-8") as f:
                text = f.read()
        except Exception as e:
            print(f"Error reading '{filename}': {e}")
            continue

        # Split on one or more newlines
        segments = re.split(r"\n+", text)
        # Remove empty segments
        segments = [seg.strip() for seg in segments if seg.strip()]

        # Process segments through spaCy pipe
        all_tokens = []
        for doc in nlp.pipe(segments):
            # Extract tokens from each processed segment
            if args.lemma:
                tokens = [token.lemma_ for token in doc]
            else:
                tokens = [token.text for token in doc]
            all_tokens.extend(tokens)

        # Determine label
        label = args.label if args.label else filepath.name

        # Create chunks of n tokens
        for i in range(0, len(all_tokens), args.tokens):
            if args.max_chunks and chunks_created >= args.max_chunks:
                break
            chunk = all_tokens[i : i + args.tokens]
            # Only include chunks with exactly n tokens
            if len(chunk) == args.tokens:
                chunk_text = " ".join(chunk)
                all_chunks.append({"text": chunk_text, "label": label})
                chunks_created += 1
    # Write to CSV
    if all_chunks:
        with open(args.output, "w", newline="", encoding="utf-8") as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=["text", "label"])
            writer.writeheader()
            writer.writerows(all_chunks)
        print(f"Successfully wrote {len(all_chunks)} chunks to '{args.output}'")
        if args.lemma:
            print("Note: Tokens were lemmatized")
    else:
        print("No valid chunks to write.")


if __name__ == "__main__":
    main()
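# Example of the resulting CSV layout (rows illustrative; the "label" column
# defaults to the source filename when --label is not given):
#
#   text,label
#   token_1 token_2 ... token_100,corpus.txt
#   token_101 token_102 ... token_200,corpus.txt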