Spaces:
Runtime error
Runtime error
import time
import traceback

from base import JobInput
from db import get_db_cursor
from ml import ProcessorRegistry, Summarizer, Tagger
SLEEP_INTERVAL: int = 5  # seconds to sleep between polls when no job is pending

# The ML components are created once, at import time, and the same instances
# are reused for every job this worker handles.
processor_registry = ProcessorRegistry()
summarizer, tagger = Summarizer(), Tagger()
print("loaded ML models")
def check_pending_jobs() -> list[JobInput]:
    """Return every job whose status is ``'pending'`` in the ``jobs`` table.

    Each job row is joined with its ``entries`` row to pick up the author and
    the entry's ``source`` text, which is passed on as ``JobInput.content``.

    Note: this only reads the queue; it does not claim the returned jobs.
    NOTE(review): if several workers poll the same database concurrently, two
    of them may pick up the same pending job — confirm whether that can happen.
    """
    # Fetch pending jobs, joining author and source (used as content) from
    # the entries table.
    query = """
        SELECT j.entry_id, e.author, e.source
        FROM jobs j
        JOIN entries e
          ON j.entry_id = e.id
        WHERE j.status = 'pending'
    """
    with get_db_cursor() as cursor:
        cursor.execute(query)
        # fetchall() is the portable DB-API way to materialize the result set;
        # not every driver's execute() returns an iterable cursor.
        rows = cursor.fetchall()
    # Build the inputs after the cursor is released so it is held no longer
    # than needed.
    return [
        JobInput(id=entry_id, author=author, content=source)
        for entry_id, author, source in rows
    ]
def store(
    job: JobInput,
    *,
    summary: str,
    tags: list[str],
    processor_name: str,
    summarizer_name: str,
    tagger_name: str,
) -> None:
    """Persist the summary and tags produced for ``job``.

    Inserts one row into ``summaries`` and one row per tag into ``tags``, each
    keyed by ``job.id`` and labelled with the name of the model that produced
    it. Both inserts use the same cursor; whether they commit atomically is
    up to ``get_db_cursor``.

    Args:
        job: The job whose entry the results belong to.
        summary: Summary text produced by the summarizer.
        tags: Tags produced by the tagger; when empty, no tag rows are written.
        processor_name: Name of the processor that handled the job.
            NOTE(review): accepted but currently not written anywhere —
            confirm whether the schema has a column meant to hold it.
        summarizer_name: Stored alongside the summary.
        tagger_name: Stored alongside each tag.
    """
    with get_db_cursor() as cursor:
        # Write to the summaries and tags tables.
        cursor.execute(
            (
                "INSERT INTO summaries (entry_id, summary, summarizer_name)"
                " VALUES (?, ?, ?)"
            ),
            (job.id, summary, summarizer_name),
        )
        if tags:
            # Skip the call entirely for an empty tag list rather than relying
            # on driver behaviour for an empty parameter sequence.
            cursor.executemany(
                "INSERT INTO tags (entry_id, tag, tagger_name) VALUES (?, ?, ?)",
                [(job.id, tag, tagger_name) for tag in tags],
            )
def process_job(job: JobInput) -> None:
    """Run the ML pipeline on one job and record the outcome in ``jobs``.

    The job is dispatched to a processor, and the processed result is tagged
    and summarized. The results are persisted with ``store`` and the job's
    status is set to ``'done'``.

    If any of those steps raises, the error and its traceback are printed,
    the job's status is set to ``'failed'``, and the exception is not
    re-raised, so one bad job does not stop the worker. If that ``'failed'``
    status update itself raises, its exception propagates to the caller.
    """
    short_id = job.id[:8]
    tic = time.perf_counter()
    print(f"Processing job for (id={short_id})")
    # Acquire a cursor (which takes a DB lock) as late as possible: the ML
    # steps below can take a while and we don't want to block other workers
    # during that time.
    try:
        processor = processor_registry.dispatch(job)
        processor_name = processor.get_name()
        processed = processor(job)

        tagger_name = tagger.get_name()
        tags = tagger(processed)

        summarizer_name = summarizer.get_name()
        summary = summarizer(processed)

        store(
            job,
            summary=summary,
            tags=tags,
            processor_name=processor_name,
            summarizer_name=summarizer_name,
            tagger_name=tagger_name,
        )
        # Update job status to done.
        with get_db_cursor() as cursor:
            cursor.execute(
                "UPDATE jobs SET status = 'done' WHERE entry_id = ?", (job.id,)
            )
    except Exception as e:
        # Report first, so the original error (with traceback) is visible even
        # if the status update below fails as well.
        print(f"Failed to process job for (id={short_id}): {e}")
        traceback.print_exc()
        # Update job status to failed.
        with get_db_cursor() as cursor:
            cursor.execute(
                "UPDATE jobs SET status = 'failed' WHERE entry_id = ?", (job.id,)
            )
    toc = time.perf_counter()
    print(f"Finished processing job (id={short_id}) in {toc - tic:0.3f} seconds")
def main() -> None:
    """Poll the job queue forever.

    Each pass fetches the pending jobs. If there are none, the worker sleeps
    for ``SLEEP_INTERVAL`` seconds before polling again. Otherwise it
    processes every job in the batch and polls again immediately.
    """
    while True:
        pending = check_pending_jobs()
        if pending:
            print(f"Found {len(pending)} pending job(s), processing...")
            for pending_job in pending:
                process_job(pending_job)
        else:
            print("No pending jobs found, sleeping...")
            time.sleep(SLEEP_INTERVAL)
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C is the intended way to stop the worker: exit cleanly instead
        # of dumping a traceback.
        print("Shutting down...")
        # Raise SystemExit directly: the bare exit() helper is injected by the
        # `site` module and is not guaranteed to exist (e.g. `python -S`,
        # frozen executables).
        raise SystemExit(0)