|
|
import sqlite3 |
|
|
import random |
|
|
import string |
|
|
from typing import Optional |
|
|
from contextlib import contextmanager |
|
|
from fastapi import FastAPI, HTTPException, Request |
|
|
from fastapi.responses import HTMLResponse, RedirectResponse |
|
|
from fastapi.templating import Jinja2Templates |
|
|
from fastapi.staticfiles import StaticFiles |
|
|
from pydantic import BaseModel, HttpUrl |
|
|
import validators |
|
|
|
|
|
# Public origin that is prefixed to every generated short code.
BASE_URL = "https://triflix-shortner.hf.space"

# SQLite database file; a relative path is resolved against the working directory.
DB_FILE = "shortener.db"

# ASGI application object and the Jinja2 loader for the "templates" directory.
app = FastAPI()
templates = Jinja2Templates(directory="templates")
|
|
|
|
|
class URLRequest(BaseModel):
    """JSON body accepted by the ``POST /shorten/`` endpoint."""

    # Declared as a plain string, so the model itself performs no URL
    # validation; the endpoint checks the scheme prefix on its own.
    original_url: str
|
|
|
|
|
@contextmanager
def get_db_connection():
    """Yield a SQLite connection to ``DB_FILE`` and always close it afterwards.

    The connection is closed whether the ``with`` body completes or raises.
    Nothing is committed here; callers that write must call ``commit()``
    themselves before leaving the block.
    """
    connection = sqlite3.connect(DB_FILE)
    try:
        yield connection
    finally:
        connection.close()
|
|
|
|
|
def init_db():
    """Create the ``links`` table if it is not already present.

    Each row stores the original URL, its short code and the full short URL
    (both declared UNIQUE), plus a creation timestamp that defaults to the
    current time.
    """
    create_links_table = '''CREATE TABLE IF NOT EXISTS links (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        original_url TEXT NOT NULL,
        short_code TEXT UNIQUE NOT NULL,
        short_url TEXT UNIQUE NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )'''
    with get_db_connection() as conn:
        conn.cursor().execute(create_links_table)
        conn.commit()
|
|
|
|
|
def generate_short_code(length: int = 6) -> str:
    """Return a random code made of ``length`` ASCII letters and digits.

    Characters are drawn independently (with replacement), so repeats within
    one code are possible.
    """
    alphabet = string.ascii_letters + string.digits
    picked = random.choices(alphabet, k=length)
    return ''.join(picked)
|
|
|
|
|
def store_link(original_url: str) -> str:
    """Return the short URL for ``original_url``, creating a row if needed.

    If the URL has been stored before, the previously issued short URL is
    returned unchanged. Otherwise a new random code is generated and
    inserted.

    The UNIQUE constraints on ``short_code`` / ``short_url`` are used as the
    collision check: the row is inserted directly and an
    ``sqlite3.IntegrityError`` triggers a retry with a fresh code. A separate
    "SELECT, then INSERT" check can be beaten by a concurrent writer that
    takes the code in between the two statements.

    Args:
        original_url: The URL to shorten.

    Returns:
        The full short URL (``BASE_URL`` + ``/`` + short code).

    Raises:
        RuntimeError: If no row could be inserted after several attempts;
            the last ``sqlite3.IntegrityError`` is attached as the cause.
    """
    max_attempts = 10

    with get_db_connection() as conn:
        cursor = conn.cursor()

        # Reuse the existing mapping so the same URL keeps the same short link.
        cursor.execute(
            "SELECT short_url FROM links WHERE original_url = ?",
            (original_url,),
        )
        existing = cursor.fetchone()
        if existing:
            return existing[0]

        last_error = None
        # Bounded so that a constraint failure unrelated to the random code
        # (e.g. a NULL original_url) cannot spin forever.
        for _ in range(max_attempts):
            short_code = generate_short_code()
            short_url = f"{BASE_URL}/{short_code}"
            try:
                cursor.execute(
                    "INSERT INTO links (original_url, short_code, short_url) VALUES (?, ?, ?)",
                    (original_url, short_code, short_url),
                )
            except sqlite3.IntegrityError as exc:
                # Discard the failed statement's transaction and try a new code.
                conn.rollback()
                last_error = exc
                continue
            conn.commit()
            return short_url

        raise RuntimeError("Could not allocate a unique short code") from last_error
|
|
|
|
|
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Render the ``index.html`` template for the site root."""
    # The request object is passed to the template under the "request" key.
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
|
|
|
|
|
@app.post("/shorten/")
async def shorten_url(url_request: URLRequest):
    """Create (or look up) the short URL for the submitted address.

    Args:
        url_request: Request body carrying ``original_url``.

    Returns:
        A dict with the submitted ``original_url`` and its ``shortened_url``.

    Raises:
        HTTPException: 400 if the URL is empty or does not start with
            ``http://`` / ``https://``; 500 if storing the link fails.
    """
    original_url = url_request.original_url

    if not original_url:
        raise HTTPException(status_code=400, detail="URL is required")

    if not original_url.startswith(('http://', 'https://')):
        raise HTTPException(status_code=400, detail="URL must start with http:// or https://")

    # Only the storage call can fail here, so it is the only thing guarded.
    try:
        short_url = store_link(original_url)
    except Exception as e:
        # Hide internal details from the client but keep the root cause
        # attached to the raised exception for debugging.
        raise HTTPException(status_code=500, detail="Error creating shortened URL") from e

    return {"original_url": original_url, "shortened_url": short_url}
|
|
|
|
|
@app.get("/{short_code}")
async def redirect_to_url(short_code: str):
    """Redirect a short code to the original URL stored for it.

    Raises:
        HTTPException: 404 when no row matches ``short_code``.
    """
    lookup_sql = "SELECT original_url FROM links WHERE short_code = ?"
    with get_db_connection() as conn:
        row = conn.cursor().execute(lookup_sql, (short_code,)).fetchone()

    # fetchone() yields None when the code is unknown.
    if row is None:
        raise HTTPException(status_code=404, detail="Short URL not found")

    destination = row[0]
    return RedirectResponse(url=destination)
|
|
|
|
|
|
|
|
# Runs at import time so the table exists before any request is handled,
# whether the module is executed directly or loaded by an ASGI server.
init_db()

if __name__ == "__main__":
    import uvicorn

    # Listen on all interfaces on port 7860 when run as a script.
    bind_host = "0.0.0.0"
    bind_port = 7860
    uvicorn.run(app, host=bind_host, port=bind_port)