choephix commited on
Commit
8db7b2b
·
1 Parent(s): 8c60e4e

Enhance job store functionality with file locking and improved JSON handling

Browse files

- Introduced file locking mechanisms in the JobStore class to ensure safe concurrent access to job metadata.
- Updated the read and write methods for JSON files to handle locking, preventing data corruption during simultaneous operations.
- Modified the read_meta method to provide a default value if the metadata file does not exist.
- Refactored the write_json method to create temporary files for safer writes, replacing the original file only after successful serialization.
- Updated the try.sh script to change the input image and include commented-out examples for additional inputs.

Files changed (2) hide show
  1. job_store.py +46 -9
  2. try.sh +5 -1
job_store.py CHANGED
@@ -1,5 +1,7 @@
1
  import json
2
  import shutil
 
 
3
  from datetime import datetime, timedelta, timezone
4
  from pathlib import Path
5
  from typing import Any
@@ -42,7 +44,7 @@ class JobStore:
42
  return self.job_dir(job_id).is_dir()
43
 
44
  def read_meta(self, job_id: str) -> dict[str, Any] | None:
45
- return self.read_json(self.job_dir(job_id) / "meta.json")
46
 
47
  def read_request(self, job_id: str) -> dict[str, Any] | None:
48
  return self.read_json(self.job_dir(job_id) / "request.json")
@@ -52,11 +54,12 @@ class JobStore:
52
 
53
  def update_status(self, job_dir: Path, status: str, **extra: Any) -> None:
54
  meta_path = job_dir / "meta.json"
55
- meta = self.read_json(meta_path, default={})
56
- meta.update(extra)
57
- meta["status"] = status
58
- meta["updated_at"] = utc_now().isoformat()
59
- self.write_json(meta_path, meta)
 
60
 
61
  def record_failure(
62
  self,
@@ -101,11 +104,45 @@ class JobStore:
101
  shutil.rmtree(child, ignore_errors=True)
102
 
103
  @staticmethod
104
- def write_json(path: Path, payload: dict[str, Any]) -> None:
105
- path.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
106
 
107
  @staticmethod
108
- def read_json(path: Path, default: Any = None) -> Any:
109
  if not path.exists():
110
  return default
111
  return json.loads(path.read_text(encoding="utf-8"))
 
1
  import json
2
  import shutil
3
+ import fcntl
4
+ from contextlib import contextmanager
5
  from datetime import datetime, timedelta, timezone
6
  from pathlib import Path
7
  from typing import Any
 
44
  return self.job_dir(job_id).is_dir()
45
 
46
  def read_meta(self, job_id: str) -> dict[str, Any] | None:
47
+ return self.read_json(self.job_dir(job_id) / "meta.json", default={})
48
 
49
  def read_request(self, job_id: str) -> dict[str, Any] | None:
50
  return self.read_json(self.job_dir(job_id) / "request.json")
 
54
 
55
  def update_status(self, job_dir: Path, status: str, **extra: Any) -> None:
56
  meta_path = job_dir / "meta.json"
57
+ with self._locked_file(meta_path, exclusive=True):
58
+ meta = self._read_json_unlocked(meta_path, default={})
59
+ meta.update(extra)
60
+ meta["status"] = status
61
+ meta["updated_at"] = utc_now().isoformat()
62
+ self._write_json_unlocked(meta_path, meta)
63
 
64
  def record_failure(
65
  self,
 
104
  shutil.rmtree(child, ignore_errors=True)
105
 
106
  @staticmethod
107
+ @contextmanager
108
+ def _locked_file(path: Path, *, exclusive: bool):
109
+ lock_path = path.with_name(f"{path.name}.lock")
110
+ lock_path.parent.mkdir(parents=True, exist_ok=True)
111
+ with lock_path.open("a+", encoding="utf-8") as lock_file:
112
+ fcntl.flock(
113
+ lock_file.fileno(),
114
+ fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH,
115
+ )
116
+ try:
117
+ yield
118
+ finally:
119
+ fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
120
+
121
+ @classmethod
122
+ def write_json(cls, path: Path, payload: dict[str, Any]) -> None:
123
+ path.parent.mkdir(parents=True, exist_ok=True)
124
+ with cls._locked_file(path, exclusive=True):
125
+ cls._write_json_unlocked(path, payload)
126
+
127
+ @classmethod
128
+ def read_json(cls, path: Path, default: Any = None) -> Any:
129
+ if not path.exists():
130
+ return default
131
+ with cls._locked_file(path, exclusive=False):
132
+ try:
133
+ return cls._read_json_unlocked(path, default=default)
134
+ except json.JSONDecodeError:
135
+ return default
136
+
137
+ @staticmethod
138
+ def _write_json_unlocked(path: Path, payload: dict[str, Any]) -> None:
139
+ serialized = json.dumps(payload, indent=2, sort_keys=True)
140
+ temp_path = path.with_name(f"{path.name}.{uuid4().hex}.tmp")
141
+ temp_path.write_text(serialized, encoding="utf-8")
142
+ temp_path.replace(path)
143
 
144
  @staticmethod
145
+ def _read_json_unlocked(path: Path, default: Any = None) -> Any:
146
  if not path.exists():
147
  return default
148
  return json.loads(path.read_text(encoding="utf-8"))
try.sh CHANGED
@@ -1,5 +1,9 @@
1
  node eggs/test4/main.js \
2
- eggs/inputs/0/tank.png \
 
 
 
 
3
  # <generate>
4
 
5
  # node eggs/test4/main.js \
 
1
  node eggs/test4/main.js \
2
+ eggs/inputs/0/armor.png \
3
+ # eggs/inputs/0/elephant.png \
4
+ # eggs/inputs/0/goofy.png \
5
+ # eggs/inputs/0/pika.png \
6
+ # eggs/inputs/0/tank.png \
7
  # <generate>
8
 
9
  # node eggs/test4/main.js \