# Spaces: Sleeping (page-header text captured with the file; not part of the program)
import json
import os
import random
import time
import uuid

from flask import Flask, render_template, request, jsonify, send_file
from openai import OpenAI
from werkzeug.utils import secure_filename
# Flask application object and upload settings.
app = Flask(__name__)
# Upper bound for an incoming request body: 50 MB.
app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024
app.config['UPLOAD_FOLDER'] = '/tmp/uploads'
# exist_ok=True replaces the separate "exists?" check, which could race when
# two processes import this module at the same time.
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
# SiliconFlow configuration (accessed through the OpenAI client with a custom
# base URL).
#
# The API key is read from the environment so that no credential lives in the
# source tree. NOTE(review): the key that was previously hard-coded here was
# committed in plain text and should be treated as compromised and revoked.
# When SILICONFLOW_API_KEY is unset the key is an empty string, so API calls
# are expected to fail rather than succeed with a shared credential.
API_KEY = os.environ.get("SILICONFLOW_API_KEY", "")
BASE_URL = os.environ.get("SILICONFLOW_BASE_URL", "https://api.siliconflow.cn/v1")

client = OpenAI(
    api_key=API_KEY,
    base_url=BASE_URL,
)
# Demo storage: analysis results are kept in this process-local list only.
KNOWLEDGE_VAULT = list()

# Canned values used to build a placeholder result when the API call fails.
MOCK_INSIGHTS = [
    "市场需求呈现指数级增长,尤其在亚太地区。",
    "当前技术栈存在明显的性能瓶颈,建议迁移至 Rust 或 Go。",
    "用户留存率与响应速度呈正相关,相关系数 0.85。",
]

MOCK_TAGS = [
    "高增长",
    "风险警示",
    "技术债务",
    "蓝海市场",
    "自动化",
]
def index():
    """Render the ``index.html`` template and return the result."""
    # NOTE(review): no route decorator is visible on this function in the
    # reviewed text — confirm how it is registered with ``app``.
    page = render_template('index.html')
    return page
# System prompt sent with every analysis request (text unchanged).
_ANALYSIS_SYSTEM_PROMPT = (
    "你是一个专业的商业分析师和知识提炼专家。\n"
    "请分析用户提供的内容,并返回一个严格的 JSON 格式响应。\n"
    "JSON 结构必须包含以下字段:\n"
    "- summary (string): 内容的简明摘要(200字以内)\n"
    "- insights (list of strings): 3-5个关键洞察\n"
    "- tags (list of strings): 3-5个相关标签\n"
    "- metrics (object): 包含 commercial_value (0-100), complexity (0-100), sentiment (0.0-1.0)\n"
    "只返回 JSON,不要包含 Markdown 格式标记。"
)


def _strip_code_fence(text):
    """Return *text* without a Markdown code fence around it, if one is present."""
    if "```json" in text:
        return text.split("```json")[1].split("```")[0].strip()
    if "```" in text:
        return text.split("```")[1].split("```")[0].strip()
    return text


def _mock_analysis(content):
    """Build the placeholder result used when the API reply cannot be used."""
    return {
        "summary": "分析服务暂时不可用或超时,已切换至演示模式。原始内容:" + content[:50] + "...",
        "insights": random.sample(MOCK_INSIGHTS, k=min(3, len(MOCK_INSIGHTS))),
        "tags": random.sample(MOCK_TAGS, k=min(3, len(MOCK_TAGS))),
        "metrics": {
            "commercial_value": random.randint(50, 90),
            "complexity": random.randint(30, 70),
            "sentiment": 0.5,
        },
    }


def analyze_content(content):
    """Analyze *content* through the chat-completions API and return a dict.

    The model is asked for a JSON object with the keys ``summary``,
    ``insights``, ``tags`` and ``metrics``. The reply is parsed after removing
    any Markdown code fence around it.

    Args:
        content: Text to analyze.

    Returns:
        The parsed JSON object from the model. If the request fails, the reply
        is empty, is not valid JSON, or is not a JSON object, a placeholder
        dict with the same keys is returned instead; this function does not
        raise for API or parsing problems.
    """
    try:
        response = client.chat.completions.create(
            model="Qwen/Qwen2.5-7B-Instruct",
            messages=[
                {"role": "system", "content": _ANALYSIS_SYSTEM_PROMPT},
                {"role": "user", "content": content},
            ],
            temperature=0.7,
            max_tokens=1000,
            response_format={"type": "json_object"},
        )
        result_text = response.choices[0].message.content
        if not result_text:
            raise ValueError("model returned an empty message")

        parsed = json.loads(_strip_code_fence(result_text))
        # Callers read the result with .get(); a JSON list, string or number
        # would break them, so treat it like any other unusable reply.
        if not isinstance(parsed, dict):
            raise ValueError("model reply is not a JSON object")
        return parsed
    except Exception as e:
        # Deliberate best-effort boundary: any failure switches to demo data.
        print(f"API Error: {e}")
        return _mock_analysis(content)
def _read_uploaded_text(upload):
    """Save *upload* temporarily and return ``(text, error)``.

    Exactly one element of the pair is ``None``. ``error`` is a
    ``(response, status)`` tuple that the view can return directly.
    """
    # secure_filename() may return an empty string, and two uploads may share
    # a name; a random prefix keeps every path a valid, unique file path.
    safe_name = secure_filename(upload.filename)
    filepath = os.path.join(
        app.config['UPLOAD_FOLDER'], f"{uuid.uuid4().hex}_{safe_name}"
    )
    try:
        upload.save(filepath)
        # Simple text read for now: only UTF-8 text files are supported.
        with open(filepath, 'r', encoding='utf-8') as f:
            return f.read(), None
    except UnicodeDecodeError:
        return None, (
            jsonify({"error": "File encoding not supported. Please upload UTF-8 text files."}),
            400,
        )
    except Exception as e:
        return None, (jsonify({"error": str(e)}), 500)
    finally:
        # The file is only needed long enough to read it; do not let uploads
        # accumulate on disk.
        try:
            os.remove(filepath)
        except OSError:
            pass


def mine_knowledge():
    """Analyze submitted text or an uploaded file and store the result.

    Input is taken from the current request:

    * form field ``type`` == ``'file'``: the uploaded ``file`` part is read as
      UTF-8 text;
    * JSON body: ``content`` and ``type`` are read from the JSON object;
    * otherwise: ``content`` is read from the form data.

    The content string is passed to ``analyze_content`` as-is; nothing in this
    function fetches a URL, whatever ``type`` says.

    Returns:
        A JSON response with ``id``, ``timestamp``, ``source_type``,
        ``summary``, ``insights``, ``tags`` and ``metrics``. The same dict is
        inserted at the front of ``KNOWLEDGE_VAULT``. On invalid input a JSON
        ``{"error": ...}`` body is returned with status 400, or 500 if the
        upload could not be saved or read.
    """
    content = ""
    source_type = request.form.get('type', 'text')

    if source_type == 'file':
        if 'file' not in request.files:
            return jsonify({"error": "No file part"}), 400
        upload = request.files['file']
        if not upload.filename:
            return jsonify({"error": "No selected file"}), 400
        content, error = _read_uploaded_text(upload)
        if error is not None:
            return error
    elif request.is_json:
        # silent=True yields None for a malformed body instead of raising.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400
        content = data.get('content', '')
        source_type = data.get('type', 'text')
    else:
        content = request.form.get('content', '')

    if not isinstance(content, str):
        return jsonify({"error": "Content must be a string"}), 400
    # An empty uploaded file is still analyzed; empty text input is rejected.
    if not content and source_type != 'file':
        return jsonify({"error": "No content provided"}), 400

    analysis_result = analyze_content(content)
    if not isinstance(analysis_result, dict):
        # Guard the .get() calls below against an unexpected result shape.
        analysis_result = {}

    result = {
        "id": str(uuid.uuid4()),
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
        "source_type": source_type,
        "summary": analysis_result.get('summary', '无摘要'),
        "insights": analysis_result.get('insights', []),
        "tags": analysis_result.get('tags', []),
        "metrics": analysis_result.get('metrics', {
            "commercial_value": 0,
            "complexity": 0,
            "sentiment": 0.5,
        }),
    }

    # Newest result goes first in the vault.
    KNOWLEDGE_VAULT.insert(0, result)
    return jsonify(result)
def get_vault():
    """Return everything currently held in ``KNOWLEDGE_VAULT`` as JSON."""
    stored_assets = KNOWLEDGE_VAULT
    return jsonify(stored_assets)
def generate_asset():
    """Simulate generating a downloadable asset (PDF/Markdown).

    Reads a JSON object from the request. ``type`` (default ``'report'``)
    is echoed, upper-cased, in the success message. ``id`` may be sent by the
    client but is not used by this placeholder implementation.

    Returns:
        A JSON response with ``status``, ``message`` and a placeholder
        ``download_url``; or ``{"error": ...}`` with status 400 when the body
        is not a JSON object.
    """
    # silent=True yields None for a missing or malformed JSON body, which is
    # answered with the same JSON error shape the other endpoints use.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    # Coerce to str so a null or numeric "type" cannot break .upper().
    asset_type = str(data.get('type') or 'report')

    # Stand-in for the time real generation would take.
    time.sleep(1)

    return jsonify({
        "status": "success",
        "message": f"{asset_type.upper()} 资产已生成",
        "download_url": "#",  # Mock URL
    })
if __name__ == '__main__':
    # The interactive debugger that comes with debug mode lets a browser run
    # arbitrary code, so it must not be on by default while the server listens
    # on every interface. Set FLASK_DEBUG=1 to opt in for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)