Bhuvandesai's picture
Re-deploy llama.cpp + GGUF CPU serving (default Q4_K_M); fast CPU inference
e44cdab verified
Raw
History Blame Contribute Delete
9.71 kB
import os
import sys
import sqlite3
import uuid
import pandas as pd
import subprocess
import threading
import time
from flask import Flask, jsonify, request, render_template
app = Flask(__name__, template_folder="../templates", static_folder="../static")
# Global state
model_pipeline = None
model_loading = False
model_loading_error = None
train_process = None
train_log_path = "models/train.log"
job_store = {} # {job_id: {"status": "running"|"done"|"error", "result": {...}}}
def load_model_async(force=False):
global model_pipeline, model_loading, model_loading_error
try:
model_loading = True
model_loading_error = None
if force:
print("=== Thread: Reloading Model. Freeing memory... ===")
model_pipeline = None
import gc
gc.collect()
# torch is only present on the CUDA/dev build; the CPU Space serves via
# llama.cpp and has no torch, so guard the VRAM cleanup.
try:
import torch
if torch.cuda.is_available():
torch.cuda.empty_cache()
except ImportError:
pass
time.sleep(1) # Give time for garbage collector
print("=== Thread: Loading Text-to-SQL Model ===")
# Import inside thread to prevent server startup delay
from inference import TextToSQLInference
model_pipeline = TextToSQLInference()
print("=== Thread: Model Loaded Successfully! ===")
except Exception as e:
model_loading_error = str(e)
print(f"=== Thread Error Loading Model: {model_loading_error} ===")
finally:
model_loading = False
@app.route("/")
def index():
return render_template("index.html")
@app.route("/api/model_status", methods=["GET"])
def model_status():
global model_pipeline, model_loading, model_loading_error
if model_pipeline is not None:
return jsonify({
"status": "loaded",
"has_adapter": model_pipeline.has_adapter,
"device": model_pipeline.device
})
elif model_loading:
return jsonify({"status": "loading"})
elif model_loading_error:
return jsonify({"status": "error", "error": model_loading_error})
else:
return jsonify({"status": "unloaded"})
@app.route("/api/load_model", methods=["POST"])
def load_model():
global model_pipeline, model_loading
data = request.json or {}
force = data.get("force", False)
if model_pipeline is not None and not force:
return jsonify({"status": "already_loaded"})
if model_loading:
return jsonify({"status": "loading"})
# Start loading in background thread
thread = threading.Thread(target=load_model_async, args=(force,))
thread.daemon = True
thread.start()
return jsonify({"status": "started"})
@app.route("/api/query", methods=["POST"])
def run_query():
global model_pipeline, job_store
if model_pipeline is None:
return jsonify({"success": False, "error": "Model is not loaded. Please wait for it to load."}), 400
data = request.json or {}
question = data.get("question", "")
use_adapter = data.get("use_adapter", True)
if not question:
return jsonify({"success": False, "error": "Question is required."}), 400
job_id = str(uuid.uuid4())[:8]
job_store[job_id] = {"status": "running", "result": None}
def run_job():
print(f"Job {job_id}: Question='{question}', UseAdapter={use_adapter}")
result = model_pipeline.query_pipeline(question, use_adapter=use_adapter)
job_store[job_id] = {"status": "done", "result": result}
print(f"Job {job_id}: Complete")
thread = threading.Thread(target=run_job)
thread.daemon = True
thread.start()
return jsonify({"job_id": job_id, "status": "running"})
@app.route("/api/query_result/<job_id>", methods=["GET"])
def query_result(job_id):
job = job_store.get(job_id)
if not job:
return jsonify({"status": "not_found"}), 404
return jsonify(job)
@app.route("/api/start_train", methods=["POST"])
def start_train():
global train_process
if train_process is not None and train_process.poll() is None:
return jsonify({"status": "running", "message": "Training is already in progress."})
# Prepare directory
os.makedirs("models", exist_ok=True)
# Empty log
with open(train_log_path, "w", encoding="utf-8") as f:
f.write("=== Fine-Tuning Process Initiated ===\n")
try:
# Launch python src/train.py as a separate subprocess
cmd = [sys.executable, "src/train.py"]
print(f"Launching training process: {' '.join(cmd)}")
# We write outputs to train.log
log_file = open(train_log_path, "a", encoding="utf-8")
# Ensure UTF-8 mode on Windows for loading libraries correctly
env = os.environ.copy()
env["PYTHONUTF8"] = "1"
train_process = subprocess.Popen(
cmd,
stdout=log_file,
stderr=subprocess.STDOUT,
cwd=os.getcwd(),
text=True,
env=env
)
return jsonify({"status": "started", "message": "Training process launched successfully."})
except Exception as e:
return jsonify({"status": "failed", "error": str(e)}), 500
@app.route("/api/train_status", methods=["GET"])
def train_status():
global train_process
# Read log file
log_content = ""
if os.path.exists(train_log_path):
try:
with open(train_log_path, "r", encoding="utf-8") as f:
# Read last 100 lines to keep request lightweight
lines = f.readlines()
log_content = "".join(lines[-100:])
except Exception as e:
log_content = f"Error reading logs: {str(e)}"
if train_process is None:
return jsonify({"status": "idle", "logs": log_content})
exit_code = train_process.poll()
if exit_code is None:
return jsonify({"status": "running", "logs": log_content})
elif exit_code == 0:
# Check if adapter directory was created to confirm success
adapter_exists = os.path.exists("models/phi3-text-to-sql-adapter")
return jsonify({
"status": "completed",
"exit_code": exit_code,
"logs": log_content,
"adapter_created": adapter_exists
})
else:
return jsonify({"status": "failed", "exit_code": exit_code, "logs": log_content})
@app.route("/api/schema", methods=["GET"])
def get_schema():
# Return structured schema details to display in a beautiful sidebar
schema = {
"departments": [
{"name": "id", "type": "INTEGER", "key": "PRIMARY KEY"},
{"name": "name", "type": "TEXT", "key": "UNIQUE"},
{"name": "manager_id", "type": "INTEGER", "key": "FOREIGN KEY (employees.id)"}
],
"employees": [
{"name": "id", "type": "INTEGER", "key": "PRIMARY KEY"},
{"name": "name", "type": "TEXT", "key": ""},
{"name": "department_id", "type": "INTEGER", "key": "FOREIGN KEY (departments.id)"},
{"name": "salary", "type": "INTEGER", "key": ""},
{"name": "hire_date", "type": "TEXT", "key": ""},
{"name": "manager_id", "type": "INTEGER", "key": "FOREIGN KEY (employees.id)"}
],
"products": [
{"name": "id", "type": "INTEGER", "key": "PRIMARY KEY"},
{"name": "name", "type": "TEXT", "key": "UNIQUE"},
{"name": "category", "type": "TEXT", "key": ""},
{"name": "price", "type": "REAL", "key": ""}
],
"sales": [
{"name": "id", "type": "INTEGER", "key": "PRIMARY KEY"},
{"name": "employee_id", "type": "INTEGER", "key": "FOREIGN KEY (employees.id)"},
{"name": "product_id", "type": "INTEGER", "key": "FOREIGN KEY (products.id)"},
{"name": "amount", "type": "REAL", "key": ""},
{"name": "quantity", "type": "INTEGER", "key": ""},
{"name": "sale_date", "type": "TEXT", "key": ""}
]
}
return jsonify(schema)
if __name__ == "__main__":
# Ensure required directories exist
os.makedirs("templates", exist_ok=True)
os.makedirs("static", exist_ok=True)
os.makedirs("models", exist_ok=True)
os.makedirs("data", exist_ok=True)
# Cold-start: regenerate SQLite DB if missing (e.g. first run in HF Space)
if not os.path.exists("data/company_sales.db"):
print("=== Database not found. Running setup... ===")
try:
sys.path.insert(0, os.path.dirname(__file__))
from database import setup_database
setup_database()
print("=== Database ready. ===")
except Exception as e:
print(f"=== Warning: Could not create database: {e} ===")
# Auto-load model on startup so users don't have to click "Load Model"
print("=== Auto-starting model load in background thread ===")
startup_thread = threading.Thread(target=load_model_async, args=(False,))
startup_thread.daemon = True
startup_thread.start()
# HF Spaces requires port 7860 and host 0.0.0.0
port = int(os.environ.get("PORT", 7860))
app.run(host="0.0.0.0", port=port, debug=False)