Spaces:
Runtime error
Runtime error
| import logging | |
| import uuid | |
| from fastapi import FastAPI | |
| from gistillery.base import EntriesResult, JobStatus, JobStatusResult, RequestInput | |
| from gistillery.db import TABLES, get_db_cursor | |
# Module-level logger named after this module; its level is set to DEBUG.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# FastAPI application object for this module.
# NOTE(review): no route decorators are visible on the functions below in this
# view -- confirm how they are registered on `app`.
app = FastAPI()
| # status | |
def status() -> str:
    """Report that the service is reachable.

    Returns:
        The literal string ``"OK"``.
    """
    health = "OK"
    return health
def submit_job(input: RequestInput) -> str:
    """Register a new job and its entry under a freshly generated id.

    A random hex UUID is generated and used both as ``jobs.entry_id`` (with
    status ``"pending"``) and as ``entries.id``. The job row is inserted
    first, then the entry row, through the same cursor.

    Args:
        input: Request payload; its ``author`` and ``content`` attributes are
            stored in the ``author`` and ``source`` columns of ``entries``.

    Returns:
        A confirmation message containing the full job id.
    """
    job_id = uuid.uuid4().hex
    # Only the first 8 hex characters of the id are logged.
    logger.info(f"Submitting job for (_id={job_id[:8]})")

    # Submitting a job just means inserting rows (a poor man's job queue).
    insert_job = "INSERT INTO jobs (entry_id, status) VALUES (?, ?)"
    insert_entry = "INSERT INTO entries (id, author, source) VALUES (?, ?, ?)"
    with get_db_cursor() as cursor:
        cursor.execute(insert_job, (job_id, "pending"))
        cursor.execute(insert_entry, (job_id, input.author, input.content))

    return f"Submitted job {job_id}"
def check_job_status(_id: str) -> JobStatusResult:
    """Look up the status of the job attached to an entry id.

    Args:
        _id: Entry id matched against ``jobs.entry_id``.

    Returns:
        A ``JobStatusResult`` carrying the stored ``status`` and
        ``last_updated`` values, or one with ``JobStatus.not_found`` and
        ``last_updated=None`` when no job row matches.
    """
    sql = "SELECT status, last_updated FROM jobs WHERE entry_id = ?"
    with get_db_cursor() as cursor:
        cursor.execute(sql, (_id,))
        row = cursor.fetchone()

    if row is not None:
        job_status, updated_at = row
        return JobStatusResult(id=_id, status=job_status, last_updated=updated_at)

    # No row for this id: report it as not found rather than raising.
    return JobStatusResult(id=_id, status=JobStatus.not_found, last_updated=None)
def recent() -> list[EntriesResult]:
    """Return up to ten of the newest entries with their summary and tags.

    Entries are inner-joined with ``summaries`` and ``tags``, so only entries
    that have a summary and at least one tag are returned. Tags are
    aggregated in SQL into one comma-separated string and split back into a
    list here.

    Returns:
        ``EntriesResult`` objects ordered by ``created_at``, newest first.
    """
    with get_db_cursor() as cursor:
        cursor.execute("""
        SELECT e.id, e.author, s.summary, GROUP_CONCAT(t.tag, ",") tags, e.created_at
        FROM entries e
        JOIN summaries s ON e.id = s.entry_id
        JOIN tags t ON e.id = t.entry_id
        GROUP BY e.id
        ORDER BY e.created_at DESC
        LIMIT 10
        """)
        rows = cursor.fetchall()

    return [
        EntriesResult(
            id=entry_id,
            author=author,
            summary=summary,
            tags=tag_csv.split(","),
            date=created_at,
        )
        for entry_id, author, summary, tag_csv, created_at in rows
    ]
def recent_tag(tag: str) -> list[EntriesResult]:
    """Return up to ten of the newest entries that carry a given tag.

    Same query shape as the unfiltered listing, restricted to entries whose
    id appears in ``tags`` with the requested tag. Each result still lists
    all tags of the entry, not only the one searched for.

    Args:
        tag: Tag to filter by; a leading ``#`` is added when missing.

    Returns:
        ``EntriesResult`` objects ordered by ``created_at``, newest first.
    """
    # Tags are matched with their leading "#", so normalise the argument.
    wanted = tag if tag.startswith("#") else "#" + tag

    with get_db_cursor() as cursor:
        cursor.execute(
            """
            SELECT e.id, e.author, s.summary, GROUP_CONCAT(t.tag, ",") tags, e.created_at
            FROM entries e
            JOIN summaries s ON e.id = s.entry_id
            JOIN tags t ON e.id = t.entry_id
            WHERE e.id IN (
                SELECT entry_id FROM tags WHERE tag = ?
            )
            GROUP BY e.id
            ORDER BY e.created_at DESC
            LIMIT 10
            """,
            (wanted,),
        )
        rows = cursor.fetchall()

    return [
        EntriesResult(
            id=entry_id,
            author=author,
            summary=summary,
            tags=tag_csv.split(","),
            date=created_at,
        )
        for entry_id, author, summary, tag_csv, created_at in rows
    ]
def clear() -> str:
    """Delete every row from each table named in ``TABLES``.

    Logs a warning first, then issues one ``DELETE FROM`` statement per table
    through a single cursor.

    Returns:
        The literal string ``"OK"``.
    """
    logger.warning("Clearing all tables")
    with get_db_cursor() as cursor:
        for name in TABLES:
            # Table names cannot be bound as "?" parameters, hence the
            # interpolation; the names come from the imported TABLES constant.
            statement = f"DELETE FROM {name}"
            cursor.execute(statement)
    return "OK"