claude / app.py
soxogvv's picture
Update app.py
e5030f3 verified
import os, re, uuid, json, secrets, threading, base64
from datetime import datetime
from flask import Flask, render_template, request, jsonify, send_from_directory, Response, stream_with_context
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.middleware.proxy_fix import ProxyFix
from werkzeug.utils import secure_filename
import anthropic
app = Flask(__name__)
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1)
app.secret_key = os.environ.get('SECRET_KEY', 'claude-clone-secret-2024')
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///claude.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024
for d in ('outputs', 'uploads', 'static'):
os.makedirs(d, exist_ok=True)
db = SQLAlchemy(app)
MODEL = "claude-opus-4-6"
SYSTEM_PROMPT = """You are Claude, an expert AI assistant by Anthropic.
CRITICAL CODE RULES — NEVER BREAK THESE:
1. ALWAYS write COMPLETE, fully functional code — never truncate, never use "..." or "# rest of code"
2. ALWAYS label every code block with the language: ```python, ```javascript, ```html, etc.
3. Write 1000, 2000, 5000+ lines if needed — length is not a problem
4. Each file gets its own code block when creating multi-file projects
5. Include ALL imports, error handling, comments, and documentation
6. When you see uploaded code: detect ALL bugs, security issues, and performance problems
7. When you see uploaded images: analyze them in full detail
You can create complete: web apps, APIs, scripts, databases, CLI tools, games, bots — anything."""
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
password_hash = db.Column(db.String(256), nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
class UserToken(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
token = db.Column(db.String(64), unique=True, nullable=False, default=lambda: secrets.token_hex(32))
created_at = db.Column(db.DateTime, default=datetime.utcnow)
user = db.relationship('User', backref='tokens')
class Chat(db.Model):
id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
title = db.Column(db.String(200), default='New Chat')
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow)
messages = db.relationship('Message', backref='chat', lazy=True,
cascade='all, delete-orphan', order_by='Message.created_at')
class Message(db.Model):
id = db.Column(db.Integer, primary_key=True)
chat_id = db.Column(db.String(36), db.ForeignKey('chat.id'), nullable=False)
role = db.Column(db.String(20), nullable=False)
content = db.Column(db.Text, nullable=False)
thinking = db.Column(db.Text, nullable=True)
files_json = db.Column(db.Text, nullable=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
class GeneratedFile(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
chat_id = db.Column(db.String(36), db.ForeignKey('chat.id'), nullable=True)
message_id = db.Column(db.Integer, nullable=True)
filename = db.Column(db.String(200), nullable=False)
filepath = db.Column(db.String(500), nullable=False)
language = db.Column(db.String(50), nullable=True)
line_count = db.Column(db.Integer, default=0)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
def get_current_user():
t = request.headers.get('X-Auth-Token')
if not t: return None
ut = UserToken.query.filter_by(token=t).first()
return ut.user if ut else None
def make_token(user):
ut = UserToken(user_id=user.id)
db.session.add(ut); db.session.commit()
return ut.token
@app.route('/api/register', methods=['POST'])
def register():
d = request.json or {}
u, e, p = d.get('username','').strip(), d.get('email','').strip().lower(), d.get('password','')
if not u or not e or not p: return jsonify({'error': 'All fields required'}), 400
if User.query.filter_by(username=u).first(): return jsonify({'error': 'Username taken'}), 409
if User.query.filter_by(email=e).first(): return jsonify({'error': 'Email exists'}), 409
user = User(username=u, email=e, password_hash=generate_password_hash(p))
db.session.add(user); db.session.commit()
return jsonify({'success': True, 'username': user.username, 'token': make_token(user)})
@app.route('/api/login', methods=['POST'])
def login():
d = request.json or {}
idf, p = d.get('identifier','').strip(), d.get('password','')
user = User.query.filter((User.username==idf)|(User.email==idf.lower())).first()
if not user or not check_password_hash(user.password_hash, p):
return jsonify({'error': 'Invalid credentials'}), 401
return jsonify({'success': True, 'username': user.username, 'token': make_token(user)})
@app.route('/api/logout', methods=['POST'])
def logout():
t = request.headers.get('X-Auth-Token')
if t: UserToken.query.filter_by(token=t).delete(); db.session.commit()
return jsonify({'success': True})
@app.route('/api/me')
def me():
user = get_current_user()
if not user: return jsonify({'authenticated': False}), 401
return jsonify({'authenticated': True, 'username': user.username})
IMAGE_EXTS = {'png','jpg','jpeg','gif','webp'}
CODE_EXTS = {'txt','py','js','ts','jsx','tsx','html','css','scss','json','yaml','yml',
'xml','sql','md','sh','bash','php','java','cpp','c','h','go','rs','rb',
'swift','kt','r','vue','svelte','env','toml','ini','cfg','log','csv',
'dockerfile','makefile','gradle','cmake'}
@app.route('/api/upload', methods=['POST'])
def upload_file():
user = get_current_user()
if not user: return jsonify({'error': 'Unauthorized'}), 401
if 'file' not in request.files: return jsonify({'error': 'No file'}), 400
f = request.files['file']
ext = f.filename.rsplit('.', 1)[-1].lower() if '.' in (f.filename or '') else ''
if ext not in IMAGE_EXTS | CODE_EXTS: return jsonify({'error': f'.{ext} not supported'}), 400
safe = secure_filename(f.filename)
udir = os.path.join('uploads', str(user.id)); os.makedirs(udir, exist_ok=True)
fpath = os.path.join(udir, f"{uuid.uuid4().hex}_{safe}"); f.save(fpath)
if ext in IMAGE_EXTS:
with open(fpath,'rb') as fp: b64 = base64.b64encode(fp.read()).decode()
return jsonify({'success':True,'filename':safe,'type':'image',
'media_type':f"image/{'jpeg' if ext=='jpg' else ext}",'data':b64})
with open(fpath,'r',encoding='utf-8',errors='replace') as fp: content = fp.read()
return jsonify({'success':True,'filename':safe,'type':'text','content':content[:80000]})
@app.route('/api/chats')
def get_chats():
user = get_current_user()
if not user: return jsonify({'error': 'Unauthorized'}), 401
chats = Chat.query.filter_by(user_id=user.id).order_by(Chat.updated_at.desc()).all()
return jsonify([{'id':c.id,'title':c.title,'updated_at':c.updated_at.isoformat()} for c in chats])
@app.route('/api/chats', methods=['POST'])
def create_chat():
user = get_current_user()
if not user: return jsonify({'error': 'Unauthorized'}), 401
chat = Chat(user_id=user.id); db.session.add(chat); db.session.commit()
return jsonify({'id': chat.id, 'title': chat.title})
@app.route('/api/chats/<chat_id>')
def get_chat(chat_id):
user = get_current_user()
if not user: return jsonify({'error': 'Unauthorized'}), 401
chat = Chat.query.filter_by(id=chat_id, user_id=user.id).first()
if not chat: return jsonify({'error': 'Not found'}), 404
msgs = [{'id':m.id,'role':m.role,'content':m.content,'thinking':m.thinking,
'files':json.loads(m.files_json) if m.files_json else [],
'created_at':m.created_at.isoformat()} for m in chat.messages]
return jsonify({'id':chat.id,'title':chat.title,'messages':msgs})
@app.route('/api/chats/<chat_id>', methods=['DELETE'])
def delete_chat(chat_id):
user = get_current_user()
if not user: return jsonify({'error': 'Unauthorized'}), 401
chat = Chat.query.filter_by(id=chat_id, user_id=user.id).first()
if not chat: return jsonify({'error': 'Not found'}), 404
db.session.delete(chat); db.session.commit()
return jsonify({'success': True})
@app.route('/api/chats/<chat_id>/stream', methods=['POST'])
def stream_message(chat_id):
user = get_current_user()
if not user: return jsonify({'error': 'Unauthorized'}), 401
chat = Chat.query.filter_by(id=chat_id, user_id=user.id).first()
if not chat: return jsonify({'error': 'Not found'}), 404
data = request.json or {}
user_text = data.get('content', '').strip()
files = data.get('files', [])
api_content = []
for f in files:
if f.get('type') == 'image':
api_content.append({"type":"image","source":{"type":"base64",
"media_type":f['media_type'],"data":f['data']}})
elif f.get('type') == 'text':
api_content.append({"type":"text",
"text":f"**📎 Uploaded: `{f['filename']}`**\n```\n{f['content']}\n```"})
if user_text:
api_content.append({"type":"text","text":user_text})
if not api_content:
return jsonify({'error': 'Empty message'}), 400
finfo = [{'filename':f['filename'],'type':f['type']} for f in files]
user_msg = Message(chat_id=chat_id, role='user', content=user_text,
files_json=json.dumps(finfo) if finfo else None)
db.session.add(user_msg)
msg_count = Message.query.filter_by(chat_id=chat_id).count()
if msg_count <= 1:
chat.title = (user_text[:60] + ('…' if len(user_text)>60 else '')) or \
(files[0]['filename'] if files else 'New Chat')
db.session.commit()
history = []
for m in chat.messages[:-1]:
history.append({"role":m.role,"content":m.content or "(attachment)"})
history.append({"role":"user","content":api_content})
api_key = os.environ.get('ANTHROPIC_API_KEY','')
chat_id_ = chat_id; user_id_ = user.id; chat_title = chat.title
def generate():
full_text = ''
if not api_key:
err = ("⚠️ **ANTHROPIC_API_KEY not set.**\n\n"
"Add it in Space Settings → Variables and secrets → `ANTHROPIC_API_KEY`\n"
"Get your key at: https://console.anthropic.com/")
full_text = err
yield f"data: {json.dumps({'type':'text','content':err})}\n\n"
else:
try:
client = anthropic.Anthropic(api_key=api_key)
with client.messages.stream(model=MODEL, max_tokens=16000,
system=SYSTEM_PROMPT, messages=history) as stream:
for chunk in stream.text_stream:
full_text += chunk
yield f"data: {json.dumps({'type':'text','content':chunk})}\n\n"
except Exception as exc:
err = f"❌ **Claude API Error:** `{str(exc)}`"
full_text = err
yield f"data: {json.dumps({'type':'text','content':err})}\n\n"
with app.app_context():
ai_msg = Message(chat_id=chat_id_, role='assistant', content=full_text)
db.session.add(ai_msg)
c = Chat.query.get(chat_id_)
if c: c.updated_at = datetime.utcnow()
db.session.commit()
mid = ai_msg.id
threading.Thread(target=extract_files, args=(full_text, chat_id_, user_id_, mid), daemon=True).start()
yield f"data: {json.dumps({'type':'done','message_id':mid,'title':chat_title})}\n\n"
return Response(stream_with_context(generate()), mimetype='text/event-stream',
headers={'Cache-Control':'no-cache','X-Accel-Buffering':'no','Connection':'keep-alive'})
EXT_MAP = {'python':'py','py':'py','javascript':'js','js':'js','typescript':'ts','ts':'ts',
'jsx':'jsx','tsx':'tsx','html':'html','css':'css','scss':'scss','java':'java',
'cpp':'cpp','c':'c','go':'go','rust':'rs','bash':'sh','sh':'sh','sql':'sql',
'json':'json','yaml':'yaml','yml':'yml','xml':'xml','php':'php','ruby':'rb',
'rb':'rb','swift':'swift','kotlin':'kt','r':'r','markdown':'md','md':'md',
'vue':'vue','svelte':'svelte','dockerfile':'dockerfile','plaintext':'txt','text':'txt'}
def extract_files(content, chat_id, user_id, message_id):
with app.app_context():
for i, (lang, code) in enumerate(re.findall(r'```(\w+)?\n([\s\S]*?)```', content)):
code = code.strip()
if not code or len(code) < 30: continue
lang_l = lang.lower() if lang else 'txt'
ext = EXT_MAP.get(lang_l, 'txt')
ts = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
fname = f"file_{ts}_{i+1}.{ext}"
udir = os.path.join('outputs', str(user_id)); os.makedirs(udir, exist_ok=True)
fpath = os.path.join(udir, fname)
with open(fpath,'w',encoding='utf-8') as fp: fp.write(code)
db.session.add(GeneratedFile(user_id=user_id, chat_id=chat_id, message_id=message_id,
filename=fname, filepath=fpath, language=lang_l, line_count=code.count('\n')+1))
db.session.commit()
@app.route('/api/files')
def list_files():
user = get_current_user()
if not user: return jsonify({'error': 'Unauthorized'}), 401
chat_id = request.args.get('chat_id')
q = GeneratedFile.query.filter_by(user_id=user.id)
if chat_id: q = q.filter_by(chat_id=chat_id)
files = q.order_by(GeneratedFile.created_at.desc()).limit(200).all()
return jsonify([{'id':f.id,'filename':f.filename,'language':f.language,
'line_count':f.line_count,'chat_id':f.chat_id,'message_id':f.message_id,
'created_at':f.created_at.isoformat()} for f in files])
@app.route('/api/files/<int:fid>/content')
def file_content(fid):
user = get_current_user()
if not user: return jsonify({'error': 'Unauthorized'}), 401
gf = GeneratedFile.query.filter_by(id=fid, user_id=user.id).first()
if not gf or not os.path.exists(gf.filepath): return jsonify({'error': 'Not found'}), 404
with open(gf.filepath,'r',encoding='utf-8',errors='replace') as fp: content = fp.read()
return jsonify({'content':content,'filename':gf.filename,'language':gf.language,'line_count':gf.line_count})
@app.route('/api/files/<int:fid>/download')
def download_file(fid):
user = get_current_user()
if not user: return jsonify({'error': 'Unauthorized'}), 401
gf = GeneratedFile.query.filter_by(id=fid, user_id=user.id).first()
if not gf or not os.path.exists(gf.filepath): return jsonify({'error': 'Not found'}), 404
return send_from_directory(os.path.abspath(os.path.dirname(gf.filepath)),
os.path.basename(gf.filepath), as_attachment=True)
@app.route('/')
def index(): return render_template('index.html')
with app.app_context():
db.create_all()
if __name__ == '__main__':
app.run(host='0.0.0.0', port=7860, debug=False)