Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import time | |
| import random | |
| import requests | |
| from flask import Flask, jsonify, request, render_template | |
| from werkzeug.utils import secure_filename | |
| app = Flask(__name__, template_folder='templates', static_folder='static') | |
| # Configuration | |
| app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max upload | |
| app.config['UPLOAD_FOLDER'] = 'uploads' | |
| os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) | |
| # SiliconFlow API Configuration | |
| SILICONFLOW_API_KEY = "sk-vimuseiptfbomzegyuvmebjzooncsqbyjtlddrfodzcdskgi" | |
| SILICONFLOW_API_URL = "https://api.siliconflow.cn/v1/chat/completions" | |
| # Mock Data: Skill Registry (Translated to Chinese) | |
| SKILL_REGISTRY = [ | |
| { | |
| "id": "sk_001", | |
| "name": "PDF 文本提取器", | |
| "description": "使用 OCR 技术从 PDF 文档中提取纯文本内容。", | |
| "author": "System", | |
| "downloads": 1240, | |
| "price": 0.00, | |
| "rating": 4.8, | |
| "tags": ["文档", "OCR", "自动化"], | |
| "code": "def extract_pdf(file_path):\n # 模拟实现\n return '提取的文本内容...'" | |
| }, | |
| { | |
| "id": "sk_002", | |
| "name": "YouTube 视频总结", | |
| "description": "根据 YouTube 视频 URL 生成简洁的要点总结。", | |
| "author": "AI_Architect", | |
| "downloads": 890, | |
| "price": 4.99, | |
| "rating": 4.9, | |
| "tags": ["视频", "总结", "内容生成"], | |
| "code": "def summarize_video(url):\n return {'summary': '提取的关键点...'}" | |
| }, | |
| { | |
| "id": "sk_003", | |
| "name": "CSV 数据清洗助手", | |
| "description": "自动修复 CSV 文件中的缺失值并标准化日期格式。", | |
| "author": "DataWizard", | |
| "downloads": 2100, | |
| "price": 0.00, | |
| "rating": 4.7, | |
| "tags": ["数据", "CSV", "清洗"], | |
| "code": "def clean_csv(df):\n df.fillna(0, inplace=True)\n return df" | |
| }, | |
| { | |
| "id": "sk_004", | |
| "name": "SEO 关键词生成器", | |
| "description": "根据输入的主题生成高排名的 SEO 关键词。", | |
| "author": "MarketingBot", | |
| "downloads": 560, | |
| "price": 9.99, | |
| "rating": 4.5, | |
| "tags": ["营销", "SEO", "增长"], | |
| "code": "def gen_keywords(topic):\n return ['最佳 ' + topic, topic + ' 测评']" | |
| }, | |
| { | |
| "id": "sk_005", | |
| "name": "社交媒体情感雷达", | |
| "description": "分析品牌在 Twitter 和 Reddit 上的情感倾向。", | |
| "author": "System", | |
| "downloads": 1500, | |
| "price": 19.99, | |
| "rating": 4.9, | |
| "tags": ["社交", "分析", "情感"], | |
| "code": "def analyze_sentiment(brand):\n return {'positive': 80, 'negative': 20}" | |
| } | |
| ] | |
| # Mock Data: Stats | |
| STATS = { | |
| "total_skills": 142, | |
| "active_users": 3500, | |
| "total_executions": 45000, | |
| "revenue": 12500.00 | |
| } | |
| def index(): | |
| return render_template('index.html') | |
| def get_stats(): | |
| # Simulate live updates | |
| STATS['total_executions'] += random.randint(1, 10) | |
| STATS['revenue'] += random.uniform(0, 5) | |
| return jsonify(STATS) | |
| def get_skills(): | |
| return jsonify(SKILL_REGISTRY) | |
| def generate_skill(): | |
| data = request.json | |
| prompt = data.get('prompt', '') | |
| if not prompt: | |
| return jsonify({"error": "Prompt is required"}), 400 | |
| # Try SiliconFlow API | |
| try: | |
| headers = { | |
| "Authorization": f"Bearer {SILICONFLOW_API_KEY}", | |
| "Content-Type": "application/json" | |
| } | |
| # Using a reliable model on SiliconFlow | |
| payload = { | |
| "model": "deepseek-ai/DeepSeek-V3", | |
| "messages": [ | |
| { | |
| "role": "system", | |
| "content": """你是一个专业的 Python 代码生成助手。请根据用户的需求生成一个 Python 函数。 | |
| 返回格式必须是 JSON,包含以下字段: | |
| - name: 技能名称 (中文) | |
| - description: 简短描述 (中文) | |
| - code: 完整的 Python 函数代码 (包含注释) | |
| - tags: 3-5个标签 (List[str]) | |
| 只返回 JSON 字符串,不要包含 markdown 格式化符号。""" | |
| }, | |
| { | |
| "role": "user", | |
| "content": f"生成一个 Python 任务脚本: {prompt}" | |
| } | |
| ], | |
| "temperature": 0.7 | |
| } | |
| response = requests.post(SILICONFLOW_API_URL, json=payload, headers=headers, timeout=30) | |
| if response.status_code == 200: | |
| result = response.json() | |
| content = result['choices'][0]['message']['content'] | |
| # Clean up content if it contains markdown code blocks | |
| if "```json" in content: | |
| content = content.replace("```json", "").replace("```", "") | |
| elif "```" in content: | |
| content = content.replace("```", "") | |
| ai_data = json.loads(content.strip()) | |
| new_id = f"sk_{random.randint(1000, 9999)}" | |
| new_skill = { | |
| "id": new_id, | |
| "name": ai_data.get('name', f"AI 生成: {prompt[:10]}"), | |
| "description": ai_data.get('description', f"AI 为 '{prompt}' 生成的解决方案"), | |
| "author": "SiliconFlow_AI", | |
| "downloads": 0, | |
| "price": 0.00, | |
| "rating": 0.0, | |
| "tags": ai_data.get('tags', ["AI生成", "Python"]), | |
| "code": ai_data.get('code', "# 生成失败,请重试") | |
| } | |
| # Don't insert automatically, let user verify first (frontend logic handles display) | |
| return jsonify(new_skill) | |
| except Exception as e: | |
| print(f"SiliconFlow API Error: {e}") | |
| # Fallback to mock if API fails | |
| time.sleep(1) | |
| new_id = f"sk_{random.randint(100, 999)}" | |
| new_skill = { | |
| "id": new_id, | |
| "name": f"模拟生成: {prompt[:10]}...", | |
| "description": f"由于 API 连接问题,这是本地模拟生成的结果: {prompt}", | |
| "author": "Local_Fallback", | |
| "downloads": 0, | |
| "price": 0.00, | |
| "rating": 0.0, | |
| "tags": ["模拟", "离线"], | |
| "code": f"def task_fallback(data):\n # API Error fallback\n print('Processing {prompt}')\n return True" | |
| } | |
| return jsonify(new_skill) | |
| return jsonify({"error": "Generation failed"}), 500 | |
| def save_skill(): | |
| data = request.json | |
| if not data: | |
| return jsonify({"error": "No data provided"}), 400 | |
| # In a real app, save to DB. Here, add to memory. | |
| # Check if exists | |
| existing = next((s for s in SKILL_REGISTRY if s['id'] == data.get('id')), None) | |
| if not existing: | |
| SKILL_REGISTRY.insert(0, data) | |
| STATS['total_skills'] += 1 | |
| return jsonify({"status": "success", "message": "技能已保存"}) | |
| else: | |
| return jsonify({"status": "exists", "message": "技能已存在"}) | |
| def execute_skill(): | |
| data = request.json | |
| skill_id = data.get('skill_id') | |
| # Mock Execution Delay | |
| time.sleep(0.5) | |
| return jsonify({ | |
| "status": "success", | |
| "output": f"技能 {skill_id} 执行成功。", | |
| "result_data": { | |
| "processed_items": random.randint(5, 50), | |
| "efficiency_score": f"{random.randint(80, 99)}%", | |
| "timestamp": time.time() | |
| } | |
| }) | |
| def upload_file(): | |
| if 'file' not in request.files: | |
| return jsonify({'error': 'No file part'}), 400 | |
| file = request.files['file'] | |
| if file.filename == '': | |
| return jsonify({'error': 'No selected file'}), 400 | |
| if file: | |
| filename = secure_filename(file.filename) | |
| file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename)) | |
| return jsonify({'message': f'File {filename} uploaded successfully', 'path': f"/uploads/{filename}"}) | |
| if __name__ == '__main__': | |
| app.run(host='0.0.0.0', port=7860) | |