Spaces:
Sleeping
Sleeping
| import io | |
| import re | |
| import zipfile | |
| import tempfile | |
| from datetime import datetime | |
| from typing import List, Dict | |
| import os | |
| import streamlit as st | |
| import pandas as pd | |
| from pypdf import PdfReader | |
# Title shown in the browser tab and as the main page heading.
APP_TITLE = "📑 SAP KB Artikel Chunker (Bulk)"
| # ----------------------------- | |
| # SAP KB Parsing | |
| # ----------------------------- | |
class SAPKBArticle:
    """Represents one SAP Knowledge Base article parsed from plain text.

    On construction the raw text is parsed immediately: note id, title and
    header metadata are extracted from the first lines, then the known
    article sections (Symptom, Environment, Reproducing the Issue, Cause,
    Resolution, Products, Other Components) are located by regex and stored
    in ``self.sections``.
    """

    def __init__(self, text: str):
        """Parse *text* (the full article text) into metadata and sections."""
        self.raw_text = text
        self.note_id = ""       # numeric note id from the first line
        self.title = ""         # article title from the first line
        self.component = ""     # e.g. "SBO-XX-YYY" from the "Component:" header
        self.version = ""       # numeric version string from the "Version:" header
        self.release_date = ""  # "Released On:" value, verbatim
        self.sections: Dict[str, str] = {}
        self._parse()

    def _parse(self) -> None:
        """Extract note id, title and header metadata, then the sections."""
        lines = self.raw_text.split('\n')
        # First line is expected to look like "1234567 - Some title".
        first_line = lines[0] if lines else ""
        match = re.match(r'(\d+)\s*-\s*(.+)', first_line)
        if match:
            self.note_id = match.group(1)
            self.title = match.group(2).strip()
        # Header metadata normally appears within the first few lines only.
        for line in lines[:10]:
            if 'Component:' in line:
                comp_match = re.search(r'Component:\s*([^,]+)', line)
                if comp_match:
                    self.component = comp_match.group(1).strip()
            if 'Version:' in line:
                ver_match = re.search(r'Version:\s*(\d+)', line)
                if ver_match:
                    self.version = ver_match.group(1)
            if 'Released On:' in line:
                date_match = re.search(r'Released On:\s*(.+)', line)
                if date_match:
                    self.release_date = date_match.group(1).strip()
        self._extract_sections()

    def _extract_sections(self) -> None:
        """Locate the article's sections via non-greedy lookahead regexes.

        Each section is captured from its heading up to the next expected
        heading (or end of text) and stored cleaned in ``self.sections``.
        """
        text = self.raw_text
        # Symptom
        symptom_match = re.search(r'Symptom\s*\n(.*?)(?=\n\s*Environment|\n\s*Reproducing|$)', text, re.DOTALL | re.IGNORECASE)
        if symptom_match:
            self.sections['symptom'] = self._clean_text(symptom_match.group(1))
        # Environment
        env_match = re.search(r'Environment\s*\n(.*?)(?=\n\s*Reproducing|\n\s*Cause|$)', text, re.DOTALL | re.IGNORECASE)
        if env_match:
            self.sections['environment'] = self._clean_text(env_match.group(1))
        # Reproducing the Issue
        repro_match = re.search(r'Reproducing the Issue\s*\n(.*?)(?=\n\s*Cause|\n\s*Resolution|$)', text, re.DOTALL | re.IGNORECASE)
        if repro_match:
            self.sections['reproducing'] = self._clean_text(repro_match.group(1))
        # Cause
        cause_match = re.search(r'Cause\s*\n(.*?)(?=\n\s*Resolution|\n\s*Keywords|$)', text, re.DOTALL | re.IGNORECASE)
        if cause_match:
            self.sections['cause'] = self._clean_text(cause_match.group(1))
        # Resolution
        resolution_match = re.search(r'Resolution\s*\n(.*?)(?=\n\s*Keywords|\n\s*Attributes|$)', text, re.DOTALL | re.IGNORECASE)
        if resolution_match:
            self.sections['resolution'] = self._clean_text(resolution_match.group(1))
        # Products/Versions (skip the line right after the heading)
        products_match = re.search(r'Products\s*\n.*?\n(.*?)$', text, re.DOTALL | re.IGNORECASE)
        if products_match:
            self.sections['products'] = self._clean_text(products_match.group(1))
        # Other Components — there may be several lines; join with " | ".
        other_comp = []
        for match in re.finditer(r'Other Components\s+(.+)', text):
            other_comp.append(match.group(1).strip())
        if other_comp:
            self.sections['other_components'] = ' | '.join(other_comp)

    def _clean_text(self, text: str) -> str:
        """Collapse runs of blank lines and of spaces, then strip the result."""
        text = re.sub(r'\n\s*\n\s*\n+', '\n\n', text)
        text = re.sub(r' +', ' ', text)
        return text.strip()

    def create_chunks(self) -> List[Dict[str, str]]:
        """Build the semantic chunks for this article.

        Returns a list of ``{'section': ..., 'content': ...}`` dicts covering
        metadata, symptom/scenario, cause, resolution and affected versions —
        each emitted only when the corresponding source material exists.
        """
        chunks: List[Dict[str, str]] = []
        # 1. METADATA_SUMMARY — always present.
        # (plain string literal; the original used an f-string with no placeholders)
        metadata_content = "This note documents an issue in SAP Business One"
        if self.title:
            metadata_content += f" regarding {self.title.lower()}"
        metadata_content += f". ID: {self.note_id}"
        if self.component:
            metadata_content += f", Component: {self.component}"
        if self.version:
            metadata_content += f", Version: {self.version}"
        if self.release_date:
            metadata_content += f", Released: {self.release_date}"
        metadata_content += "."
        chunks.append({
            'section': 'METADATA_SUMMARY',
            'content': metadata_content
        })
        # 2. SYMPTOM_SCENARIO — symptom text plus reproduction steps.
        if 'symptom' in self.sections or 'reproducing' in self.sections:
            symptom_content = ""
            if 'symptom' in self.sections:
                symptom_content = self.sections['symptom']
            if 'reproducing' in self.sections:
                if symptom_content:
                    symptom_content += " "
                symptom_content += "Scenario: " + self.sections['reproducing']
            chunks.append({
                'section': 'SYMPTOM_SCENARIO',
                'content': symptom_content
            })
        # 3. CAUSE_PREREQUISITES
        if 'cause' in self.sections:
            chunks.append({
                'section': 'CAUSE_PREREQUISITES',
                'content': self.sections['cause']
            })
        # 4. SOLUTION_PATCH
        if 'resolution' in self.sections:
            chunks.append({
                'section': 'SOLUTION_PATCH',
                'content': self.sections['resolution']
            })
        # 5. AFFECTED_VERSIONS — products preferred over environment.
        if 'products' in self.sections or 'environment' in self.sections or 'other_components' in self.sections:
            affected_content = "This issue affects "
            if 'products' in self.sections:
                affected_content += self.sections['products']
            elif 'environment' in self.sections:
                affected_content += self.sections['environment']
            if 'other_components' in self.sections:
                affected_content += f". Relevant components include {self.sections['other_components']}"
            affected_content += "."
            chunks.append({
                'section': 'AFFECTED_VERSIONS',
                'content': affected_content
            })
        return chunks

    def format_chunk(self, chunk: Dict[str, str]) -> str:
        """Render one chunk in the final output format."""
        prefix = f"[Note {self.note_id} - {self.title}]"
        return f"{prefix} - Sektion [{chunk['section']}]: {chunk['content']}"

    def split_long_chunks(self, chunks: List[Dict[str, str]], max_length: int = 1000) -> List[Dict[str, str]]:
        """Split chunks whose *formatted* length exceeds ``max_length``.

        Splitting is done on word boundaries; the parts are re-labelled
        ``<SECTION>_1``, ``<SECTION>_2`` … Note that splitting normalizes
        whitespace inside the content (it is re-joined with single spaces).
        """
        result: List[Dict[str, str]] = []
        for chunk in chunks:
            formatted = self.format_chunk(chunk)
            # Short enough — keep the chunk untouched.
            if len(formatted) <= max_length:
                result.append(chunk)
                continue
            # Too long — split the content into parts.
            section = chunk['section']
            content = chunk['content']
            # Budget for the content = max_length minus prefix + section label.
            # The label length is approximated with a single-digit part index.
            prefix = f"[Note {self.note_id} - {self.title}]"
            section_label = f" - Sektion [{section}_{{}}]: "
            overhead = len(prefix) + len(section_label.format("1"))
            # BUGFIX: guard against a non-positive budget (possible with a very
            # long title), which previously degenerated to one-word parts.
            available_space = max(max_length - overhead, 1)
            # Greedy word packing into parts of at most available_space chars.
            parts = []
            current_part: List[str] = []
            current_length = 0
            for word in content.split():
                word_length = len(word) + 1  # +1 for the joining space
                if current_length + word_length > available_space and current_part:
                    parts.append(' '.join(current_part))
                    current_part = [word]
                    current_length = word_length
                else:
                    current_part.append(word)
                    current_length += word_length
            if current_part:
                parts.append(' '.join(current_part))
            # Emit one chunk per part; keep the plain name if nothing was split.
            for i, part in enumerate(parts, start=1):
                new_section = f"{section}_{i}" if len(parts) > 1 else section
                result.append({
                    'section': new_section,
                    'content': part
                })
        return result
def extract_text_from_pdf(file_data: bytes) -> str:
    """Return the text of every page of a PDF (given as bytes), newline-joined.

    Pages for which pypdf yields no text contribute an empty string.
    """
    reader = PdfReader(io.BytesIO(file_data))
    return "\n".join(page.extract_text() or "" for page in reader.pages)
def create_zip_for_pdf(article, chunks_data, source_name):
    """Build an in-memory ZIP with one text file per chunk; return its bytes.

    Each entry is named ``chunk_<n>_<section>.txt`` and contains a small
    source header followed by the formatted chunk text.
    """
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        for index, chunk in enumerate(chunks_data, start=1):
            body = "\n".join([
                f"Quelle: Note:{source_name}",
                f"SAP KB Note: {article.note_id}",
                f"Sektion: {chunk['section']}",
                "",
                article.format_chunk(chunk),
            ])
            archive.writestr(f"chunk_{index}_{chunk['section']}.txt", body)
    buffer.seek(0)
    return buffer.getvalue()
# -----------------------------
# Streamlit UI
# -----------------------------
st.set_page_config(page_title=APP_TITLE, page_icon="📑", layout="wide")
st.title(APP_TITLE)

# Intro text explaining supported sections and features.
st.markdown("""
Diese App verarbeitet **mehrere SAP Knowledge Base Artikel** (PDFs) gleichzeitig und erstellt für jedes PDF eine separate ZIP-Datei.
**Unterstützte Sektionen:**
- 📋 `METADATA_SUMMARY` - Grundinformationen
- 🔍 `SYMPTOM_SCENARIO` - Problem & Reproduktion
- 💡 `CAUSE_PREREQUISITES` - Ursache
- ✅ `SOLUTION_PATCH` - Lösung
- 🏷️ `AFFECTED_VERSIONS` - Betroffene Versionen
**Features:**
- ✅ Bulk-Verarbeitung mehrerer PDFs
- ✅ Separate ZIP pro PDF
- ✅ Automatische Chunk-Aufteilung (max. 1000 Zeichen)
- ✅ Quelle: `Note:{Dateiname}` (ohne .pdf)
""")
st.divider()

# File upload widget (multiple PDFs allowed).
st.subheader("📄 SAP KB Artikel hochladen")
uploaded_files = st.file_uploader(
    "PDF-Dateien hochladen (einzeln oder mehrere)",
    type=["pdf"],
    accept_multiple_files=True,
    help="Ein oder mehrere SAP Knowledge Base Artikel als PDF",
)
# Preview of the selected files and the trigger button.
if uploaded_files:
    st.info(f"📦 {len(uploaded_files)} Datei(en) ausgewählt")
    with st.expander("📋 Hochgeladene Dateien"):
        for position, pdf_file in enumerate(uploaded_files, start=1):
            # Source label: file name without the ".pdf" extension.
            label = pdf_file.name.replace('.pdf', '')
            st.write(f"{position}. **{pdf_file.name}** → Quelle: `Note:{label}`")
st.divider()

process_btn = st.button("🚀 Alle Artikel verarbeiten", type="primary", use_container_width=True)
# -----------------------------
# Verarbeitung
# -----------------------------
if process_btn:
    if not uploaded_files:
        st.error("❌ Bitte laden Sie mindestens eine PDF-Datei hoch.")
        st.stop()

    results = []  # one entry per successfully processed PDF
    errors = []   # human-readable messages for skipped/failed PDFs

    progress_bar = st.progress(0)
    status_text = st.empty()

    # Process every uploaded PDF independently.
    for idx, uploaded_file in enumerate(uploaded_files):
        try:
            # Source label: file name without the ".pdf" extension.
            source_name = uploaded_file.name.replace('.pdf', '')
            status_text.text(f"🔄 Verarbeite {idx + 1}/{len(uploaded_files)}: {uploaded_file.name}...")

            # Extract the text from the PDF.
            file_data = uploaded_file.read()
            text_content = extract_text_from_pdf(file_data)
            if not text_content.strip():
                errors.append(f"⚠️ {uploaded_file.name}: Kein Text gefunden")
                continue

            # Parse the article and build the chunks (max. 1000 chars each).
            article = SAPKBArticle(text_content)
            chunks_data = article.create_chunks()
            chunks_data = article.split_long_chunks(chunks_data, max_length=1000)
            if not chunks_data:
                errors.append(f"⚠️ {uploaded_file.name}: Keine Chunks erzeugt")
                continue

            # Build the per-PDF ZIP and record the result.
            zip_data = create_zip_for_pdf(article, chunks_data, source_name)
            results.append({
                'filename': uploaded_file.name,
                'source_name': source_name,
                'note_id': article.note_id,
                'title': article.title,
                'chunks_count': len(chunks_data),
                'zip_data': zip_data,
                'article': article,
                'chunks': chunks_data
            })
        except Exception as e:
            # Batch boundary: one broken PDF must not abort the whole run.
            errors.append(f"❌ {uploaded_file.name}: {str(e)}")
        finally:
            # BUGFIX: advance the bar for skipped/failed files too; previously
            # the update sat inside `try` before the `continue`s, so errors
            # and skips left the progress bar stale.
            progress_bar.progress((idx + 1) / len(uploaded_files))

    progress_bar.empty()
    status_text.empty()

    # Summary / error reporting.
    if not results:
        st.error("❌ Keine PDFs konnten verarbeitet werden.")
        if errors:
            st.error("\n\n".join(errors))
        st.stop()

    st.success(f"✅ **{len(results)} von {len(uploaded_files)} PDF(s) erfolgreich verarbeitet**")
    if errors:
        with st.expander("⚠️ Fehler anzeigen"):
            for error in errors:
                st.warning(error)
    st.divider()

    # Overview table of the processed PDFs.
    st.subheader("📊 Verarbeitungs-Übersicht")
    overview_data = []
    for result in results:
        overview_data.append({
            'Datei': result['filename'],
            'Quelle': f"Note:{result['source_name']}",
            'Note ID': result['note_id'],
            'Chunks': result['chunks_count']
        })
    df_overview = pd.DataFrame(overview_data)
    st.dataframe(df_overview, use_container_width=True)
    st.divider()

    # One download button per PDF.
    st.subheader("💾 Downloads (ZIP pro PDF)")
    for result in results:
        col1, col2 = st.columns([3, 1])
        with col1:
            st.write(f"**{result['filename']}**")
            st.caption(f"Note {result['note_id']} • {result['chunks_count']} Chunks • Quelle: Note:{result['source_name']}")
        with col2:
            zip_filename = f"{result['source_name']}_chunks.zip"
            st.download_button(
                "📥 ZIP",
                data=result['zip_data'],
                file_name=zip_filename,
                mime="application/zip",
                key=f"download_{result['source_name']}",
                use_container_width=True
            )
        st.divider()

    # Optional: one master ZIP containing all per-PDF ZIPs.
    if len(results) > 1:
        st.subheader("📦 Alle ZIPs als Master-ZIP")
        master_zip_buffer = io.BytesIO()
        with zipfile.ZipFile(master_zip_buffer, "w", compression=zipfile.ZIP_DEFLATED) as master_zf:
            for result in results:
                zip_filename = f"{result['source_name']}_chunks.zip"
                master_zf.writestr(zip_filename, result['zip_data'])
        master_zip_buffer.seek(0)
        st.download_button(
            "📥 Master-ZIP herunterladen (alle ZIPs)",
            data=master_zip_buffer.getvalue(),
            file_name="sap_kb_all_chunks.zip",
            mime="application/zip",
            use_container_width=True
        )
# Footer rendered on every run, below all other content.
st.divider()
st.markdown(
    """
<div style='text-align: center; color: #666; font-size: 0.9em;'>
SAP KB Artikel Chunker (Bulk) v2.0 | Entwickelt für SAP Business One Knowledge Base Artikel
</div>
""",
    unsafe_allow_html=True,
)