| |
| import os |
| import json |
| import time |
| import gradio as gr |
| import speech_recognition as sr |
| import pyttsx3 |
| import threading |
|
|
| from typing import Tuple |
|
|
| |
# --- Backend availability ----------------------------------------------------
# The OpenAI backend is enabled only when OPENAI_API_KEY holds a non-blank
# value; the client library is imported solely in that case.
USE_OPENAI = os.getenv("OPENAI_API_KEY", "").strip() != ""
if USE_OPENAI:
    import openai

# Optional local grammar-correction stack. Any failure while importing it
# disables the local backend instead of aborting start-up.
try:
    import torch
    from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
    from happytransformer import HappyTextToText, TTSettings
except Exception:
    LOCAL_MODEL_AVAILABLE = False
else:
    LOCAL_MODEL_AVAILABLE = True

# Optional third-party edit-distance implementation; the flag records whether
# the import succeeded.
try:
    import Levenshtein
except Exception:
    _have_lev = False
else:
    _have_lev = True

# Title passed to the Gradio app.
APP_TITLE = "ESPeak — AI Grammar & Speech Assistant"
|
|
| |
| |
| |
def levenshtein_distance(a: str, b: str) -> int:
    """Return the Levenshtein (edit) distance between ``a`` and ``b``.

    Delegates to ``Levenshtein.distance`` when the module-level ``_have_lev``
    flag is set; otherwise the distance is computed in pure Python with unit
    cost for insertion, deletion and substitution.
    """
    if _have_lev:
        return Levenshtein.distance(a, b)

    # With an empty operand the distance is the other operand's length.
    if not a:
        return len(b)
    if not b:
        return len(a)

    # Row-by-row dynamic programme: ``previous[col]`` is the distance between
    # the prefix of ``a`` consumed so far and ``b[:col]``.
    previous = list(range(len(b) + 1))
    for row, char_a in enumerate(a, start=1):
        current = [row]
        for col, char_b in enumerate(b, start=1):
            substitution = previous[col - 1] + (0 if char_a == char_b else 1)
            current.append(min(previous[col] + 1, current[col - 1] + 1, substitution))
        previous = current
    return previous[-1]
|
|
def score_from_edit(orig: str, corrected: str) -> int:
    """Score how close ``orig`` is to ``corrected`` on a 0-100 scale.

    The edit distance between the two strings is divided by the length of
    ``orig`` and turned into a percentage: 100 means the strings are
    identical, and the value is clamped at 0. A blank (empty or
    whitespace-only) ``orig`` always scores 0.
    """
    if orig.strip() == "":
        return 0

    edits = levenshtein_distance(orig, corrected)
    # ``orig`` is non-empty here; the floor of 1 only keeps the divisor safe.
    similarity = 1.0 - edits / max(len(orig), 1)
    return int(round(max(0.0, similarity) * 100))
|
|
| |
| |
| |
# Handles for the local grammar models; they stay ``None`` unless loaded below.
tokenizer = None
model = None
happy_tt = None

if LOCAL_MODEL_AVAILABLE and not USE_OPENAI:

    def load_local_models():
        """Load the local grammar-correction checkpoint into the module globals.

        Prints the failure and re-raises it if any of the three loads fails.
        """
        global tokenizer, model, happy_tt
        checkpoint = "prithivida/grammar_error_correcter_v1"
        try:
            tokenizer = AutoTokenizer.from_pretrained(checkpoint)
            model = AutoModelForSeq2SeqLM.from_pretrained(checkpoint)
            happy_tt = HappyTextToText("T5", checkpoint)
        except Exception as err:
            print("Local model load failed:", err)
            raise

    # Loading runs at import time, and only when the OpenAI backend is off.
    load_local_models()
|
|
| |
| |
| |
def transcribe_audio_file(audio_filepath: str) -> str:
    """Transcribe the audio file at ``audio_filepath`` via ``recognize_google``.

    Returns:
        The recognised text; an empty string when the recognizer raises
        ``sr.UnknownValueError``; or a string starting with
        ``[transcription_error]:`` followed by the error message for any
        other exception.
    """
    recognizer = sr.Recognizer()
    try:
        with sr.AudioFile(audio_filepath) as audio_source:
            recording = recognizer.record(audio_source)
        transcript = recognizer.recognize_google(recording)
    except sr.UnknownValueError:
        return ""
    except Exception as err:
        # Errors are reported in-band so callers can test for the prefix.
        return f"[transcription_error]: {str(err)}"
    else:
        return transcript
|
|
| |
| |
| |
# System prompt: fixes the assistant's role and the JSON-only reply contract.
OPENAI_PROMPT_SYSTEM = " ".join(
    [
        "You are ESPeak Assistant — expert grammar corrector.",
        "Return JSON only with keys: corrected_text (string), score (0-100 integer), explanation (short string).",
    ]
)

# User prompt template; ``{input_text}`` is filled in with the text to correct.
OPENAI_USER_TEMPLATE = "\n\n".join(
    [
        "Correct this sentence for grammar, punctuation, and clarity while preserving tone:",
        "### INPUT\n{input_text}",
        "Return only JSON with corrected_text, score, and explanation.",
    ]
)
|
|
# Chat model name, resolved on the first successful model listing and reused
# afterwards so that not every request pays for an extra API round trip.
_OPENAI_MODEL_NAME = None


def _select_openai_model() -> str:
    """Return "gpt-4o-mini" when the account lists it, otherwise "gpt-4"."""
    global _OPENAI_MODEL_NAME
    if _OPENAI_MODEL_NAME is None:
        try:
            listing = openai.Model.list()
            # The model entries live under "data"; a membership test on the
            # listing object itself never matches a model id.
            available = {entry["id"] for entry in listing["data"]}
        except Exception:
            # Listing is only used to pick a model; fall back without caching
            # so a later request can retry the lookup.
            return "gpt-4"
        _OPENAI_MODEL_NAME = "gpt-4o-mini" if "gpt-4o-mini" in available else "gpt-4"
    return _OPENAI_MODEL_NAME


def _parse_correction_reply(content: str, original: str) -> Tuple[str, int, str]:
    """Turn the model reply into ``(corrected, score, explanation)``.

    Tries the whole reply as JSON first, then the outermost ``{...}`` span
    (covers replies wrapped in prose or code fences). If neither yields a
    JSON object, the raw reply is used as the corrected text.
    """
    candidates = [content]
    start, end = content.find("{"), content.rfind("}")
    if 0 <= start < end:
        candidates.append(content[start:end + 1])

    for candidate in candidates:
        try:
            payload = json.loads(candidate)
        except ValueError:
            continue
        if not isinstance(payload, dict):
            continue

        corrected = str(payload.get("corrected_text") or "")
        try:
            score = int(payload["score"])
        except (KeyError, TypeError, ValueError):
            # Missing or malformed score: derive one from the edit distance
            # instead of discarding an otherwise valid correction.
            score = score_from_edit(original, corrected)
        score = max(0, min(100, score))
        explanation = str(payload.get("explanation") or "")
        return corrected, score, explanation

    return (
        content,
        score_from_edit(original, content),
        "Auto-correction from OpenAI; parsing fallback used.",
    )


def call_openai_correct(text: str) -> Tuple[str, int, str]:
    """Correct ``text`` through the OpenAI chat completion API.

    Args:
        text: The sentence to correct.

    Returns:
        ``(corrected_text, score, explanation)`` where ``score`` is an
        integer in the range 0-100.

    Raises:
        Whatever ``openai.ChatCompletion.create`` raises (network, auth,
        quota, ...); reply-parsing problems are handled internally.
    """
    messages = [
        {"role": "system", "content": OPENAI_PROMPT_SYSTEM},
        {"role": "user", "content": OPENAI_USER_TEMPLATE.format(input_text=text)},
    ]
    resp = openai.ChatCompletion.create(
        model=_select_openai_model(),
        messages=messages,
        temperature=0.0,
        max_tokens=300,
    )
    content = (resp["choices"][0]["message"]["content"] or "").strip()
    return _parse_correction_reply(content, text)
|
|
def call_local_correct(text: str) -> Tuple[str, int, str]:
    """Correct ``text`` with the locally loaded grammar model.

    The HappyTransformer wrapper is tried first. The raw
    tokenizer/``model.generate`` path runs only when the wrapper fails or
    returns nothing, so the model is not run twice for a result that would
    be thrown away.

    Args:
        text: The sentence to correct.

    Returns:
        ``(corrected_text, score, explanation)``. When the fallback path
        raises (for example because the models were never loaded), the input
        is returned unchanged with score 0 and an explanation starting with
        ``"Local model error"``, rather than being reported as a perfect
        sentence.
    """
    prompt = "gec: " + text
    corrected = ""

    # Preferred path: HappyTransformer text-to-text wrapper.
    try:
        settings = TTSettings(num_beams=4, min_length=1)
        corrected = happy_tt.generate_text(prompt, args=settings).text or ""
    except Exception:
        corrected = ""

    # Fallback path: drive the tokenizer and seq2seq model directly.
    if not corrected:
        try:
            input_ids = tokenizer.encode(
                prompt, return_tensors="pt", max_length=256, truncation=True
            )
            with torch.no_grad():
                generated = model.generate(input_ids, max_length=256, num_beams=4)
            corrected = tokenizer.decode(generated[0], skip_special_tokens=True)
        except Exception as exc:
            # No model output is available at this point; say so explicitly.
            return text, 0, f"Local model error: {exc}"

    score = score_from_edit(text, corrected)
    if text.strip() == corrected.strip():
        explanation = "No change needed."
    else:
        explanation = "Adjusted grammar/punctuation; minor wording edits to improve clarity."
    return corrected, score, explanation
|
|
| |
| |
| |
def _resolve_source_text(audio, typed_text):
    """Pick the text to correct and a note describing where it came from."""
    typed = typed_text or ""
    if not audio:
        return typed, "Typed input"

    transcribed = transcribe_audio_file(audio)
    if transcribed.startswith("[transcription_error]"):
        # Transcription failed: use the typed text and surface the error.
        return typed, transcribed
    if not transcribed.strip() and typed.strip():
        # Nothing was recognised in the recording; do not discard typed text.
        return typed, "No speech recognized; using typed input"
    return transcribed, f"Transcribed: {transcribed}"


def _speak_text(text):
    """Read ``text`` aloud with a fresh pyttsx3 engine (blocks until done)."""
    engine = pyttsx3.init()
    engine.say(text)
    engine.runAndWait()


def process_input(audio, typed_text, use_tts=False, prefer_openai=False):
    """Correct spoken or typed text and package the result for the UI.

    Args:
        audio: Filepath from Gradio (or None). When given, it is transcribed
            and takes precedence over ``typed_text``.
        typed_text: Text typed by the user; used when there is no audio, or
            when transcription fails or yields nothing.
        use_tts: If True, read the corrected text aloud with pyttsx3 in a
            background thread.
        prefer_openai: Prefer the OpenAI backend when both an API key and
            loaded local models are available.

    Returns:
        A 5-tuple ``(corrected, score, explanation, info_message,
        metadata_json)``.
    """
    source_text, trans_msg = _resolve_source_text(audio, typed_text)
    if not source_text.strip():
        return "No input detected.", 0, "No correction (empty input).", trans_msg, json.dumps({})

    # The local backend is usable only if its models were actually loaded.
    local_ready = tokenizer is not None and model is not None
    if USE_OPENAI and (prefer_openai or not local_ready):
        backend = "openai"
    elif local_ready:
        backend = "local"
    else:
        backend = "none"

    if backend == "none":
        # Report the missing backend instead of pretending the text is fine.
        corrected = source_text
        score = 0
        explanation = (
            "Model error: no grammar backend available "
            "(set OPENAI_API_KEY or install the local model dependencies)"
        )
    else:
        try:
            if backend == "openai":
                corrected, score, explanation = call_openai_correct(source_text)
            else:
                corrected, score, explanation = call_local_correct(source_text)
        except Exception as e:
            corrected = source_text
            score = 0
            explanation = f"Model error: {e}"

    tts_msg = ""
    if use_tts:
        # Only failures to start the thread are caught here; errors raised
        # inside the speaking thread are not reflected in ``tts_msg``.
        try:
            threading.Thread(target=_speak_text, args=(corrected,), daemon=True).start()
            tts_msg = "Speaking corrected text..."
        except Exception as e:
            tts_msg = f"TTS failed: {e}"

    meta = {
        "original": source_text,
        "corrected": corrected,
        "score": score,
        "explanation": explanation,
        "backend": backend,
        "transcription_note": trans_msg,
        "timestamp": int(time.time()),
    }

    info_message = trans_msg + (" • " + tts_msg if tts_msg else "")
    return corrected, score, explanation, info_message, json.dumps(meta, ensure_ascii=False, indent=2)
|
|
| |
| |
| |
def build_ui():
    """Assemble the Gradio Blocks interface and return it without launching.

    Layout: a header row, an input column (microphone, text box, two
    checkboxes and the submit button) next to an output column (corrected
    text, score, explanation, transcription info and JSON metadata), and a
    footer. The button forwards the four inputs to ``process_input``.
    """
    css_rules = """
.header {background: linear-gradient(90deg,#ff8fa3,#ff6aa3); padding: 18px; border-radius: 12px; color:white}
.muted {color: #6b7280}
"""
    intro_md = (
        "Speak or type a sentence — ESPeak will correct grammar, score it, and explain changes. "
        "Use OpenAI backend if you set `OPENAI_API_KEY` in environment."
    )
    tips_md = (
        "**Quick tips**\n"
        "- Speak clearly (short sentences work best)\n"
        "- Toggle TTS to hear the corrected sentence\n"
        "- Use `Prefer OpenAI` to route to ChatGPT if available"
    )
    footer_md = (
        "**ESPeak** · Built for quick grammar checking of spoken and typed English. "
        "Designed for demos and interview projects."
    )

    with gr.Blocks(title=APP_TITLE, css=css_rules) as demo:
        # Header: title and intro on the left, usage tips on the right.
        with gr.Row(elem_id="top-row"):
            with gr.Column(scale=3):
                gr.Markdown("## <div class='header'>ESPeak — AI Grammar & Speech Assistant</div>")
                gr.Markdown(intro_md)
            with gr.Column(scale=1):
                gr.Markdown(tips_md)
        gr.Markdown("---")

        with gr.Row():
            # Inputs.
            with gr.Column(scale=1):
                audio = gr.Audio(sources="microphone", type="filepath", label="Record (microphone)")
                typed = gr.Textbox(
                    lines=3,
                    placeholder="Or type your sentence here...",
                    label="Text input",
                )
                with gr.Row():
                    tts_checkbox = gr.Checkbox(label="Play corrected (TTS)", value=False)
                    prefer_openai = gr.Checkbox(
                        label="Prefer OpenAI backend (if available)", value=True
                    )
                run_btn = gr.Button("Check Grammar", variant="primary")
            # Outputs.
            with gr.Column(scale=2):
                corrected_out = gr.Textbox(label="Corrected Text", interactive=False)
                score_out = gr.Number(label="Grammar Score (0-100)", interactive=False)
                explanation_out = gr.Textbox(
                    label="Explanation (what I changed)", interactive=False
                )
                trans_note = gr.Textbox(label="Transcription / Info", interactive=False)
                meta_out = gr.Code(label="JSON metadata (copyable)", language="json")

        def on_submit(audio_file, typed_text, use_tts, use_openai):
            # Thin adapter between the click event and the processing pipeline.
            return process_input(audio_file, typed_text, use_tts, use_openai)

        run_btn.click(
            fn=on_submit,
            inputs=[audio, typed, tts_checkbox, prefer_openai],
            outputs=[corrected_out, score_out, explanation_out, trans_note, meta_out],
        )

        gr.Markdown("---")
        gr.Markdown(footer_md)

    return demo
|
|
if __name__ == "__main__":
    # Script entry point: build the interface, then launch it with sharing
    # disabled and the in-browser option enabled.
    demo = build_ui()
    demo.launch(inbrowser=True, share=False)