Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import requests | |
| from bs4 import BeautifulSoup | |
| import re | |
| from llama_index.core.node_parser import MarkdownNodeParser | |
| from llama_index.core.schema import Document, MetadataMode | |
| import textstat | |
| from markdownify import markdownify as md | |
| # --- Core Logic Classes --- | |
class WebpageContentProcessor:
    """
    Handles fetching, converting, and parsing webpage content into structured chunks.
    This class is responsible for the entire content processing pipeline.

    Failures are reported in-band: fetch_and_convert_to_markdown() returns a
    human-readable message instead of raising for request or parsing problems.
    Every such message starts with one of ``_ERROR_PREFIXES``.
    """

    # Prefixes shared by every failure message produced by this class. Matching
    # on a prefix (instead of searching for the word "Error" anywhere) keeps
    # ordinary pages that merely mention errors from being treated as failures.
    _ERROR_PREFIXES = ("Error: ", "Error fetching the URL:")

    # Tags whose content is navigation/boilerplate rather than page content.
    _BOILERPLATE_TAGS = ('nav', 'header', 'footer', 'aside', 'script', 'style', 'noscript', 'form')

    # A Markdown ATX heading at the very start of a chunk: "#"s, then the title.
    _HEADING_RE = re.compile(r"^(#+)\s*(.*)")

    # Titles derived from a chunk's first line are truncated to this length.
    _MAX_TITLE_LENGTH = 75

    def __init__(self):
        pass

    @classmethod
    def _is_error_message(cls, text: str) -> bool:
        """Return True if *text* is a failure message produced by this class."""
        return text.startswith(cls._ERROR_PREFIXES)

    def fetch_and_convert_to_markdown(self, url: str, timeout: float = 15) -> str:
        """
        Fetches HTML content, removes common boilerplate tags from the entire page,
        and then converts the remaining body content to Markdown using markdownify.

        Args:
            url: Address of the page to download.
            timeout: Seconds to wait for the server before giving up.

        Returns:
            The cleaned Markdown on success, otherwise a message starting with
            one of ``_ERROR_PREFIXES`` that describes what went wrong.
        """
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            response = requests.get(url, headers=headers, timeout=timeout)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')

            # Remove common boilerplate and non-content tags from the entire document
            for element in soup.find_all(list(self._BOILERPLATE_TAGS)):
                element.decompose()

            # Process the entire remaining body
            content_container = soup.find('body')
            if not content_container:
                return "Error: Could not find the <body> of the webpage."
            markdown_output = md(str(content_container))

            # Post-processing: collapse runs of blank lines, then drop list
            # bullets that were left with no text after boilerplate removal.
            markdown_output = re.sub(r'\n{3,}', '\n\n', markdown_output)
            markdown_output = re.sub(r'(\n\s*[\*\-]\s*\n)|(^\s*[\*\-]\s*$)', '\n', markdown_output, flags=re.MULTILINE)
            return markdown_output.strip()
        except requests.exceptions.Timeout:
            return "Error: The request timed out. The server is taking too long to respond."
        except requests.exceptions.RequestException as e:
            return f"Error fetching the URL: {e}. Please check the URL and your connection."
        except Exception as e:
            # Prefixed with "Error: " so callers can recognise it as a failure;
            # without the prefix the message would be chunked as page content.
            return f"Error: An unexpected error occurred during content processing: {e}"

    def parse_markdown_into_chunks(self, markdown_content: str) -> list:
        """
        Parses Markdown content into logically separated chunks based on its structure.
        Uses MarkdownNodeParser to respect headers and sections.

        Args:
            markdown_content: Markdown text, or a failure message returned by
                fetch_and_convert_to_markdown().

        Returns:
            A list of dicts with keys "id", "title" and "content". The list is
            empty when the input is empty or is a failure message.
        """
        if not markdown_content or self._is_error_message(markdown_content):
            return []

        parser = MarkdownNodeParser(include_metadata=True)
        nodes = parser.get_nodes_from_documents([Document(text=markdown_content)])

        structured_chunks = []
        for i, node in enumerate(nodes):
            content = node.get_content(metadata_mode=MetadataMode.NONE).strip()
            if not content:
                continue

            title_match = self._HEADING_RE.match(content)
            if title_match:
                # Chunk starts with a heading: use it as the title and keep
                # only the text after the heading line as the content.
                title = title_match.group(2).strip()
                content_text = content[len(title_match.group(0)):].strip()
            else:
                # No heading: label the chunk with its (possibly truncated) first line.
                first_line = content.split('\n')[0].strip()
                if len(first_line) > self._MAX_TITLE_LENGTH:
                    title = first_line[:self._MAX_TITLE_LENGTH] + '...'
                else:
                    title = first_line
                content_text = content

            if not title:
                title = f"[Chunk {i+1}]"

            structured_chunks.append({
                "id": i,
                "title": title,
                "content": content_text
            })
        return structured_chunks
| class ChunkManager: | |
| """ | |
| Manages the state of chunks, including their content, statistics, and targets. | |
| """ | |
| def __init__(self): | |
| self._chunks = [] | |
| self.target_flesch_min = 60 | |
| self.target_grade_max = 9 | |
| self.target_min_chunk_words = 40 | |
| self.target_max_chunk_words = 600 | |
| def set_chunks(self, chunks: list): | |
| self._chunks = [self._add_stats_to_chunk(chunk) for chunk in chunks] | |
| def get_chunks(self) -> list: | |
| return self._chunks | |
| def _add_stats_to_chunk(self, chunk: dict) -> dict: | |
| chunk['stats'] = self._calculate_chunk_stats(chunk['content']) | |
| return chunk | |
| def _calculate_chunk_stats(self, text: str) -> dict: | |
| """Calculates readability and other metrics for a text chunk.""" | |
| stats = {} | |
| try: | |
| stats['word_count'] = textstat.lexicon_count(text, removepunct=True) | |
| stats['flesch_reading_ease'] = textstat.flesch_reading_ease(text) | |
| stats['flesch_kincaid_grade'] = textstat.flesch_kincaid_grade(text) | |
| except (Exception, TypeError): | |
| stats.update({'word_count': 0, 'flesch_reading_ease': 0, 'flesch_kincaid_grade': 0}) | |
| return stats | |
| def format_chunk_stats(self, stats: dict) -> str: | |
| """Creates a formatted string of stats with color-coding based on targets.""" | |
| flesch_color = "green" if stats.get('flesch_reading_ease', 0) >= self.target_flesch_min else "red" | |
| grade_color = "green" if stats.get('flesch_kincaid_grade', 0) <= self.target_grade_max else "red" | |
| word_color = "green" if self.target_min_chunk_words <= stats.get('word_count', 0) <= self.target_max_chunk_words else "red" | |
| return ( | |
| f"**Word Count:** <span style='color:{word_color};'>{stats.get('word_count', 0)}</span> | " | |
| f"**Reading Ease:** <span style='color:{flesch_color};'>{stats.get('flesch_reading_ease', 0):.2f}</span> | " | |
| f"**Grade Level:** <span style='color:{grade_color};'>{stats.get('flesch_kincaid_grade', 0):.2f}</span>" | |
| ) | |
| def get_document_summary_stats(self) -> str: | |
| """Calculates and formats stats for the entire document.""" | |
| if not self._chunks: | |
| return "No document loaded." | |
| total_words = sum(c['stats']['word_count'] for c in self._chunks) | |
| if len(self._chunks) > 0: | |
| avg_ease = sum(c['stats']['flesch_reading_ease'] for c in self._chunks) / len(self._chunks) | |
| avg_grade = sum(c['stats']['flesch_kincaid_grade'] for c in self._chunks) / len(self._chunks) | |
| else: | |
| avg_ease = avg_grade = 0 | |
| return ( | |
| f"- **Total Chunks:** {len(self._chunks)}\n" | |
| f"- **Total Words:** {total_words}\n" | |
| f"- **Avg. Reading Ease:** {avg_ease:.2f}\n" | |
| f"- **Avg. Grade Level:** {avg_grade:.2f}" | |
| ) | |
| def get_chunk_by_id(self, chunk_id: int) -> dict | None: | |
| return next((c for c in self._chunks if c["id"] == chunk_id), None) | |
| def update_chunk_content(self, chunk_id: int, new_content: str): | |
| chunk = self.get_chunk_by_id(chunk_id) | |
| if chunk: | |
| chunk["content"] = new_content | |
| self._add_stats_to_chunk(chunk) | |
| if chunk["title"].startswith("["): | |
| first_line = new_content.split('\n')[0].strip() | |
| new_title = (first_line[:75] + '...') if len(first_line) > 75 else first_line | |
| if new_title: | |
| chunk["title"] = new_title | |
| def delete_chunk(self, chunk_id: int): | |
| self._chunks = [c for c in self._chunks if c["id"] != chunk_id] | |
| for i, chunk in enumerate(self._chunks): | |
| chunk['id'] = i | |
| def get_final_markdown(self) -> str: | |
| if not self._chunks: | |
| return "No content to display." | |
| final_doc_parts = [] | |
| for c in self._chunks: | |
| is_header = re.match(r"^(#+)\s*(.*)", c['title']) | |
| if not c['title'].startswith("[") and not is_header: | |
| final_doc_parts.append(f"## {c['title']}\n\n{c['content']}") | |
| else: | |
| final_doc_parts.append(c['content']) | |
| return "\n\n---\n\n".join(final_doc_parts) | |
| def set_targets(self, flesch_min: float, grade_max: float, min_words: int, max_words: int): | |
| self.target_flesch_min = flesch_min | |
| self.target_grade_max = grade_max | |
| self.target_min_chunk_words = min_words | |
| self.target_max_chunk_words = max_words | |
| self.set_chunks(self.get_chunks()) | |
# --- Streamlit UI Application ---
# Page-level settings: browser tab title and a wide layout.
st.set_page_config(page_title="Webpage Content Editor", layout="wide")

# Stylesheet injected into the page to make the sidebar 450px wide.
_SIDEBAR_WIDTH_CSS = """
    <style>
    [data-testid="stSidebar"] {
        width: 450px !important;
    }
    </style>
    """
# unsafe_allow_html is passed so the <style> tag is emitted as HTML.
st.markdown(_SIDEBAR_WIDTH_CSS, unsafe_allow_html=True)
def init_session_state():
    """
    Seed st.session_state with the entries the app reads on every run.

    Keys that already exist are left untouched; a default is only built for a
    key that is missing.
    """
    default_factories = {
        'processor': WebpageContentProcessor,
        'manager': ChunkManager,
        'selected_chunk_id': lambda: None,
        'status_message': str,  # str() == ""
    }
    for state_key, build_default in default_factories.items():
        if state_key not in st.session_state:
            st.session_state[state_key] = build_default()
# Make sure the session entries exist, then bind the long-lived objects to
# short names used by the rest of the script.
init_session_state()
processor = st.session_state.processor
manager = st.session_state.manager

# NOTE(review): the emoji in this file arrived mis-decoded (e.g. icon="π‘").
# st.info's icon must be a real emoji, so they are repaired here. 💡 and 🎯 were
# recovered from the surviving bytes; 📊 and 📋 are best guesses because their
# distinguishing bytes were lost. Confirm against the original file.
with st.sidebar:
    st.title("Settings & Overview")

    with st.expander("About this App & AI Writing Guidelines", expanded=True):
        st.info(
            """
            This app helps you refine web content for AI synthesis by chunking it into logical, verifiable blocks.
            **Writing for AI Verifiability:**
            * **Structure with Headers:** Use H1, H2, H3 tags logically.
            * **Write for Clarity:** Use short, direct sentences. State facts explicitly.
            * **Create Verifiable Blocks:** Format content as definitions, Q&As, or step-by-step guides.
            * **Use the Editor's Metrics:** Aim for a **Reading Ease > 60** and a **Word Count** between 40-600 per chunk. The colors will guide you.
            """, icon="💡"
        )

    # Document-wide totals and averages for the currently loaded chunks.
    st.subheader("📊 Document Overview")
    st.markdown(manager.get_document_summary_stats(), unsafe_allow_html=True)

    # Targets are edited inside a form so they are only applied on submit.
    st.subheader("🎯 Content Targets")
    with st.form("targets_form"):
        st.write("Set readability targets to guide your editing. Colors in the editor will reflect these targets.")
        c1, c2 = st.columns(2)
        f_min = c1.number_input("Min Flesch Reading Ease", value=float(manager.target_flesch_min), help="Measures readability. Higher scores mean the text is easier to read. Scores of 60-70 are considered plain English.")
        g_max = c2.number_input("Max Flesch-Kincaid Grade", value=float(manager.target_grade_max), help="Estimates the U.S. school grade level needed to understand the text. A score of 8.0 means an eighth grader can read it. Lower scores are easier to read.")
        w_min = c1.number_input("Min Chunk Words", value=int(manager.target_min_chunk_words))
        w_max = c2.number_input("Max Chunk Words", value=int(manager.target_max_chunk_words))
        if st.form_submit_button("Set New Targets", use_container_width=True):
            manager.set_targets(f_min, g_max, w_min, w_max)
            # The message is shown as a toast on the run triggered below.
            st.session_state.status_message = "Content targets have been updated."
            st.rerun()

    # Read-only view of all chunks joined back into one Markdown document.
    st.subheader("📋 Final Compiled Document")
    st.text_area("Final Markdown Output", manager.get_final_markdown(), height=300, key="final_markdown")
# --- Main Page Layout ---
# NOTE(review): the emoji below arrived mis-decoded. ⚠️, 🛠️ and 🤖 were recovered
# from the surviving bytes; 📝 in the title is a best guess. Confirm against the
# original file.
st.title("📝 Content Chunk Editor")
st.caption("Developed by [Emilija Gjorgjevska](https://www.linkedin.com/in/emilijagjorgjevska/) | Inspired by Andrea Volpini's [work on content chunking](https://wordlift.io/blog/en/googles-ai-mode-product-pages/).<br>A tool to fetch, chunk, and refine web content for AI synthesis. Best experienced on desktop.", unsafe_allow_html=True)
url_input = st.text_input("Enter a webpage URL to start", key="url_input")

with st.expander("⚠️ Important Information", expanded=False):
    st.warning(
        """
        **Early Draft:** This is an early version of the application. You may encounter bugs or incomplete features.
        """,
        icon="🛠️"
    )
    st.warning(
        """
        **Restrictive Bot Policy:** This tool fetches content using automated requests. If a target website blocks bots, the app may time out or fail to retrieve content.
        """,
        icon="🤖"
    )

# The processor reports failures as a returned message rather than an exception.
# Those messages are recognised by how they START. Searching for the word
# "Error" anywhere would reject any real page that mentions errors, and would
# miss the "An unexpected error occurred" message (lowercase "error").
_FETCH_ERROR_PREFIXES = ("Error: ", "Error fetching the URL:", "An unexpected error occurred")

if st.button("Process URL", use_container_width=True, type="primary"):
    target_url = url_input.strip()
    if target_url:
        with st.spinner("Fetching and chunking content..."):
            markdown = processor.fetch_and_convert_to_markdown(target_url)
            if markdown.startswith(_FETCH_ERROR_PREFIXES):
                # Show the failure text to the user and clear any previous document.
                st.session_state.status_message = markdown
                manager.set_chunks([])
                st.session_state.selected_chunk_id = None
            else:
                chunks = processor.parse_markdown_into_chunks(markdown)
                manager.set_chunks(chunks)
                if chunks:
                    st.session_state.status_message = f"Successfully processed {len(chunks)} chunks."
                    st.session_state.selected_chunk_id = chunks[0]['id']
                else:
                    st.session_state.status_message = "Could not extract any content chunks."
                    st.session_state.selected_chunk_id = None
        # Rerun so the sidebar (rendered earlier in the script) reflects the new chunks.
        st.rerun()
    else:
        # Nothing to fetch: tell the user instead of silently ignoring the click.
        st.session_state.status_message = "Please enter a URL before processing."

# Show the pending status message once, then clear it so it is not repeated.
if st.session_state.status_message:
    st.toast(st.session_state.status_message)
    st.session_state.status_message = ""
# --- Chunk selection, editing and preview ---
chunks = manager.get_chunks()
if not chunks:
    # Nothing loaded yet: show a hint and some example URLs to try.
    st.write("Process a URL to begin editing content chunks, or adjust settings in the sidebar.")
    with st.expander("Chunking Strategy Examples"):
        st.write("See how different websites structure their content, affecting chunking quality.")
        st.error("**Bad Chunking Example (Few Structural Headers)**")
        st.markdown("""
        * [Wikipedia: Markdown](https://en.wikipedia.org/wiki/Markdown)
        """)
        st.success("**Good Chunking Examples (Clear, Hierarchical Headers)**")
        st.markdown("""
        * [The Blog Starter](https://www.theblogstarter.com/)
        * [Google Safety Blog](https://blog.google/technology/safety-security/google-survey-digital-security-2025/)
        * [HubSpot: What is a Blog?](https://blog.hubspot.com/marketing/what-is-a-blog)
        """)
else:
    chunk_ids = [c['id'] for c in chunks]
    # Fall back to the first chunk when the stored selection no longer exists
    # (for example after a deletion or after loading a new document).
    if st.session_state.selected_chunk_id not in chunk_ids:
        st.session_state.selected_chunk_id = chunk_ids[0] if chunk_ids else None
    if st.session_state.selected_chunk_id is not None:
        # Map id -> title so the selectbox can show a readable label per id.
        chunk_options = {c['id']: c['title'] for c in chunks}
        selected_id = st.selectbox(
            "Select a chunk to edit",
            options=chunk_ids,
            format_func=lambda x: f"Chunk {x}: {chunk_options.get(x, 'N/A')}",
            index=chunk_ids.index(st.session_state.selected_chunk_id)
        )
        # Persist a changed selection and rerun so the editor below is built
        # for the newly selected chunk.
        if selected_id != st.session_state.selected_chunk_id:
            st.session_state.selected_chunk_id = selected_id
            st.rerun()
        selected_chunk = manager.get_chunk_by_id(st.session_state.selected_chunk_id)
        if selected_chunk:
            editor_col, preview_col = st.columns(2)
            with editor_col:
                st.markdown(f"**Editing: {selected_chunk['title']}**")
                # The stats string contains <span> color markup, hence unsafe_allow_html.
                st.markdown(manager.format_chunk_stats(selected_chunk['stats']), unsafe_allow_html=True)
                # Widget keys include the chunk id so each chunk gets its own widgets.
                # NOTE(review): delete_chunk renumbers ids, so a key can later belong
                # to a different chunk - verify the editor never shows stale text.
                edited_content = st.text_area(
                    "Chunk Content (Markdown)",
                    value=selected_chunk['content'],
                    height=400,
                    key=f"editor_{selected_chunk['id']}"
                )
                b_col1, b_col2, _ = st.columns([1, 1, 3])
                if b_col1.button("Update Chunk", use_container_width=True, key=f"update_{selected_chunk['id']}"):
                    # Store the edited text (stats are recomputed by the manager) and rerun.
                    manager.update_chunk_content(selected_chunk['id'], edited_content)
                    st.session_state.status_message = "Chunk updated successfully!"
                    st.rerun()
                if b_col2.button("Delete Chunk", use_container_width=True, key=f"delete_{selected_chunk['id']}"):
                    manager.delete_chunk(selected_chunk['id'])
                    st.session_state.status_message = "Chunk deleted."
                    # After a deletion, select the first remaining chunk (if any).
                    remaining_chunks = manager.get_chunks()
                    st.session_state.selected_chunk_id = remaining_chunks[0]['id'] if remaining_chunks else None
                    st.rerun()
            with preview_col:
                st.markdown("**Live Preview**")
                with st.container(height=525, border=True):
                    # Renders the current text of the editor, not the stored chunk.
                    # NOTE(review): this text presumably originates from a fetched web
                    # page and is rendered with raw HTML enabled - confirm that is intended.
                    st.markdown(edited_content, unsafe_allow_html=True)