QRcheckin / src /main.py
Corin1998's picture
Update src/main.py
2c102d1 verified
# src/main.py
import os
from fastapi import FastAPI, HTTPException, Response, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import JSONResponse, RedirectResponse, Response as FastAPIResponse
from sqlmodel import select
from dotenv import load_dotenv
from .db import init_db, get_session
from .models import User, Venue, CheckinSession, Visit, TokenBalance, RewardLog
from .schemas import VenueCreate, SessionCreate, SessionInfo, VerifyPayload, VerifyResult, Balance
from .auth import verify_signature, MESSAGE_PREFIX, SignatureError
from .qr import make_qr_png
from .utils import new_session_id, expiry, now_utc
from .scoring import calc_score, segment
from .social import share_urls
load_dotenv()
app = FastAPI(title="Checkin-CRM")
# CORS(必要に応じて制限)
origins = os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 静的ファイル(UI)
app.mount("/static", StaticFiles(directory="static"), name="static")
@app.on_event("startup")
def on_startup():
# ディレクトリ作成やパスの可否は db.py 側で吸収。ここはテーブル作成のみ。
init_db()
# ルートは管理UIへ
@app.get("/")
def root_redirect():
return RedirectResponse(url="/static/admin.html", status_code=307)
# -------------------------
# 管理系(簡易デモ用)
# -------------------------
@app.post("/api/admin/venues")
def create_venue(vc: VenueCreate):
# デモ簡略化のため認証なし(本番は認証を追加してください)
with get_session() as s:
exists = s.exec(select(Venue).where(Venue.slug == vc.slug)).first()
if exists:
raise HTTPException(409, "slug already exists")
v = Venue(name=vc.name, slug=vc.slug)
s.add(v); s.commit(); s.refresh(v)
return {"id": v.id, "name": v.name, "slug": v.slug}
@app.get("/api/admin/venues")
def list_venues():
with get_session() as s:
items = s.exec(select(Venue)).all()
return [{"id": v.id, "name": v.name, "slug": v.slug} for v in items]
@app.post("/api/admin/seed")
def seed_demo():
with get_session() as s:
v = s.exec(select(Venue).where(Venue.slug == "demo")).first()
if v:
return {"id": v.id, "name": v.name, "slug": v.slug, "created": False}
v = Venue(name="デモ店舗", slug="demo")
s.add(v); s.commit(); s.refresh(v)
return {"id": v.id, "name": v.name, "slug": v.slug, "created": True}
# -------------------------
# チェックイン発行〜検証
# -------------------------
@app.post("/api/checkin/session")
def create_session(sc: SessionCreate):
with get_session() as s:
venue = s.exec(select(Venue).where(Venue.slug == sc.venue_slug)).first()
if not venue:
raise HTTPException(404, "venue not found")
sid = new_session_id()
cs = CheckinSession(session_id=sid, venue_id=venue.id, nonce=new_session_id(), expires_at=expiry(5))
s.add(cs); s.commit()
return {"session_id": sid}
@app.get("/api/checkin/session/{session_id}", response_model=SessionInfo)
def get_session_info(session_id: str):
with get_session() as s:
cs = s.exec(select(CheckinSession).where(CheckinSession.session_id == session_id)).first()
if not cs:
raise HTTPException(404, "session not found")
if cs.expires_at < now_utc() or cs.used:
raise HTTPException(410, "session expired or used")
venue = s.get(Venue, cs.venue_id)
return SessionInfo(session_id=session_id, venue_name=venue.name, nonce=cs.nonce, expires_at=cs.expires_at)
# QR画像(絶対URLを埋め込み/キャッシュ抑止/埋め込み環境でも安定描画)
@app.get("/qrcode/{session_id}")
def qrcode_png(session_id: str, request: Request):
base = str(request.base_url).rstrip("/") # 例: https://xxx.hf.space
url = f"{base}/checkin/{session_id}"
png = make_qr_png(url)
return FastAPIResponse(
content=png,
media_type="image/png",
headers={
"Cache-Control": "no-store, max-age=0",
"Content-Disposition": f'inline; filename="checkin_{session_id}.png"',
},
)
# チェックイン画面(利用者UIへリダイレクト)
@app.get("/checkin/{session_id}")
def checkin_page(session_id: str):
return Response(status_code=307, headers={"Location": f"/static/index.html?session={session_id}"})
POINTS_PER_CHECKIN = 50
@app.post("/api/checkin/verify", response_model=VerifyResult)
def verify_and_reward(payload: VerifyPayload):
with get_session() as s:
cs = s.exec(select(CheckinSession).where(CheckinSession.session_id == payload.session_id)).first()
if not cs:
raise HTTPException(404, "session not found")
if cs.used or cs.expires_at < now_utc():
raise HTTPException(410, "session expired or used")
message = f"{MESSAGE_PREFIX}{cs.nonce}:{payload.address}"
try:
verify_signature(payload.address, payload.signature, message)
except SignatureError as e:
raise HTTPException(400, f"invalid signature: {e}")
# upsert user
user = s.exec(select(User).where(User.address == payload.address)).first()
if not user:
user = User(address=payload.address)
s.add(user); s.commit(); s.refresh(user)
# 訪問記録
visit = Visit(user_id=user.id, venue_id=cs.venue_id, session_id=cs.id)
s.add(visit)
# ポイント付与
balance = s.exec(
select(TokenBalance).where(TokenBalance.user_id == user.id, TokenBalance.symbol == "POINT")
).first()
if not balance:
balance = TokenBalance(user_id=user.id, symbol="POINT", amount=0)
s.add(balance); s.commit(); s.refresh(balance)
balance.amount += POINTS_PER_CHECKIN
s.add(RewardLog(user_id=user.id, venue_id=cs.venue_id, points=POINTS_PER_CHECKIN))
# セッション無効化
cs.used = True
s.add(cs); s.commit()
return VerifyResult(ok=True, message="Check-in success", points_awarded=POINTS_PER_CHECKIN)
# -------------------------
# ウォレット/共有/CRM
# -------------------------
@app.get("/api/wallet/{address}/balance", response_model=Balance)
def get_balance(address: str):
with get_session() as s:
user = s.exec(select(User).where(User.address == address)).first()
if not user:
return Balance(address=address, balance=0)
bal = s.exec(select(TokenBalance).where(TokenBalance.user_id == user.id, TokenBalance.symbol == "POINT")).first()
return Balance(address=address, balance=bal.amount if bal else 0)
@app.get("/api/share")
def get_share(text: str, url: str):
return JSONResponse(share_urls(text, url))
@app.get("/api/crm/segments")
def crm_segments():
with get_session() as s:
users = s.exec(select(User)).all()
out = []
for u in users:
sc = calc_score(u.id)
out.append({"address": u.address, "score": sc, "segment": segment(sc)})
return out
@app.get("/api/crm/export.csv")
def crm_export_csv():
import csv, io
with get_session() as s:
users = s.exec(select(User)).all()
buf = io.StringIO()
w = csv.writer(buf)
w.writerow(["address", "score", "segment"])
for u in users:
sc = calc_score(u.id)
w.writerow([u.address, f"{sc:.4f}", segment(sc)])
data = buf.getvalue().encode("utf-8")
headers = {"Content-Disposition": "attachment; filename=crm_export.csv"}
return Response(content=data, media_type="text/csv", headers=headers)