File size: 6,661 Bytes
728e272
0dd3fa8
2c877f9
728e272
2c877f9
2bfd637
 
37fd47e
2bfd637
 
047910f
37fd47e
 
 
 
 
 
 
 
 
 
 
 
2430bd2
 
 
 
37fd47e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7189de1
 
015eda9
7189de1
 
 
8dd52e3
23f05a3
 
 
 
 
0dd3fa8
23f05a3
 
015eda9
8dd52e3
 
23f05a3
 
 
 
8dd52e3
 
fd470c7
2387771
 
057338c
591899b
 
fd470c7
 
 
7189de1
0dd3fa8
23f05a3
fd470c7
015eda9
23f05a3
 
 
fd470c7
23f05a3
fd470c7
 
23f05a3
 
015eda9
 
fd470c7
591899b
fd470c7
015eda9
fd470c7
 
2387771
fd470c7
 
015eda9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fd470c7
 
 
95c14d5
015eda9
fd470c7
015eda9
 
fd470c7
d796615
23f05a3
fd470c7
015eda9
d796615
23f05a3
fd470c7
2bfd637
 
2430bd2
 
 
2bfd637
 
2430bd2
 
 
d4fbc40
 
2430bd2
2387771
d796615
015eda9
 
fd470c7
2430bd2
 
 
 
 
 
37fd47e
fd470c7
2430bd2
d796615
 
0ac6697
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import gradio as gr
import requests
from datetime import datetime, timedelta
import xml.etree.ElementTree as ET
import json
import os

# Read the Deutsche Bahn API credentials from environment variables.
# Each constant is None when its variable is not set.
CLIENT_ID = os.getenv('client_id')
API_KEY = os.getenv('api_key')

def json_to_markdown_table(json_data):
    """Render connection records as a Markdown table.

    Args:
        json_data: A list of row dicts, a single row dict, an error dict of
            the form ``{"error": message}``, or a JSON string encoding any
            of these. The column set is taken from the first row.

    Returns:
        The Markdown table (header line, separator line, one line per row,
        each terminated by a newline). Returns ``"Keine Daten vorhanden"``
        for empty input and ``"**Fehler:** <message>"`` for an error dict.

    Raises:
        json.JSONDecodeError: If ``json_data`` is a string that is not
            valid JSON.
    """
    data = json.loads(json_data) if isinstance(json_data, str) else json_data

    if not data:
        return "Keine Daten vorhanden"

    if isinstance(data, dict):
        if "error" in data:
            return f"**Fehler:** {data['error']}"
        # A lone record becomes a one-row table instead of failing on data[0].
        data = [data]

    # Display labels for the known record keys; unknown keys are shown as-is.
    # "ankunftszeitszeit" is the key the record producer actually emits, so
    # it must stay spelled exactly like this.
    header_mapping = {
        "startort": "🚉 Startort",
        "zielort": "🎯 Zielort",
        "abfahrtszeit": "⏰ Abfahrt",
        "ankunftszeitszeit": "⏱️ Ankunft",
        "gleis": "🛤️ Gleis",
        "duration": "⏳ Dauer"
    }

    def _cell(value):
        # Keep each cell on one line and stop a literal pipe from being
        # interpreted as a column separator.
        return str(value).replace("|", "\\|").replace("\n", " ")

    columns = list(data[0].keys())
    lines = [
        "| " + " | ".join(_cell(header_mapping.get(col, col)) for col in columns) + " |",
        "|" + "|".join("---" for _ in columns) + "|",
    ]
    lines.extend(
        "| " + " | ".join(_cell(row.get(col, "")) for col in columns) + " |"
        for row in data
    )
    return "\n".join(lines) + "\n"

def get_station_info(pattern, client_id, api_key):
    """Look up a station through the DB Timetables ``station`` endpoint.

    Args:
        pattern: Station name or search pattern, typically typed by a user.
        client_id: Value sent in the ``DB-Client-Id`` request header.
        api_key: Value sent in the ``DB-Api-Key`` request header.

    Returns:
        A dict ``{'name': ..., 'eva': ...}`` built from the first
        ``<station>`` element of the response, or ``None`` when the request
        fails, the status is not 200, the body is not well-formed XML, or
        no ``<station>`` element is present.
    """
    from urllib.parse import quote

    # Percent-encode the whole pattern so characters such as '/', '?' or '#'
    # stay inside the path segment instead of changing the request target.
    url = (
        "https://apis.deutschebahn.com/db-api-marketplace/apis/timetables/v1/station/"
        + quote(str(pattern), safe="")
    )
    headers = {'DB-Client-Id': client_id, 'DB-Api-Key': api_key, 'accept': 'application/xml'}
    try:
        response = requests.get(url, headers=headers, timeout=10)
        if response.status_code != 200:
            return None
        root = ET.fromstring(response.content)
    except (requests.RequestException, ET.ParseError):
        # Network problems and unparsable bodies both mean "station unknown".
        return None

    station = root.find('station')
    if station is None:
        return None
    return {'name': station.get('name'), 'eva': station.get('eva')}

def fetch_timetable_hour(eva, date_str, hour_str, client_id, api_key):
    """Fetch one hourly slice of the planned timetable for a station.

    Args:
        eva: Station identifier placed in the request path.
        date_str: Date path segment (callers in this file pass ``%y%m%d``).
        hour_str: Hour path segment (callers in this file pass ``%H``).
        client_id: Value sent in the ``DB-Client-Id`` request header.
        api_key: Value sent in the ``DB-Api-Key`` request header.

    Returns:
        The parsed XML root element when the API answers with status 200,
        otherwise ``None`` (also on any exception during the request or
        while parsing).
    """
    endpoint = "https://apis.deutschebahn.com/db-api-marketplace/apis/timetables/v1/plan"
    request_headers = {
        'DB-Client-Id': client_id,
        'DB-Api-Key': api_key,
        'accept': 'application/xml',
    }
    try:
        reply = requests.get(
            f"{endpoint}/{eva}/{date_str}/{hour_str}",
            headers=request_headers,
            timeout=10,
        )
        return ET.fromstring(reply.content) if reply.status_code == 200 else None
    except Exception:
        # Best effort: any failure is reported to the caller as "no data".
        return None

# Timestamp layout used by the ``pt`` attributes of the timetable XML.
_DB_TIME_FORMAT = "%y%m%d%H%M"


def _parse_db_time(value):
    """Parse a ``YYMMDDHHMM`` timestamp; return None if missing or malformed."""
    if not value:
        return None
    try:
        return datetime.strptime(value, _DB_TIME_FORMAT)
    except ValueError:
        return None


def _find_planned_arrival(dest_eva, train_number, dep_dt, client_id, api_key,
                          plan_cache, lookahead_hours=2):
    """Return the planned arrival of a train at the destination, or None.

    Scans the destination's hourly plans from the departure hour up to
    ``lookahead_hours`` later for a stop with the same train number whose
    planned arrival is not before ``dep_dt``.  ``plan_cache`` maps
    ``(date_str, hour_str)`` to an already fetched plan (or None) and is
    filled as a side effect, so each hourly plan is requested at most once.
    """
    if not train_number:
        # Without a train number, unrelated stops would "match" each other.
        return None

    for offset in range(lookahead_hours + 1):
        slot = dep_dt + timedelta(hours=offset)
        key = (slot.strftime("%y%m%d"), slot.strftime("%H"))
        if key not in plan_cache:
            plan_cache[key] = fetch_timetable_hour(dest_eva, key[0], key[1], client_id, api_key)
        plan = plan_cache[key]
        if plan is None:
            continue

        for stop in plan.findall('.//s'):
            stop_tl = stop.find('tl')
            stop_ar = stop.find('ar')
            if stop_tl is None or stop_ar is None or stop_tl.get('n') != train_number:
                continue
            arr_dt = _parse_db_time(stop_ar.get('pt', ''))
            # Ignore unusable timestamps and arrivals before the departure.
            if arr_dt is not None and arr_dt >= dep_dt:
                return arr_dt
    return None


def search_next_3_connections(departure, destination, client_id, api_key):
    """Find the next (up to three) direct departures towards a destination.

    Both station names are resolved via ``get_station_info``.  The departure
    station's planned timetable is then scanned for the current hour and the
    two following hours; a stop qualifies when its planned path (``ppth``)
    contains the resolved destination name or the text the user entered
    (case-insensitive substring match) and its planned departure is not in
    the past.

    Args:
        departure: Name / search pattern of the departure station.
        destination: Name / search pattern of the destination station.
        client_id: DB API client id.
        api_key: DB API key.

    Returns:
        On success, a list of at most three dicts ordered by departure time
        with the keys ``startort``, ``zielort``, ``abfahrtszeit``,
        ``ankunftszeitszeit``, ``gleis`` and ``duration``.  Arrival time and
        duration are ``"N/A"`` when no matching arrival could be found.
        On failure, a dict ``{"error": message}``.
    """
    if not all([departure, destination, client_id, api_key]):
        return {"error": "Bitte alle Felder ausfüllen."}

    dep_info = get_station_info(departure, client_id, api_key)
    dest_info = get_station_info(destination, client_id, api_key)

    if not dep_info:
        return {"error": f"Startort '{departure}' nicht gefunden."}
    if not dest_info:
        return {"error": f"Zielort '{destination}' nicht gefunden."}

    # NOTE(review): datetime.now() is naive local time of the host; the API
    # timestamps are presumably German local time - confirm the server's
    # timezone matches, otherwise the hour slices are shifted.
    now = datetime.now()

    # Lower-cased search terms; empty/None entries are dropped because an
    # empty string would match every path.
    needles = {text.lower() for text in (dest_info['name'], destination) if text}

    # Phase 1: collect every qualifying departure as (time, platform, train no).
    candidates = []
    for hour_offset in range(3):
        search_time = now + timedelta(hours=hour_offset)
        root = fetch_timetable_hour(
            dep_info['eva'],
            search_time.strftime("%y%m%d"),
            search_time.strftime("%H"),
            client_id,
            api_key,
        )
        if root is None:
            continue

        for train in root.findall('.//s'):
            dp = train.find('dp')
            tl = train.find('tl')
            if dp is None or tl is None:
                continue

            path = dp.get('ppth', '').lower()
            if not any(needle in path for needle in needles):
                continue

            dep_dt = _parse_db_time(dp.get('pt', ''))
            if dep_dt is None or dep_dt < now:
                continue

            candidates.append((dep_dt, dp.get('pp', 'n/a'), tl.get('n')))

    candidates.sort(key=lambda candidate: candidate[0])

    # Phase 2: resolve arrivals only for the connections actually returned,
    # sharing fetched destination plans between them.
    plan_cache = {}
    connections = []
    for dep_dt, platform, train_number in candidates[:3]:
        arr_dt = _find_planned_arrival(
            dest_info['eva'], train_number, dep_dt, client_id, api_key, plan_cache
        )
        if arr_dt is None:
            arr_time_str = "N/A"
            duration_str = "N/A"
        else:
            arr_time_str = arr_dt.strftime("%H:%M")
            duration_str = f"{int((arr_dt - dep_dt).total_seconds() // 60)} min"

        connections.append({
            "startort": dep_info['name'],
            "zielort": dest_info['name'],
            "abfahrtszeit": dep_dt.strftime("%H:%M"),
            # Key spelling is part of the output format consumed elsewhere.
            "ankunftszeitszeit": arr_time_str,
            "gleis": platform,
            "duration": duration_str,
        })

    return connections

# --- Gradio UI ---

def ui_wrapper(dep, dest):
    """Run a connection search for the UI and return both output formats.

    Args:
        dep: Departure station text from the UI.
        dest: Destination station text from the UI.

    Returns:
        A tuple ``(raw_result, markdown)``: the search result (or an error
        dict) and its Markdown rendering.  When the module-level credentials
        are missing, no search is performed and a fixed error pair is
        returned.
    """
    if CLIENT_ID and API_KEY:
        connections = search_next_3_connections(dep, dest, CLIENT_ID, API_KEY)
        return connections, json_to_markdown_table(connections)

    # Credentials missing: report the configuration problem in both outputs.
    return (
        {"error": "API-Anmeldeinformationen nicht konfiguriert."},
        "**Fehler:** API-Anmeldeinformationen nicht konfiguriert.",
    )

# Build the Gradio Blocks app. Components are created in the order they
# should appear on the page, so the statement order below is significant.
with gr.Blocks(title="DB JSON Fahrplan", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🚆 DB │ Deutsche Bahn Zugverbindungen")
    
    # Departure and destination inputs share one row.
    with gr.Row():
        dep_input = gr.Textbox(label="Abfahrtsort", placeholder="z.B. Schweinfurt Hbf")
        dest_input = gr.Textbox(label="Zielort", placeholder="z.B. Oerlenbach")
        
    btn = gr.Button("🔍 Suchen", variant="primary")
    
    # Raw search result as returned by ui_wrapper (list of dicts or error dict).
    gr.Markdown("### JSON Ergebnis")
    output = gr.JSON(label="Rohdaten")
    
    # The same result rendered as a Markdown table.
    gr.Markdown("### Formatierte Tabelle")
    md_output = gr.Markdown()
    
    # A click passes both textbox values to ui_wrapper and routes its two
    # return values to the JSON and Markdown components, in that order.
    btn.click(fn=ui_wrapper, inputs=[dep_input, dest_input], outputs=[output, md_output])

# Start the app only when the file is executed as a script.
# NOTE(review): mcp_server=True is forwarded to Gradio's launch(); presumably
# it additionally exposes the app as an MCP server - confirm against the
# installed Gradio version.
if __name__ == "__main__":
    demo.launch(mcp_server=True)