import os
from datetime import datetime, timezone
from io import BytesIO

import language_tool_python
import nltk
import numpy as np
import pytesseract
import requests
from PIL import Image
from sentence_transformers import SentenceTransformer


# Make sure the punkt sentence tokenizer is available on first run.
try:
    nltk.data.find('tokenizers/punkt')
except LookupError:
    nltk.download('punkt')


# Shared NLP resources: LanguageTool for grammar, sentence embeddings for coherence.
tool = language_tool_python.LanguageTool('en-US')
embed = SentenceTransformer('all-MiniLM-L6-v2')

# Hugging Face Inference API settings for the feedback-generation model.
HF_TOKEN = os.getenv("HF_TOKEN")
HF_FEEDBACK_MODEL_URL = os.getenv(
    "HF_FEEDBACK_MODEL_URL",
    "https://api-inference.huggingface.co/models/google/flan-t5-large",
)


def ocr_image_from_filebytes(b):
    """Run Tesseract OCR over raw image bytes and return the extracted text."""
    return pytesseract.image_to_string(Image.open(BytesIO(b)).convert("RGB"))


def count_errors(t):
    """Count the grammar and spelling issues LanguageTool flags in t."""
    return len(tool.check(t))


def lex_div(t):
    """Type-token ratio: distinct alphabetic tokens over all tokens (0 if empty)."""
    w = nltk.word_tokenize(t.lower())
    return len({x for x in w if x.isalpha()}) / len(w) if w else 0


def avg_sent_len(t):
    """Average sentence length in tokens (0 if the text is empty)."""
    s = nltk.sent_tokenize(t)
    return sum(len(nltk.word_tokenize(x)) for x in s) / len(s) if s else 0


def coherence(t):
    """Mean cosine similarity between embeddings of adjacent sentences."""
    s = nltk.sent_tokenize(t)
    if len(s) < 2:
        return 0.5  # neutral score for texts too short to compare
    e = embed.encode(s)
    sims = [
        np.dot(e[i], e[i + 1]) / (np.linalg.norm(e[i]) * np.linalg.norm(e[i + 1]) + 1e-9)
        for i in range(len(e) - 1)
    ]
    return float(sum(sims) / len(sims))


def norm25(v, a, b):
    """Clamp v to [a, b] and rescale it linearly onto the 0-25 band."""
    v = max(a, min(b, v))
    return int(round((v - a) / (b - a) * 25)) if b > a else 0


def score_text(t):
    """Score one answer on grammar, vocabulary and coherence, 0-25 points each."""
    e = count_errors(t)
    words = nltk.word_tokenize(t)
    err_rate = e / (len(words) or 1) * 100  # errors per 100 tokens
    grammar = norm25(max(0, 30 - err_rate), 0, 30)
    ttr = lex_div(t)
    asl = avg_sent_len(t)
    # Vocabulary blends lexical diversity with average sentence length.
    vocab = int(round(0.7 * norm25(ttr, 0, 0.6) + 0.3 * norm25(min(asl, 40), 3, 20)))
    # Map cosine similarity from [-1, 1] onto [0, 1] before scaling.
    coh = norm25((coherence(t) + 1) / 2, 0, 1)
    return {"grammar": grammar, "vocab": vocab, "coherence": coh, "errors_count": e}


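# Note: each scored task contributes grammar + vocab + coherence, i.e. up to 75
# points, so the weighted total below also lives on a 0-75 scale; the 38/51/65
# cut-offs appear to be heuristic CEFR band boundaries.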
async def analyze_full_submission(sub):
    """Score all three writing tasks and map the weighted total to a CEFR level."""
    texts = [sub.get("task1_1", ""), sub.get("task1_2", ""), sub.get("task2", "")]
    analyses = {}
    combined = []  # non-empty answers, collected for later use
    for k, t in zip(["task1_1", "task1_2", "task2"], texts):
        if not t.strip():
            analyses[k] = {"skipped": True}
        else:
            a = score_text(t)
            analyses[k] = a
            combined.append(t)

    # Task 2 carries the most weight, followed by the second part of task 1.
    weights = {"task1_1": 0.1, "task1_2": 0.4, "task2": 0.5}
    total = 0
    for k, w in weights.items():
        c = analyses[k]
        if "skipped" in c:
            continue
        total += (c["grammar"] + c["vocab"] + c["coherence"]) * w
    total = int(round(total))

    level = "Below B1"
    if total >= 65:
        level = "C1"
    elif total >= 51:
        level = "B2"
    elif total >= 38:
        level = "B1"

    return {
        "analysis": analyses,
        "total": total,
        "level": level,
        "feedback": "Basic feedback.",
        "time": datetime.now(timezone.utc).isoformat(),
    }