# Image_generation / clean.py
# Author: manasdhir — "minor changes" (commit 5d1cbd9)
from bs4 import BeautifulSoup
import re
def html_table_to_markdown(html_table_str: str) -> str:
    """
    Convert a single HTML table string to markdown pipe-table format.

    Args:
        html_table_str: HTML fragment expected to contain one <table> element.

    Returns:
        The markdown table as a string, or "" when the fragment contains
        no <table> element or the table has no rows.
    """
    soup = BeautifulSoup(html_table_str, "html.parser")
    table = soup.find("table")
    if not table:
        return ""
    rows = table.find_all("tr")
    # Guard against an empty <table></table>; rows[0] below would raise
    # IndexError otherwise.
    if not rows:
        return ""

    def _cell_text(cell) -> str:
        # Flatten newlines and escape pipes so cell text cannot break the
        # markdown table structure.
        return cell.get_text(strip=True).replace("\n", " ").replace("|", "\\|")

    # Header row (use <th> if present, else first <tr> cells).
    header_cells = rows[0].find_all(["th", "td"])
    headers = [_cell_text(cell) for cell in header_cells]
    markdown_rows = [
        "| " + " | ".join(headers) + " |",
        "| " + " | ".join(["---"] * len(headers)) + " |",
    ]
    # Process remaining rows.
    for row in rows[1:]:
        cells = row.find_all(["td", "th"])
        # Skip empty rows or rows whose cells are all empty (including rows
        # with colspan but no content).
        if not cells or all(cell.get_text(strip=True) == "" for cell in cells):
            continue
        row_text = [_cell_text(cell) for cell in cells]
        # Pad if row length is smaller than header length (can happen with
        # colspan) so column counts stay aligned.
        if len(row_text) < len(headers):
            row_text += [""] * (len(headers) - len(row_text))
        markdown_rows.append("| " + " | ".join(row_text) + " |")
    return "\n".join(markdown_rows)
def replace_html_tables_in_content(content: str) -> str:
    """
    Replace every HTML <table>...</table> block found in *content* with its
    markdown pipe-table equivalent.

    Args:
        content (str): Text that may contain zero or more HTML tables.

    Returns:
        str: The content with HTML tables replaced by markdown tables.
    """
    # Match whole <table> blocks, spanning newlines, case-insensitively.
    pattern = re.compile(r"(<table.*?>.*?</table>)", re.DOTALL | re.IGNORECASE)

    def _as_markdown(match):
        md_table = html_table_to_markdown(match.group(1))
        # Blank lines around the table keep it visually separated.
        return "\n\n" + md_table + "\n\n"

    return pattern.sub(_as_markdown, content)
def deduplicate_footers(content: str) -> str:
    """
    Collapse repeated footer blocks in *content*.

    The first full footer block is preserved verbatim; every later footer
    block is reduced to just its page-number comment.

    A footer block is assumed to be ordered as:
        <!-- PageFooter="..." -->   (zero or more)
        <!-- PageNumber="..." -->   (required)
        <!-- PageFooter="..." -->   (zero or more)
        <!-- PageBreak -->          (optional)
    """
    footer_re = re.compile(
        r"((?:<!-- PageFooter=\".*?\" -->\s*)*(<!-- PageNumber=\".*?\" -->\s*)(?:<!-- PageFooter=\".*?\" -->\s*)*(<!-- PageBreak -->\s*)?)",
        re.DOTALL | re.IGNORECASE
    )
    # Mutable state shared with the replacer closure (avoids `nonlocal`).
    state = {"kept_first": False}

    def _collapse(match):
        if not state["kept_first"]:
            state["kept_first"] = True
            # First occurrence: keep the entire block untouched.
            return match.group(1)
        # Later occurrences: keep only the page-number comment,
        # trailing newline for readability.
        return match.group(2).strip() + "\n"

    return footer_re.sub(_collapse, content)
def process_content(content: str) -> str:
    """Run the cleaning pipeline over *content* and return the result.

    Currently this only converts HTML tables to markdown; footer
    deduplication is intentionally disabled.
    """
    md_content = replace_html_tables_in_content(content)
    # Footer deduplication step is switched off for now:
    # md_content = deduplicate_footers(md_content)
    return md_content
if __name__ == "__main__":
    import time

    from docling_chunker_fixed import split_to_docling_chunks
    from embedding_gen import embed_docling_chunks_azure
    from qdrant_setup import batch_upsert_docling_chunks_to_qdrant

    input_file = "content.txt"

    # Read the raw extracted content from disk.
    with open(input_file, "r", encoding="utf-8") as f:
        original_content = f.read()

    # Chunk the content with docling. (The HTML-table conversion step via
    # process_content() is currently disabled in this pipeline.)
    chunks = split_to_docling_chunks(original_content)

    # Embed the chunks against Azure OpenAI and report the elapsed time.
    # perf_counter() is monotonic, so it is the right clock for intervals
    # (time.time() can jump with wall-clock adjustments).
    t0 = time.perf_counter()
    embedded = embed_docling_chunks_azure(docling_chunks=chunks)
    print("time taken", time.perf_counter() - t0)

    # Upsert the embedded chunks into Qdrant and report the elapsed time.
    t0 = time.perf_counter()
    batch_upsert_docling_chunks_to_qdrant(embedded['chunks_data'])
    print("time taken to upsert", time.perf_counter() - t0)