# File size: 7,408 Bytes | 48a8766
import os
import time
import gradio as gr
from byteplussdkarkruntime import Ark
import requests
from datetime import datetime
# The API key comes from the Hugging Face secret named "Key"; surrounding
# whitespace (e.g. a trailing newline pasted into the secret) is stripped.
API_KEY = os.getenv("Key", "").strip()
# Only the length is printed, never the key itself.
print(f"✅ Key loaded, length: {len(API_KEY)}")

# Ark client whose requests are sent to the proxy endpoint below.
client = Ark(
    api_key=API_KEY,
    base_url="https://1hit.no/proxy/proxy.php",
    max_retries=3,
    timeout=30.0,
)

# In-memory upload queue (used as the initial value of the UI's queue state).
upload_queue = list()
# ---------------- MANUAL POLLING ----------------
def manual_poll(task_id):
    """Force a poll of one task through the proxy and report its io.json entry.

    Args:
        task_id: Task identifier typed or pasted by the user. Leading and
            trailing whitespace is ignored; empty/blank/None is rejected.

    Returns:
        A human-readable, multi-line status string. Failures (network errors,
        timeouts, invalid JSON, ...) are reported as a "❌ Error: ..." string
        instead of being raised, because the result is shown directly in a
        textbox.
    """
    # Pasted IDs often carry stray spaces or newlines; a blank ID is "no ID".
    task_id = str(task_id).strip() if task_id else ""
    if not task_id:
        return "❌ Please enter a task ID"
    try:
        headers = {"Authorization": f"Bearer {API_KEY}"}
        # The response body is intentionally ignored: the request is made for
        # its effect on the proxy (presumably refreshing the task's entry in
        # io.json -- confirm against the proxy implementation).
        # `params=` lets requests URL-encode the ID; the timeout keeps a stalled
        # proxy from blocking the UI callback forever.
        requests.get(
            "https://1hit.no/proxy/proxy.php",
            params={"task_id": task_id},
            headers=headers,
            timeout=30,
        )
        io_response = requests.get("https://1hit.no/proxy/io.json", timeout=30)
        io_data = io_response.json()
        if task_id not in io_data:
            return f"❌ Task {task_id} not found in io.json"

        entry = io_data[task_id]
        status = entry.get('status')
        video_url = entry.get('video_url')
        result = f"✅ Task {task_id}\nStatus: {status}\n"
        if video_url:
            result += f"Video URL: {video_url}\n"
        if 'response' in entry:
            result += f"Full response: {entry['response']}\n"
        return result
    except Exception as e:
        # UI boundary: surface the problem to the user rather than crash.
        return f"❌ Error: {str(e)}"
def get_raw_json():
    """Fetch the proxy's io.json and return its parsed contents.

    Returns:
        The decoded JSON document, or ``{"error": "Could not fetch io.json"}``
        when the request or the JSON decoding fails.
    """
    try:
        # Timeout so a stalled proxy cannot hang the UI callback indefinitely.
        r = requests.get("https://1hit.no/proxy/io.json", timeout=30)
        return r.json()
    except Exception as exc:
        # `Exception` rather than a bare `except:` so KeyboardInterrupt and
        # SystemExit still propagate; the cause is logged instead of discarded.
        print(f"Error fetching io.json: {exc}")
        return {"error": "Could not fetch io.json"}
# ---------------- WATCH & DOWNLOAD ----------------
def _summarize_task(task_id, task_data):
    """Build the display summary dict for a single io.json entry."""
    status = task_data.get('status', 'unknown')
    # `or 0` also covers an explicit null, which fromtimestamp() would reject.
    created = task_data.get('created') or 0
    time_str = datetime.fromtimestamp(created).strftime('%Y-%m-%d %H:%M:%S')
    # `or [{}]` / `or 'No prompt'` cover a missing key, an explicit null and an
    # empty content list (which would otherwise raise IndexError on [0]).
    content = (task_data.get('request') or {}).get('content') or [{}]
    prompt = content[0].get('text') or 'No prompt'
    prompt_preview = prompt[:50] + '...' if len(prompt) > 50 else prompt
    return {
        "id": task_id,
        "status": status,
        "created": time_str,
        "created_ts": created,
        "video_url": task_data.get('video_url', ''),
        "prompt": prompt_preview,
    }


def get_task_list():
    """Return summaries of all tasks in io.json, newest first.

    Returns:
        A list of dicts with keys ``id``, ``status``, ``created`` (formatted
        local time), ``created_ts``, ``video_url`` and ``prompt`` (truncated to
        50 characters plus ``...``), sorted by ``created_ts`` descending.
        Entries that cannot be summarized are logged and skipped; if io.json
        cannot be fetched or parsed at all, an empty list is returned.
    """
    try:
        # Timeout so a stalled proxy cannot hang the UI callback indefinitely.
        r = requests.get("https://1hit.no/proxy/io.json", timeout=30)
        data = r.json()
        tasks = []
        for task_id, task_data in data.items():
            try:
                tasks.append(_summarize_task(task_id, task_data))
            except (AttributeError, IndexError, KeyError, TypeError,
                    ValueError, OverflowError, OSError) as e:
                # One malformed record must not blank the whole list.
                print(f"Skipping malformed task {task_id}: {e}")
        tasks.sort(key=lambda x: x['created_ts'], reverse=True)
        return tasks
    except Exception as e:
        print(f"Error getting task list: {e}")
        return []
def refresh_task_list():
    """Rebuild the task dropdown's choices from the current task list.

    Returns:
        A Gradio update for the dropdown: one ``"id | status | created"``
        choice per task, with the first choice selected, or no selection when
        there are no tasks.
    """
    tasks = get_task_list()
    choices = [f"{t['id']} | {t['status']} | {t['created']}" for t in tasks]
    selected = choices[0] if choices else None
    # gr.update() is the generic update helper; the per-component
    # gr.Dropdown.update() classmethod is not available in newer Gradio
    # releases.
    return gr.update(choices=choices, value=selected)
def load_task(selection):
    """Load details for the task chosen in the dropdown.

    Args:
        selection: Dropdown value of the form ``"id | status | created"``;
            only the part before the first ``" | "`` is used.

    Returns:
        A ``(info_markdown, video, download)`` tuple. ``video`` and
        ``download`` are both the task's video URL, or ``None`` when the task
        has no video or loading failed. Errors are reported in
        ``info_markdown`` rather than raised.
    """
    if not selection:
        return "No task selected", None, None
    task_id = selection.split(' | ')[0]
    try:
        # Timeout so a stalled proxy cannot hang the UI callback indefinitely.
        r = requests.get("https://1hit.no/proxy/io.json", timeout=30)
        data = r.json()
        task = data.get(task_id, {})
        # None (not '') when there is no video, so the Video/File outputs are
        # cleared instead of being handed an empty path.
        video_url = task.get('video_url') or None
        status = task.get('status', 'unknown')
        # `or` fallbacks cover missing keys, explicit nulls and an empty
        # content list (which would otherwise raise IndexError on [0]).
        content = (task.get('request') or {}).get('content') or [{}]
        prompt = content[0].get('text') or 'No prompt'
        created = task.get('created') or 0
        created_str = datetime.fromtimestamp(created).strftime('%Y-%m-%d %H:%M:%S')
        response = task.get('response') or {}
        info = f"### Task Details\n\n**Task ID:** `{task_id}`\n**Status:** `{status}`\n**Created:** {created_str}\n**Prompt:** {prompt}\n\n"
        if status == 'succeeded' and video_url:
            info += f"✅ **Video ready!**\n**Download:** [Click here]({video_url})\n"
        elif status == 'failed':
            error = response.get('error', 'Unknown error')
            info += f"❌ **Failed:** {error}\n"
        return info, video_url, video_url
    except Exception as e:
        # UI boundary: show the problem in the details pane rather than crash.
        return f"Error loading task: {e}", None, None
# ---------------- UPLOAD QUEUE ----------------
def add_to_queue(file_path, queue_state):
    """Append a file to the upload queue unless it is empty or already queued.

    Args:
        file_path: Path of the file to enqueue. A file-like object exposing a
            ``name`` attribute (as some Gradio versions pass for File inputs)
            is reduced to that path. Falsy values add nothing.
        queue_state: Current queue (list of paths), or ``None`` for no queue
            yet. A list passed in is updated in place.

    Returns:
        ``(rows, queue)``: ``rows`` is the queue as one-column table rows for
        display; ``queue`` is the updated queue list (the same object as
        ``queue_state`` when one was given).
    """
    # `is None` rather than `or []`: an empty list passed by the caller must be
    # updated in place like a non-empty one, and None must not be iterated.
    queue = [] if queue_state is None else queue_state

    # Store the path, not the file object, so duplicate detection and the
    # table display work on plain strings.
    if file_path is not None and not isinstance(file_path, (str, os.PathLike)):
        file_path = getattr(file_path, "name", file_path)

    if file_path and file_path not in queue:
        queue.append(file_path)
    return [[entry] for entry in queue], queue
def clear_queue():
    """Return a pair of fresh empty lists: (display rows, queue contents)."""
    empty_rows, empty_queue = [], []
    return empty_rows, empty_queue
# ---------------- GRADIO APP ----------------
# NOTE(review): the original indentation was lost; the nesting of rows and
# columns below is reconstructed from the order of the statements -- confirm
# the layout against the running app.
with gr.Blocks(title="BytePlus Video Gallery", theme=gr.themes.Soft()) as demo:
    # A single State component shared by all queue handlers. Creating a new
    # gr.State(...) inline for each input/output makes them unrelated
    # components, so the value a handler returns is never what the next click
    # receives.
    queue_state = gr.State(upload_queue)

    # ---------------- Manual Polling Tab ----------------
    with gr.TabItem("🔧 Manual Polling"):
        gr.Markdown("### 🔧 Manual Polling & Debug")
        with gr.Row():
            with gr.Column():
                gr.Markdown("#### Poll Specific Task")
                task_id_input = gr.Textbox(label="Task ID", placeholder="Enter task ID (cgt-...)")
                poll_btn = gr.Button("🔄 Poll Now", variant="primary")
                poll_result = gr.Textbox(label="Poll Result", lines=10)
                poll_btn.click(fn=manual_poll, inputs=task_id_input, outputs=poll_result)
            with gr.Column():
                gr.Markdown("#### Current io.json")
                refresh_btn = gr.Button("🔄 Refresh io.json", variant="secondary")
                raw_json = gr.JSON(label="io.json Contents")
                refresh_btn.click(fn=get_raw_json, outputs=raw_json)
        # Kept flush-left on purpose: indented Markdown lines could be
        # rendered as a code block.
        gr.Markdown("""
### 📝 Instructions
1. Copy a task ID from above (like `cgt-20260217072358-hszt9`)
2. Paste it in the Task ID field
3. Click "Poll Now" to force an update
4. Check io.json to see if status changed
""")

    # ---------------- Watch & Download Tab ----------------
    with gr.TabItem("📺 Watch & Download"):
        gr.Markdown("### 📺 Browse and Download Generated Videos")
        with gr.Row():
            with gr.Column(scale=1):
                refresh_list_btn = gr.Button("🔄 Refresh Task List", variant="primary")
                task_list = gr.Dropdown(label="Select Task", choices=[], interactive=True)
            with gr.Column(scale=2):
                task_info = gr.Markdown("Select a task to view details")
        with gr.Row():
            with gr.Column():
                video_player = gr.Video(label="Video Player")
                download_link = gr.File(label="Download Video")
                put_queue_btn = gr.Button("📥 Put in Upload Queue")
                queue_display = gr.Dataframe(
                    headers=["Upload Queue"], value=[], interactive=False, max_rows=10
                )
                clear_queue_btn = gr.Button("🗑️ Clear Queue")

        # Button connections
        refresh_list_btn.click(fn=refresh_task_list, outputs=task_list)
        task_list.change(fn=load_task, inputs=task_list, outputs=[task_info, video_player, download_link])
        put_queue_btn.click(
            fn=add_to_queue,
            inputs=[download_link, queue_state],
            outputs=[queue_display, queue_state],
        )
        # clear_queue returns two values (table rows, queue contents), so both
        # the table and the stored queue must be listed as outputs.
        clear_queue_btn.click(fn=clear_queue, outputs=[queue_display, queue_state])

    # Populate the task list when the page is loaded
    demo.load(fn=refresh_task_list, outputs=task_list)
if __name__ == "__main__":
    # Listen on all network interfaces (0.0.0.0) rather than only localhost.
    demo.launch(server_name="0.0.0.0")