Anish530 commited on
Commit
0196ddb
·
1 Parent(s): de35def

Added separate storage and fixed worker, tasks, which previously got stuck on PROCESSING (NULL PROCESSOR)

Browse files
backend/app/core/storage.py ADDED
@@ -0,0 +1,32 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ from abc import ABC, abstractmethod
3
+ from fastapi import UploadFile
4
+
5
+ class StorageProvider(ABC):
6
+ @abstractmethod
7
+ def save(self, file: UploadFile, filename: str) -> str:
8
+ pass
9
+
10
+ @abstractmethod
11
+ def delete(self, filepath: str) -> bool:
12
+ pass
13
+
14
+ class LocalStorageProvider(StorageProvider):
15
+ def __init__(self, upload_dir: str = "uploads"):
16
+ self.upload_dir = upload_dir
17
+ os.makedirs(self.upload_dir, exist_ok=True)
18
+
19
+ def save(self, file: UploadFile, filename: str) -> str:
20
+ filepath = os.path.join(self.upload_dir, filename)
21
+ with open(filepath, "wb") as buffer:
22
+ while chunk := file.file.read(1024 * 1024):
23
+ buffer.write(chunk)
24
+ return filepath
25
+
26
+ def delete(self, filepath: str) -> bool:
27
+ if os.path.exists(filepath):
28
+ os.remove(filepath)
29
+ return True
30
+ return False
31
+
32
+ active_storage: StorageProvider = LocalStorageProvider()
backend/app/models/job_model.py CHANGED
@@ -6,7 +6,7 @@ class Job(Base):
6
  __tablename__ = "jobs"
7
 
8
  job_id = Column(String, primary_key=True, index=True)
9
- file_id = Column(Integer, ForeignKey("file.id"), nullable=False)
10
  status = Column(String, default="QUEUED")
11
  created_at = Column(DateTime, default=lambda: datetime.now(UTC))
12
  started_at = Column(DateTime, nullable=True)
 
6
  __tablename__ = "jobs"
7
 
8
  job_id = Column(String, primary_key=True, index=True)
9
+ file_id = Column(Integer, ForeignKey("file.id", ondelete="CASCADE"), nullable=False)
10
  status = Column(String, default="QUEUED")
11
  created_at = Column(DateTime, default=lambda: datetime.now(UTC))
12
  started_at = Column(DateTime, nullable=True)
backend/app/services/file_delete_service.py CHANGED
@@ -2,6 +2,7 @@ import os
2
  from sqlalchemy.orm import Session
3
  from fastapi import HTTPException
4
  from app.models.file_model import File
 
5
 
6
  def delete_file_service(db: Session, file_id: int, user_id: int):
7
  file = db.query(File).filter(File.id == file_id).first()
@@ -12,8 +13,7 @@ def delete_file_service(db: Session, file_id: int, user_id: int):
12
  if file.owner_id != user_id:
13
  raise HTTPException(status_code=403, detail="Not Authorized to delete this file")
14
 
15
- if os.path.exists(file.filepath):
16
- os.remove(file.filepath)
17
 
18
  db.delete(file)
19
  db.commit()
 
2
  from sqlalchemy.orm import Session
3
  from fastapi import HTTPException
4
  from app.models.file_model import File
5
+ from app.core.storage import active_storage
6
 
7
  def delete_file_service(db: Session, file_id: int, user_id: int):
8
  file = db.query(File).filter(File.id == file_id).first()
 
13
  if file.owner_id != user_id:
14
  raise HTTPException(status_code=403, detail="Not Authorized to delete this file")
15
 
16
+ active_storage.delete(file.filepath)
 
17
 
18
  db.delete(file)
19
  db.commit()
backend/app/utils/file_handler.py CHANGED
@@ -1,6 +1,4 @@
1
- from kombu.exceptions import HttpError
2
- from h11._abnf import status_code
3
- from cv2 import detail
4
  import os
5
  import uuid
6
  import magic
@@ -51,11 +49,4 @@ def generate_unique_filename(filename: str) -> str:
51
  return unique_name
52
 
53
  def save_file(file: UploadFile, filename: str):
54
- os.makedirs(UPLOAD_DIR, exist_ok=True)
55
- filepath = os.path.join(UPLOAD_DIR, filename)
56
-
57
- with open(filepath, "wb") as buffer:
58
- while chunk := file.file.read(1024*1024): # Uploading File in Chunks (1 MB PER Chunk)
59
- buffer.write(chunk)
60
-
61
- return filepath
 
1
+ from app.core.storage import active_storage
 
 
2
  import os
3
  import uuid
4
  import magic
 
49
  return unique_name
50
 
51
  def save_file(file: UploadFile, filename: str):
52
+ return active_storage.save(file, filename)
 
 
 
 
 
 
 
backend/app/worker/tasks.py CHANGED
@@ -46,6 +46,7 @@ def process_file_task(self, file_id: int):
46
  job.status = "COMPLETED"
47
  job.finished_at = datetime.now(UTC)
48
  logger.info(f"Successfully processed file_id: {file_id}")
 
49
 
50
  except SoftTimeLimitExceeded as exc:
51
  logger.error(f"Task for file_id {file_id} timed out. Requeuing...")
@@ -64,7 +65,12 @@ def process_file_task(self, file_id: int):
64
  db.commit()
65
  db.close()
66
  raise self.retry(exc=exc)
 
 
 
 
 
 
67
 
68
  finally:
69
- db.commit()
70
  db.close()
 
46
  job.status = "COMPLETED"
47
  job.finished_at = datetime.now(UTC)
48
  logger.info(f"Successfully processed file_id: {file_id}")
49
+ db.commit()
50
 
51
  except SoftTimeLimitExceeded as exc:
52
  logger.error(f"Task for file_id {file_id} timed out. Requeuing...")
 
65
  db.commit()
66
  db.close()
67
  raise self.retry(exc=exc)
68
+
69
+ except BaseException as exc:
70
+ job.status = "FAILED"
71
+ job.error_message = f"CRITICAL WORKER DEATH {type(exc).__name__}"
72
+ db.commit()
73
+ raise
74
 
75
  finally:
 
76
  db.close()