| import os |
|
|
| |
| import sqlalchemy |
| |
| from fastapi import FastAPI, HTTPException |
| from sqlalchemy import text |
|
|
| |
|
|
| |
| DB_HOST = os.getenv("DB_HOST") |
| DB_NAME = os.getenv("DB_NAME") |
| DB_USER = os.getenv("DB_USER") |
| DB_PASS = os.getenv("DB_PASS") |
|
|
| DATABASE_URL = f"postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}:5432/{DB_NAME}" |
|
|
| engine = sqlalchemy.create_engine(DATABASE_URL) |
|
|
| app = FastAPI() |
|
|
|
|
| |
| @app.get("/total_revenue/{ano}") |
| async def get_total_revenue(ano: int): |
| with engine.connect() as conn: |
| stmt = text( |
| f"SELECT * FROM total_revenue WHERE year = {ano}" |
| ) |
| result = conn.execute(stmt).fetchone() |
| if result: |
| return {"total_revenue": result[1]} |
| else: |
| raise HTTPException(status_code=404, detail="KPI não encontrada") |
|
|
|
|
| |
| @app.get("/total_sales/{ano}") |
| async def get_total_sales(ano: int): |
| with engine.connect() as conn: |
| stmt = text(f"SELECT * FROM total_sales WHERE year = {ano}") |
| result = conn.execute(stmt).fetchone() |
| if result: |
| return {"total_sales": result[1]} |
| else: |
| raise HTTPException(status_code=404, detail="KPI não encontrada") |
|
|
|
|
| |
| @app.get("/average_sale_value/{ano}") |
| async def get_average_sale_value(ano: int): |
| with engine.connect() as conn: |
| stmt = text(f"SELECT * FROM average_sale_value WHERE year = {ano}") |
| result = conn.execute(stmt).fetchone() |
| if result: |
| return {"average_sale_value": result[1]} |
| else: |
| raise HTTPException(status_code=404, detail="KPI não encontrada") |
|
|
|
|
| |
| @app.get("/average_products_per_sale/{ano}") |
| async def get_average_products_per_sale(ano: int): |
| with engine.connect() as conn: |
| stmt = text(f"SELECT * FROM average_products_per_sale WHERE year = {ano}") |
| result = conn.execute(stmt).fetchone() |
| if result: |
| return {"average_products_per_sale": result[1]} |
| else: |
| raise HTTPException(status_code=404, detail="KPI não encontrada") |
|
|
|
|
| |
| @app.get("/average_ticket/{ano}") |
| async def get_average_ticket(ano: int): |
| with engine.connect() as conn: |
| stmt = text(f"SELECT * FROM average_ticket WHERE year = {ano}") |
| result = conn.execute(stmt).fetchone() |
| if result: |
| return {"average_ticket": result[1]} |
| else: |
| raise HTTPException(status_code=404, detail="KPI não encontrada") |
|
|
|
|
| |
| @app.get("/best_selling_product_value/{ano}") |
| async def get_best_selling_product_value(ano: int): |
| with engine.connect() as conn: |
| stmt = text(f"SELECT * FROM best_selling_product_value WHERE year = {ano}") |
| result = conn.execute(stmt).fetchone() |
| if result: |
| return {"produto": result[0], "total_product_revenue": result[2]} |
| else: |
| raise HTTPException(status_code=404, detail="KPI não encontrada") |
|
|
|
|
| |
| @app.get("/best_selling_product_quantity/{ano}") |
| async def get_best_selling_product_quantity(ano: int): |
| with engine.connect() as conn: |
| stmt = text(f"SELECT * FROM best_selling_product_quantity WHERE year = {ano}") |
| result = conn.execute(stmt).fetchone() |
| if result: |
| return {"produto": result[0], "total_product_quantity": result[2]} |
| else: |
| raise HTTPException(status_code=404, detail="KPI não encontrada") |
|
|
|
|
| |
| @app.get("/product_revenue/{ano}") |
| async def get_product_revenue(ano: int): |
| with engine.connect() as conn: |
| results = conn.execute(text(f"SELECT * FROM product_revenue WHERE year = {ano}")).fetchall() |
| if results: |
| return [{"produto": row[0], "product_revenue": row[2]} for row in results] |
| else: |
| raise HTTPException(status_code=404, detail="KPI não encontrada") |
|
|
|
|
| |
| @app.get("/top3_salesperson_value/{ano}") |
| async def get_top3_salesperson_value(ano: int): |
| with engine.connect() as conn: |
| stmt = text(f"SELECT * FROM revenue_per_salesperson WHERE year = {ano} Order by revenue_per_salesperson Desc LIMIT 3") |
| results = conn.execute(stmt).fetchall() |
| if results: |
| return [{"email": row[1], "salesperson_total_revenue": row[2]} for row in results] |
| else: |
| raise HTTPException(status_code=404, detail="KPI não encontrada") |
|
|
|
|
| |
| @app.get("/top3_salesperson_quantity/{ano}") |
| async def get_top3_salesperson_quantity(ano: int): |
| with engine.connect() as conn: |
| stmt = text(f"SELECT * FROM sales_per_salesperson WHERE year = {ano} Order by sales_per_salesperson Desc LIMIT 3") |
| results = conn.execute(stmt).fetchall() |
| if results: |
| return [{"email": row[0], "salesperson_total_sales": row[2]} for row in results] |
| else: |
| raise HTTPException(status_code=404, detail="KPI não encontrada") |
|
|
|
|
| |
| @app.get("/sales_per_salesperson/{ano}") |
| async def get_sales_per_salesperson(ano: int): |
| with engine.connect() as conn: |
| results = conn.execute(text(f"SELECT * FROM sales_per_salesperson WHERE year = {ano}")).fetchall() |
| if results: |
| return [ |
| {"email": row[0], "sales_per_salesperson": row[2]} for row in results |
| ] |
| else: |
| raise HTTPException(status_code=404, detail="KPI não encontrada") |
|
|
|
|
| |
| @app.get("/revenue_per_salesperson/{ano}") |
| async def get_revenue_per_salesperson(ano: int): |
| with engine.connect() as conn: |
| results = conn.execute( |
| text(f"SELECT * FROM revenue_per_salesperson WHERE year = {ano}") |
| ).fetchall() |
| if results: |
| return [ |
| {"email": row[1], "revenue_per_salesperson": row[2]} for row in results |
| ] |
| else: |
| raise HTTPException(status_code=404, detail="KPI não encontrada") |
|
|
|
|
| |
| @app.get("/sales_per_day/{ano}") |
| async def get_sales_per_day(ano: int): |
| with engine.connect() as conn: |
| results = conn.execute(text(f"SELECT * FROM sales_per_day WHERE year = {ano}")).fetchall() |
| if results: |
| return [ |
| {"sales_date": str(row[0]), "sales_per_day": row[2]} for row in results |
| ] |
| else: |
| raise HTTPException(status_code=404, detail="KPI não encontrada") |
|
|
|
|
| |
| @app.get("/sales_per_month/{ano}") |
| async def get_sales_per_month(ano: int): |
| with engine.connect() as conn: |
| results = conn.execute(text(f"SELECT * FROM sales_per_month WHERE year = {ano}")).fetchall() |
| if results: |
| return [ |
| {"sales_year": row[0], "sales_month": row[1], "sales_per_month": row[2]} |
| for row in results |
| ] |
| |
| else: |
| raise HTTPException(status_code=404, detail="KPI não encontrada") |
|
|
|
|
| |
| @app.get("/sales_per_year") |
| async def get_sales_per_year(): |
| with engine.connect() as conn: |
| results = conn.execute(text(f"SELECT * FROM sales_per_year")).fetchall() |
| if results: |
| return [{"sales_year": row[0], "sales_per_year": row[1]} for row in results] |
| else: |
| raise HTTPException(status_code=404, detail="KPI não encontrada") |
|
|
|
|
| |
| @app.get("/revenue_per_day/{ano}") |
| async def get_revenue_per_day(ano: int): |
| with engine.connect() as conn: |
| results = conn.execute(text(f"SELECT * FROM revenue_per_day WHERE year = {ano}")).fetchall() |
| if results: |
| return [ |
| {"revenue_date": str(row[1]), "revenue_per_day": row[2]} |
| for row in results |
| ] |
| else: |
| raise HTTPException(status_code=404, detail="KPI não encontrada") |
|
|
|
|
| |
| @app.get("/revenue_per_month/{ano}") |
| async def get_revenue_per_month(ano: int): |
| with engine.connect() as conn: |
| results = conn.execute(text(f"SELECT * FROM revenue_per_month WHERE year = {ano}")).fetchall() |
| if results: |
| return [ |
| {"revenue_year": row[0], "revenue_month": row[1], "revenue_per_month": row[2]} |
| for row in results |
| ] |
| else: |
| raise HTTPException(status_code=404, detail="KPI não encontrada") |
|
|
|
|
| |
| @app.get("/revenue_per_year") |
| async def get_revenue_per_year(): |
| with engine.connect() as conn: |
| results = conn.execute(text(f"SELECT * FROM revenue_per_year")).fetchall() |
| if results: |
| return [ |
| {"revenue_year": row[0], "revenue_per_year": row[1]} for row in results |
| ] |
| else: |
| raise HTTPException(status_code=404, detail="KPI não encontrada") |
|
|
|
|
| if __name__ == "__main__": |
| import uvicorn |
|
|
| uvicorn.run("crm_api:app", host="0.0.0.0", port=5000, reload=True) |