| from __future__ import annotations |
|
|
| import io |
| import json |
| import os |
| import hashlib |
| import shutil |
| import time |
| from contextlib import contextmanager |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Any, Callable |
|
|
| from PIL import Image |
|
|
| from src.character_spike.assets import make_thumbnail_grid, normalize_to_stage_canvas, write_report |
| from src.character_spike.schema import EXPRESSIONS, default_character_package, slugify_identifier |
| from src.character_spike.tavern_import import convert_tavern_card, load_tavern_json |
|
|
|
|
| PROJECT_ROOT = Path(__file__).resolve().parents[1] |
| WORKSHOP_ROOT = PROJECT_ROOT / "assets" / "generated" / "character_workshop" |
| INSTALLED_CHARACTER_ROOT = PROJECT_ROOT / "assets" / "characters" |
| INSTALLED_BACKGROUND_ROOT = PROJECT_ROOT / "assets" / "backgrounds" |
| INSTALLED_CHARACTER_JSON_ROOT = PROJECT_ROOT / "characters" |
| REMBG_MODEL_ROOT = PROJECT_ROOT / ".tmp" / "rembg-models" |
| QWEN_CANDIDATE = "qwen_image" |
|
|
| EXPRESSION_PROMPTS = { |
| "idle": "neutral idle expression, relaxed posture", |
| "listening": "listening expression, attentive eyes, slight head tilt", |
| "thinking": "thinking expression, one hand near chin, focused eyes", |
| "worried": "worried expression, gentle concern, tense shoulders", |
| "smile": "small warm smile, calm and friendly", |
| "happy": "bright happy expression, open eyes, energetic", |
| "talk": "speaking expression, mouth open naturally, conversational pose", |
| "focus": "focused determined expression, stable confident posture", |
| } |
|
|
| REMOTE_PROBE: Callable[..., dict[str, Any]] | None = None |
| _REMBG_SESSION: Any = None |
| LOGIN_REQUIRED_MESSAGE = "请先使用 Hugging Face 登录,登录后会保存你的角色生成进度。" |
|
|
|
|
| @dataclass(frozen=True) |
| class UserContext: |
| username: str |
| display_name: str |
| authenticated: bool |
| user_id_hash: str |
| storage_key: str |
|
|
|
|
| def get_current_user(profile: Any | None) -> UserContext: |
| username = _profile_value(profile, "username") |
| name = _profile_value(profile, "name") or username |
| if username: |
| storage_key = slugify_identifier(username, fallback="hf_user") |
| return UserContext( |
| username=username, |
| display_name=name or username, |
| authenticated=True, |
| user_id_hash=_hash_user_id(username), |
| storage_key=storage_key, |
| ) |
| return UserContext( |
| username="anonymous", |
| display_name="未登录用户", |
| authenticated=False, |
| user_id_hash=_hash_user_id("anonymous"), |
| storage_key="legacy", |
| ) |
|
|
|
|
| def require_login_for_generation(profile: Any | None) -> UserContext: |
| user = get_current_user(profile) |
| if not user.authenticated: |
| raise ValueError(LOGIN_REQUIRED_MESSAGE) |
| return user |
|
|
|
|
| def resolve_data_root() -> Path: |
| configured = os.environ.get("VC_DATA_ROOT") |
| if configured: |
| return Path(configured) |
| data_mount = Path("/data") |
| if data_mount.exists() and os.access(data_mount, os.W_OK): |
| return data_mount / "virtual_characters" |
| return WORKSHOP_ROOT |
|
|
|
|
| def user_workshop_root(user: UserContext) -> Path: |
| return resolve_data_root() / "users" / user.storage_key / "character_workshop" |
|
|
|
|
| def create_draft_from_form( |
| *, |
| display_name: str, |
| description: str, |
| personality: str, |
| scenario: str, |
| first_mes: str, |
| tags: str = "", |
| ) -> dict[str, Any]: |
| name = (display_name or "新角色").strip() |
| character_id = slugify_identifier(name, fallback="workshop_character") |
| package = default_character_package(character_id, name) |
| if description.strip(): |
| package["description"] = description.strip() |
| package["summary"] = _summary(description.strip()) |
| package["profile"]["identity"] = _first_line(description.strip()) |
| if personality.strip(): |
| package["personality"] = personality.strip() |
| package["profile"]["core_traits"] = _split_tags(personality)[:8] or package["profile"]["core_traits"] |
| if scenario.strip(): |
| package["scenario"] = scenario.strip() |
| package["profile"]["relationship_to_user"] = _first_line(scenario.strip()) |
| if first_mes.strip(): |
| package["first_mes"] = first_mes.strip() |
| parsed_tags = _split_tags(tags) |
| if parsed_tags: |
| package["tags"] = parsed_tags |
| package["metadata"]["source"] = "character_workshop_draft" |
| return package |
|
|
|
|
| def create_draft_from_tavern_json(file: Any) -> dict[str, Any]: |
| path = _file_path(file) |
| if not path: |
| raise ValueError("请先上传 Tavern JSON 角色卡。") |
| package = convert_tavern_card(load_tavern_json(path)) |
| package["metadata"]["source"] = "character_workshop_tavern_json" |
| return package |
|
|
|
|
| def create_initial_state( |
| draft: dict[str, Any], |
| *, |
| user: UserContext | None = None, |
| persist: bool = True, |
| ) -> dict[str, Any]: |
| character_id = slugify_identifier(str(draft.get("id") or draft.get("display_name") or "workshop_character")) |
| draft["id"] = character_id |
| draft["visual"]["avatar"] = character_id |
| draft["visual"]["background_image"] = f"{character_id}_background" |
| state = { |
| "draft": draft, |
| "character_id": character_id, |
| "run_dir": None, |
| "main_candidates": [], |
| "selected_candidate_index": None, |
| "expression_assets": {}, |
| "background_asset": None, |
| "manifest_path": None, |
| "installed_character_id": None, |
| } |
| if not persist: |
| return state |
|
|
| run_dir = _unique_run_dir(character_id, user=user) |
| _initialize_run_dir(run_dir, draft, user=user) |
| state["run_dir"] = str(run_dir) |
| state["manifest_path"] = str(run_dir / "manifest.json") |
| return state |
|
|
|
|
| def ensure_user_workshop_run(state: dict[str, Any], user: UserContext) -> dict[str, Any]: |
| state = _ensure_draft_state(state) |
| if state.get("run_dir"): |
| return load_workshop_run(state["run_dir"], user=user) |
| draft = dict(state["draft"]) |
| character_id = slugify_identifier(str(draft.get("id") or draft.get("display_name") or "workshop_character")) |
| run_dir = _unique_run_dir(character_id, user=user) |
| _initialize_run_dir(run_dir, draft, user=user) |
| return load_workshop_run(run_dir, user=user) |
|
|
|
|
| def list_user_workshop_runs(user: UserContext, *, limit: int = 50) -> list[tuple[str, str]]: |
| root = user_workshop_root(user) if user.authenticated else WORKSHOP_ROOT |
| runs = [] |
| for manifest_path in root.glob("*/*/manifest.json"): |
| run_dir = manifest_path.parent |
| try: |
| summary = summarize_workshop_run(run_dir) |
| except Exception: |
| continue |
| runs.append(summary) |
| runs.sort(key=lambda item: item["updated_at_unix"], reverse=True) |
| return [(item["label"], item["run_dir"]) for item in runs[:limit]] |
|
|
|
|
| def summarize_workshop_run(run_dir: str | Path) -> dict[str, Any]: |
| run_dir = Path(run_dir) |
| manifest = _read_manifest(run_dir) |
| state = _state_from_manifest(run_dir, manifest) |
| stage = _workflow_stage(state, manifest) |
| updated = int(manifest.get("updated_at_unix") or manifest.get("created_at_unix") or run_dir.stat().st_mtime) |
| timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(updated)) |
| display_name = str(manifest.get("display_name") or state.get("draft", {}).get("display_name") or state.get("character_id") or "未命名角色") |
| next_step = _next_step_label(stage) |
| return { |
| "label": f"{display_name} / {timestamp} / {stage} / {next_step}", |
| "run_dir": str(run_dir), |
| "stage": stage, |
| "updated_at_unix": updated, |
| "character_id": state.get("character_id"), |
| "display_name": display_name, |
| } |
|
|
|
|
| def load_workshop_run(run_dir: str | Path, *, user: UserContext | None = None) -> dict[str, Any]: |
| run_dir = Path(run_dir).resolve() |
| if user is not None: |
| allowed_root = (user_workshop_root(user) if user.authenticated else WORKSHOP_ROOT).resolve() |
| if not _is_relative_to(run_dir, allowed_root): |
| raise ValueError("不能加载其他用户的角色生成任务。") |
| manifest = _read_manifest(run_dir) |
| state = _state_from_manifest(run_dir, manifest) |
| _sync_manifest_state(run_dir, state, manifest) |
| return state |
|
|
|
|
| def record_workshop_event(user: UserContext, event: str, payload: dict[str, Any] | None = None) -> None: |
| try: |
| payload = dict(payload or {}) |
| data_root = resolve_data_root() |
| stats_dir = data_root / "stats" |
| stats_dir.mkdir(parents=True, exist_ok=True) |
| record = { |
| "time_unix": int(time.time()), |
| "user_id_hash": user.user_id_hash, |
| "authenticated": user.authenticated, |
| "event": event, |
| "stage": payload.get("stage"), |
| "character_id": payload.get("character_id"), |
| "duration_seconds": payload.get("duration_seconds"), |
| "success": payload.get("success"), |
| "failure_reason": payload.get("failure_reason"), |
| "image_count": payload.get("image_count"), |
| "modal_state": payload.get("modal_state"), |
| } |
| with (stats_dir / "events.jsonl").open("a", encoding="utf-8") as handle: |
| handle.write(json.dumps(record, ensure_ascii=False) + "\n") |
| except OSError: |
| return |
|
|
|
|
| def summarize_workshop_stats() -> dict[str, Any]: |
| stats_path = resolve_data_root() / "stats" / "events.jsonl" |
| if not stats_path.exists(): |
| return {"events": 0, "users": 0, "tasks": 0, "success_rate": None, "failures_by_stage": {}} |
| events = [] |
| for line in stats_path.read_text(encoding="utf-8").splitlines(): |
| if not line.strip(): |
| continue |
| try: |
| events.append(json.loads(line)) |
| except json.JSONDecodeError: |
| continue |
| successes = [event for event in events if event.get("success") is True] |
| completed = [event for event in events if event.get("success") is not None] |
| durations = [float(event["duration_seconds"]) for event in events if event.get("duration_seconds") is not None] |
| failures_by_stage: dict[str, int] = {} |
| for event in events: |
| if event.get("success") is False: |
| stage = str(event.get("stage") or "unknown") |
| failures_by_stage[stage] = failures_by_stage.get(stage, 0) + 1 |
| return { |
| "events": len(events), |
| "users": len({event.get("user_id_hash") for event in events if event.get("user_id_hash")}), |
| "tasks": len({event.get("character_id") for event in events if event.get("character_id")}), |
| "success_rate": round(len(successes) / len(completed), 4) if completed else None, |
| "average_duration_seconds": round(sum(durations) / len(durations), 3) if durations else None, |
| "failures_by_stage": failures_by_stage, |
| } |
|
|
|
|
| def generate_main_candidates(state: dict[str, Any], *, seed: int = 42) -> dict[str, Any]: |
| state = _ensure_state(state) |
| draft = state["draft"] |
| run_dir = Path(state["run_dir"]) |
| manifest = _read_manifest(run_dir) |
| existing = _existing_main_candidates(run_dir, manifest) |
| if len(existing) >= 4: |
| state["main_candidates"] = [str(path) for path in existing[:4]] |
| if state.get("selected_candidate_index") is None: |
| state["selected_candidate_index"] = int(manifest.get("selected_candidate_index") or 0) |
| _sync_manifest_state(run_dir, state, manifest) |
| return state |
|
|
| started = time.perf_counter() |
| prompt = _main_visual_prompt(draft) |
| result = _remote_probe( |
| candidate_id=QWEN_CANDIDATE, |
| prompt=prompt, |
| batch_size=4, |
| seed=seed, |
| width=768, |
| height=1024, |
| ) |
| _raise_for_probe_failure(result) |
| paths = _write_image_payloads(run_dir / "main_candidates", result.get("images", []), expected=4) |
| state["main_candidates"] = [str(path) for path in paths] |
| state["selected_candidate_index"] = 0 |
| manifest["main_candidates"] = [str(path.relative_to(run_dir)) for path in paths] |
| manifest["selected_candidate_index"] = 0 |
| manifest["model_results"].append(_probe_record(result, "main_candidates", prompt, started)) |
| manifest["workflow_stage"] = "main_candidates" |
| _touch_manifest(manifest) |
| _write_manifest(manifest, run_dir) |
| return state |
|
|
|
|
| def select_main_candidate(state: dict[str, Any], index: int) -> dict[str, Any]: |
| state = _ensure_state(state) |
| candidates = state.get("main_candidates") or [] |
| if not candidates: |
| raise ValueError("请先生成主视觉候选。") |
| clamped = max(0, min(int(index), len(candidates) - 1)) |
| state["selected_candidate_index"] = clamped |
| run_dir = Path(state["run_dir"]) |
| manifest = _read_manifest(run_dir) |
| manifest["selected_candidate_index"] = clamped |
| _touch_manifest(manifest) |
| _write_manifest(manifest, run_dir) |
| return state |
|
|
|
|
| def generate_expression_pack(state: dict[str, Any], *, seed: int = 100) -> dict[str, Any]: |
| state = _ensure_state(state) |
| draft = state["draft"] |
| run_dir = Path(state["run_dir"]) |
| expression_dir = run_dir / "expressions_raw" |
| expression_dir.mkdir(parents=True, exist_ok=True) |
| manifest = _read_manifest(run_dir) |
| expression_assets: dict[str, str] = _existing_expression_assets(run_dir, manifest) |
|
|
| with _remote_probe_context() as probe: |
| for index, expression in enumerate(EXPRESSIONS): |
| if expression in expression_assets and Path(expression_assets[expression]).exists(): |
| continue |
| started = time.perf_counter() |
| prompt = f"{_main_visual_prompt(draft)}, {EXPRESSION_PROMPTS[expression]}" |
| result = probe( |
| candidate_id=QWEN_CANDIDATE, |
| prompt=prompt, |
| batch_size=1, |
| seed=seed + index, |
| width=768, |
| height=1024, |
| ) |
| _raise_for_probe_failure(result) |
| path = _write_image_payloads(expression_dir / expression, result.get("images", []), expected=1)[0] |
| expression_assets[expression] = str(path) |
| manifest["model_results"].append(_probe_record(result, f"expression_{expression}", prompt, started)) |
| manifest["expression_assets_raw"] = { |
| key: str(Path(value).relative_to(run_dir)) for key, value in expression_assets.items() |
| } |
| manifest["workflow_stage"] = "expressions_complete" if set(EXPRESSIONS) <= set(expression_assets) else "expressions_partial" |
| _touch_manifest(manifest) |
| _write_manifest(manifest, run_dir) |
|
|
| state["expression_assets"] = expression_assets |
| manifest["expression_assets_raw"] = { |
| key: str(Path(value).relative_to(run_dir)) for key, value in expression_assets.items() |
| } |
| manifest["workflow_stage"] = "expressions_complete" if set(EXPRESSIONS) <= set(expression_assets) else "expressions_partial" |
| _touch_manifest(manifest) |
| _write_manifest(manifest, run_dir) |
| return state |
|
|
|
|
| def generate_background(state: dict[str, Any], *, seed: int = 900) -> dict[str, Any]: |
| state = _ensure_state(state) |
| draft = state["draft"] |
| run_dir = Path(state["run_dir"]) |
| manifest = _read_manifest(run_dir) |
| existing = _existing_background(run_dir, manifest) |
| if existing is not None: |
| state["background_asset"] = str(existing) |
| _sync_manifest_state(run_dir, state, manifest) |
| return state |
|
|
| started = time.perf_counter() |
| prompt = _background_prompt(draft) |
| result = _remote_probe( |
| candidate_id=QWEN_CANDIDATE, |
| prompt=prompt, |
| batch_size=1, |
| seed=seed, |
| width=1344, |
| height=768, |
| ) |
| _raise_for_probe_failure(result) |
| path = _write_image_payloads(run_dir / "background_raw", result.get("images", []), expected=1)[0] |
| state["background_asset"] = str(path) |
| manifest["background_raw"] = str(path.relative_to(run_dir)) |
| manifest["model_results"].append(_probe_record(result, "background", prompt, started)) |
| manifest["workflow_stage"] = "assets_ready" if set(EXPRESSIONS) <= set(state.get("expression_assets") or {}) else "background_ready" |
| _touch_manifest(manifest) |
| _write_manifest(manifest, run_dir) |
| return state |
|
|
|
|
| def matte_and_package_assets(state: dict[str, Any], *, mode: str = "rembg_isnet_anime") -> dict[str, Any]: |
| state = _ensure_state(state) |
| run_dir = Path(state["run_dir"]) |
| character_id = state["character_id"] |
| draft = state["draft"] |
| package_root = run_dir / "package" |
| manifest = _read_manifest(run_dir) |
| if _package_complete(package_root, character_id): |
| state["package_dir"] = str(package_root) |
| state["manifest_path"] = str(run_dir / "manifest.json") |
| manifest["workflow_stage"] = "packaged" |
| manifest["ready_for_chat"] = True |
| _touch_manifest(manifest) |
| _write_manifest(manifest, run_dir) |
| return state |
|
|
| character_dir = package_root / "assets" / "characters" / character_id |
| background_dir = package_root / "assets" / "backgrounds" |
| generated_dir = package_root / "generated" |
| character_dir.mkdir(parents=True, exist_ok=True) |
| background_dir.mkdir(parents=True, exist_ok=True) |
| generated_dir.mkdir(parents=True, exist_ok=True) |
|
|
| expression_assets = state.get("expression_assets") or {} |
| if set(EXPRESSIONS) - set(expression_assets): |
| raise ValueError("请先生成完整 8 表情资产。") |
|
|
| packaged_assets = [] |
| for expression in EXPRESSIONS: |
| source = Path(expression_assets[expression]) |
| image = Image.open(source).convert("RGBA") |
| processed, matting_status = _matte_image(image, mode=mode) |
| normalized = normalize_to_stage_canvas(processed) |
| target = character_dir / f"{expression}.png" |
| normalized.save(target) |
| packaged_assets.append( |
| { |
| "slot": expression, |
| "path": str(target.relative_to(package_root)), |
| "source_path": str(source.relative_to(run_dir)), |
| "matting_status": matting_status, |
| "bytes": target.stat().st_size, |
| "usable": True, |
| } |
| ) |
|
|
| background_source = Path(state["background_asset"]) if state.get("background_asset") else None |
| background_target = background_dir / f"{character_id}_background.png" |
| if background_source and background_source.exists(): |
| _fit_background(Image.open(background_source).convert("RGB")).save(background_target) |
| else: |
| _fallback_background(str(draft.get("display_name") or draft.get("name") or character_id)).save(background_target) |
|
|
| package = dict(draft) |
| package["metadata"] = {**package.get("metadata", {}), "source": "character_workshop", "generation_manifest": str(run_dir / "manifest.json")} |
| package["visual"] = {**package.get("visual", {}), "avatar": character_id, "background_image": f"{character_id}_background"} |
| character_json = package_root / "characters" / f"{character_id}.json" |
| character_json.parent.mkdir(parents=True, exist_ok=True) |
| _write_json(character_json, package) |
|
|
| grid = generated_dir / "asset_grid.png" |
| make_thumbnail_grid([character_dir / f"{slot}.png" for slot in EXPRESSIONS], grid) |
| smoke = generated_dir / "stage_smoke.html" |
| _write_stage_smoke(package, package_root, smoke) |
|
|
| manifest["packaged_assets"] = packaged_assets |
| manifest["background_asset"] = str(background_target.relative_to(package_root)) |
| manifest["character_package"] = str(character_json.relative_to(package_root)) |
| manifest["package_run_dir"] = str(package_root) |
| manifest["asset_grid"] = str(grid.relative_to(package_root)) |
| manifest["stage_smoke"] = str(smoke.relative_to(package_root)) |
| manifest["ready_for_chat"] = True |
| manifest["workflow_stage"] = "packaged" |
| _touch_manifest(manifest) |
| _write_manifest(manifest, run_dir) |
| write_report(manifest, package_root / "generated" / "report.md") |
| state["package_dir"] = str(package_root) |
| state["manifest_path"] = str(run_dir / "manifest.json") |
| return state |
|
|
|
|
| def install_character_package(state: dict[str, Any]) -> dict[str, Any]: |
| state = _ensure_state(state) |
| package_dir = Path(state.get("package_dir") or Path(state["run_dir"]) / "package") |
| character_id = state["character_id"] |
| source_character_dir = package_dir / "assets" / "characters" / character_id |
| source_background = package_dir / "assets" / "backgrounds" / f"{character_id}_background.png" |
| source_json = package_dir / "characters" / f"{character_id}.json" |
| if not source_character_dir.exists() or not source_json.exists(): |
| raise ValueError("请先完成打包预览。") |
|
|
| target_character_dir = INSTALLED_CHARACTER_ROOT / character_id |
| target_background = INSTALLED_BACKGROUND_ROOT / f"{character_id}_background.png" |
| target_json = INSTALLED_CHARACTER_JSON_ROOT / f"{character_id}.json" |
| shutil.copytree(source_character_dir, target_character_dir, dirs_exist_ok=True) |
| target_background.parent.mkdir(parents=True, exist_ok=True) |
| if source_background.exists(): |
| shutil.copy2(source_background, target_background) |
| target_json.parent.mkdir(parents=True, exist_ok=True) |
| shutil.copy2(source_json, target_json) |
|
|
| manifest = _read_manifest(Path(state["run_dir"])) |
| manifest["install_paths"] = { |
| "character_assets": str(target_character_dir), |
| "background": str(target_background), |
| "character_json": str(target_json), |
| } |
| manifest["installed_at_unix"] = int(time.time()) |
| manifest["workflow_stage"] = "installed" |
| _touch_manifest(manifest) |
| _write_manifest(manifest, Path(state["run_dir"])) |
| _clear_stage_caches() |
| state["installed_character_id"] = character_id |
| return state |
|
|
|
|
| def render_packaged_stage_preview(state: dict[str, Any]) -> str: |
| state = _ensure_state(state) |
| package_dir = Path(state.get("package_dir") or Path(state["run_dir"]) / "package") |
| character_path = package_dir / "characters" / f"{state['character_id']}.json" |
| if not character_path.exists(): |
| return "" |
| character = json.loads(character_path.read_text(encoding="utf-8")) |
| from src import stage_driver |
|
|
| old_asset_root = stage_driver.ASSET_ROOT |
| old_background_root = stage_driver.BACKGROUND_ROOT |
| try: |
| stage_driver.ASSET_ROOT = package_dir / "assets" / "characters" |
| stage_driver.BACKGROUND_ROOT = package_dir / "assets" / "backgrounds" |
| stage_driver._asset_data_uri.cache_clear() |
| stage_driver._background_data_uri.cache_clear() |
| return stage_driver.render_character_stage( |
| character, |
| {"expression": "smile", "motion": "talk", "intensity": 0.72}, |
| ) |
| finally: |
| stage_driver.ASSET_ROOT = old_asset_root |
| stage_driver.BACKGROUND_ROOT = old_background_root |
| stage_driver._asset_data_uri.cache_clear() |
| stage_driver._background_data_uri.cache_clear() |
|
|
|
|
| def set_remote_probe_for_tests(fn: Callable[..., dict[str, Any]] | None) -> None: |
| global REMOTE_PROBE |
| REMOTE_PROBE = fn |
|
|
|
|
| def _remote_probe(**kwargs: Any) -> dict[str, Any]: |
| with _remote_probe_context() as probe: |
| return probe(**kwargs) |
|
|
|
|
| @contextmanager |
| def _remote_probe_context(): |
| if REMOTE_PROBE is not None: |
| yield REMOTE_PROBE |
| return |
| from modal_apps.modal_character_spike import CharacterGenerationSpike, app |
|
|
| with app.run(): |
| yield CharacterGenerationSpike().probe.remote |
|
|
|
|
| def _write_image_payloads(output_dir: Path, images: list[bytes], *, expected: int) -> list[Path]: |
| if len(images) < expected: |
| raise ValueError(f"模型返回 {len(images)} 张图,期望 {expected} 张。") |
| output_dir.mkdir(parents=True, exist_ok=True) |
| paths = [] |
| for index, payload in enumerate(images[:expected]): |
| path = output_dir / f"{index:02d}.png" |
| path.write_bytes(payload) |
| paths.append(path) |
| return paths |
|
|
|
|
| def _matte_image(image: Image.Image, *, mode: str) -> tuple[Image.Image, str]: |
| if mode != "rembg_isnet_anime": |
| return image.convert("RGBA"), "fallback_keep_background" |
| try: |
| REMBG_MODEL_ROOT.mkdir(parents=True, exist_ok=True) |
| os.environ.setdefault("U2NET_HOME", str(REMBG_MODEL_ROOT)) |
| from rembg import new_session, remove |
|
|
| global _REMBG_SESSION |
| if _REMBG_SESSION is None: |
| _REMBG_SESSION = new_session("isnet-anime") |
| buffer = io.BytesIO() |
| image.convert("RGB").save(buffer, format="PNG") |
| output = remove(buffer.getvalue(), session=_REMBG_SESSION) |
| matte = Image.open(io.BytesIO(output)).convert("RGBA") |
| if not _valid_alpha(matte): |
| return image.convert("RGBA"), "fallback_keep_background" |
| return matte, "rembg_isnet_anime" |
| except Exception: |
| return image.convert("RGBA"), "fallback_keep_background" |
|
|
|
|
| def _valid_alpha(image: Image.Image) -> bool: |
| alpha = image.getchannel("A") |
| bbox = alpha.getbbox() |
| if bbox is None: |
| return False |
| non_zero = sum(1 for value in alpha.getdata() if value > 8) |
| ratio = non_zero / (image.width * image.height) |
| return 0.05 <= ratio <= 0.92 |
|
|
|
|
| def _main_visual_prompt(draft: dict[str, Any]) -> str: |
| name = str(draft.get("display_name") or draft.get("name") or "original character") |
| description = " ".join(str(draft.get(key) or "").strip() for key in ("description", "personality", "scenario")) |
| return ( |
| f"single original anime-style virtual character portrait for {name}, {description}, " |
| "young adult original character, half body character sprite, centered composition, " |
| "plain light background, clean silhouette, no text, no logo, no poster, not a landscape, " |
| "not an existing commercial character" |
| ) |
|
|
|
|
| def _background_prompt(draft: dict[str, Any]) -> str: |
| scenario = str(draft.get("scenario") or "quiet virtual communication room") |
| return ( |
| f"empty background for a virtual character conversation stage, {scenario}, " |
| "wide landscape image, no people, no character, no text, no logo, clean atmospheric sci-fi interior" |
| ) |
|
|
|
|
| def _new_manifest(draft: dict[str, Any], run_dir: Path, *, user: UserContext | None = None) -> dict[str, Any]: |
| now = int(time.time()) |
| owner = { |
| "username": user.username if user and user.authenticated else None, |
| "authenticated": bool(user and user.authenticated), |
| } |
| return { |
| "schema_version": 1, |
| "run_type": "character_workshop", |
| "character_id": draft["id"], |
| "display_name": draft.get("display_name") or draft.get("name"), |
| "created_at_unix": now, |
| "updated_at_unix": now, |
| "run_dir": str(run_dir), |
| "draft_character": "draft_character.json", |
| "model": QWEN_CANDIDATE, |
| "model_results": [], |
| "owner": owner, |
| "workflow_stage": "draft", |
| "stats_run_id": _hash_user_id(f"{run_dir}:{now}")[:16], |
| "ready_for_chat": False, |
| } |
|
|
|
|
| def _probe_record(result: dict[str, Any], case: str, prompt: str, started: float) -> dict[str, Any]: |
| return { |
| "candidate_id": result.get("candidate_id"), |
| "benchmark_case": case, |
| "status": result.get("status"), |
| "image_count": result.get("image_count"), |
| "duration_seconds": result.get("duration_seconds"), |
| "client_wall_seconds": round(time.perf_counter() - started, 3), |
| "prompt": prompt, |
| "seed": result.get("seed"), |
| "failure_reason": result.get("failure_reason"), |
| } |
|
|
|
|
| def _raise_for_probe_failure(result: dict[str, Any]) -> None: |
| if result.get("status") != "ok": |
| reason = result.get("failure_reason") or "未知错误" |
| raise RuntimeError(f"图像生成失败:{reason}") |
|
|
|
|
| def _ensure_state(state: dict[str, Any] | None) -> dict[str, Any]: |
| if not state or not state.get("draft") or not state.get("run_dir"): |
| raise ValueError("请先创建角色草案。") |
| return dict(state) |
|
|
|
|
| def _ensure_draft_state(state: dict[str, Any] | None) -> dict[str, Any]: |
| if not state or not state.get("draft"): |
| raise ValueError("请先创建角色草案。") |
| return dict(state) |
|
|
|
|
| def _unique_run_dir(character_id: str, *, user: UserContext | None = None) -> Path: |
| root = user_workshop_root(user) if user and user.authenticated else WORKSHOP_ROOT |
| return root / character_id / time.strftime("%Y%m%d_%H%M%S") |
|
|
|
|
| def _initialize_run_dir(run_dir: Path, draft: dict[str, Any], *, user: UserContext | None = None) -> None: |
| run_dir.mkdir(parents=True, exist_ok=True) |
| _write_json(run_dir / "draft_character.json", draft) |
| manifest = _new_manifest(draft, run_dir, user=user) |
| _write_manifest(manifest, run_dir) |
|
|
|
|
| def _read_manifest(run_dir: Path) -> dict[str, Any]: |
| return json.loads((run_dir / "manifest.json").read_text(encoding="utf-8")) |
|
|
|
|
| def _write_manifest(manifest: dict[str, Any], run_dir: Path) -> None: |
| _write_json(run_dir / "manifest.json", manifest) |
|
|
|
|
| def _touch_manifest(manifest: dict[str, Any]) -> None: |
| manifest["updated_at_unix"] = int(time.time()) |
|
|
|
|
| def _write_json(path: Path, payload: dict[str, Any]) -> None: |
| path.parent.mkdir(parents=True, exist_ok=True) |
| path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") |
|
|
|
|
| def _state_from_manifest(run_dir: Path, manifest: dict[str, Any]) -> dict[str, Any]: |
| draft_path = run_dir / str(manifest.get("draft_character") or "draft_character.json") |
| draft = json.loads(draft_path.read_text(encoding="utf-8")) if draft_path.exists() else default_character_package( |
| str(manifest.get("character_id") or "workshop_character"), |
| str(manifest.get("display_name") or "工坊角色"), |
| ) |
| character_id = slugify_identifier(str(draft.get("id") or manifest.get("character_id") or "workshop_character")) |
| selected_index = int(manifest.get("selected_candidate_index") or 0) |
| main_candidates = _existing_main_candidates(run_dir, manifest) |
| expression_assets = _existing_expression_assets(run_dir, manifest) |
| background = _existing_background(run_dir, manifest) |
| package_dir = Path(str(manifest.get("package_run_dir") or run_dir / "package")) |
| state = { |
| "draft": draft, |
| "character_id": character_id, |
| "run_dir": str(run_dir), |
| "main_candidates": [str(path) for path in main_candidates], |
| "selected_candidate_index": selected_index if main_candidates else None, |
| "expression_assets": expression_assets, |
| "background_asset": str(background) if background else None, |
| "manifest_path": str(run_dir / "manifest.json"), |
| "installed_character_id": character_id if manifest.get("installed_at_unix") else None, |
| } |
| if package_dir.exists(): |
| state["package_dir"] = str(package_dir) |
| return state |
|
|
|
|
| def _sync_manifest_state(run_dir: Path, state: dict[str, Any], manifest: dict[str, Any]) -> None: |
| main_candidates = [Path(path) for path in state.get("main_candidates") or [] if Path(path).exists()] |
| if main_candidates: |
| manifest["main_candidates"] = [str(path.relative_to(run_dir)) for path in main_candidates] |
| manifest["selected_candidate_index"] = int(state.get("selected_candidate_index") or 0) |
| expression_assets = {key: value for key, value in (state.get("expression_assets") or {}).items() if Path(value).exists()} |
| if expression_assets: |
| manifest["expression_assets_raw"] = { |
| key: str(Path(value).relative_to(run_dir)) for key, value in expression_assets.items() |
| } |
| if state.get("background_asset") and Path(state["background_asset"]).exists(): |
| manifest["background_raw"] = str(Path(state["background_asset"]).relative_to(run_dir)) |
| manifest["workflow_stage"] = _workflow_stage(state, manifest) |
| _touch_manifest(manifest) |
| _write_manifest(manifest, run_dir) |
|
|
|
|
| def _existing_main_candidates(run_dir: Path, manifest: dict[str, Any]) -> list[Path]: |
| paths = [] |
| for value in manifest.get("main_candidates") or []: |
| path = _resolve_run_path(run_dir, value) |
| if path.exists(): |
| paths.append(path) |
| if paths: |
| return paths |
| return sorted((run_dir / "main_candidates").glob("*.png")) |
|
|
|
|
| def _existing_expression_assets(run_dir: Path, manifest: dict[str, Any]) -> dict[str, str]: |
| assets = {} |
| raw = manifest.get("expression_assets_raw") or {} |
| for expression in EXPRESSIONS: |
| value = raw.get(expression) |
| path = _resolve_run_path(run_dir, value) if value else run_dir / "expressions_raw" / expression / "00.png" |
| if path.exists(): |
| assets[expression] = str(path) |
| return assets |
|
|
|
|
| def _existing_background(run_dir: Path, manifest: dict[str, Any]) -> Path | None: |
| value = manifest.get("background_raw") |
| path = _resolve_run_path(run_dir, value) if value else run_dir / "background_raw" / "00.png" |
| return path if path.exists() else None |
|
|
|
|
| def _package_complete(package_root: Path, character_id: str) -> bool: |
| character_dir = package_root / "assets" / "characters" / character_id |
| background = package_root / "assets" / "backgrounds" / f"{character_id}_background.png" |
| character_json = package_root / "characters" / f"{character_id}.json" |
| return ( |
| character_json.exists() |
| and background.exists() |
| and all((character_dir / f"{expression}.png").exists() for expression in EXPRESSIONS) |
| ) |
|
|
|
|
| def _workflow_stage(state: dict[str, Any], manifest: dict[str, Any]) -> str: |
| if manifest.get("installed_at_unix"): |
| return "installed" |
| character_id = str(state.get("character_id") or manifest.get("character_id") or "") |
| package_root = Path(str(state.get("package_dir") or manifest.get("package_run_dir") or Path(state["run_dir"]) / "package")) |
| if character_id and _package_complete(package_root, character_id): |
| return "packaged" |
| expressions = state.get("expression_assets") or {} |
| background = state.get("background_asset") |
| if set(EXPRESSIONS) <= set(expressions) and background and Path(background).exists(): |
| return "assets_ready" |
| if expressions: |
| return "expressions_partial" |
| if len(state.get("main_candidates") or []) >= 4: |
| return "main_candidates" |
| return "draft" |
|
|
|
|
| def _next_step_label(stage: str) -> str: |
| return { |
| "draft": "生成主视觉", |
| "main_candidates": "生成8表情", |
| "expressions_partial": "补齐表情", |
| "assets_ready": "打包预览", |
| "packaged": "安装角色", |
| "installed": "已安装", |
| }.get(stage, "继续") |
|
|
|
|
| def _resolve_run_path(run_dir: Path, value: Any) -> Path: |
| if not value: |
| return run_dir / "__missing__" |
| path = Path(str(value)) |
| return path if path.is_absolute() else run_dir / path |
|
|
|
|
| def _is_relative_to(path: Path, root: Path) -> bool: |
| try: |
| path.relative_to(root) |
| return True |
| except ValueError: |
| return False |
|
|
|
|
| def _profile_value(profile: Any, key: str) -> str: |
| if profile is None: |
| return "" |
| if isinstance(profile, dict): |
| return str(profile.get(key) or "").strip() |
| return str(getattr(profile, key, "") or "").strip() |
|
|
|
|
| def _hash_user_id(value: str) -> str: |
| return hashlib.sha256(str(value).encode("utf-8")).hexdigest() |
|
|
|
|
| def _file_path(file: Any) -> Path | None: |
| if file is None: |
| return None |
| if isinstance(file, (str, Path)): |
| return Path(file) |
| if isinstance(file, dict): |
| value = file.get("path") or file.get("name") |
| return Path(value) if value else None |
| value = getattr(file, "path", None) or getattr(file, "name", None) |
| return Path(value) if value else None |
|
|
|
|
| def _split_tags(value: str) -> list[str]: |
| text = str(value or "") |
| for separator in (",", "、", ";", ";", "\n"): |
| text = text.replace(separator, ",") |
| return [item.strip() for item in text.split(",") if item.strip()] |
|
|
|
|
| def _summary(text: str, limit: int = 120) -> str: |
| compact = " ".join(text.split()) |
| return compact if len(compact) <= limit else compact[: limit - 1].rstrip() + "…" |
|
|
|
|
| def _first_line(text: str) -> str: |
| for line in text.splitlines(): |
| if line.strip(): |
| return line.strip() |
| return text.strip() |
|
|
|
|
| def _fit_background(image: Image.Image) -> Image.Image: |
| image = image.convert("RGB") |
| target = (1600, 900) |
| image.thumbnail(target, Image.Resampling.LANCZOS) |
| canvas = Image.new("RGB", target, (12, 18, 31)) |
| canvas.paste(image, ((target[0] - image.width) // 2, (target[1] - image.height) // 2)) |
| return canvas |
|
|
|
|
| def _fallback_background(display_name: str) -> Image.Image: |
| from src.character_spike.assets import _draw_mock_background |
|
|
| return _draw_mock_background(display_name, __import__("random").Random(42)) |
|
|
|
|
| def _write_stage_smoke(character: dict[str, Any], package_root: Path, output: Path) -> None: |
| from src import stage_driver |
|
|
| old_asset_root = stage_driver.ASSET_ROOT |
| old_background_root = stage_driver.BACKGROUND_ROOT |
| try: |
| stage_driver.ASSET_ROOT = package_root / "assets" / "characters" |
| stage_driver.BACKGROUND_ROOT = package_root / "assets" / "backgrounds" |
| stage_driver._asset_data_uri.cache_clear() |
| stage_driver._background_data_uri.cache_clear() |
| states = [ |
| {"expression": "idle", "motion": "breathe", "intensity": 0.35}, |
| {"expression": "listening", "motion": "gentle_blink", "intensity": 0.35}, |
| {"expression": "smile", "motion": "talk", "intensity": 0.72}, |
| {"expression": "thinking", "motion": "focus", "intensity": 0.7}, |
| ] |
| html = "<!doctype html><meta charset='utf-8'><body style='margin:0;background:#020617'>" |
| html += "\n".join(stage_driver.render_character_stage(character, state) for state in states) |
| html += "</body>" |
| output.parent.mkdir(parents=True, exist_ok=True) |
| output.write_text(html, encoding="utf-8") |
| finally: |
| stage_driver.ASSET_ROOT = old_asset_root |
| stage_driver.BACKGROUND_ROOT = old_background_root |
| stage_driver._asset_data_uri.cache_clear() |
| stage_driver._background_data_uri.cache_clear() |
|
|
|
|
| def _clear_stage_caches() -> None: |
| try: |
| from src import stage_driver |
|
|
| stage_driver._asset_data_uri.cache_clear() |
| stage_driver._background_data_uri.cache_clear() |
| except Exception: |
| pass |
|
|