"""SQLite persistence layer for student records and exam marks."""

import os
import sqlite3
from datetime import datetime

import pandas as pd

from UserDetail import UserDetail

# Use persistent storage path
DATA_DIR = "/data"
DB_PATH = os.path.join(DATA_DIR, "school.sqlite")

# Ensure data directory exists
os.makedirs(DATA_DIR, exist_ok=True)


class Database:
    """Thin wrapper around one shared SQLite connection to ``DB_PATH``.

    The connection is opened with ``check_same_thread=False``; this class
    adds no locking of its own, so callers that share an instance between
    threads must serialise access themselves.
    """

    # Columns of ``student_detail``. ``update_student`` interpolates column
    # names into SQL text, so they are restricted to this fixed set.
    _STUDENT_COLUMNS = frozenset({
        "student_id", "name", "dob", "class", "section", "father_name",
        "mother_name", "address", "phone", "email", "blood_group",
        "registration_date",
    })

    # The marks table has exactly five subject slots.
    _SUBJECT_COUNT = 5

    # Denominator used when converting a total into a percentage.
    _MAX_TOTAL_MARKS = 500

    def __init__(self):
        """Open the database connection and make sure the tables exist."""
        self.conn = sqlite3.connect(DB_PATH, check_same_thread=False)
        self.create_tables()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _query(self, sql, params=()):
        """Run a read-only statement and return all rows; always closes the cursor."""
        c = self.conn.cursor()
        try:
            c.execute(sql, params)
            return c.fetchall()
        finally:
            c.close()

    def _write(self, *statements):
        """Run ``(sql, params)`` pairs as one transaction.

        Commits when every statement succeeds. If any statement fails the
        transaction is rolled back, so partial work cannot be committed by
        a later, unrelated write, and the exception is re-raised.
        """
        c = self.conn.cursor()
        try:
            for sql, params in statements:
                c.execute(sql, params)
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise
        finally:
            c.close()

    # ------------------------------------------------------------------
    # Schema
    # ------------------------------------------------------------------

    def create_tables(self):
        """Create the ``student_detail`` and ``marks`` tables if missing."""
        # Student details table
        student_table = '''
            CREATE TABLE IF NOT EXISTS student_detail (
                student_id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                dob DATE NOT NULL,
                class TEXT NOT NULL,
                section TEXT NOT NULL,
                father_name TEXT,
                mother_name TEXT,
                address TEXT,
                phone TEXT,
                email TEXT,
                blood_group TEXT,
                registration_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        '''

        # Marks table
        # NOTE: SQLite only enforces the FOREIGN KEY clause when
        # ``PRAGMA foreign_keys`` is enabled on the connection, which this
        # class does not do; delete_student removes marks rows explicitly.
        marks_table = '''
            CREATE TABLE IF NOT EXISTS marks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                student_id TEXT NOT NULL,
                exam_month TEXT NOT NULL,
                exam_year INTEGER NOT NULL,
                subject1_name TEXT DEFAULT 'Tamil',
                subject1_marks INTEGER,
                subject2_name TEXT DEFAULT 'English',
                subject2_marks INTEGER,
                subject3_name TEXT DEFAULT 'Mathematics',
                subject3_marks INTEGER,
                subject4_name TEXT DEFAULT 'Science',
                subject4_marks INTEGER,
                subject5_name TEXT DEFAULT 'Social Science',
                subject5_marks INTEGER,
                total_marks INTEGER,
                percentage REAL,
                entry_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (student_id) REFERENCES student_detail (student_id),
                UNIQUE(student_id, exam_month, exam_year)
            )
        '''

        self._write((student_table, ()), (marks_table, ()))

    # ------------------------------------------------------------------
    # Students
    # ------------------------------------------------------------------

    def generate_student_id(self, class_std):
        """Return the next unused ID of the form ``SGH<year><class><seq>``.

        ``seq`` is zero-padded to at least three digits and is one more
        than the highest sequence already used by students of the same
        class and year (or 1 if there are none).

        The sequence is compared numerically rather than by string order,
        so numbering keeps advancing past 999. Because the class number is
        not delimited inside the ID, the prefix for class ``1`` also
        matches IDs of classes ``10``-``12``; only rows whose stored class
        equals ``class_std`` contribute to the sequence, and the result is
        then advanced past any ID that is already taken.
        """
        year = datetime.now().year
        prefix = f"SGH{year}{class_std}"

        rows = self._query(
            'SELECT student_id, class FROM student_detail WHERE student_id LIKE ?',
            (f"{prefix}%",),
        )

        sequence = 1
        for existing_id, stored_class in rows:
            # LIKE is case-insensitive and honours wildcards, so re-check
            # the prefix exactly before trusting the remainder.
            if not existing_id.startswith(prefix):
                continue
            if str(stored_class) != str(class_std):
                continue
            suffix = existing_id[len(prefix):]
            if suffix.isdecimal():
                sequence = max(sequence, int(suffix) + 1)

        # Guard against collisions with IDs that were skipped above (for
        # example a student whose class was changed after registration).
        taken = {row[0] for row in rows}
        while f"{prefix}{sequence:03d}" in taken:
            sequence += 1

        return f"{prefix}{sequence:03d}"

    def insert_student(self, userDetail):
        """Insert a new student row built from ``userDetail``.

        Returns:
            ``(True, message)`` on success, otherwise ``(False, message)``.
            A failed insert is rolled back.
        """
        try:
            params = (
                userDetail.student_id,
                userDetail.student_name,
                userDetail.dob,
                userDetail.class_std,
                userDetail.section,
                userDetail.father_name,
                userDetail.mother_name,
                userDetail.address,
                userDetail.phone,
                userDetail.email,
                userDetail.blood_group,
            )
            self._write((
                '''
                INSERT INTO student_detail
                (student_id, name, dob, class, section, father_name,
                 mother_name, address, phone, email, blood_group)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''',
                params,
            ))
            return True, "Student registered successfully!"
        except sqlite3.IntegrityError as e:
            # IntegrityError also covers missing required fields; only a
            # key clash should be reported as a duplicate ID.
            if "NOT NULL" in str(e):
                return False, f"Error: {str(e)}"
            return False, "Student ID already exists!"
        except Exception as e:
            return False, f"Error: {str(e)}"

    def get_student(self, student_id):
        """Return the ``UserDetail`` for ``student_id``, or ``None`` if absent."""
        rows = self._query(
            '''
            SELECT student_id, name, dob, class, section, father_name,
                   mother_name, address, phone, email, blood_group
            FROM student_detail
            WHERE student_id = ?
            ''',
            (student_id,),
        )
        if not rows:
            return None
        # Columns are selected in the positional order UserDetail expects.
        return UserDetail(*rows[0])

    def get_all_students(self, class_filter=None):
        """Return all student rows as tuples.

        With a truthy ``class_filter`` only that class is returned, ordered
        by name; otherwise every student is returned ordered by class and
        then name.
        """
        if class_filter:
            return self._query(
                'SELECT * FROM student_detail WHERE class = ? ORDER BY name',
                (class_filter,),
            )
        return self._query('SELECT * FROM student_detail ORDER BY class, name')

    def update_student(self, student_id, **kwargs):
        """Update the given columns of one student.

        Keyword names are ``student_detail`` column names. Falsy values
        (``None``, ``""``, ``0``) are skipped, so a field cannot be cleared
        through this method.

        Raises:
            sqlite3.OperationalError: if a keyword with a truthy value is
                not a column of ``student_detail``.
        """
        updates = {key: value for key, value in kwargs.items() if value}
        if not updates:
            return

        for key in updates:
            # Column names cannot be bound as parameters, so anything that
            # is not a known column is rejected before building the SQL.
            # The exception type matches what SQLite itself raises for an
            # unknown column.
            if key not in self._STUDENT_COLUMNS:
                raise sqlite3.OperationalError(f"no such column: {key}")

        assignments = ', '.join(f"{key} = ?" for key in updates)
        query = f"UPDATE student_detail SET {assignments} WHERE student_id = ?"
        self._write((query, [*updates.values(), student_id]))

    def delete_student(self, student_id):
        """Delete a student together with all of their marks.

        Both deletes run in one transaction; if either fails, neither is
        applied.

        Returns:
            ``(True, message)`` on success, otherwise ``(False, message)``.
        """
        try:
            self._write(
                ('DELETE FROM marks WHERE student_id = ?', (student_id,)),
                ('DELETE FROM student_detail WHERE student_id = ?', (student_id,)),
            )
            return True, "Student deleted successfully!"
        except Exception as e:
            return False, f"Error: {str(e)}"

    # ------------------------------------------------------------------
    # Marks
    # ------------------------------------------------------------------

    def insert_marks(self, student_id, exam_month, exam_year, subject_names, marks_list):
        """Insert or replace the marks of one student for one exam.

        Only the first five entries of ``subject_names`` and ``marks_list``
        are stored, and the total and percentage are computed from those
        same five marks so they always agree with the stored values. The
        percentage is the total divided by 500.

        Returns:
            ``(True, message)`` on success, otherwise ``(False, message)``.
        """
        try:
            count = self._SUBJECT_COUNT
            if len(subject_names) < count or len(marks_list) < count:
                return False, f"Error: {count} subject names and {count} marks are required"

            names = list(subject_names[:count])
            marks = list(marks_list[:count])
            total = sum(marks)
            percentage = (total / self._MAX_TOTAL_MARKS) * 100

            # Interleave as name1, marks1, name2, marks2, ... to match the
            # column order of the INSERT below.
            subject_values = [value for pair in zip(names, marks) for value in pair]

            self._write((
                '''
                INSERT OR REPLACE INTO marks
                (student_id, exam_month, exam_year,
                 subject1_name, subject1_marks,
                 subject2_name, subject2_marks,
                 subject3_name, subject3_marks,
                 subject4_name, subject4_marks,
                 subject5_name, subject5_marks,
                 total_marks, percentage)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''',
                (student_id, exam_month, exam_year, *subject_values, total, percentage),
            ))
            return True, "Marks updated successfully!"
        except Exception as e:
            return False, f"Error: {str(e)}"

    def get_student_marks(self, student_id):
        """Return every marks row of a student, newest exam year first.

        NOTE(review): ``exam_month`` is a TEXT column, so the secondary
        ordering is lexical; if month names are stored it is not
        chronological. Confirm the stored format with the callers.
        """
        return self._query(
            '''
            SELECT exam_month, exam_year,
                   subject1_name, subject1_marks,
                   subject2_name, subject2_marks,
                   subject3_name, subject3_marks,
                   subject4_name, subject4_marks,
                   subject5_name, subject5_marks,
                   total_marks, percentage, entry_date
            FROM marks
            WHERE student_id = ?
            ORDER BY exam_year DESC, exam_month DESC
            ''',
            (student_id,),
        )

    def get_class_performance(self, class_std, exam_month=None, exam_year=None):
        """Return marks summaries for one class, best percentage first.

        Each row is ``(student_id, name, exam_month, exam_year,
        total_marks, percentage)``. ``exam_month`` and ``exam_year`` narrow
        the result only when they are truthy.
        """
        query = '''
            SELECT s.student_id, s.name, m.exam_month, m.exam_year,
                   m.total_marks, m.percentage
            FROM student_detail s
            JOIN marks m ON s.student_id = m.student_id
            WHERE s.class = ?
        '''
        params = [class_std]

        if exam_month:
            query += ' AND m.exam_month = ?'
            params.append(exam_month)

        if exam_year:
            query += ' AND m.exam_year = ?'
            params.append(exam_year)

        query += ' ORDER BY m.percentage DESC'
        return self._query(query, params)

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    def get_statistics(self):
        """Return headline counts as a dict.

        Keys:
            ``total_students``: number of rows in ``student_detail``.
            ``class_wise``: list of ``(class, count)`` tuples.
            ``students_with_marks``: number of distinct students with a
                marks row for the current calendar year.
        """
        # Total students
        total_students = self._query('SELECT COUNT(*) FROM student_detail')[0][0]

        # Students by class
        class_wise = self._query(
            'SELECT class, COUNT(*) FROM student_detail GROUP BY class'
        )

        # Recent exams
        students_with_marks = self._query(
            '''
            SELECT COUNT(DISTINCT student_id)
            FROM marks
            WHERE exam_year = ?
            ''',
            (datetime.now().year,),
        )[0][0]

        return {
            'total_students': total_students,
            'class_wise': class_wise,
            'students_with_marks': students_with_marks
        }

    def __del__(self):
        """Close the connection when the instance is garbage-collected."""
        # ``conn`` is missing when sqlite3.connect() failed in __init__.
        if hasattr(self, 'conn'):
            self.conn.close()