Spaces:
Sleeping
Sleeping
import datetime
import json
import os
import random
import sqlite3
from collections import Counter

import requests
from flask import Flask, render_template, request, jsonify, g
from flask_cors import CORS
app = Flask(__name__, instance_relative_config=True)
CORS(app)

# Configuration
# NOTE(review): SECRET_KEY and the SiliconFlow API key were committed in plain
# text. Both can now be supplied through the environment. The literal
# fallbacks are kept only so existing deployments keep working; they should
# be rotated and then removed from the source.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'support-pilot-secret-key')
app.config['DATABASE'] = os.path.join(app.instance_path, 'support_pilot.db')
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB Max Upload
SILICONFLOW_API_KEY = os.environ.get(
    "SILICONFLOW_API_KEY",
    "sk-vimuseiptfbomzegyuvmebjzooncsqbyjtlddrfodzcdskgi",
)
SILICONFLOW_API_URL = "https://api.siliconflow.cn/v1/chat/completions"

# Ensure instance folder exists. exist_ok=True ignores only the
# "already exists" case; other failures (e.g. permissions) surface here
# instead of later when the database file is opened.
os.makedirs(app.instance_path, exist_ok=True)
# --- Database Helpers ---
def get_db():
    """Return the SQLite connection cached on ``g``, opening it on first use.

    A new connection is opened against ``app.config['DATABASE']`` and
    configured to return ``sqlite3.Row`` rows, so columns can be read by name.
    """
    if 'db' in g:
        return g.db

    # First access in this context: open, cache, then configure.
    g.db = sqlite3.connect(app.config['DATABASE'])
    g.db.row_factory = sqlite3.Row
    return g.db
def close_db(error=None):
    """Close the SQLite connection stored on ``g``, if there is one.

    Args:
        error: Accepted and ignored. NOTE(review): the signature looks like a
            Flask teardown callback, but no ``teardown_appcontext``
            registration is visible in this file — confirm it is registered.
    """
    # Pop instead of only closing, so a later get_db() in the same context
    # opens a fresh connection rather than being handed a closed one.
    db = g.pop('db', None)
    if db is not None:
        db.close()
def init_db():
    """Create the ``tickets`` and ``solutions`` tables and seed empty ones.

    Both tables are created with ``IF NOT EXISTS``. Seed rows are inserted
    only into a table whose row count is zero. Everything is committed once
    at the end.
    """
    conn = get_db()

    # Tickets/Logs Table
    tickets_ddl = '''
CREATE TABLE IF NOT EXISTS tickets (
id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT NOT NULL,
source TEXT DEFAULT 'manual',
sentiment TEXT,
intent TEXT,
priority TEXT,
status TEXT DEFAULT 'new',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
'''
    conn.execute(tickets_ddl)

    # Knowledge Base / Solutions
    solutions_ddl = '''
CREATE TABLE IF NOT EXISTS solutions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
content TEXT NOT NULL,
tags TEXT,
usage_count INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
'''
    conn.execute(solutions_ddl)

    # Seed Data
    solution_total = conn.execute("SELECT count(*) FROM solutions").fetchone()[0]
    if solution_total == 0:
        seed_solutions = [
            ("退款流程", "用户申请退款需提供订单号,若在7天内且无使用痕迹,可直接办理。", "refund,policy"),
            ("账号无法登录", "请引导用户检查大小写,或使用'忘记密码'功能重置。", "login,account"),
        ]
        conn.executemany(
            "INSERT INTO solutions (title, content, tags) VALUES (?, ?, ?)",
            seed_solutions,
        )

    # Seed Tickets
    ticket_total = conn.execute("SELECT count(*) FROM tickets").fetchone()[0]
    if ticket_total == 0:
        seed_tickets = [
            ("我的包裹丢了!你们怎么搞的?", "Negative", "Logistics", "High"),
            ("这个产品怎么使用?说明书看不懂。", "Neutral", "Inquiry", "Medium"),
            ("感谢你们的帮助,问题解决了。", "Positive", "Feedback", "Low"),
        ]
        # Every sample ticket is stored with status 'closed'.
        conn.executemany(
            "INSERT INTO tickets (content, sentiment, intent, priority, status) VALUES (?, ?, ?, ?, ?)",
            [(text, sentiment, intent, priority, 'closed')
             for text, sentiment, intent, priority in seed_tickets],
        )

    conn.commit()
# --- AI Integration ---
def call_ai_model(system_prompt, user_input, model="Qwen/Qwen2.5-7B-Instruct"):
    """Send one system/user message pair to the SiliconFlow chat endpoint.

    Args:
        system_prompt: Text sent as the ``system`` message.
        user_input: Text sent as the ``user`` message.
        model: Model identifier placed in the request payload.

    Returns:
        The model reply decoded as a JSON object (``dict``). If the reply is
        not valid JSON, or decodes to something other than an object,
        ``{"raw_content": <reply>, "error": "json_parse_error"}`` is returned
        instead. ``None`` is returned when the request fails or the response
        does not have the expected shape; callers supply their own fallback.
    """
    headers = {
        "Authorization": f"Bearer {SILICONFLOW_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": model,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_input},
        ],
        "temperature": 0.7,
        "response_format": {"type": "json_object"},
    }
    try:
        response = requests.post(SILICONFLOW_API_URL, json=payload, headers=headers, timeout=10)
        response.raise_for_status()
        data = response.json()
        content = data['choices'][0]['message']['content']
        # Try to parse JSON
        try:
            parsed = json.loads(content)
        except json.JSONDecodeError:
            # Fallback if model didn't return pure JSON
            return {"raw_content": content, "error": "json_parse_error"}
        # Callers use dict operations on the result, so a JSON list, string
        # or number is reported the same way as unparseable output.
        if not isinstance(parsed, dict):
            return {"raw_content": content, "error": "json_parse_error"}
        return parsed
    except Exception as e:
        # Deliberately broad: any failure talking to the remote service must
        # degrade to "no result" rather than break the calling request.
        print(f"AI API Error: {e}")
        return None
# --- Routes ---
def index():
    """Render the ``index.html`` template and return the result.

    NOTE(review): no route decorator is visible on the view functions in this
    file — confirm how they are registered with ``app``.
    """
    page = render_template('index.html')
    return page
def init_data():
    """Run ``init_db()`` inside an application context and report success.

    Returns:
        A JSON response ``{"status": "initialized"}``.
    """
    context = app.app_context()
    with context:
        init_db()
    outcome = {"status": "initialized"}
    return jsonify(outcome)
def analyze_ticket():
    """Analyze a customer message with the AI model and store it as a ticket.

    Reads ``content`` from the JSON request body, asks the model for
    sentiment, intent, priority, a suggested reply and keywords, and inserts
    a row into ``tickets`` with status ``'analyzed'``.

    Returns:
        The analysis as JSON. When the model call fails or returns an error
        marker, a fixed fallback analysis is stored and returned instead.
        A 400 response with ``{"error": "No content"}`` is returned when the
        body is missing, is not a JSON object, or has no non-blank string
        ``content``.
    """
    # silent=True yields None instead of raising on a missing or invalid
    # JSON body, so bad requests become a clean 400 rather than an error page.
    body = request.get_json(silent=True)
    content = body.get('content', '') if isinstance(body, dict) else ''
    # The value is bound to a TEXT column, so only real, non-blank strings
    # are accepted.
    if not isinstance(content, str) or not content.strip():
        return jsonify({"error": "No content"}), 400

    system_prompt = """
You are a Customer Support AI Analyst. Analyze the following customer message.
Return a JSON object with:
- "sentiment": "Positive", "Neutral", "Negative" or "Urgent"
- "intent": The user's main goal (e.g., Refund, Tech Support, Inquiry)
- "priority": "Low", "Medium", "High", "Critical"
- "suggested_reply": A polite, professional, and helpful draft response (in Chinese).
- "keywords": A list of 3-5 keywords.
"""
    ai_result = call_ai_model(system_prompt, content)

    # Fallback if AI fails (no result, unexpected type, or an error marker)
    if not isinstance(ai_result, dict) or not ai_result or 'error' in ai_result:
        ai_result = {
            "sentiment": "Neutral (Mock)",
            "intent": "Unknown",
            "priority": "Medium",
            "suggested_reply": "感谢您的反馈,人工客服稍后会介入处理。",
            "keywords": ["fallback"],
        }

    # Save to DB
    db = get_db()
    db.execute(
        'INSERT INTO tickets (content, sentiment, intent, priority, status) VALUES (?, ?, ?, ?, ?)',
        (content, ai_result.get('sentiment'), ai_result.get('intent'), ai_result.get('priority'), 'analyzed'),
    )
    db.commit()
    return jsonify(ai_result)
def upload_file():
    """Return the text of an uploaded file as JSON.

    Expects a multipart upload under the ``file`` field.

    Returns:
        ``{"content": <text>, "filename": <name>}`` on success; a 400 JSON
        error when the field is missing, the filename is empty, or the bytes
        are not valid UTF-8; a 500 JSON error carrying the exception text for
        any other failure.
    """
    uploads = request.files
    if 'file' not in uploads:
        return jsonify({"error": "No file part"}), 400

    upload = uploads['file']
    if upload.filename == '':
        return jsonify({"error": "No selected file"}), 400

    try:
        # Read the raw bytes, then try to decode them as UTF-8.
        raw_bytes = upload.read()
        decoded = raw_bytes.decode('utf-8')
        return jsonify({"content": decoded, "filename": upload.filename})
    except UnicodeDecodeError:
        return jsonify({"error": "Binary or non-UTF-8 file detected. Please upload a text file."}), 400
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
def get_stats():
    """Return dashboard statistics as JSON.

    Returns:
        A JSON object with:
        - ``total_tickets`` / ``total_solutions``: row counts of the two
          tables, taken with ``COUNT(*)`` so they are not capped by the
          50-row window used below.
        - ``sentiment_dist`` / ``intent_dist``: ``{"name", "value"}`` pairs
          counted over the 50 most recent tickets; missing values are
          grouped as ``'Unknown'`` and ``'Other'`` respectively.
        - ``recent_tickets``: the 5 most recent tickets as dicts.
    """
    db = get_db()
    total_tickets = db.execute('SELECT COUNT(*) FROM tickets').fetchone()[0]
    total_solutions = db.execute('SELECT COUNT(*) FROM solutions').fetchone()[0]

    # created_at defaults to CURRENT_TIMESTAMP, so rows can share a
    # timestamp; id breaks ties to keep "most recent" deterministic.
    tickets = db.execute(
        'SELECT * FROM tickets ORDER BY created_at DESC, id DESC LIMIT 50'
    ).fetchall()

    # Calculate stats
    sentiment_counts = Counter(t['sentiment'] or 'Unknown' for t in tickets)
    intent_counts = Counter(t['intent'] or 'Other' for t in tickets)

    return jsonify({
        "total_tickets": total_tickets,
        "total_solutions": total_solutions,
        "sentiment_dist": [{"name": k, "value": v} for k, v in sentiment_counts.items()],
        "intent_dist": [{"name": k, "value": v} for k, v in intent_counts.items()],
        "recent_tickets": [dict(t) for t in tickets[:5]],
    })
def simulate_chat():
    """Produce the next customer message in a support-chat roleplay.

    The caller plays the support agent and the AI plays the customer. The
    JSON body may carry ``history`` (a list of ``{role, content}`` items) and
    ``scenario`` (defaults to ``'refund_dispute'``). Only the ``content`` of
    the last history item is sent to the model; ``"Start"`` is sent when
    there is no usable last message.

    Returns:
        The model's JSON reply (expected keys: ``reply`` and
        ``satisfaction_score``). When the model call fails or yields an error
        marker, a fixed placeholder reply with score 5 is returned.
    """
    # Roleplay: User acts as Agent, AI acts as Customer
    # silent=True yields None instead of raising on a missing/invalid body.
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        body = {}
    history = body.get('history', [])  # List of {role, content}
    scenario = body.get('scenario', 'refund_dispute')

    system_prompt = f"""
You are roleplaying as a CUSTOMER in a support chat.
Scenario: {scenario}.
Your goal is to test the support agent. Be realistic.
If the agent helps you, thank them. If they are rude or unhelpful, get angry.
Return JSON: {{"reply": "Your response as customer", "satisfaction_score": 1-10 (integer)}}
"""

    # Tolerate a malformed history instead of failing with an
    # IndexError/KeyError/TypeError.
    last_msg = "Start"
    if isinstance(history, list) and history:
        last_item = history[-1]
        if isinstance(last_item, dict) and isinstance(last_item.get('content'), str):
            last_msg = last_item['content']

    ai_result = call_ai_model(system_prompt, last_msg)

    # Same fallback rule as analyze_ticket: an error marker from the model
    # call is not forwarded to the client as if it were a chat reply.
    if not isinstance(ai_result, dict) or not ai_result or 'error' in ai_result:
        ai_result = {
            "reply": "Wait, I need to check (Simulated Delay)...",
            "satisfaction_score": 5,
        }
    return jsonify(ai_result)
def manage_solutions():
    """List knowledge-base solutions, or create one on POST.

    POST expects a JSON object with string ``title`` and ``content`` and an
    optional string ``tags`` (the ``tags`` column is nullable).

    Returns:
        POST: ``{"status": "success"}`` after the row is committed, or a 400
        JSON error when the body is not a JSON object or a field is missing
        or of the wrong type.
        Otherwise: a JSON list of all solutions ordered by ``usage_count``
        descending.
    """
    db = get_db()
    if request.method == 'POST':
        # silent=True yields None instead of raising on a bad body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400

        title = data.get('title')
        content = data.get('content')
        tags = data.get('tags')
        # title/content are NOT NULL TEXT columns; validating here avoids a
        # KeyError or a database error surfacing as a 500.
        if not isinstance(title, str) or not title.strip():
            return jsonify({"error": "title is required"}), 400
        if not isinstance(content, str) or not content.strip():
            return jsonify({"error": "content is required"}), 400
        if tags is not None and not isinstance(tags, str):
            return jsonify({"error": "tags must be a string"}), 400

        db.execute(
            'INSERT INTO solutions (title, content, tags) VALUES (?, ?, ?)',
            (title, content, tags),
        )
        db.commit()
        return jsonify({"status": "success"})

    rows = db.execute('SELECT * FROM solutions ORDER BY usage_count DESC').fetchall()
    return jsonify([dict(r) for r in rows])
if __name__ == '__main__':
    # Create and seed the tables before the server starts accepting requests.
    with app.app_context():
        init_db()
    # The interactive debugger allows arbitrary code execution, so it must
    # not be on by default while the server listens on every interface.
    # Set FLASK_DEBUG=1 to enable it for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in {'1', 'true', 'yes', 'on'}
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)