| import os, re, uuid, json, secrets, threading, base64 |
| from datetime import datetime |
| from flask import Flask, render_template, request, jsonify, send_from_directory, Response, stream_with_context |
| from flask_sqlalchemy import SQLAlchemy |
| from werkzeug.security import generate_password_hash, check_password_hash |
| from werkzeug.middleware.proxy_fix import ProxyFix |
| from werkzeug.utils import secure_filename |
| import anthropic |
|
|
app = Flask(__name__)
# The app is expected to sit behind exactly one reverse proxy: trust a single
# hop of the X-Forwarded-For / -Proto / -Host / -Prefix headers.
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1)
# Never fall back to a key that is published in the source tree. When
# SECRET_KEY is not configured, use a random per-process key instead; the
# API in this file authenticates with X-Auth-Token rows, not signed cookies,
# so a key that changes on restart does not log anyone out.
app.secret_key = os.environ.get('SECRET_KEY') or secrets.token_hex(32)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///claude.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# Reject request bodies larger than 50 MiB.
app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024
|
|
# Make sure the working directories used by the app exist before any
# request is served.
for directory in ('outputs', 'uploads', 'static'):
    os.makedirs(directory, exist_ok=True)


# Database handle bound to the Flask app, and the model id sent with every
# Anthropic API call.
db = SQLAlchemy(app)
MODEL = "claude-opus-4-6"
|
|
# System prompt passed as `system=` on every streamed completion request.
SYSTEM_PROMPT = """You are Claude, an expert AI assistant by Anthropic.

CRITICAL CODE RULES — NEVER BREAK THESE:
1. ALWAYS write COMPLETE, fully functional code — never truncate, never use "..." or "# rest of code"
2. ALWAYS label every code block with the language: ```python, ```javascript, ```html, etc.
3. Write 1000, 2000, 5000+ lines if needed — length is not a problem
4. Each file gets its own code block when creating multi-file projects
5. Include ALL imports, error handling, comments, and documentation
6. When you see uploaded code: detect ALL bugs, security issues, and performance problems
7. When you see uploaded images: analyze them in full detail

You can create complete: web apps, APIs, scripts, databases, CLI tools, games, bots — anything."""
|
|
class User(db.Model):
    """Account row: unique username and email plus a stored password hash."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    # Holds the output of a password-hashing function, never the raw password.
    password_hash = db.Column(db.String(256), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
class UserToken(db.Model):
    """Opaque API token that belongs to one user (reachable as ``user.tokens``)."""

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    # A new 64-hex-character value is generated for each row by default.
    token = db.Column(
        db.String(64),
        unique=True,
        nullable=False,
        default=lambda: secrets.token_hex(32),
    )
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    user = db.relationship('User', backref='tokens')
|
|
class Chat(db.Model):
    """Conversation owned by a user, keyed by a UUID4 string."""

    id = db.Column(
        db.String(36),
        primary_key=True,
        default=lambda: str(uuid.uuid4()),
    )
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    title = db.Column(db.String(200), default='New Chat')
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow)
    # Messages come back oldest-first and are deleted together with the chat.
    messages = db.relationship(
        'Message',
        backref='chat',
        lazy=True,
        cascade='all, delete-orphan',
        order_by='Message.created_at',
    )
|
|
class Message(db.Model):
    """One turn of a chat: a role, its text, and optional attachment metadata."""

    id = db.Column(db.Integer, primary_key=True)
    chat_id = db.Column(db.String(36), db.ForeignKey('chat.id'), nullable=False)
    role = db.Column(db.String(20), nullable=False)
    content = db.Column(db.Text, nullable=False)
    thinking = db.Column(db.Text, nullable=True)
    # JSON-encoded text describing the attachments, or NULL when there are none.
    files_json = db.Column(db.Text, nullable=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
class GeneratedFile(db.Model):
    """Bookkeeping row for a file written to disk on behalf of a user."""

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    chat_id = db.Column(db.String(36), db.ForeignKey('chat.id'), nullable=True)
    # Plain integer, not declared as a foreign key.
    message_id = db.Column(db.Integer, nullable=True)
    filename = db.Column(db.String(200), nullable=False)
    filepath = db.Column(db.String(500), nullable=False)
    language = db.Column(db.String(50), nullable=True)
    line_count = db.Column(db.Integer, default=0)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
def get_current_user():
    """Resolve the request's ``X-Auth-Token`` header to its user.

    Returns:
        The ``User`` linked to the matching ``UserToken`` row, or ``None``
        when the header is missing/empty or no row matches.
    """
    token_value = request.headers.get('X-Auth-Token')
    if not token_value:
        return None
    record = UserToken.query.filter_by(token=token_value).first()
    if record is None:
        return None
    return record.user
|
|
def make_token(user):
    """Persist a new ``UserToken`` for ``user`` and return its token string."""
    record = UserToken(user_id=user.id)
    db.session.add(record)
    # The token column's default is applied when the row is written.
    db.session.commit()
    return record.token
|
|
@app.route('/api/register', methods=['POST'])
def register():
    """Create an account from a JSON body and return a fresh auth token.

    Expects ``username``, ``email`` and ``password`` as JSON strings.

    Returns:
        200 with ``success``, ``username`` and ``token`` on success;
        400 when a field is missing, empty or not a string;
        409 when the username or e-mail is already registered.
    """
    # silent=True: a missing or malformed JSON body is handled below as
    # "fields missing" rather than raising inside Flask.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    username = payload.get('username', '')
    email = payload.get('email', '')
    password = payload.get('password', '')
    # Non-string values (numbers, null, lists) would crash .strip() below.
    if not all(isinstance(value, str) for value in (username, email, password)):
        return jsonify({'error': 'All fields required'}), 400

    username = username.strip()
    email = email.strip().lower()
    if not username or not email or not password:
        return jsonify({'error': 'All fields required'}), 400
    if User.query.filter_by(username=username).first():
        return jsonify({'error': 'Username taken'}), 409
    if User.query.filter_by(email=email).first():
        return jsonify({'error': 'Email exists'}), 409

    user = User(username=username, email=email,
                password_hash=generate_password_hash(password))
    db.session.add(user)
    db.session.commit()
    return jsonify({'success': True, 'username': user.username, 'token': make_token(user)})
|
|
@app.route('/api/login', methods=['POST'])
def login():
    """Check credentials from a JSON body and return a fresh auth token.

    ``identifier`` may be a username (matched as given) or an e-mail
    (matched lower-cased).

    Returns:
        200 with ``success``, ``username`` and ``token`` on success;
        401 when the credentials are missing, malformed or wrong.
    """
    # silent=True: a missing or malformed JSON body is treated as empty.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    identifier = payload.get('identifier', '')
    password = payload.get('password', '')
    # Non-string values would crash .strip() / the hash check; they cannot
    # be valid credentials, so answer exactly like a wrong password.
    if not isinstance(identifier, str) or not isinstance(password, str):
        return jsonify({'error': 'Invalid credentials'}), 401

    identifier = identifier.strip()
    user = User.query.filter(
        (User.username == identifier) | (User.email == identifier.lower())
    ).first()
    if not user or not check_password_hash(user.password_hash, password):
        return jsonify({'error': 'Invalid credentials'}), 401
    return jsonify({'success': True, 'username': user.username, 'token': make_token(user)})
|
|
@app.route('/api/logout', methods=['POST'])
def logout():
    """Delete the token sent in ``X-Auth-Token``; always answers with success."""
    token_value = request.headers.get('X-Auth-Token')
    if token_value:
        UserToken.query.filter_by(token=token_value).delete()
        db.session.commit()
    return jsonify({'success': True})
|
|
@app.route('/api/me')
def me():
    """Tell the caller whether its token is valid and, if so, for which user."""
    current = get_current_user()
    if current:
        return jsonify({'authenticated': True, 'username': current.username})
    return jsonify({'authenticated': False}), 401
|
|
# Extensions the upload endpoint returns as base64-encoded images.
IMAGE_EXTS = {'png', 'jpg', 'jpeg', 'gif', 'webp'}

# Extensions the upload endpoint returns as text content.
CODE_EXTS = {
    # plain text and documents
    'txt', 'md', 'log', 'csv',
    # scripting languages
    'py', 'sh', 'bash', 'rb', 'php', 'r',
    # web front-end
    'js', 'ts', 'jsx', 'tsx', 'html', 'css', 'scss', 'vue', 'svelte',
    # data and configuration
    'json', 'yaml', 'yml', 'xml', 'sql', 'env', 'toml', 'ini', 'cfg',
    # compiled languages
    'java', 'cpp', 'c', 'h', 'go', 'rs', 'swift', 'kt',
    # build tooling
    'dockerfile', 'makefile', 'gradle', 'cmake',
}
|
|
@app.route('/api/upload', methods=['POST'])
def upload_file():
    """Store an uploaded file and echo it back in a chat-ready form.

    The file is saved under ``uploads/<user id>/`` with a random prefix.
    Images are returned base64-encoded with a media type; every other
    accepted file is returned as text, capped at 80000 characters.

    Returns:
        200 with the attachment payload; 401 without a valid token;
        400 when no file was sent or its type is not supported.
    """
    user = get_current_user()
    if not user:
        return jsonify({'error': 'Unauthorized'}), 401
    if 'file' not in request.files:
        return jsonify({'error': 'No file'}), 400

    upload = request.files['file']
    original_name = upload.filename or ''
    if '.' in original_name:
        ext = original_name.rsplit('.', 1)[-1].lower()
    else:
        # Extension-less names such as "Dockerfile" or "Makefile" are matched
        # by their whole name; otherwise those CODE_EXTS entries could never
        # be reached. Anything else without a dot stays unsupported.
        whole_name = original_name.lower()
        ext = whole_name if whole_name in CODE_EXTS else ''
    if ext not in IMAGE_EXTS | CODE_EXTS:
        return jsonify({'error': f'.{ext} not supported'}), 400

    safe = secure_filename(original_name)
    user_dir = os.path.join('uploads', str(user.id))
    os.makedirs(user_dir, exist_ok=True)
    # The random prefix keeps two uploads with the same name from colliding.
    stored_path = os.path.join(user_dir, f"{uuid.uuid4().hex}_{safe}")
    upload.save(stored_path)

    if ext in IMAGE_EXTS:
        with open(stored_path, 'rb') as fp:
            encoded = base64.b64encode(fp.read()).decode()
        return jsonify({'success': True, 'filename': safe, 'type': 'image',
                        'media_type': f"image/{'jpeg' if ext == 'jpg' else ext}",
                        'data': encoded})

    # Read only the part that is returned instead of loading a file of up to
    # 50 MB and slicing it afterwards.
    with open(stored_path, 'r', encoding='utf-8', errors='replace') as fp:
        content = fp.read(80000)
    return jsonify({'success': True, 'filename': safe, 'type': 'text', 'content': content})
|
|
@app.route('/api/chats')
def get_chats():
    """List the caller's chats, most recently updated first."""
    user = get_current_user()
    if not user:
        return jsonify({'error': 'Unauthorized'}), 401
    query = Chat.query.filter_by(user_id=user.id).order_by(Chat.updated_at.desc())
    summaries = []
    for chat in query.all():
        summaries.append({
            'id': chat.id,
            'title': chat.title,
            'updated_at': chat.updated_at.isoformat(),
        })
    return jsonify(summaries)
|
|
@app.route('/api/chats', methods=['POST'])
def create_chat():
    """Create an empty chat for the caller and return its id and title."""
    user = get_current_user()
    if not user:
        return jsonify({'error': 'Unauthorized'}), 401
    new_chat = Chat(user_id=user.id)
    db.session.add(new_chat)
    db.session.commit()
    return jsonify({'id': new_chat.id, 'title': new_chat.title})
|
|
@app.route('/api/chats/<chat_id>')
def get_chat(chat_id):
    """Return one of the caller's chats together with all of its messages."""
    user = get_current_user()
    if not user:
        return jsonify({'error': 'Unauthorized'}), 401
    chat = Chat.query.filter_by(id=chat_id, user_id=user.id).first()
    if not chat:
        return jsonify({'error': 'Not found'}), 404

    serialized = []
    for message in chat.messages:
        # Attachment metadata is stored as JSON text; absent means no files.
        attachments = json.loads(message.files_json) if message.files_json else []
        serialized.append({
            'id': message.id,
            'role': message.role,
            'content': message.content,
            'thinking': message.thinking,
            'files': attachments,
            'created_at': message.created_at.isoformat(),
        })
    return jsonify({'id': chat.id, 'title': chat.title, 'messages': serialized})
|
|
@app.route('/api/chats/<chat_id>', methods=['DELETE'])
def delete_chat(chat_id):
    """Delete one of the caller's chats."""
    user = get_current_user()
    if not user:
        return jsonify({'error': 'Unauthorized'}), 401
    target = Chat.query.filter_by(id=chat_id, user_id=user.id).first()
    if not target:
        return jsonify({'error': 'Not found'}), 404
    db.session.delete(target)
    db.session.commit()
    return jsonify({'success': True})
|
|
def _build_api_content(files, user_text):
    """Turn uploaded attachments plus the typed text into API content parts.

    Args:
        files: list of attachment objects as sent by the client.
        user_text: the already-stripped message text (may be empty).

    Returns:
        A list of content-part dicts; empty when there is nothing to send.

    Raises:
        ValueError: if an attachment is not an object, or an image/text
            attachment lacks the string fields its type requires.
    """
    parts = []
    for attachment in files:
        if not isinstance(attachment, dict):
            raise ValueError('attachment must be an object')
        kind = attachment.get('type')
        if kind == 'image':
            required = ('media_type', 'data')
        elif kind == 'text':
            required = ('filename', 'content')
        else:
            # Attachments of any other type are not forwarded to the model.
            continue
        if not all(isinstance(attachment.get(key), str) for key in required):
            raise ValueError(f'{kind} attachment is missing required fields')
        if kind == 'image':
            parts.append({"type": "image", "source": {"type": "base64",
                          "media_type": attachment['media_type'],
                          "data": attachment['data']}})
        else:
            parts.append({"type": "text",
                          "text": f"**📎 Uploaded: `{attachment['filename']}`**\n```\n{attachment['content']}\n```"})
    if user_text:
        parts.append({"type": "text", "text": user_text})
    return parts


@app.route('/api/chats/<chat_id>/stream', methods=['POST'])
def stream_message(chat_id):
    """Store the user's message, then stream the model's reply as SSE.

    The JSON body carries ``content`` (text) and ``files`` (attachments).
    The response is a ``text/event-stream`` of ``data: {...}`` frames:
    ``type: text`` chunks followed by one ``type: done`` frame holding the
    stored assistant message id and the chat title.

    Returns:
        The streaming response; 401 without a valid token; 404 when the chat
        is not the caller's; 400 for an empty or malformed message.
    """
    user = get_current_user()
    if not user:
        return jsonify({'error': 'Unauthorized'}), 401
    chat = Chat.query.filter_by(id=chat_id, user_id=user.id).first()
    if not chat:
        return jsonify({'error': 'Not found'}), 404

    # silent=True: a missing or malformed JSON body is treated as empty.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    raw_text = data.get('content') or ''
    files = data.get('files') or []
    if not isinstance(raw_text, str) or not isinstance(files, list):
        return jsonify({'error': 'Invalid message'}), 400
    user_text = raw_text.strip()

    try:
        api_content = _build_api_content(files, user_text)
    except ValueError:
        # Malformed attachments are a client error, not a server crash.
        return jsonify({'error': 'Invalid attachment'}), 400
    if not api_content:
        return jsonify({'error': 'Empty message'}), 400

    # Only attachment names and types are stored, not their payloads.
    finfo = [{'filename': f.get('filename', ''), 'type': f.get('type')} for f in files]
    user_msg = Message(chat_id=chat_id, role='user', content=user_text,
                       files_json=json.dumps(finfo) if finfo else None)
    db.session.add(user_msg)

    # The first message of a chat also names it.
    msg_count = Message.query.filter_by(chat_id=chat_id).count()
    if msg_count <= 1:
        text_title = user_text[:60] + ('…' if len(user_text) > 60 else '')
        file_title = files[0].get('filename') if files else None
        chat.title = text_title or file_title or 'New Chat'
    db.session.commit()

    # Earlier turns are replayed as plain text; the message just stored is
    # the last element and is replaced by the full multi-part content.
    history = []
    for m in chat.messages[:-1]:
        history.append({"role": m.role, "content": m.content or "(attachment)"})
    history.append({"role": "user", "content": api_content})

    api_key = os.environ.get('ANTHROPIC_API_KEY', '')
    # Copy plain values out of the ORM objects for use inside the generator.
    chat_id_ = chat_id
    user_id_ = user.id
    chat_title = chat.title

    def generate():
        full_text = ''
        if not api_key:
            err = ("⚠️ **ANTHROPIC_API_KEY not set.**\n\n"
                   "Add it in Space Settings → Variables and secrets → `ANTHROPIC_API_KEY`\n"
                   "Get your key at: https://console.anthropic.com/")
            full_text = err
            yield f"data: {json.dumps({'type':'text','content':err})}\n\n"
        else:
            try:
                client = anthropic.Anthropic(api_key=api_key)
                with client.messages.stream(model=MODEL, max_tokens=16000,
                                            system=SYSTEM_PROMPT, messages=history) as stream:
                    for chunk in stream.text_stream:
                        full_text += chunk
                        yield f"data: {json.dumps({'type':'text','content':chunk})}\n\n"
            except Exception as exc:
                # An API failure replaces whatever was streamed so far and is
                # stored as the assistant's reply.
                err = f"❌ **Claude API Error:** `{str(exc)}`"
                full_text = err
                yield f"data: {json.dumps({'type':'text','content':err})}\n\n"

        with app.app_context():
            ai_msg = Message(chat_id=chat_id_, role='assistant', content=full_text)
            db.session.add(ai_msg)
            c = Chat.query.get(chat_id_)
            if c:
                c.updated_at = datetime.utcnow()
            db.session.commit()
            mid = ai_msg.id

        # Code-block extraction runs in a background thread so the final
        # frame is not delayed by disk writes.
        threading.Thread(target=extract_files,
                         args=(full_text, chat_id_, user_id_, mid),
                         daemon=True).start()
        yield f"data: {json.dumps({'type':'done','message_id':mid,'title':chat_title})}\n\n"

    return Response(stream_with_context(generate()), mimetype='text/event-stream',
                    headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no',
                             'Connection': 'keep-alive'})
|
|
# Maps a code-fence language label (lower-case) to the file extension used
# when the fenced code is written to disk.
EXT_MAP = {
    'python': 'py',
    'py': 'py',
    'javascript': 'js',
    'js': 'js',
    'typescript': 'ts',
    'ts': 'ts',
    'jsx': 'jsx',
    'tsx': 'tsx',
    'html': 'html',
    'css': 'css',
    'scss': 'scss',
    'java': 'java',
    'cpp': 'cpp',
    'c': 'c',
    'go': 'go',
    'rust': 'rs',
    'bash': 'sh',
    'sh': 'sh',
    'sql': 'sql',
    'json': 'json',
    'yaml': 'yaml',
    'yml': 'yml',
    'xml': 'xml',
    'php': 'php',
    'ruby': 'rb',
    'rb': 'rb',
    'swift': 'swift',
    'kotlin': 'kt',
    'r': 'r',
    'markdown': 'md',
    'md': 'md',
    'vue': 'vue',
    'svelte': 'svelte',
    'dockerfile': 'dockerfile',
    'plaintext': 'txt',
    'text': 'txt',
}
|
|
# Opening fence, optional info string (anything up to the end of that line),
# the body, then the closing fence. Accepting the whole info line matters:
# with a bare ``\w+`` a label such as "c++" or "python title=x" makes the
# opening fence fail to match, and the *closing* fence is then mistaken for
# an opener, so prose between two code blocks gets captured as code.
_CODE_FENCE_RE = re.compile(r'```([^\n`]*)\n([\s\S]*?)```')


def _find_code_blocks(content):
    """Return ``(position, language, code)`` for each fenced block worth saving.

    ``position`` is the 1-based index of the fence among all fences found,
    including those skipped for holding fewer than 30 characters of code.
    ``language`` is the first word of the fence's info string, lower-cased
    and cut to 50 characters, or ``'txt'`` when the fence has no label.
    """
    found = []
    for position, (info, body) in enumerate(_CODE_FENCE_RE.findall(content), start=1):
        code = body.strip()
        if len(code) < 30:
            continue
        words = info.split()
        language = words[0].lower()[:50] if words else 'txt'
        found.append((position, language, code))
    return found


def extract_files(content, chat_id, user_id, message_id):
    """Write every substantial fenced code block in ``content`` to disk.

    Each block is saved under ``outputs/<user_id>/`` and recorded as a
    ``GeneratedFile`` row linked to the chat and message. Opens its own
    application context, so it can run in a worker thread.

    Args:
        content: full text of the assistant message.
        chat_id: id of the chat the message belongs to.
        user_id: id of the user who owns the files.
        message_id: id of the stored assistant message.
    """
    with app.app_context():
        out_dir = os.path.join('outputs', str(user_id))
        # One timestamp per message, so its files share a common prefix.
        stamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
        for position, language, code in _find_code_blocks(content):
            ext = EXT_MAP.get(language, 'txt')
            fname = f"file_{stamp}_{position}.{ext}"
            os.makedirs(out_dir, exist_ok=True)
            fpath = os.path.join(out_dir, fname)
            with open(fpath, 'w', encoding='utf-8') as fp:
                fp.write(code)
            db.session.add(GeneratedFile(
                user_id=user_id, chat_id=chat_id, message_id=message_id,
                filename=fname, filepath=fpath, language=language,
                line_count=code.count('\n') + 1))
        db.session.commit()
|
|
@app.route('/api/files')
def list_files():
    """List up to 200 of the caller's generated files, newest first.

    An optional ``chat_id`` query parameter narrows the list to one chat.
    """
    user = get_current_user()
    if not user:
        return jsonify({'error': 'Unauthorized'}), 401

    query = GeneratedFile.query.filter_by(user_id=user.id)
    wanted_chat = request.args.get('chat_id')
    if wanted_chat:
        query = query.filter_by(chat_id=wanted_chat)
    records = query.order_by(GeneratedFile.created_at.desc()).limit(200).all()

    listing = []
    for record in records:
        listing.append({
            'id': record.id,
            'filename': record.filename,
            'language': record.language,
            'line_count': record.line_count,
            'chat_id': record.chat_id,
            'message_id': record.message_id,
            'created_at': record.created_at.isoformat(),
        })
    return jsonify(listing)
|
|
@app.route('/api/files/<int:fid>/content')
def file_content(fid):
    """Return the text and metadata of one of the caller's generated files."""
    user = get_current_user()
    if not user:
        return jsonify({'error': 'Unauthorized'}), 401
    record = GeneratedFile.query.filter_by(id=fid, user_id=user.id).first()
    # A row whose file is no longer on disk is reported as missing too.
    if record is None or not os.path.exists(record.filepath):
        return jsonify({'error': 'Not found'}), 404
    with open(record.filepath, 'r', encoding='utf-8', errors='replace') as handle:
        text = handle.read()
    return jsonify({
        'content': text,
        'filename': record.filename,
        'language': record.language,
        'line_count': record.line_count,
    })
|
|
@app.route('/api/files/<int:fid>/download')
def download_file(fid):
    """Send one of the caller's generated files as an attachment."""
    user = get_current_user()
    if not user:
        return jsonify({'error': 'Unauthorized'}), 401
    record = GeneratedFile.query.filter_by(id=fid, user_id=user.id).first()
    if record is None or not os.path.exists(record.filepath):
        return jsonify({'error': 'Not found'}), 404
    # Split the stored path into an absolute directory and a bare file name.
    directory = os.path.abspath(os.path.dirname(record.filepath))
    basename = os.path.basename(record.filepath)
    return send_from_directory(directory, basename, as_attachment=True)
|
|
@app.route('/')
def index():
    """Render the ``index.html`` template."""
    return render_template('index.html')
|
|
# Create any missing tables at import time, so this also happens when the
# module is loaded by a WSGI server rather than run directly.
with app.app_context():
    db.create_all()


if __name__ == '__main__':
    # Direct execution: listen on all interfaces, port 7860, debugger off.
    app.run(debug=False, host='0.0.0.0', port=7860)