wuhp commited on
Commit
fd7f1a5
Β·
verified Β·
1 Parent(s): b72bf58

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +178 -61
app.py CHANGED
@@ -3,23 +3,27 @@ import csv
3
  import requests
4
  import time
5
  import os
 
 
 
6
 
7
  # File path configuration
8
  CSV_FILE = 'endpoints.csv'
9
 
 
 
 
 
10
  def load_data():
11
  """Loads the endpoints from the CSV file."""
12
  if not os.path.exists(CSV_FILE):
13
- print(f"Error: {CSV_FILE} not found.")
14
  return []
15
 
16
  data = []
17
  try:
18
  with open(CSV_FILE, 'r', encoding='utf-8') as f:
19
- # Assumes headers: TYPE1, TYPE2, URL
20
  reader = csv.DictReader(f)
21
  for row in reader:
22
- # specific cleanup to remove potential whitespace from CSV keys/values
23
  cleaned_row = {k.strip(): v.strip() for k, v in row.items() if k}
24
  data.append(cleaned_row)
25
  return data
@@ -28,61 +32,154 @@ def load_data():
28
  return []
29
 
30
  def get_type1_choices():
31
- """Extracts unique TYPE1 categories (e.g., profile, video, comments)."""
32
  data = load_data()
33
  if not data:
34
  return []
35
- # Set comprehension to get unique values
36
  return list(set([item.get('TYPE1') for item in data if item.get('TYPE1')]))
37
 
38
  def get_type2_choices(selected_type1):
39
- """
40
- Extracts TYPE2 options based on selected TYPE1.
41
- Adds an 'ALL ...' option at the beginning.
42
- """
43
  data = load_data()
44
  if not data or not selected_type1:
45
  return []
46
 
47
- # Filter data for the selected category
48
  choices = [item.get('TYPE2') for item in data if item.get('TYPE1') == selected_type1]
49
  unique_choices = list(set(choices))
50
  unique_choices.sort()
51
 
52
- # Add the "ALL" option at the top
53
  all_option = f"ALL {selected_type1.upper()} REPORTS"
54
  return [all_option] + unique_choices
55
 
56
- def execute_reports(type1, type2):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
57
  """
58
- Main logic to send requests to TikTok endpoints.
59
- Generator function to stream logs to the UI.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
60
  """
61
  data = load_data()
62
  if not data:
63
  yield "❌ Error: endpoints.csv not found or empty."
64
  return
65
 
66
- targets = []
67
-
68
- # Construct the "ALL" string to check against
69
  all_check_str = f"ALL {type1.upper()} REPORTS"
70
-
71
- # Check if the user selected the "ALL" option
72
  if type2 == all_check_str:
73
- # Filter all items matching TYPE1
74
  targets = [item for item in data if item.get('TYPE1') == type1]
75
- yield f"πŸš€ Preparing to send ALL {len(targets)} reports for category: {type1}...\n"
76
  else:
77
- # Filter specific item matching TYPE1 and TYPE2
78
  targets = [item for item in data if item.get('TYPE1') == type1 and item.get('TYPE2') == type2]
79
- yield f"πŸš€ Preparing to send specific report: {type2}...\n"
80
 
81
  if not targets:
82
- yield "❌ No matching endpoints found."
83
  return
84
 
85
- # Headers to mimic a real browser (helper to avoid immediate rejection)
 
 
 
 
 
 
 
86
  headers = {
87
  "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
88
  "Referer": "https://www.tiktok.com/"
@@ -91,35 +188,31 @@ def execute_reports(type1, type2):
91
  log_history = ""
92
 
93
  for i, target in enumerate(targets):
94
- url = target.get('URL')
95
  report_name = target.get('TYPE2')
96
 
97
- if not url:
98
  continue
99
 
 
 
 
100
  try:
101
- # Send the request
102
- # Note: TikTok endpoints usually ignore GET for actions and require POST,
103
- # but based on your URL structure (params in URL), a GET might trigger the hit
104
- # or the URL is intended to be loaded.
105
- # If these are API endpoints, requests.get is standard.
106
- response = requests.get(url, headers=headers)
107
-
108
  timestamp = time.strftime("%H:%M:%S")
109
 
110
  if response.status_code == 200:
111
  log_entry = f"[{timestamp}] βœ… SENT ({i+1}/{len(targets)}): {report_name}\n"
112
  else:
113
- log_entry = f"[{timestamp}] ⚠️ STATUS {response.status_code} ({i+1}/{len(targets)}): {report_name}\n"
114
 
115
  log_history += log_entry
116
  yield log_history
117
-
118
- # Small delay to be polite to the server
119
  time.sleep(0.5)
120
 
121
  except Exception as e:
122
- log_history += f"[{time.strftime('%H:%M:%S')}] ❌ ERROR: {report_name} - {str(e)}\n"
123
  yield log_history
124
 
125
  yield log_history + "\nβœ… DONE. All tasks completed."
@@ -130,62 +223,86 @@ def execute_reports(type1, type2):
130
 
131
  with gr.Blocks(theme=gr.themes.Soft()) as app:
132
  gr.Markdown("# πŸ€– TikTok Report Bot Controller")
133
- gr.Markdown("Select a category and report type. Data is loaded from `endpoints.csv`.")
134
 
135
  with gr.Row():
 
136
  with gr.Column(scale=1):
137
- # Dropdown 1: Category (profile, video, comments)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
138
  t1_input = gr.Dropdown(
139
  choices=get_type1_choices(),
140
- label="Category (Type 1)",
141
- info="Select the target category",
142
  value=None,
143
  interactive=True
144
  )
145
 
146
- # Dropdown 2: Specific Report or "ALL"
147
- # We initialize it empty; it gets populated when t1_input changes
148
  t2_input = gr.Dropdown(
149
  choices=[],
150
- label="Report Type (Type 2)",
151
- info="Select a specific violation or 'ALL'",
152
  interactive=True
153
  )
154
 
155
- send_btn = gr.Button("πŸš€ Send Report(s)", variant="primary")
156
-
157
- refresh_btn = gr.Button("πŸ”„ Reload CSV Data", variant="secondary")
158
 
159
- with gr.Column(scale=2):
160
- # Output Logs
 
161
  logs_output = gr.Textbox(
162
- label="Execution Logs",
163
- placeholder="Waiting for command...",
164
- lines=20,
165
- max_lines=20,
166
- autoscroll=True
 
167
  )
168
 
169
  # ---------------------------------------------------------------------
170
  # EVENT HANDLERS
171
  # ---------------------------------------------------------------------
172
 
173
- # 1. When Category changes, update Report Type choices
174
- # This fixes the AttributeError by returning a new gr.Dropdown object
 
 
 
 
 
 
175
  def update_second_dropdown(choice):
176
  new_choices = get_type2_choices(choice)
177
  return gr.Dropdown(choices=new_choices, value=new_choices[0] if new_choices else None)
178
 
179
  t1_input.change(fn=update_second_dropdown, inputs=t1_input, outputs=t2_input)
180
 
181
- # 2. Send Button Logic
182
  send_btn.click(
183
  fn=execute_reports,
184
- inputs=[t1_input, t2_input],
185
  outputs=logs_output
186
  )
187
 
188
- # 3. Refresh Button Logic
189
  def refresh_ui():
190
  new_t1 = get_type1_choices()
191
  return gr.Dropdown(choices=new_t1), gr.Dropdown(choices=[], value=None)
 
3
  import requests
4
  import time
5
  import os
6
+ import re
7
+ import json
8
+ from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
9
 
10
  # File path configuration
11
  CSV_FILE = 'endpoints.csv'
12
 
13
+ # -------------------------------------------------------------------------
14
+ # DATA LOADING & UTILS
15
+ # -------------------------------------------------------------------------
16
+
17
  def load_data():
18
  """Loads the endpoints from the CSV file."""
19
  if not os.path.exists(CSV_FILE):
 
20
  return []
21
 
22
  data = []
23
  try:
24
  with open(CSV_FILE, 'r', encoding='utf-8') as f:
 
25
  reader = csv.DictReader(f)
26
  for row in reader:
 
27
  cleaned_row = {k.strip(): v.strip() for k, v in row.items() if k}
28
  data.append(cleaned_row)
29
  return data
 
32
  return []
33
 
34
  def get_type1_choices():
35
+ """Extracts unique TYPE1 categories."""
36
  data = load_data()
37
  if not data:
38
  return []
 
39
  return list(set([item.get('TYPE1') for item in data if item.get('TYPE1')]))
40
 
41
  def get_type2_choices(selected_type1):
42
+ """Extracts TYPE2 options based on selected TYPE1."""
 
 
 
43
  data = load_data()
44
  if not data or not selected_type1:
45
  return []
46
 
 
47
  choices = [item.get('TYPE2') for item in data if item.get('TYPE1') == selected_type1]
48
  unique_choices = list(set(choices))
49
  unique_choices.sort()
50
 
 
51
  all_option = f"ALL {selected_type1.upper()} REPORTS"
52
  return [all_option] + unique_choices
53
 
54
+ # -------------------------------------------------------------------------
55
+ # LINK RESOLVER (Scraper)
56
+ # -------------------------------------------------------------------------
57
+
58
+ def resolve_tiktok_link(url):
59
+ """
60
+ Attempts to fetch Object ID, Owner ID, and SecUID from a TikTok URL.
61
+ Returns a dictionary of values to populate the UI.
62
+ """
63
+ headers = {
64
+ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
65
+ "Referer": "https://www.tiktok.com/"
66
+ }
67
+
68
+ try:
69
+ # Determine if it's a video or profile
70
+ is_video = "/video/" in url
71
+ response = requests.get(url, headers=headers, timeout=10)
72
+ html = response.text
73
+
74
+ data = {
75
+ "object_id": "",
76
+ "owner_id": "",
77
+ "sec_uid": "",
78
+ "nickname": ""
79
+ }
80
+
81
+ # Regex to find standard TikTok JSON data in HTML
82
+ # Checking SIGI_STATE or generic JSON structures
83
+ # Note: TikTok changes this often, this is a best-effort approach.
84
+
85
+ # 1. Try to find User ID (owner_id)
86
+ user_id_match = re.search(r'"userId":"(\d+)"', html)
87
+ if user_id_match:
88
+ data["owner_id"] = user_id_match.group(1)
89
+
90
+ # 2. Try to find SecUID
91
+ sec_uid_match = re.search(r'"secUid":"(MS4wLjAB[^"]+)"', html)
92
+ if sec_uid_match:
93
+ data["sec_uid"] = sec_uid_match.group(1)
94
+
95
+ # 3. Try to find Nickname
96
+ nickname_match = re.search(r'"uniqueId":"([^"]+)"', html)
97
+ if nickname_match:
98
+ data["nickname"] = nickname_match.group(1)
99
+
100
+ # 4. Determine Object ID
101
+ if is_video:
102
+ # For video, object_id is the video ID found in URL
103
+ vid_match = re.search(r'/video/(\d+)', url)
104
+ if vid_match:
105
+ data["object_id"] = vid_match.group(1)
106
+ else:
107
+ # For profile, object_id is usually the same as owner_id (User ID)
108
+ data["object_id"] = data["owner_id"]
109
+
110
+ return (
111
+ data["object_id"],
112
+ data["owner_id"],
113
+ data["sec_uid"],
114
+ data["nickname"],
115
+ f"βœ… Success! Found: {data['nickname']} (ID: {data['object_id']})"
116
+ )
117
+
118
+ except Exception as e:
119
+ return "", "", "", "", f"❌ Error resolving link: {str(e)}"
120
+
121
+ # -------------------------------------------------------------------------
122
+ # REPORT EXECUTION LOGIC
123
+ # -------------------------------------------------------------------------
124
+
125
+ def patch_url(original_url, new_params):
126
  """
127
+ Replaces query parameters in the original URL with new values.
128
+ """
129
+ parsed = urlparse(original_url)
130
+ query_params = parse_qs(parsed.query)
131
+
132
+ # Mapping UI fields to URL parameters
133
+ # Note: 'target' is often used interchangeably with object_id in some endpoints
134
+ mapping = {
135
+ 'object_id': new_params.get('object_id'),
136
+ 'owner_id': new_params.get('owner_id'),
137
+ 'secUid': new_params.get('sec_uid'),
138
+ 'nickname': new_params.get('nickname'),
139
+ 'target': new_params.get('object_id'), # Often target == object_id
140
+ }
141
+
142
+ # Only update if the user provided a value
143
+ for param_key, new_value in mapping.items():
144
+ if new_value and new_value.strip():
145
+ query_params[param_key] = [new_value.strip()]
146
+
147
+ # Rebuild URL
148
+ new_query = urlencode(query_params, doseq=True)
149
+ new_url = urlunparse(parsed._replace(query=new_query))
150
+ return new_url
151
+
152
+ def execute_reports(type1, type2, obj_id, own_id, sec_uid, nick):
153
+ """
154
+ Main execution handler.
155
+ Streams logs to the UI.
156
  """
157
  data = load_data()
158
  if not data:
159
  yield "❌ Error: endpoints.csv not found or empty."
160
  return
161
 
162
+ # Check for "ALL" selection
 
 
163
  all_check_str = f"ALL {type1.upper()} REPORTS"
 
 
164
  if type2 == all_check_str:
 
165
  targets = [item for item in data if item.get('TYPE1') == type1]
166
+ yield f"πŸš€ Preparing ALL reports for: {type1}...\n"
167
  else:
 
168
  targets = [item for item in data if item.get('TYPE1') == type1 and item.get('TYPE2') == type2]
169
+ yield f"πŸš€ Preparing specific report: {type2}...\n"
170
 
171
  if not targets:
172
+ yield "❌ No matching endpoints found in CSV."
173
  return
174
 
175
+ # Pack the custom target data
176
+ custom_target_data = {
177
+ 'object_id': obj_id,
178
+ 'owner_id': own_id,
179
+ 'sec_uid': sec_uid,
180
+ 'nickname': nick
181
+ }
182
+
183
  headers = {
184
  "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
185
  "Referer": "https://www.tiktok.com/"
 
188
  log_history = ""
189
 
190
  for i, target in enumerate(targets):
191
+ original_url = target.get('URL')
192
  report_name = target.get('TYPE2')
193
 
194
+ if not original_url:
195
  continue
196
 
197
+ # Patch the URL with new target info
198
+ final_url = patch_url(original_url, custom_target_data)
199
+
200
  try:
201
+ # Send Request
202
+ response = requests.get(final_url, headers=headers)
 
 
 
 
 
203
  timestamp = time.strftime("%H:%M:%S")
204
 
205
  if response.status_code == 200:
206
  log_entry = f"[{timestamp}] βœ… SENT ({i+1}/{len(targets)}): {report_name}\n"
207
  else:
208
+ log_entry = f"[{timestamp}] ⚠️ STATUS {response.status_code}: {report_name}\n"
209
 
210
  log_history += log_entry
211
  yield log_history
 
 
212
  time.sleep(0.5)
213
 
214
  except Exception as e:
215
+ log_history += f"[{time.strftime('%H:%M:%S')}] ❌ ERROR: {str(e)}\n"
216
  yield log_history
217
 
218
  yield log_history + "\nβœ… DONE. All tasks completed."
 
223
 
224
  with gr.Blocks(theme=gr.themes.Soft()) as app:
225
  gr.Markdown("# πŸ€– TikTok Report Bot Controller")
 
226
 
227
  with gr.Row():
228
+ # --- LEFT COLUMN: Configuration ---
229
  with gr.Column(scale=1):
230
+ gr.Markdown("### 1. Target Configuration")
231
+ gr.Markdown("Enter the details of the User/Video you want to report. Use the resolver to auto-fill.")
232
+
233
+ with gr.Group():
234
+ link_input = gr.Textbox(label="TikTok Link (Profile or Video)", placeholder="https://www.tiktok.com/@username...")
235
+ resolve_btn = gr.Button("πŸ” Resolve IDs from Link", size="sm")
236
+ resolve_status = gr.Markdown("")
237
+
238
+ with gr.Row():
239
+ # Inputs that will be passed to logic
240
+ obj_id_input = gr.Textbox(label="Target Object ID", info="User ID (for profiles) or Video ID")
241
+ own_id_input = gr.Textbox(label="Target Owner ID", info="The User ID of the account owner")
242
+
243
+ with gr.Row():
244
+ sec_uid_input = gr.Textbox(label="SecUID", info="Starts with MS4w...")
245
+ nick_input = gr.Textbox(label="Nickname", info="Username (no @)")
246
+
247
+ gr.Markdown("---")
248
+ gr.Markdown("### 2. Report Settings")
249
+
250
+ # Dropdown 1: Category
251
  t1_input = gr.Dropdown(
252
  choices=get_type1_choices(),
253
+ label="Category",
 
254
  value=None,
255
  interactive=True
256
  )
257
 
258
+ # Dropdown 2: Specific Report
 
259
  t2_input = gr.Dropdown(
260
  choices=[],
261
+ label="Report Type",
 
262
  interactive=True
263
  )
264
 
265
+ send_btn = gr.Button("πŸš€ Send Report(s)", variant="primary", size="lg")
266
+ refresh_btn = gr.Button("πŸ”„ Reload CSV", variant="secondary")
 
267
 
268
+ # --- RIGHT COLUMN: Logs ---
269
+ with gr.Column(scale=1):
270
+ gr.Markdown("### 3. Execution Logs")
271
  logs_output = gr.Textbox(
272
+ show_label=False,
273
+ placeholder="Logs will appear here...",
274
+ lines=25,
275
+ max_lines=25,
276
+ autoscroll=True,
277
+ elem_id="log-box"
278
  )
279
 
280
  # ---------------------------------------------------------------------
281
  # EVENT HANDLERS
282
  # ---------------------------------------------------------------------
283
 
284
+ # 1. Resolve Link Button
285
+ resolve_btn.click(
286
+ fn=resolve_tiktok_link,
287
+ inputs=link_input,
288
+ outputs=[obj_id_input, own_id_input, sec_uid_input, nick_input, resolve_status]
289
+ )
290
+
291
+ # 2. Update Report Type Choices on Category Change
292
  def update_second_dropdown(choice):
293
  new_choices = get_type2_choices(choice)
294
  return gr.Dropdown(choices=new_choices, value=new_choices[0] if new_choices else None)
295
 
296
  t1_input.change(fn=update_second_dropdown, inputs=t1_input, outputs=t2_input)
297
 
298
+ # 3. Send Button
299
  send_btn.click(
300
  fn=execute_reports,
301
+ inputs=[t1_input, t2_input, obj_id_input, own_id_input, sec_uid_input, nick_input],
302
  outputs=logs_output
303
  )
304
 
305
+ # 4. Refresh Button
306
  def refresh_ui():
307
  new_t1 = get_type1_choices()
308
  return gr.Dropdown(choices=new_t1), gr.Dropdown(choices=[], value=None)