beermen's picture
Upload app.py
7922a2b verified
import io
import re
import zipfile
import tempfile
from datetime import datetime
from typing import List, Dict
import os
import streamlit as st
import pandas as pd
from pypdf import PdfReader
APP_TITLE = "📑 SAP KB Artikel Chunker (Bulk)"
# -----------------------------
# SAP KB Parsing
# -----------------------------
class SAPKBArticle:
"""Repräsentiert einen SAP Knowledge Base Artikel"""
def __init__(self, text: str):
self.raw_text = text
self.note_id = ""
self.title = ""
self.component = ""
self.version = ""
self.release_date = ""
self.sections = {}
self._parse()
def _parse(self):
"""Extrahiert Metadaten und Sektionen aus dem Artikel"""
lines = self.raw_text.split('\n')
# Extrahiere Note ID und Title aus der ersten Zeile
first_line = lines[0] if lines else ""
match = re.match(r'(\d+)\s*-\s*(.+)', first_line)
if match:
self.note_id = match.group(1)
self.title = match.group(2).strip()
# Extrahiere Component, Version, Release Date
for line in lines[:10]:
if 'Component:' in line:
comp_match = re.search(r'Component:\s*([^,]+)', line)
if comp_match:
self.component = comp_match.group(1).strip()
if 'Version:' in line:
ver_match = re.search(r'Version:\s*(\d+)', line)
if ver_match:
self.version = ver_match.group(1)
if 'Released On:' in line:
date_match = re.search(r'Released On:\s*(.+)', line)
if date_match:
self.release_date = date_match.group(1).strip()
# Extrahiere Sektionen
self._extract_sections()
def _extract_sections(self):
"""Extrahiert die verschiedenen Sektionen des Artikels"""
text = self.raw_text
# Symptom
symptom_match = re.search(r'Symptom\s*\n(.*?)(?=\n\s*Environment|\n\s*Reproducing|$)', text, re.DOTALL | re.IGNORECASE)
if symptom_match:
self.sections['symptom'] = self._clean_text(symptom_match.group(1))
# Environment
env_match = re.search(r'Environment\s*\n(.*?)(?=\n\s*Reproducing|\n\s*Cause|$)', text, re.DOTALL | re.IGNORECASE)
if env_match:
self.sections['environment'] = self._clean_text(env_match.group(1))
# Reproducing the Issue
repro_match = re.search(r'Reproducing the Issue\s*\n(.*?)(?=\n\s*Cause|\n\s*Resolution|$)', text, re.DOTALL | re.IGNORECASE)
if repro_match:
self.sections['reproducing'] = self._clean_text(repro_match.group(1))
# Cause
cause_match = re.search(r'Cause\s*\n(.*?)(?=\n\s*Resolution|\n\s*Keywords|$)', text, re.DOTALL | re.IGNORECASE)
if cause_match:
self.sections['cause'] = self._clean_text(cause_match.group(1))
# Resolution
resolution_match = re.search(r'Resolution\s*\n(.*?)(?=\n\s*Keywords|\n\s*Attributes|$)', text, re.DOTALL | re.IGNORECASE)
if resolution_match:
self.sections['resolution'] = self._clean_text(resolution_match.group(1))
# Products/Versions
products_match = re.search(r'Products\s*\n.*?\n(.*?)$', text, re.DOTALL | re.IGNORECASE)
if products_match:
self.sections['products'] = self._clean_text(products_match.group(1))
# Other Components
other_comp = []
for match in re.finditer(r'Other Components\s+(.+)', text):
other_comp.append(match.group(1).strip())
if other_comp:
self.sections['other_components'] = ' | '.join(other_comp)
def _clean_text(self, text: str) -> str:
"""Bereinigt und komprimiert Text"""
text = re.sub(r'\n\s*\n\s*\n+', '\n\n', text)
text = re.sub(r' +', ' ', text)
return text.strip()
def create_chunks(self) -> List[Dict[str, str]]:
"""Erstellt semantische Chunks im gewünschten Format"""
chunks = []
# 1. METADATA_SUMMARY
metadata_content = f"This note documents an issue in SAP Business One"
if self.title:
metadata_content += f" regarding {self.title.lower()}"
metadata_content += f". ID: {self.note_id}"
if self.component:
metadata_content += f", Component: {self.component}"
if self.version:
metadata_content += f", Version: {self.version}"
if self.release_date:
metadata_content += f", Released: {self.release_date}"
metadata_content += "."
chunks.append({
'section': 'METADATA_SUMMARY',
'content': metadata_content
})
# 2. SYMPTOM_SCENARIO
if 'symptom' in self.sections or 'reproducing' in self.sections:
symptom_content = ""
if 'symptom' in self.sections:
symptom_content = self.sections['symptom']
if 'reproducing' in self.sections:
if symptom_content:
symptom_content += " "
symptom_content += "Scenario: " + self.sections['reproducing']
chunks.append({
'section': 'SYMPTOM_SCENARIO',
'content': symptom_content
})
# 3. CAUSE_PREREQUISITES
if 'cause' in self.sections:
chunks.append({
'section': 'CAUSE_PREREQUISITES',
'content': self.sections['cause']
})
# 4. SOLUTION_PATCH
if 'resolution' in self.sections:
chunks.append({
'section': 'SOLUTION_PATCH',
'content': self.sections['resolution']
})
# 5. AFFECTED_VERSIONS
if 'products' in self.sections or 'environment' in self.sections or 'other_components' in self.sections:
affected_content = "This issue affects "
if 'products' in self.sections:
affected_content += self.sections['products']
elif 'environment' in self.sections:
affected_content += self.sections['environment']
if 'other_components' in self.sections:
affected_content += f". Relevant components include {self.sections['other_components']}"
affected_content += "."
chunks.append({
'section': 'AFFECTED_VERSIONS',
'content': affected_content
})
return chunks
def format_chunk(self, chunk: Dict[str, str]) -> str:
"""Formatiert einen Chunk im gewünschten Output-Format"""
prefix = f"[Note {self.note_id} - {self.title}]"
return f"{prefix} - Sektion [{chunk['section']}]: {chunk['content']}"
def split_long_chunks(self, chunks: List[Dict[str, str]], max_length: int = 1000) -> List[Dict[str, str]]:
"""Teilt zu lange Chunks in kleinere Stücke auf (max_length Zeichen)"""
result = []
for chunk in chunks:
formatted = self.format_chunk(chunk)
# Wenn Chunk kurz genug ist, direkt übernehmen
if len(formatted) <= max_length:
result.append(chunk)
continue
# Chunk ist zu lang - aufteilen
section = chunk['section']
content = chunk['content']
# Berechne Platz für Content (abzüglich Präfix und Formatting)
prefix = f"[Note {self.note_id} - {self.title}]"
section_label = f" - Sektion [{section}_{{}}]: "
overhead = len(prefix) + len(section_label.format("1"))
available_space = max_length - overhead
# Teile Content in Teile
parts = []
words = content.split()
current_part = []
current_length = 0
for word in words:
word_length = len(word) + 1
if current_length + word_length > available_space and current_part:
parts.append(' '.join(current_part))
current_part = [word]
current_length = word_length
else:
current_part.append(word)
current_length += word_length
if current_part:
parts.append(' '.join(current_part))
# Erstelle neue Chunks für jeden Teil
for i, part in enumerate(parts, start=1):
new_section = f"{section}_{i}" if len(parts) > 1 else section
result.append({
'section': new_section,
'content': part
})
return result
def extract_text_from_pdf(file_data: bytes) -> str:
"""Extrahiert Text aus PDF"""
reader = PdfReader(io.BytesIO(file_data))
pages_text = []
for page in reader.pages:
pages_text.append(page.extract_text() or "")
return "\n".join(pages_text)
def create_zip_for_pdf(article, chunks_data, source_name):
"""Erstellt eine ZIP-Datei für ein einzelnes PDF"""
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, "w", compression=zipfile.ZIP_DEFLATED) as zf:
for i, chunk_data in enumerate(chunks_data, start=1):
formatted = article.format_chunk(chunk_data)
# Dateiname für den Chunk
fname = f"chunk_{i}_{chunk_data['section']}.txt"
# Dateiinhalt mit Quelle
content = f"Quelle: Note:{source_name}\n"
content += f"SAP KB Note: {article.note_id}\n"
content += f"Sektion: {chunk_data['section']}\n\n"
content += formatted
# Füge Datei zum ZIP hinzu
zf.writestr(fname, content)
zip_buffer.seek(0)
return zip_buffer.getvalue()
# -----------------------------
# Streamlit UI
# -----------------------------
st.set_page_config(page_title=APP_TITLE, layout="wide", page_icon="📑")
st.title(APP_TITLE)
st.markdown("""
Diese App verarbeitet **mehrere SAP Knowledge Base Artikel** (PDFs) gleichzeitig und erstellt für jedes PDF eine separate ZIP-Datei.
**Unterstützte Sektionen:**
- 📋 `METADATA_SUMMARY` - Grundinformationen
- 🔍 `SYMPTOM_SCENARIO` - Problem & Reproduktion
- 💡 `CAUSE_PREREQUISITES` - Ursache
- ✅ `SOLUTION_PATCH` - Lösung
- 🏷️ `AFFECTED_VERSIONS` - Betroffene Versionen
**Features:**
- ✅ Bulk-Verarbeitung mehrerer PDFs
- ✅ Separate ZIP pro PDF
- ✅ Automatische Chunk-Aufteilung (max. 1000 Zeichen)
- ✅ Quelle: `Note:{Dateiname}` (ohne .pdf)
""")
st.divider()
# File Upload
st.subheader("📄 SAP KB Artikel hochladen")
uploaded_files = st.file_uploader(
"PDF-Dateien hochladen (einzeln oder mehrere)",
type=["pdf"],
accept_multiple_files=True,
help="Ein oder mehrere SAP Knowledge Base Artikel als PDF"
)
if uploaded_files:
st.info(f"📦 {len(uploaded_files)} Datei(en) ausgewählt")
# Zeige Liste der hochgeladenen Dateien
with st.expander("📋 Hochgeladene Dateien"):
for idx, file in enumerate(uploaded_files, start=1):
source_name = file.name.replace('.pdf', '')
st.write(f"{idx}. **{file.name}** → Quelle: `Note:{source_name}`")
st.divider()
# Process Button
process_btn = st.button("🚀 Alle Artikel verarbeiten", type="primary", use_container_width=True)
# -----------------------------
# Verarbeitung
# -----------------------------
if process_btn:
if not uploaded_files:
st.error("❌ Bitte laden Sie mindestens eine PDF-Datei hoch.")
st.stop()
# Container für Ergebnisse
results = []
errors = []
# Progress
progress_bar = st.progress(0)
status_text = st.empty()
# Verarbeite jede PDF
for idx, uploaded_file in enumerate(uploaded_files):
try:
# Extrahiere Dateiname ohne .pdf
source_name = uploaded_file.name.replace('.pdf', '')
status_text.text(f"🔄 Verarbeite {idx + 1}/{len(uploaded_files)}: {uploaded_file.name}...")
# Text extrahieren
file_data = uploaded_file.read()
text_content = extract_text_from_pdf(file_data)
if not text_content.strip():
errors.append(f"⚠️ {uploaded_file.name}: Kein Text gefunden")
continue
# Artikel verarbeiten
article = SAPKBArticle(text_content)
chunks_data = article.create_chunks()
# Chunks aufteilen (max 1000 Zeichen)
chunks_data = article.split_long_chunks(chunks_data, max_length=1000)
if not chunks_data:
errors.append(f"⚠️ {uploaded_file.name}: Keine Chunks erzeugt")
continue
# ZIP erstellen
zip_data = create_zip_for_pdf(article, chunks_data, source_name)
# Speichere Ergebnis
results.append({
'filename': uploaded_file.name,
'source_name': source_name,
'note_id': article.note_id,
'title': article.title,
'chunks_count': len(chunks_data),
'zip_data': zip_data,
'article': article,
'chunks': chunks_data
})
progress_bar.progress((idx + 1) / len(uploaded_files))
except Exception as e:
errors.append(f"❌ {uploaded_file.name}: {str(e)}")
continue
progress_bar.empty()
status_text.empty()
# Zeige Ergebnisse
if not results:
st.error("❌ Keine PDFs konnten verarbeitet werden.")
if errors:
st.error("\n\n".join(errors))
st.stop()
st.success(f"✅ **{len(results)} von {len(uploaded_files)} PDF(s) erfolgreich verarbeitet**")
if errors:
with st.expander("⚠️ Fehler anzeigen"):
for error in errors:
st.warning(error)
st.divider()
# Zeige Übersicht
st.subheader("📊 Verarbeitungs-Übersicht")
overview_data = []
for result in results:
overview_data.append({
'Datei': result['filename'],
'Quelle': f"Note:{result['source_name']}",
'Note ID': result['note_id'],
'Chunks': result['chunks_count']
})
df_overview = pd.DataFrame(overview_data)
st.dataframe(df_overview, use_container_width=True)
st.divider()
# Download-Buttons für jede ZIP
st.subheader("💾 Downloads (ZIP pro PDF)")
for result in results:
col1, col2 = st.columns([3, 1])
with col1:
st.write(f"**{result['filename']}**")
st.caption(f"Note {result['note_id']}{result['chunks_count']} Chunks • Quelle: Note:{result['source_name']}")
with col2:
zip_filename = f"{result['source_name']}_chunks.zip"
st.download_button(
"📥 ZIP",
data=result['zip_data'],
file_name=zip_filename,
mime="application/zip",
key=f"download_{result['source_name']}",
use_container_width=True
)
st.divider()
# Optional: Master-ZIP mit allen ZIPs
if len(results) > 1:
st.subheader("📦 Alle ZIPs als Master-ZIP")
master_zip_buffer = io.BytesIO()
with zipfile.ZipFile(master_zip_buffer, "w", compression=zipfile.ZIP_DEFLATED) as master_zf:
for result in results:
zip_filename = f"{result['source_name']}_chunks.zip"
master_zf.writestr(zip_filename, result['zip_data'])
master_zip_buffer.seek(0)
st.download_button(
"📥 Master-ZIP herunterladen (alle ZIPs)",
data=master_zip_buffer.getvalue(),
file_name="sap_kb_all_chunks.zip",
mime="application/zip",
use_container_width=True
)
# Footer
st.divider()
st.markdown("""
<div style='text-align: center; color: #666; font-size: 0.9em;'>
SAP KB Artikel Chunker (Bulk) v2.0 | Entwickelt für SAP Business One Knowledge Base Artikel
</div>
""", unsafe_allow_html=True)