"""GA4 insights HTTP service and duplicate-expense detection CLI.

This module exposes two entry points:

* ``app`` -- a FastAPI application that mounts the GA4 insights router, a
  ``/health`` probe, and an endpoint that renders a device-funnel PDF.
* ``main`` -- a command-line routine that scans recent expenses for
  near-duplicates and writes merge suggestions.

NOTE: the module uses package-relative imports, so the CLI has to be started
in package context (``python -m <package>.<module>``) rather than by file path.
"""

import argparse
import logging
import sys
from typing import Any

from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse
from pydantic import BaseModel

# Local imports keep their original relative order so that any import-time
# side effects in the project packages run in the same sequence as before.
from .ga4_insights import router as ga4_router  # GA4 router for funnel insights
from .pdf_generation import generate_device_funnel_analysis_pdf  # PDF generation function
from .config import settings
from .duplicate_detector import DuplicateDetector
from .merchant_alias import MerchantAliasResolver
from .repositories import (
    ExpenseRepository,
    MerchantAliasRepository,
    MergeSuggestionRepository,
    build_client,
)

# FastAPI application instance
app = FastAPI(
    title="GA4 Insights Service",
    version="1.0.0",
    description="Fetch GA4 funnel data and generate insights using OpenAI.",
)

# Include the GA4 router from the ga4_insights.py file
app.include_router(ga4_router)


@app.get("/health")
async def health() -> dict[str, str]:
    """Health check endpoint; always answers ``{"status": "ok"}``."""
    return {"status": "ok"}


class FunnelData(BaseModel):
    """Request body carrying funnel data for POST requests."""

    # One dictionary per funnel entry; the entry schema is not constrained here.
    funnelData: list[dict[str, Any]]


@app.post("/ga4/generate-device-funnel-pdf/")
async def generate_device_funnel_pdf(request: FunnelData):
    """Generate the device funnel PDF report and return it as a download.

    Args:
        request: Validated request body holding the funnel data.

    Returns:
        A ``FileResponse`` serving the generated file as
        ``device_funnel_analysis_report.pdf`` with media type
        ``application/pdf``.

    Raises:
        HTTPException: With status 500 when anything in the generation step
            fails; the original exception is logged and chained as the cause.
    """
    try:
        # Pydantic v2 renamed ``dict()`` to ``model_dump()`` and deprecated the
        # old name; prefer the new method when it exists so the same payload is
        # produced on both major versions without deprecation warnings.
        if hasattr(request, "model_dump"):
            payload = request.model_dump()
        else:
            payload = request.dict()

        # Presumably returns the filesystem path of the rendered PDF -- it is
        # handed to FileResponse as the path argument below.
        pdf_file_path = generate_device_funnel_analysis_pdf(payload)

        return FileResponse(
            pdf_file_path,
            media_type="application/pdf",
            filename="device_funnel_analysis_report.pdf",
        )
    except Exception as exc:
        # Keep the traceback in the server log; the client only receives the
        # summarized message in the response detail.
        logging.exception("Error generating device funnel PDF")
        raise HTTPException(
            status_code=500,
            detail=f"Error generating PDF: {exc}",
        ) from exc


def configure_logging(verbose: bool) -> None:
    """Configure root logging at DEBUG when *verbose* is true, else INFO."""
    level = logging.DEBUG if verbose else logging.INFO
    logging.basicConfig(
        level=level,
        format="%(asctime)s %(levelname)s %(message)s",
    )


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line arguments for the duplicate-detection run.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv[1:]``.

    Returns:
        Namespace with ``minutes``, ``amount_pct``, ``lookback_hours``,
        ``limit`` and ``verbose``. Numeric defaults come from ``settings``.
    """
    parser = argparse.ArgumentParser(
        description="Detect near-duplicate expenses and write merge suggestions.",
    )
    parser.add_argument(
        "--minutes",
        dest="minutes",
        type=int,
        default=settings.time_tolerance_minutes,
        help="Time tolerance in minutes for comparing expenses (default: %(default)s).",
    )
    parser.add_argument(
        "--amount-pct",
        dest="amount_pct",
        type=float,
        default=float(settings.amount_tolerance_pct),
        help="Amount tolerance percentage (default: %(default)s).",
    )
    parser.add_argument(
        "--lookback-hours",
        dest="lookback_hours",
        type=int,
        default=settings.default_lookback_hours,
        help="How far back to fetch expenses (default: %(default)s).",
    )
    parser.add_argument(
        "--limit",
        dest="limit",
        type=int,
        default=settings.max_batch_size,
        help="Maximum number of expenses to scan (default: %(default)s).",
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Enable debug logging.",
    )
    return parser.parse_args(argv)


def main(argv: list[str] | None = None) -> int:
    """Run duplicate detection over recent expenses and persist suggestions.

    Steps: parse arguments, configure logging, load merchant aliases into the
    resolver, fetch recent expenses, cluster them with ``DuplicateDetector``
    and persist the resulting merge suggestions.

    Args:
        argv: Optional argument list forwarded to :func:`parse_args`.

    Returns:
        Process exit code; ``0`` on every path visible here, including the
        "nothing to do" early exits.
    """
    args = parse_args(argv)
    configure_logging(args.verbose)

    client = build_client()

    # Load the known merchant aliases before any expenses are compared.
    alias_repo = MerchantAliasRepository.from_client(client)
    alias_resolver = MerchantAliasResolver()
    alias_resolver.load_from_cursor(alias_repo.fetch_all())

    expense_repo = ExpenseRepository.from_client(client)
    expenses = expense_repo.fetch_recent(args.lookback_hours, args.limit)
    if not expenses:
        logging.info("No expenses found for lookback window")
        return 0

    suggestion_repo = MergeSuggestionRepository.from_client(client)
    detector = DuplicateDetector(
        alias_resolver=alias_resolver,
        suggestions_repo=suggestion_repo,
        amount_tolerance_pct=args.amount_pct,
        time_tolerance_minutes=args.minutes,
    )

    clusters = detector.find_clusters(expenses)
    if not clusters:
        logging.info("No duplicate clusters detected")
        return 0

    suggestion_ids = detector.persist_suggestions(clusters)
    logging.info(
        "Finished writing %d suggestions. Example message: %s",
        len(suggestion_ids),
        "These seem similar. Would you like to merge them?",
    )
    return 0


# If running this module directly, execute the main function
if __name__ == "__main__":
    sys.exit(main())