Spaces:
Sleeping
Sleeping
| import sqlite3 | |
| import os | |
| from src.schema import Skill | |
class SkillsDB:
    """SQLite-backed store for skills and their tags.

    Skills live in the ``skills`` table; tag names live in the ``tags``
    table.  Each skill row stores its tags as a comma-separated string of
    ``tags.id`` values in the ``skill_tags`` column.

    The instance can be used as a context manager, in which case the
    connection is closed on exit.

    Attributes:
        conn: The open ``sqlite3.Connection``.
        cursor: A cursor on ``conn`` shared by all methods.
    """

    def __init__(self, db_name="skills.db"):
        """Open (or create) the database and make sure the schema exists.

        Args:
            db_name: Path of the SQLite database file, or ``":memory:"``.
        """
        self.conn = sqlite3.connect(db_name)
        self.cursor = self.conn.cursor()
        # The DDL uses IF NOT EXISTS, so running it unconditionally is
        # safe and also repairs a file that exists but has no tables yet
        # (e.g. an empty file or an interrupted first run).
        self.create_db()

    def __enter__(self):
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection; exceptions are not suppressed."""
        self.close()
        return False

    def create_db(self):
        """Create the ``skills`` and ``tags`` tables if they are missing."""
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS skills (
                id INTEGER PRIMARY KEY,
                repo_id TEXT NOT NULL,
                skill_name TEXT NOT NULL,
                skill_description TEXT,
                author TEXT,
                created_at TEXT,
                skill_usage_example TEXT,
                skill_program_language TEXT,
                skill_tags TEXT
            );
        ''')
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS tags (
                id INTEGER PRIMARY KEY,
                tag TEXT NOT NULL UNIQUE
            );
        ''')
        self.conn.commit()

    def _get_or_create_tag_id(self, tag):
        """Return the id of *tag*, inserting it into ``tags`` if needed."""
        self.cursor.execute('SELECT id FROM tags WHERE tag = ?;', (tag,))
        row = self.cursor.fetchone()
        if row is not None:
            return row[0]
        self.cursor.execute('INSERT INTO tags (tag) VALUES (?);', (tag,))
        return self.cursor.lastrowid

    def _resolve_tags(self, tags_str):
        """Translate a stored comma-separated id string into tag names.

        Names are returned in the order the ids appear in *tags_str*, with
        repeated ids collapsed.  Empty, ``None`` and non-numeric entries
        are ignored, as are ids that no longer exist in ``tags``.
        """
        parts = (tags_str or "").split(",")
        tag_ids = list(dict.fromkeys(
            int(part) for part in parts if part.strip().isdigit()
        ))
        if not tag_ids:
            return []
        placeholders = ",".join("?" * len(tag_ids))
        self.cursor.execute(
            'SELECT id, tag FROM tags WHERE id IN (%s);' % placeholders,
            tag_ids,
        )
        tag_by_id = dict(self.cursor.fetchall())
        return [tag_by_id[tag_id] for tag_id in tag_ids if tag_id in tag_by_id]

    def add_skill(self, skill):
        """Insert *skill* unless one with the same name and author exists.

        Args:
            skill: Object exposing ``repo_id``, ``skill_name``,
                ``skill_description``, ``author``, ``created_at``,
                ``skill_usage_example``, ``skill_program_language`` and
                ``skill_tags`` (an iterable of tag names, or ``None``).

        Returns:
            ``"ok"`` on success, otherwise a message saying the skill
            already exists.

        Raises:
            sqlite3.Error: If an insert fails.  The transaction is rolled
                back first, so no tags from the failed call are kept.
        """
        # IS (rather than =) also matches rows whose author is NULL.
        self.cursor.execute(
            'SELECT id FROM skills WHERE skill_name = ? AND author IS ?;',
            (skill.skill_name, skill.author),
        )
        if self.cursor.fetchone() is not None:
            return f"Skill with name '{skill.skill_name}' by author '{skill.author}' already exists!"

        try:
            tag_ids = [
                str(self._get_or_create_tag_id(tag))
                for tag in (skill.skill_tags or [])
            ]
            self.cursor.execute(
                '''
                INSERT INTO skills (repo_id, skill_name, skill_description, author, created_at, skill_usage_example, skill_program_language, skill_tags)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?);
                ''',
                (
                    skill.repo_id,
                    skill.skill_name,
                    skill.skill_description,
                    skill.author,
                    skill.created_at,
                    skill.skill_usage_example,
                    skill.skill_program_language,
                    ",".join(tag_ids),
                ),
            )
        except sqlite3.Error:
            # Undo tag rows inserted above so a later commit cannot
            # persist tags belonging to a skill that was never stored.
            self.conn.rollback()
            raise
        self.conn.commit()
        return "ok"

    def get_skills(self):
        """Return every stored skill as a ``Skill`` object.

        Each ``Skill`` is built positionally from ``repo_id``,
        ``skill_name``, ``skill_description``, ``author``, ``created_at``,
        ``skill_usage_example``, ``skill_program_language`` and the list
        of tag names resolved from the stored tag ids.
        """
        self.cursor.execute('SELECT * FROM skills;')
        col_names = [col[0] for col in self.cursor.description]
        # Materialise the rows first: resolving tags reuses self.cursor.
        rows = self.cursor.fetchall()

        skills = []
        for row in rows:
            record = dict(zip(col_names, row))
            skills.append(Skill(
                record['repo_id'],
                record['skill_name'],
                record['skill_description'],
                record['author'],
                record['created_at'],
                record['skill_usage_example'],
                record['skill_program_language'],
                self._resolve_tags(record['skill_tags']),
            ))
        return skills

    def get_tags(self):
        """Return the names of all tags in the ``tags`` table."""
        self.cursor.execute('SELECT tag FROM tags;')
        # fetchall() always returns a list (possibly empty), never None.
        return [row[0] for row in self.cursor.fetchall()]

    def close(self):
        """Close the underlying database connection."""
        self.conn.close()