Spaces:
Running
Running
| from __future__ import annotations | |
| import logging | |
| from datetime import datetime | |
| from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, status | |
| from sqlalchemy import or_, select | |
| from sqlalchemy.orm import Session | |
| from app.auth import require_player | |
| from app.config import get_settings | |
| from app.db import get_session | |
| from app.memory.crud import counts_by_scope, counts_by_type, list_for_character | |
| from app.models.character import ( | |
| Character, | |
| CharacterState, | |
| ContentRating, | |
| Visibility, | |
| rating_allowed, | |
| rating_level, | |
| ) | |
| from app.models.match import Player | |
| from app.models.memory import MemoryScope, MemoryType | |
| from app.schemas.character import ( | |
| CharacterCreate, | |
| CharacterDetail, | |
| CharacterRead, | |
| CharacterSummary, | |
| CharacterUpdate, | |
| MemoryCountsByScope, | |
| MemoryCountsByType, | |
| ) | |
| from app.schemas.memory import MemoryRead | |
| logger = logging.getLogger(__name__) | |
| router = APIRouter(prefix="/api/characters", tags=["characters"]) | |
| def _require_character_api() -> None: | |
| if not get_settings().allow_character_api: | |
| raise HTTPException(status_code=404, detail="Not found") | |
| def _run_generation_bg(character_id: str) -> None: | |
| from app.characters.memory_generator import generate_and_store | |
| try: | |
| generate_and_store(character_id) | |
| except Exception: | |
| logger.exception("Memory generation failed for %s", character_id) | |
| def _allowed_rating_values(player_max: ContentRating) -> list[ContentRating]: | |
| max_idx = rating_level(player_max) | |
| return [r for r in (ContentRating.FAMILY, ContentRating.MATURE, ContentRating.UNRESTRICTED) | |
| if rating_level(r) <= max_idx] | |
| def _visible_to(character: Character, player: Player) -> bool: | |
| if character.deleted_at is not None: | |
| return False | |
| if character.visibility == Visibility.PRIVATE and character.owner_id != player.id: | |
| return False | |
| if not rating_allowed(character.content_rating, player.max_content_rating): | |
| return False | |
| return True | |
| def list_characters( | |
| player: Player = Depends(require_player), | |
| session: Session = Depends(get_session), | |
| ) -> list[CharacterSummary]: | |
| allowed = _allowed_rating_values(player.max_content_rating) | |
| rows = session.execute( | |
| select(Character) | |
| .where(Character.deleted_at.is_(None)) | |
| .where(Character.content_rating.in_(allowed)) | |
| .where( | |
| or_( | |
| Character.visibility == Visibility.PUBLIC, | |
| Character.owner_id == player.id, | |
| ) | |
| ) | |
| .order_by(Character.created_at.desc()) | |
| ).scalars() | |
| return [CharacterSummary.model_validate(c) for c in rows] | |
| def get_character( | |
| character_id: str, | |
| player: Player = Depends(require_player), | |
| session: Session = Depends(get_session), | |
| ) -> CharacterDetail: | |
| character = session.get(Character, character_id) | |
| if character is None or character.deleted_at is not None: | |
| raise HTTPException(status_code=404, detail="Character not found") | |
| if not _visible_to(character, player): | |
| # Private-not-owner and rating-filtered both surface as 404 — we | |
| # don't leak existence details via auth errors. | |
| raise HTTPException(status_code=404, detail="Character not found") | |
| scope_counts = counts_by_scope(session, character_id=character_id) | |
| type_counts = counts_by_type(session, character_id=character_id) | |
| detail = CharacterDetail.model_validate(character).model_copy( | |
| update={ | |
| "memory_count": sum(scope_counts.values()), | |
| "memory_counts_by_scope": MemoryCountsByScope(**scope_counts), | |
| "memory_counts_by_type": MemoryCountsByType(**type_counts), | |
| } | |
| ) | |
| return detail | |
| def create_character( | |
| payload: CharacterCreate, | |
| background: BackgroundTasks, | |
| player: Player = Depends(require_player), | |
| session: Session = Depends(get_session), | |
| ) -> CharacterRead: | |
| _require_character_api() | |
| character = Character( | |
| name=payload.name, | |
| short_description=payload.short_description, | |
| backstory=payload.backstory, | |
| avatar_emoji=payload.avatar_emoji, | |
| aggression=payload.aggression, | |
| risk_tolerance=payload.risk_tolerance, | |
| patience=payload.patience, | |
| trash_talk=payload.trash_talk, | |
| target_elo=payload.target_elo, | |
| current_elo=payload.target_elo, | |
| floor_elo=payload.target_elo, | |
| max_elo=payload.max_elo if payload.max_elo is not None else payload.target_elo + 400, | |
| adaptive=payload.adaptive, | |
| opening_preferences=list(payload.opening_preferences), | |
| voice_descriptor=payload.voice_descriptor, | |
| quirks=payload.quirks, | |
| visibility=payload.visibility, | |
| content_rating=payload.content_rating, | |
| owner_id=player.id, | |
| state=CharacterState.GENERATING_MEMORIES, | |
| memory_generation_started_at=datetime.utcnow(), | |
| is_preset=False, | |
| ) | |
| session.add(character) | |
| session.commit() | |
| session.refresh(character) | |
| background.add_task(_run_generation_bg, character.id) | |
| return CharacterRead.model_validate(character) | |
| def update_character( | |
| character_id: str, | |
| payload: CharacterUpdate, | |
| player: Player = Depends(require_player), | |
| session: Session = Depends(get_session), | |
| ) -> CharacterRead: | |
| _require_character_api() | |
| character = session.get(Character, character_id) | |
| if character is None or character.deleted_at is not None: | |
| raise HTTPException(status_code=404, detail="Character not found") | |
| if character.is_preset: | |
| raise HTTPException(status_code=403, detail="Presets cannot be edited. Clone first.") | |
| if character.owner_id != player.id: | |
| raise HTTPException(status_code=403, detail="Not your character.") | |
| data = payload.model_dump(exclude_unset=True, exclude_none=True) | |
| for field, value in data.items(): | |
| setattr(character, field, value) | |
| character.updated_at = datetime.utcnow() | |
| session.commit() | |
| session.refresh(character) | |
| return CharacterRead.model_validate(character) | |
| def regenerate_memories( | |
| character_id: str, | |
| background: BackgroundTasks, | |
| player: Player = Depends(require_player), | |
| session: Session = Depends(get_session), | |
| ) -> dict: | |
| character = session.get(Character, character_id) | |
| if character is None or character.deleted_at is not None: | |
| raise HTTPException(status_code=404, detail="Character not found") | |
| if character.is_preset: | |
| raise HTTPException(status_code=403, detail="Presets cannot be regenerated.") | |
| if character.owner_id != player.id: | |
| raise HTTPException(status_code=403, detail="Not your character.") | |
| # Drop existing non-match memories — backstory-derived lore. Keep | |
| # MATCH_RECAP / OPPONENT_SPECIFIC rows (real play history). | |
| from app.models.memory import Memory, MemoryScope as MS | |
| session.query(Memory).filter( | |
| Memory.character_id == character_id, | |
| Memory.scope.in_([MS.CHARACTER_LORE, MS.CROSS_PLAYER]), | |
| ).delete(synchronize_session=False) | |
| character.state = CharacterState.GENERATING_MEMORIES | |
| character.memory_generation_started_at = datetime.utcnow() | |
| character.memory_generation_error = None | |
| session.commit() | |
| background.add_task(_run_generation_bg, character_id) | |
| return {"status": "accepted", "character_id": character_id} | |
| def clone_character( | |
| character_id: str, | |
| background: BackgroundTasks, | |
| player: Player = Depends(require_player), | |
| session: Session = Depends(get_session), | |
| ) -> CharacterRead: | |
| _require_character_api() | |
| source = session.get(Character, character_id) | |
| if source is None or source.deleted_at is not None: | |
| raise HTTPException(status_code=404, detail="Character not found") | |
| if not _visible_to(source, player): | |
| raise HTTPException(status_code=404, detail="Character not found") | |
| clone = Character( | |
| name=f"{source.name} (clone)", | |
| short_description=source.short_description, | |
| backstory=source.backstory, | |
| avatar_emoji=source.avatar_emoji, | |
| aggression=source.aggression, | |
| risk_tolerance=source.risk_tolerance, | |
| patience=source.patience, | |
| trash_talk=source.trash_talk, | |
| target_elo=source.target_elo, | |
| current_elo=source.target_elo, # fresh Elo state | |
| floor_elo=source.target_elo, | |
| max_elo=source.max_elo, | |
| adaptive=source.adaptive, | |
| opening_preferences=list(source.opening_preferences or []), | |
| voice_descriptor=source.voice_descriptor, | |
| quirks=source.quirks, | |
| visibility=Visibility.PUBLIC, | |
| content_rating=source.content_rating, | |
| owner_id=player.id, | |
| is_preset=False, | |
| preset_key=None, | |
| state=CharacterState.GENERATING_MEMORIES, | |
| memory_generation_started_at=datetime.utcnow(), | |
| ) | |
| session.add(clone) | |
| session.commit() | |
| session.refresh(clone) | |
| # Fresh memory generation — independent of the source (even if the | |
| # source is still generating; see phase_3a_decisions.md). | |
| background.add_task(_run_generation_bg, clone.id) | |
| return CharacterRead.model_validate(clone) | |
| def list_memories( | |
| character_id: str, | |
| scope: MemoryScope | None = None, | |
| type: MemoryType | None = None, | |
| offset: int = Query(0, ge=0), | |
| limit: int = Query(50, ge=1, le=200), | |
| player: Player = Depends(require_player), | |
| session: Session = Depends(get_session), | |
| ) -> dict: | |
| character = session.get(Character, character_id) | |
| if character is None or character.deleted_at is not None: | |
| raise HTTPException(status_code=404, detail="Character not found") | |
| if not _visible_to(character, player): | |
| raise HTTPException(status_code=404, detail="Character not found") | |
| rows, total = list_for_character( | |
| session, | |
| character_id=character_id, | |
| scope=scope, | |
| type_=type, | |
| offset=offset, | |
| limit=limit, | |
| ) | |
| return { | |
| "total": total, | |
| "offset": offset, | |
| "limit": limit, | |
| "items": [MemoryRead.model_validate(r).model_dump(mode="json") for r in rows], | |
| } | |
| def delete_character( | |
| character_id: str, | |
| player: Player = Depends(require_player), | |
| session: Session = Depends(get_session), | |
| ) -> None: | |
| character = session.get(Character, character_id) | |
| if character is None or character.deleted_at is not None: | |
| raise HTTPException(status_code=404, detail="Character not found") | |
| if character.is_preset: | |
| raise HTTPException(status_code=403, detail="Preset characters cannot be deleted") | |
| if character.owner_id != player.id: | |
| raise HTTPException(status_code=403, detail="Not your character.") | |
| character.deleted_at = datetime.utcnow() | |
| session.commit() | |