Roman190928 commited on
Commit
f99323c
·
verified ·
1 Parent(s): b2f8c8a

Flush/upload incomplete buffer only on run end when enabled

Browse files
Files changed (1) hide show
  1. crawler/shards.py +9 -32
crawler/shards.py CHANGED
@@ -52,32 +52,8 @@ class ParquetShardWriter:
52
  await self.uploader.initialize()
53
 
54
  async def consume(self, record_queue: asyncio.Queue[dict[str, Any] | None]) -> None:
55
- allow_incomplete_upload = (
56
- self.config.enable_hf_upload and self.config.upload_incomplete_shards
57
- )
58
- loop = asyncio.get_running_loop()
59
- last_flush_started = loop.time()
60
  while True:
61
- try:
62
- if allow_incomplete_upload:
63
- timeout = self.config.incomplete_shard_flush_seconds
64
- if self.buffer:
65
- elapsed = loop.time() - last_flush_started
66
- timeout = max(
67
- 0.0,
68
- self.config.incomplete_shard_flush_seconds - elapsed,
69
- )
70
- item = await asyncio.wait_for(
71
- record_queue.get(),
72
- timeout=timeout,
73
- )
74
- else:
75
- item = await record_queue.get()
76
- except asyncio.TimeoutError:
77
- if self.buffer:
78
- await self.flush()
79
- last_flush_started = loop.time()
80
- continue
81
 
82
  if item is None:
83
  record_queue.task_done()
@@ -87,17 +63,18 @@ class ParquetShardWriter:
87
  self.buffer.append(item)
88
  if len(self.buffer) >= self.config.shard_size_rows:
89
  await self.flush()
90
- last_flush_started = loop.time()
91
- elif allow_incomplete_upload and self.buffer:
92
- elapsed = loop.time() - last_flush_started
93
- if elapsed >= self.config.incomplete_shard_flush_seconds:
94
- await self.flush()
95
- last_flush_started = loop.time()
96
  finally:
97
  record_queue.task_done()
98
 
99
  if self.buffer:
100
- await self.flush()
 
 
 
 
 
 
 
101
 
102
  async def flush(self) -> None:
103
  if not self.buffer:
 
52
  await self.uploader.initialize()
53
 
54
  async def consume(self, record_queue: asyncio.Queue[dict[str, Any] | None]) -> None:
 
 
 
 
 
55
  while True:
56
+ item = await record_queue.get()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
57
 
58
  if item is None:
59
  record_queue.task_done()
 
63
  self.buffer.append(item)
64
  if len(self.buffer) >= self.config.shard_size_rows:
65
  await self.flush()
 
 
 
 
 
 
66
  finally:
67
  record_queue.task_done()
68
 
69
  if self.buffer:
70
+ should_flush_incomplete = (
71
+ (not self.config.enable_hf_upload)
72
+ or self.config.upload_incomplete_shards
73
+ )
74
+ if should_flush_incomplete:
75
+ await self.flush()
76
+ else:
77
+ self.buffer = []
78
 
79
  async def flush(self) -> None:
80
  if not self.buffer: