import io
import re
import zipfile
import tempfile
from datetime import datetime
from typing import List, Dict
import os

import streamlit as st
import pandas as pd
from pypdf import PdfReader

APP_TITLE = "📑 SAP KB Artikel Chunker (Bulk)"


# -----------------------------
# SAP KB Parsing
# -----------------------------
class SAPKBArticle:
    """Represents one SAP Knowledge Base article.

    The raw article text is parsed on construction: header metadata
    (note id, title, component, version, release date) and the standard
    KB sections (Symptom, Environment, Reproducing the Issue, Cause,
    Resolution, Products, Other Components).
    """

    def __init__(self, text: str):
        self.raw_text = text
        self.note_id = ""
        self.title = ""
        self.component = ""
        self.version = ""
        self.release_date = ""
        self.sections: Dict[str, str] = {}
        self._parse()

    def _parse(self) -> None:
        """Extract metadata and sections from the article text."""
        lines = self.raw_text.split('\n')

        # The first line is expected to look like "1234567 - Some title".
        first_line = lines[0] if lines else ""
        match = re.match(r'(\d+)\s*-\s*(.+)', first_line)
        if match:
            self.note_id = match.group(1)
            self.title = match.group(2).strip()

        # Header metadata usually sits within the first ten lines.
        for line in lines[:10]:
            if 'Component:' in line:
                comp_match = re.search(r'Component:\s*([^,]+)', line)
                if comp_match:
                    self.component = comp_match.group(1).strip()
            if 'Version:' in line:
                ver_match = re.search(r'Version:\s*(\d+)', line)
                if ver_match:
                    self.version = ver_match.group(1)
            if 'Released On:' in line:
                date_match = re.search(r'Released On:\s*(.+)', line)
                if date_match:
                    self.release_date = date_match.group(1).strip()

        self._extract_sections()

    def _extract_sections(self) -> None:
        """Extract the article sections.

        Each section body is matched non-greedily from its heading up to the
        next expected heading (or end of text).
        """
        text = self.raw_text
        flags = re.DOTALL | re.IGNORECASE

        # Symptom
        symptom_match = re.search(
            r'Symptom\s*\n(.*?)(?=\n\s*Environment|\n\s*Reproducing|$)',
            text, flags)
        if symptom_match:
            self.sections['symptom'] = self._clean_text(symptom_match.group(1))

        # Environment
        env_match = re.search(
            r'Environment\s*\n(.*?)(?=\n\s*Reproducing|\n\s*Cause|$)',
            text, flags)
        if env_match:
            self.sections['environment'] = self._clean_text(env_match.group(1))

        # Reproducing the Issue
        repro_match = re.search(
            r'Reproducing the Issue\s*\n(.*?)(?=\n\s*Cause|\n\s*Resolution|$)',
            text, flags)
        if repro_match:
            self.sections['reproducing'] = self._clean_text(repro_match.group(1))

        # Cause
        cause_match = re.search(
            r'Cause\s*\n(.*?)(?=\n\s*Resolution|\n\s*Keywords|$)',
            text, flags)
        if cause_match:
            self.sections['cause'] = self._clean_text(cause_match.group(1))

        # Resolution
        resolution_match = re.search(
            r'Resolution\s*\n(.*?)(?=\n\s*Keywords|\n\s*Attributes|$)',
            text, flags)
        if resolution_match:
            self.sections['resolution'] = self._clean_text(
                resolution_match.group(1))

        # Products / Versions (the .*?\n skips the line after the heading)
        products_match = re.search(r'Products\s*\n.*?\n(.*?)$', text, flags)
        if products_match:
            self.sections['products'] = self._clean_text(
                products_match.group(1))

        # Other Components (the heading may occur several times)
        other_comp = []
        for comp_match in re.finditer(r'Other Components\s+(.+)', text):
            other_comp.append(comp_match.group(1).strip())
        if other_comp:
            self.sections['other_components'] = ' | '.join(other_comp)

    def _clean_text(self, text: str) -> str:
        """Collapse runs of blank lines and repeated spaces, then strip."""
        text = re.sub(r'\n\s*\n\s*\n+', '\n\n', text)
        text = re.sub(r' +', ' ', text)
        return text.strip()

    def create_chunks(self) -> List[Dict[str, str]]:
        """Build the semantic chunks in the desired format.

        Returns:
            A list of dicts with keys ``section`` and ``content``.
        """
        chunks: List[Dict[str, str]] = []

        # 1. METADATA_SUMMARY
        metadata_content = "This note documents an issue in SAP Business One"
        if self.title:
            metadata_content += f" regarding {self.title.lower()}"
        metadata_content += f". ID: {self.note_id}"
        if self.component:
            metadata_content += f", Component: {self.component}"
        if self.version:
            metadata_content += f", Version: {self.version}"
        if self.release_date:
            metadata_content += f", Released: {self.release_date}"
        metadata_content += "."
        chunks.append({
            'section': 'METADATA_SUMMARY',
            'content': metadata_content
        })

        # 2. SYMPTOM_SCENARIO
        if 'symptom' in self.sections or 'reproducing' in self.sections:
            symptom_content = ""
            if 'symptom' in self.sections:
                symptom_content = self.sections['symptom']
            if 'reproducing' in self.sections:
                if symptom_content:
                    symptom_content += " "
                symptom_content += "Scenario: " + self.sections['reproducing']
            chunks.append({
                'section': 'SYMPTOM_SCENARIO',
                'content': symptom_content
            })

        # 3. CAUSE_PREREQUISITES
        if 'cause' in self.sections:
            chunks.append({
                'section': 'CAUSE_PREREQUISITES',
                'content': self.sections['cause']
            })

        # 4. SOLUTION_PATCH
        if 'resolution' in self.sections:
            chunks.append({
                'section': 'SOLUTION_PATCH',
                'content': self.sections['resolution']
            })

        # 5. AFFECTED_VERSIONS
        # BUGFIX: the sentence is now assembled only from sections that are
        # actually present; previously "This issue affects " was emitted even
        # when neither 'products' nor 'environment' existed, producing a
        # dangling "This issue affects . Relevant components include ...".
        affected_parts = []
        if 'products' in self.sections:
            affected_parts.append(
                "This issue affects " + self.sections['products'])
        elif 'environment' in self.sections:
            affected_parts.append(
                "This issue affects " + self.sections['environment'])
        if 'other_components' in self.sections:
            affected_parts.append(
                f"Relevant components include "
                f"{self.sections['other_components']}")
        if affected_parts:
            chunks.append({
                'section': 'AFFECTED_VERSIONS',
                'content': ". ".join(affected_parts) + "."
            })

        return chunks

    def format_chunk(self, chunk: Dict[str, str]) -> str:
        """Format a single chunk in the desired output format."""
        prefix = f"[Note {self.note_id} - {self.title}]"
        return f"{prefix} - Sektion [{chunk['section']}]: {chunk['content']}"

    def split_long_chunks(self, chunks: List[Dict[str, str]],
                          max_length: int = 1000) -> List[Dict[str, str]]:
        """Split chunks whose formatted output exceeds *max_length* chars.

        Content is split greedily on word boundaries; multi-part results
        get numbered section names ("SECTION_1", "SECTION_2", ...).
        """
        result = []
        for chunk in chunks:
            formatted = self.format_chunk(chunk)

            # Short enough - keep the chunk unchanged.
            if len(formatted) <= max_length:
                result.append(chunk)
                continue

            section = chunk['section']
            content = chunk['content']

            # Space available for content after prefix and section label.
            prefix = f"[Note {self.note_id} - {self.title}]"
            section_label = f" - Sektion [{section}_{{}}]: "
            overhead = len(prefix) + len(section_label.format("1"))
            available_space = max_length - overhead

            # Greedy word-wise split of the content.
            parts = []
            current_part = []
            current_length = 0
            for word in content.split():
                word_length = len(word) + 1  # +1 for the joining space
                if (current_length + word_length > available_space
                        and current_part):
                    parts.append(' '.join(current_part))
                    current_part = [word]
                    current_length = word_length
                else:
                    current_part.append(word)
                    current_length += word_length
            if current_part:
                parts.append(' '.join(current_part))

            # Emit one chunk per part; suffix only when actually split.
            for i, part in enumerate(parts, start=1):
                new_section = f"{section}_{i}" if len(parts) > 1 else section
                result.append({
                    'section': new_section,
                    'content': part
                })
        return result


def extract_text_from_pdf(file_data: bytes) -> str:
    """Extract the text of all pages from a PDF given as raw bytes."""
    reader = PdfReader(io.BytesIO(file_data))
    pages_text = []
    for page in reader.pages:
        # extract_text() may return None for pages without a text layer.
        pages_text.append(page.extract_text() or "")
    return "\n".join(pages_text)


def create_zip_for_pdf(article: SAPKBArticle,
                       chunks_data: List[Dict[str, str]],
                       source_name: str) -> bytes:
    """Create a ZIP archive (as bytes) with one text file per chunk."""
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, "w",
                         compression=zipfile.ZIP_DEFLATED) as zf:
        for i, chunk_data in enumerate(chunks_data, start=1):
            formatted = article.format_chunk(chunk_data)
            # File name for the chunk
            fname = f"chunk_{i}_{chunk_data['section']}.txt"
            # File content including source reference
            content = f"Quelle: Note:{source_name}\n"
            content += f"SAP KB Note: {article.note_id}\n"
            content += f"Sektion: {chunk_data['section']}\n\n"
            content += formatted
            zf.writestr(fname, content)
    zip_buffer.seek(0)
    return zip_buffer.getvalue()


# -----------------------------
# Streamlit UI
# -----------------------------
st.set_page_config(page_title=APP_TITLE, layout="wide", page_icon="📑")
st.title(APP_TITLE)

st.markdown("""
Diese App verarbeitet **mehrere SAP Knowledge Base Artikel** (PDFs) gleichzeitig
und erstellt für jedes PDF eine separate ZIP-Datei.

**Unterstützte Sektionen:**
- 📋 `METADATA_SUMMARY` - Grundinformationen
- 🔍 `SYMPTOM_SCENARIO` - Problem & Reproduktion
- 💡 `CAUSE_PREREQUISITES` - Ursache
- ✅ `SOLUTION_PATCH` - Lösung
- 🏷️ `AFFECTED_VERSIONS` - Betroffene Versionen

**Features:**
- ✅ Bulk-Verarbeitung mehrerer PDFs
- ✅ Separate ZIP pro PDF
- ✅ Automatische Chunk-Aufteilung (max. 1000 Zeichen)
- ✅ Quelle: `Note:{Dateiname}` (ohne .pdf)
""")

st.divider()

# File Upload
st.subheader("📄 SAP KB Artikel hochladen")
uploaded_files = st.file_uploader(
    "PDF-Dateien hochladen (einzeln oder mehrere)",
    type=["pdf"],
    accept_multiple_files=True,
    help="Ein oder mehrere SAP Knowledge Base Artikel als PDF"
)

if uploaded_files:
    st.info(f"📦 {len(uploaded_files)} Datei(en) ausgewählt")
    # List of the uploaded files
    with st.expander("📋 Hochgeladene Dateien"):
        for idx, file in enumerate(uploaded_files, start=1):
            # BUGFIX: removesuffix only strips a trailing ".pdf";
            # replace() would remove the substring anywhere in the name.
            source_name = file.name.removesuffix('.pdf')
            st.write(f"{idx}. **{file.name}** → Quelle: `Note:{source_name}`")

st.divider()

# Process Button (defined unconditionally so the guard below always works)
process_btn = st.button("🚀 Alle Artikel verarbeiten", type="primary",
                        use_container_width=True)

# -----------------------------
# Verarbeitung
# -----------------------------
if process_btn:
    if not uploaded_files:
        st.error("❌ Bitte laden Sie mindestens eine PDF-Datei hoch.")
        st.stop()

    # Containers for results and per-file errors
    results = []
    errors = []

    # Progress indicators
    progress_bar = st.progress(0)
    status_text = st.empty()

    # Process every uploaded PDF independently; one failure must not
    # abort the whole batch.
    for idx, uploaded_file in enumerate(uploaded_files):
        try:
            # File name without the trailing .pdf extension
            source_name = uploaded_file.name.removesuffix('.pdf')
            status_text.text(
                f"🔄 Verarbeite {idx + 1}/{len(uploaded_files)}: "
                f"{uploaded_file.name}...")

            # Extract the text
            file_data = uploaded_file.read()
            text_content = extract_text_from_pdf(file_data)
            if not text_content.strip():
                errors.append(f"⚠️ {uploaded_file.name}: Kein Text gefunden")
                continue

            # Parse the article and build chunks
            article = SAPKBArticle(text_content)
            chunks_data = article.create_chunks()
            # Split oversized chunks (max 1000 characters)
            chunks_data = article.split_long_chunks(chunks_data,
                                                    max_length=1000)
            if not chunks_data:
                errors.append(f"⚠️ {uploaded_file.name}: Keine Chunks erzeugt")
                continue

            # Build the per-PDF ZIP
            zip_data = create_zip_for_pdf(article, chunks_data, source_name)

            results.append({
                'filename': uploaded_file.name,
                'source_name': source_name,
                'note_id': article.note_id,
                'title': article.title,
                'chunks_count': len(chunks_data),
                'zip_data': zip_data,
                'article': article,
                'chunks': chunks_data
            })
            progress_bar.progress((idx + 1) / len(uploaded_files))
        except Exception as e:
            errors.append(f"❌ {uploaded_file.name}: {str(e)}")
            continue

    progress_bar.empty()
    status_text.empty()

    # Show results
    if not results:
        st.error("❌ Keine PDFs konnten verarbeitet werden.")
        if errors:
            st.error("\n\n".join(errors))
        st.stop()

    st.success(
        f"✅ **{len(results)} von {len(uploaded_files)} PDF(s) "
        f"erfolgreich verarbeitet**")
    if errors:
        with st.expander("⚠️ Fehler anzeigen"):
            for error in errors:
                st.warning(error)

    st.divider()

    # Overview table
    st.subheader("📊 Verarbeitungs-Übersicht")
    overview_data = []
    for result in results:
        overview_data.append({
            'Datei': result['filename'],
            'Quelle': f"Note:{result['source_name']}",
            'Note ID': result['note_id'],
            'Chunks': result['chunks_count']
        })
    df_overview = pd.DataFrame(overview_data)
    st.dataframe(df_overview, use_container_width=True)

    st.divider()

    # One download button per ZIP
    st.subheader("💾 Downloads (ZIP pro PDF)")
    for result in results:
        col1, col2 = st.columns([3, 1])
        with col1:
            st.write(f"**{result['filename']}**")
            st.caption(
                f"Note {result['note_id']} • {result['chunks_count']} Chunks "
                f"• Quelle: Note:{result['source_name']}")
        with col2:
            zip_filename = f"{result['source_name']}_chunks.zip"
            st.download_button(
                "📥 ZIP",
                data=result['zip_data'],
                file_name=zip_filename,
                mime="application/zip",
                key=f"download_{result['source_name']}",
                use_container_width=True
            )

    st.divider()

    # Optional: master ZIP containing all per-PDF ZIPs
    if len(results) > 1:
        st.subheader("📦 Alle ZIPs als Master-ZIP")
        master_zip_buffer = io.BytesIO()
        with zipfile.ZipFile(master_zip_buffer, "w",
                             compression=zipfile.ZIP_DEFLATED) as master_zf:
            for result in results:
                zip_filename = f"{result['source_name']}_chunks.zip"
                master_zf.writestr(zip_filename, result['zip_data'])
        master_zip_buffer.seek(0)
        st.download_button(
            "📥 Master-ZIP herunterladen (alle ZIPs)",
            data=master_zip_buffer.getvalue(),
            file_name="sap_kb_all_chunks.zip",
            mime="application/zip",
            use_container_width=True
        )

# Footer
st.divider()
st.markdown("""
SAP KB Artikel Chunker (Bulk) v2.0 | Entwickelt für SAP Business One Knowledge Base Artikel
""", unsafe_allow_html=True)