etsy-app / main.py
lethientien's picture
Upload 28 files
e4dfa4d verified
import os
import pandas as pd
import io
import re
import json
from typing import List, Optional, Dict, Any
from datetime import datetime, timedelta
from fastapi import FastAPI, UploadFile, File, Depends, HTTPException, Form, Path, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from fastapi.responses import RedirectResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey, Boolean, Date, JSON, or_, text, UniqueConstraint
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session, relationship, joinedload, subqueryload
from passlib.context import CryptContext
from jose import JWTError, jwt
# ==========================================
# 1. DATABASE CONFIGURATION
# ==========================================
from dotenv import load_dotenv
# Load environment variables from .env file (if it exists)
load_dotenv()
# ==========================================
# 1. DATABASE CONFIGURATION
# ==========================================
# STRICT MODE: Requires DATABASE_URL in environment variables or .env file
SQLALCHEMY_DATABASE_URL = os.getenv("DATABASE_URL")
if not SQLALCHEMY_DATABASE_URL:
raise ValueError(
"CRITICAL ERROR: DATABASE_URL is missing!\n"
" - If running locally: Create a .env file (copy from .env.example) and define DATABASE_URL.\n"
" - If running on Render: Add DATABASE_URL to 'Environment Variables'."
)
engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# ==========================================
# 2. DATABASE MODELS - MULTI-SHOP ARCHITECTURE
# ==========================================
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
role = Column(String, default="user") # 'admin', 'user'
permissions = Column(JSON, default=[]) # List of permission strings
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow)
class Shop(Base):
"""Master Shop entity - each shop is completely isolated"""
__tablename__ = "shops"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False)
slug = Column(String, unique=True, nullable=False, index=True)
description = Column(String, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow)
is_active = Column(Boolean, default=True)
# Relationships - CASCADE DELETE ensures complete isolation
orders = relationship("EtsyOrder", back_populates="shop", cascade="all, delete-orphan")
providers = relationship("FulfillmentProvider", back_populates="shop", cascade="all, delete-orphan")
imports = relationship("ImportHistory", back_populates="shop", cascade="all, delete-orphan")
fulfillment_records = relationship("FulfillmentRecord", back_populates="shop", cascade="all, delete-orphan")
class ImportHistory(Base):
__tablename__ = "import_history"
id = Column(Integer, primary_key=True, index=True)
shop_id = Column(Integer, ForeignKey("shops.id", ondelete="CASCADE"), nullable=False, index=True)
import_type = Column(String) # 'etsy' or 'fulfillment'
file_name = Column(String)
upload_date = Column(DateTime, default=datetime.utcnow)
record_count = Column(Integer, default=0)
provider_id = Column(Integer, ForeignKey("fulfillment_providers.id", ondelete="SET NULL"), nullable=True)
raw_content = Column(String)
# Relationships
shop = relationship("Shop", back_populates="imports")
etsy_orders = relationship("EtsyOrder", back_populates="import_source", cascade="all, delete-orphan")
fulfillment_records = relationship("FulfillmentRecord", back_populates="import_source", cascade="all, delete-orphan", foreign_keys="FulfillmentRecord.import_id")
class EtsyOrder(Base):
__tablename__ = "etsy_orders"
__table_args__ = (
UniqueConstraint('shop_id', 'order_id', name='uq_shop_order'),
)
id = Column(Integer, primary_key=True, index=True) # New auto-increment PK
shop_id = Column(Integer, ForeignKey("shops.id", ondelete="CASCADE"), nullable=False, index=True)
order_id = Column(String, nullable=False, index=True) # Etsy's order ID (not globally unique anymore)
sale_date = Column(Date)
full_name = Column(String)
first_name = Column(String)
last_name = Column(String)
buyer_user_id = Column(String)
email = Column(String, nullable=True)
street_1 = Column(String)
street_2 = Column(String, nullable=True)
ship_city = Column(String)
ship_state = Column(String)
ship_zipcode = Column(String, index=True)
ship_country = Column(String)
currency = Column(String, default="USD")
order_value = Column(Float, default=0.0)
order_total = Column(Float, default=0.0)
order_net = Column(Float, default=0.0)
shipping_revenue = Column(Float, default=0.0)
skus = Column(String)
import_id = Column(Integer, ForeignKey("import_history.id", ondelete="SET NULL"), nullable=True)
# Relationships
shop = relationship("Shop", back_populates="orders")
import_source = relationship("ImportHistory", back_populates="etsy_orders")
fulfillment_records = relationship("FulfillmentRecord", back_populates="etsy_order")
class FulfillmentProvider(Base):
__tablename__ = "fulfillment_providers"
__table_args__ = (
UniqueConstraint('shop_id', 'name', name='uq_shop_provider_name'),
)
id = Column(Integer, primary_key=True, index=True)
shop_id = Column(Integer, ForeignKey("shops.id", ondelete="CASCADE"), nullable=False, index=True)
name = Column(String, nullable=False)
mapping_config = Column(JSON, default={})
# Relationships
shop = relationship("Shop", back_populates="providers")
fulfillment_records = relationship("FulfillmentRecord", back_populates="provider")
class FulfillmentRecord(Base):
__tablename__ = "fulfillment_records"
id = Column(Integer, primary_key=True, index=True)
shop_id = Column(Integer, ForeignKey("shops.id", ondelete="CASCADE"), nullable=False, index=True)
provider_id = Column(Integer, ForeignKey("fulfillment_providers.id", ondelete="SET NULL"), nullable=True)
ref_id_value = Column(String, index=True)
total_cost = Column(Float, default=0.0)
tracking_number = Column(String, nullable=True)
raw_data = Column(JSON)
is_matched = Column(Boolean, default=False)
match_method = Column(String, nullable=True)
etsy_order_id = Column(Integer, ForeignKey("etsy_orders.id", ondelete="SET NULL"), nullable=True)
import_id = Column(Integer, ForeignKey("import_history.id", ondelete="SET NULL"), nullable=True)
# Relationships
shop = relationship("Shop", back_populates="fulfillment_records")
provider = relationship("FulfillmentProvider", back_populates="fulfillment_records")
etsy_order = relationship("EtsyOrder", back_populates="fulfillment_records")
import_source = relationship("ImportHistory", back_populates="fulfillment_records", foreign_keys=[import_id])
# ==========================================
# 3. DATABASE MIGRATION
# ==========================================
def run_migrations(engine):
"""Run migrations to add new columns for multi-shop support"""
with engine.connect() as conn:
conn = conn.execution_options(isolation_level="AUTOCOMMIT")
# Check if shops table exists, if not the schema is fresh
try:
conn.execute(text("SELECT 1 FROM shops LIMIT 1"))
except Exception:
# Fresh database, no migration needed
return
# Add shop_id columns if missing (for existing installations)
migrations = [
("etsy_orders", "shop_id", "INTEGER REFERENCES shops(id) ON DELETE CASCADE"),
("fulfillment_providers", "shop_id", "INTEGER REFERENCES shops(id) ON DELETE CASCADE"),
("fulfillment_records", "shop_id", "INTEGER REFERENCES shops(id) ON DELETE CASCADE"),
("import_history", "shop_id", "INTEGER REFERENCES shops(id) ON DELETE CASCADE"),
("users", "permissions", "JSON DEFAULT '[]'"),
]
for table, column, definition in migrations:
try:
conn.execute(text(f"ALTER TABLE {table} ADD COLUMN {column} {definition}"))
print(f"Migrated: Added {column} to {table}")
except Exception:
pass # Column likely exists
# Create all tables
Base.metadata.create_all(bind=engine)
run_migrations(engine)
# ==========================================
# 4. HELPER FUNCTIONS
# ==========================================
# Security Config
# Security Config
SECRET_KEY = os.getenv("API_SECRET_KEY", "changeme_in_production_please_9bd5644faf")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # 24 hours for convenience
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = db.query(User).filter(User.username == token_data.username).first()
if user is None:
raise credentials_exception
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
async def get_admin_user(current_user: User = Depends(get_current_active_user)):
if current_user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="The user doesn't have enough privileges"
)
return current_user
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
def clean_currency(value):
if pd.isna(value) or value == "":
return 0.0
if isinstance(value, (int, float)):
return float(value)
clean_str = str(value).lower().replace('$', '').replace('usd', '').replace('eur', '').strip()
clean_str = clean_str.replace(',', '')
try:
match = re.search(r'(\d+(\.\d+)?)', clean_str)
if match:
return float(match.group(1))
return 0.0
except ValueError:
return 0.0
def clean_value(value):
"""Convert pandas NaN to None for database insertion"""
if pd.isna(value):
return None
return value
def safe_str(value, default=""):
"""Safely convert value to string, handling NaN"""
if pd.isna(value) or value is None:
return default
return str(value)
def normalize_search_text(s):
if not s or pd.isna(s):
return ""
s = str(s).lower()
s = re.sub(r'[^\w\s]', ' ', s)
return re.sub(r'\s+', ' ', s).strip()
def extract_house_number(address):
if not address: return ""
match = re.search(r'\d+', str(address))
return match.group(0) if match else ""
def get_shop_or_404(db: Session, shop_id: int) -> Shop:
"""Helper to get shop or raise 404"""
shop = db.query(Shop).filter(Shop.id == shop_id).first()
if not shop:
raise HTTPException(status_code=404, detail=f"Shop with id {shop_id} not found")
return shop
# ==========================================
# 5. PYDANTIC MODELS
# ==========================================
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
class UserBase(BaseModel):
username: str
class UserCreate(UserBase):
password: str
role: str = "user"
permissions: List[str] = []
class UserResponse(UserBase):
id: int
role: str
permissions: List[str] = []
is_active: bool
created_at: datetime
class Config:
from_attributes = True
class ShopCreate(BaseModel):
name: str
slug: str
description: Optional[str] = None
class ShopUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
is_active: Optional[bool] = None
class ProviderCreate(BaseModel):
name: str
mapping_config: Dict[str, str]
# ==========================================
# 6. FASTAPI APPLICATION
# ==========================================
app = FastAPI(title="Etsy Profit Manager Backend - Multi-Shop")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ==========================================
# 7. AUTHENTICATION & USER MANAGEMENT
# ==========================================
@app.on_event("startup")
def create_initial_admin():
db = SessionLocal()
try:
if db.query(User).count() == 0:
admin_user = User(
username="admin",
hashed_password=get_password_hash("admin123"),
role="admin"
)
db.add(admin_user)
db.commit()
print("Default admin created: admin / admin123")
finally:
db.close()
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
user = db.query(User).filter(User.username == form_data.username).first()
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me", response_model=UserResponse)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
@app.get("/admin/users", response_model=List[UserResponse])
async def read_users(db: Session = Depends(get_db), current_user: User = Depends(get_admin_user)):
users = db.query(User).all()
return users
@app.post("/admin/users", response_model=UserResponse)
async def create_user(user: UserCreate, db: Session = Depends(get_db), current_user: User = Depends(get_admin_user)):
db_user = db.query(User).filter(User.username == user.username).first()
if db_user:
raise HTTPException(status_code=400, detail="Username already registered")
hashed_password = get_password_hash(user.password)
new_user = User(
username=user.username,
hashed_password=hashed_password,
role=user.role,
permissions=user.permissions
)
db.add(new_user)
db.commit()
db.refresh(new_user)
return new_user
@app.put("/admin/users/{user_id}/permissions", response_model=UserResponse)
async def update_user_permissions(user_id: int, permissions: List[str], db: Session = Depends(get_db), current_user: User = Depends(get_admin_user)):
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
# Don't allow removing permissions from super admin if checks were strict,
# but here we rely on role='admin' mostly.
# Still good practice not to mess with own permissions if not needed.
user.permissions = permissions
db.commit()
db.refresh(user)
return user
@app.delete("/admin/users/{user_id}")
async def delete_user(user_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_admin_user)):
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
if user.id == current_user.id:
raise HTTPException(status_code=400, detail="Cannot delete yourself")
db.delete(user)
db.commit()
return {"status": "success", "message": "User deleted"}
# ==========================================
# 8. SHOP MANAGEMENT ENDPOINTS
# ==========================================
@app.get("/")
def read_root():
return RedirectResponse(url="/docs")
@app.get("/api/shops")
def list_shops(db: Session = Depends(get_db)):
"""List all shops"""
shops = db.query(Shop).options(subqueryload(Shop.orders), subqueryload(Shop.providers)).filter(Shop.is_active == True).order_by(Shop.created_at.desc()).all()
return [{
"id": s.id,
"name": s.name,
"slug": s.slug,
"description": s.description,
"created_at": s.created_at,
"is_active": s.is_active,
"order_count": len(s.orders),
"provider_count": len(s.providers)
} for s in shops]
@app.post("/api/shops")
def create_shop(
name: str = Form(...),
slug: str = Form(...),
description: str = Form(None),
db: Session = Depends(get_db)
):
"""Create a new shop"""
# Validate slug uniqueness
existing = db.query(Shop).filter(Shop.slug == slug).first()
if existing:
raise HTTPException(status_code=400, detail=f"Shop with slug '{slug}' already exists")
# Normalize slug
normalized_slug = re.sub(r'[^a-z0-9-]', '-', slug.lower().strip())
shop = Shop(
name=name,
slug=normalized_slug,
description=description
)
db.add(shop)
db.commit()
db.refresh(shop)
return {
"id": shop.id,
"name": shop.name,
"slug": shop.slug,
"description": shop.description,
"message": "Shop created successfully"
}
@app.get("/api/shops/{shop_id}")
def get_shop(shop_id: int, db: Session = Depends(get_db)):
"""Get shop details"""
shop = get_shop_or_404(db, shop_id)
return {
"id": shop.id,
"name": shop.name,
"slug": shop.slug,
"description": shop.description,
"created_at": shop.created_at,
"is_active": shop.is_active,
"order_count": len(shop.orders),
"provider_count": len(shop.providers)
}
@app.put("/api/shops/{shop_id}")
def update_shop(
shop_id: int,
name: str = Form(None),
description: str = Form(None),
is_active: bool = Form(None),
db: Session = Depends(get_db)
):
"""Update shop details"""
shop = get_shop_or_404(db, shop_id)
if name is not None:
shop.name = name
if description is not None:
shop.description = description
if is_active is not None:
shop.is_active = is_active
db.commit()
return {"status": "success", "message": "Shop updated"}
@app.delete("/api/shops/{shop_id}")
def delete_shop(shop_id: int, db: Session = Depends(get_db)):
"""Delete shop and ALL its data (CASCADE)"""
shop = get_shop_or_404(db, shop_id)
db.delete(shop)
db.commit()
return {"status": "success", "message": f"Shop '{shop.name}' and all its data deleted"}
@app.post("/api/shops/{shop_id}/reset")
def reset_shop_data(shop_id: int, db: Session = Depends(get_db)):
"""Reset all data for a specific shop (keep shop itself)"""
shop = get_shop_or_404(db, shop_id)
# Delete in order respecting FK constraints
db.query(FulfillmentRecord).filter(FulfillmentRecord.shop_id == shop_id).delete()
db.query(EtsyOrder).filter(EtsyOrder.shop_id == shop_id).delete()
db.query(ImportHistory).filter(ImportHistory.shop_id == shop_id).delete()
db.query(FulfillmentProvider).filter(FulfillmentProvider.shop_id == shop_id).delete()
db.commit()
return {"status": "success", "message": f"All data for shop '{shop.name}' has been reset"}
# ==========================================
# 8. SHOP-SCOPED ORDER ENDPOINTS
# ==========================================
@app.get("/api/shops/{shop_id}/orders")
def get_orders(shop_id: int, db: Session = Depends(get_db)):
"""Get all orders for a specific shop"""
get_shop_or_404(db, shop_id)
orders = db.query(EtsyOrder).options(joinedload(EtsyOrder.fulfillment_records)).filter(EtsyOrder.shop_id == shop_id).all()
result = []
for o in orders:
status = "Pending"
fulfillment_cost = 0.0
match_info = []
if o.fulfillment_records:
status = "Shipped"
for rec in o.fulfillment_records:
fulfillment_cost += rec.total_cost
if rec.match_method:
match_info.append(rec.match_method)
result.append({
"id": o.order_id,
"internal_id": o.id,
"date": o.sale_date,
"customer": o.full_name,
"items": o.skus,
"total": o.order_total,
"status": status,
"fulfillment_cost": fulfillment_cost,
"match_info": match_info
})
return result
# ==========================================
# 9. SHOP-SCOPED PROVIDER ENDPOINTS
# ==========================================
@app.get("/api/shops/{shop_id}/providers")
def get_providers(shop_id: int, db: Session = Depends(get_db)):
"""Get all providers for a specific shop"""
get_shop_or_404(db, shop_id)
providers = db.query(FulfillmentProvider).filter(FulfillmentProvider.shop_id == shop_id).all()
return [{"id": p.id, "name": p.name, "mapping_config": p.mapping_config} for p in providers]
@app.post("/api/shops/{shop_id}/providers")
def create_provider(
shop_id: int,
name: str = Form(...),
mapping_config: str = Form(...),
db: Session = Depends(get_db)
):
"""Create or update a provider for a specific shop"""
get_shop_or_404(db, shop_id)
try:
config_dict = json.loads(mapping_config)
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail="Invalid JSON for mapping_config")
# Check if provider exists for THIS shop
provider = db.query(FulfillmentProvider).filter(
FulfillmentProvider.shop_id == shop_id,
FulfillmentProvider.name == name
).first()
if not provider:
provider = FulfillmentProvider(
shop_id=shop_id,
name=name,
mapping_config=config_dict
)
db.add(provider)
else:
provider.mapping_config = config_dict
db.commit()
db.refresh(provider)
return {"status": "success", "provider_id": provider.id}
# ==========================================
# 10. SHOP-SCOPED UPLOAD ENDPOINTS
# ==========================================
@app.post("/api/shops/{shop_id}/upload/etsy")
async def upload_etsy(shop_id: int, file: UploadFile = File(...), db: Session = Depends(get_db)):
"""Upload Etsy orders CSV for a specific shop"""
get_shop_or_404(db, shop_id)
contents = await file.read()
content_str = contents.decode('utf-8')
df = pd.read_csv(io.StringIO(content_str))
df.columns = [c.strip() for c in df.columns]
# Create History Record for THIS shop
history = ImportHistory(
shop_id=shop_id,
import_type="etsy",
file_name=file.filename,
raw_content=content_str,
record_count=0
)
db.add(history)
db.commit()
db.refresh(history)
count = 0
for _, row in df.iterrows():
order_id = str(row.get('Order ID', ''))
# Check if order exists for THIS shop only
existing_order = db.query(EtsyOrder).filter(
EtsyOrder.shop_id == shop_id,
EtsyOrder.order_id == order_id
).first()
order_data = {
"shop_id": shop_id,
"order_id": order_id,
"sale_date": pd.to_datetime(row.get('Sale Date')).date(),
"full_name": clean_value(row.get('Full Name')),
"first_name": clean_value(row.get('First Name')),
"last_name": clean_value(row.get('Last Name')),
"buyer_user_id": safe_str(row.get('Buyer User ID'), None),
"street_1": clean_value(row.get('Street 1')),
"street_2": clean_value(row.get('Street 2')),
"ship_city": clean_value(row.get('Ship City')),
"ship_state": clean_value(row.get('Ship State')),
"ship_zipcode": safe_str(row.get('Ship Zipcode', ''), ''),
"ship_country": clean_value(row.get('Ship Country')),
"currency": safe_str(row.get('Currency', 'USD'), 'USD'),
"order_value": clean_currency(row.get('Order Value')),
"order_total": clean_currency(row.get('Order Total')),
"order_net": clean_currency(row.get('Order Net')),
"shipping_revenue": clean_currency(row.get('Shipping')),
"skus": clean_value(row.get('SKU')),
"import_id": history.id
}
if existing_order:
for key, value in order_data.items():
setattr(existing_order, key, value)
else:
new_order = EtsyOrder(**order_data)
db.add(new_order)
count += 1
history.record_count = count
db.commit()
return {"message": f"Successfully processed {count} Etsy orders for shop"}
@app.post("/api/shops/{shop_id}/upload/fulfillment")
async def upload_fulfillment(
shop_id: int,
provider_id: int = Form(...),
file: UploadFile = File(...),
db: Session = Depends(get_db)
):
"""Upload fulfillment CSV for a specific shop"""
get_shop_or_404(db, shop_id)
# Ensure provider belongs to this shop
provider = db.query(FulfillmentProvider).filter(
FulfillmentProvider.id == provider_id,
FulfillmentProvider.shop_id == shop_id
).first()
if not provider:
raise HTTPException(status_code=404, detail="Provider not found for this shop")
mapping = provider.mapping_config
contents = await file.read()
content_str = contents.decode('utf-8')
df = pd.read_csv(io.StringIO(content_str))
# Create History Record
history = ImportHistory(
shop_id=shop_id,
import_type="fulfillment",
file_name=file.filename,
raw_content=content_str,
record_count=0,
provider_id=provider.id
)
db.add(history)
db.commit()
db.refresh(history)
count = 0
ref_id_col = mapping.get("ref_id_col")
cost_col = mapping.get("total_cost_col")
tracking_col = mapping.get("tracking_col")
for _, row in df.iterrows():
raw_data = row.where(pd.notnull(row), None).to_dict()
ref_val = ""
if ref_id_col and ref_id_col in row:
ref_val = str(row[ref_id_col])
cost_val = 0.0
if cost_col and cost_col in row:
cost_val = clean_currency(row[cost_col])
track_val = ""
if tracking_col and tracking_col in row:
track_val = str(row[tracking_col])
record = FulfillmentRecord(
shop_id=shop_id,
provider_id=provider.id,
ref_id_value=ref_val,
total_cost=cost_val,
tracking_number=track_val,
raw_data=raw_data,
is_matched=False,
import_id=history.id
)
db.add(record)
count += 1
history.record_count = count
db.commit()
# Run matching for THIS shop only
match_result = run_matching_algorithm(db, shop_id)
return {
"message": f"Imported {count} records",
"matches_found": match_result
}
# ==========================================
# 11. SHOP-SCOPED IMPORT HISTORY ENDPOINTS
# ==========================================
@app.get("/api/shops/{shop_id}/imports")
def get_imports(
shop_id: int,
import_type: Optional[str] = None,
provider_id: Optional[int] = None,
db: Session = Depends(get_db)
):
"""Get import history for a specific shop"""
get_shop_or_404(db, shop_id)
query = db.query(ImportHistory).filter(ImportHistory.shop_id == shop_id)
if import_type:
query = query.filter(ImportHistory.import_type == import_type)
if provider_id:
query = query.filter(ImportHistory.provider_id == provider_id)
records = query.order_by(ImportHistory.upload_date.desc()).all()
return [{
"id": r.id,
"date": r.upload_date,
"file_name": r.file_name,
"count": r.record_count,
"provider_id": r.provider_id
} for r in records]
@app.get("/api/shops/{shop_id}/imports/{import_id}")
def get_import_details(shop_id: int, import_id: int, db: Session = Depends(get_db)):
"""Get import details for a specific shop"""
get_shop_or_404(db, shop_id)
record = db.query(ImportHistory).filter(
ImportHistory.id == import_id,
ImportHistory.shop_id == shop_id
).first()
if not record:
raise HTTPException(status_code=404, detail="Import not found for this shop")
return {
"id": record.id,
"file_name": record.file_name,
"raw_content": record.raw_content
}
@app.delete("/api/shops/{shop_id}/imports/{import_id}")
def delete_import(shop_id: int, import_id: int, db: Session = Depends(get_db)):
"""Delete import for a specific shop"""
get_shop_or_404(db, shop_id)
record = db.query(ImportHistory).filter(
ImportHistory.id == import_id,
ImportHistory.shop_id == shop_id
).first()
if not record:
raise HTTPException(status_code=404, detail="Import not found for this shop")
db.delete(record)
db.commit()
return {"status": "success", "message": "Import deleted"}
# ==========================================
# 12. SHOP-SCOPED ANALYTICS ENDPOINTS
# ==========================================
@app.get("/api/shops/{shop_id}/analytics/summary")
def get_shop_summary(
shop_id: int,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
db: Session = Depends(get_db)
):
"""Get comprehensive summary for a specific shop with comparison to previous period"""
from datetime import timedelta
get_shop_or_404(db, shop_id)
# Parse dates
current_start = None
current_end = None
if start_date:
current_start = datetime.strptime(start_date, "%Y-%m-%d").date()
if end_date:
current_end = datetime.strptime(end_date, "%Y-%m-%d").date()
# Calculate previous period dates
prev_start = None
prev_end = None
if current_start and current_end:
duration = (current_end - current_start).days + 1
prev_end = current_start - timedelta(days=1)
prev_start = prev_end - timedelta(days=duration - 1)
def calculate_period_stats(start_dt, end_dt):
"""Calculate stats for a date range using efficient querying"""
# Base query with EAGER LOADING of fulfillment records to prevent N+1
query = db.query(EtsyOrder).options(joinedload(EtsyOrder.fulfillment_records)).filter(EtsyOrder.shop_id == shop_id)
if start_dt and end_dt:
query = query.filter(EtsyOrder.sale_date >= start_dt, EtsyOrder.sale_date <= end_dt)
orders_list = query.all()
total_revenue = sum(o.order_total for o in orders_list)
total_net = sum(o.order_net for o in orders_list)
total_cost = 0
fulfilled_count = 0
for order in orders_list:
if order.fulfillment_records:
fulfilled_count += 1
total_cost += sum(r.total_cost for r in order.fulfillment_records)
total_profit = total_net - total_cost
margin = (total_profit / total_revenue * 100) if total_revenue > 0 else 0
fulfillment_rate = (fulfilled_count / len(orders_list) * 100) if len(orders_list) > 0 else 0
avg_order_value = total_revenue / len(orders_list) if len(orders_list) > 0 else 0
return {
"total_revenue": round(total_revenue, 2),
"total_cost": round(total_cost, 2),
"total_profit": round(total_profit, 2),
"total_orders": len(orders_list),
"total_fulfilled": fulfilled_count,
"profit_margin": round(margin, 1),
"fulfillment_rate": round(fulfillment_rate, 1),
"avg_order_value": round(avg_order_value, 2)
}
# Calculate current period stats
current_stats = calculate_period_stats(current_start, current_end)
# Calculate previous period stats
prev_stats = None
if prev_start and prev_end:
prev_stats = calculate_period_stats(prev_start, prev_end)
# Calculate percentage changes
def calc_change(current, previous):
if previous and previous > 0:
return round(((current - previous) / previous) * 100, 1)
return None
changes = {}
if prev_stats:
changes = {
"revenue_change": calc_change(current_stats["total_revenue"], prev_stats["total_revenue"]),
"cost_change": calc_change(current_stats["total_cost"], prev_stats["total_cost"]),
"profit_change": calc_change(current_stats["total_profit"], prev_stats["total_profit"]),
"orders_change": calc_change(current_stats["total_orders"], prev_stats["total_orders"]),
"fulfillment_rate_change": calc_change(current_stats["fulfillment_rate"], prev_stats["fulfillment_rate"])
}
shop = db.query(Shop).filter(Shop.id == shop_id).first()
return {
"shop": {
"id": shop.id,
"name": shop.name,
"slug": shop.slug
},
"aggregates": current_stats,
"changes": changes,
"previous_period": prev_stats
}
@app.get("/api/shops/{shop_id}/analytics/net-profit")
def get_profit_data(
shop_id: int,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
db: Session = Depends(get_db)
):
"""Get profit analytics for a specific shop"""
get_shop_or_404(db, shop_id)
# OPTIMIZATION: Use joinedload to fetch fulfillment records in the SAME query
query = db.query(EtsyOrder).options(joinedload(EtsyOrder.fulfillment_records)).filter(EtsyOrder.shop_id == shop_id)
# OPTIMIZATION: Filter by date at database level if provided
if start_date:
query = query.filter(EtsyOrder.sale_date >= start_date)
if end_date:
query = query.filter(EtsyOrder.sale_date <= end_date)
results = query.all()
data = []
for order in results:
fulfillment_recs = order.fulfillment_records
total_fulfillment_cost = sum(rec.total_cost for rec in fulfillment_recs)
net_profit = order.order_net - total_fulfillment_cost
status = "Unfulfilled"
match_info = []
if fulfillment_recs:
status = "Fulfilled"
methods = set()
for r in fulfillment_recs:
if r.match_method:
methods.add(r.match_method)
match_info = list(methods)
data.append({
"order_id": order.order_id,
"internal_id": order.id,
"sale_date": order.sale_date,
"customer": order.full_name,
"revenue": order.order_total,
"etsy_net": order.order_net,
"cost": total_fulfillment_cost,
"profit": net_profit,
"margin": (net_profit / order.order_total * 100) if order.order_total else 0,
"status": status,
"match_info": match_info
})
return data
@app.post("/api/shops/{shop_id}/actions/run-matching")
def trigger_manual_matching(shop_id: int, db: Session = Depends(get_db)):
"""Manually re-run matching for a specific shop"""
get_shop_or_404(db, shop_id)
count = run_matching_algorithm(db, shop_id)
return {"message": f"Matching completed. Found {count} new matches."}
# ==========================================
# 13. CROSS-SHOP ANALYTICS
# ==========================================
@app.get("/api/analytics/all-shops/summary")
def get_all_shops_summary(
start_date: Optional[str] = None,
end_date: Optional[str] = None,
db: Session = Depends(get_db)
):
"""Get comprehensive summary across all shops with aggregates and comparison to previous period"""
from datetime import timedelta
# Parse dates
current_start = None
current_end = None
if start_date:
current_start = datetime.strptime(start_date, "%Y-%m-%d").date()
if end_date:
current_end = datetime.strptime(end_date, "%Y-%m-%d").date()
# Calculate previous period dates_
prev_start = None
prev_end = None
if current_start and current_end:
duration = (current_end - current_start).days + 1
prev_end = current_start - timedelta(days=1)
prev_start = prev_end - timedelta(days=duration - 1)
# Get all active shops for mapping
active_shops = db.query(Shop).filter(Shop.is_active == True).all()
active_shop_ids = [s.id for s in active_shops]
shop_map = {s.id: s for s in active_shops}
def calculate_period_stats(start_dt, end_dt):
"""Calculate stats for a specific period using efficient querying"""
# OPTIMIZATION: Query ORDERS directly with joinedload, instead of looping shops
query = db.query(EtsyOrder).options(joinedload(EtsyOrder.fulfillment_records)).filter(EtsyOrder.shop_id.in_(active_shop_ids))
if start_dt and end_dt:
query = query.filter(EtsyOrder.sale_date >= start_dt, EtsyOrder.sale_date <= end_dt)
all_orders = query.all()
# Group by shop in Python (in-memory aggregation is fast enough for <10k items, better than N+1 queries)
shop_stats_map = {}
# Initialize map
for s_id in active_shop_ids:
shop_stats_map[s_id] = {
"revenue": 0, "net": 0, "cost": 0, "orders": 0, "fulfilled_count": 0
}
grand_totals = {
"total_revenue": 0, "total_cost": 0, "total_profit": 0, "total_orders": 0, "total_fulfilled": 0
}
for order in all_orders:
s_id = order.shop_id
if s_id not in shop_stats_map: continue
stats = shop_stats_map[s_id]
stats["revenue"] += order.order_total
stats["net"] += order.order_net
stats["orders"] += 1
order_cost = 0
if order.fulfillment_records:
stats["fulfilled_count"] += 1
order_cost = sum(r.total_cost for r in order.fulfillment_records)
stats["cost"] += order_cost
# Grand totals
grand_totals["total_revenue"] += order.order_total
grand_totals["total_cost"] += order_cost
grand_totals["total_profit"] += (order.order_net - order_cost)
grand_totals["total_orders"] += 1
if order.fulfillment_records:
grand_totals["total_fulfilled"] += 1
# Format shop summaries
shop_summaries = []
for s_id, stats in shop_stats_map.items():
shop = shop_map.get(s_id)
if not shop: continue
shop_profit = stats["net"] - stats["cost"]
shop_margin = (shop_profit / stats["revenue"] * 100) if stats["revenue"] > 0 else 0
fulfillment_rate = (stats["fulfilled_count"] / stats["orders"] * 100) if stats["orders"] > 0 else 0
avg_order_value = stats["revenue"] / stats["orders"] if stats["orders"] > 0 else 0
shop_summaries.append({
"shop_id": s_id,
"shop_name": shop.name,
"order_count": stats["orders"],
"fulfilled_count": stats["fulfilled_count"],
"fulfillment_rate": round(fulfillment_rate, 1),
"total_revenue": round(stats["revenue"], 2),
"total_cost": round(stats["cost"], 2),
"total_profit": round(shop_profit, 2),
"profit_margin": round(shop_margin, 1),
"avg_order_value": round(avg_order_value, 2)
})
overall_margin = (grand_totals["total_profit"] / grand_totals["total_revenue"] * 100) if grand_totals["total_revenue"] > 0 else 0
overall_fulfillment_rate = (grand_totals["total_fulfilled"] / grand_totals["total_orders"] * 100) if grand_totals["total_orders"] > 0 else 0
return shop_summaries, grand_totals, overall_margin, overall_fulfillment_rate
# Calculate current period
shop_summaries, grand_totals, overall_margin, overall_fulfillment_rate = calculate_period_stats(current_start, current_end)
# Calculate previous period for comparison
prev_totals = None
if prev_start and prev_end:
_, prev_grand_totals, prev_margin, prev_fulfillment_rate = calculate_period_stats(prev_start, prev_end)
prev_totals = {
"total_revenue": prev_grand_totals["total_revenue"],
"total_cost": prev_grand_totals["total_cost"],
"total_profit": prev_grand_totals["total_profit"],
"total_orders": prev_grand_totals["total_orders"],
"total_fulfilled": prev_grand_totals["total_fulfilled"],
"overall_margin": prev_margin,
"overall_fulfillment_rate": prev_fulfillment_rate
}
# Calculate percentage changes
def calc_change(current, previous):
if previous and previous > 0:
return round(((current - previous) / previous) * 100, 1)
return None
changes = {}
if prev_totals:
changes = {
"revenue_change": calc_change(grand_totals["total_revenue"], prev_totals["total_revenue"]),
"cost_change": calc_change(grand_totals["total_cost"], prev_totals["total_cost"]),
"profit_change": calc_change(grand_totals["total_profit"], prev_totals["total_profit"]),
"orders_change": calc_change(grand_totals["total_orders"], prev_totals["total_orders"]),
"fulfillment_rate_change": calc_change(overall_fulfillment_rate, prev_totals["overall_fulfillment_rate"])
}
# Sort shops by profit for ranking
shop_summaries.sort(key=lambda x: x["total_profit"], reverse=True)
# Identify best/worst performers
best_performer = shop_summaries[0] if shop_summaries else None
worst_performer = shop_summaries[-1] if len(shop_summaries) > 1 else None
return {
"shops": shop_summaries,
"aggregates": {
"total_revenue": round(grand_totals["total_revenue"], 2),
"total_cost": round(grand_totals["total_cost"], 2),
"total_profit": round(grand_totals["total_profit"], 2),
"total_orders": grand_totals["total_orders"],
"total_fulfilled": grand_totals["total_fulfilled"],
"overall_margin": round(overall_margin, 1),
"overall_fulfillment_rate": round(overall_fulfillment_rate, 1),
"shop_count": len(active_shops)
},
"changes": changes,
"previous_period": prev_totals,
"best_performer": best_performer,
"worst_performer": worst_performer
}
@app.get("/api/analytics/all-shops/orders")
def get_all_shops_orders(db: Session = Depends(get_db)):
"""Get all orders from all active shops with shop info"""
# OPTIMIZATION: Query EtsyOrder directly with joinedload
orders = db.query(EtsyOrder).options(joinedload(EtsyOrder.fulfillment_records), joinedload(EtsyOrder.shop)).join(Shop).filter(Shop.is_active == True).all()
result = []
for o in orders:
status = "Pending"
fulfillment_cost = 0.0
match_info = []
if o.fulfillment_records:
status = "Shipped"
for rec in o.fulfillment_records:
fulfillment_cost += rec.total_cost
if rec.match_method:
match_info.append(rec.match_method)
result.append({
"id": o.order_id,
"internal_id": o.id,
"date": o.sale_date,
"customer": o.full_name,
"items": o.skus,
"total": o.order_total,
"status": status,
"fulfillment_cost": fulfillment_cost,
"match_info": match_info,
"shop_id": o.shop_id,
"shop_name": o.shop.name if o.shop else "Unknown"
})
# Sort by date descending
result.sort(key=lambda x: x["date"] if x["date"] else "", reverse=True)
return result
@app.get("/api/analytics/all-shops/profit")
def get_all_shops_profit(db: Session = Depends(get_db)):
"""Get profit data from all active shops with shop info"""
shops = db.query(Shop).filter(Shop.is_active == True).all()
result = []
for shop in shops:
orders = db.query(EtsyOrder).filter(EtsyOrder.shop_id == shop.id).all()
for order in orders:
fulfillment_recs = order.fulfillment_records
total_fulfillment_cost = sum(rec.total_cost for rec in fulfillment_recs)
net_profit = order.order_net - total_fulfillment_cost
status = "Unfulfilled"
match_info = []
if fulfillment_recs:
status = "Fulfilled"
methods = set()
for r in fulfillment_recs:
if r.match_method:
methods.add(r.match_method)
match_info = list(methods)
result.append({
"order_id": order.order_id,
"internal_id": order.id,
"sale_date": order.sale_date,
"customer": order.full_name,
"revenue": order.order_total,
"etsy_net": order.order_net,
"cost": total_fulfillment_cost,
"profit": net_profit,
"margin": (net_profit / order.order_total * 100) if order.order_total else 0,
"status": status,
"match_info": match_info,
"shop_id": shop.id,
"shop_name": shop.name
})
# Sort by date descending
result.sort(key=lambda x: str(x["sale_date"]) if x["sale_date"] else "", reverse=True)
return result
# ==========================================
# 14. MATCHING ALGORITHM (SHOP-ISOLATED)
# ==========================================
def run_matching_algorithm(db: Session, shop_id: int):
"""Run matching algorithm for a SPECIFIC shop only"""
# Get unmatched records for THIS shop only
unmatched_records = db.query(FulfillmentRecord).filter(
FulfillmentRecord.shop_id == shop_id,
FulfillmentRecord.is_matched == False
).order_by(FulfillmentRecord.id.asc()).all()
match_count = 0
# Get providers for THIS shop
providers = db.query(FulfillmentProvider).filter(
FulfillmentProvider.shop_id == shop_id
).all()
provider_map = {p.id: p for p in providers}
# Session tracking (prevents double-booking within this run)
session_matched_ids = set()
for record in unmatched_records:
provider = provider_map.get(record.provider_id)
if not provider:
continue
mapping = provider.mapping_config
raw = record.raw_data
def get_mapped_val(key):
col_name = mapping.get(key)
if col_name and col_name in raw:
return normalize_search_text(raw[col_name])
return ""
prov_id_txt = get_mapped_val("ref_id_col")
prov_fname_txt = get_mapped_val("first_name_col")
prov_lname_txt = get_mapped_val("last_name_col")
prov_addr_txt = get_mapped_val("address_col")
prov_city_txt = get_mapped_val("city_col")
prov_state_txt = get_mapped_val("state_col")
prov_name_context = f"{prov_fname_txt} {prov_lname_txt}"
prov_addr_context = f"{prov_addr_txt} {prov_city_txt} {prov_state_txt}"
candidate_orders = []
# 1. ID Search - WITHIN THIS SHOP ONLY
if prov_id_txt:
potential_ids = re.findall(r'\d{9,}', prov_id_txt)
for pid in potential_ids:
order = db.query(EtsyOrder).filter(
EtsyOrder.shop_id == shop_id, # CRITICAL: Shop isolation
EtsyOrder.order_id == pid
).first()
if order:
candidate_orders.append(order)
# 2. Name Search - WITHIN THIS SHOP ONLY
if len(prov_name_context) > 3:
tokens = prov_name_context.split()
for t in tokens:
if len(t) > 3:
candidates = db.query(EtsyOrder).filter(
EtsyOrder.shop_id == shop_id, # CRITICAL: Shop isolation
or_(
EtsyOrder.first_name.ilike(f"%{t}%"),
EtsyOrder.last_name.ilike(f"%{t}%")
)
).limit(50).all()
candidate_orders.extend(candidates)
if len(candidate_orders) > 100:
break
# Deduplicate
unique_candidates = {}
for c in candidate_orders:
unique_candidates[c.id] = c # Use internal ID
candidate_orders = list(unique_candidates.values())
# Exclude already matched in this session
candidate_orders = [
c for c in candidate_orders
if c.id not in session_matched_ids
]
# Sort: unfulfilled first, then by date
def sort_key(order):
is_fulfilled = 1 if order.fulfillment_records else 0
s_date = order.sale_date if order.sale_date else datetime.max.date()
return (is_fulfilled, s_date)
candidate_orders.sort(key=sort_key)
# Verification
final_match = None
match_details = []
for cand in candidate_orders:
verifications = []
etsy_id = str(cand.order_id)
etsy_fname = normalize_search_text(cand.first_name)
etsy_lname = normalize_search_text(cand.last_name)
etsy_addr1 = normalize_search_text(cand.street_1)
etsy_city = normalize_search_text(cand.ship_city)
etsy_state = normalize_search_text(cand.ship_state)
# A. ID Verification
if prov_id_txt and etsy_id in prov_id_txt:
verifications.append("ID")
# B. Name Verification
name_verified = False
if etsy_fname and etsy_lname:
if (etsy_fname in prov_name_context) and (etsy_lname in prov_name_context):
name_verified = True
verifications.append("Name")
# C. Address Verification
addr_verified = False
etsy_hn = extract_house_number(cand.street_1)
prov_hn = extract_house_number(prov_addr_context)
hn_match = False
if etsy_hn and prov_hn and etsy_hn == prov_hn:
hn_match = True
city_in_context = (len(etsy_city) > 2) and (etsy_city in prov_addr_context)
state_in_context = (len(etsy_state) >= 2) and (etsy_state in prov_addr_context)
street_name_only = etsy_addr1.replace(etsy_hn, "").strip() if etsy_hn else etsy_addr1
street_in_context = (len(street_name_only) > 3) and (street_name_only in prov_addr_context)
if hn_match:
if street_in_context or (city_in_context and state_in_context):
addr_verified = True
verifications.append("Address")
elif street_in_context and city_in_context:
addr_verified = True
verifications.append("Address(NoHN)")
# Final Decision
is_valid = False
if "ID" in str(verifications):
is_valid = True
elif name_verified and addr_verified:
is_valid = True
if is_valid:
final_match = cand
match_details = verifications
session_matched_ids.add(cand.id)
break
if final_match:
record.etsy_order_id = final_match.id # Use internal ID
record.is_matched = True
record.match_method = f"{', '.join(match_details)}"
match_count += 1
db.commit()
return match_count
# ==========================================
# 15. LEGACY ENDPOINTS (For backward compatibility)
# ==========================================
# These can be removed once frontend is fully migrated
@app.get("/api/orders")
def get_orders_legacy(db: Session = Depends(get_db)):
"""Legacy: Get all orders (returns empty if no default shop)"""
return {"warning": "Please use /api/shops/{shop_id}/orders", "data": []}
@app.get("/api/providers")
def get_providers_legacy(db: Session = Depends(get_db)):
"""Legacy: Get all providers"""
return {"warning": "Please use /api/shops/{shop_id}/providers", "data": []}
@app.post("/api/admin/reset-database")
def reset_database_legacy(db: Session = Depends(get_db)):
"""Legacy: Reset entire database"""
try:
db.query(FulfillmentRecord).delete()
db.query(EtsyOrder).delete()
db.query(ImportHistory).delete()
db.query(FulfillmentProvider).delete()
db.query(Shop).delete()
db.commit()
return {"status": "success", "message": "Database has been reset completely."}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"Reset failed: {str(e)}")