Enhanced Codon Optimization Tool - Key Code Sections
class plTrainHarness(pl.LightningModule):
"""
PyTorch Lightning training harness for ENCOT with Augmented-Lagrangian Method (ALM) GC control.
This class implements the training loop for fine-tuning CodonTransformer on E. coli sequences
with precise GC content control using an Augmented-Lagrangian Method. The ALM approach allows
the model to learn codon preferences while maintaining GC content within a target range (e.g., 52%).
Key features:
- Masked language modeling (MLM) loss for codon prediction
- ALM-based GC content constraint enforcement
- Curriculum learning: warm-up epochs before enforcing GC constraints
- Adaptive penalty coefficient (rho) adjustment based on constraint violation progress
The ALM method minimizes: L = L_MLM + ฮปยท(GC - ฮผ) + (ฯ/2)(GC - ฮผ)ยฒ
where ฮป is the Lagrangian multiplier and ฯ is the penalty coefficient.
"""
def __init__(self, model, learning_rate, warmup_fraction, gc_penalty_weight, tokenizer,
gc_target=0.52, use_lagrangian=False, lagrangian_rho=10.0, curriculum_epochs=3,
alm_tolerance=1e-5, alm_dual_tolerance=1e-5, alm_penalty_update_factor=10.0,
alm_initial_penalty_factor=20.0, alm_tolerance_update_factor=0.1,
alm_rel_penalty_increase_threshold=0.1, alm_max_penalty=1e6, alm_min_penalty=1e-6):
super().__init__()
self.model = model
self.learning_rate = learning_rate
self.warmup_fraction = warmup_fraction
self.gc_penalty_weight = gc_penalty_weight
self.tokenizer = tokenizer
# Augmented-Lagrangian GC Control parameters
self.gc_target = gc_target
self.use_lagrangian = use_lagrangian
self.lagrangian_rho = lagrangian_rho
self.curriculum_epochs = curriculum_epochs
# Enhanced ALM parameters (inspired by alpaqa research)
self.alm_tolerance = alm_tolerance
self.alm_dual_tolerance = alm_dual_tolerance
self.alm_penalty_update_factor = alm_penalty_update_factor
self.alm_initial_penalty_factor = alm_initial_penalty_factor
self.alm_tolerance_update_factor = alm_tolerance_update_factor
self.alm_rel_penalty_increase_threshold = alm_rel_penalty_increase_threshold
self.alm_max_penalty = alm_max_penalty
self.alm_min_penalty = alm_min_penalty
# Initialize Lagrangian multiplier as buffer (persists across checkpoints)
self.register_buffer("lambda_gc", torch.tensor(0.0))
# Adaptive penalty coefficient (rho) - starts as parameter, becomes adaptive
self.register_buffer("rho_adaptive", torch.tensor(self.lagrangian_rho))
# Step counter for periodic lambda updates
self.register_buffer("step_counter", torch.tensor(0))
# ALM convergence tracking
self.register_buffer("previous_constraint_violation", torch.tensor(float('inf')))
def training_step(self, batch, batch_idx):
outputs = self.model(**batch)
mlm_loss = outputs.loss
# Enhanced Lagrangian-based GC penalty
if self.use_lagrangian and self.current_epoch >= self.curriculum_epochs:
# Compute GC content from logits
logits = outputs.logits
predicted_tokens = torch.argmax(logits, dim=-1)
# Calculate GC content per sequence
gc_content_batch = []
for seq_tokens in predicted_tokens:
valid_tokens = seq_tokens[seq_tokens >= 26]
if len(valid_tokens) == 0:
gc_content_batch.append(self.gc_target)
continue
gc_counts = sum(1 for token in valid_tokens if token.item() in G_indices + C_indices)
gc_content = gc_counts / len(valid_tokens)
gc_content_batch.append(gc_content)
gc_content_mean = sum(gc_content_batch) / len(gc_content_batch)
# Compute GC constraint violation
gc_constraint = gc_content_mean - self.gc_target
# Augmented Lagrangian loss term
lagrangian_loss = (
self.lambda_gc * gc_constraint +
(self.rho_adaptive / 2) * (gc_constraint ** 2)
)
total_loss = mlm_loss + lagrangian_loss
# Log metrics
self.log("train/mlm_loss", mlm_loss, prog_bar=True)
self.log("train/gc_constraint", gc_constraint, prog_bar=True)
self.log("train/lagrangian_loss", lagrangian_loss, prog_bar=False)
self.log("train/lambda_gc", self.lambda_gc, prog_bar=False)
self.log("train/rho", self.rho_adaptive, prog_bar=False)
self.log("train/gc_content", gc_content_mean, prog_bar=True)
# Update Lagrangian multiplier periodically
self.step_counter += 1
if self.step_counter % 20 == 0:
self._update_alm_parameters(gc_constraint)
else:
total_loss = mlm_loss
self.log("train/mlm_loss", mlm_loss, prog_bar=True)
self.log("train/total_loss", total_loss, prog_bar=True)
return total_loss
def _update_alm_parameters(self, gc_constraint):
"""
Update Lagrangian multiplier and penalty coefficient according to ALM rules.
This implements the adaptive penalty update strategy:
- If constraint violation is decreasing sufficiently, update lambda and keep rho
- If constraint violation is not improving, increase rho (penalty coefficient)
"""
constraint_violation = abs(gc_constraint.item())
# Check if we're making sufficient progress
relative_improvement = (
(self.previous_constraint_violation - constraint_violation) /
max(self.previous_constraint_violation, 1e-8)
)
if constraint_violation <= self.alm_tolerance:
# Constraint satisfied - update lambda, optionally reduce rho
self.lambda_gc = self.lambda_gc + self.rho_adaptive * gc_constraint
# Could reduce rho here if desired, but keeping it stable works well
elif relative_improvement < self.alm_rel_penalty_increase_threshold:
# Not making enough progress - increase penalty
self.rho_adaptive = torch.clamp(
self.rho_adaptive * self.alm_penalty_update_factor,
min=self.alm_min_penalty,
max=self.alm_max_penalty
)
# Also update lambda
self.lambda_gc = self.lambda_gc + self.rho_adaptive * gc_constraint
else:
# Making good progress - just update lambda
self.lambda_gc = self.lambda_gc + self.rho_adaptive * gc_constraint
# Update tracking
self.previous_constraint_violation = torch.tensor(constraint_violation)
def predict_dna_sequence(
protein: str,
organism: Union[int, str],
device: torch.device,
tokenizer: Union[str, PreTrainedTokenizerFast] = None,
model: Union[str, torch.nn.Module] = None,
attention_type: str = "original_full",
deterministic: bool = True,
temperature: float = 0.2,
top_p: float = 0.95,
num_sequences: int = 1,
match_protein: bool = False,
use_constrained_search: bool = False,
gc_bounds: Tuple[float, float] = (0.30, 0.70),
beam_size: int = 5,
length_penalty: float = 1.0,
diversity_penalty: float = 0.0,
) -> Union[DNASequencePrediction, List[DNASequencePrediction]]:
"""
Predict the DNA sequence(s) for a given protein using the ENCOT model.
This function takes a protein sequence and an organism (as ID or name) as input
and returns the predicted DNA sequence(s) using the ENCOT model. It can use
either provided tokenizer and model objects or load them from specified paths.
Args:
protein (str): The input protein sequence for which to predict the DNA sequence.
organism (Union[int, str]): Either the ID of the organism or its name (e.g.,
"Escherichia coli general").
device (torch.device): The device (CPU or GPU) to run the model on.
use_constrained_search (bool, optional): Enable constrained beam search with GC bounds.
gc_bounds (Tuple[float, float], optional): GC content bounds (min, max) for
constrained search. Defaults to (0.30, 0.70).
beam_size (int, optional): Beam size for beam search. Defaults to 5.
Returns:
Union[DNASequencePrediction, List[DNASequencePrediction]]: Predicted DNA sequence(s)
with associated metrics.
"""
def get_CSI_weights(sequences: List[str]) -> Dict[str, float]:
"""
Calculate the Codon Similarity Index (CSI) weights for a list of DNA sequences.
Args:
sequences (List[str]): List of DNA sequences.
Returns:
dict: The CSI weights.
"""
return relative_adaptiveness(sequences=sequences)
def get_CSI_value(dna: str, weights: Dict[str, float]) -> float:
"""
Calculate the Codon Similarity Index (CSI) for a DNA sequence.
Args:
dna (str): The DNA sequence.
weights (dict): The CSI weights from get_CSI_weights.
Returns:
float: The CSI value.
"""
return CAI(dna, weights)
def get_ecoli_tai_weights():
"""
Returns pre-calculated tAI weights for E. coli K-12 MG1655.
These weights are based on tRNA gene copy numbers and wobble base pairing rules.
"""
return {
'TTT': 0.58, 'TTC': 0.42, 'TTA': 0.13, 'TTG': 0.13,
'TCT': 0.15, 'TCC': 0.15, 'TCA': 0.12, 'TCG': 0.15,
# ... full codon table
}
def calculate_tAI(sequence: str, tai_weights: Dict[str, float]) -> float:
"""
Calculate the tRNA Adaptation Index (tAI) for a DNA sequence.
Args:
sequence (str): DNA sequence (must be divisible by 3)
tai_weights (Dict[str, float]): tAI weights for each codon
Returns:
float: Geometric mean of tAI weights for all codons in the sequence
"""
if len(sequence) % 3 != 0:
raise ValueError("Sequence length must be divisible by 3")
codons = [sequence[i:i+3].upper() for i in range(0, len(sequence), 3)]
weights = [tai_weights.get(codon, 0.5) for codon in codons if codon not in ['TAA', 'TAG', 'TGA']]
if not weights:
return 0.0
# Geometric mean
product = 1.0
for w in weights:
product *= w
return product ** (1.0 / len(weights))
# ENCOT ALM Training Configuration
# This configuration reproduces the main training setup from the paper
# using the Augmented-Lagrangian Method (ALM) for GC content control.
model:
base_model: "adibvafa/CodonTransformer-base"
tokenizer: "adibvafa/CodonTransformer"
data:
dataset_dir: "data"
# Expected files: finetune_set.json (created by preprocess_data.py)
training:
batch_size: 6
max_epochs: 15
learning_rate: 5e-5
warmup_fraction: 0.1
num_workers: 5
accumulate_grad_batches: 1
num_gpus: 4
save_every_n_steps: 512
seed: 123
log_every_n_steps: 20
checkpoint:
checkpoint_dir: "models/alm-enhanced-training"
checkpoint_filename: "balanced_alm_finetune.ckpt"
# Augmented-Lagrangian Method (ALM) for GC content control
alm:
enabled: true
gc_target: 0.52 # Target GC content for E. coli (52%)
curriculum_epochs: 3 # Warm-up epochs before enforcing GC constraint
# ALM penalty parameters
initial_penalty_factor: 20.0
penalty_update_factor: 10.0
max_penalty: 1e6
min_penalty: 1e-6
# ALM tolerance parameters
tolerance: 1e-5 # Primal tolerance
dual_tolerance: 1e-5 # Dual tolerance for constraint violation
tolerance_update_factor: 0.1
# Adaptive penalty adjustment
rel_penalty_increase_threshold: 0.1
# Legacy penalty method (if ALM disabled)
gc_penalty:
weight: 0.0 # Only used if use_lagrangian=false
def is_valid_sequence(dna_seq: str) -> bool:
"""
Applies a series of validation checks to a DNA sequence.
Args:
dna_seq (str): The DNA sequence to validate.
Returns:
bool: True if the sequence is valid, False otherwise.
"""
# Check if length is divisible by 3 (valid codon frame)
if len(dna_seq) % 3 != 0:
return False
# Check for valid start codon
if not dna_seq.upper().startswith(('ATG', 'TTG', 'CTG', 'GTG')):
return False
# Check for valid stop codon
if not dna_seq.upper().endswith(('TAA', 'TAG', 'TGA')):
return False
# Check for internal stop codons (excluding the last codon)
codons = [dna_seq[i:i+3].upper() for i in range(0, len(dna_seq) - 3, 3)]
if any(codon in ['TAA', 'TAG', 'TGA'] for codon in codons):
return False
# Check if sequence contains only valid nucleotides
if not all(c in 'ATGC' for c in dna_seq.upper()):
return False
return True
def main():
st.title("ENCOT")
st.markdown("E. coli codon optimization with constraint-aware decoding and in silico evaluation metrics.")
# Load model
load_model_and_tokenizer()
# Create the main tabbed interface
tab1, tab2, tab3, tab4 = st.tabs([
"Single Optimize",
"Batch Process",
"Comparative Analysis",
"Advanced Settings"
])
with tab1:
single_sequence_optimization()
with tab2:
batch_processing()
with tab3:
comparative_analysis()
with tab4:
advanced_settings()
# Footer
st.markdown("---")
st.markdown("**ENCOT**")
st.markdown("Open-source codon optimization for E. coli with reproducible evaluation.")
def benchmark_sequences(sequences, model, tokenizer, device, cai_weights, tai_weights):
"""
Run ENCOT on protein sequences and compute metrics for optimized DNA.
Args:
sequences: List of protein sequences to optimize
model: Loaded ENCOT model
tokenizer: Tokenizer for the model
device: PyTorch device (CPU/GPU)
cai_weights: Pre-computed CAI weights
tai_weights: Pre-computed tAI weights
Returns:
DataFrame with optimization results and metrics
"""
results = []
for name, protein in tqdm(sequences, desc="Optimizing sequences"):
# Optimize the sequence
output = predict_dna_sequence(
protein=protein,
organism="Escherichia coli general",
device=device,
model=model,
tokenizer=tokenizer,
deterministic=True,
use_constrained_search=True,
gc_bounds=(0.45, 0.55)
)
optimized_dna = output.predicted_dna
# Calculate metrics
cai = get_CSI_value(optimized_dna, cai_weights)
tai = calculate_tAI(optimized_dna, tai_weights)
gc_content = get_GC_content(optimized_dna)
cis_elements = count_negative_cis_elements(optimized_dna)
results.append({
'name': name,
'protein': protein,
'optimized_dna': optimized_dna,
'CAI': cai,
'tAI': tai,
'GC_content': gc_content,
'negative_cis_elements': cis_elements
})
return pd.DataFrame(results)
ENCOT/
โโโ CodonTransformer/ # Core library modules
โ โโโ CodonPrediction.py # Model loading & DNA sequence prediction
โ โโโ CodonEvaluation.py # Metrics (CAI, tAI, GC, CFD, etc.)
โ โโโ CodonData.py # Data preprocessing & preparation
โ โโโ CodonUtils.py # Constants, mappings, utilities
โ โโโ CodonPostProcessing.py # DNA-Chisel integration
โ
โโโ scripts/ # Command-line tools
โ โโโ train.py # Training wrapper
โ โโโ optimize_sequence.py # Sequence optimization CLI
โ โโโ run_benchmarks.py # Benchmark evaluation
โ โโโ preprocess_data.py # Data preparation
โ
โโโ configs/ # YAML configurations
โ โโโ train_ecoli_alm.yaml # Main ALM training config โญ
โ โโโ train_ecoli_quick.yaml # Quick test config
โ
โโโ streamlit_gui/ # Web interface
โ โโโ app.py # Main Streamlit GUI โญ
โ โโโ demo.py # Demo script
โ โโโ run_gui.py # Launcher
โ
โโโ data/ # Datasets
โ โโโ finetune_set.json # Training data
โ โโโ test_set.json # Test data
โ
โโโ finetune.py # Main training script โญโญโญ
โโโ benchmark_evaluation.py # Evaluation script
โโโ setup.py # Package setup
โโโ pyproject.toml # Project configuration
โโโ README.md # Documentation
Key Innovations:
โญโญโญ Augmented-Lagrangian Method (ALM) for GC control
โญโญ Constrained beam search with GC bounds
โญ Multi-metric evaluation (CAI, tAI, GC, cis-elements)