File size: 3,585 Bytes
8662a01
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a2483df
8662a01
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9471668
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import logging
import random
import sqlite3
import string
from contextlib import contextmanager
from typing import Optional
from urllib.parse import urlsplit

import validators
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, HttpUrl

# ASGI application object; the route decorators below register handlers on it.
app = FastAPI()
# Jinja2 template loader rooted at the local "templates" directory.
templates = Jinja2Templates(directory="templates")

# Public prefix prepended to every generated short code to form the short URL.
BASE_URL = "https://arevedudaa-shortner.hf.space"
# Path of the SQLite database file passed to sqlite3.connect().
DB_FILE = "shortener.db"

# Request body accepted by POST /shorten/.
class URLRequest(BaseModel):
    # The long URL to shorten. Typed as a plain string, so the model itself
    # does no URL validation; the endpoint checks the scheme prefix.
    original_url: str

@contextmanager
def get_db_connection():
    """Yield an open SQLite connection to ``DB_FILE`` and close it on exit.

    The connection is closed whether the ``with`` body completes normally
    or raises. Nothing is committed here; callers commit their own writes.
    """
    connection = sqlite3.connect(DB_FILE)
    try:
        yield connection
    finally:
        # Always release the file handle, even if the caller's block failed.
        connection.close()

def init_db():
    """Create the ``links`` table if it does not already exist.

    Each row holds the original URL, its unique short code, the unique
    full short URL, and a creation timestamp defaulted by SQLite.
    """
    create_links_table = '''CREATE TABLE IF NOT EXISTS links (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        original_url TEXT NOT NULL,
                        short_code TEXT UNIQUE NOT NULL,
                        short_url TEXT UNIQUE NOT NULL,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                      )'''
    with get_db_connection() as db:
        db.cursor().execute(create_links_table)
        db.commit()

def generate_short_code(length: int = 6) -> str:
    """Return a random short code of ``length`` ASCII letters and digits.

    Args:
        length: Number of characters to generate. ``0`` yields an empty
            string.

    Returns:
        A string drawn uniformly from ``[A-Za-z0-9]``.
    """
    alphabet = string.ascii_letters + string.digits
    # Short codes are publicly reachable identifiers, so draw them from the
    # OS entropy source rather than the default Mersenne Twister, whose
    # output can be predicted from previously observed values.
    return ''.join(random.SystemRandom().choices(alphabet, k=length))

def store_link(original_url: str) -> str:
    """Return the short URL for ``original_url``, creating it if needed.

    If the URL has been shortened before, the stored short URL is returned.
    Otherwise a new short code is generated and inserted.

    Args:
        original_url: The long URL to shorten.

    Returns:
        The full short URL (``BASE_URL`` plus the short code).

    Raises:
        RuntimeError: If no unused short code could be inserted within the
            retry budget.
    """
    max_attempts = 10

    with get_db_connection() as conn:
        cursor = conn.cursor()

        # Reuse the existing mapping when this URL was already shortened.
        cursor.execute("SELECT short_url FROM links WHERE original_url = ?", (original_url,))
        existing = cursor.fetchone()
        if existing:
            return existing[0]

        # Insert first and let the UNIQUE constraints report collisions.
        # A separate "SELECT then INSERT" leaves a window in which another
        # request can claim the same code and make the INSERT fail.
        for _ in range(max_attempts):
            short_code = generate_short_code()
            short_url = f"{BASE_URL}/{short_code}"
            try:
                cursor.execute(
                    "INSERT INTO links (original_url, short_code, short_url) VALUES (?, ?, ?)",
                    (original_url, short_code, short_url)
                )
            except sqlite3.IntegrityError:
                # Code already taken; try a fresh one.
                continue
            conn.commit()
            return short_url

        raise RuntimeError("Could not allocate a unique short code")

@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Render the landing page from the ``index.html`` template."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)

@app.post("/shorten/")
async def shorten_url(url_request: URLRequest):
    """Create (or look up) the short URL for the submitted long URL.

    Args:
        url_request: Request body carrying ``original_url``.

    Returns:
        A dict with the ``original_url`` and its ``shortened_url``.

    Raises:
        HTTPException: 400 if the URL is empty, does not start with
            ``http://`` or ``https://``, or has no host; 500 if storing
            the link fails.
    """
    original_url = url_request.original_url

    if not original_url:
        raise HTTPException(status_code=400, detail="URL is required")

    if not original_url.startswith(('http://', 'https://')):
        raise HTTPException(status_code=400, detail="URL must start with http:// or https://")

    # A bare scheme such as "http://" passes the prefix check but cannot be
    # redirected to, so require a host as well. urlsplit raises ValueError on
    # malformed netlocs (e.g. an unterminated IPv6 literal).
    try:
        has_host = bool(urlsplit(original_url).hostname)
    except ValueError:
        has_host = False
    if not has_host:
        raise HTTPException(status_code=400, detail="URL must include a host")

    try:
        short_url = store_link(original_url)
    except Exception as exc:
        # Keep the client-facing message generic, but record the real cause
        # instead of discarding it.
        logging.getLogger(__name__).exception("Failed to store shortened URL")
        raise HTTPException(status_code=500, detail="Error creating shortened URL") from exc

    return {"original_url": original_url, "shortened_url": short_url}

@app.get("/{short_code}")
async def redirect_to_url(short_code: str):
    """Redirect a short code to the original URL stored for it.

    Raises:
        HTTPException: 404 when no row matches ``short_code``.
    """
    lookup_sql = "SELECT original_url FROM links WHERE short_code = ?"
    with get_db_connection() as db:
        row = db.cursor().execute(lookup_sql, (short_code,)).fetchone()

    # fetchone() yields None when the code is unknown.
    if row is None:
        raise HTTPException(status_code=404, detail="Short URL not found")

    (target_url,) = row
    return RedirectResponse(url=target_url)

# Create the links table (if missing) whenever this module is imported,
# i.e. both when run as a script and when loaded by an ASGI server.
init_db()

# When executed directly as a script, serve the app with uvicorn on all
# interfaces at port 7860.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)