Spaces:
Sleeping
Sleeping
| from flask import Flask, request, jsonify | |
| from flask_cors import CORS | |
| from flask_jwt_extended import JWTManager, jwt_required, get_jwt_identity | |
| from routes.auth import auth_bp | |
| from dotenv import load_dotenv | |
| from model import enhance_code | |
| import tempfile | |
| import os | |
| import subprocess | |
| import json | |
| from pymongo import MongoClient | |
| from routes.reviews import reviews_bp | |
| from flask_limiter import Limiter | |
| from flask_limiter.util import get_remote_address | |
| from datetime import datetime | |
| from datetime import timedelta | |
| from bson import ObjectId | |
| from models.reviews import reviews_collection | |
| from schemas import ScanRequest | |
| from pydantic import ValidationError | |
| from flask_compress import Compress | |
| from flask_caching import Cache | |
| import hashlib | |
# Application bootstrap: load environment files, create the Flask app and
# attach compression, CORS and the response cache.
load_dotenv()
app = Flask(__name__)
Compress(app)
CORS(app)

# Load environment variables for the selected environment.
# NOTE: python-dotenv does not override variables that are already set by
# default, so values from the plain `.env` loaded above take precedence
# over the environment-specific file.
env = os.getenv('FLASK_ENV', 'development')
if env == 'development':
    load_dotenv('.env.development')
else:
    load_dotenv('.env.production')

# Build the cache configuration once; only the backend differs between the
# Redis and in-process variants.
cache_config = {"CACHE_DEFAULT_TIMEOUT": 3600}
if os.getenv("USE_REDIS", "false").lower() == "true":
    cache_config["CACHE_TYPE"] = "RedisCache"
    cache_config["CACHE_REDIS_URL"] = os.getenv("REDIS_URL", "redis://localhost:6379/0")
else:
    cache_config["CACHE_TYPE"] = "SimpleCache"

cache = Cache(config=cache_config)
# Bind the cache to the app exactly once (the original called this twice).
cache.init_app(app)
def files_hash(files):
    """Return a SHA-256 hex digest identifying a collection of files.

    Args:
        files: Iterable of mappings, each holding string values under the
            ``"filename"`` and ``"content"`` keys.

    Returns:
        The hexadecimal SHA-256 digest of all filenames and contents.
        Entries are sorted by filename before hashing, so the order in
        which distinctly named files are supplied does not matter.
    """
    digest = hashlib.sha256()
    for entry in sorted(files, key=lambda item: item["filename"]):
        for field in ("filename", "content"):
            raw = entry[field].encode()
            # Length-prefix every field so field boundaries are unambiguous.
            # With a bare separator, one file's content could absorb the
            # next file's name and two different file sets would collide.
            digest.update(len(raw).to_bytes(8, "big"))
            digest.update(raw)
    return digest.hexdigest()
# Application configuration, database handles, blueprints and rate limiting.
app.config['MONGO_URI'] = os.getenv("MONGO_URI")

# Never fall back to a publicly known signing key: anyone who knows the
# fallback could forge valid JWTs and session cookies.
jwt_secret = os.getenv('JWT_SECRET')
if not jwt_secret:
    if env != 'development':
        raise RuntimeError("JWT_SECRET must be set when FLASK_ENV is not 'development'")
    # Development convenience only: tokens stop validating after a restart.
    jwt_secret = os.urandom(32).hex()
    app.logger.warning("JWT_SECRET is not set; using a random per-process secret")
app.config['SECRET_KEY'] = jwt_secret
app.config["JWT_SECRET_KEY"] = jwt_secret
app.config["JWT_ACCESS_TOKEN_EXPIRES"] = timedelta(minutes=60)

app.register_blueprint(reviews_bp)
jwt = JWTManager(app)

# MongoDB client and the collections used by the handlers in this module.
client = MongoClient(app.config['MONGO_URI'])
db = client["codewhisperer"]
users = db["users"]
enhance_history = db["enhance_history"]
scan_history = db["scan_history"]

app.register_blueprint(auth_bp, url_prefix="/api")

# Default rate limit applied per remote address.
limiter = Limiter(
    key_func=get_remote_address,
    default_limits=["20 per minute"]
)
limiter.init_app(app)
def _resolve_scan_path(base_dir, filename):
    """Map an uploaded filename to a path that stays inside ``base_dir``.

    Args:
        base_dir: Directory that must contain the resulting path.
        filename: Client-supplied relative file name.

    Returns:
        The resolved absolute path of the file inside ``base_dir``.

    Raises:
        ValueError: If the name is empty, absolute, or escapes ``base_dir``
            (for example through ``..`` components).
    """
    root = os.path.realpath(base_dir)
    target = os.path.realpath(os.path.join(root, filename))
    if target == root or os.path.commonpath([root, target]) != root:
        raise ValueError(f"Invalid filename: {filename!r}")
    return target


# NOTE(review): no route / JWT decorators are visible on this handler in the
# reviewed text, yet it calls get_jwt_identity() — confirm the route
# registration and @jwt_required() were not lost.
def scan_code():
    """Run a static security scan over the submitted files.

    Validates the JSON body with ``ScanRequest``, writes the files into a
    temporary directory, runs Bandit (python) or Semgrep with the
    ``p/javascript`` config (javascript), caches the parsed JSON report and
    records it in ``scan_history``.

    Returns:
        A Flask JSON response: ``{"result": ...}`` on success (with
        ``"cached": True`` when served from the cache), or ``{"error": ...}``
        with status 400 (bad input), 504 (scanner timeout) or 500.
    """
    try:
        # silent=True yields None instead of raising on a missing/invalid body.
        data = request.get_json(silent=True)
        app.logger.info(f"Incoming request: {data}")
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400
        try:
            req = ScanRequest(**data)
        except ValidationError as e:
            app.logger.error(f"Validation error: {e.errors()}")
            return jsonify({"error": e.errors()}), 400

        files = [f.dict() for f in req.files]
        language = req.language.lower()
        username = get_jwt_identity()

        # Reject unknown languages before touching the cache or the disk.
        if language not in ("python", "javascript"):
            return jsonify({"error": "Unsupported language"}), 400

        # The language is part of the key: the same files scanned as a
        # different language must not reuse the other scanner's report.
        key = f"scan:{username}:{language}:{files_hash(files)}"
        cached = cache.get(key)
        if cached is not None:
            return jsonify({"result": cached, "cached": True})

        timeout = float(os.getenv("SCAN_TIMEOUT_SECONDS", "300"))
        with tempfile.TemporaryDirectory() as temp_dir:
            for f in files:
                try:
                    # Filenames are untrusted; keep every write inside temp_dir.
                    path = _resolve_scan_path(temp_dir, f["filename"])
                except ValueError as e:
                    return jsonify({"error": str(e)}), 400
                os.makedirs(os.path.dirname(path), exist_ok=True)
                with open(path, "w", encoding="utf-8") as code_file:
                    code_file.write(f["content"])

            if language == "python":
                scan_command = ["python", "-m", "bandit", "-r", temp_dir, "-f", "json"]
            else:
                scan_command = ["python", "-m", "semgrep", "--config=p/javascript", "--json", temp_dir]

            app.logger.info(f"Running: {' '.join(scan_command)}")
            try:
                # Bound the scan so a stuck scanner cannot hold the worker forever.
                result = subprocess.run(
                    scan_command, capture_output=True, text=True, timeout=timeout
                )
            except subprocess.TimeoutExpired:
                return jsonify({"error": "Scan timed out"}), 504

        app.logger.info(f"stdout: {result.stdout[:500]}")
        app.logger.info(f"stderr: {result.stderr}")
        # Exit codes 0-2 are accepted; presumably the scanners use them when
        # a report is still produced — confirm against the tools' docs.
        if result.returncode not in (0, 1, 2):
            return jsonify({"error": result.stderr}), 500

        try:
            output_json = json.loads(result.stdout)
        except ValueError as e:
            return jsonify({"error": f"JSON parse failed: {str(e)}", "raw": result.stdout}), 500

        cache.set(key, output_json)
        scan_history.insert_one({
            "username": username,
            "language": language,
            "files": files,
            "result": output_json,
            "timestamp": datetime.utcnow().isoformat()
        })
        return jsonify({"result": output_json})
    except Exception as e:
        app.logger.error(f"Unexpected error: {str(e)}")
        return jsonify({"error": str(e)}), 500
# NOTE(review): no route decorator is visible on this handler in the reviewed
# text — confirm how it is registered.
def health():
    """Return a static JSON payload reporting that the service is up."""
    return jsonify(status="ok")
# NOTE(review): no route / JWT decorators are visible on this handler in the
# reviewed text, yet it calls get_jwt_identity() — confirm the route
# registration and @jwt_required() were not lost.
def enhance():
    """Enhance the submitted code and record the result in the history.

    Reads ``code`` and ``language`` (default ``"python"``) from the JSON
    body, calls ``enhance_code`` and stores the outcome in
    ``enhance_history`` for the current JWT identity.

    Returns:
        The ``enhance_code`` result as JSON with status 200, or
        ``{"error": ...}`` with status 400 (bad input) or 500.
    """
    try:
        # A missing, malformed or non-object body is treated as empty input
        # so the validation below answers with 400 instead of a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        code = data.get("code", "")
        language = data.get("language", "python")
        username = get_jwt_identity()

        # Explicit nulls / non-string values must not reach .lower()/.strip().
        if not isinstance(language, str) or language.lower() not in ("python", "javascript"):
            return jsonify({"error": "Unsupported language"}), 400
        language = language.lower()
        if not isinstance(code, str) or not code.strip():
            return jsonify({"error": "No code provided"}), 400

        # enhance_code returns a dict (enhanced_code, diff, optional extras).
        result = enhance_code(code, language)

        # Save to history (with candidates + explanations).
        enhance_history.insert_one({
            "username": username,
            "code": code,
            "language": language,
            "enhanced_code": result["enhanced_code"],
            "diff": result["diff"],
            "candidates": result.get("candidates", []),
            "explanations": result.get("explanations", []),
            "timestamp": datetime.utcnow().isoformat()
        })
        return jsonify(result), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500
# NOTE(review): no route / JWT decorators are visible on this handler in the
# reviewed text, yet it calls get_jwt_identity() — confirm the route
# registration and @jwt_required() were not lost.
def history():
    """Return the current user's enhance and scan history, newest first.

    Returns:
        JSON ``{"enhance": [...], "scan": [...]}`` where every entry exposes
        the same set of fields, or ``{"error": ...}`` with status 500.
    """
    def serialize(doc, include_result):
        # Stringify the ObjectId and expose only the fields the client uses.
        payload = {"id": str(doc.get("_id"))}
        for field in ("language", "code", "enhanced_code", "diff"):
            payload[field] = doc.get(field)
        payload["candidates"] = doc.get("candidates", [])
        payload["explanations"] = doc.get("explanations", [])
        # Only scan records carry a scanner report.
        payload["result"] = doc.get("result") if include_result else None
        payload["timestamp"] = doc.get("timestamp")
        return payload

    try:
        owner = get_jwt_identity()

        # Fetch both histories, most recent entries first.
        enhance_docs = list(enhance_history.find({"username": owner}).sort("timestamp", -1))
        scan_docs = list(scan_history.find({"username": owner}).sort("timestamp", -1))

        body = {
            "enhance": [serialize(doc, False) for doc in enhance_docs],
            "scan": [serialize(doc, True) for doc in scan_docs],
        }
        return jsonify(body)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
# NOTE(review): no route decorator is visible on this handler in the reviewed
# text — confirm how it is registered.
def submit_review():
    """Store a user review in ``reviews_collection``.

    Expects a JSON body with ``name``, ``email``, ``rating`` and ``review``;
    ``date`` is optional and defaults to the current UTC time in ISO format.

    Returns:
        JSON with a success message and the inserted id (status 201), or
        ``{"error": ...}`` with status 400 (missing fields) or 500.
    """
    try:
        # A missing, malformed or non-object body is treated as empty so the
        # required-field check below answers with 400 instead of a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        name = data.get("name")
        email = data.get("email")
        rating = data.get("rating")
        review = data.get("review")
        date = data.get("date", datetime.utcnow().isoformat())

        # Truthiness check: empty strings and a rating of 0 count as missing.
        if not all([name, email, rating, review]):
            return jsonify({"error": "All fields are required"}), 400

        review_doc = {
            "name": name,
            "email": email,
            "rating": rating,
            "review": review,
            "date": date,
        }
        result = reviews_collection.insert_one(review_doc)
        return jsonify({
            "message": "Review submitted successfully",
            "id": str(result.inserted_id)
        }), 201
    except Exception as e:
        return jsonify({"error": str(e)}), 500
if __name__ == '__main__':
    # The server listens on all interfaces, so the interactive debugger must
    # never be enabled unconditionally: it allows arbitrary code execution.
    # Opt in explicitly with FLASK_DEBUG=true for local development.
    debug_enabled = os.getenv("FLASK_DEBUG", "false").lower() in ("1", "true")
    app.run(host="0.0.0.0", port=5000, debug=debug_enabled)