Muhamed-Kheir committed on
Commit
121be4e
·
verified ·
1 Parent(s): 3d9381d

Upload 4 files

Browse files
Files changed (4) hide show
  1. README.txt +18 -0
  2. app.py +20 -0
  3. kmer_unique.py +281 -0
  4. requirements.txt +5 -0
README.txt ADDED
@@ -0,0 +1,18 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Multi-group unique k-mer analysis
2
+
3
+ This tool compares multiple groups of FASTA sequences (one directory per group) and identifies **k-mers unique to each group** relative to all other groups. It outputs per-group TSV files, a summary Excel file, and two plots.
4
+
5
+ ## Install
6
+ pip install -r requirements.txt
7
+
8
+ ## Run
9
+ python kmer_unique.py \
10
+ --group-dirs path/to/groupA path/to/groupB path/to/groupC \
11
+ --k-min 15 --k-max 31 --min-freq 5 \
12
+ --outdir results
13
+
14
+ ## Outputs
15
+ - `results/unique_k{k}_{group}.tsv` : k-mers unique to the group that occur at least `--min-freq` times, with their counts (one file per k and per group)
16
+ - `results/kmer_summary.xlsx` : summary table across k
17
+ - `results/unique_kmers_per_group.png`
18
+ - `results/total_freq_per_group.png`
app.py ADDED
@@ -0,0 +1,20 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import gradio as gr
2
+
3
def show_instructions():
    """Return the static usage text displayed by the Gradio interface.

    The text explains that the Space only hosts a command-line script and
    shows an example invocation of ``kmer_unique.py``.
    """
    intro = "This Space hosts a command-line k-mer analysis script.\n\n"
    heading = "Run it locally like this:\n"
    # The command is a single logical line; it is split here only for width.
    command = (
        "python kmer_unique.py --group-dirs path/to/groupA path/to/groupB "
        "--k-min 1 --k-max 50 --min-freq 5 --outdir kmer_results\n"
    )
    return "".join((intro, heading, command))
10
+
11
# Input-less Gradio interface whose only output is the usage text returned by
# show_instructions(); the analysis itself lives in kmer_unique.py.
demo = gr.Interface(
    title="Unique k-mer Analysis (CLI tool)",
    description="This Space is a Gradio wrapper so the repository builds. The main tool is kmer_unique.py.",
    fn=show_instructions,
    inputs=[],
    outputs="text",
)


# Start the web UI only when this file is executed directly.
if __name__ == "__main__":
    demo.launch()
kmer_unique.py ADDED
@@ -0,0 +1,281 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ """
3
+ Multi-group unique k-mer analysis.
4
+
5
+ Given multiple directories (each directory = one group) containing FASTA files,
6
+ computes k-mers unique to each group (relative to all other groups), filters by
7
+ minimum frequency, writes per-group outputs, and saves a summary spreadsheet + plots.
8
+
9
+ Example:
10
+ python kmer_unique.py \
11
+ --group-dirs data/groupA data/groupB data/groupC \
12
+ --k-min 15 --k-max 31 --min-freq 5 \
13
+ --outdir results
14
+ """
15
+
16
+ from __future__ import annotations
17
+
18
+ import argparse
19
+ import os
20
+ from collections import Counter
21
+ from dataclasses import dataclass
22
+ from typing import Dict, Iterable, List, Sequence, Tuple
23
+
24
+ import matplotlib.pyplot as plt
25
+ import pandas as pd
26
+ from Bio import SeqIO
27
+
28
+
29
+ # -------------------------------
30
+ # Data structures
31
+ # -------------------------------
32
+
33
@dataclass(frozen=True)
class GroupData:
    """Immutable description of one input group (one directory of FASTA files)."""

    # Label used for the group in output file names and summary columns.
    name: str
    # Directory path exactly as supplied by the caller.
    dirpath: str
    # Paths of the FASTA files found in the directory.
    fasta_files: Tuple[str, ...]
38
+
39
+
40
+ # -------------------------------
41
+ # K-mer utilities
42
+ # -------------------------------
43
+
44
def get_kmers(sequence: str, k: int) -> List[str]:
    """Return every length-``k`` window of ``sequence`` that contains no 'N'.

    The sequence is upper-cased first, so lower-case 'n' is excluded as well.
    Windows are returned in order of their start position, duplicates
    included. A non-positive ``k`` or a sequence shorter than ``k`` yields an
    empty list.
    """
    text = str(sequence).upper()
    last_start = len(text) - k
    if k <= 0 or last_start < 0:
        return []

    kmers: List[str] = []
    for start in range(last_start + 1):
        window = text[start:start + k]
        if "N" in window:
            # Skip windows that overlap an ambiguous base.
            continue
        kmers.append(window)
    return kmers
50
+
51
+
52
def kmers_in_records(records: Iterable, k: int) -> set:
    """Collect the distinct k-mers that occur in any of the given records.

    Each record must expose a ``seq`` attribute; its string form is passed to
    ``get_kmers``, so windows containing 'N' are not included.
    """
    return {
        kmer
        for record in records
        for kmer in get_kmers(str(record.seq), k)
    }
58
+
59
+
60
def count_kmers_in_records(records: Iterable, k: int) -> Counter:
    """Count k-mer occurrences across all sequences in ``records``.

    Each record must expose a ``seq`` attribute; its string form is passed to
    ``get_kmers``, so windows containing 'N' are not counted.

    The counter is updated one record at a time instead of first collecting
    every k-mer of every record into a single list, so the temporary memory is
    bounded by the longest record rather than by the whole data set.

    Returns:
        A ``Counter`` mapping each k-mer to its total number of occurrences.
    """
    counts: Counter = Counter()
    for rec in records:
        counts.update(get_kmers(str(rec.seq), k))
    return counts
66
+
67
+
68
+ # -------------------------------
69
+ # IO helpers
70
+ # -------------------------------
71
+
72
# File-name suffixes (matched case-insensitively) that identify a FASTA file.
FASTA_EXTS = (".fas", ".fasta", ".fa", ".fna")


def find_fasta_files(directory: str) -> List[str]:
    """Return the sorted paths of the FASTA files directly inside ``directory``.

    Only regular files whose lower-cased name ends with one of ``FASTA_EXTS``
    are returned; sub-directories are not searched.
    """
    return sorted(
        os.path.join(directory, entry)
        for entry in os.listdir(directory)
        if entry.lower().endswith(FASTA_EXTS)
        and os.path.isfile(os.path.join(directory, entry))
    )
84
+
85
+
86
def parse_fasta_files(files: Sequence[str]) -> List:
    """Read every FASTA file in ``files`` and return all parsed records.

    Reading is best-effort: when a file raises while being parsed, a warning
    is printed, none of that file's records are added, and processing
    continues with the next file.
    """
    parsed = []
    for fasta_path in files:
        try:
            # list() forces the lazy parser to run inside the try block.
            parsed += list(SeqIO.parse(fasta_path, "fasta"))
        except Exception as err:
            print(f"⚠️ Warning: could not read {fasta_path}: {err}")
    return parsed
95
+
96
+
97
def write_kmer_output(filepath: str, kmer_dict: Dict[str, int], total_freq: int) -> None:
    """Write a k-mer/count table to ``filepath`` as tab-separated text.

    The file starts with a ``#`` comment line holding ``total_freq``, followed
    by a ``kmer``/``count`` header and one row per k-mer in sorted order.
    """
    with open(filepath, "w", encoding="utf-8") as handle:
        handle.write(f"# Total frequency of unique k-mers: {total_freq}\n")
        handle.write("kmer\tcount\n")
        handle.writelines(
            f"{kmer}\t{count}\n" for kmer, count in sorted(kmer_dict.items())
        )
104
+
105
+
106
+ # -------------------------------
107
+ # Plotting
108
+ # -------------------------------
109
+
110
def plot_kmer_summary(summary_df: pd.DataFrame, output_dir: str) -> None:
    """Save two line plots (one line per group, plotted against k) to ``output_dir``.

    Group names are taken from the columns ending in " unique k-mers".
    ``unique_kmers_per_group.png`` plots the "<group> unique k-mers" columns
    and ``total_freq_per_group.png`` plots the "<group> total frequency"
    columns, both against the "k" column.
    """
    name_suffix = " unique k-mers"
    groups = [
        column.replace(name_suffix, "")
        for column in summary_df.columns
        if column.endswith(name_suffix)
    ]

    # One entry per figure: (column suffix, marker, y-axis label, title, file name).
    figures = (
        (
            " unique k-mers",
            "o",
            "Number of unique k-mers",
            "Unique k-mers per group across k sizes",
            "unique_kmers_per_group.png",
        ),
        (
            " total frequency",
            "s",
            "Total frequency of unique k-mers",
            "Total frequency of unique k-mers per group across k sizes",
            "total_freq_per_group.png",
        ),
    )

    for column_suffix, marker, y_label, title, file_name in figures:
        plt.figure(figsize=(10, 6))
        for group in groups:
            plt.plot(
                summary_df["k"],
                summary_df[f"{group}{column_suffix}"],
                marker=marker,
                label=group,
            )
        plt.xlabel("k-mer size (k)")
        plt.ylabel(y_label)
        plt.title(title)
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.savefig(os.path.join(output_dir, file_name), dpi=300)
        plt.close()

    print(f"✅ Plots saved to: {output_dir}")
141
+
142
+
143
+ # -------------------------------
144
+ # Core logic
145
+ # -------------------------------
146
+
147
def compute_unique_kmers_per_group(
    group_records: Dict[str, List],
    k: int,
    min_freq: int,
) -> Tuple[Dict[str, Dict[str, int]], Dict[str, int]]:
    """Find, for every group, the k-mers that occur in that group only.

    Args:
        group_records: Mapping of group name to that group's sequence records.
        k: K-mer length.
        min_freq: Minimum number of occurrences (within the group) a unique
            k-mer needs in order to be kept.

    Returns:
        A pair ``(unique_counts, total_freqs)``: ``unique_counts[group]`` maps
        each retained unique k-mer to its count in that group, and
        ``total_freqs[group]`` is the sum of those counts.
    """
    # Count once per group; the key set of each counter is that group's k-mer
    # set, so the sequences do not need to be scanned a second time.
    group_counts = {
        g: count_kmers_in_records(recs, k) for g, recs in group_records.items()
    }

    # presence[kmer] = number of groups containing the k-mer. A k-mer is
    # unique to a group exactly when it is present in one group overall.
    presence: Counter = Counter()
    for counts in group_counts.values():
        presence.update(counts.keys())

    unique_counts: Dict[str, Dict[str, int]] = {}
    total_freqs: Dict[str, int] = {}
    for g, counts in group_counts.items():
        kept = {
            km: n
            for km, n in counts.items()
            if presence[km] == 1 and n >= min_freq
        }
        unique_counts[g] = kept
        total_freqs[g] = sum(kept.values())

    return unique_counts, total_freqs
174
+
175
+
176
def run(
    group_dirs: Sequence[str],
    k_min: int,
    k_max: int,
    min_freq: int,
    output_dir: str,
) -> None:
    """Run the unique k-mer analysis for every k in ``[k_min, k_max]``.

    For each k, one TSV file per group is written to ``output_dir``; afterwards
    a summary spreadsheet (``kmer_summary.xlsx``) and the summary plots are
    saved there as well.

    Args:
        group_dirs: Directories containing FASTA files, one directory per group.
        k_min: Smallest k-mer length to analyse (inclusive).
        k_max: Largest k-mer length to analyse (inclusive).
        min_freq: Minimum count a unique k-mer needs to be reported.
        output_dir: Directory receiving all outputs; created if missing.

    Raises:
        FileNotFoundError: A group directory does not exist or holds no FASTA
            files.
        NotADirectoryError: A group path is not a directory.
        ValueError: No group name can be derived from a path, or two
            directories resolve to the same group name.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Validate and collect groups
    groups: List[GroupData] = []
    seen_names: Dict[str, str] = {}
    for d in group_dirs:
        if not os.path.exists(d):
            raise FileNotFoundError(f"Group directory not found: {d}")
        if not os.path.isdir(d):
            raise NotADirectoryError(f"Not a directory: {d}")

        # abspath normalises trailing separators (of either kind) and resolves
        # "." / "..", so the group name is always a real directory name.
        name = os.path.basename(os.path.abspath(d))
        if not name:
            raise ValueError(f"Cannot derive a group name from: {d}")
        if name in seen_names:
            # Groups are keyed by name everywhere (records, output files,
            # summary columns); a clash would silently overwrite results.
            raise ValueError(
                f"Duplicate group name '{name}' for directories "
                f"{seen_names[name]} and {d}"
            )
        seen_names[name] = d

        fasta_files = tuple(find_fasta_files(d))
        if not fasta_files:
            raise FileNotFoundError(f"No FASTA files found in: {d}")
        groups.append(GroupData(name=name, dirpath=d, fasta_files=fasta_files))

    print("📂 Groups and file counts:")
    for g in groups:
        print(f" {g.name}: {len(g.fasta_files)} files")

    # Parsing does not depend on k, so read every file once up front instead
    # of re-reading all inputs on each iteration of the k loop.
    group_records = {g.name: parse_fasta_files(g.fasta_files) for g in groups}

    summary_rows: List[dict] = []

    for k in range(k_min, k_max + 1):
        print(f"\n🔹 Processing k = {k}")

        unique_counts, total_freqs = compute_unique_kmers_per_group(
            group_records=group_records,
            k=k,
            min_freq=min_freq,
        )

        row = {"k": k}
        for g in groups:
            counts = unique_counts[g.name]
            total_freq = total_freqs[g.name]
            print(f" Group {g.name}: {len(counts)} unique {k}-mers | Total freq: {total_freq}")

            out_path = os.path.join(output_dir, f"unique_k{k}_{g.name}.tsv")
            write_kmer_output(out_path, counts, total_freq)

            row[f"{g.name} unique k-mers"] = len(counts)
            row[f"{g.name} total frequency"] = total_freq

        summary_rows.append(row)

    summary_df = pd.DataFrame(summary_rows)
    summary_path = os.path.join(output_dir, "kmer_summary.xlsx")
    summary_df.to_excel(summary_path, index=False)
    print(f"\n✅ Summary saved to: {summary_path}")

    plot_kmer_summary(summary_df, output_dir)
237
+
238
+
239
+ # -------------------------------
240
+ # CLI
241
+ # -------------------------------
242
+
243
def build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the unique k-mer analysis.

    ``--group-dirs`` is required and takes one or more paths; the remaining
    options are optional and show their defaults in ``--help``.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description="Compute group-specific unique k-mers from FASTA directories.",
    )
    parser.add_argument(
        "--group-dirs",
        required=True,
        nargs="+",
        help="List of directories, one per group, containing FASTA files.",
    )

    # Integer options share the same shape: (flag, default, help text).
    integer_options = (
        ("--k-min", 1, "Minimum k-mer size."),
        ("--k-max", 50, "Maximum k-mer size."),
        ("--min-freq", 5, "Minimum frequency threshold for unique k-mers."),
    )
    for flag, default, help_text in integer_options:
        parser.add_argument(flag, type=int, default=default, help=help_text)

    parser.add_argument("--outdir", type=str, default="kmer_results", help="Output directory.")
    return parser
259
+
260
+
261
def main() -> None:
    """Parse the command line, validate the numeric options, and run the analysis.

    Raises:
        ValueError: ``--k-min`` is below 1, ``--k-max`` is below ``--k-min``,
            or ``--min-freq`` is below 1.
    """
    args = build_arg_parser().parse_args()

    # Checked in order; the first violated rule determines the error raised.
    rules = (
        (args.k_min < 1, "--k-min must be >= 1"),
        (args.k_max < args.k_min, "--k-max must be >= --k-min"),
        (args.min_freq < 1, "--min-freq must be >= 1"),
    )
    for violated, message in rules:
        if violated:
            raise ValueError(message)

    run(
        group_dirs=args.group_dirs,
        k_min=args.k_min,
        k_max=args.k_max,
        min_freq=args.min_freq,
        output_dir=args.outdir,
    )


# Execute the CLI only when the file is run as a script.
if __name__ == "__main__":
    main()
requirements.txt ADDED
@@ -0,0 +1,5 @@
 
 
 
 
 
 
1
+ gradio
2
+ biopython
3
+ pandas
4
+ matplotlib
5
+ openpyxl