DarthBihan's picture
Update app.py
1192cc3 verified
raw
history blame
8.26 kB
from flask import Flask, request, jsonify
from flask_cors import CORS
from flask_jwt_extended import JWTManager, jwt_required, get_jwt_identity
from routes.auth import auth_bp
from dotenv import load_dotenv
from model import enhance_code
import tempfile
import os
import subprocess
import json
from pymongo import MongoClient
from routes.reviews import reviews_bp
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from datetime import datetime
from datetime import timedelta
from bson import ObjectId
from models.reviews import reviews_collection
from schemas import ScanRequest
from pydantic import ValidationError
from flask_compress import Compress
from flask_caching import Cache
import hashlib
load_dotenv()
app = Flask(__name__)
Compress(app)
CORS(app)
# Load environment variables
env = os.getenv('FLASK_ENV', 'development')
if env == 'development':
load_dotenv('.env.development')
else:
load_dotenv('.env.production')
if os.getenv("USE_REDIS", "false").lower() == "true":
cache = Cache(config={
"CACHE_TYPE": "RedisCache",
"CACHE_REDIS_URL": os.getenv("REDIS_URL", "redis://localhost:6379/0"),
"CACHE_DEFAULT_TIMEOUT": 3600
})
else:
cache = Cache(config={
"CACHE_TYPE": "SimpleCache",
"CACHE_DEFAULT_TIMEOUT": 3600
})
cache.init_app(app)
cache.init_app(app)
def files_hash(files):
h = hashlib.sha256()
for f in sorted(files, key=lambda x: x["filename"]):
h.update(f["filename"].encode())
h.update(b"\0")
h.update(f["content"].encode())
return h.hexdigest()
app.config['MONGO_URI'] = os.getenv("MONGO_URI")
app.config['SECRET_KEY'] = os.getenv('JWT_SECRET') or 'super-secret-key'
app.config["JWT_SECRET_KEY"] = os.getenv('JWT_SECRET') or 'super-secret-key'
app.config["JWT_ACCESS_TOKEN_EXPIRES"] = timedelta(minutes=60)
app.register_blueprint(reviews_bp)
jwt = JWTManager(app)
client = MongoClient(app.config['MONGO_URI'])
db = client["codewhisperer"]
users = db["users"]
enhance_history = db["enhance_history"]
scan_history = db["scan_history"]
app.register_blueprint(auth_bp, url_prefix="/api")
limiter = Limiter(
key_func=get_remote_address,
default_limits=["20 per minute"]
)
limiter.init_app(app)
@app.route('/api/scan', methods=['POST'])
@limiter.limit("5/minute")
@jwt_required()
def scan_code():
try:
data = request.get_json()
app.logger.info(f"Incoming request: {data}")
try:
req = ScanRequest(**data)
except ValidationError as e:
app.logger.error(f"Validation error: {e.errors()}")
return jsonify({"error": e.errors()}), 400
files = [f.dict() for f in req.files]
language = req.language.lower()
username = get_jwt_identity()
key = f"scan:{username}:{files_hash(files)}"
cached = cache.get(key)
if cached:
return jsonify({"result": cached, "cached": True})
with tempfile.TemporaryDirectory() as temp_dir:
for f in files:
path = os.path.join(temp_dir, f["filename"])
with open(path, "w", encoding="utf-8") as code_file:
code_file.write(f["content"])
if language == "python":
scan_command = ["python", "-m", "bandit", "-r", temp_dir, "-f", "json"]
elif language == "javascript":
scan_command = ["python", "-m", "semgrep", "--config=p/javascript", "--json", temp_dir]
else:
return jsonify({"error": "Unsupported language"}), 400
app.logger.info(f"Running: {' '.join(scan_command)}")
result = subprocess.run(scan_command, capture_output=True, text=True)
app.logger.info(f"stdout: {result.stdout[:500]}")
app.logger.info(f"stderr: {result.stderr}")
if result.returncode not in (0, 1, 2):
return jsonify({"error": result.stderr}), 500
try:
output_json = json.loads(result.stdout)
except Exception as e:
return jsonify({"error": f"JSON parse failed: {str(e)}", "raw": result.stdout}), 500
cache.set(key, output_json)
scan_history.insert_one({
"username": username,
"language": language,
"files": files,
"result": output_json,
"timestamp": datetime.utcnow().isoformat()
})
return jsonify({"result": output_json})
except Exception as e:
app.logger.error(f"Unexpected error: {str(e)}")
return jsonify({"error": str(e)}), 500
@app.route("/api/health")
def health():
return jsonify({"status": "ok"})
@app.route('/api/enhance', methods=['POST'])
@limiter.limit("5/minute")
@jwt_required()
def enhance():
try:
data = request.get_json()
code = data.get("code", "")
language = data.get("language", "python").lower()
username = get_jwt_identity()
if language not in ["python", "javascript"]:
return jsonify({"error": "Unsupported language"}), 400
if not code.strip():
return jsonify({"error": "No code provided"}), 400
# 🔹 New format (returns dict)
result = enhance_code(code, language)
# Save to history (with candidates + explanations)
enhance_history.insert_one({
"username": username,
"code": code,
"language": language,
"enhanced_code": result["enhanced_code"],
"diff": result["diff"],
"candidates": result.get("candidates", []),
"explanations": result.get("explanations", []),
"timestamp": datetime.utcnow().isoformat()
})
return jsonify(result), 200
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/api/history', methods=['GET'])
@limiter.limit("10/minute")
@jwt_required()
def history():
try:
username = get_jwt_identity()
# Fetch both histories
enhance_records = list(enhance_history.find({"username": username}).sort("timestamp", -1))
scan_records = list(scan_history.find({"username": username}).sort("timestamp", -1))
# Convert ObjectId to string & return only relevant fields
def clean(record, record_type):
return {
"id": str(record.get("_id")),
"language": record.get("language"),
"code": record.get("code"),
"enhanced_code": record.get("enhanced_code"),
"diff": record.get("diff"),
"candidates": record.get("candidates", []), # ✅ added
"explanations": record.get("explanations", []), # ✅ added
"result": record.get("result") if record_type == "scan" else None,
"timestamp": record.get("timestamp"),
}
enhance_list = [clean(r, "enhance") for r in enhance_records]
scan_list = [clean(r, "scan") for r in scan_records]
return jsonify({
"enhance": enhance_list,
"scan": scan_list
})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/reviews", methods=["POST"])
@limiter.limit("5/minute")
def submit_review():
try:
data = request.get_json()
name = data.get("name")
email = data.get("email")
rating = data.get("rating")
review = data.get("review")
date = data.get("date", datetime.utcnow().isoformat())
if not all([name, email, rating, review]):
return jsonify({"error": "All fields are required"}), 400
review_doc = {
"name": name,
"email": email,
"rating": rating,
"review": review,
"date": date,
}
result = reviews_collection.insert_one(review_doc)
return jsonify({
"message": "Review submitted successfully",
"id": str(result.inserted_id)
}), 201
except Exception as e:
return jsonify({"error": str(e)}), 500
if __name__ == '__main__':
app.run(host="0.0.0.0", port=5000, debug=True)