| import os |
| import io |
| import json |
| import time |
| import socket |
| import asyncio |
| import threading |
| from pathlib import Path |
|
|
| import pyarrow as pa |
| import pyarrow.parquet as pq |
| import pandas as pd |
|
|
# Directory that fix_shards() scans for ``*.parquet`` files and rewrites.
OUT_DIR = "/data/image-shards"
|
|
def serve():
    """Answer every TCP connection on port 7860 with a fixed ``200 OK``.

    The request is never read: each accepted connection is sent a minimal
    HTTP response whose body is ``OK`` and is then closed. The accept loop
    never returns, so callers are expected to run this in a background
    (daemon) thread.

    Raises:
        OSError: if the listening socket cannot be created or bound, or if
            ``accept`` itself fails.
    """
    # ``with`` guarantees the listening socket is released if the loop dies.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listener.bind(("0.0.0.0", 7860))
        listener.listen(5)
        print("β Listening on port 7860")
        while True:
            conn, _ = listener.accept()
            # Close the client socket even when sending fails.
            with conn:
                try:
                    # sendall() retries until the whole response is written;
                    # send() may transmit only part of the buffer.
                    conn.sendall(
                        b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK"
                    )
                except OSError:
                    # The peer went away mid-response. One bad client must
                    # not take the whole responder thread down.
                    continue
|
|
def _normalize_image(value):
    """Return *value* wrapped as ``{"bytes": ..., "path": None}``.

    A dict input contributes its ``"bytes"`` entry (any existing ``"path"``
    is discarded); any other input is used as the bytes payload directly.
    """
    payload = value["bytes"] if isinstance(value, dict) else value
    return {"bytes": payload, "path": None}


def fix_shards():
    """Rewrite the ``image`` column of every Parquet shard in ``OUT_DIR``.

    Each ``*.parquet`` file is loaded, its ``image`` column is normalized to
    ``{"bytes": ..., "path": None}`` dicts, and the file is written back with
    snappy compression under the same name.

    The new content is first written to a sibling ``<name>.tmp`` file and then
    moved over the original with ``os.replace``. A failure part-way through
    writing therefore leaves the original shard intact instead of truncated.

    Raises:
        KeyError: if a shard has no ``image`` column, or a dict value in that
            column has no ``"bytes"`` key.
    """
    shards = list(Path(OUT_DIR).glob("*.parquet"))
    print(f"Found {len(shards)} shards")
    for shard in shards:
        df = pq.read_table(shard).to_pandas()
        df["image"] = df["image"].apply(_normalize_image)

        # The ".tmp" suffix keeps the scratch file out of the "*.parquet"
        # glob, and the same directory keeps os.replace on one filesystem.
        tmp_path = shard.with_name(shard.name + ".tmp")
        try:
            pq.write_table(
                pa.Table.from_pandas(df), tmp_path, compression="snappy"
            )
            os.replace(tmp_path, shard)
        finally:
            # After a successful replace the temp file is gone; this only
            # cleans up a partial file left behind by a failed write.
            if tmp_path.exists():
                tmp_path.unlink()
        print(f"β Fixed {shard.name}")
    print("Done")
|
|
def main():
    """Launch the port responder in the background, then fix the shards."""
    responder = threading.Thread(target=serve, daemon=True)
    # Daemon thread: it does not keep the process alive once fix_shards()
    # has returned.
    responder.start()
    fix_shards()
|
|
# Run the shard fix only when executed as a script, not when imported.
if __name__ == "__main__":
    main()