|
|
from fastapi import FastAPI, HTTPException |
|
|
from fastapi.responses import FileResponse |
|
|
from pydantic import BaseModel |
|
|
from typing import Any |
|
|
from .ga4_insights import router as ga4_router |
|
|
from .pdf_generation import generate_device_funnel_analysis_pdf |
|
|
from .config import settings |
|
|
from .duplicate_detector import DuplicateDetector |
|
|
from .merchant_alias import MerchantAliasResolver |
|
|
from .repositories import ( |
|
|
ExpenseRepository, |
|
|
MerchantAliasRepository, |
|
|
MergeSuggestionRepository, |
|
|
build_client, |
|
|
) |
|
|
import argparse |
|
|
import logging |
|
|
import sys |
|
|
|
|
|
|
|
|
# ASGI application object for the GA4 insights service. The metadata below is
# what FastAPI publishes in the generated API documentation.
app = FastAPI(
    title="GA4 Insights Service",
    version="1.0.0",
    description="Fetch GA4 funnel data and generate insights using OpenAI.",
)

# Attach the routes declared in the ga4_insights module.
app.include_router(ga4_router)
|
|
|
|
|
|
|
|
# Health-check route: always answers with a constant {"status": "ok"} body.
# Documented with comments rather than a docstring so the generated OpenAPI
# description for this route stays exactly as it was.
@app.get("/health")
async def health() -> dict[str, str]:
    payload = {"status": "ok"}
    return payload
|
|
|
|
|
|
|
|
# Request body accepted by the device-funnel PDF endpoint.
# Kept as comments (not a class docstring) so the generated JSON schema
# description is unchanged.
class FunnelData(BaseModel):
    # List of free-form mappings with string keys; this model places no
    # constraint on which keys each entry carries.
    funnelData: list[dict[str, Any]]
|
|
|
|
|
|
|
|
# POST /ga4/generate-device-funnel-pdf/
#
# Passes the submitted funnel payload (as a plain dict) to
# generate_device_funnel_analysis_pdf and returns the file it produced as a
# PDF download named "device_funnel_analysis_report.pdf".
#
# Error handling:
#   * An HTTPException raised while building the response is passed through
#     untouched, so a deliberate status code is not rewritten to 500.
#   * Any other exception is logged with its traceback and converted into a
#     500 response whose detail is "Error generating PDF: <message>"; the
#     original exception is chained as the cause.
#
# Comments are used instead of a docstring so the OpenAPI description of the
# route is not altered.
@app.post("/ga4/generate-device-funnel-pdf/")
async def generate_device_funnel_pdf(request: FunnelData):
    # Pydantic v2 renamed BaseModel.dict() to model_dump() and deprecates the
    # old name; use the new method when it exists, fall back to dict() on v1.
    to_plain_dict = getattr(request, "model_dump", None) or request.dict

    try:
        pdf_file_path = generate_device_funnel_analysis_pdf(to_plain_dict())
        return FileResponse(
            pdf_file_path,
            media_type="application/pdf",
            filename="device_funnel_analysis_report.pdf",
        )
    except HTTPException:
        # Already a well-formed HTTP error; do not re-wrap it as a 500.
        raise
    except Exception as e:
        # Record the traceback server-side; the client only gets the message.
        logging.exception("Device funnel PDF generation failed")
        raise HTTPException(
            status_code=500,
            detail=f"Error generating PDF: {e}",
        ) from e
|
|
|
|
|
|
|
|
def configure_logging(verbose: bool) -> None:
    """Configure root logging through ``logging.basicConfig``.

    Args:
        verbose: When truthy the level is ``logging.DEBUG``; otherwise it is
            ``logging.INFO``.

    Records are formatted as ``"<asctime> <levelname> <message>"``.
    """
    if verbose:
        threshold = logging.DEBUG
    else:
        threshold = logging.INFO

    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(message)s",
        level=threshold,
    )
|
|
|
|
|
|
|
|
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options of the duplicate-expense detector.

    Args:
        argv: Argument strings to parse. ``None`` is handed to argparse
            unchanged, which then reads the process arguments.

    Returns:
        Namespace with the attributes ``minutes``, ``amount_pct``,
        ``lookback_hours``, ``limit`` and ``verbose``. Defaults for the first
        four are read from ``settings``.
    """
    cli = argparse.ArgumentParser(
        description="Detect near-duplicate expenses and write merge suggestions.",
    )

    # (flag, dest, converter, default, help) for each option that takes a
    # value. They are registered in this order so --help output is unchanged.
    valued_options = (
        (
            "--minutes",
            "minutes",
            int,
            settings.time_tolerance_minutes,
            "Time tolerance in minutes for comparing expenses (default: %(default)s).",
        ),
        (
            "--amount-pct",
            "amount_pct",
            float,
            float(settings.amount_tolerance_pct),
            "Amount tolerance percentage (default: %(default)s).",
        ),
        (
            "--lookback-hours",
            "lookback_hours",
            int,
            settings.default_lookback_hours,
            "How far back to fetch expenses (default: %(default)s).",
        ),
        (
            "--limit",
            "limit",
            int,
            settings.max_batch_size,
            "Maximum number of expenses to scan (default: %(default)s).",
        ),
    )
    for flag, dest, converter, fallback, help_text in valued_options:
        cli.add_argument(
            flag,
            dest=dest,
            type=converter,
            default=fallback,
            help=help_text,
        )

    cli.add_argument("--verbose", action="store_true", help="Enable debug logging.")

    return cli.parse_args(argv)
|
|
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Run one duplicate-detection pass and store the resulting suggestions.

    Steps, in order: parse options and configure logging, build the client,
    load merchant aliases into a resolver, fetch recent expenses, look for
    duplicate clusters, and persist suggestions for any clusters found.

    Args:
        argv: Argument strings forwarded to ``parse_args``.

    Returns:
        ``0`` on every path that completes, including the early exits taken
        when no expenses or no clusters are found.
    """
    options = parse_args(argv)
    configure_logging(options.verbose)

    db_client = build_client()

    # Populate the alias resolver from the alias repository.
    aliases = MerchantAliasRepository.from_client(db_client)
    resolver = MerchantAliasResolver()
    resolver.load_from_cursor(aliases.fetch_all())

    recent_expenses = ExpenseRepository.from_client(db_client).fetch_recent(
        options.lookback_hours,
        options.limit,
    )
    if not recent_expenses:
        logging.info("No expenses found for lookback window")
        return 0

    detector = DuplicateDetector(
        alias_resolver=resolver,
        suggestions_repo=MergeSuggestionRepository.from_client(db_client),
        amount_tolerance_pct=options.amount_pct,
        time_tolerance_minutes=options.minutes,
    )

    duplicate_clusters = detector.find_clusters(recent_expenses)
    if not duplicate_clusters:
        logging.info("No duplicate clusters detected")
        return 0

    written_ids = detector.persist_suggestions(duplicate_clusters)
    logging.info(
        "Finished writing %d suggestions. Example message: %s",
        len(written_ids),
        "These seem similar. Would you like to merge them?",
    )
    return 0
|
|
|
|
|
|
|
|
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|
|
|