Update app.py
Browse files
app.py
CHANGED
|
@@ -1,14 +1,45 @@
|
|
| 1 |
-
import
|
| 2 |
-
import
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 3 |
from pathlib import Path
|
| 4 |
|
|
|
|
|
|
|
|
|
|
|
|
|
| 5 |
OUT_DIR = "/data/image-shards"
|
| 6 |
|
| 7 |
-
|
| 8 |
-
|
| 9 |
-
|
| 10 |
-
|
| 11 |
-
|
| 12 |
-
|
| 13 |
-
|
| 14 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
import os
|
| 2 |
+
import io
|
| 3 |
+
import json
|
| 4 |
+
import time
|
| 5 |
+
import socket
|
| 6 |
+
import asyncio
|
| 7 |
+
import threading
|
| 8 |
from pathlib import Path
|
| 9 |
|
| 10 |
+
import pyarrow as pa
|
| 11 |
+
import pyarrow.parquet as pq
|
| 12 |
+
import pandas as pd
|
| 13 |
+
|
| 14 |
OUT_DIR = "/data/image-shards"
|
| 15 |
|
| 16 |
+
def serve():
|
| 17 |
+
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
| 18 |
+
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
| 19 |
+
s.bind(("0.0.0.0", 7860))
|
| 20 |
+
s.listen(5)
|
| 21 |
+
print("✓ Listening on port 7860")
|
| 22 |
+
while True:
|
| 23 |
+
conn, _ = s.accept()
|
| 24 |
+
conn.send(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK")
|
| 25 |
+
conn.close()
|
| 26 |
+
|
| 27 |
+
def fix_shards():
|
| 28 |
+
shards = list(Path(OUT_DIR).glob("*.parquet"))
|
| 29 |
+
print(f"Found {len(shards)} shards")
|
| 30 |
+
for shard in shards:
|
| 31 |
+
table = pq.read_table(shard)
|
| 32 |
+
df = table.to_pandas()
|
| 33 |
+
df["image"] = df["image"].apply(
|
| 34 |
+
lambda b: {"bytes": b["bytes"], "path": None} if isinstance(b, dict) else {"bytes": b, "path": None}
|
| 35 |
+
)
|
| 36 |
+
pq.write_table(pa.Table.from_pandas(df), shard, compression="snappy")
|
| 37 |
+
print(f"✓ Fixed {shard.name}")
|
| 38 |
+
print("Done")
|
| 39 |
+
|
| 40 |
+
def main():
|
| 41 |
+
threading.Thread(target=serve, daemon=True).start()
|
| 42 |
+
fix_shards()
|
| 43 |
+
|
| 44 |
+
if __name__ == "__main__":
|
| 45 |
+
main()
|