Muhamed-Kheir committed on
Commit
af876ca
·
verified ·
1 Parent(s): af845d2

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +66 -36
app.py CHANGED
@@ -7,7 +7,9 @@ import zipfile
7
  import gradio as gr
8
 
9
  # Ensure repo root is importable on Spaces
10
- sys.path.append(os.path.dirname(__file__))
 
 
11
 
12
  import kmer_predict # must be in repo root
13
 
@@ -17,6 +19,7 @@ FASTA_EXTS = (".fa", ".fasta", ".fas", ".fna")
17
 
18
 
19
  def _zip_dir(folder: str, zip_path: str) -> None:
 
20
  with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as z:
21
  for root, _, files in os.walk(folder):
22
  for fn in files:
@@ -24,27 +27,67 @@ def _zip_dir(folder: str, zip_path: str) -> None:
24
  rel = os.path.relpath(full, folder)
25
  z.write(full, rel)
26
 
 
27
  def _safe_extract_zip(zip_path: str, dst_dir: str) -> None:
28
- """Safely extract FASTA files from ZIP (prevents zip-slip)."""
29
  with zipfile.ZipFile(zip_path, "r") as z:
30
  for member in z.infolist():
31
  if member.is_dir():
32
  continue
33
 
 
34
  target = os.path.normpath(os.path.join(dst_dir, member.filename))
35
  if not target.startswith(os.path.abspath(dst_dir) + os.sep):
36
  continue
37
 
38
- if not member.filename.lower().endswith((".fa", ".fasta", ".fas", ".fna")):
 
39
  continue
40
 
41
  os.makedirs(os.path.dirname(target), exist_ok=True)
42
  with z.open(member) as src, open(target, "wb") as out:
43
  shutil.copyfileobj(src, out)
44
 
45
- def run_prediction(unknown_files, kmer_zip, seqtype, mode, identity, coverage, fdr):
46
- if not unknown_files:
47
- raise gr.Error("Please upload at least one unknown FASTA file.")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
48
  if not kmer_zip:
49
  raise gr.Error("Please upload the k-mer results ZIP from Space 1.")
50
 
@@ -58,36 +101,23 @@ def run_prediction(unknown_files, kmer_zip, seqtype, mode, identity, coverage, f
58
  os.makedirs(unknown_dir, exist_ok=True)
59
  os.makedirs(outdir, exist_ok=True)
60
 
61
- # Copy unknown FASTAs
62
- for idx, f in enumerate(unknown_files, start=1):
63
- src = getattr(f, "path", None) or getattr(f, "name", None) or str(f)
64
- orig = (
65
- getattr(f, "orig_name", None)
66
- or getattr(f, "filename", None)
67
- or os.path.basename(src)
68
- )
69
-
70
- lower = orig.lower()
71
-
72
- # ZIP → extract FASTA files
73
- if lower.endswith(".zip") or src.lower().endswith(".zip"):
74
- _safe_extract_zip(src, unknown_dir)
75
- continue
76
 
77
- # FASTA file
78
- if not lower.endswith((".fa", ".fasta", ".fas", ".fna")):
79
- continue
80
-
81
- dst = os.path.join(unknown_dir, os.path.basename(orig))
82
- shutil.copy(src, dst)
83
-
84
-
85
- shutil.copy(src, os.path.join(unknown_dir, os.path.basename(orig)))
86
 
87
  # K-mer ZIP path (ZIP-only)
88
  kmer_zip_path = getattr(kmer_zip, "path", None) or getattr(kmer_zip, "name", None) or str(kmer_zip)
89
  if not str(kmer_zip_path).lower().endswith(".zip"):
90
- raise gr.Error("K-mer input must be a .zip file from Space 1.")
91
 
92
  # Run prediction
93
  kmer_predict.predict(
@@ -114,13 +144,13 @@ def run_prediction(unknown_files, kmer_zip, seqtype, mode, identity, coverage, f
114
  with gr.Blocks() as demo:
115
  gr.Markdown("# K-mer Sequence Predictor")
116
  gr.Markdown(
117
- "Upload unknown FASTA sequences and the **kmer_results.zip** produced by the Unique k-mer Space."
118
  )
119
 
120
- unknown_files = gr.File(
121
- label="Unknown FASTA files",
122
  file_count="multiple",
123
- file_types=[".fa", ".fasta", ".fas", ".fna"],
124
  )
125
 
126
  kmer_zip = gr.File(
@@ -146,7 +176,7 @@ with gr.Blocks() as demo:
146
 
147
  run_btn.click(
148
  fn=run_prediction,
149
- inputs=[unknown_files, kmer_zip, seqtype, mode, identity, coverage, fdr],
150
  outputs=[out_plot, out_csv, out_zip],
151
  )
152
 
 
7
  import gradio as gr
8
 
9
  # Ensure repo root is importable on Spaces
10
+ ROOT = os.path.dirname(__file__)
11
+ if ROOT not in sys.path:
12
+ sys.path.insert(0, ROOT)
13
 
14
  import kmer_predict # must be in repo root
15
 
 
19
 
20
 
21
  def _zip_dir(folder: str, zip_path: str) -> None:
22
+ """Zip the contents of folder into zip_path."""
23
  with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as z:
24
  for root, _, files in os.walk(folder):
25
  for fn in files:
 
27
  rel = os.path.relpath(full, folder)
28
  z.write(full, rel)
29
 
30
+
def _safe_extract_zip(zip_path: str, dst_dir: str) -> None:
    """Extract only the FASTA members of a ZIP archive into ``dst_dir``.

    Directory entries, members whose name does not end with one of the
    module-level ``FASTA_EXTS`` suffixes (case-insensitive), and members whose
    resolved path would land outside ``dst_dir`` (zip-slip) are skipped
    silently. Sub-directories inside the archive are recreated as needed.

    Args:
        zip_path: Path of the ZIP archive to read.
        dst_dir: Destination directory; may be relative or absolute.
    """
    # Resolve the destination once. Both sides of the containment check must
    # be absolute, otherwise a relative ``dst_dir`` never matches the prefix
    # and every member is rejected.
    root = os.path.abspath(dst_dir)
    # Avoid a doubled separator when the destination is a filesystem root.
    prefix = root if root.endswith(os.sep) else root + os.sep

    with zipfile.ZipFile(zip_path, "r") as z:
        for member in z.infolist():
            if member.is_dir():
                continue

            # Only FASTA-like files
            if not member.filename.lower().endswith(FASTA_EXTS):
                continue

            # Zip-slip protection: abspath collapses any ".." components, so a
            # member that escapes the destination no longer carries the prefix.
            target = os.path.abspath(os.path.join(root, member.filename))
            if not target.startswith(prefix):
                continue

            os.makedirs(os.path.dirname(target), exist_ok=True)
            with z.open(member) as src, open(target, "wb") as out:
                shutil.copyfileobj(src, out)
50
 
51
+
def _ingest_unknown_uploads(unknown_uploads, unknown_dir: str) -> None:
    """Copy or extract uploaded unknown sequences into ``unknown_dir``.

    Each upload may be a FASTA file or a ZIP archive containing FASTA files.
    ZIP uploads are handed to ``_safe_extract_zip``; everything else is copied.
    An upload whose original name has no FASTA extension is stored as
    ``unknown_<idx>.fasta``, where ``idx`` is its 1-based position.

    Two uploads that resolve to the same destination name no longer overwrite
    each other: the later one gets an ``_<n>`` suffix before its extension.
    Files extracted from ZIP archives are not part of this bookkeeping.

    Args:
        unknown_uploads: Iterable of uploads. Each item is either a path-like
            value or an object exposing ``path``/``name`` (location on disk)
            and optionally ``orig_name``/``filename`` (user-facing name).
            ``None`` or an empty sequence only creates the directory.
        unknown_dir: Directory that receives the sequences; created if missing.
    """
    os.makedirs(unknown_dir, exist_ok=True)

    if not unknown_uploads:
        return

    # Destination names written during this call, used to detect collisions
    # between uploads that share a basename.
    used_names = set()

    for idx, f in enumerate(unknown_uploads, start=1):
        src = getattr(f, "path", None) or getattr(f, "name", None) or str(f)
        orig = (
            getattr(f, "orig_name", None)
            or getattr(f, "filename", None)
            or os.path.basename(src)
        )
        lower = str(orig).lower()

        # ZIP -> extract FASTA files
        if lower.endswith(".zip") or str(src).lower().endswith(".zip"):
            _safe_extract_zip(src, unknown_dir)
            continue

        # FASTA -> copy
        if lower.endswith(FASTA_EXTS):
            dst_name = os.path.basename(orig)
        else:
            # If Gradio provides a temp name without extension, keep it readable
            dst_name = f"unknown_{idx}.fasta"

        # Disambiguate instead of silently overwriting an earlier upload.
        stem, ext = os.path.splitext(dst_name)
        suffix = idx
        while os.path.normcase(dst_name) in used_names:
            dst_name = f"{stem}_{suffix}{ext}"
            suffix += 1
        used_names.add(os.path.normcase(dst_name))

        shutil.copy(src, os.path.join(unknown_dir, dst_name))
86
+
87
+
88
+ def run_prediction(unknown_uploads, kmer_zip, seqtype, mode, identity, coverage, fdr):
89
+ if not unknown_uploads:
90
+ raise gr.Error("Please upload unknown FASTA files or a ZIP containing FASTA files.")
91
  if not kmer_zip:
92
  raise gr.Error("Please upload the k-mer results ZIP from Space 1.")
93
 
 
101
  os.makedirs(unknown_dir, exist_ok=True)
102
  os.makedirs(outdir, exist_ok=True)
103
 
104
+ # Ingest unknown uploads (FASTA and/or ZIP)
105
+ _ingest_unknown_uploads(unknown_uploads, unknown_dir)
 
 
 
 
 
 
 
 
 
 
 
 
 
106
 
107
+ # Ensure we actually got sequences
108
+ # (Lightweight check: presence of at least one fasta-like file)
109
+ found_any = any(
110
+ fn.lower().endswith(FASTA_EXTS)
111
+ for _, _, files in os.walk(unknown_dir)
112
+ for fn in files
113
+ )
114
+ if not found_any:
115
+ raise gr.Error("No FASTA files were found after processing your uploads. Please check your ZIP contents.")
116
 
117
  # K-mer ZIP path (ZIP-only)
118
  kmer_zip_path = getattr(kmer_zip, "path", None) or getattr(kmer_zip, "name", None) or str(kmer_zip)
119
  if not str(kmer_zip_path).lower().endswith(".zip"):
120
+ raise gr.Error("K-mer input must be a .zip file produced by Space 1.")
121
 
122
  # Run prediction
123
  kmer_predict.predict(
 
144
  with gr.Blocks() as demo:
145
  gr.Markdown("# K-mer Sequence Predictor")
146
  gr.Markdown(
147
+ "Upload **unknown sequences** (FASTA files or ZIP containing FASTA) and the **kmer_results.zip** from Space 1."
148
  )
149
 
150
+ unknown_uploads = gr.File(
151
+ label="Unknown sequences (FASTA files or ZIP containing FASTA)",
152
  file_count="multiple",
153
+ file_types=[".fa", ".fasta", ".fas", ".fna", ".zip"],
154
  )
155
 
156
  kmer_zip = gr.File(
 
176
 
177
  run_btn.click(
178
  fn=run_prediction,
179
+ inputs=[unknown_uploads, kmer_zip, seqtype, mode, identity, coverage, fdr],
180
  outputs=[out_plot, out_csv, out_zip],
181
  )
182