Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import requests | |
| import time | |
| import random | |
| from urllib.parse import urlparse, parse_qs | |
| from threading import Event | |
| from http.cookies import SimpleCookie | |
| class ReportSender: | |
| def __init__(self): | |
| self.stop_event = Event() | |
| self.is_running = False | |
| def execute_reports_loop(self, master_url_batch, cookie_header, batch_count, current_status): | |
| if self.is_running: | |
| yield gr.update(), True, "Running" | |
| return | |
| if not master_url_batch or not master_url_batch.strip(): | |
| yield "Error: Report URL Batch is empty.", False, "Stopped" | |
| return | |
| if not cookie_header or not cookie_header.strip(): | |
| yield "Error: Cookie Header is missing.", False, "Stopped" | |
| return | |
| self.stop_event.clear() | |
| self.is_running = True | |
| master_urls = [u.strip() for u in master_url_batch.split('\n') if u.strip()] | |
| num_urls = len(master_urls) | |
| if not master_urls: | |
| self.is_running = False | |
| yield "Error: No valid URLs found.", False, "Stopped" | |
| return | |
| USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36" | |
| csrf_token = "" | |
| try: | |
| cookie_parser = SimpleCookie() | |
| cookie_parser.load(cookie_header) | |
| if 'tt_csrf_token' in cookie_parser: | |
| csrf_token = cookie_parser['tt_csrf_token'].value | |
| except Exception: | |
| pass | |
| headers = { | |
| "User-Agent": USER_AGENT, | |
| "Referer": "https://www.tiktok.com/", | |
| "Cookie": cookie_header, | |
| "Accept-Encoding": "gzip, deflate, br, zstd", | |
| "Accept-Language": "en-US,en;q=0.9", | |
| } | |
| if csrf_token: | |
| headers["tt-csrf-token"] = csrf_token | |
| headers["x-secsdk-csrf-token"] = csrf_token | |
| log_history = f" Starting batch loop for {num_urls} report types...\n" | |
| if csrf_token: | |
| log_history += " CSRF Token applied.\n" | |
| else: | |
| log_history += " WARNING: No 'tt_csrf_token' found.\n" | |
| yield log_history, self.is_running, "Running" | |
| total_requests_sent = 0 | |
| while not self.stop_event.is_set(): | |
| for i, final_url in enumerate(master_urls): | |
| if self.stop_event.is_set(): | |
| break | |
| delay = random.uniform(1, 3) | |
| time.sleep(delay) | |
| try: | |
| query_params = parse_qs(urlparse(final_url).query) | |
| reason_code = query_params.get('reason', ['UNKNOWN'])[0] | |
| report_type = query_params.get('report_type', ['??'])[0] | |
| report_name = f"Reason: {reason_code} / Type: {report_type.upper()}" | |
| except Exception: | |
| report_name = f"URL #{i+1}" | |
| try: | |
| response = requests.get(final_url, headers=headers, timeout=10) | |
| timestamp = time.strftime("%H:%M:%S") | |
| total_requests_sent += 1 | |
| is_empty_success = response.status_code == 200 and not response.text.strip() | |
| is_json_success = False | |
| status_code_msg = "No Message" | |
| status_code_val = "???" | |
| if not is_empty_success: | |
| try: | |
| parsed = response.json() | |
| status_code_val = parsed.get('status_code', -1) | |
| status_code_msg = parsed.get('status_message', 'No Message') | |
| if status_code_val == 0: | |
| is_json_success = True | |
| except: | |
| status_code_msg = "Non-JSON Response" | |
| status_code_val = response.status_code | |
| is_success = is_empty_success or is_json_success | |
| if is_success: | |
| status_icon = "SUCCESS" | |
| status_msg = "Request Sent" | |
| else: | |
| status_icon = "FAIL" | |
| if not is_empty_success and not is_json_success: | |
| status_msg = f"Failed ({response.status_code}): {status_code_msg}" | |
| else: | |
| status_msg = f"Failed ({response.status_code})" | |
| log_entry = ( | |
| f"[{timestamp}] {status_icon} REQ ({total_requests_sent}): {report_name}\n" | |
| f" Status: {status_msg} | Delay: {delay:.2f}s\n" | |
| ) | |
| log_history += log_entry | |
| yield log_history, self.is_running, "Running" | |
| except requests.exceptions.Timeout: | |
| log_history += f"[{time.strftime('%H:%M:%S')}] ERROR: Timeout.\n" | |
| yield log_history, self.is_running, "Running" | |
| except Exception as e: | |
| log_history += f"[{time.strftime('%H:%M:%S')}] ERROR: {str(e)}\n" | |
| yield log_history, self.is_running, "Running" | |
| if batch_count != -1: | |
| batch_count -= 1 | |
| if batch_count <= 0: | |
| self.stop_event.set() | |
| self.is_running = False | |
| log_history += f"\n\nSTOPPED. Total Requests Sent: {total_requests_sent}." | |
| yield log_history, self.is_running, "Stopped" | |
| def stop_execution(self): | |
| self.stop_event.set() | |
| self.is_running = False | |
| sender = ReportSender() | |
| with gr.Blocks() as app: | |
| gr.Markdown("# TikTok spam report") | |
| is_running_state = gr.State(False) | |
| current_status = gr.State("Stopped") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| gr.Markdown("### 1. Request Context") | |
| cookie_input = gr.Textbox( | |
| label="Full Browser Cookie Header", | |
| lines=5, max_lines=5 | |
| ) | |
| gr.Markdown("### 2. Report URLs Batch") | |
| master_url_batch_input = gr.Textbox( | |
| label="Master Signed Report URLs", | |
| lines=5, max_lines=10 | |
| ) | |
| gr.Markdown("### 3. Loop Control") | |
| with gr.Row(equal_height=False): | |
| batch_count_input = gr.Number( | |
| label="Batch Count (-1 for infinite)", | |
| value=-1, scale=1 | |
| ) | |
| status_box = gr.Textbox(label="Status", value="Stopped", interactive=False, scale=1) | |
| with gr.Row(): | |
| send_btn = gr.Button("START Loop", variant="primary", scale=1) | |
| stop_btn = gr.Button("STOP Loop", variant="stop", scale=1, interactive=False) | |
| with gr.Column(scale=1): | |
| gr.Markdown("### 4. Execution Logs") | |
| logs_output = gr.Textbox( | |
| show_label=False, | |
| lines=20, max_lines=20, | |
| autoscroll=True, | |
| elem_id="log-box" | |
| ) | |
| send_btn.click( | |
| fn=lambda: (gr.update(interactive=False), gr.update(interactive=True)), | |
| outputs=[send_btn, stop_btn], | |
| queue=False | |
| ) | |
| run_event = send_btn.click( | |
| fn=sender.execute_reports_loop, | |
| inputs=[master_url_batch_input, cookie_input, batch_count_input, current_status], | |
| outputs=[logs_output, is_running_state, status_box], | |
| ) | |
| run_event.then( | |
| fn=lambda: (gr.update(interactive=True), gr.update(interactive=False)), | |
| outputs=[send_btn, stop_btn] | |
| ) | |
| stop_btn.click( | |
| fn=sender.stop_execution, | |
| inputs=None, | |
| outputs=None, | |
| cancels=[run_event], | |
| queue=False | |
| ).then( | |
| fn=lambda: (gr.update(interactive=True), gr.update(interactive=False), "Stopped"), | |
| outputs=[send_btn, stop_btn, status_box], | |
| queue=False | |
| ) | |
| status_box.change(lambda x: x, inputs=status_box, outputs=current_status) | |
| if __name__ == "__main__": | |
| app.launch(theme=gr.themes.Soft()) |