""" Voice Processing API для обработки аудио и извлечения данных о расходах. Основной файл приложения Flask. """ from __future__ import annotations import json import os import subprocess import tempfile import time from datetime import date from functools import lru_cache from pathlib import Path from typing import Any, Optional from flask import Flask, jsonify, request # Импорт экстракторов from extractors import ( ExpenseDateExtractor, ExpenseSupplierExtractor, ExpenseUserExtractor, ExpenseAmountExtractor, ) from expense_predictor import predict_expenses # HuggingFace Token (если нужен для моделей) HF_TOKEN = os.getenv("HF_TOKEN") _WHISPER_MODEL: Optional[Any] = None _HF_ASR_CLIENT: Optional[Any] = None _LOCAL_WHISPER_READY = False _REMOTE_ASR_READY = False app = Flask(__name__) app.config["MAX_CONTENT_LENGTH"] = 20 * 1024 * 1024 TEST_USERS = [ "Олечка", "Влад", ] TEST_SUPPLIERS = [ "alimentara", "andy's pizza", "atlantis alexandri", "avandion srl", "belsug toys s.r.l.", "beauty factory", "berăria costin", "casa curată", "chibox", "cleber", "coffee dealer", "crafti", "curtea macelar", "danjan lux srl", "davidan", "döner kebab", "elica", "energocom", "eurotelecom", "farmacia familia", "fast food", "felicia", "fidesco", "filetti", "franzeluța", "giganet", "global store", "granier", "herb", "hippocrates", "iarmareco", "iherb", "iute credit", "iutecredit", "jardi market", "joom", "katana sushi", "katshop", "kaufland", "kebab", "kiss beauty salon", "letz", "linella", "linella 115", "local", "maestro", "maestro delice", "maximum", "medical market", "megapolis", "merci", "metro", "mikof", "modus vivendi", "moldovagaz", "moldovapresa", "moldpresa", "mozza", "nanu market", "nr 1", "ocean fish", "oldcom", "ovatia", "pandashop", "peach girl", "peon farm", "piața centrală", "pizza9", "premier energy", "primul discounter", "rogob", "salamer", "samurai", "sancos", "startur", "stomatologia familiei", "supraten", "tagaer", "takumi", "telemarket cricova", "temix", "temu", "valconi", "vasconi", "vatsak", "velmart", "volta", "welness & spa termal", "яндекс подписка", ] TEST_PHRASES = [ "Олюня оплатил в Велмарт 455,90 лей", "В Велмарт Олюня оплатил 455,90 лей", "Оля оплатила в Velmart 455,90 лей", "Через 3 дня Влад был в Wellness & spa Thermal на 1200,70 лей", "вчера Олечка заплатил в вольта 425,40 лей", "Сегодня Олюня купил в velmart на 755,50 лей", "Вчера Оля платил в vatsak 185,80 лей", "Сегодня был в Vasconi на 455,30 lei", "Вчера платил в Valconi 325,90 lei", "Сегодня заказал в тему на 895,60 лей", "Вчера купил в temix на 185,50 лей", "Сегодня оплатил в телемаркет Крикова 655,80 лей", "Вчера был в такуме на 425,7 лей", "Сегодня купил в tagaer на 285,40 лей", "Вчера оплатил в Supraten 1200,50 лень", "Сегодня был в стоматологии о фамилии на 455,90 лень", "Я на следующей неделе заказал в стартур билеты на 855,60 лень", "Сегодня оплатил в Sankos 245,70 лей", "Вчера купил в Samurai на 325,40 лей", "Сегодня был в Salomer на 185,50 lei", "Вчера купил vragob na 655,80 lei", "Сегодня купил в primul discounter на 425,03 лей", "Вчера оплатил Premier Energy 985,90 lei", "Сегодня заказали в пицце 9 на 285,60 лей", "Сегодня заказали в пицце 91 на 285,60 лей", "Сегодня заказали в пицце 912 на 285,60 лей", "На прошлой неделе ходили в piața centrală, купили на 455,7 lei", "Сегодня купил в peon farm на 325,40 лей", "Вчера Wallach купила в Peach Girl на 755,50 лей", "Через 2 дня купил в Pandashop на 895,80 лей", "Pazavchora był vivația i kupil na 185,30 lei", "Сегодня оплатил в oldcom 655,90 лей", "Вчера купил рыбу в Ocean Fish на 280 lei", "Сегодня купил в номер 1 на 420 лей", "вчера воля купила в nanu market на 250 lei", "Сегодня купил в Mozza на 380 lei", "Вчера оплатил moldpressa 90 lei", "Сегодня заплатил в Moldova-Presa 180 lei", "Вчера платил MoldovaGaz 1250 lei", "Сегодня был в modus vivendi, я ставил 420 lei", "Вчера купил в Micov na 150 lei", "Сегодня оплатил в метрах 890,13 лей", "Вчера купил в Мерси на 210 lei", "Сегодня заплатил в Megapolis 680 lei", "Вчера Оля купила лекарство в Medical Market на 340 лей", "Сегодня оплатил в максимум 450 lei", "Вчера купил десерт в maestro delice на 120 lei", "Сегодня оплатил в maestro 750 lei", "вчера оля купила в local на 190 лей", "Сегодня был в Linelo 115 и купил на 280 лей", "Вчера купил продукты в Linel на 420,55 лей", "Сегодня оплатил vats 320 lei", "Вчера Олечка была в Kiss Beauty Salon на 450 lei", "Сегодня купил кебаб в кебаб на 150 лей", "Вчера Оля была в Кауфленд и потратила 890,15 лей", "Сегодня купил в cat shop на 650 lei", "Вчера вечером был выкатан суши на 300 восьятлей", "Оля вчера заказала в Joom на 1200 lei", "Сегодня купили рассаду в Ярди Маркет на 280 лей", "Вчера Влад оплатил в uiti credit 950 lei", "Сегодня оплатил в U.T. Credit очередной платеж 1800 лей", "Вчера заказал в iherb витамина на 420 лей", "На прошлой неделе покупали в Ярмареку на 950,13 лей", "Оля вчера была в Хипократис и оставила 650 lei", "Сегодня я купил витамины в herb на 180 лей", "Вчера купил хлеб в Граньер на 70 лей", "Сегодня ходил в Global Store за техникой на 2100 лей", "Вчера я оплатил интернет в Giganet 450,35 лей", "Сегодня Оля купила хлеб Франзелуца на 80 петлей", "вчера купил рыбу в эфилете на 420 лей", "На прошлой неделе заплатил в Fidesco 1300 lei", "Сегодня Влад был в Феличи и купил сыр на 95 лей", "Вчера вечером купили fast food на 180 lei", "Олечка вчера купила лекарство фармачия Familia на 240 лей", "Я сегодня утром оплатил Eurotelicom 310 lei", "Вчера Владислав оплатил энергоком 560 lei", "Сегодня оплатил в Елика 420 лей", "На следующей неделе в субботу хочу зайти в дёйнер-кебаб", ] def env_flag(name: str, default: bool = False) -> bool: """Парсит bool-флаг из переменных окружения.""" raw = os.getenv(name) if raw is None: return default return raw.strip().lower() in {"1", "true", "yes", "on"} def get_whisper_backend() -> str: """Возвращает активный backend для STT.""" backend = (os.getenv("WHISPER_BACKEND") or "auto").strip().lower() if backend not in {"auto", "hf-inference", "local"}: return "auto" return backend def should_use_remote_asr() -> bool: """Определяет, можно ли использовать HF Inference для ASR.""" backend = get_whisper_backend() return backend in {"auto", "hf-inference"} and bool(HF_TOKEN) def should_use_local_asr() -> bool: """Определяет, можно ли использовать локальный ASR.""" backend = get_whisper_backend() if backend == "hf-inference" and not HF_TOKEN: return True return backend in {"auto", "local"} def get_hf_asr_client() -> Any: """Возвращает клиент HF Inference для ASR.""" global _HF_ASR_CLIENT if _HF_ASR_CLIENT is None: from huggingface_hub import InferenceClient provider = os.getenv("WHISPER_REMOTE_PROVIDER", "hf-inference") timeout = float(os.getenv("WHISPER_REMOTE_TIMEOUT", "15")) _HF_ASR_CLIENT = InferenceClient( provider=provider, api_key=HF_TOKEN, timeout=timeout, ) print(f"[INFO] hf inference client ready: provider={provider}, timeout={timeout}s") return _HF_ASR_CLIENT def warmup_local_whisper_model() -> None: """Прогревает локальную faster-whisper модель один раз.""" global _LOCAL_WHISPER_READY if _LOCAL_WHISPER_READY: return started = time.time() model = get_whisper_model() print(f"[TIMINGS] whisper_preload: {round(time.time() - started, 3)}s") import struct import wave warmup_path = "/tmp/_whisper_warmup.wav" try: with wave.open(warmup_path, "w") as wf: wf.setnchannels(1) wf.setsampwidth(2) wf.setframerate(16000) wf.writeframes(struct.pack("<" + "h" * 3200, *([0] * 3200))) wt0 = time.time() segments, _ = model.transcribe( warmup_path, language="ru", beam_size=1, condition_on_previous_text=False, vad_filter=False, ) _ = list(segments) print(f"[TIMINGS] whisper_warmup: {round(time.time() - wt0, 3)}s") _LOCAL_WHISPER_READY = True finally: if os.path.exists(warmup_path): os.unlink(warmup_path) def ensure_asr_ready(include_local: bool = False) -> dict[str, bool]: """Инициализирует доступные ASR backend-ы без повторного прогрева.""" global _REMOTE_ASR_READY remote_ready = False local_ready = _LOCAL_WHISPER_READY if should_use_remote_asr(): get_hf_asr_client() _REMOTE_ASR_READY = True remote_ready = True if include_local and should_use_local_asr() and env_flag("WHISPER_PRELOAD_ON_HEALTH", default=True): warmup_local_whisper_model() local_ready = _LOCAL_WHISPER_READY return { "remote_ready": remote_ready or _REMOTE_ASR_READY, "local_ready": local_ready, } def preprocess_audio_for_asr(audio_path: str) -> tuple[str, Optional[str]]: """Приводит аудио к 16k mono wav для более стабильного и быстрого STT.""" if not env_flag("WHISPER_PREPROCESS_AUDIO", default=True): return audio_path, None fd, prepared_path = tempfile.mkstemp(suffix=".wav") os.close(fd) command = [ "ffmpeg", "-y", "-i", audio_path, "-ar", "16000", "-ac", "1", "-vn", prepared_path, ] try: subprocess.run(command, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) return prepared_path, prepared_path except Exception: if os.path.exists(prepared_path): os.unlink(prepared_path) return audio_path, None def transcribe_audio_remote(audio_path: str) -> tuple[str, float]: """Транскрибирует аудио через HF Inference.""" started = time.time() client = get_hf_asr_client() model_id = os.getenv("WHISPER_REMOTE_MODEL", "openai/whisper-large-v3") result = client.automatic_speech_recognition(audio=audio_path, model=model_id) text = (getattr(result, "text", None) or "").strip() elapsed = round(time.time() - started, 3) print(f"[TIMINGS] whisper_transcribe_remote: {elapsed}s") if text: return text, elapsed raise RuntimeError("HF Inference returned empty transcription.") def transcribe_audio_local(audio_path: str) -> tuple[str, float]: """Транскрибирует аудио локально через faster-whisper.""" started = time.time() model = get_whisper_model() beam_size = max(1, int(os.getenv("WHISPER_NUM_BEAMS", "1"))) vad_filter = env_flag("WHISPER_VAD_FILTER", default=False) segments, _ = model.transcribe( audio_path, language="ru", beam_size=beam_size, condition_on_previous_text=False, vad_filter=vad_filter, ) text = "".join(segment.text for segment in segments).strip() elapsed = round(time.time() - started, 3) print(f"[TIMINGS] whisper_transcribe_local: {elapsed}s") if text: return text, elapsed raise RuntimeError("Local faster-whisper returned empty transcription.") def get_whisper_model() -> Any: """Возвращает faster-whisper WhisperModel (ленивая загрузка).""" global _WHISPER_MODEL if _WHISPER_MODEL is None: from faster_whisper import WhisperModel model_id = os.getenv("WHISPER_MODEL", "deepdml/faster-whisper-large-v3-ct2") cpu_threads = max(1, int(os.getenv("WHISPER_CPU_THREADS", "2"))) _WHISPER_MODEL = WhisperModel( model_id, device="cpu", compute_type="int8", cpu_threads=cpu_threads, num_workers=1, ) print(f"[INFO] faster-whisper loaded: model={model_id}, threads={cpu_threads}") return _WHISPER_MODEL @lru_cache(maxsize=32) def build_cached_pipeline(suppliers_key: tuple[str, ...], users_key: tuple[str, ...]) -> ExpenseTextExtractor: """Кэширует экстрактор для повторяющихся наборов suppliers/users.""" return ExpenseTextExtractor(suppliers=list(suppliers_key), users=list(users_key)) class ExpenseTextExtractor: """ Главный экстрактор данных о расходах. Комбинирует все экстракторы: даты, поставщики, пользователи, суммы. """ def __init__(self, suppliers: list[str], users: list[str]) -> None: self.date_extractor = ExpenseDateExtractor() self.supplier_extractor = ExpenseSupplierExtractor(suppliers=suppliers) self.amount_extractor = ExpenseAmountExtractor(suppliers=suppliers) self.user_extractor = ExpenseUserExtractor(users=users, suppliers=suppliers) def extract( self, text: str, reference_date: str | date | None = None, debug: bool = False, ) -> dict[str, Any]: """Извлекает все данные из текста.""" timings: dict[str, float] = {} t0 = time.time() date_info = self.date_extractor.extract(text, reference_date=reference_date, debug=debug) timings["date_extractor"] = round(time.time() - t0, 3) if debug: print(f"[DEBUG][DATE] {date_info}") t0 = time.time() supplier_info = self.supplier_extractor.extract( text, date_phrase=date_info.get("matched_date_phrase"), debug=debug, ) timings["supplier_extractor"] = round(time.time() - t0, 3) if debug: print(f"[DEBUG][SUPPLIER] {supplier_info}") t0 = time.time() user_info = self.user_extractor.extract( text, supplier_phrase=supplier_info.get("matched_supplier_phrase"), date_phrase=date_info.get("matched_date_phrase"), debug=debug, ) timings["user_extractor"] = round(time.time() - t0, 3) if debug: print(f"[DEBUG][USER] {user_info}") t0 = time.time() amount_info = self.amount_extractor.extract( text, matched_date_phrase=date_info["matched_date_phrase"], matched_supplier_phrase=supplier_info["matched_supplier_phrase"], debug=debug, ) timings["amount_extractor"] = round(time.time() - t0, 3) if debug: print(f"[DEBUG][AMOUNT] {amount_info}") if debug: print(f"[TIMINGS] {timings}") result = { "text": text, "user": user_info["user"], "supplier": supplier_info["supplier"], "amount": amount_info["amount"], "date": date_info["date"], "date_iso": date_info["date_iso"], } if debug: result["debug"] = { "timings": timings, "date": date_info.get("date_debug"), "supplier": supplier_info.get("supplier_debug"), "user": user_info.get("user_debug"), "amount": amount_info.get("amount_debug"), } return result def build_default_pipeline(suppliers: list[str], users: list[str]) -> ExpenseTextExtractor: """Создаёт пайплайн извлечения данных.""" suppliers_key = tuple(item for item in suppliers if item) users_key = tuple(item for item in users if item) return build_cached_pipeline(suppliers_key, users_key) def extract_names(items: Any) -> list[str]: """Извлекает имена из списка объектов или строк.""" if not isinstance(items, list): return [] names: list[str] = [] for item in items: if isinstance(item, dict): name = item.get("name") if isinstance(name, str) and name.strip(): names.append(name.strip()) continue if isinstance(item, str) and item.strip(): names.append(item.strip()) return names def polish_notes_text(text: str) -> str: """Форматирует текст заметки.""" import re normalized = re.sub(r"\s+", " ", text).strip() if not normalized: return "" normalized = normalized[0].upper() + normalized[1:] if normalized[-1] not in ".!?": normalized += "." return normalized def transcribe_audio_text(audio_path: str) -> tuple[str, float]: """Транскрибирует аудио в текст. Возвращает (текст, время в секундах).""" mock_text = os.getenv("EXPENSE_VOICE_MOCK_TEXT") if mock_text: return mock_text.strip(), 0.0 prepared_audio_path, prepared_temp_path = preprocess_audio_for_asr(audio_path) try: if should_use_remote_asr(): try: text, elapsed = transcribe_audio_remote(prepared_audio_path) print(f"[TIMINGS] whisper_backend: hf-inference") return text, elapsed except Exception as remote_error: print(f"[WARN] Remote ASR failed: {remote_error}") if not should_use_local_asr(): raise if should_use_local_asr(): text, elapsed = transcribe_audio_local(prepared_audio_path) print(f"[TIMINGS] whisper_backend: local") return text, elapsed except Exception as e: print(f"[ERROR] Whisper transcribe failed: {e}") finally: if prepared_temp_path and os.path.exists(prepared_temp_path): os.unlink(prepared_temp_path) raise RuntimeError("Speech-to-text backend is unavailable.") def process_voice_request(audio_path: str, mode: str, payload: dict[str, Any], debug: bool = False) -> dict[str, Any]: """Обрабатывает голосовой запрос.""" total_start = time.time() context = payload.get("context", {}) if isinstance(payload, dict) else {} supplier_names = extract_names(context.get("suppliers")) user_names = extract_names(context.get("users")) transcript, whisper_time = transcribe_audio_text(audio_path) if debug: print(f"[DEBUG][TRANSCRIPT] {transcript}") print( f"[DEBUG][CONTEXT] suppliers_count={len(supplier_names)}, users_count={len(user_names)}" ) print(f"[DEBUG][SUPPLIERS] {supplier_names}") print(f"[DEBUG][USERS] {user_names}") if mode == "notes": notes = polish_notes_text(transcript) return { "status": "ok", "text": transcript, "notes": notes, "supplier": None, "user": None, "date": None, "sum": None, } if not supplier_names: raise RuntimeError("No suppliers were provided by Laravel context.") if not user_names: raise RuntimeError("No users were provided by Laravel context.") t0 = time.time() extractor = build_default_pipeline(suppliers=supplier_names, users=user_names) pipeline_init_time = round(time.time() - t0, 3) print(f"[TIMINGS] pipeline_init: {pipeline_init_time}s") extracted = extractor.extract(transcript, reference_date=date.today().isoformat(), debug=debug) if debug: print(f"[DEBUG][EXTRACTED_RAW] {extracted}") total_time = round(time.time() - total_start, 3) print(f"[TIMINGS] TOTAL: {total_time}s (whisper: {whisper_time}s)") payload = { "status": "ok", "text": transcript, "notes": polish_notes_text(extracted.get("text") or transcript), "supplier": extracted.get("supplier"), "user": extracted.get("user"), "date": extracted.get("date_iso") or extracted.get("date"), "sum": extracted.get("amount"), } if debug and extracted.get("debug"): payload["debug"] = extracted.get("debug") if debug: print(f"[DEBUG][RESPONSE_PAYLOAD] {payload}") return payload def parse_context(raw: str | None) -> dict[str, Any]: """Парсит JSON контекст.""" if not raw: return {} try: payload = json.loads(raw) return payload if isinstance(payload, dict) else {} except json.JSONDecodeError: return {} def parse_json_payload() -> dict[str, Any]: """Возвращает JSON payload из входящего запроса.""" payload = request.get_json(silent=True) return payload if isinstance(payload, dict) else {} # ============================================================================ # ENDPOINTS # ============================================================================ @app.get("/") def index(): """Главная страница API.""" return jsonify({ "status": "ok", "message": "Expense Processing API is running", "endpoints": { "POST /process-audio": "Process audio file", "POST /predict-expenses": "Predict next 3 expenses based on history", "GET /health": "Health check", "GET /test-data": "Run text-only extraction tests" } }) @app.get("/health") def health(): """Проверка здоровья сервиса.""" try: readiness = ensure_asr_ready(include_local=True) return jsonify({ "status": "ok", "stt": { "backend": get_whisper_backend(), "remote_enabled": should_use_remote_asr(), "local_enabled": should_use_local_asr(), **readiness, }, }) except Exception as exception: return jsonify({"status": "error", "message": str(exception)}), 503 @app.get("/test-data") def test_data(): """Тестирует извлечение данных из текста без использования Whisper.""" debug = (request.args.get("debug") or "").strip().lower() == "1" extractor = build_default_pipeline(suppliers=TEST_SUPPLIERS, users=TEST_USERS) started = time.time() results: list[dict[str, Any]] = [] for phrase in TEST_PHRASES: item_started = time.time() extracted = extractor.extract( phrase, reference_date=date.today().isoformat(), debug=debug, ) row = { "text": phrase, "user": extracted.get("user"), "supplier": extracted.get("supplier"), "amount": extracted.get("amount"), "date": extracted.get("date"), "date_iso": extracted.get("date_iso"), "processing_time": round(time.time() - item_started, 3), } if debug and extracted.get("debug"): row["debug"] = extracted.get("debug") results.append(row) return jsonify({ "status": "ok", "mode": "text-only", "reference_date": date.today().isoformat(), "phrases_count": len(TEST_PHRASES), "suppliers_count": len(TEST_SUPPLIERS), "users_count": len(TEST_USERS), "total_processing_time": round(time.time() - started, 3), "results": results, }) @app.post("/process-audio") def process_audio(): """Обработка аудио файла.""" audio = request.files.get("audio") mode = (request.form.get("mode") or "expense").strip() debug = (request.args.get("debug") or "") == "1" context = parse_context(request.form.get("context")) if audio is None: return jsonify({"status": "error", "message": "Audio file is required."}), 422 suffix = Path(audio.filename or "voice.webm").suffix or ".webm" temp_path = None try: with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file: temp_path = temp_file.name audio.save(temp_file) result = process_voice_request(audio_path=temp_path, mode=mode, payload={"context": context}, debug=debug) return jsonify(result) except Exception as exception: return jsonify({"status": "error", "message": str(exception)}), 422 finally: if temp_path and os.path.exists(temp_path): os.unlink(temp_path) @app.post("/predict-expenses") def predict_expenses_endpoint(): """Predicts top 3 expenses user should add based on 6-month history.""" payload = parse_json_payload() expenses = payload.get("expenses") or [] user_id = payload.get("user_id") debug = (request.args.get("debug") or request.args.get("debut") or "").strip().lower() == "" if not isinstance(expenses, list): return jsonify({"status": "error", "message": "expenses must be a list"}), 422 if user_id is None: return jsonify({"status": "error", "message": "user_id is required"}), 422 try: predictions = predict_expenses(expenses, target_user_id=user_id, debug=debug) return jsonify({ "status": "ok", "predictions": predictions }) except Exception as exception: return jsonify({"status": "error", "message": str(exception)}), 422 if __name__ == "__main__": app.run(host="0.0.0.0", port=int(os.getenv("PORT", "7860")))