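# NOTE(review): the next three lines appear to be page-header residue captured
# with the file, not program text; kept as comments so the module parses.
# FrederickSundeep's picture
# update file 038
# 84ec51f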
from flask import Flask, request, jsonify, send_file, render_template, Response
from transformers import pipeline
from langchain.prompts import PromptTemplate
from reportlab.pdfgen import canvas
from datetime import datetime
import os
import uuid
import sqlite3
import csv
import zipfile
from io import StringIO, BytesIO
# Flask application object; the route decorators below register on it.
app = Flask(__name__)

# Text-generation model, loaded once at import time on CPU (device=-1).
generator = pipeline(
    "text-generation",
    model="EleutherAI/gpt-neo-1.3B",
    device=-1,
)
# generator = pipeline("text-generation", model="bigscience/bloomz-560m", device=-1)

# Working directories used by the routes: text logs, rendered PDFs, and the
# SQLite database file. Created up front so later file writes cannot fail on
# a missing parent directory.
for _directory in ("logs", "pdfs", "data"):
    os.makedirs(_directory, exist_ok=True)

# Location of the SQLite database that stores generated tickets.
DB_PATH = "data/tickets.db"
def init_db():
    """Create the ``tickets`` table in the database at ``DB_PATH`` if missing.

    The statement uses ``CREATE TABLE IF NOT EXISTS``, so calling this more
    than once leaves an existing table (and its rows) untouched.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        # ``with conn`` commits (or rolls back) the transaction but does NOT
        # close the connection, hence the explicit close in ``finally``.
        with conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS tickets (
                    id TEXT PRIMARY KEY,
                    first_name TEXT,
                    last_name TEXT,
                    email TEXT,
                    issue TEXT,
                    priority TEXT,
                    date TEXT,
                    generated TEXT,
                    created_at TEXT
                )
            ''')
    finally:
        conn.close()
# Make sure the tickets table exists before any request is served.
init_db()

# Placeholders the prompt text expects; each one is filled from the submitted form.
_PROMPT_FIELDS = ["first_name", "last_name", "email", "date", "priority", "issue"]

# Instruction text handed to the text-generation model for every ticket.
_PROMPT_TEXT = """
You are an HR support assistant. Generate a professional HR helpdesk ticket.
Employee Details:
- First Name: {first_name}
- Last Name: {last_name}
- Email: {email}
- Issue Date: {date}
- Priority: {priority}
- Reported Issue: {issue}
Write a clear, concise ticket title and description suitable for HR review.
"""

# Prompt template
template = PromptTemplate(template=_PROMPT_TEXT, input_variables=_PROMPT_FIELDS)
@app.route("/")
def home():
    """Serve the landing page by rendering ``index.html``."""
    landing_page = render_template("index.html")
    return landing_page
@app.route("/generate", methods=["POST"])
def generate_ticket():
    """Generate an HR ticket from the submitted form, persist it, and show it.

    Reads ``firstName``, ``lastName``, ``email``, ``issue``, ``priority`` and
    ``date`` from the POSTed form, asks the text-generation model for a
    ticket, writes a plain-text log to ``logs/<session_id>.txt``, inserts a
    row into the ``tickets`` table, and re-renders ``index.html`` with the
    generated text and the new session id.

    Returns:
        The rendered ``index.html`` page with ``ticket`` and ``session_id``.
    """
    form = request.form
    # Default missing fields to "" so the prompt never contains the literal
    # text "None" and the database never stores NULL in columns that are
    # later handled as strings.
    first_name = form.get("firstName", "")
    last_name = form.get("lastName", "")
    email = form.get("email", "")
    issue = form.get("issue", "")
    priority = form.get("priority", "")
    date = form.get("date", "")

    prompt = template.format(
        first_name=first_name,
        last_name=last_name,
        email=email,
        date=date,
        priority=priority,
        issue=issue,
    )
    raw = generator(prompt, max_new_tokens=200, do_sample=True)[0]["generated_text"]
    # Strip the prompt itself, leaving only the new content.
    if raw.startswith(prompt):
        result = raw[len(prompt):].strip()
    else:
        result = raw.strip()

    session_id = str(uuid.uuid4())
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Save log file; download_pdf later renders this file as a PDF.
    log_path = f"logs/{session_id}.txt"
    with open(log_path, "w") as f:
        f.write(f"Timestamp: {timestamp}\n")
        f.write(f"First Name: {first_name}\nLast Name: {last_name}\n")
        f.write(f"Email: {email}\nDate: {date}\nPriority: {priority}\n")
        f.write(f"Issue: {issue}\n\nGenerated:\n{result}\n")

    # Save to SQLite DB. ``with conn`` commits or rolls back the transaction
    # but does not close the connection, so close it explicitly.
    conn = sqlite3.connect(DB_PATH)
    try:
        with conn:
            conn.execute(
                '''
                INSERT INTO tickets (id, first_name, last_name, email, issue, priority, date, generated, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''',
                (
                    session_id, first_name, last_name, email,
                    issue, priority, date, result, timestamp,
                ),
            )
    finally:
        conn.close()

    return render_template("index.html", ticket=result, session_id=session_id)
@app.route("/download/<session_id>")
def download_pdf(session_id):
    """Render a session's text log as a PDF and send it as a download.

    Args:
        session_id: Identifier from the URL; must be a UUID string such as
            the ones ``generate_ticket`` creates.

    Returns:
        The PDF at ``pdfs/<session_id>.pdf`` as an attachment, or a JSON
        error with status 404 when the id is malformed or no log exists.
    """
    import textwrap  # local import: not among the module-level imports

    # The id is interpolated into file paths, so accept nothing but a
    # canonical UUID string before touching the filesystem.
    try:
        canonical_id = str(uuid.UUID(session_id))
    except ValueError:
        canonical_id = None
    if canonical_id != session_id.lower():
        return jsonify({"error": "Session not found"}), 404

    log_path = f"logs/{session_id}.txt"
    pdf_path = f"pdfs/{session_id}.pdf"
    if not os.path.exists(log_path):
        return jsonify({"error": "Session not found"}), 404
    with open(log_path, "r") as f:
        content = f.read()

    left_x = 50
    top_y = 800
    bottom_y = 50
    line_height = 15
    # Approximate character budget per line for the canvas default font;
    # longer lines would otherwise run off the right edge of the page.
    max_chars = 80

    c = canvas.Canvas(pdf_path)
    y = top_y
    for line in content.splitlines():
        # wrap() returns [] for blank lines; keep them as vertical spacing.
        for segment in textwrap.wrap(line, width=max_chars) or [""]:
            if y < bottom_y:
                # Start a new page instead of drawing below the page edge.
                c.showPage()
                y = top_y
            c.drawString(left_x, y, segment)
            y -= line_height
    c.save()
    return send_file(pdf_path, as_attachment=True)
@app.route("/tickets")
def view_tickets():
    """List stored tickets with optional filtering, search, and pagination.

    Query parameters:
        from, to: when both are given, keep tickets whose ``date`` lies
            between them (inclusive, compared as stored text).
        priority: case-insensitive exact match on the priority column.
        search: case-insensitive substring matched against first name,
            last name, and email.
        page: 1-based page number; malformed or non-positive values fall
            back to the first page.

    Returns:
        The rendered ``tickets.html`` page with the current page of tickets,
        the page number, the page count, and the total match count.
    """
    from_date = request.args.get("from")
    to_date = request.args.get("to")
    priority = request.args.get("priority")
    search = request.args.get("search", "").lower()
    # ``type=int`` makes a malformed ?page= value fall back to the default
    # instead of raising; clamp so the slice below never starts negative.
    page = max(request.args.get("page", 1, type=int), 1)
    per_page = 10

    query = "SELECT * FROM tickets WHERE 1=1"
    params = []
    if from_date and to_date:
        query += " AND date BETWEEN ? AND ?"
        params.extend([from_date, to_date])
    if priority:
        query += " AND LOWER(priority) = ?"
        params.append(priority.lower())
    query += " ORDER BY date DESC"

    conn = sqlite3.connect(DB_PATH)
    try:
        rows = conn.execute(query, params).fetchall()
    finally:
        # Close even when the query raises so the connection is not leaked.
        conn.close()

    # Filter search over first_name, last_name, email (columns 1-3). A NULL
    # column is treated as an empty string rather than crashing on .lower().
    filtered = [
        row for row in rows
        if any(search in (value or "").lower() for value in row[1:4])
    ]

    total = len(filtered)
    pages = (total + per_page - 1) // per_page
    start = (page - 1) * per_page
    paginated = filtered[start:start + per_page]

    columns = ["id", "first_name", "last_name", "email", "issue", "priority", "date", "generated", "created_at"]
    dict_rows = [dict(zip(columns, row)) for row in paginated]
    return render_template("tickets.html", tickets=dict_rows, page=page, pages=pages, total=total)
@app.route("/ticket/<id>")
def view_ticket(id):
    """Show the detail page for a single ticket.

    Args:
        id: Ticket primary key taken from the URL. The name shadows the
            builtin but must match the ``<id>`` placeholder in the route.

    Returns:
        The rendered ``ticket_detail.html`` page, or the text
        ``"Ticket not found"`` with status 404 when no row matches.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        row = conn.execute("SELECT * FROM tickets WHERE id = ?", (id,)).fetchone()
    finally:
        # Close even when the query raises so the connection is not leaked.
        conn.close()

    if row is None:
        return "Ticket not found", 404

    # Same column order as the CREATE TABLE statement; ``created_at`` is
    # included so this dict has the same keys as the ones in the ticket list.
    columns = ["id", "first_name", "last_name", "email", "issue", "priority", "date", "generated", "created_at"]
    ticket = dict(zip(columns, row))
    return render_template("ticket_detail.html", ticket=ticket)
@app.route("/export/csv")
def export_csv():
    """Export every ticket as a CSV attachment named ``tickets.csv``.

    Returns:
        A ``text/csv`` response whose first row is the header and whose
        remaining rows are the ticket records.
    """
    # Name the columns explicitly so the data always lines up with the header.
    conn = sqlite3.connect(DB_PATH)
    try:
        rows = conn.execute(
            "SELECT id, first_name, last_name, email, issue, priority, date, "
            "generated, created_at FROM tickets"
        ).fetchall()
    finally:
        # Close even when the query raises so the connection is not leaked.
        conn.close()

    # csv.writer needs a text buffer, hence StringIO rather than BytesIO.
    output = StringIO()
    writer = csv.writer(output)
    # Nine header names for nine columns; "Created At" was previously missing,
    # which left the last column of every row unlabeled.
    writer.writerow([
        "ID", "First Name", "Last Name", "Email", "Issue",
        "Priority", "Date", "Generated", "Created At",
    ])
    writer.writerows(rows)

    # getvalue() returns the whole buffer as a string regardless of position.
    return Response(output.getvalue(), mimetype="text/csv",
                    headers={"Content-Disposition": "attachment;filename=tickets.csv"})
@app.route("/export/zip")
def export_zip():
    """Send every file in ``pdfs/`` as one in-memory archive, ``tickets.zip``."""
    buffer = BytesIO()
    archive = zipfile.ZipFile(buffer, 'w')
    with archive:
        for entry_name in os.listdir("pdfs"):
            source_path = os.path.join("pdfs", entry_name)
            # Store each file under its bare name, without the pdfs/ prefix.
            archive.write(source_path, arcname=entry_name)
    # Rewind so send_file reads the archive from its first byte.
    buffer.seek(0)
    return send_file(
        buffer,
        mimetype="application/zip",
        as_attachment=True,
        download_name="tickets.zip",
    )
# Start the Flask server only when this file is run as a script.
if __name__ == "__main__":
    # Listen on every network interface, port 7860.
    app.run(port=7860, host="0.0.0.0")