| | import gradio as gr |
| | import pandas as pd |
| | import fitz |
| | import os |
| | import re |
| | from huggingface_hub import HfApi |
| | from huggingface_hub.utils import HfHubHTTPError |
| | import time |
| |
|
def sanitize_title(title, max_length=100):
    """
    Build a filesystem-safe name from a paper title.

    Strips every character that is not a word character, whitespace, or a
    hyphen, collapses runs of hyphens/whitespace into single underscores,
    and caps the result at *max_length* characters.

    Parameters
    ----------
    title : str
        Raw paper title.
    max_length : int, optional
        Maximum length of the returned name (default 100).

    Returns
    -------
    str
        Sanitized, possibly truncated name suitable for use as a filename.
    """
    # Drop punctuation first, then normalise separators to underscores.
    safe = re.sub(r'[^\w\s-]', '', title).strip()
    safe = re.sub(r'[-\s]+', '_', safe)
    # Slicing handles both the long and the short case in one step.
    return safe[:max_length]
| |
|
def extract_full_paper_with_labels(pdf_path, progress=None):
    """
    Parse a PDF and return its text with heuristic section labels.

    Walks every text block on every page and classifies it with simple
    heuristics (font size, keywords, regex matches) into XML-like tags:
    <TITLE>, <AUTHORS>, <YEAR>, <DOI>, <ABSTRACT>, <TABLE>, <FIGURE>,
    <EQUATION>, <CODE>, <FINANCIAL_METRIC>, <PARAGRAPH>, plus trailing
    <FOOTNOTE> and <REFERENCE> sections accumulated across pages.

    Parameters
    ----------
    pdf_path : str
        Path to the PDF file to process.
    progress : callable, optional
        Gradio-style progress callback invoked as progress(fraction, desc=...).

    Returns
    -------
    dict
        {"filename": ..., "title": ..., "content": ...} where "content" is
        the tagged text of the whole document.

    Raises
    ------
    RuntimeError
        If page iteration exceeds twice the page count (malformed-PDF guard).
    """
    # NOTE(review): emoji in the log/status strings throughout this file look
    # mojibake-damaged (UTF-8 read with a wrong codec); kept byte-identical
    # here — restore from the original source if available.
    print(f"π Starting PDF Processing: {os.path.basename(pdf_path)}")
    doc = fitz.open(pdf_path)
    try:
        content = ""
        title = ""
        authors = ""
        year = ""
        doi = ""
        abstract = ""
        footnotes = ""
        references = ""
        total_pages = len(doc)
        # Safety valve: the loop runs once per page, so anything past 2x the
        # page count means the page iterator misbehaved.
        max_iterations = total_pages * 2
        iteration_count = 0

        # Compile once; these are applied to every text block on every page.
        # DOIs are case-insensitive and commonly lowercase, so the search is
        # done with re.IGNORECASE (the original A-Z-only pattern missed them).
        doi_re = re.compile(r"\b10\.\d{4,9}/[-._;()/:A-Z0-9]+\b", re.IGNORECASE)
        year_re = re.compile(r'\b(19|20)\d{2}\b')
        code_re = re.compile(
            r"(def\s+\w+\s*\(|class\s+\w+|import\s+\w+|for\s+\w+\s+in|if\s+\w+|while\s+\w+|try:|except|{|\}|;)"
        )
        # NOTE(review): this character class appears mojibake-damaged (it
        # presumably listed math symbols such as ± × σ μ); kept byte-identical
        # — verify against the original encoding before relying on it.
        equation_re = re.compile(r"=|β|β|Β±|Γ|Ο|ΞΌ|Ο")
        reference_keywords = ['reference', 'bibliography', 'sources']
        financial_keywords = ['p/e', 'volatility', 'market cap', 'roi', 'sharpe', 'drawdown']

        for page_num, page in enumerate(doc):
            iteration_count += 1
            if iteration_count > max_iterations:
                # RuntimeError is a subclass of Exception, so existing
                # `except Exception` callers still catch it.
                raise RuntimeError("β οΈ PDF processing exceeded iteration limit. Possible malformed PDF.")

            if progress is not None:
                progress((page_num + 1) / total_pages, desc=f"Processing Page {page_num + 1}/{total_pages}")

            blocks = page.get_text("dict")["blocks"]
            for block in blocks:
                if "lines" not in block:
                    continue  # image/graphic blocks carry no text spans

                # Flatten all spans into one string; track the largest font
                # size, which drives the title/footnote heuristics below.
                text = ""
                max_font_size = 0
                for line in block["lines"]:
                    for span in line["spans"]:
                        text += span["text"] + " "
                        if span["size"] > max_font_size:
                            max_font_size = span["size"]
                text = text.strip()

                # Search once per pattern instead of searching twice
                # (once for the test, once for .group) as before.
                year_match = None if year else year_re.search(text)
                doi_match = None if doi else doi_re.search(text)

                # First heuristic that fires labels the block; order matters.
                if page_num == 0 and max_font_size > 15 and not title:
                    # Largest text on the first page is assumed to be the title.
                    title = text
                    content += f"<TITLE>{title}</TITLE>\n"
                elif re.search(r'author|by', text, re.IGNORECASE) and not authors:
                    authors = text
                    content += f"<AUTHORS>{authors}</AUTHORS>\n"
                elif year_match:
                    year = year_match.group(0)
                    content += f"<YEAR>{year}</YEAR>\n"
                elif doi_match:
                    doi = doi_match.group(0)
                    content += f"<DOI>{doi}</DOI>\n"
                elif "abstract" in text.lower() and not abstract:
                    abstract = text
                    content += f"<ABSTRACT>{abstract}</ABSTRACT>\n"
                elif max_font_size < 10:
                    # Tiny fonts are treated as footnotes, emitted at the end.
                    footnotes += text + " "
                elif any(keyword in text.lower() for keyword in reference_keywords):
                    references += text + " "
                elif re.search(r"table\s*\d+", text, re.IGNORECASE):
                    content += f"<TABLE>{text}</TABLE>\n"
                elif re.search(r"figure\s*\d+", text, re.IGNORECASE):
                    content += f"<FIGURE>{text}</FIGURE>\n"
                elif equation_re.search(text):
                    content += f"<EQUATION>{text}</EQUATION>\n"
                elif code_re.search(text) and len(text.split()) <= 50:
                    content += f"<CODE>{text}</CODE>\n"
                elif any(fin_kw in text.lower() for fin_kw in financial_keywords):
                    content += f"<FINANCIAL_METRIC>{text}</FINANCIAL_METRIC>\n"
                else:
                    content += f"<PARAGRAPH>{text}</PARAGRAPH>\n"

        # Footnotes/references gathered across the whole document go last.
        if footnotes:
            content += f"<FOOTNOTE>{footnotes.strip()}</FOOTNOTE>\n"
        if references:
            content += f"<REFERENCE>{references.strip()}</REFERENCE>\n"
    finally:
        # The original never closed the document, leaking the file handle
        # (and leaking it on every exception path).
        doc.close()

    print(f"β Finished Processing PDF: {os.path.basename(pdf_path)}")
    return {
        "filename": os.path.basename(pdf_path),
        "title": title,
        "content": content
    }
| |
|
def upload_with_progress(file_path, repo_id, token, progress):
    """
    Upload a local file to a Hugging Face dataset repository.

    Parameters
    ----------
    file_path : str
        Path of the file to upload; its basename becomes the path in the repo.
    repo_id : str
        Target dataset repo, e.g. "username/research-dataset".
    token : str
        Hugging Face API token with write access to *repo_id*.
    progress : callable or None
        Gradio-style progress callback; called once with 1 on completion.

    Returns
    -------
    str
        Human-readable success or failure message. Errors are returned as
        messages, not raised, so the UI can display them.
    """
    print(f"π€ Starting upload of Parquet: {file_path}")

    # Fixed: the original computed `file_size = os.path.getsize(file_path)`
    # and never used it (dead local).
    api = HfApi()

    try:
        api.upload_file(
            path_or_fileobj=file_path,
            path_in_repo=os.path.basename(file_path),
            repo_id=repo_id,
            repo_type="dataset",
            token=token
        )

        # upload_file offers no incremental callback here, so progress jumps
        # straight to done.
        if progress is not None:
            progress(1, desc="β Upload Complete")

        print(f"β Successfully uploaded to {repo_id}")
        return f"β Successfully uploaded to {repo_id}"

    except HfHubHTTPError as e:
        # HTTP-level failures (auth, missing repo, rate limit, ...).
        print(f"β Upload failed: {e}")
        return f"β Upload failed: {str(e)}"
    except Exception as e:
        # Broad catch is deliberate: the UI should show the error, not crash.
        print(f"β Unexpected error: {e}")
        return f"β Unexpected error: {str(e)}"
| |
|
def pdf_to_parquet_and_upload(pdf_files, hf_token, dataset_repo_id, action_choice, progress=gr.Progress()):
    """
    Convert uploaded PDFs into one Parquet file and optionally upload it.

    Parameters
    ----------
    pdf_files : list or None
        Gradio file wrappers for the uploaded PDFs (None when nothing was
        uploaded).
    hf_token : str
        Hugging Face API token (only used when uploading).
    dataset_repo_id : str
        Target dataset repo id, e.g. "username/research-dataset".
    action_choice : str
        One of "Download Locally", "Upload to Hugging Face", "Both".
    progress : gr.Progress
        Gradio progress tracker; the call-time default is the documented
        Gradio pattern for enabling the progress bar.

    Returns
    -------
    tuple
        (parquet_file_path_or_None, status_message).
    """
    # Fixed: Gradio passes None when no file was uploaded; the original
    # crashed with `TypeError: object of type 'NoneType' has no len()`.
    if not pdf_files:
        return None, "No PDF files uploaded. Please add at least one PDF."

    all_data = []
    total_files = len(pdf_files)
    print("π Starting PDF to Parquet Conversion Process")

    for idx, pdf_file in enumerate(pdf_files):
        if progress is not None:
            progress(idx / total_files, desc=f"Processing File {idx + 1}/{total_files}")

        # pdf_file is a Gradio file wrapper; .name is the temp-file path on
        # disk — presumably; confirm against the Gradio version in use.
        extracted_data = extract_full_paper_with_labels(pdf_file.name, progress=progress)
        all_data.append(extracted_data)

    print("π‘ Converting Processed Data to Parquet")
    df = pd.DataFrame(all_data)

    # Name the output after the paper title for a single PDF; timestamp
    # batches so repeated runs don't overwrite each other.
    if len(all_data) == 1:
        paper_title = all_data[0].get("title", "").strip()
        if paper_title:
            safe_title = sanitize_title(paper_title)
            parquet_file = f"{safe_title}.parquet"
        else:
            parquet_file = 'fully_labeled_papers.parquet'
    else:
        parquet_file = f"fully_labeled_papers_{time.strftime('%Y%m%d_%H%M%S')}.parquet"

    try:
        df.to_parquet(parquet_file, engine='pyarrow', index=False)
        print("β Parquet Conversion Completed")
    except Exception as e:
        # Surface conversion failures to the UI instead of crashing.
        print(f"β Parquet Conversion Failed: {str(e)}")
        return None, f"β Parquet Conversion Failed: {str(e)}"

    upload_message = "Skipped Upload"

    if action_choice in ["Upload to Hugging Face", "Both"]:
        try:
            upload_message = upload_with_progress(parquet_file, dataset_repo_id, hf_token, progress)
        except Exception as e:
            # upload_with_progress normally returns error strings; this is a
            # last-resort net for anything it lets escape.
            print(f"β Upload Failed: {str(e)}")
            upload_message = f"β Upload failed: {str(e)}"

    print("π Process Completed")
    return parquet_file, upload_message
| |
|
| | |
def reset_files_fn():
    """
    Clear the PDF upload widget and the generated Parquet download.

    Returns one None per output component (pdf_input, output_file) while
    leaving the credential fields untouched.
    """
    return (None, None)
| |
|
# Gradio UI: wires the converter/upload pipeline to file, token, and action
# widgets. Component order here matters — the .click() outputs lists below
# reference these components positionally.
with gr.Blocks() as demo:
    # User-facing help text distinguishing the two reset buttons.
    gr.Markdown(
        """
        # PDF to Parquet Converter with Full Labeling

        **Clear All Inputs:** The button below (labeled "Clear All Inputs") will reset every field, including your API key and dataset repo ID.
        **Reset Files Only:** Use this button if you want to clear the PDF file uploads and the generated Parquet file, while keeping your credentials intact.
        """
    )

    # --- Inputs ---------------------------------------------------------
    with gr.Row():
        pdf_input = gr.File(file_types=[".pdf"], file_count="multiple", label="Upload PDFs (Drag & Drop or Search)")
    with gr.Row():
        hf_token = gr.Textbox(label="Hugging Face API Token", type="password", placeholder="Enter your Hugging Face API token")
        dataset_repo = gr.Textbox(label="Your Dataset Repo ID (e.g., username/research-dataset)", placeholder="username/research-dataset")
    with gr.Row():
        action_radio = gr.Radio(["Download Locally", "Upload to Hugging Face", "Both"], label="Action", value="Download Locally")

    # --- Action buttons -------------------------------------------------
    with gr.Row():
        convert_button = gr.Button("Convert PDF to Parquet")
        reset_files_button = gr.Button("Reset Files Only")
        clear_all_button = gr.Button("Clear All Inputs")

    # --- Outputs --------------------------------------------------------
    with gr.Row():
        output_file = gr.File(label="Download Parquet File")
        status_text = gr.Textbox(label="Status")

    # Main pipeline: PDFs + credentials in, Parquet file + status out.
    convert_button.click(
        fn=pdf_to_parquet_and_upload,
        inputs=[pdf_input, hf_token, dataset_repo, action_radio],
        outputs=[output_file, status_text]
    )

    # Clears only the file widgets; credentials are preserved.
    reset_files_button.click(
        fn=reset_files_fn,
        inputs=None,
        outputs=[pdf_input, output_file]
    )

    def clear_all_fn():
        # One value per output component below: files cleared, token cleared,
        # repo id cleared, radio reset to its default choice.
        return None, None, None, "Download Locally"

    # Resets every input, including the API token and the dataset repo id.
    clear_all_button.click(
        fn=clear_all_fn,
        inputs=None,
        outputs=[pdf_input, hf_token, dataset_repo, action_radio]
    )

# Start the Gradio server (blocking call).
demo.launch()
| |
|