from flask import Flask, request, jsonify, send_file, render_template, Response from transformers import pipeline from langchain.prompts import PromptTemplate from reportlab.pdfgen import canvas from datetime import datetime import os import uuid import sqlite3 import csv import zipfile from io import StringIO, BytesIO app = Flask(__name__) # Load model (CPU) generator = pipeline("text-generation", model="EleutherAI/gpt-neo-1.3B", device=-1) # generator = pipeline("text-generation", model="bigscience/bloomz-560m", device=-1) # Directories os.makedirs("logs", exist_ok=True) os.makedirs("pdfs", exist_ok=True) os.makedirs("data", exist_ok=True) # DB setup DB_PATH = "data/tickets.db" def init_db(): with sqlite3.connect(DB_PATH) as conn: cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS tickets ( id TEXT PRIMARY KEY, first_name TEXT, last_name TEXT, email TEXT, issue TEXT, priority TEXT, date TEXT, generated TEXT, created_at TEXT ) ''') conn.commit() init_db() # Prompt template template = PromptTemplate( input_variables=["first_name", "last_name", "email", "date", "priority", "issue"], template=""" You are an HR support assistant. Generate a professional HR helpdesk ticket. Employee Details: - First Name: {first_name} - Last Name: {last_name} - Email: {email} - Issue Date: {date} - Priority: {priority} - Reported Issue: {issue} Write a clear, concise ticket title and description suitable for HR review. """ ) @app.route("/") def home(): return render_template("index.html") @app.route("/generate", methods=["POST"]) def generate_ticket(): first_name = request.form.get("firstName") last_name = request.form.get("lastName") email = request.form.get("email") issue = request.form.get("issue") priority = request.form.get("priority") date = request.form.get("date") prompt = template.format( first_name=first_name, last_name=last_name, email=email, date=date, priority=priority, issue=issue ) raw = generator(prompt,max_new_tokens=200,do_sample=True)[0]["generated_text"] # Strip the prompt itself, leaving only the new content if raw.startswith(prompt): result = raw[len(prompt):].strip() else: result = raw.strip() session_id = str(uuid.uuid4()) timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # Save log file log_path = f"logs/{session_id}.txt" with open(log_path, "w") as f: f.write(f"Timestamp: {timestamp}\n") f.write(f"First Name: {first_name}\nLast Name: {last_name}\n") f.write(f"Email: {email}\nDate: {date}\nPriority: {priority}\n") f.write(f"Issue: {issue}\n\nGenerated:\n{result}\n") # Save to SQLite DB with sqlite3.connect(DB_PATH) as conn: cursor = conn.cursor() cursor.execute(''' INSERT INTO tickets (id, first_name, last_name, email, issue, priority, date, generated, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( session_id, first_name, last_name, email, issue, priority, date, result, timestamp )) conn.commit() return render_template("index.html", ticket=result, session_id=session_id) @app.route("/download/") def download_pdf(session_id): log_path = f"logs/{session_id}.txt" pdf_path = f"pdfs/{session_id}.pdf" if not os.path.exists(log_path): return jsonify({"error": "Session not found"}), 404 with open(log_path, "r") as f: content = f.read() c = canvas.Canvas(pdf_path) lines = content.splitlines() y = 800 for line in lines: c.drawString(50, y, line) y -= 15 c.save() return send_file(pdf_path, as_attachment=True) @app.route("/tickets") def view_tickets(): from_date = request.args.get("from") to_date = request.args.get("to") priority = request.args.get("priority") search = request.args.get("search", "").lower() page = int(request.args.get("page", 1)) per_page = 10 conn = sqlite3.connect(DB_PATH) c = conn.cursor() query = "SELECT * FROM tickets WHERE 1=1" params = [] if from_date and to_date: query += " AND date BETWEEN ? AND ?" params.extend([from_date, to_date]) if priority: query += " AND LOWER(priority) = ?" params.append(priority.lower()) query += " ORDER BY date DESC" c.execute(query, params) rows = c.fetchall() conn.close() # Filter search filtered = [r for r in rows if search in r[1].lower() or search in r[2].lower() or search in r[3].lower()] total = len(filtered) pages = (total + per_page - 1) // per_page start = (page - 1) * per_page paginated = filtered[start:start + per_page] columns = ["id", "first_name", "last_name", "email", "issue", "priority", "date", "generated", "created_at"] dict_rows = [dict(zip(columns, row)) for row in paginated] return render_template("tickets.html", tickets=dict_rows, page=page, pages=pages, total=total) @app.route("/ticket/") def view_ticket(id): conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute("SELECT * FROM tickets WHERE id = ?", (id,)) row = c.fetchone() conn.close() if row: ticket = { "id": row[0], "first_name": row[1], "last_name": row[2], "email": row[3], "issue": row[4], "priority": row[5], "date": row[6], "generated": row[7] } return render_template("ticket_detail.html", ticket=ticket) else: return "Ticket not found", 404 @app.route("/export/csv") def export_csv(): conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute("SELECT * FROM tickets") rows = c.fetchall() conn.close() # Use StringIO here output = StringIO() writer = csv.writer(output) writer.writerow(["ID", "First Name", "Last Name", "Email", "Issue", "Priority", "Date", "Generated"]) writer.writerows(rows) # Encode string to bytes before returning output.seek(0) return Response(output.getvalue(), mimetype="text/csv", headers={"Content-Disposition": "attachment;filename=tickets.csv"}) @app.route("/export/zip") def export_zip(): zip_io = BytesIO() with zipfile.ZipFile(zip_io, 'w') as zipf: for file in os.listdir("pdfs"): zipf.write(os.path.join("pdfs", file), arcname=file) zip_io.seek(0) return send_file(zip_io, mimetype="application/zip", as_attachment=True, download_name="tickets.zip") if __name__ == "__main__": app.run(host="0.0.0.0", port=7860)