# Klarity / app.py
# Author: HAKORADev — uploaded via huggingface_hub (commit 135078a, verified)
"""
Klarity - AI Image & Video Restoration (HF Space)
https://github.com/HAKORADev/Klarity
v19 - Fix: Compare viewer now calculates zoom to fit the larger image
within the container, always showing 100% as the default view
regardless of upscale factor (2x, 4x, etc).
- Fix: Process button no longer stays disabled after processing.
Removed the persistent guard interval that was re-disabling the
button after Gradio re-enabled it. Fixed _empty_results() tuple
length to match output count.
- Fix: Video players properly cleaned up when switching to image
mode, and vice versa.
- Fix: Video player now uses fixed height with object-fit:contain
for consistent display regardless of video resolution.
"""
import os
import sys
import time
import shutil
import threading
import traceback
import logging
import uuid
from huggingface_hub import HfApi, hf_hub_download

# Configure logging before any module-level work so startup messages are captured.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger(__name__)

import gradio as gr

# Make the bundled src/ directory importable (it contains the `processing` module).
SRC = os.path.join(os.path.dirname(os.path.abspath(__file__)), "src")
sys.path.insert(0, SRC)

# Working directories, relative to the Space's CWD.
MODELS_DIR = "models"    # downloaded model weights
OUTPUT_DIR = "output"    # per-run processing results
UPLOAD_DIR = "uploads"   # persisted copies of user uploads
for d in (MODELS_DIR, OUTPUT_DIR, UPLOAD_DIR):
    os.makedirs(d, exist_ok=True)

# Files copied here can be served to the browser by Gradio (children of the
# system temp dir pass Gradio's file-serving security check by default).
GRADIO_SERVE_DIR = "/tmp/klarity_serve"
os.makedirs(GRADIO_SERVE_DIR, exist_ok=True)
# ---------------------------------------------------------------------------
# Served Meter Persistence
# ---------------------------------------------------------------------------
# Dataset repo that persists the "Served" counter across Space restarts.
STATS_REPO = "HAKORADev/Klarity-Served-Meter"
HF_TOKEN = os.environ.get("HF_TOKEN")  # write token; may be unset on forks
api = HfApi(token=HF_TOKEN)
# In-memory counter mirror; guarded by _count_lock, synced to the Hub lazily.
_local_count = 0
_count_lock = threading.Lock()
def get_served_count():
    """Refresh the served counter from the Hub dataset and return it.

    On any fetch failure (offline, missing token, bad file contents) the
    last known in-memory value is returned unchanged.
    """
    global _local_count
    with _count_lock:
        try:
            fetched = hf_hub_download(
                repo_id=STATS_REPO,
                filename="count.txt",
                repo_type="dataset",
                token=HF_TOKEN,
                force_download=True,
            )
            with open(fetched, "r") as fh:
                text = fh.read()
            _local_count = int(text.strip())
        except Exception as e:
            log.warning("Could not fetch remote count: %s", e)
        return _local_count
def increment_served_count():
    """Bump the served counter and asynchronously persist it to the Hub.

    The increment happens under the lock; the Hub sync runs in a daemon
    thread so the UI never waits on network latency.

    Returns:
        The counter value at increment time (a consistent snapshot, not
        the live global, which other requests may bump concurrently).
    """
    global _local_count
    with _count_lock:
        _local_count += 1
        count = _local_count  # snapshot under the lock for a consistent sync

    def _sync(value=count):
        # Write to a unique local file: concurrent syncs previously raced
        # on a single shared "count.txt" in the CWD (one thread could
        # upload while another was rewriting the file).
        tmp_path = f"count_{uuid.uuid4().hex}.txt"
        try:
            with open(tmp_path, "w") as f:
                f.write(str(value))
            api.upload_file(path_or_fileobj=tmp_path, path_in_repo="count.txt", repo_id=STATS_REPO, repo_type="dataset", commit_message=f"Update served count to {value}")
        except Exception as e:
            log.error("Could not sync count to Hub: %s", e)
        finally:
            try:
                os.remove(tmp_path)
            except OSError:
                pass

    threading.Thread(target=_sync, daemon=True).start()
    return count
def format_served_badge(count=None):
    """Render the green 'Served: N' pill as a raw HTML snippet.

    Args:
        count: Value to display; when None, the in-memory counter is used.
    """
    shown = _local_count if count is None else count
    return f"""<div style="display: flex; justify-content: flex-end; margin-bottom: -40px;">
<div style="background: rgba(76, 175, 80, 0.1); border: 1px solid #4CAF50; color: #4CAF50;
padding: 4px 12px; border-radius: 20px; font-weight: 600; font-size: 0.85rem; font-family: monospace;">
Served: {shown:,}
</div>
</div>"""
# ---------------------------------------------------------------------------
# Gradio Setup
# ---------------------------------------------------------------------------
# Tell Gradio to allow serving files from our directories.
# Without this, Gradio's security check (_check_allowed) blocks
# serving files from local paths that aren't in its allowed list.
# /tmp/klarity_serve/ works by default (child of tempfile.gettempdir()),
# but uploads/ and output/ do not.
gr.set_static_paths([UPLOAD_DIR, OUTPUT_DIR])
from huggingface_hub import snapshot_download
# `processing` lives under src/, which was appended to sys.path above.
from processing import (
    ModelManager, process_file, is_image, is_video, video_info,
    MODE_SETTINGS,
)
# Hub repo the model weights are pulled from.
MODEL_REPO = "HAKORADev/Klarity-models"
# Global ModelManager instance, created by _init_models() at startup.
mm = None
# Weight files that must all exist locally before the download is skipped.
REQUIRED_FILES = [
    "denoise-lite.pth", "deblur-lite.pth",
    "upscale-lite.pth", "framegen-lite.pkl",
]
def _models_ready():
    """Return True when every required weight file is present in MODELS_DIR."""
    for fname in REQUIRED_FILES:
        if not os.path.isfile(os.path.join(MODELS_DIR, fname)):
            return False
    return True
def _init_models():
    """Download model weights if missing and build the global ModelManager.

    All four models are loaded eagerly so failures surface in the logs at
    startup rather than on the first user request; a failed load is logged
    (with traceback) but does not abort startup.
    """
    global mm
    os.makedirs(MODELS_DIR, exist_ok=True)
    if _models_ready():
        log.info("Models found in %s", MODELS_DIR)
    else:
        log.info("Downloading models from %s ...", MODEL_REPO)
        snapshot_download(repo_id=MODEL_REPO, allow_patterns=["*.pth", "*.pkl"], local_dir=MODELS_DIR)
    mm = ModelManager(MODELS_DIR)
    log.info("ModelManager created")
    loaders = (
        ("Denoise", mm.load_denoise),
        ("Deblur", mm.load_deblur),
        ("Upscale", mm.load_upscale),
        ("FrameGen", mm.load_framegen),
    )
    for name, loader in loaders:
        try:
            loader()
        except Exception as e:
            log.error("Failed to load %s model: %s", name, e)
            traceback.print_exc()
        else:
            log.info("%s model loaded OK", name)
# ---------------------------------------------------------------------------
# File helpers
# ---------------------------------------------------------------------------
def _resolve_file(file_val):
    """Best-effort extraction of an existing filesystem path from a Gradio file value.

    Accepts FileData-like objects (``.path``), dicts with a "path" key,
    plain strings, tempfile-like objects (``.name``), or anything whose
    ``str()`` is a path. Returns None when no existing file is found.
    """
    if file_val is None:
        return None
    # Branch order matters: an object may expose several of these shapes.
    if hasattr(file_val, "path"):
        candidate = file_val.path
    elif isinstance(file_val, dict):
        candidate = file_val.get("path")
    elif isinstance(file_val, str):
        candidate = file_val
    elif hasattr(file_val, "name"):
        candidate = file_val.name
    else:
        candidate = str(file_val)
    return candidate if candidate and os.path.isfile(candidate) else None
def _persist_upload(file_val):
    """Copy an uploaded file into UPLOAD_DIR under a unique name.

    Gradio's own temp copy may disappear between events; this keeps a
    stable copy. Returns the destination path, or None when the upload
    cannot be resolved to a real file.
    """
    src = _resolve_file(file_val)
    if src is None:
        return None
    base = os.path.basename(src)
    dest = os.path.join(UPLOAD_DIR, f"{uuid.uuid4().hex[:8]}_{base}")
    shutil.copy2(src, dest)
    log.info("Persisted: %s -> %s", base, dest)
    return dest
def _serve_file(src_path):
    """Copy a result file into the Gradio-servable temp directory.

    Returns the path of the servable copy (random name, same extension);
    None passes through unchanged.
    """
    if src_path is None:
        return None
    suffix = os.path.splitext(src_path)[1]
    served_name = f"{uuid.uuid4().hex[:12]}{suffix}"
    dst = os.path.join(GRADIO_SERVE_DIR, served_name)
    shutil.copy2(src_path, dst)
    log.info("Served: %s -> %s", os.path.basename(src_path), served_name)
    return dst
# ---------------------------------------------------------------------------
# Job management
# ---------------------------------------------------------------------------
# Registry of active jobs: job id -> cancellation Event. All access to the
# registry and the id sequence is guarded by _jobs_lock.
_jobs = {}
_jobs_lock = threading.Lock()
_job_seq = 0


def new_job():
    """Register a new job and return its monotonically increasing id."""
    global _job_seq
    with _jobs_lock:
        _job_seq += 1
        jid = _job_seq
        _jobs[jid] = threading.Event()
    return jid


def cancel_job(jid):
    """Signal one job's cancellation event; unknown or None ids are a no-op."""
    if jid is None:
        return
    with _jobs_lock:
        ev = _jobs.get(jid)
        if ev is not None and not ev.is_set():
            ev.set()
            log.info("Job %s cancelled", jid)


def cancel_all_jobs():
    """Signal cancellation for every job still in the registry."""
    with _jobs_lock:
        for jid, ev in _jobs.items():
            if ev.is_set():
                continue
            ev.set()
            log.info("Job %s cancelled (cancel-all)", jid)


def finish_job(jid):
    """Drop a job (and its Event) from the registry once processing ends."""
    with _jobs_lock:
        _jobs.pop(jid, None)


def active_count():
    """Return the number of jobs currently registered."""
    with _jobs_lock:
        return len(_jobs)
def schedule_cleanup(files, delay=300):
    """Delete the given files after `delay` seconds in a daemon thread.

    A falsy/empty `files` value is a no-op (no thread is spawned). Missing
    files are skipped; deletion errors are logged and do not stop the loop.
    """
    if not files:
        return

    def _cleanup():
        time.sleep(delay)
        for path in files:
            if not path or not os.path.isfile(path):
                continue
            try:
                os.remove(path)
                log.info("Auto-cleaned: %s", os.path.basename(path))
            except Exception as e:
                log.warning("Cleanup error for %s: %s", path, e)

    threading.Thread(target=_cleanup, daemon=True).start()
# ---------------------------------------------------------------------------
# Mode definitions
# ---------------------------------------------------------------------------
# Dropdown labels offered for image uploads.
IMAGE_MODES = [
    "Denoise",
    "Deblur",
    "Upscale",
    "Clean (Denoise + Deblur)",
    "Full (Denoise + Deblur + Upscale)",
]
# Videos additionally get the frame-generation modes.
VIDEO_MODES = IMAGE_MODES + [
    "Frame Generation",
    "Clean + Frame Gen",
    "Full + Frame Gen",
]
# UI label -> processing-mode key understood by process_file()/MODE_SETTINGS.
LABEL_TO_KEY = {
    "Denoise": "denoise",
    "Deblur": "deblur",
    "Upscale": "upscale",
    "Clean (Denoise + Deblur)": "clean",
    "Full (Denoise + Deblur + Upscale)": "full",
    "Frame Generation": "frame-gen",
    "Clean + Frame Gen": "clean-frame-gen",
    "Full + Frame Gen": "full-frame-gen",
}
# Mode keys that require a video input; process() rejects images for these.
FG_MODES = {"frame-gen", "clean-frame-gen", "full-frame-gen"}
def _empty_results():
    """Return blank updates for the 5 visual outputs.

    Items: orig_img, orig_vid, res_img, res_vid, result_file. Together
    with the status string, the button update and the badge added by
    process(), this yields the 8 values expected by the
    process_btn.click outputs list.
    """
    return (
        gr.update(value=None, visible=False),  # orig_img
        gr.update(value=None, visible=False),  # orig_vid
        gr.update(value=None, visible=False),  # res_img
        gr.update(value=None, visible=False),  # res_vid
        None,  # result_file
    )
# ---------------------------------------------------------------------------
# UI callbacks
# ---------------------------------------------------------------------------
def on_file_change(file_val):
    """React to an upload: choose mode list, reset extras, describe the file.

    Returns updates for (mode_dd, upscale_rd, multi_rd, fps_num,
    video_info_state, file_info_md, process_btn). The Process button is
    enabled only when the upload resolves to a real file.
    """
    path = _resolve_file(file_val)
    btn = gr.update(interactive=path is not None)
    if path is None:
        return (
            gr.update(choices=VIDEO_MODES, value=VIDEO_MODES[0]),
            gr.update(visible=False),
            gr.update(visible=False),
            gr.update(visible=False, label="Target FPS", value=None),
            {"fps": 30.0, "is_video": False},
            "",
            btn,
        )
    if is_video(path):
        # Probe the video; fall back to safe defaults when probing fails.
        try:
            fps, frames, w, h = video_info(path)
        except Exception:
            fps, frames, w, h = 30.0, 0, 0, 0
        return (
            gr.update(choices=VIDEO_MODES, value=VIDEO_MODES[0]),
            gr.update(visible=False),
            gr.update(visible=False),
            gr.update(visible=False, label="Target FPS (blank = auto max)", value=None),
            {"fps": fps, "is_video": True},
            f"Video detected | {w}x{h} | {frames} frames | {fps:.2f} FPS",
            btn,
        )
    return (
        gr.update(choices=IMAGE_MODES, value=IMAGE_MODES[0]),
        gr.update(visible=False),
        gr.update(visible=False),
        gr.update(visible=False, label="Target FPS", value=None),
        {"fps": 30.0, "is_video": False},
        "Image detected",
        btn,
    )
def on_mode_change(mode, vinfo):
    """Show/hide the upscale, multiplier and FPS controls for a mode.

    Returns updates for (upscale_rd, multi_rd, fps_num). The FPS control
    only appears for FPS-capable modes on a video input, with bounds
    [original fps, 2x original fps].
    """
    settings = MODE_SETTINGS.get(LABEL_TO_KEY.get(mode, "denoise"), {})
    wants_fps = settings.get("fps", False) and vinfo.get("is_video", False)
    if wants_fps:
        base = vinfo.get("fps", 30.0)
        fps_update = gr.update(
            visible=True,
            label=f"Target FPS [{base:.1f} / {base * 2:.1f}] (blank = max)",
            minimum=base,
            maximum=base * 2,
            value=None,
        )
    else:
        fps_update = gr.update(visible=False, value=None)
    return (
        gr.update(visible=settings.get("upscale", False)),
        gr.update(visible=settings.get("multi", False)),
        fps_update,
    )
def on_multi_change(multi, vinfo):
    """Rescale the FPS control's bounds when the frame multiplier changes.

    Parses the "2x"/"4x" radio value defensively, falling back to 2 —
    consistent with the parsing in process(), which wraps the same
    expression in try/except (this one previously did not).
    """
    orig = vinfo.get("fps", 30.0)
    try:
        m = int(str(multi).replace("x", "")) if multi else 2
    except (ValueError, TypeError):
        m = 2
    max_fps = orig * m
    return gr.update(
        label=f"Target FPS [{orig:.1f} / {max_fps:.1f}] (blank = max)",
        minimum=orig,
        maximum=max_fps,
    )
def process(file_val, mode, upscale, multi, fps_val, vinfo):
    """Main callback for the Process button.

    Persists the upload, runs the selected restoration mode, and returns
    an 8-tuple matching the click handler's outputs: (status, orig_img,
    orig_vid, res_img, res_vid, result_file, process_btn, served_badge).
    Every exit path re-enables the Process button.
    """
    log.info("process: file=%s mode=%s", type(file_val).__name__ if file_val else "None", mode)
    # --- Input validation: each early return carries a fresh badge. ---
    if file_val is None:
        badge = format_served_badge(get_served_count())
        return ("Please upload a file first.", *_empty_results(), gr.update(value="Process", interactive=True), badge)
    path = _persist_upload(file_val)
    if path is None:
        badge = format_served_badge(get_served_count())
        return ("Could not read the uploaded file. Try re-uploading.", *_empty_results(), gr.update(value="Process", interactive=True), badge)
    key = LABEL_TO_KEY.get(mode)
    if key is None:
        badge = format_served_badge(get_served_count())
        return ("Invalid mode selected.", *_empty_results(), gr.update(value="Process", interactive=True), badge)
    if is_image(path) and key in FG_MODES:
        badge = format_served_badge(get_served_count())
        return ("Frame generation modes require a video, not an image.", *_empty_results(), gr.update(value="Process", interactive=True), badge)
    # --- Parse the optional controls defensively ("2x"/"4x" radio strings). ---
    try:
        uf = int(str(upscale).replace("x", "")) if upscale else 4
    except (ValueError, TypeError):
        uf = 4
    try:
        mu = int(str(multi).replace("x", "")) if multi else 2
    except (ValueError, TypeError):
        mu = 2
    try:
        fp = float(fps_val) if fps_val else None
    except (ValueError, TypeError):
        fp = None
    log.info("Processing: %s mode=%s uf=%s mu=%s fp=%s", os.path.basename(path), key, uf, mu, fp)
    # Per-run output directory keyed by the current timestamp.
    session_out = os.path.join(OUTPUT_DIR, str(int(time.time())))
    os.makedirs(session_out, exist_ok=True)
    # Register a cancellable job so demo.unload / cancel_all_jobs can abort it.
    jid = new_job()
    cancel_ev = _jobs[jid]
    try:
        t0 = time.time()
        result = process_file(
            path, key, mm, session_out,
            upscale_factor=uf, multi=mu, fps=fp, cb=None,
            cancel_event=cancel_ev,
        )
        elapsed = time.time() - t0
        elapsed_str = f"{elapsed:.1f}s" if elapsed < 60 else f"{elapsed/60:.1f}min"
        log.info("Done in %s", elapsed_str)
        # Increment served meter
        new_count = increment_served_count()
        new_badge = format_served_badge(new_count)
        before_p = result["before"]
        after_p = result["after"]
        # Copy results into the Gradio-servable temp dir before display.
        serve_before = _serve_file(before_p)
        serve_after = _serve_file(after_p)
        is_vid = result["type"] == "video"
        log.info("Result: type=%s before=%s (%d bytes) after=%s (%d bytes)",
                 result["type"],
                 before_p, os.path.getsize(before_p) if before_p else 0,
                 after_p, os.path.getsize(after_p) if after_p else 0)
        status = f"Done in {elapsed_str}."
        if is_vid:
            status += " Video result ready for download."
        # Served copies are throwaways; delete them after 5 minutes.
        to_clean = [f for f in [serve_after, serve_before] if f]
        schedule_cleanup(to_clean, delay=300)
        # Show image components for images, video components for videos.
        return (
            status,
            gr.update(value=serve_before if not is_vid else None, visible=not is_vid),
            gr.update(value=serve_before if is_vid else None, visible=is_vid),
            gr.update(value=serve_after if not is_vid else None, visible=not is_vid),
            gr.update(value=serve_after if is_vid else None, visible=is_vid),
            serve_after,
            gr.update(value="Process", interactive=True),
            new_badge,
        )
    except Exception as exc:
        traceback.print_exc()
        msg = str(exc)
        badge = format_served_badge(get_served_count())
        # NOTE(review): assumes process_file raises with "cancelled" in the
        # message on user abort — confirm against processing module.
        if "cancelled" in msg.lower():
            return ("Processing cancelled.", *_empty_results(), gr.update(value="Process", interactive=True), badge)
        return (f"Processing failed: {msg}", *_empty_results(), gr.update(value="Process", interactive=True), badge)
    finally:
        finish_job(jid)
def on_session_end():
    """Gradio unload hook: abort all in-flight jobs when the tab closes."""
    cancel_all_jobs()
# ---------------------------------------------------------------------------
# Theme
# ---------------------------------------------------------------------------
from gradio.themes.utils import colors
# Dark theme with emerald/green accents; light and dark variants are pinned
# to the same palette so the UI looks identical for every visitor.
theme = gr.themes.Soft(primary_hue=colors.emerald).set(
    body_background_fill="#0d0d0d",
    body_background_fill_dark="#0d0d0d",
    background_fill_primary="#161616",
    background_fill_primary_dark="#161616",
    background_fill_secondary="#1a1a1a",
    background_fill_secondary_dark="#1a1a1a",
    border_color_primary="#2e2e2e",
    border_color_primary_dark="#2e2e2e",
    body_text_color="#e5e5e5",
    body_text_color_dark="#e5e5e5",
    body_text_color_subdued="#a0a0a0",
    body_text_color_subdued_dark="#a0a0a0",
    button_primary_background_fill="#4CAF50",
    button_primary_background_fill_dark="#4CAF50",
    button_primary_background_fill_hover="#45a049",
    button_primary_text_color="#ffffff",
    button_primary_text_color_dark="#ffffff",
    block_border_color="#2a2a2a",
    block_border_color_dark="#2a2a2a",
    block_label_background_fill="#161616",
    block_label_background_fill_dark="#161616",
    block_title_text_color="#e5e5e5",
    block_title_text_color_dark="#e5e5e5",
)
# Page-level CSS: centered column, footer styling, compare-viewer chrome.
CUSTOM_CSS = """
.gradio-container{max-width:1000px!important;margin:0 auto;}
footer{border-top:1px solid #2a2a2a!important;margin-top:2rem!important;padding:1.2rem!important;}
footer a{color:#4CAF50!important;}
footer a:hover{text-decoration:underline!important;}
#file-info{color:#a0a0a0;font-size:.88rem;margin-bottom:.4rem;}
.side-label{color:#e5e5e5;font-size:.95rem;font-weight:600;text-align:center;margin:0 0 .3rem;}
/* Compare viewer */
#compare-viewer video,#compare-viewer img{pointer-events:none;display:block;width:100%;height:100%;object-fit:contain;}
#compare-slider-handle{transition:transform .15s ease;}
#compare-slider:hover #compare-slider-handle{transform:translate(-50%,-50%) scale(1.15);}
#compare-reset-btn:hover{background:#2a2a2a!important;}
.cv-sb{transition:background .15s;}
.cv-sb:hover{background:#333!important;}
"""
# ---------------------------------------------------------------------------
# Init models
# ---------------------------------------------------------------------------
# Models are downloaded/loaded once at module import (Space startup).
log.info("Initializing models...")
_init_models()
log.info("App ready.")
# ---------------------------------------------------------------------------
# JavaScript
# ---------------------------------------------------------------------------
# JS that runs on page load: sets up compare viewer, video players, slider
def _on_load():
    """Placeholder load callback.

    NOTE(review): appears unused — demo.load() below passes a lambda, not
    this function; confirm before removing.
    """
    return ()
# Client-side compare viewer: before/after slider, Ctrl+scroll zoom,
# drag-to-pan, plus a MutationObserver (and a polling fallback) that
# (re)initialises the viewer whenever both result images are present.
LOAD_JS = r"""
() => {
// ===== COMPARE VIEWER - For images only =====
var CV = {
zoom: 1, panX: 0, panY: 0,
dSlider: false, dPan: false, lx: 0, ly: 0,
init: function(bSrc, aSrc) {
this.zoom = 1;
this.panX = 0; this.panY = 0;
var bL = document.getElementById('compare-before-layer');
var aL = document.getElementById('compare-after-layer');
if (!bL || !aL) return;
bL.innerHTML = ''; aL.innerHTML = '';
var mk = function(s) { var im = document.createElement('img'); im.src = s; return im; };
bL.appendChild(mk(bSrc));
aL.appendChild(mk(aSrc));
var sl = document.getElementById('compare-slider');
if (sl) sl.style.left = '50%';
bL.style.clipPath = 'inset(0 50% 0 0)';
document.getElementById('compare-wrapper').style.display = '';
this._tf(); this._zi();
},
_tf: function() {
var t = 'translate('+this.panX+'px,'+this.panY+'px) scale('+this.zoom+')';
document.querySelectorAll('#compare-before-layer>*,#compare-after-layer>*').forEach(function(el){el.style.transform=t;el.style.transformOrigin='0 0';});
},
resetView: function() {
this.zoom = 1;
this.panX = 0; this.panY = 0;
this._tf(); this._zi();
},
_zi: function() {
var el = document.getElementById('compare-zoom-info');
if (el) {
var pct = Math.round(this.zoom * 100);
el.textContent = pct + '%';
}
}
};
window._klarityCV = CV;
// Slider interaction
(function() {
var sl=document.getElementById('compare-slider');
var vw=document.getElementById('compare-viewer');
var bL=document.getElementById('compare-before-layer');
if (!sl||!vw||!bL) return;
var mv=function(e) {
if (!CV.dSlider) return;
var r=vw.getBoundingClientRect();
var cx=e.touches?e.touches[0].clientX:e.clientX;
var x=Math.max(0,Math.min(cx-r.left,r.width));
var p=(x/r.width)*100;
sl.style.left=p+'%';
bL.style.clipPath='inset(0 '+(100-p)+'% 0 0)';
};
var up=function(){CV.dSlider=false;document.removeEventListener('mousemove',mv);document.removeEventListener('mouseup',up);document.removeEventListener('touchmove',mv);document.removeEventListener('touchend',up);};
sl.addEventListener('mousedown',function(e){e.preventDefault();CV.dSlider=true;document.addEventListener('mousemove',mv);document.addEventListener('mouseup',up);});
sl.addEventListener('touchstart',function(){CV.dSlider=true;document.addEventListener('touchmove',mv);document.addEventListener('touchend',up);},{passive:true});
})();
// Zoom with Ctrl+scroll
(function() {
var vw=document.getElementById('compare-viewer');
if (!vw) return;
vw.addEventListener('wheel',function(e){
if (!e.ctrlKey) return; e.preventDefault();
var r=vw.getBoundingClientRect();
var mx=e.clientX-r.left,my=e.clientY-r.top;
var old=CV.zoom;
CV.zoom=Math.max(0.1,Math.min(20,CV.zoom*(e.deltaY>0?0.9:1.1)));
var ratio=CV.zoom/old;
CV.panX=mx-(mx-CV.panX)*ratio;
CV.panY=my-(my-CV.panY)*ratio;
CV._tf();CV._zi();
},{passive:false});
})();
// Pan when zoomed
(function() {
var vw=document.getElementById('compare-viewer');
if (!vw) return;
vw.addEventListener('mousedown',function(e){
if (CV.dSlider||e.target.closest('#compare-slider')||CV.zoom<=0.1) return;
CV.dPan=true;CV.lx=e.clientX;CV.ly=e.clientY;
var pmv=function(e2){if(!CV.dPan)return;CV.panX+=e2.clientX-CV.lx;CV.panY+=e2.clientY-CV.ly;CV.lx=e2.clientX;CV.ly=e2.clientY;CV._tf();};
var pup=function(){CV.dPan=false;document.removeEventListener('mousemove',pmv);document.removeEventListener('mouseup',pup);};
document.addEventListener('mousemove',pmv);
document.addEventListener('mouseup',pup);
});
})();
var rb=document.getElementById('compare-reset-btn');
if (rb) rb.addEventListener('click',function(){CV.resetView();});
// Result detection for images
var _lb=null,_la=null;
var chk=function(){
var oi=document.querySelector('#orig-img img[src]');
var ri=document.querySelector('#res-img img[src]');
var bs=oi?oi.src:null;
var as2=ri?ri.src:null;
if (bs&&as2&&(bs!==_lb||as2!==_la)){_lb=bs;_la=as2;CV.init(bs,as2);}
if (!bs&&!as2&&(_lb||_la)){_lb=_la=null;var w=document.getElementById('compare-wrapper');if(w)w.style.display='none';}
};
['orig-img','res-img'].forEach(function(id){
var el=document.getElementById(id);
if (el) new MutationObserver(chk).observe(el,{childList:true,subtree:true,attributes:true});
});
setInterval(chk,600);
return [];
}
"""
# JS that runs when a new file is uploaded.
# Re-enables the process button as a safety net (handles edge cases
# where the button might still be disabled from a previous session).
FILE_CHANGE_JS = r"""
(file) => {
document.querySelectorAll('button').forEach(function(b) {
if ((b.textContent || '').includes('Process')) {
b.disabled = false;
b.style.opacity = '';
b.style.pointerEvents = '';
}
});
return [file];
}
"""
# Compare viewer HTML (for images)
# Hidden by default; LOAD_JS reveals it once both before/after images exist.
COMPARE_HTML = """<div id="compare-wrapper" style="display:none;margin-top:1rem;">
<div style="display:flex;justify-content:space-between;align-items:center;margin-bottom:6px;">
<span style="color:#4CAF50;font-weight:600;font-size:.9rem;">Compare</span>
<div style="display:flex;gap:6px;align-items:center;">
<span id="compare-zoom-info" style="color:#888;font-size:.78rem;margin-right:6px;">100%</span>
<span id="compare-reset-btn" style="display:inline-block;padding:2px 10px;border:1px solid #2e2e2e;border-radius:4px;color:#e5e5e5;font-size:.78rem;cursor:pointer;background:#1a1a1a;">Reset View</span>
</div>
</div>
<div id="compare-viewer" style="position:relative;width:100%;height:400px;max-height:80vh;overflow:hidden;border-radius:8px;border:1px solid #2e2e2e;background:#0a0a0a;cursor:crosshair;-webkit-user-select:none;user-select:none;">
<div id="compare-before-label" style="position:absolute;top:8px;left:8px;z-index:20;background:rgba(0,0,0,0.7);color:#fff;padding:2px 10px;border-radius:4px;font-size:.78rem;font-weight:600;pointer-events:none;">Before</div>
<div id="compare-after-label" style="position:absolute;top:8px;right:8px;z-index:20;background:rgba(0,0,0,0.7);color:#fff;padding:2px 10px;border-radius:4px;font-size:.78rem;font-weight:600;pointer-events:none;">After</div>
<div id="compare-after-layer" style="position:absolute;top:0;left:0;width:100%;height:100%;z-index:1;overflow:hidden;"></div>
<div id="compare-before-layer" style="position:absolute;top:0;left:0;width:100%;height:100%;z-index:5;overflow:hidden;clip-path:inset(0 50% 0 0);"></div>
<div id="compare-slider" style="position:absolute;top:0;left:50%;width:3px;height:100%;background:#4CAF50;z-index:10;cursor:ew-resize;transform:translateX(-50%);pointer-events:auto;">
<div id="compare-slider-handle" style="position:absolute;top:50%;left:50%;transform:translate(-50%,-50%);width:28px;height:28px;border-radius:50%;background:#4CAF50;border:2px solid #fff;box-shadow:0 2px 8px rgba(0,0,0,0.5);"></div>
</div>
</div>
<div style="text-align:center;color:#555;font-size:.75rem;margin-top:3px;">Ctrl + Scroll to zoom &middot; Drag image to pan when zoomed</div>
</div>"""
# ---------------------------------------------------------------------------
# UI
# ---------------------------------------------------------------------------
with gr.Blocks(title="Klarity") as demo:
    # Fetch real count immediately on initialization
    initial_count = get_served_count()
    served_badge = gr.HTML(format_served_badge(initial_count))
    # Auto-refresh counter every 5 minutes
    timer = gr.Timer(300, active=True)
    timer.tick(lambda: format_served_badge(get_served_count()), outputs=[served_badge])
    # Header: logo letter, title, tagline and a pointer to the desktop app.
    gr.Markdown(
        """
<div style="text-align:center;padding:0.6rem 0 0.2rem;">
<span style="font-size:2.6rem;font-weight:900;color:#4CAF50;">K</span>
</div>
<h1 style="text-align:center;margin:0;font-size:1.8rem;font-weight:700;letter-spacing:-0.02em;">
Klarity
</h1>
<p style="text-align:center;margin:0 0 .6rem;color:#a0a0a0;font-size:.95rem;line-height:1.5;">
AI-Powered Image Restoration &middot; Denoise, Deblur &amp; Upscale
</p>
<p style="text-align:center;margin:0 0 .4rem;color:#666;font-size:.82rem;">
Lite mode &middot; For the full experience with <b>Video Support</b>, Heavy models and GPU support, see the
<a href="https://github.com/HAKORADev/Klarity" target="_blank" rel="noopener" style="color:#4CAF50;">desktop app</a>.
</p>
"""
    )
    gr.Markdown("---")
    # NOTE(review): file_types=["image"] restricts uploads to images even
    # though VIDEO_MODES exist — presumably intentional for the Lite Space
    # (video is desktop-only per the header text); confirm.
    upload = gr.File(label="Upload Image", file_types=["image"])
    file_info_md = gr.Markdown("", elem_id="file-info")
    mode_dd = gr.Dropdown(
        IMAGE_MODES,
        label="Processing Mode",
        value=IMAGE_MODES[0],
        interactive=True,
    )
    # Mode-dependent controls; shown/hidden by on_mode_change().
    upscale_rd = gr.Radio(["2x", "4x"], label="Upscale Factor", value="4x", visible=False)
    multi_rd = gr.Radio(["2x", "4x"], label="Frame Multiplier", value="2x", visible=False)
    fps_num = gr.Number(label="Target FPS", value=None, precision=1, visible=False)
    # Per-session metadata about the current upload (fps + video flag).
    video_info_state = gr.State({"fps": 30.0, "is_video": False})
    process_btn = gr.Button("Process", variant="primary", size="lg", interactive=False, elem_id="process-btn")
    status_txt = gr.Textbox(label="Status", interactive=False, lines=2, max_lines=6)
    # --- Side-by-side rendering ---
    with gr.Row(equal_height=True):
        with gr.Column(scale=1):
            gr.Markdown('<p class="side-label">Original</p>', elem_classes="side-label")
            orig_img = gr.Image(label="", show_label=False, visible=False, elem_id="orig-img")
            orig_vid = gr.Video(label="", show_label=False, visible=False, elem_id="orig-vid")
        with gr.Column(scale=1):
            gr.Markdown('<p class="side-label">Result</p>', elem_classes="side-label")
            res_img = gr.Image(label="", show_label=False, visible=False, elem_id="res-img")
            res_vid = gr.Video(label="", show_label=False, visible=False, elem_id="res-vid")
    result_file = gr.File(label="Download Result")
    # Compare viewer for images
    gr.HTML(COMPARE_HTML)
    gr.Markdown(
        """
---
<div style="text-align:center;padding:1rem 0;">
<span style="color:#888;">Full experience with Heavy models, GPU support &amp; GUI:</span><br>
<a href="https://github.com/HAKORADev/Klarity" target="_blank" rel="noopener">
<strong style="color:#4CAF50;">github.com/HAKORADev/Klarity</strong>
</a>
</div>
"""
    )
    # --- Wire up callbacks ---
    upload.change(
        on_file_change,
        js=FILE_CHANGE_JS,
        inputs=[upload],
        outputs=[mode_dd, upscale_rd, multi_rd, fps_num, video_info_state, file_info_md, process_btn],
    )
    mode_dd.change(
        on_mode_change,
        inputs=[mode_dd, video_info_state],
        outputs=[upscale_rd, multi_rd, fps_num],
    )
    multi_rd.change(
        on_multi_change,
        inputs=[multi_rd, video_info_state],
        outputs=[fps_num],
    )
    # Disable the button, run processing, then re-enable; the final .then
    # is a safety net in case process() fails to return a button update.
    process_btn.click(
        lambda: gr.update(interactive=False, value="Processing..."),
        outputs=[process_btn],
    ).then(
        process,
        inputs=[upload, mode_dd, upscale_rd, multi_rd, fps_num, video_info_state],
        outputs=[status_txt, orig_img, orig_vid, res_img, res_vid, result_file, process_btn, served_badge],
    ).then(
        lambda: gr.update(interactive=True, value="Process"),
        outputs=[process_btn],
    )
    # --- Cancel on page refresh / close ---
    demo.unload(on_session_end)
    # --- JS: compare viewer, video sync on page load ---
    demo.load(fn=lambda: format_served_badge(get_served_count()), outputs=[served_badge], js=LOAD_JS)
# NOTE(review): css= and theme= are normally gr.Blocks(...) constructor
# arguments, not launch() parameters — confirm this Gradio version honours
# them when passed to launch().
demo.launch(show_error=True, server_name="0.0.0.0", css=CUSTOM_CSS, theme=theme)