| """ |
| Gradio frontend for the text processing pipeline. |
| |
| Provides drag-and-drop file upload, URL fetching, search across |
| Project Gutenberg / MIT Classics / Internet Archive, and corpus |
| management with HuggingFace push. |
| |
| Usage: |
| python app.py # Launch on http://localhost:7860 |
| python app.py --share # Launch with public Gradio link |
| """ |
|
|
| import argparse |
| import logging |
| import os |
| import shutil |
| import sys |
| import tempfile |
| from pathlib import Path |
|
|
| |
| SCRIPT_DIR = Path(__file__).resolve().parent |
| sys.path.insert(0, str(SCRIPT_DIR)) |
|
|
| from pipeline import Pipeline |
|
|
| logger = logging.getLogger("app") |
|
|
| |
| |
| |
|
|
# Process-wide Pipeline singleton, created lazily by get_pipeline().
_pipeline: Pipeline | None = None
|
|
|
|
def get_pipeline() -> Pipeline:
    """Return the shared Pipeline, constructing it on first use."""
    global _pipeline
    if _pipeline is None:
        # First call: build the singleton; later calls reuse it.
        _pipeline = Pipeline()
    return _pipeline
|
|
|
|
| |
| |
| |
|
|
def process_uploaded_files(files) -> str:
    """Copy uploaded files into the pipeline inbox and process them.

    Args:
        files: Uploaded file handles from ``gr.File``. Depending on the
            Gradio version these are tempfile wrappers exposing the path via
            ``.name``, or plain path strings; both are accepted.

    Returns:
        Human-readable summary of the copies and resulting corpus size.
    """
    if not files:
        return "No files uploaded."

    pipeline = get_pipeline()
    results = []

    for file_obj in files:
        # Gradio >= 4 may pass plain str paths; older versions pass objects
        # with a .name attribute — handle both.
        src = Path(getattr(file_obj, "name", file_obj))
        dest = pipeline.inbox / src.name
        shutil.copy2(str(src), str(dest))
        results.append(f"Copied {src.name} to inbox/")

    # Parse everything new in the inbox, then rebuild the train/val split.
    new_chunks = pipeline.process_inbox()
    train_n, val_n = pipeline.rebuild_output()

    results.append(f"\nProcessed: {new_chunks} new chunks")
    results.append(f"Total corpus: {train_n} train / {val_n} val")

    return "\n".join(results)
|
|
|
|
def fetch_url(url: str) -> str:
    """Download text from a URL, save it to the inbox, and process it.

    Args:
        url: Direct link to a plain-text resource (Gutenberg, MIT Classics,
            Internet Archive, or any .txt URL).

    Returns:
        A summary string on success, or an "Error: ..." message on failure.
    """
    if not url.strip():
        return "Please enter a URL."

    import requests
    from urllib.parse import urlparse

    pipeline = get_pipeline()
    url = url.strip()

    try:
        resp = requests.get(url, timeout=30, headers={
            "User-Agent": "PhilosophyCorpus-Pipeline/1.0",
        })
        resp.raise_for_status()

        # Without an explicit charset, requests falls back to ISO-8859-1 for
        # text/* responses; prefer the detected encoding so UTF-8 texts
        # (e.g. Gutenberg plain-text) are not garbled.
        if "charset" not in resp.headers.get("Content-Type", "").lower():
            resp.encoding = resp.apparent_encoding or "utf-8"

        # Derive the filename from the URL *path* only (drops query strings
        # and fragments), with a fallback for URLs ending in "/".
        fname = Path(urlparse(url).path).name or "downloaded"
        if not fname.endswith(".txt"):
            fname = fname.replace(".", "_") + ".txt"

        dest = pipeline.inbox / fname
        dest.write_text(resp.text, encoding="utf-8")

        new_chunks = pipeline.process_inbox()
        train_n, val_n = pipeline.rebuild_output()

        return (
            f"Downloaded: {fname} ({len(resp.text):,} chars)\n"
            f"Processed: {new_chunks} new chunks\n"
            f"Total corpus: {train_n} train / {val_n} val"
        )
    except Exception as e:
        # Surface network/parse errors to the UI rather than crashing it.
        return f"Error: {e}"
|
|
|
|
| |
| |
| |
|
|
def search_archive(query: str, subject: str) -> list[list]:
    """Search Internet Archive and return results as table rows."""
    if not query.strip():
        return []

    from sources.ia_search import search_ia

    hits = search_ia(
        query,
        subject=None if subject == "All" else subject.lower(),
        rows=20,
    )

    def _row(hit: dict) -> list:
        # The IA API may return a single creator or a list of them.
        creator = hit["creator"]
        if isinstance(creator, list):
            creator = ", ".join(creator)
        date = str(hit["date"])[:10] if hit["date"] else ""
        return [hit["identifier"], hit["title"], creator, date, str(hit["downloads"])]

    return [_row(h) for h in hits]
|
|
|
|
def add_ia_text(identifier: str) -> str:
    """Download an IA text and process it through the pipeline."""
    ident = identifier.strip()
    if not ident:
        return "Please enter an Internet Archive identifier."

    from sources.ia_search import get_ia_text

    pipeline = get_pipeline()

    try:
        text = get_ia_text(ident)

        # Prefix with "ia_" so the source of the file is obvious in the inbox.
        dest = pipeline.inbox / f"ia_{ident}.txt"
        dest.write_text(text, encoding="utf-8")

        new_chunks = pipeline.process_inbox()
        train_n, val_n = pipeline.rebuild_output()

        return (
            f"Downloaded: {identifier} ({len(text):,} chars)\n"
            f"Processed: {new_chunks} new chunks\n"
            f"Total corpus: {train_n} train / {val_n} val"
        )
    except Exception as e:
        return f"Error: {e}"
|
|
|
|
| |
| |
| |
|
|
def search_gutenberg_ui(query: str, topic: str) -> list[list]:
    """Search Gutenberg via Gutendex and return results as table rows."""
    if not query.strip():
        return []

    from sources.gutenberg_search import search_gutenberg

    hits = search_gutenberg(
        query,
        topic=None if topic == "All" else topic.lower(),
        rows=20,
    )

    # One display row per hit; subjects are truncated to keep the table tidy.
    return [
        [
            str(hit["id"]),
            hit["title"],
            hit["author"],
            hit["subjects"][:60],
            str(hit["download_count"]),
        ]
        for hit in hits
    ]
|
|
|
|
def add_gutenberg_text(book_id: str) -> str:
    """Download a Gutenberg text and process it through the pipeline.

    Args:
        book_id: Numeric Project Gutenberg book ID as a string (e.g. "1497").

    Returns:
        A summary string on success, or an "Error: ..." message on failure.
    """
    if not book_id.strip():
        return "Please enter a Gutenberg book ID."

    # Validate the ID up front, with the ValueError handler scoped to the
    # parse alone — previously a ValueError raised anywhere in the download/
    # processing path was misreported as an invalid book ID.
    try:
        bid = int(book_id.strip())
    except ValueError:
        return f"Error: Invalid book ID '{book_id}' — enter a number (e.g. 1497)"

    from sources.gutenberg_search import get_gutenberg_text

    pipeline = get_pipeline()

    try:
        text = get_gutenberg_text(bid)

        fname = f"gutenberg_{bid}.txt"
        dest = pipeline.inbox / fname
        dest.write_text(text, encoding="utf-8")

        new_chunks = pipeline.process_inbox()
        train_n, val_n = pipeline.rebuild_output()

        return (
            f"Downloaded: Gutenberg #{bid} ({len(text):,} chars)\n"
            f"Processed: {new_chunks} new chunks\n"
            f"Total corpus: {train_n} train / {val_n} val"
        )
    except Exception as e:
        return f"Error: {e}"
|
|
|
|
| |
| |
| |
|
|
def search_mit_ui(query: str, author: str) -> list[list]:
    """Search MIT Classics catalog and return results as table rows."""
    from sources.mit_classics_search import search_mit_classics

    # "All" means no author filter; the search helper takes "" for that.
    hits = search_mit_classics(
        query=query.strip(),
        author="" if author == "All" else author,
    )

    return [[hit["author"], hit["title"], hit["work_path"]] for hit in hits]
|
|
|
|
def get_mit_authors_list() -> list[str]:
    """Get author list for the dropdown (lazy-loaded)."""
    authors = ["All"]
    try:
        from sources.mit_classics_search import get_authors
        authors += get_authors()
    except Exception:
        # Catalog unavailable (e.g. offline) — fall back to the bare default.
        return ["All"]
    return authors
|
|
|
|
def add_mit_text(work_path: str) -> str:
    """Download an MIT Classics text and process it through the pipeline."""
    if not work_path.strip():
        return "Please enter a work path (e.g. /Plato/republic.html)."

    from sources.mit_classics_search import get_mit_text

    pipeline = get_pipeline()

    try:
        text = get_mit_text(work_path.strip())

        # e.g. "/Plato/republic.html" -> "mit_plato_republic.txt"
        slug = "_".join(work_path.strip("/").replace(".html", "").split("/"))
        dest = pipeline.inbox / ("mit_" + slug.lower() + ".txt")
        dest.write_text(text, encoding="utf-8")

        new_chunks = pipeline.process_inbox()
        train_n, val_n = pipeline.rebuild_output()

        return (
            f"Downloaded: {work_path} ({len(text):,} chars)\n"
            f"Processed: {new_chunks} new chunks\n"
            f"Total corpus: {train_n} train / {val_n} val"
        )
    except Exception as e:
        return f"Error: {e}"
|
|
|
|
| |
| |
| |
|
|
def get_corpus_stats() -> str:
    """Get current corpus statistics."""
    pipeline = get_pipeline()
    parsed_files = sorted(pipeline.parsed.glob("*.txt"))

    if not parsed_files:
        return "No parsed files yet. Add texts to get started."

    def _nonblank(path) -> list[str]:
        # One chunk per non-blank line in the parsed/output files.
        return [ln for ln in path.read_text(encoding="utf-8").splitlines() if ln.strip()]

    lines_out = ["File Chunks Chars", "-" * 60]
    total_chunks = 0
    total_chars = 0

    for pf in parsed_files:
        chunks = _nonblank(pf)
        chars = sum(len(ln) for ln in chunks)
        total_chunks += len(chunks)
        total_chars += chars
        lines_out.append(f"{pf.name:<40} {len(chunks):>8} {chars:>10}")

    lines_out.append("-" * 60)
    lines_out.append(f"{'TOTAL':<40} {total_chunks:>8} {total_chars:>10}")

    if total_chunks > 0:
        avg = total_chars / total_chunks
        lines_out.append(f"\nAverage chunk length: {avg:.0f} chars")

    # Report the train/val split and character vocabulary when output exists.
    train_path = pipeline.output / "train.txt"
    val_path = pipeline.output / "val.txt"
    if train_path.exists() and val_path.exists():
        train_n = len(_nonblank(train_path))
        val_n = len(_nonblank(val_path))
        lines_out.append(f"\nOutput split: {train_n} train / {val_n} val")

        text = train_path.read_text(encoding="utf-8")
        vocab = sorted(set(text) - {"\n"})
        lines_out.append(f"Vocabulary: {len(vocab)} chars -> {''.join(vocab)}")

    return "\n".join(lines_out)
|
|
|
|
def get_sample_chunks() -> str:
    """Get sample chunks from the training data."""
    import random

    train_path = get_pipeline().output / "train.txt"

    if not train_path.exists():
        return "No training data yet. Process some texts first."

    chunks = [
        ln.strip()
        for ln in train_path.read_text(encoding="utf-8").splitlines()
        if ln.strip()
    ]
    if not chunks:
        return "Training file is empty."

    # Show up to 10 random chunks, numbered from 1.
    picks = random.sample(chunks, min(10, len(chunks)))
    return "\n\n---\n\n".join(f"[{num}] {chunk}" for num, chunk in enumerate(picks, start=1))
|
|
|
|
def rebuild_dataset() -> str:
    """Rebuild train/val split from existing parsed chunks."""
    train_n, val_n = get_pipeline().rebuild_output()
    return f"Rebuilt: {train_n} train / {val_n} val chunks"
|
|
|
|
def push_to_hf(repo_id: str) -> str:
    """Push dataset to HuggingFace Hub."""
    repo = repo_id.strip()
    if not repo:
        return "Please enter a HuggingFace repo ID (e.g. username/philosophy-corpus)."

    try:
        url = get_pipeline().push_to_hub(repo_id=repo)
    except Exception as e:
        # Auth/network failures are reported in the UI, not raised.
        return f"Error: {e}"
    return f"Dataset pushed successfully!\n{url}"
|
|
|
|
| |
| |
| |
|
|
def build_ui():
    """Build and return the Gradio Blocks application.

    The UI has five tabs: direct ingestion (file upload / URL fetch), three
    source-search tabs (Gutenberg, MIT Classics, Internet Archive), and a
    corpus-management tab (stats, samples, rebuild, HuggingFace push).
    """
    # Imported lazily so the pipeline helpers above can be used without
    # gradio installed.
    import gradio as gr

    with gr.Blocks(title="Philosophy Corpus Pipeline", theme=gr.themes.Soft()) as app:
        gr.Markdown("# Philosophy Corpus Pipeline\nBuild training data for JuliaGPT")

        # --- Direct ingestion: drag-and-drop upload or fetch a raw text URL ---
        with gr.Tab("Add Texts"):
            gr.Markdown("### Upload Files")
            file_upload = gr.File(
                label="Drag and drop .txt, .epub, or .zip files",
                file_count="multiple",
                file_types=[".txt", ".epub", ".zip"],
            )
            upload_btn = gr.Button("Process Uploaded Files", variant="primary")
            upload_output = gr.Textbox(label="Result", lines=6)
            upload_btn.click(process_uploaded_files, inputs=[file_upload], outputs=[upload_output])

            gr.Markdown("### Fetch from URL")
            url_input = gr.Textbox(
                label="Text URL (Gutenberg, MIT Classics, Internet Archive, or any .txt URL)",
                placeholder="https://www.gutenberg.org/cache/epub/21076/pg21076.txt",
            )
            fetch_btn = gr.Button("Fetch and Process")
            fetch_output = gr.Textbox(label="Result", lines=4)
            fetch_btn.click(fetch_url, inputs=[url_input], outputs=[fetch_output])

        # --- Project Gutenberg: search (Gutendex) then add by numeric ID ---
        with gr.Tab("Search Gutenberg"):
            gr.Markdown("### Search Project Gutenberg for public domain texts")
            with gr.Row():
                gut_query = gr.Textbox(label="Search Query", placeholder="aristotle philosophy")
                gut_topic = gr.Dropdown(
                    choices=["All", "Philosophy", "Ethics", "Politics",
                             "Metaphysics", "Science", "Mathematics",
                             "Classical", "Religion", "History"],
                    value="Philosophy",
                    label="Topic Filter",
                )
            gut_search_btn = gr.Button("Search", variant="primary")
            gut_results = gr.Dataframe(
                headers=["ID", "Title", "Author", "Subjects", "Downloads"],
                label="Search Results",
                interactive=False,
            )
            gut_search_btn.click(
                search_gutenberg_ui,
                inputs=[gut_query, gut_topic],
                outputs=[gut_results],
            )

            # The user copies an ID out of the results table manually.
            gr.Markdown("### Add a text to the corpus")
            gut_id_input = gr.Textbox(
                label="Gutenberg Book ID",
                placeholder="Paste a book ID from the search results above (e.g. 1497)",
            )
            gut_add_btn = gr.Button("Download and Process")
            gut_add_output = gr.Textbox(label="Result", lines=4)
            gut_add_btn.click(add_gutenberg_text, inputs=[gut_id_input], outputs=[gut_add_output])

        # --- MIT Internet Classics Archive: search catalog then add by path ---
        with gr.Tab("Browse MIT Classics"):
            gr.Markdown("### Search the MIT Internet Classics Archive (441 works by 59 authors)")
            with gr.Row():
                mit_query = gr.Textbox(label="Search Query", placeholder="republic")
                mit_author = gr.Dropdown(
                    # Author choices are resolved once, at UI construction time.
                    choices=get_mit_authors_list(),
                    value="All",
                    label="Author Filter",
                )
            mit_search_btn = gr.Button("Search", variant="primary")
            mit_results = gr.Dataframe(
                headers=["Author", "Title", "Work Path"],
                label="Search Results",
                interactive=False,
            )
            mit_search_btn.click(
                search_mit_ui,
                inputs=[mit_query, mit_author],
                outputs=[mit_results],
            )

            gr.Markdown("### Add a text to the corpus")
            mit_path_input = gr.Textbox(
                label="Work Path",
                placeholder="Paste a work path from the results above (e.g. /Plato/republic.html)",
            )
            mit_add_btn = gr.Button("Download and Process")
            mit_add_output = gr.Textbox(label="Result", lines=4)
            mit_add_btn.click(add_mit_text, inputs=[mit_path_input], outputs=[mit_add_output])

        # --- Internet Archive: search then add by identifier ---
        with gr.Tab("Search Internet Archive"):
            gr.Markdown("### Search the Internet Archive for classical texts")
            with gr.Row():
                search_input = gr.Textbox(label="Search Query", placeholder="aristotle philosophy")
                subject_dropdown = gr.Dropdown(
                    choices=["All", "Philosophy", "Mathematics", "Rhetoric",
                             "Logic", "Ethics", "Metaphysics", "Politics", "Classical"],
                    value="Philosophy",
                    label="Subject Filter",
                )
            search_btn = gr.Button("Search", variant="primary")
            search_results = gr.Dataframe(
                headers=["Identifier", "Title", "Author", "Date", "Downloads"],
                label="Search Results",
                interactive=False,
            )
            search_btn.click(
                search_archive,
                inputs=[search_input, subject_dropdown],
                outputs=[search_results],
            )

            gr.Markdown("### Add a text to the corpus")
            ia_id_input = gr.Textbox(
                label="Internet Archive Identifier",
                placeholder="Paste an identifier from the search results above",
            )
            add_btn = gr.Button("Download and Process")
            add_output = gr.Textbox(label="Result", lines=4)
            add_btn.click(add_ia_text, inputs=[ia_id_input], outputs=[add_output])

        # --- Corpus management: stats, samples, rebuild, HF push ---
        with gr.Tab("Corpus"):
            gr.Markdown("### Corpus Statistics")
            # Passing the callable (not its result) as value lets Gradio
            # recompute the stats per page load rather than freezing them
            # at build time.
            stats_output = gr.Textbox(label="Statistics", lines=15, value=get_corpus_stats)
            refresh_btn = gr.Button("Refresh Stats")
            refresh_btn.click(get_corpus_stats, outputs=[stats_output])

            gr.Markdown("### Sample Chunks")
            sample_output = gr.Textbox(label="Random samples from training data", lines=15)
            sample_btn = gr.Button("Show Samples")
            sample_btn.click(get_sample_chunks, outputs=[sample_output])

            gr.Markdown("### Actions")
            with gr.Row():
                rebuild_btn = gr.Button("Rebuild Dataset")
                rebuild_output = gr.Textbox(label="Result", lines=2)
            rebuild_btn.click(rebuild_dataset, outputs=[rebuild_output])

            with gr.Row():
                hf_repo_input = gr.Textbox(
                    label="HuggingFace Repo ID",
                    placeholder="username/philosophy-corpus",
                )
                push_btn = gr.Button("Push to HuggingFace", variant="primary")
            push_output = gr.Textbox(label="Result", lines=2)
            push_btn.click(push_to_hf, inputs=[hf_repo_input], outputs=[push_output])

    return app
|
|
|
|
| |
| |
| |
|
|
def main():
    """Parse CLI arguments, build the UI, and launch the Gradio server."""
    parser = argparse.ArgumentParser(description="Philosophy Corpus Pipeline UI")
    parser.add_argument("--share", action="store_true", help="Create a public Gradio link")
    parser.add_argument("--port", type=int, default=7860, help="Port to run on")
    args = parser.parse_args()

    # The module-level logger is never configured anywhere else; without this
    # call, INFO-level pipeline messages are dropped.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
    )

    app = build_ui()
    app.queue()
    # NOTE(review): 0.0.0.0 binds all interfaces, exposing the UI on the LAN;
    # use --share for a public tunnel instead of exposing the host directly.
    app.launch(share=args.share, server_name="0.0.0.0", server_port=args.port)
|
|
|
|
# Script entry point: only launch the server when run directly.
if __name__ == "__main__":
    main()
|
|