# shortner / main.py
# triflix's picture
# Create main.py
# 28f6650 verified
import random
import secrets
import sqlite3
import string
from contextlib import contextmanager
from typing import Optional

import validators
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, HttpUrl
# Storage and addressing settings: the SQLite file that holds the links and
# the public origin that is prefixed to every generated short code.
DB_FILE = "shortener.db"
BASE_URL = "https://triflix-shortner.hf.space"

# Jinja2 environment serving pages from ./templates, and the ASGI application
# the route handlers below are registered on.
templates = Jinja2Templates(directory="templates")
app = FastAPI()
class URLRequest(BaseModel):
    """Request body accepted by the ``/shorten/`` endpoint."""

    # The long URL the client wants shortened; kept as a plain string and
    # checked by the endpoint itself.
    original_url: str
@contextmanager
def get_db_connection():
    """Yield a SQLite connection to ``DB_FILE`` and close it on exit.

    The connection is closed whether the ``with`` body finishes normally or
    raises. Nothing is committed automatically; callers commit themselves.
    """
    connection = sqlite3.connect(DB_FILE)
    try:
        yield connection
    finally:
        # Always release the file handle, even when the caller's body failed.
        connection.close()
# Schema for the single table used by the service. ``short_code`` and
# ``short_url`` are both UNIQUE, so a duplicate insert is rejected by SQLite.
_LINKS_TABLE_DDL = '''CREATE TABLE IF NOT EXISTS links (
id INTEGER PRIMARY KEY AUTOINCREMENT,
original_url TEXT NOT NULL,
short_code TEXT UNIQUE NOT NULL,
short_url TEXT UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)'''


def init_db():
    """Create the ``links`` table if it does not exist yet, then commit."""
    with get_db_connection() as db:
        db.execute(_LINKS_TABLE_DDL)
        db.commit()
def generate_short_code(length: int = 6) -> str:
    """Return a random alphanumeric short code of ``length`` characters.

    Characters are drawn from ASCII letters and digits using the ``secrets``
    module, so codes cannot be predicted from previously issued ones.

    Args:
        length: Number of characters in the code; must be at least 1.

    Returns:
        A string of exactly ``length`` characters from ``[A-Za-z0-9]``.

    Raises:
        ValueError: If ``length`` is smaller than 1 (an empty code could never
            be resolved by the redirect route).
    """
    if length < 1:
        raise ValueError("length must be at least 1")
    alphabet = string.ascii_letters + string.digits
    return ''.join(secrets.choice(alphabet) for _ in range(length))
def store_link(original_url: str) -> str:
    """Return the short URL for ``original_url``, creating it when needed.

    If the URL is already stored, its existing short URL is returned.
    Otherwise a new short code is generated and inserted. Uniqueness is
    enforced by the table's UNIQUE constraints rather than by a prior
    SELECT, so a code taken by a concurrent request leads to a retry
    instead of an unhandled ``IntegrityError``.

    Args:
        original_url: The long URL to shorten.

    Returns:
        The full short URL (``BASE_URL`` followed by ``/`` and the code).

    Raises:
        RuntimeError: If no unused short code was found within the allowed
            number of attempts.
        sqlite3.Error: For database failures other than a uniqueness clash.
    """
    max_attempts = 10  # collisions are rare; this only guards against looping forever

    with get_db_connection() as conn:
        cursor = conn.cursor()

        # Reuse the existing mapping when this URL was shortened before.
        cursor.execute("SELECT short_url FROM links WHERE original_url = ?", (original_url,))
        existing = cursor.fetchone()
        if existing:
            return existing[0]

        for _ in range(max_attempts):
            short_code = generate_short_code()
            short_url = f"{BASE_URL}/{short_code}"
            try:
                cursor.execute(
                    "INSERT INTO links (original_url, short_code, short_url) VALUES (?, ?, ?)",
                    (original_url, short_code, short_url),
                )
            except sqlite3.IntegrityError:
                # The code (or its URL) is already taken: discard the failed
                # statement's transaction and try again with a fresh code.
                conn.rollback()
                continue
            conn.commit()
            return short_url

    raise RuntimeError("Could not generate a unique short code")
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Render the ``index.html`` template for the landing page."""
    # The template context carries only the incoming request object.
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
@app.post("/shorten/")
async def shorten_url(url_request: URLRequest):
    """Create (or look up) a short URL for the submitted URL.

    Args:
        url_request: Request body holding ``original_url``.

    Returns:
        A dict with the submitted ``original_url`` and its ``shortened_url``.

    Raises:
        HTTPException: 400 when the URL is empty or does not start with
            ``http://`` or ``https://``; 500 when storing the link fails.
    """
    original_url = url_request.original_url

    if not original_url:
        raise HTTPException(status_code=400, detail="URL is required")
    if not original_url.startswith(('http://', 'https://')):
        raise HTTPException(status_code=400, detail="URL must start with http:// or https://")

    # Only the storage call is guarded; the underlying error is chained so it
    # stays visible in tracebacks while the client gets a generic message.
    try:
        short_url = store_link(original_url)
    except Exception as exc:
        raise HTTPException(status_code=500, detail="Error creating shortened URL") from exc

    return {"original_url": original_url, "shortened_url": short_url}
@app.get("/{short_code}")
async def redirect_to_url(short_code: str):
    """Redirect to the original URL stored for ``short_code``.

    Raises:
        HTTPException: 404 when no link with this short code exists.
    """
    lookup_sql = "SELECT original_url FROM links WHERE short_code = ?"
    with get_db_connection() as db:
        row = db.execute(lookup_sql, (short_code,)).fetchone()

    # fetchone() gives None when the code is unknown.
    if row is None:
        raise HTTPException(status_code=404, detail="Short URL not found")

    target = row[0]
    return RedirectResponse(url=target)
# Ensure the links table exists as soon as this module is loaded.
init_db()

if __name__ == "__main__":
    # Imported here so uvicorn is only needed when running this file directly.
    import uvicorn

    uvicorn.run(app, port=7860, host="0.0.0.0")