# Face-Aging — test_compress.py
# (header reconstructed from scraped page chrome: author "Robert", commit b733a1f)
import datetime
import hashlib
import json
import os
import pathlib
import re
import shutil
import tempfile
import threading
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Mapping
import boto3
from botocore.exceptions import ClientError
from PIL import Image, ImageFile, UnidentifiedImageError
# S3 configuration comes entirely from the environment; when either value
# is missing the client is left as None and all upload paths become no-ops.
AWS_REGION = os.environ.get("AWS_DEFAULT_REGION")
BUCKET_NAME = os.environ.get("AWS_BUCKET_NAME")
try:
    # Client construction can raise (e.g. malformed credentials/config);
    # degrade to None so importing this module never fails.
    s3 = boto3.client("s3", region_name=AWS_REGION) if AWS_REGION and BUCKET_NAME else None
except Exception as exc:
    s3 = None

# Tolerate partially-downloaded/truncated image files instead of raising.
ImageFile.LOAD_TRUNCATED_IMAGES = True

# Characters outside this set are replaced with "_" when building S3 keys.
_SANITIZE_PATTERN = re.compile(r"[^A-Za-z0-9._-]")
# How long a content digest suppresses re-uploads of the same file.
_HASH_TTL = datetime.timedelta(hours=1)
# Sliding-window rate limit: at most _RATE_LIMIT_MAX uploads per window.
_RATE_LIMIT_WINDOW = datetime.timedelta(seconds=60)
_RATE_LIMIT_MAX = 12
# Files larger than this are never uploaded.
_MAX_UPLOAD_BYTES = 20 * 1024 * 1024
# Key prefix under which per-digest dedupe markers are stored in S3.
_DEDUP_PREFIX = "dedupe/"

# In-process state guarded by _lock:
#   _recent_hashes: digest -> last-seen timestamp
#   _rate_tracker:  identity -> recent upload timestamps
_recent_hashes: dict[str, datetime.datetime] = {}
_rate_tracker: dict[str, list[datetime.datetime]] = {}
_lock = threading.Lock()

_REPO_ROOT = Path(__file__).resolve().parent
try:
    # Directories whose files count as bundled examples (never uploaded).
    # NOTE(review): Path.resolve() without strict=True does not raise
    # FileNotFoundError, so this except branch looks unreachable — confirm.
    _EXAMPLE_ROOTS = {(_REPO_ROOT / "examples").resolve()}
except FileNotFoundError:
    _EXAMPLE_ROOTS = set()
@dataclass
class PreparedUpload:
    """A temp file staged for S3 upload, plus how it was encoded."""

    # Absolute path of the staged temp file.
    path: str
    # File extension including the dot (".jpg" or ".webp").
    suffix: str
    # Lowercase encoder name ("jpeg" or "webp"); selects the Content-Type.
    format_name: str
    # When True, imagine() deletes the temp file after the upload attempt.
    cleanup: bool = True
def _sanitize_stem(raw_stem: str) -> str:
sanitized = _SANITIZE_PATTERN.sub("_", raw_stem or "")
sanitized = sanitized.strip("._-")
trimmed = sanitized[:50]
return trimmed or "image"
def _as_rgb(image: Image.Image) -> Image.Image:
    """Return an RGB copy of *image*, flattening any alpha onto white."""
    mode = image.mode
    has_alpha = mode in ("RGBA", "LA") or (mode == "P" and "transparency" in image.info)
    if not has_alpha:
        # Covers RGB/L plus any other opaque mode via a plain conversion.
        return image.convert("RGB")
    flattened = image.convert("RGBA")
    background = Image.new("RGB", flattened.size, (255, 255, 255))
    # Paste with the alpha channel as mask so transparency becomes white.
    background.paste(flattened, mask=flattened.split()[-1])
    return background
def _mktemp(suffix: str) -> str:
fd, tmp_path = tempfile.mkstemp(prefix="face-aging_", suffix=suffix)
os.close(fd)
return tmp_path
def _save_jpeg(image: Image.Image, *, quality: int) -> PreparedUpload:
    """Encode *image* as an optimized, progressive JPEG in a temp file."""
    destination = _mktemp(".jpg")
    encoder_options = {
        "format": "JPEG",
        "quality": quality,
        "optimize": True,
        "progressive": True,
    }
    image.save(destination, **encoder_options)
    return PreparedUpload(path=destination, suffix=".jpg", format_name="jpeg")
def _copy_as_webp(local_path: str) -> PreparedUpload:
    """Copy an already-WEBP file into a managed temp location, unchanged."""
    destination = _mktemp(".webp")
    shutil.copyfile(local_path, destination)
    return PreparedUpload(path=destination, suffix=".webp", format_name="webp")
def _prepare_upload(local_path: str) -> PreparedUpload:
    """Normalize the image at *local_path* into a staged temp file.

    WEBP sources are copied through untouched; every other format is
    flattened to RGB and re-encoded as JPEG (quality 82).

    Raises:
        ValueError: if the file is not a readable/decodable image.
    """
    prepared: Image.Image | None = None
    try:
        with Image.open(local_path) as img:
            img.load()  # force a full decode while the file handle is open
            source_format = (img.format or "").upper()
            if source_format != "WEBP":
                prepared = _as_rgb(img)
        if source_format == "WEBP":
            return _copy_as_webp(local_path)
        # Fix: the original re-opened the image when `prepared is None`,
        # but that branch was unreachable — every non-WEBP source is
        # converted above, and WEBP sources have already returned.
        return _save_jpeg(prepared, quality=82)
    except UnidentifiedImageError as exc:
        raise ValueError(f"Unsupported or corrupted image: {exc}") from exc
    except OSError as exc:
        raise ValueError(f"Failed to process image: {exc}") from exc
    finally:
        # The converted copy is independent of the source; close it once
        # the JPEG has been written (or an error has been raised).
        if prepared is not None:
            prepared.close()
def _hash_file(path: str) -> str:
digest = hashlib.sha256()
with open(path, "rb") as handle:
for chunk in iter(lambda: handle.read(1024 * 1024), b""):
digest.update(chunk)
return digest.hexdigest()
def _digest_marker_key(digest: str) -> str:
    """Map a content digest to its S3 dedupe-marker object key."""
    return _DEDUP_PREFIX + digest
def _digest_exists_remotely(digest: str) -> bool:
    """Return True when an S3 dedupe marker already exists for *digest*.

    False when S3 is unconfigured. A 403/Forbidden response is logged and
    treated as "not found" so a restrictive bucket policy cannot block
    uploads; any other client error propagates.
    """
    if s3 is None or not BUCKET_NAME:
        return False
    try:
        s3.head_object(Bucket=BUCKET_NAME, Key=_digest_marker_key(digest))
    except ClientError as exc:
        code = exc.response.get("Error", {}).get("Code")
        if code not in {"404", "NoSuchKey", "403", "Forbidden"}:
            raise
        if code in {"403", "Forbidden"}:
            record = {
                "event": "dedupe_head_forbidden",
                "digest": digest[:12],
                "issue": "head_object_forbidden",
            }
            print(json.dumps(record, ensure_ascii=True, sort_keys=True))
        return False
    return True
def _store_digest_marker(digest: str, metadata: dict[str, str]) -> None:
    """Write an empty S3 object marking *digest* as already uploaded.

    Best-effort: a no-op when S3 is unconfigured, and a 403/Forbidden is
    downgraded to a structured log line; other client errors propagate.
    """
    if s3 is None or not BUCKET_NAME:
        return
    try:
        s3.put_object(
            Bucket=BUCKET_NAME,
            Key=_digest_marker_key(digest),
            Body=b"",
            ContentType="text/plain",
            Metadata=metadata,
        )
    except ClientError as exc:
        code = exc.response.get("Error", {}).get("Code")
        if code not in {"403", "Forbidden"}:
            raise
        record = {
            "event": "dedupe_marker_forbidden",
            "digest": digest[:12],
            "issue": "put_object_forbidden",
        }
        print(json.dumps(record, ensure_ascii=True, sort_keys=True))
def _prune(now: datetime.datetime) -> None:
    """Drop expired digest entries and stale rate-limiter timestamps.

    Expected to run while ``_lock`` is held (see the call site).
    """
    expiry = now - _HASH_TTL
    for digest in [d for d, seen in _recent_hashes.items() if seen < expiry]:
        _recent_hashes.pop(digest, None)
    for identity in list(_rate_tracker):
        live = [ts for ts in _rate_tracker[identity] if now - ts <= _RATE_LIMIT_WINDOW]
        if live:
            _rate_tracker[identity] = live
        else:
            _rate_tracker.pop(identity, None)
def _allow_rate(identity: str, now: datetime.datetime) -> bool:
    """Sliding-window rate limiter: True when *identity* may upload now.

    Keeps only timestamps inside the window, records *now* on success,
    and always writes the filtered window back to the tracker.
    """
    window = [
        ts
        for ts in _rate_tracker.get(identity, [])
        if now - ts <= _RATE_LIMIT_WINDOW
    ]
    allowed = len(window) < _RATE_LIMIT_MAX
    if allowed:
        window.append(now)
    _rate_tracker[identity] = window
    return allowed
def _clean_metadata(values: dict[str, object]) -> dict[str, str]:
cleaned: dict[str, str] = {}
for key, value in values.items():
if not key or value is None:
continue
key_norm = key.strip().lower()
if not key_norm:
continue
val = str(value).strip()
if not val:
continue
cleaned[key_norm[:128]] = val[:1024]
return cleaned
def _is_example_asset(local_path: str) -> bool:
    """Return True when *local_path* resolves inside a bundled examples dir."""
    try:
        candidate = Path(local_path).resolve(strict=False)
    except Exception:
        # Any resolution failure (malformed path, odd platform) means "not ours".
        return False
    return any(candidate.is_relative_to(root) for root in _EXAMPLE_ROOTS)
@dataclass(frozen=True)
class UploadContext:
session: str | None = None
ip: str | None = None
country: str | None = None
agent: str | None = None
extras: dict[str, str] = field(default_factory=dict)
@property
def identity(self) -> str:
return (self.session or "").strip() or (self.ip or "").strip() or "anonymous"
def metadata(self) -> dict[str, str]:
data: dict[str, str] = {}
if self.ip:
data["ip"] = self.ip.strip()
if self.session:
data["session"] = self.session.strip()
if self.country:
data["country"] = self.country.strip()
if self.agent:
data["ua"] = self.agent.strip()[:256]
data["identity"] = self.identity
for key, value in (self.extras or {}).items():
if key and value:
data[key.strip().lower()] = value.strip()
return data
def _log_event(event: str, *, context: UploadContext, path: str, **extra: object) -> None:
    """Emit a one-line JSON log record for *event* on stdout.

    Merges the context's metadata and any non-None ``extra`` values into
    the payload. Never raises: a serialization failure degrades to a
    plain-text line.
    """
    payload: dict[str, object] = {
        "event": event,
        # Fix: datetime.utcnow() is deprecated (Python 3.12+). Use an
        # aware UTC clock while keeping the identical second-resolution,
        # trailing-"Z" format the original produced.
        "timestamp": datetime.datetime.now(datetime.timezone.utc).strftime(
            "%Y-%m-%dT%H:%M:%SZ"
        ),
        "file": Path(path).name,
    }
    payload.update(context.metadata())
    for key, value in extra.items():
        if value is None:
            continue
        payload[key] = value
    try:
        print(json.dumps(payload, ensure_ascii=True, sort_keys=True))
    except Exception as exc:  # pragma: no cover
        print(f"[log-failed] {event} for {path}: {exc}")
def build_context(request: Any | None = None) -> UploadContext:
    """Derive an UploadContext from a framework request object (or None).

    Works with any request exposing a mapping-like ``headers`` attribute,
    an optional ``session_hash``, and an optional ``client.host`` used as
    the IP fallback when no proxy headers are present.
    """
    if request is None:
        return UploadContext()

    headers: Mapping[str, str] | None = None
    raw = getattr(request, "headers", None)
    if isinstance(raw, Mapping):
        headers = raw  # type: ignore[assignment]
    elif hasattr(raw, "keys"):
        # Header objects that expose keys()/get() without being a Mapping.
        try:
            headers = {name: raw.get(name) for name in raw.keys()}  # type: ignore[attr-defined]
        except Exception:
            headers = None

    ip: str | None = None
    country: str | None = None
    agent: str | None = None
    extras: dict[str, str] = {}
    if headers:
        proxied = headers.get("x-forwarded-for") or headers.get("x-real-ip")
        if proxied:
            # First hop of X-Forwarded-For is the original client address.
            ip = proxied.split(",")[0].strip()
        country = (
            headers.get("cf-ipcountry")
            or headers.get("x-country-code")
            or headers.get("x-appengine-country")
        )
        agent = headers.get("user-agent")
        source = headers.get("referer") or headers.get("origin")
        if source:
            extras["referer"] = source
        if headers.get("host"):
            extras["host"] = headers.get("host")

    if not ip:
        client = getattr(request, "client", None)
        ip = getattr(client, "host", None) if client else None

    return UploadContext(
        session=getattr(request, "session_hash", None),
        ip=ip,
        country=country,
        agent=agent,
        extras=extras,
    )
def imagine(local_path: str, age: int, *, context: UploadContext | None = None) -> str | None:
    """Upload the image at *local_path* to S3 with dedupe and rate limiting.

    Skips (returning None) for bundled example assets, unreadable or
    oversized files, rate-limited identities, recently-seen or remotely
    deduplicated content, and when S3 is unconfigured. Never raises.

    Args:
        local_path: Path of the image file to upload.
        age: Target age requested by the caller; embedded in the object key.
        context: Request context for rate limiting and object metadata.

    Returns:
        The S3 object key on success, otherwise None.
    """
    context = context or UploadContext()
    now = datetime.datetime.now()

    # Built-in demo images are never uploaded.
    if _is_example_asset(local_path):
        _log_event("skip_example_asset", context=context, path=local_path)
        print(f"Skipping upload for built-in example: {local_path}")
        return None

    try:
        file_size = os.path.getsize(local_path)
    except OSError as exc:
        print(f"Unable to stat file {local_path}: {exc}")
        _log_event("stat_failed", context=context, path=local_path, error=str(exc))
        return None
    if file_size > _MAX_UPLOAD_BYTES:
        print(f"Skipping upload: {local_path} exceeds {_MAX_UPLOAD_BYTES} bytes")
        _log_event(
            "file_too_large",
            context=context,
            path=local_path,
            size=file_size,
            limit=_MAX_UPLOAD_BYTES,
        )
        return None

    digest = _hash_file(local_path)

    # Rate-limit and local-dedupe bookkeeping share one lock.
    with _lock:
        _prune(now)
        identity = context.identity
        if not _allow_rate(identity, now):
            print(f"Rate limit exceeded for {identity}; skipping upload")
            _log_event(
                "rate_limited",
                context=context,
                path=local_path,
                identity=identity,
                rate_limit=_RATE_LIMIT_MAX,
                window_seconds=int(_RATE_LIMIT_WINDOW.total_seconds()),
            )
            return None
        seen_at = _recent_hashes.get(digest)
        if seen_at and now - seen_at <= _HASH_TTL:
            print(f"Skipping upload for duplicate digest {digest[:12]}… (recent)")
            _log_event(
                "duplicate_recent",
                context=context,
                path=local_path,
                digest=digest[:12],
            )
            return None
        # Record optimistically; rolled back in the error handlers below.
        _recent_hashes[digest] = now

    if _digest_exists_remotely(digest):
        print(f"Skipping upload for duplicate digest {digest[:12]}… (remote)")
        _log_event(
            "duplicate_remote",
            context=context,
            path=local_path,
            digest=digest[:12],
        )
        return None

    prepared_upload: PreparedUpload | None = None
    if not s3 or not BUCKET_NAME:
        # Fix: the original called print() with no message, emitting a
        # blank line and leaving the skip reason invisible in the logs.
        print("S3 is not configured (missing region or bucket); skipping upload")
        _log_event("s3_unconfigured", context=context, path=local_path)
        return None
    try:
        prepared_upload = _prepare_upload(local_path)
        year = now.strftime("%Y")
        month = now.strftime("%Y-%m")
        today = now.strftime("%Y-%m-%d")
        ts = now.strftime("%H%M%S")
        stem = _sanitize_stem(Path(local_path).stem)
        # Keys shard by year/month/day so bucket listings stay navigable.
        key_prefix = f"{year}/{month}/{today}"
        key = f"{key_prefix}/{age}_{ts}_{stem}{prepared_upload.suffix}"

        base_metadata = context.metadata()
        object_metadata = _clean_metadata(
            {
                **base_metadata,
                "digest": digest,
                "size": file_size,
                "source-age": age,
                "format": prepared_upload.format_name,
                "uploaded-at": now.isoformat(timespec="seconds"),
                "filename": Path(local_path).name,
            }
        )
        content_type = "image/webp" if prepared_upload.format_name == "webp" else "image/jpeg"
        s3.upload_file(
            Filename=prepared_upload.path,
            Bucket=BUCKET_NAME,
            Key=key,
            ExtraArgs={
                "Metadata": object_metadata,
                "ContentType": content_type,
            },
        )
        marker_metadata = _clean_metadata(
            {
                **base_metadata,
                "digest": digest,
                "size": file_size,
                "object-key": key,
                "uploaded-at": now.isoformat(timespec="seconds"),
            }
        )
        _store_digest_marker(digest, marker_metadata)
        source_name = Path(local_path).name
        print(f"Uploaded {source_name} ({prepared_upload.format_name}) to s3://{BUCKET_NAME}/{key}")
        return key
    except ValueError as exc:
        # Validation failures are deterministic for this file's content,
        # so the digest intentionally stays recorded to suppress retries.
        print(f"Validation error: {exc}")
        _log_event("validation_error", context=context, path=local_path, error=str(exc))
        return None
    except ClientError as exc:
        # Transient S3 failure: forget the digest so a retry can upload.
        with _lock:
            _recent_hashes.pop(digest, None)
        print(f"Error uploading (client): {exc}")
        _log_event("client_error", context=context, path=local_path, error=str(exc))
        return None
    except Exception as exc:
        with _lock:
            _recent_hashes.pop(digest, None)
        print(f"Error uploading: {exc}")
        _log_event("unexpected_error", context=context, path=local_path, error=str(exc))
        return None
    finally:
        # Always remove the staged temp file, success or failure.
        if prepared_upload and prepared_upload.cleanup and os.path.exists(prepared_upload.path):
            try:
                os.remove(prepared_upload.path)
            except OSError as cleanup_error:
                print(f"Warning: could not delete temp file {prepared_upload.path}: {cleanup_error}")