# Utilities for converting HTML tables in extracted document text to
# markdown pipe tables and for deduplicating repeated page-footer blocks.
| from bs4 import BeautifulSoup | |
| import re | |
def html_table_to_markdown(html_table_str: str) -> str:
    """
    Convert a single HTML table string to markdown pipe table format.

    Args:
        html_table_str (str): Markup expected to contain one <table> element.

    Returns:
        str: The markdown table, or "" when no <table> element (or no <tr>
        rows) is found.
    """
    soup = BeautifulSoup(html_table_str, "html.parser")
    table = soup.find("table")
    if not table:
        return ""
    rows = table.find_all("tr")
    # Guard: a <table> with no <tr> rows previously raised IndexError at rows[0].
    if not rows:
        return ""

    def _clean(cell) -> str:
        # Flatten cell text to one line and escape '|' so it can't break the table.
        return cell.get_text(strip=True).replace("\n", " ").replace("|", "\\|")

    markdown_rows = []
    # Header row (use <th> if present, else first <tr> cells)
    header_cells = rows[0].find_all(["th", "td"])
    headers = [_clean(cell) for cell in header_cells]
    markdown_rows.append("| " + " | ".join(headers) + " |")
    markdown_rows.append("| " + " | ".join(["---"] * len(headers)) + " |")
    # Process remaining rows
    for row in rows[1:]:
        cells = row.find_all(["td", "th"])
        # Skip empty rows or rows with only empty cells (including colspan placeholders)
        if not cells or all(cell.get_text(strip=True) == "" for cell in cells):
            continue
        row_text = [_clean(cell) for cell in cells]
        # Pad if row length is smaller than header length (can happen if colspan)
        if len(row_text) < len(headers):
            row_text += [""] * (len(headers) - len(row_text))
        markdown_rows.append("| " + " | ".join(row_text) + " |")
    return "\n".join(markdown_rows)
def replace_html_tables_in_content(content: str) -> str:
    """
    Rewrite every HTML table found in *content* as a markdown table.

    Args:
        content (str): Text that may contain zero or more HTML tables.

    Returns:
        str: The content with HTML tables replaced by markdown tables.
    """
    # Match complete <table> ... </table> blocks, spanning lines, any case.
    table_pattern = re.compile(r"(<table.*?>.*?</table>)", re.DOTALL | re.IGNORECASE)

    def _to_markdown(match):
        # Blank lines around the converted table keep it visually separated.
        return "\n\n" + html_table_to_markdown(match.group(1)) + "\n\n"

    return table_pattern.sub(_to_markdown, content)
def deduplicate_footers(content: str) -> str:
    """
    Collapse repeated footer blocks so that:
      - the first full footer block is kept verbatim;
      - every subsequent block is reduced to just its page-number comment.
    """
    # A footer block, in order (ordering assumed consistent across pages):
    #   zero or more <!-- PageFooter="..." --> comments,
    #   exactly one <!-- PageNumber="..." --> comment (captured as group 2),
    #   zero or more trailing PageFooter comments,
    #   an optional <!-- PageBreak --> marker.
    footer_block_pattern = re.compile(
        r"((?:<!-- PageFooter=\".*?\" -->\s*)*(<!-- PageNumber=\".*?\" -->\s*)(?:<!-- PageFooter=\".*?\" -->\s*)*(<!-- PageBreak -->\s*)?)",
        re.DOTALL | re.IGNORECASE
    )
    match_count = [0]  # mutable cell so the inner callback can update it

    def footer_replacer(match):
        match_count[0] += 1
        if match_count[0] == 1:
            # First occurrence: keep the entire block as-is.
            return match.group(1)
        # Later occurrences: keep only the page-number comment plus a newline.
        return match.group(2).strip() + "\n"

    return footer_block_pattern.sub(footer_replacer, content)
def process_content(content: str) -> str:
    """Run the cleanup pipeline: convert HTML tables in *content* to markdown."""
    # Footer deduplication is intentionally disabled for now:
    # content = deduplicate_footers(content)
    return replace_html_tables_in_content(content)
if __name__ == "__main__":
    # Ad-hoc driver: read extracted document text from a file, split it into
    # docling chunks, embed the chunks via Azure, and upsert them into Qdrant.
    # Relies on project-local modules imported below.
    from pprint import pprint
    input_file = "content.txt"
    from chunking import split_content_into_batches
    from docling_chunker_fixed import split_to_docling_chunks
    from embedding_gen import embed_docling_chunks_azure
    from qdrant_setup import batch_upsert_docling_chunks_to_qdrant
    # Read content from file
    with open(input_file, "r", encoding="utf-8") as f:
        original_content = f.read()
    # Convert HTML tables to markdown tables
    # converted_content = process_content(original_content)
    # NOTE(review): table conversion above is commented out — chunking runs on
    # the raw file content. Confirm whether process_content should be applied.
    content= split_to_docling_chunks(original_content)
    # print(content)
    # print(title)
    # Print the converted content (or you could write it to a file if needed)
    # from openai import AzureOpenAI
    # from config import azure_open_ai_key, azure_open_ai_url
    # from embedding_gen import embed_text_batches_azure
    # client = AzureOpenAI(
    #     api_key=azure_open_ai_key,
    #     azure_endpoint=azure_open_ai_url,
    #     api_version="2023-05-15"
    # )
    import time
    # Time the embedding call.
    t1=time.time()
    c=embed_docling_chunks_azure(docling_chunks=content)
    t2=time.time()
    print("time taken",t2-t1)
    # Time the Qdrant upsert of the embedded chunk records.
    t1=time.time()
    batch_upsert_docling_chunks_to_qdrant(c['chunks_data'])
    t2=time.time()
    print("time taken to upsert", t2-t1)