shvchenko committed on
Commit
4754988
·
verified ·
1 Parent(s): cde23f3

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +125 -233
app.py CHANGED
@@ -1,246 +1,138 @@
1
- import re
2
- import json
3
  import random
4
  import string
5
- import gradio as gr
 
6
  from difflib import get_close_matches
7
 
8
# Path to the JSON reference of known channels and their tvg-ids.
CHANNELS_JSON = "channels_fixed.json"

# Abbreviations mapping for Events channels: short code -> full JSON key.
ABBREVIATIONS = {
    "BTN": "BIG TEN Network (BTN USA)",
    # add other known abbreviations here
}

# Channels whose tvg-id is fixed regardless of the JSON reference.
SPECIAL_CASES = {
    "MLB LEAGUE PASS": "MLB.Baseball.Dummy.us",
    "NHL GAMECENTER": "NHL.Hockey.Dummy.us",
}
21
-
22
def normalize(s: str) -> str:
    """Canonicalize a channel name: lowercase, keep only [a-z0-9], drop leading zeros."""
    text = (s or "").lower()
    text = re.sub(r"[^a-z0-9]", "", text)
    # Leading zeros are noise in channel numbering ("007" vs "7").
    return text.lstrip("0")
27
-
28
def find_best_match(candidate: str, keys):
    """Return the key from *keys* whose normalized form best matches *candidate*.

    Tries an exact normalized match first, then a fuzzy match (cutoff 0.75).
    Returns None when *candidate* is empty or nothing matches.
    """
    if not candidate:
        return None
    by_norm = {normalize(key): key for key in keys}
    target = normalize(candidate)
    # Exact normalized hit wins outright.
    if target in by_norm:
        return by_norm[target]
    # Otherwise fall back to difflib's closest normalized key.
    close = get_close_matches(target, list(by_norm), n=1, cutoff=0.75)
    return by_norm[close[0]] if close else None
42
-
43
def generate_filename(upper=False):
    """Return a random three-letter filename with an .m3u extension."""
    alphabet = string.ascii_uppercase if upper else string.ascii_lowercase
    name = "".join(random.choice(alphabet) for _ in range(3))
    return name + ".m3u"
46
-
47
def process_playlist_text(m3u_text):
    """Split an M3U playlist into 24/7 and Events outputs with fixed tvg-ids.

    Returns (log_text, path_24_7, path_events); on JSON load failure returns
    (error_message, None, None). Both output files are written to disk under
    random three-letter names.
    """
    # Load the JSON channel reference; bail out with a readable error.
    try:
        with open(CHANNELS_JSON, "r", encoding="utf-8") as f:
            channels_ref = json.load(f)
    except Exception as e:
        return f"Error loading {CHANNELS_JSON}: {e}", None, None

    lines = m3u_text.splitlines()
    headers = []
    blocks = []  # each entry: {"ext": ..., "url": ..., "section": ..., "idx": ...}

    # Pair every #EXTINF line with the URL line that follows it; anything
    # else is header/comment material shared by both outputs.
    i = 0
    idx = 0
    while i < len(lines):
        line = lines[i].rstrip("\n")
        if line.startswith("#EXTINF"):
            ext = line
            url = ""
            if i + 1 < len(lines):
                url = lines[i + 1].rstrip("\n")
                i += 2
            else:
                i += 1
            # Classify by group marker.
            sec = "other"
            if 'group-title="' in ext and '24/7' in ext:
                sec = "24_7"
            elif 'EVENTS|' in ext:
                sec = "events"
            blocks.append({"ext": ext, "url": url, "section": sec, "idx": idx})
            idx += 1
        else:
            headers.append(line)
            i += 1

    out_24_7_blocks = []
    out_events_blocks = []
    log = []
    json_keys = list(channels_ref.keys())

    for b in blocks:
        ext, url, sec = b["ext"], b["url"], b["section"]
        new_ext = ext  # default: line passes through unchanged

        if sec == "events":
            # Prefer the bracketed channel name; fall back to the display name
            # after the last comma.
            m = re.search(r'\[([^\]]+)\]', ext)
            candidate = m.group(1).strip() if m else ext.split(",")[-1].strip()

            # Exact abbreviation lookup first (must also exist in the JSON).
            match_key = None
            if candidate in ABBREVIATIONS:
                mapkey = ABBREVIATIONS[candidate]
                if mapkey in channels_ref:
                    match_key = mapkey

            # Hardcoded special cases by substring of name or whole line.
            chosen_tvg = None
            if not match_key:
                up = candidate.upper()
                ext_up = ext.upper()
                if "MLB LEAGUE PASS" in up or "MLB LEAGUE PASS" in ext_up:
                    chosen_tvg = SPECIAL_CASES["MLB LEAGUE PASS"]
                elif "NHL GAMECENTER" in up or "NHL GAMECENTER" in ext_up:
                    chosen_tvg = SPECIAL_CASES["NHL GAMECENTER"]

            # ATP/WTA events map to the Tennis Channel.
            if not match_key and not chosen_tvg:
                if re.search(r'\b(ATP|WTA)\b', candidate, flags=re.IGNORECASE):
                    chosen_tvg = "Tennis.Channel.us"

            # Fuzzy lookup against the JSON keys as a last resort.
            if not chosen_tvg and match_key is None:
                match_key = find_best_match(candidate, json_keys)

            # Derive the tvg-id to apply.
            if chosen_tvg:
                tvg_id = chosen_tvg
            elif match_key:
                tvg_id = channels_ref.get(match_key, {}).get("tvg_id", None)
            else:
                tvg_id = "Live.Event.us"

            # A leftover placeholder id is always overridden.
            if 'tvg-id="test"' in ext:
                tvg_id = "Live.Event.us"

            # Insert or replace the tvg-id attribute (group-title untouched).
            if tvg_id:
                if 'tvg-id="' in new_ext:
                    new_ext = re.sub(r'tvg-id="[^"]*"', f'tvg-id="{tvg_id}"', new_ext, count=1)
                else:
                    new_ext = re.sub(r'(#EXTINF:-?\d+)(\s*)', r'\1 tvg-id="' + tvg_id + r'"\2', new_ext, count=1)

            if match_key:
                log.append(f"βœ… EVENTS | {candidate} β†’ {tvg_id} (matched {match_key})")
            else:
                log.append(f"⚠️ EVENTS | {candidate} β†’ {tvg_id} (no exact json match)")

            out_events_blocks.append((new_ext, url))

        elif sec == "24_7":
            # Use the display name after the last comma.
            display_name = ext.split(",")[-1].strip()
            best = find_best_match(display_name, json_keys)
            tvg_id = channels_ref.get(best, {}).get("tvg_id", None) if best else "Info.Guide.Dummy.us"

            # Placeholder ids always get the dummy guide id.
            if 'tvg-id="test"' in ext:
                tvg_id = "Info.Guide.Dummy.us"

            if 'tvg-id="' in new_ext:
                new_ext = re.sub(r'tvg-id="[^"]*"', f'tvg-id="{tvg_id}"', new_ext, count=1)
            else:
                new_ext = re.sub(r'(#EXTINF:-?\d+)(\s*)', r'\1 tvg-id="' + tvg_id + r'"\2', new_ext, count=1)

            log.append(f"βœ… 24/7 | {display_name} β†’ {tvg_id}" if best else f"⚠️ 24/7 | {display_name} β†’ {tvg_id} (no json match)")
            out_24_7_blocks.append((new_ext, url))

        else:
            # Unclassified entries are copied to both playlists untouched.
            out_24_7_blocks.append((ext, url))
            out_events_blocks.append((ext, url))

    def build_text(header_lines, block_list):
        # Headers first (minus any EPG url-tvg header), then EXTINF/URL pairs.
        out = []
        for h in header_lines:
            if h.strip().startswith('#EXTM3U url-tvg='):
                continue
            out.append(h)
        for ext, url in block_list:
            out.append(ext)
            if url is not None:
                out.append(url)
        return "\n".join(out) + "\n"

    text_24_7 = build_text(headers, out_24_7_blocks)
    text_events = build_text(headers, out_events_blocks)

    # Write both playlists to disk under random names.
    f24 = generate_filename(upper=True)
    fev = generate_filename(upper=False)
    with open(f24, "w", encoding="utf-8") as f:
        f.write(text_24_7)
    with open(fev, "w", encoding="utf-8") as f:
        f.write(text_events)

    return ("\n".join(log), f24, fev)
228
-
229
- # --- Gradio UI ---
230
# --- Gradio UI ---
def run_gradio(m3u_text):
    """Thin wrapper around process_playlist_text for the UI callback."""
    status, f1, f2 = process_playlist_text(m3u_text)
    return status, f1, f2

# One textbox in; the update log plus both generated playlist files out.
demo = gr.Interface(
    fn=process_playlist_text,
    inputs=[gr.Textbox(label="Paste M3U8 Playlist Here", lines=30)],
    outputs=[
        gr.Textbox(label="Update Log", lines=25),
        gr.File(label="Download 24/7 Channels M3U"),
        gr.File(label="Download Events M3U"),
    ],
    title="Project 1 β€” M3U Updater (stable output format)"
)

if __name__ == "__main__":
    demo.launch()
 
1
+ import gradio as gr
 
2
  import random
3
  import string
4
+ import re
5
+ import json
6
  from difflib import get_close_matches
7
 
8
# Channel mapping reference, loaded once at import time.
# NOTE(review): this raises at import if channels_fixed.json is missing —
# TODO confirm the deployment always ships the file alongside app.py.
with open("channels_fixed.json", "r", encoding="utf-8") as f:
    channel_map = json.load(f)
11
+
12
# Random 3-character filename generator
def random_filename(uppercase=True):
    """Return a random three-letter name with an .m3u extension."""
    pool = string.ascii_uppercase if uppercase else string.ascii_lowercase
    stem = "".join(random.choice(pool) for _ in range(3))
    return f"{stem}.m3u"
16
+
17
# Apply special event rules
def apply_event_special_cases(extinf_line, channel_name):
    """Apply hardcoded sports overrides to an #EXTINF line.

    MLB League Pass, NHL GameCenter, and ATP/WTA tennis events get fixed
    tvg-ids; a leftover placeholder tvg-id="test" becomes Live.Event.us.
    The first matching rule wins; otherwise the line is returned unchanged.
    Note: the tvg-id substitutions only take effect when the line already
    carries a tvg-id attribute, matching how callers build the line.
    """
    line = extinf_line
    if "MLB LEAGUE PASS" in channel_name:
        return re.sub(r'tvg-id="[^"]*"', 'tvg-id="MLB.Baseball.Dummy.us"', line)
    if "NHL GAMECENTER" in channel_name:
        return re.sub(r'tvg-id="[^"]*"', 'tvg-id="NHL.Hockey.Dummy.us"', line)
    # Whole-word match so names like "WATPOLO" don't trip the tennis rule
    # (the previous revision of this app also used \b(ATP|WTA)\b).
    if re.search(r'\b(ATP|WTA)\b', channel_name):
        return re.sub(r'tvg-id="[^"]*"', 'tvg-id="Tennis.Channel.us"', line)
    if 'tvg-id="test"' in line:
        return line.replace('tvg-id="test"', 'tvg-id="Live.Event.us"')
    return line
29
+
30
# Main processor
def process_m3u(m3u_text):
    """Split an M3U playlist into 24/7 and Events playlists with fixed tvg-ids.

    Returns (out_247, out_events, log_text). Entries whose group-title is
    neither 24/7 nor EVENTS are dropped from both outputs.
    """

    def _with_id(line, value):
        # Replace every tvg-id attribute, or append one when none exists.
        if 'tvg-id="' in line:
            return re.sub(r'tvg-id="[^"]*"', f'tvg-id="{value}"', line)
        return line + f' tvg-id="{value}"'

    lines = m3u_text.splitlines()
    out_247_blocks = []
    out_events_blocks = []
    log = []

    # Hardcoded abbreviation exceptions (prefix match on the channel name).
    exceptions = {
        "BTN": "Big.Ten.Network.HD.us2",
        "SNY": "SNY.SportsNet.New.York.HD.us2",
        "MASN": "MASN.-.Mid.Atlantic.Sports.Network.us2",
    }

    for pos, raw in enumerate(lines):
        if not raw.startswith("#EXTINF"):
            continue
        extinf = raw
        url = lines[pos + 1] if pos + 1 < len(lines) else ""

        # Pull the existing tvg-id, the display name (text after the first
        # comma), and the group title out of the #EXTINF attributes.
        tvg_match = re.search(r'tvg-id="([^"]*)"', extinf)
        tvg_id = tvg_match.group(1) if tvg_match else None
        name_match = re.search(r",(.*)", extinf)
        channel_name = name_match.group(1).strip() if name_match else ""
        group_match = re.search(r'group-title="([^"]*)"', extinf)
        group_title = group_match.group(1).upper() if group_match else ""

        # --- 24/7 CHANNELS ---
        if "24/7" in group_title:
            # Only placeholder or missing ids are rewritten.
            if tvg_id == "test" or not tvg_id:
                new_ext = _with_id(extinf, "Info.Guide.Dummy.us")
                log.append(f"ℹ️ 24/7 | {channel_name} β†’ Info.Guide.Dummy.us")
            else:
                new_ext = extinf
            out_247_blocks.append((new_ext, url))

        # --- EVENTS ---
        elif "EVENTS" in group_title:
            candidate = channel_name

            # Hardcoded abbreviation exceptions take precedence.
            matched_exception = False
            for abbr, fixed_id in exceptions.items():
                if candidate.upper().startswith(abbr):
                    tvg_id = fixed_id
                    new_ext = apply_event_special_cases(_with_id(extinf, tvg_id), candidate)
                    out_events_blocks.append((new_ext, url))
                    log.append(f"βœ… EVENTS | {candidate} β†’ {tvg_id} (hardcoded {abbr} exception)")
                    matched_exception = True
                    break
            if matched_exception:
                continue

            # Substring match against the JSON reference, then fuzzy fallback.
            match = None
            for key, data in channel_map.items():
                if data.get("tvg_id") and key.lower() in candidate.lower():
                    match = data["tvg_id"]
                    break
            if not match:
                guesses = get_close_matches(candidate, list(channel_map.keys()), n=1, cutoff=0.6)
                if guesses:
                    match = channel_map[guesses[0]].get("tvg_id")

            if match:
                new_ext = _with_id(extinf, match)
                log.append(f"βœ… EVENTS | {candidate} β†’ {match}")
            else:
                new_ext = _with_id(extinf, "Live.Event.us")
                log.append(f"⚠️ EVENTS | {candidate} β†’ Live.Event.us (fallback)")

            new_ext = apply_event_special_cases(new_ext, candidate)
            out_events_blocks.append((new_ext, url))

    # Re-assemble each playlist as "EXTINF\nURL" blocks.
    out_247 = "\n".join(f"{ext}\n{url}" for ext, url in out_247_blocks)
    out_events = "\n".join(f"{ext}\n{url}" for ext, url in out_events_blocks)

    return out_247, out_events, "\n".join(log)
117
+
118
# Gradio UI
def run_app(m3u_text):
    """Process the pasted playlist, write both outputs to disk, and report."""
    playlist_247, playlist_events, log = process_m3u(m3u_text)
    file_247 = random_filename(uppercase=True)
    file_events = random_filename(uppercase=False)
    # Persist each playlist under its freshly generated name.
    for path, payload in ((file_247, playlist_247), (file_events, playlist_events)):
        with open(path, "w", encoding="utf-8") as fh:
            fh.write(payload)
    return f"βœ… Processed!\n24/7 file: {file_247}\nEvents file: {file_events}\n\nLog:\n{log}"
128
+
129
# Build the Gradio interface: one textbox in, one text report out.
iface = gr.Interface(
    fn=run_app,
    inputs=gr.Textbox(lines=15, placeholder="Paste your M3U playlist here..."),
    outputs="text",
    title="Project 1 Playlist Processor",
    description="Splits 24/7 and Events playlists, applies JSON tvg-id mappings, adds special sports rules, and outputs two clean M3Us.",
)

if __name__ == "__main__":
    iface.launch()