import asyncio
import hashlib
import hmac
import logging
import random
import re
from io import BytesIO

from fastapi import HTTPException, UploadFile, status, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

from config import Config
from features.nepali_text_classifier.inferencer import classify_text
from features.nepali_text_classifier.preprocess import *
|
|
# Bearer-token security scheme; injected into ``verify_token`` via ``Depends``.
security = HTTPBearer()
|
|
|
|
| def parse_selected_models(models: str | None) -> list[str] | None: |
| if not models: |
| return None |
| parsed = [m.strip() for m in models.split(",") if m.strip()] |
| return parsed[:2] if parsed else None |
|
|
def contains_english(text: str) -> bool:
    """Return ``True`` when *text* contains at least one ASCII letter (a-z, A-Z)."""
    # Drop newlines and tabs first (code points 10 and 9), as the original did.
    without_breaks = text.translate({10: None, 9: None})
    return re.search(r'[a-zA-Z]', without_breaks) is not None
|
|
|
|
| def _clamp(value: float, lower: float, upper: float) -> float: |
| return max(lower, min(upper, value)) |
|
|
|
|
def _raw_ai_score(label: str, confidence: float) -> float:
    """Express a (label, confidence) pair as a single 0-100 "AI" score.

    Args:
        label: Predicted label; only the exact string ``"AI"`` is treated as
            the AI class, anything else is treated as its complement.
        confidence: Confidence for *label*; clamped to ``[0, 100]``.

    Returns:
        The clamped confidence when *label* is ``"AI"``, otherwise
        ``100 - confidence``.
    """
    bounded = _clamp(float(confidence), 0.0, 100.0)
    if label == "AI":
        return bounded
    return 100.0 - bounded
|
|
def _sentence_bias_strength(overall_confidence: float) -> float:
    """Return the blend weight used to pull sentence scores toward the overall verdict.

    The weight grows linearly from 0.05 (overall confidence 0) to 0.15
    (overall confidence 100) and is capped at 0.15.
    """
    confidence_fraction = _clamp(overall_confidence, 0.0, 100.0) / 100.0
    strength = 0.05 + 0.10 * confidence_fraction
    return min(0.15, strength)
|
|
|
|
| def _deterministic_jitter(seed_text: str, max_jitter: float = 3.0) -> float: |
| digest = hashlib.sha256(seed_text.encode("utf-8")).digest() |
| seed_value = int.from_bytes(digest[:8], byteorder="big", signed=False) |
| rng = random.Random(seed_value) |
| return rng.uniform(-max_jitter, max_jitter) |
|
|
|
|
def _add_likelihood_randomness(likelihood: float, seed_text: str, max_jitter: float = 3.0) -> float:
    """Perturb *likelihood* by a seed-derived offset and clamp it to ``[50.0, 99.95]``.

    Args:
        likelihood: Base value to perturb.
        seed_text: Text from which the reproducible offset is derived.
        max_jitter: Maximum absolute size of the offset.

    Returns:
        ``likelihood`` plus the offset, never below 50.0 nor above 99.95.
    """
    shifted = likelihood + _deterministic_jitter(seed_text=seed_text, max_jitter=max_jitter)
    return _clamp(shifted, 50.0, 99.95)
|
|
|
|
def _biased_sentence_result(
    sentence_result: dict,
    overall_confidence: float,
    target_label: str = "Human",
    seed_text: str = "",
) -> dict:
    """Pull a sentence-level prediction toward the document-level verdict.

    The sentence's raw AI score is linearly blended toward the score implied
    by *target_label* (100 for ``"AI"``, 0 otherwise), using a weight derived
    from *overall_confidence*. The reported label is always *target_label*;
    the reported confidence is the blended score expressed for that label,
    then jittered deterministically and clamped by
    ``_add_likelihood_randomness``.

    Args:
        sentence_result: Mapping with ``"label"`` and ``"confidence"`` keys.
        overall_confidence: Confidence of the document-level prediction.
        target_label: Document-level label to bias toward.
        seed_text: Text used to seed the deterministic jitter.

    Returns:
        ``{"biased_label": ..., "biased_confidence": ...}`` with the
        confidence rounded to two decimals.
    """
    sentence_label = sentence_result["label"]
    sentence_confidence = float(sentence_result["confidence"])
    sentence_ai = _raw_ai_score(sentence_label, sentence_confidence)

    toward_ai = target_label == "AI"
    anchor_ai = 100.0 if toward_ai else 0.0
    weight = _sentence_bias_strength(overall_confidence)

    blended_ai = _clamp((1.0 - weight) * sentence_ai + weight * anchor_ai, 0.0, 100.0)

    # Express the blended AI score as confidence in the target label.
    if toward_ai:
        confidence_in_target = blended_ai
    else:
        confidence_in_target = 100.0 - blended_ai

    jitter_seed = f"{seed_text}|{target_label}|{round(overall_confidence, 2)}"
    confidence_in_target = _add_likelihood_randomness(
        confidence_in_target,
        seed_text=jitter_seed,
    )

    return {
        "biased_label": target_label,
        "biased_confidence": round(confidence_in_target, 2),
    }
|
|
|
|
async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Validate the bearer token of the incoming request.

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.

    Returns:
        The supplied token string when it matches ``Config.SECRET_TOKEN``.

    Raises:
        HTTPException: 403 when the token does not match the configured
            secret, or when the configured secret is not a string.
    """
    supplied = credentials.credentials
    expected = Config.SECRET_TOKEN

    # Compare in constant time so response timing does not reveal how many
    # leading characters of a guess were correct. Both sides are encoded to
    # bytes because ``compare_digest`` rejects non-ASCII ``str`` arguments.
    authorised = isinstance(expected, str) and hmac.compare_digest(
        supplied.encode("utf-8"),
        expected.encode("utf-8"),
    )
    if not authorised:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid or expired token"
        )
    return supplied
|
|
async def nepali_text_analysis(text: str, models: str | None = None):
    """Classify a piece of text with the Nepali text classifier.

    Args:
        text: Text to classify.
        models: Optional comma-separated model names (see
            ``parse_selected_models``).

    Returns:
        Whatever ``classify_text`` returns for the text.

    Raises:
        HTTPException: 400 when the text has fewer than 10
            whitespace-separated words; 413 when it is longer than 50,000
            characters.
    """
    # NOTE(review): the return value is discarded, so unless this helper
    # raises on bad input the call has no effect here — confirm the intent.
    end_symbol_for_NP_text(text)

    if len(text.split()) < 10:
        raise HTTPException(status_code=400, detail="Text must contain at least 10 words")
    if len(text) > 50000:
        raise HTTPException(status_code=413, detail="Text must be less than 50,000 characters")

    selected_models = parse_selected_models(models)
    # Run the classifier in a worker thread so the event loop is not blocked.
    return await asyncio.to_thread(classify_text, text, selected_models, 2)
|
|
|
|
| |
async def extract_file_contents(file: UploadFile) -> str:
    """Extract the text of an uploaded .docx, .pdf or .txt file.

    The declared content type is checked before the upload is read, so an
    unsupported file is rejected without loading its body into memory.

    Args:
        file: The uploaded file.

    Returns:
        The text produced by the parser matching the file's content type.

    Raises:
        HTTPException: 415 when the content type is not one of the three
            supported types.
    """
    parsers = {
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document": parse_docx,
        "application/pdf": parse_pdf,
        "text/plain": parse_txt,
    }
    parser = parsers.get(file.content_type)
    if parser is None:
        raise HTTPException(status_code=415,detail="Invalid file type. Only .docx,.pdf and .txt are allowed")

    payload = await file.read()
    return parser(BytesIO(payload))
|
|
async def handle_file_upload(file: UploadFile, models: str | None = None):
    """Extract the text of an uploaded file and classify it.

    Args:
        file: The uploaded .docx, .pdf or .txt file.
        models: Optional comma-separated model names (see
            ``parse_selected_models``).

    Returns:
        Whatever ``classify_text`` returns for the cleaned file text.

    Raises:
        HTTPException: 413 when the extracted text is longer than 50,000
            characters; 404 when it is empty or whitespace only; any
            ``HTTPException`` raised during extraction (e.g. 415); 500 for
            any other failure.
    """
    try:
        file_contents = await extract_file_contents(file)
        # NOTE(review): the return value is discarded, so unless this helper
        # raises on bad input the call has no effect here — confirm the intent.
        end_symbol_for_NP_text(file_contents)
        if len(file_contents) > 50000:
            raise HTTPException(status_code=413, detail="Text must be less than 50,000 characters")

        cleaned_text = file_contents.replace("\n", " ").replace("\t", " ").strip()
        if not cleaned_text:
            raise HTTPException(status_code=404, detail="The file is empty or only contains whitespace.")

        selected_models = parse_selected_models(models)
        # Run the classifier in a worker thread so the event loop is not blocked.
        return await asyncio.to_thread(classify_text, cleaned_text, selected_models, 2)
    except HTTPException:
        # Deliberate client-facing errors (413/404/415) must reach the caller
        # unchanged instead of being rewritten as a generic 500 below.
        raise
    except Exception as e:
        logging.exception("Error processing file: %s", e)
        raise HTTPException(status_code=500, detail="Error processing the file") from e
|
|
|
|
|
|
async def handle_sentence_level_analysis(text: str, models: str | None = None):
    """Classify *text* as a whole, then report a biased result per sentence.

    The text is split on the danda character ("।"). Each non-blank sentence
    is classified on its own and then passed through
    ``_biased_sentence_result`` with the document-level label and confidence,
    so every sentence is reported with the document-level label.

    Args:
        text: Text to analyse; surrounding whitespace is stripped first.
        models: Optional comma-separated model names (see
            ``parse_selected_models``).

    Returns:
        ``{"analysis": [...]}`` with one ``{"text", "result", "likelihood"}``
        entry per sentence, in document order.

    Raises:
        HTTPException: 413 when the stripped text is longer than 50,000
            characters.
    """
    text = text.strip()
    if len(text) > 50000:
        raise HTTPException(status_code=413, detail="Text must be less than 50,000 characters")

    # NOTE(review): return value is discarded here and in the per-sentence
    # call below — confirm these calls are meant purely as validation.
    end_symbol_for_NP_text(text)

    fragments = (piece.strip() for piece in text.split("।"))
    sentences = [fragment + "।" for fragment in fragments if fragment]
    model_names = parse_selected_models(models)

    document_verdict = await asyncio.to_thread(classify_text, text, model_names, 2)
    document_label = document_verdict["label"]
    document_confidence = float(document_verdict["confidence"])

    async def _score(sentence: str) -> dict:
        # Classify one sentence and bias it toward the document verdict.
        end_symbol_for_NP_text(sentence)
        raw = await asyncio.to_thread(classify_text, sentence, model_names, 2)
        adjusted = _biased_sentence_result(
            raw,
            document_confidence,
            target_label=document_label,
            seed_text=sentence,
        )
        return {
            "text": sentence,
            "result": adjusted["biased_label"],
            "likelihood": adjusted["biased_confidence"],
        }

    # Sentences are awaited one at a time, preserving the original ordering.
    return {"analysis": [await _score(sentence) for sentence in sentences]}
|
|
|
|
async def handle_file_sentence(file:UploadFile, models: str | None = None):
    """Extract an uploaded file's text and report a biased result per sentence.

    The cleaned text is classified as a whole, then split on the danda
    character ("।"). Each non-blank sentence is classified on its own and
    passed through ``_biased_sentence_result`` with the document-level label
    and confidence.

    Args:
        file: The uploaded .docx, .pdf or .txt file.
        models: Optional comma-separated model names (see
            ``parse_selected_models``).

    Returns:
        ``{"analysis": [...]}`` with one ``{"text", "result", "likelihood"}``
        entry per sentence, in document order.

    Raises:
        HTTPException: 413 when the extracted text is longer than 50,000
            characters; 404 when it is empty or whitespace only; any
            ``HTTPException`` raised during extraction (e.g. 415); 500 for
            any other failure.
    """
    try:
        file_contents = await extract_file_contents(file)
        if len(file_contents) > 50000:
            raise HTTPException(status_code=413, detail="Text must be less than 50,000 characters")

        cleaned_text = file_contents.replace("\n", " ").replace("\t", " ").strip()
        if not cleaned_text:
            raise HTTPException(status_code=404, detail="The file is empty or only contains whitespace.")

        sentences = [s.strip() + "।" for s in cleaned_text.split("।") if s.strip()]
        selected_models = parse_selected_models(models)

        overall = await asyncio.to_thread(classify_text, cleaned_text, selected_models, 2)
        overall_label = overall["label"]
        overall_confidence = float(overall["confidence"])

        results = []
        for sentence in sentences:
            # NOTE(review): return value is discarded — confirm this call is
            # meant purely as validation.
            end_symbol_for_NP_text(sentence)

            result = await asyncio.to_thread(classify_text, sentence, selected_models, 2)
            biased = _biased_sentence_result(
                result,
                overall_confidence,
                target_label=overall_label,
                seed_text=sentence,
            )
            results.append({
                "text": sentence,
                "result": biased["biased_label"],
                "likelihood": biased["biased_confidence"],
            })

        return {"analysis": results}

    except HTTPException:
        # Deliberate client-facing errors (413/404/415) must reach the caller
        # unchanged instead of being rewritten as a generic 500 below.
        raise
    except Exception as e:
        logging.exception("Error processing file: %s", e)
        raise HTTPException(status_code=500, detail="Error processing the file") from e
|
|
|
|
def classify(text: str, models: str | None = None):
    """Synchronously classify *text* with the optionally selected models.

    Args:
        text: Text to classify.
        models: Optional comma-separated model names (see
            ``parse_selected_models``).

    Returns:
        Whatever ``classify_text`` returns for the text.
    """
    return classify_text(text, parse_selected_models(models), 2)
|
|
|
|