Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import requests | |
| from datetime import datetime, timedelta | |
| import xml.etree.ElementTree as ET | |
| import json | |
| import os | |
# Load the API credentials from environment variables.
# os.environ.get returns None when a variable is unset; ui_wrapper checks for
# that before any request is made.
CLIENT_ID = os.environ.get('client_id')
API_KEY = os.environ.get('api_key')
def json_to_markdown_table(json_data):
    """Render connection data as a Markdown table.

    Args:
        json_data: A JSON string, a list of row dicts, or a single dict.
            A dict containing an ``"error"`` key is rendered as an error
            message; any other dict is treated as a one-row table.

    Returns:
        The Markdown table as a string (each line terminated by a newline),
        ``"Keine Daten vorhanden"`` for empty input, or a ``**Fehler:**``
        line for an error dict.

    Raises:
        json.JSONDecodeError: If ``json_data`` is a string that is not
            valid JSON.
    """
    data = json.loads(json_data) if isinstance(json_data, str) else json_data

    if not data:
        return "Keine Daten vorhanden"

    if isinstance(data, dict):
        if "error" in data:
            return f"**Fehler:** {data['error']}"
        # A lone record is a one-row table; indexing it with data[0] below
        # would otherwise raise KeyError.
        data = [data]

    # NOTE: the key "ankunftszeitszeit" (sic) matches the key emitted by
    # search_next_3_connections; the two must be renamed together.
    header_mapping = {
        "startort": "🚉 Startort",
        "zielort": "🎯 Zielort",
        "abfahrtszeit": "⏰ Abfahrt",
        "ankunftszeitszeit": "⏱️ Ankunft",
        "gleis": "🛤️ Gleis",
        "duration": "⏳ Dauer"
    }

    def cell(value):
        # A raw pipe or newline inside a cell would split the table row.
        return str(value).replace("|", "\\|").replace("\n", " ")

    # The first row defines the column set and order for the whole table.
    columns = list(data[0].keys())
    headers = [cell(header_mapping.get(col, col)) for col in columns]

    lines = [
        "| " + " | ".join(headers) + " |",
        "|" + "|".join("---" for _ in headers) + "|",
    ]
    for row in data:
        values = [cell(row.get(col, "")) for col in columns]
        lines.append("| " + " | ".join(values) + " |")

    return "\n".join(lines) + "\n"
def get_station_info(pattern, client_id, api_key):
    """Look up a station through the DB Timetables ``station`` endpoint.

    Args:
        pattern: Station name or pattern as typed by the user.
        client_id: Value sent in the ``DB-Client-Id`` header.
        api_key: Value sent in the ``DB-Api-Key`` header.

    Returns:
        ``{'name': ..., 'eva': ...}`` taken from the first ``<station>``
        child of the response, or ``None`` when the request fails, the
        server answers with an error status, the body is not valid XML, or
        no station element is present.
    """
    # Imported locally so the function does not depend on a new
    # module-level import.
    from urllib.parse import quote

    # Encode the pattern as a single path segment: with safe='' characters
    # such as '/', '?' or '#' in the user input can no longer alter the
    # request path or start a query/fragment.
    url = (
        "https://apis.deutschebahn.com/db-api-marketplace/apis/timetables/v1/station/"
        f"{quote(str(pattern), safe='')}"
    )
    headers = {'DB-Client-Id': client_id, 'DB-Api-Key': api_key, 'accept': 'application/xml'}

    try:
        response = requests.get(url, headers=headers, timeout=10)
        # Treat 4xx/5xx (bad credentials, rate limit, ...) as "not found"
        # instead of trying to parse an error body.
        response.raise_for_status()
        root = ET.fromstring(response.content)
    except (requests.RequestException, ET.ParseError):
        return None

    station = root.find('station')
    if station is None:
        return None
    return {'name': station.get('name'), 'eva': station.get('eva')}
def fetch_timetable_hour(eva, date_str, hour_str, client_id, api_key):
    """Fetch one hour slice of the planned timetable for a station.

    Args:
        eva: Station identifier placed in the request path.
        date_str: Date path segment (callers in this file pass ``%y%m%d``).
        hour_str: Hour path segment (callers in this file pass ``%H``).
        client_id: Value sent in the ``DB-Client-Id`` header.
        api_key: Value sent in the ``DB-Api-Key`` header.

    Returns:
        The parsed XML root element on HTTP 200, otherwise ``None`` (non-200
        status, network/timeout failure, or a body that is not valid XML).
    """
    url = f"https://apis.deutschebahn.com/db-api-marketplace/apis/timetables/v1/plan/{eva}/{date_str}/{hour_str}"
    headers = {'DB-Client-Id': client_id, 'DB-Api-Key': api_key, 'accept': 'application/xml'}
    try:
        response = requests.get(url, headers=headers, timeout=10)
        if response.status_code != 200:
            return None
        return ET.fromstring(response.content)
    except (requests.RequestException, ET.ParseError):
        # Best effort: a failed slice is reported as "no data" so callers can
        # continue with the other hours. Only the expected failure modes are
        # caught; programming errors still surface.
        return None
def _parse_db_time(value):
    """Parse a ``%y%m%d%H%M`` timestamp; return None if missing or malformed."""
    try:
        return datetime.strptime(value, "%y%m%d%H%M")
    except (TypeError, ValueError):
        return None


def _find_arrival(dest_eva, train_number, dep_dt, client_id, api_key,
                  plan_cache, lookahead_hours=2):
    """Return the planned arrival of ``train_number`` at ``dest_eva``, or None.

    Searches the destination's plan for the departure hour and the following
    ``lookahead_hours - 1`` hours. ``plan_cache`` maps ``(date, hour)`` to an
    already fetched plan root (or None) so each slice is requested only once.
    """
    if not train_number:
        # Without a train number every number-less stop would "match".
        return None
    for hour_offset in range(lookahead_hours):
        slot = dep_dt + timedelta(hours=hour_offset)
        key = (slot.strftime("%y%m%d"), slot.strftime("%H"))
        if key not in plan_cache:
            plan_cache[key] = fetch_timetable_hour(dest_eva, key[0], key[1], client_id, api_key)
        root = plan_cache[key]
        if root is None:
            continue
        for stop in root.findall('.//s'):
            tl = stop.find('tl')
            ar = stop.find('ar')
            if tl is None or ar is None or tl.get('n') != train_number:
                continue
            arr_dt = _parse_db_time(ar.get('pt', ''))
            # An arrival before the departure cannot belong to this trip.
            if arr_dt is not None and arr_dt >= dep_dt:
                return arr_dt
    return None


def search_next_3_connections(departure, destination, client_id, api_key):
    """Find the next three direct departures from ``departure`` to ``destination``.

    The departure station's plan is scanned for the current hour and the two
    following hours. A stop counts as a connection when the destination's
    resolved name or the raw user input occurs (case-insensitively) in the
    departure's ``ppth`` attribute and its planned time ``pt`` is not in the
    past.

    Args:
        departure: Departure station name as typed by the user.
        destination: Destination station name as typed by the user.
        client_id: DB API client id.
        api_key: DB API key.

    Returns:
        A list of at most three dicts with the keys ``startort``,
        ``zielort``, ``abfahrtszeit``, ``ankunftszeitszeit``, ``gleis`` and
        ``duration``, sorted by departure time; or ``{"error": <message>}``
        when an argument is empty or a station cannot be resolved.
    """
    if not all([departure, destination, client_id, api_key]):
        return {"error": "Bitte alle Felder ausfüllen."}

    dep_info = get_station_info(departure, client_id, api_key)
    if not dep_info:
        return {"error": f"Startort '{departure}' nicht gefunden."}
    dest_info = get_station_info(destination, client_id, api_key)
    if not dest_info:
        return {"error": f"Zielort '{destination}' nicht gefunden."}

    # Empty/None names are dropped: '' is a substring of every path and
    # would turn every departure into a match.
    needles = {text.lower() for text in (dest_info['name'], destination) if text}

    # NOTE(review): datetime.now() is naive server-local time, while the
    # timetable timestamps are presumably German local time -- confirm the
    # host's timezone matches, otherwise the wrong hour slices are queried.
    now = datetime.now()

    candidates = []
    for hour_offset in range(3):
        search_time = now + timedelta(hours=hour_offset)
        root = fetch_timetable_hour(
            dep_info['eva'],
            search_time.strftime("%y%m%d"),
            search_time.strftime("%H"),
            client_id,
            api_key,
        )
        if root is None:
            continue
        for stop in root.findall('.//s'):
            dp = stop.find('dp')
            tl = stop.find('tl')
            if dp is None or tl is None:
                continue
            path = dp.get('ppth', '').lower()
            if not any(needle in path for needle in needles):
                continue
            dep_dt = _parse_db_time(dp.get('pt', ''))
            if dep_dt is None or dep_dt < now:
                continue
            candidates.append((dep_dt, dp.get('pp', 'n/a'), tl.get('n')))

    # Stable sort by departure time only (platform/number are not ordered).
    candidates.sort(key=lambda candidate: candidate[0])

    # Arrival lookups cost one HTTP request per hour slice, so they are done
    # only for the connections actually returned and shared via the cache.
    plan_cache = {}
    connections = []
    for dep_dt, platform, train_number in candidates[:3]:
        arr_time_str = "N/A"
        duration_str = "N/A"
        arr_dt = _find_arrival(
            dest_info['eva'], train_number, dep_dt, client_id, api_key, plan_cache
        )
        if arr_dt is not None:
            arr_time_str = arr_dt.strftime("%H:%M")
            duration_str = f"{int((arr_dt - dep_dt).total_seconds() // 60)} min"
        connections.append({
            "startort": dep_info['name'],
            "zielort": dest_info['name'],
            "abfahrtszeit": dep_dt.strftime("%H:%M"),
            "ankunftszeitszeit": arr_time_str,
            "gleis": platform,
            "duration": duration_str,
        })
    return connections
| # --- Gradio UI --- | |
def ui_wrapper(dep, dest):
    """Gradio click handler: run the search and return both output formats.

    Args:
        dep: Text from the departure textbox.
        dest: Text from the destination textbox.

    Returns:
        A ``(raw_result, markdown)`` tuple: the search result (or an error
        dict) for the JSON view and its Markdown rendering for the table
        view.
    """
    if CLIENT_ID and API_KEY:
        raw = search_next_3_connections(dep, dest, CLIENT_ID, API_KEY)
        return raw, json_to_markdown_table(raw)

    # Credentials missing: report it without touching the network.
    return (
        {"error": "API-Anmeldeinformationen nicht konfiguriert."},
        "**Fehler:** API-Anmeldeinformationen nicht konfiguriert.",
    )
# Build the Gradio interface: two text inputs, a search button, and two
# outputs (raw JSON plus the rendered Markdown table).
# NOTE(review): the original indentation was lost in this copy; the nesting
# below (only the two textboxes inside the Row) is an assumption -- confirm
# against the deployed layout.
with gr.Blocks(title="DB JSON Fahrplan", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🚆 DB │ Deutsche Bahn Zugverbindungen")
    with gr.Row():
        dep_input = gr.Textbox(label="Abfahrtsort", placeholder="z.B. Schweinfurt Hbf")
        dest_input = gr.Textbox(label="Zielort", placeholder="z.B. Oerlenbach")
    btn = gr.Button("🔍 Suchen", variant="primary")
    gr.Markdown("### JSON Ergebnis")
    output = gr.JSON(label="Rohdaten")
    gr.Markdown("### Formatierte Tabelle")
    md_output = gr.Markdown()
    # ui_wrapper returns (raw_result, markdown), matching the outputs order.
    btn.click(fn=ui_wrapper, inputs=[dep_input, dest_input], outputs=[output, md_output])

# Start the app only when the file is executed as a script.
if __name__ == "__main__":
    demo.launch(mcp_server=True)