# src/main.py
"""HTTP entry point for the Checkin-CRM demo application.

Exposes admin endpoints for venues, the check-in session lifecycle
(issue -> QR code -> signature verification -> point reward), wallet balance
lookup, share-link generation and simple CRM scoring/export endpoints.
"""
import csv
import io
import os
import re
from urllib.parse import quote

from fastapi import FastAPI, HTTPException, Response, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import JSONResponse, RedirectResponse, Response as FastAPIResponse
from sqlmodel import select
from dotenv import load_dotenv

from .db import init_db, get_session
from .models import User, Venue, CheckinSession, Visit, TokenBalance, RewardLog
from .schemas import VenueCreate, SessionCreate, SessionInfo, VerifyPayload, VerifyResult, Balance
from .auth import verify_signature, MESSAGE_PREFIX, SignatureError
from .qr import make_qr_png
from .utils import new_session_id, expiry, now_utc
from .scoring import calc_score, segment
from .social import share_urls

load_dotenv()

app = FastAPI(title="Checkin-CRM")

# CORS (restrict as needed).
# Entries are comma-separated; surrounding whitespace and empty entries are
# dropped so that "https://a.example, https://b.example" works as expected.
origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    # Credentialed requests must not be combined with a wildcard origin, so
    # credentials are only enabled when an explicit origin list is configured.
    allow_credentials="*" not in origins,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Static files (UI)
app.mount("/static", StaticFiles(directory="static"), name="static")

# Characters outside this set are replaced before a session id is used as part
# of a download filename in a response header.
_UNSAFE_FILENAME_CHARS = re.compile(r"[^A-Za-z0-9_-]")


@app.on_event("startup")
def on_startup():
    """Initialise the database when the application starts."""
    # Directory creation and path handling are taken care of in db.py;
    # this hook only creates the tables.
    init_db()


# The root path goes to the admin UI.
@app.get("/")
def root_redirect():
    """Redirect the site root to the static admin page."""
    return RedirectResponse(url="/static/admin.html", status_code=307)


# -------------------------
# Admin (simplified, demo only)
# -------------------------
@app.post("/api/admin/venues")
def create_venue(vc: VenueCreate):
    """Create a venue; respond with 409 when the slug is already taken."""
    # No authentication, to keep the demo simple (add authentication in production).
    with get_session() as s:
        exists = s.exec(select(Venue).where(Venue.slug == vc.slug)).first()
        if exists:
            raise HTTPException(409, "slug already exists")
        v = Venue(name=vc.name, slug=vc.slug)
        s.add(v)
        s.commit()
        s.refresh(v)
        return {"id": v.id, "name": v.name, "slug": v.slug}


@app.get("/api/admin/venues")
def list_venues():
    """Return every venue as a list of ``{id, name, slug}`` dicts."""
    with get_session() as s:
        items = s.exec(select(Venue)).all()
        return [{"id": v.id, "name": v.name, "slug": v.slug} for v in items]


@app.post("/api/admin/seed")
def seed_demo():
    """Ensure the ``demo`` venue exists; ``created`` reports whether it was added."""
    with get_session() as s:
        v = s.exec(select(Venue).where(Venue.slug == "demo")).first()
        if v:
            return {"id": v.id, "name": v.name, "slug": v.slug, "created": False}
        v = Venue(name="デモ店舗", slug="demo")
        s.add(v)
        s.commit()
        s.refresh(v)
        return {"id": v.id, "name": v.name, "slug": v.slug, "created": True}


# -------------------------
# Check-in: issue -> verify
# -------------------------
@app.post("/api/checkin/session")
def create_session(sc: SessionCreate):
    """Issue a new check-in session for the venue identified by ``sc.venue_slug``.

    Raises:
        HTTPException: 404 when no venue has the given slug.
    """
    with get_session() as s:
        venue = s.exec(select(Venue).where(Venue.slug == sc.venue_slug)).first()
        if not venue:
            raise HTTPException(404, "venue not found")
        sid = new_session_id()
        cs = CheckinSession(
            session_id=sid,
            venue_id=venue.id,
            nonce=new_session_id(),
            # NOTE(review): presumably a 5-minute lifetime; confirm the unit in utils.expiry.
            expires_at=expiry(5),
        )
        s.add(cs)
        s.commit()
        return {"session_id": sid}


@app.get("/api/checkin/session/{session_id}", response_model=SessionInfo)
def get_session_info(session_id: str):
    """Return venue name, nonce and expiry for an active check-in session.

    Raises:
        HTTPException: 404 when the session (or its venue) does not exist,
            410 when the session has expired or was already used.
    """
    with get_session() as s:
        cs = s.exec(
            select(CheckinSession).where(CheckinSession.session_id == session_id)
        ).first()
        if not cs:
            raise HTTPException(404, "session not found")
        if cs.expires_at < now_utc() or cs.used:
            raise HTTPException(410, "session expired or used")
        venue = s.get(Venue, cs.venue_id)
        if venue is None:
            # The venue row is gone; answer with a clean 404 rather than
            # failing on ``venue.name`` below.
            raise HTTPException(404, "venue not found")
        return SessionInfo(
            session_id=session_id,
            venue_name=venue.name,
            nonce=cs.nonce,
            expires_at=cs.expires_at,
        )


# QR image (embeds an absolute URL / disables caching / renders reliably when embedded)
@app.get("/qrcode/{session_id}")
def qrcode_png(session_id: str, request: Request):
    """Return a PNG QR code that encodes the absolute check-in URL of a session."""
    base = str(request.base_url).rstrip("/")  # e.g. https://xxx.hf.space
    # The path parameter is caller-controlled: percent-encode it for the URL
    # and reduce it to a conservative character set for the header filename.
    url = f"{base}/checkin/{quote(session_id, safe='')}"
    filename_id = _UNSAFE_FILENAME_CHARS.sub("_", session_id)
    png = make_qr_png(url)
    return FastAPIResponse(
        content=png,
        media_type="image/png",
        headers={
            "Cache-Control": "no-store, max-age=0",
            "Content-Disposition": f'inline; filename="checkin_{filename_id}.png"',
        },
    )


# Check-in page (redirects to the end-user UI)
@app.get("/checkin/{session_id}")
def checkin_page(session_id: str):
    """Redirect to the static user UI, passing the session id as a query parameter."""
    # Percent-encode so characters such as "&" or "#" in the path parameter
    # cannot alter the query string of the redirect target.
    target = f"/static/index.html?session={quote(session_id, safe='')}"
    return RedirectResponse(url=target, status_code=307)


POINTS_PER_CHECKIN = 50


@app.post("/api/checkin/verify", response_model=VerifyResult)
def verify_and_reward(payload: VerifyPayload):
    """Verify a signed check-in and award points.

    The signed message is ``MESSAGE_PREFIX + nonce + ":" + address``. On success
    the user is created if needed, a visit and a reward log entry are recorded,
    the user's POINT balance is increased by ``POINTS_PER_CHECKIN`` and the
    session is marked as used. All writes are committed in one transaction.

    Raises:
        HTTPException: 404 when the session does not exist, 410 when it has
            expired or was already used, 400 when the signature is invalid.
    """
    with get_session() as s:
        cs = s.exec(
            select(CheckinSession).where(CheckinSession.session_id == payload.session_id)
        ).first()
        if not cs:
            raise HTTPException(404, "session not found")
        if cs.used or cs.expires_at < now_utc():
            raise HTTPException(410, "session expired or used")

        message = f"{MESSAGE_PREFIX}{cs.nonce}:{payload.address}"
        try:
            verify_signature(payload.address, payload.signature, message)
        except SignatureError as e:
            raise HTTPException(400, f"invalid signature: {e}") from e

        # Upsert the user. flush() (not commit()) assigns the primary key while
        # keeping every write of this request inside a single transaction, so a
        # later failure cannot leave a half-applied check-in behind.
        user = s.exec(select(User).where(User.address == payload.address)).first()
        if not user:
            user = User(address=payload.address)
            s.add(user)
            s.flush()

        # Visit record
        s.add(Visit(user_id=user.id, venue_id=cs.venue_id, session_id=cs.id))

        # Award points
        balance = s.exec(
            select(TokenBalance).where(
                TokenBalance.user_id == user.id,
                TokenBalance.symbol == "POINT",
            )
        ).first()
        if not balance:
            balance = TokenBalance(user_id=user.id, symbol="POINT", amount=0)
            s.add(balance)
        balance.amount += POINTS_PER_CHECKIN
        s.add(RewardLog(user_id=user.id, venue_id=cs.venue_id, points=POINTS_PER_CHECKIN))

        # Invalidate the session.
        # NOTE(review): the used-check above and this update are not atomic, so
        # two concurrent requests for the same session could both be rewarded;
        # consider a conditional UPDATE or row lock if that matters.
        cs.used = True
        s.add(cs)
        s.commit()

        return VerifyResult(
            ok=True,
            message="Check-in success",
            points_awarded=POINTS_PER_CHECKIN,
        )


# -------------------------
# Wallet / sharing / CRM
# -------------------------
@app.get("/api/wallet/{address}/balance", response_model=Balance)
def get_balance(address: str):
    """Return the POINT balance for ``address`` (0 for unknown users or no balance row)."""
    with get_session() as s:
        user = s.exec(select(User).where(User.address == address)).first()
        if not user:
            return Balance(address=address, balance=0)
        bal = s.exec(
            select(TokenBalance).where(
                TokenBalance.user_id == user.id,
                TokenBalance.symbol == "POINT",
            )
        ).first()
        return Balance(address=address, balance=bal.amount if bal else 0)


@app.get("/api/share")
def get_share(text: str, url: str):
    """Return the result of ``share_urls(text, url)`` as JSON."""
    return JSONResponse(share_urls(text, url))


def _score_rows(users):
    """Return an ``(address, score, segment)`` tuple for each user."""
    rows = []
    for u in users:
        sc = calc_score(u.id)
        rows.append((u.address, sc, segment(sc)))
    return rows


@app.get("/api/crm/segments")
def crm_segments():
    """Return address, score and segment for every user."""
    with get_session() as s:
        users = s.exec(select(User)).all()
        return [
            {"address": address, "score": sc, "segment": seg}
            for address, sc, seg in _score_rows(users)
        ]


@app.get("/api/crm/export.csv")
def crm_export_csv():
    """Return all users' scores as a CSV attachment (score with 4 decimal places)."""
    with get_session() as s:
        users = s.exec(select(User)).all()
        rows = _score_rows(users)

    buf = io.StringIO()
    w = csv.writer(buf)
    w.writerow(["address", "score", "segment"])
    w.writerows((address, f"{sc:.4f}", seg) for address, sc, seg in rows)

    data = buf.getvalue().encode("utf-8")
    headers = {"Content-Disposition": "attachment; filename=crm_export.csv"}
    return Response(content=data, media_type="text/csv", headers=headers)