| import os |
| import requests |
| import gradio as gr |
| import logging |
| import shutil |
| import uuid |
| import datetime |
| from google.oauth2.credentials import Credentials |
| from googleapiclient.discovery import build |
| from googleapiclient.http import MediaFileUpload |
| from urllib.parse import urlparse, unquote |
|
|
| |
| |
| |
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger(__name__) |
|
|
| |
| |
| |
| def get_drive_service(): |
| client_id = os.environ.get("G_CLIENT_ID") |
| client_secret = os.environ.get("G_CLIENT_SECRET") |
| refresh_token = os.environ.get("G_REFRESH_TOKEN") |
| |
| if not all([client_id, client_secret, refresh_token]): |
| raise EnvironmentError("❌ 缺少必要的 OAuth 环境变量") |
|
|
| creds = Credentials( |
| token=None, |
| refresh_token=refresh_token, |
| token_uri="https://oauth2.googleapis.com/token", |
| client_id=client_id, |
| client_secret=client_secret |
| ) |
| return build("drive", "v3", credentials=creds) |
|
|
| def get_smart_filename(response, url): |
| |
| content_disposition = response.headers.get("Content-Disposition") |
| if content_disposition: |
| import re |
| fname = re.findall('filename="?([^"]+)"?', content_disposition) |
| if fname: |
| return unquote(fname[0]) |
| |
| |
| parsed = urlparse(url) |
| path_name = os.path.basename(unquote(parsed.path)) |
| |
| |
| if not path_name or len(path_name) < 3 or path_name.lower() in ['raw', 'blob', 'file', 'download']: |
| timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") |
| return f"video_{timestamp}.mp4" |
| |
| return path_name |
|
|
| def get_or_create_date_folder(service, parent_id=None): |
| """ |
| 在 parent_id 下查找或创建名为 'YYYY-MM-DD' 的文件夹 |
| """ |
| folder_name = datetime.datetime.now().strftime("%Y-%m-%d") |
| query = f"mimeType='application/vnd.google-apps.folder' and name='{folder_name}' and trashed=false" |
| if parent_id: |
| query += f" and '{parent_id}' in parents" |
| |
| |
| results = service.files().list(q=query, fields="files(id)").execute() |
| files = results.get('files', []) |
| |
| if files: |
| logger.info(f"📂 找到已有日期文件夹: {folder_name} ({files[0]['id']})") |
| return files[0]['id'] |
| else: |
| |
| metadata = { |
| 'name': folder_name, |
| 'mimeType': 'application/vnd.google-apps.folder' |
| } |
| if parent_id: |
| metadata['parents'] = [parent_id] |
| |
| folder = service.files().create(body=metadata, fields='id').execute() |
| logger.info(f"📂 创建新日期文件夹: {folder_name} ({folder['id']})") |
| return folder['id'] |
|
|
| |
| |
| |
| def process_upload(file_url, password, progress=gr.Progress()): |
| |
| correct_pass = os.environ.get("ACCESS_PASSWORD") |
| |
| if correct_pass and password != correct_pass: |
| return {"status": "error", "message": "❌ 密码错误,拒绝访问。"} |
| |
| if not file_url: |
| return {"status": "error", "message": "❌ 错误: URL 为空"} |
|
|
| temp_path = None |
| try: |
| |
| service = get_drive_service() |
|
|
| |
| progress(0.1, desc="🚀 连接资源...") |
| with requests.get(file_url, stream=True, headers={'User-Agent': 'Mozilla/5.0'}) as response: |
| response.raise_for_status() |
| |
| filename = get_smart_filename(response, file_url) |
| total_size = int(response.headers.get('Content-Length', 0)) |
| |
| temp_path = f"/tmp/{uuid.uuid4()}_{filename}" |
| logger.info(f"📥 下载到: {temp_path}") |
| |
| downloaded = 0 |
| with open(temp_path, 'wb') as f: |
| for chunk in response.iter_content(chunk_size=1024*1024): |
| if chunk: |
| f.write(chunk) |
| downloaded += len(chunk) |
| if total_size > 0: |
| progress((downloaded/total_size)*0.4, desc="📥 下载中...") |
|
|
| |
| root_folder_id = os.environ.get("GDRIVE_FOLDER_ID", "").strip() or None |
| |
| target_folder_id = get_or_create_date_folder(service, root_folder_id) |
|
|
| |
| progress(0.5, desc="☁️ 上传到 Google Drive...") |
| metadata = {'name': filename, 'parents': [target_folder_id]} |
| |
| media = MediaFileUpload(temp_path, resumable=True, chunksize=5*1024*1024) |
| |
| request = service.files().create( |
| body=metadata, media_body=media, fields='id, webContentLink, webViewLink' |
| ) |
| |
| response = None |
| while response is None: |
| status, response = request.next_chunk() |
| if status: |
| progress(0.5 + (status.progress()*0.5), desc="☁️ 上传中...") |
|
|
| |
| file_id = response.get('id') |
| |
| |
| try: |
| service.permissions().create( |
| fileId=file_id, body={'role': 'reader', 'type': 'anyone'} |
| ).execute() |
| except: pass |
|
|
| |
| direct_link = response.get('webContentLink', '') |
| view_link = response.get('webViewLink', '') |
| |
| |
| os.remove(temp_path) |
|
|
| |
| return { |
| "status": "success", |
| "filename": filename, |
| "file_id": file_id, |
| "download_link": direct_link, |
| "view_link": view_link, |
| "folder": datetime.datetime.now().strftime("%Y-%m-%d") |
| } |
|
|
| except Exception as e: |
| logger.error(f"❌: {e}", exc_info=True) |
| if temp_path and os.path.exists(temp_path): |
| os.remove(temp_path) |
| return {"status": "error", "message": str(e)} |
|
|
| |
| |
| |
| def ui_wrapper(url, pwd): |
| result = process_upload(url, pwd) |
| if result.get("status") == "success": |
| return ( |
| f"✅ **成功!**\n\n" |
| f"📂 文件夹: {result['folder']}\n" |
| f"📄 文件名: {result['filename']}\n" |
| f"🔗 [预览链接]({result['view_link']})\n" |
| f"⬇️ [直接下载]({result['download_link']})" |
| ) |
| else: |
| return f"❌ 失败: {result.get('message')}" |
|
|
| with gr.Blocks(title="URL to Drive Saver") as demo: |
| gr.Markdown("# 🚀 URL to Drive (API Enabled)") |
| |
| with gr.Row(): |
| url_input = gr.Textbox(label="文件 URL") |
| pwd_input = gr.Textbox(label="访问密码", type="password") |
| submit_btn = gr.Button("🚀 开始转存", variant="primary") |
| |
| output = gr.Markdown(label="结果") |
|
|
| submit_btn.click(ui_wrapper, inputs=[url_input, pwd_input], outputs=output) |
| |
| |
| api = gr.Interface( |
| fn=process_upload, |
| inputs=[ |
| gr.Textbox(label="url"), |
| gr.Textbox(label="password") |
| ], |
| outputs="json", |
| api_name="upload" |
| ) |
|
|
| if __name__ == "__main__": |
| demo.queue().launch(server_name="0.0.0.0", show_api=True) |
|
|