File size: 7,408 Bytes
48a8766
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
import os
import time
import gradio as gr
from byteplussdkarkruntime import Ark
import requests
from datetime import datetime

# Get API key from Hugging Face secret "Key" and CLEAN it
API_KEY = os.environ.get("Key", "").strip()
print(f"✅ Key loaded, length: {len(API_KEY)}")

# Initialize client with proxy
client = Ark(
    base_url="https://1hit.no/proxy/proxy.php",
    api_key=API_KEY,
    timeout=30.0,
    max_retries=3,
)

# In-memory upload queue
upload_queue = []

# ---------------- MANUAL POLLING ----------------
def manual_poll(task_id):
    if not task_id:
        return "❌ Please enter a task ID"
    
    try:
        url = f"https://1hit.no/proxy/proxy.php?task_id={task_id}"
        headers = {"Authorization": f"Bearer {API_KEY}"}
        requests.get(url, headers=headers)
        
        io_response = requests.get("https://1hit.no/proxy/io.json")
        io_data = io_response.json()
        
        if task_id in io_data:
            status = io_data[task_id].get('status')
            video_url = io_data[task_id].get('video_url')
            result = f"✅ Task {task_id}\nStatus: {status}\n"
            if video_url:
                result += f"Video URL: {video_url}\n"
            if 'response' in io_data[task_id]:
                result += f"Full response: {io_data[task_id]['response']}\n"
            return result
        else:
            return f"❌ Task {task_id} not found in io.json"
    except Exception as e:
        return f"❌ Error: {str(e)}"

def get_raw_json():
    try:
        r = requests.get("https://1hit.no/proxy/io.json")
        return r.json()
    except:
        return {"error": "Could not fetch io.json"}

# ---------------- WATCH & DOWNLOAD ----------------
def get_task_list():
    try:
        r = requests.get("https://1hit.no/proxy/io.json")
        data = r.json()
        tasks = []
        for task_id, task_data in data.items():
            status = task_data.get('status', 'unknown')
            created = task_data.get('created', 0)
            time_str = datetime.fromtimestamp(created).strftime('%Y-%m-%d %H:%M:%S')
            prompt = task_data.get('request', {}).get('content', [{}])[0].get('text', 'No prompt')
            prompt_preview = prompt[:50] + '...' if len(prompt) > 50 else prompt
            tasks.append({
                "id": task_id,
                "status": status,
                "created": time_str,
                "created_ts": created,
                "video_url": task_data.get('video_url', ''),
                "prompt": prompt_preview
            })
        tasks.sort(key=lambda x: x['created_ts'], reverse=True)
        return tasks
    except Exception as e:
        print(f"Error getting task list: {e}")
        return []

def refresh_task_list():
    tasks = get_task_list()
    choices = [f"{t['id']} | {t['status']} | {t['created']}" for t in tasks]
    return gr.Dropdown.update(choices=choices, value=choices[0] if choices else None)

def load_task(selection):
    if not selection:
        return "No task selected", None, None
    
    task_id = selection.split(' | ')[0]
    try:
        r = requests.get("https://1hit.no/proxy/io.json")
        data = r.json()
        task = data.get(task_id, {})
        video_url = task.get('video_url', '')
        status = task.get('status', 'unknown')
        prompt = task.get('request', {}).get('content', [{}])[0].get('text', 'No prompt')
        created = task.get('created', 0)
        created_str = datetime.fromtimestamp(created).strftime('%Y-%m-%d %H:%M:%S')
        response = task.get('response', {})
        
        info = f"### Task Details\n\n**Task ID:** `{task_id}`\n**Status:** `{status}`\n**Created:** {created_str}\n**Prompt:** {prompt}\n\n"
        if status == 'succeeded' and video_url:
            info += f"✅ **Video ready!**\n**Download:** [Click here]({video_url})\n"
        elif status == 'failed':
            error = task.get('response', {}).get('error', 'Unknown error')
            info += f"❌ **Failed:** {error}\n"
        
        return info, video_url, video_url
    except Exception as e:
        return f"Error loading task: {e}", None, None

# ---------------- UPLOAD QUEUE ----------------
def add_to_queue(file_path, queue_state):
    if file_path:
        queue_state = queue_state or []
        if file_path not in queue_state:
            queue_state.append(file_path)
    return [[q] for q in queue_state], queue_state

def clear_queue():
    return [], []

# ---------------- GRADIO APP ----------------
with gr.Blocks(title="BytePlus Video Gallery", theme=gr.themes.Soft()) as demo:
    
    # ---------------- Manual Polling Tab ----------------
    with gr.TabItem("🔧 Manual Polling"):
        gr.Markdown("### 🔧 Manual Polling & Debug")
        with gr.Row():
            with gr.Column():
                gr.Markdown("#### Poll Specific Task")
                task_id_input = gr.Textbox(label="Task ID", placeholder="Enter task ID (cgt-...)")
                poll_btn = gr.Button("🔄 Poll Now", variant="primary")
                poll_result = gr.Textbox(label="Poll Result", lines=10)
                poll_btn.click(fn=manual_poll, inputs=task_id_input, outputs=poll_result)
            with gr.Column():
                gr.Markdown("#### Current io.json")
                refresh_btn = gr.Button("🔄 Refresh io.json", variant="secondary")
                raw_json = gr.JSON(label="io.json Contents")
                refresh_btn.click(fn=get_raw_json, outputs=raw_json)
        gr.Markdown("""
        ### 📝 Instructions
        1. Copy a task ID from above (like `cgt-20260217072358-hszt9`)
        2. Paste it in the Task ID field
        3. Click "Poll Now" to force an update
        4. Check io.json to see if status changed
        """)
    
    # ---------------- Watch & Download Tab ----------------
    with gr.TabItem("📺 Watch & Download"):
        gr.Markdown("### 📺 Browse and Download Generated Videos")
        
        with gr.Row():
            with gr.Column(scale=1):
                refresh_list_btn = gr.Button("🔄 Refresh Task List", variant="primary")
                task_list = gr.Dropdown(label="Select Task", choices=[], interactive=True)
            with gr.Column(scale=2):
                task_info = gr.Markdown("Select a task to view details")
        
        with gr.Row():
            with gr.Column():
                video_player = gr.Video(label="Video Player")
                download_link = gr.File(label="Download Video")
                put_queue_btn = gr.Button("📥 Put in Upload Queue")
                queue_display = gr.Dataframe(
                    headers=["Upload Queue"], value=[], interactive=False, max_rows=10
                )
                clear_queue_btn = gr.Button("🗑️ Clear Queue")
        
        # Button connections
        refresh_list_btn.click(fn=refresh_task_list, outputs=task_list)
        task_list.change(fn=load_task, inputs=task_list, outputs=[task_info, video_player, download_link])
        put_queue_btn.click(fn=add_to_queue, inputs=[download_link, gr.State(upload_queue)], outputs=[queue_display, gr.State(upload_queue)])
        clear_queue_btn.click(fn=clear_queue, outputs=[queue_display])
        
        # Auto-load task list when tab is opened
        demo.load(fn=refresh_task_list, outputs=task_list)

if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0")