Spaces:
Build error
Build error
fix: update tax deduction limit and adjust premium calculation logic in get_user_info function
3b0c9f6 | import numpy as np | |
| import librosa | |
| import io | |
| import os | |
| import warnings | |
| import tempfile | |
| from pydub import AudioSegment | |
| from dotenv import load_dotenv | |
| from fastrtc import get_cloudflare_turn_credentials_async, get_cloudflare_turn_credentials | |
| import random | |
| try: | |
| import torch | |
| except ModuleNotFoundError: | |
| torch = None # type: ignore | |
| warnings.filterwarnings("ignore") | |
class PersonaState:
    """Hold one randomly generated customer persona for the instance's lifetime.

    The persona is drawn once in ``__init__`` and kept privately; the
    accessor methods expose it as a dict copy, a formatted string, or a
    single field.
    """

    def __init__(self):
        # Draw the persona exactly once so every accessor describes the
        # same person for as long as this instance lives.
        self._persona = self._generate_persona()

    @staticmethod
    def format_persona(persona_dict):
        """Return a human-readable, multi-line description of a persona.

        This is a static method because it takes no ``self``. Without the
        decorator, ``self.format_persona(...)`` (as used by
        ``get_persona_string``) would receive the instance as an extra
        positional argument and raise ``TypeError``.

        Args:
            persona_dict: Mapping with the keys produced by
                ``_generate_persona``; may be empty or ``None``.

        Returns:
            The formatted persona, or ``"No persona assigned."`` when
            ``persona_dict`` is falsy.
        """
        if not persona_dict:
            return "No persona assigned."
        return (
            f"Name: {persona_dict['Name']}\n"
            f"Gender: {persona_dict['Gender']}\n"
            f"Age: {persona_dict['Age']}\n"
            f"Nationality: {persona_dict['Nationality']}\n"
            f"Occupation: {persona_dict['Occupation']}\n"
            f"เงินได้สุทธิ: {persona_dict['เงินได้สุทธิ']}\n"
            f"Traits: {persona_dict['Traits']}\n"
            f"Interest in product: {persona_dict['Interest in product']}"
        )

    def _generate_persona(self):
        """Draw a random persona and return it as a dictionary.

        The order of the ``random`` calls is kept stable so that a seeded
        generator keeps producing the same personas.
        """
        names_male = ["สมชาย", "วิทยา", "ประยุทธ", "ธนพล"]
        names_female = ["สมหญิง", "สุดารัตน์", "มาลี", "จินตนา"]
        genders = ["Male", "Female"]
        nationalities = ["Thai"]
        occupations = ["พนักงานบริษัท", "เจ้าของธุรกิจ", "ฟรีแลนซ์", "แม่บ้าน/พ่อบ้าน", "ข้าราชการ", "พ่อค้า/เเม่ค้า", "Influencer"]
        traits = ["ใจดี", "จริงจัง", "ชอบพูดคุย", "ขี้สงสัย", "ชอบวางแผน", "รักครอบครัว", "ชอบความท้าทาย", "รอบคอบ", "หงุดหงิดง่าย", "ใจร้อน", "หัวอ่อน", "ego"]
        interests = ["yes", 'no', 'neutral']

        gender = random.choice(genders)
        name = random.choice(names_male) if gender == "Male" else random.choice(names_female)
        age = random.randint(20, 50)
        nationality = random.choice(nationalities)
        selected_traits = ", ".join(random.sample(traits, k=3))

        # The income band (always a multiple of 1,000) widens with age.
        if 20 < age <= 25:
            # The youngest band never gets the "ข้าราชการ" occupation.
            occupations = [job for job in occupations if job not in {"ข้าราชการ"}]
            income = random.randint(150, 800) * 1000
        elif 25 < age <= 45:
            income = random.randint(150, 1500) * 1000
        elif 45 < age <= 50:
            income = random.randint(150, 2000) * 1000
        else:
            # NOTE(review): ages are drawn from 20-50 inclusive and the first
            # branch uses a strict ``20 < age``, so this fallback is reached
            # only for age == 20. Confirm that this is intended.
            income = random.randint(120, 400) * 1000

        occupation = random.choice(occupations)
        interest_in_product = random.choice(interests)

        # A plain dict keeps field access and formatting simple.
        return {
            "Name": name,
            "Gender": gender,
            "Age": age,
            "Nationality": nationality,
            "Occupation": occupation,
            "เงินได้สุทธิ": income,
            "Traits": selected_traits,
            "Interest in product": interest_in_product,
        }

    def get_persona(self):
        """Return a shallow copy of the persona dictionary."""
        return dict(self._persona)

    def get_persona_string(self):
        """Return the persona rendered by ``format_persona``."""
        return self.format_persona(self._persona)

    def get_gender(self):
        """Return the persona's gender (``"Male"`` or ``"Female"``)."""
        return self._persona["Gender"]
# 1. Premium-rate table.
# Each rate is the premium per 1,000 baht of sum insured.
# Row layout: (age, 8-year plan male, 8-year plan female,
#              pay-until-60 plan male, pay-until-60 plan female).
_PREMIUM_RATE_ROWS = (
    (20, 219, 244, 61, 66),
    (21, 228, 252, 64, 69),
    (22, 235, 259, 67, 72),
    (23, 241, 265, 70, 75),
    (24, 247, 272, 73, 78),
    (25, 252, 277, 76, 81),
    (26, 258, 285, 79, 85),
    (27, 264, 292, 83, 89),
    (28, 270, 299, 87, 93),
    (29, 277, 305, 91, 98),
    (30, 285, 313, 95, 102),
    (31, 294, 320, 100, 107),
    (32, 301, 328, 105, 112),
    (33, 311, 337, 111, 118),
    (34, 319, 345, 117, 125),
    (35, 328, 353, 124, 132),
    (36, 337, 361, 131, 139),
    (37, 345, 370, 139, 147),
    (38, 355, 379, 148, 156),
    (39, 364, 388, 158, 166),
    (40, 374, 397, 168, 177),
    (41, 385, 407, 180, 189),
    (42, 396, 417, 194, 203),
    (43, 407, 428, 209, 218),
    (44, 419, 439, 226, 235),
    (45, 431, 450, 246, 255),
    (46, 444, 462, 268, 278),
    (47, 458, 475, 295, 304),
    (48, 472, 488, 326, 336),
    (49, 488, 501, 364, 373),
    (50, 504, 515, 410, 418),
    # Ages 51-55: the 8-year plan has no rate, recorded as None.
    (51, None, None, 463, 472),
    (52, None, None, 529, 538),
    (53, None, None, 611, 622),
    (54, None, None, 717, 732),
    (55, None, None, 860, 884),
)

# Shape: {age: {"pay_8_years": {"male": rate, "female": rate},
#               "pay_until_60": {"male": rate, "female": rate}}}
PREMIUM_RATES = {
    age: {
        "pay_8_years": {"male": male_8y, "female": female_8y},
        "pay_until_60": {"male": male_to_60, "female": female_to_60},
    }
    for age, male_8y, female_8y, male_to_60, female_to_60 in _PREMIUM_RATE_ROWS
}
# 2. Premium calculation function.
def calculate_be_together_premium(age: int, sum_insured: float, plan_name: str, gender: str):
    """Compute the annual premium for the Be ToGether Smart Retirement plan.

    Args:
        age: Age of the insured; must be a key of ``PREMIUM_RATES``
            (the error message advertises 20-55).
        sum_insured: Requested sum insured; must be greater than 0.
        plan_name: Plan label, either ``"จ่าย 8 ปี"`` or ``"จ่ายถึง 60"``.
        gender: ``"Male"``/``"Female"`` or the Thai labels
            ``"ชาย"``/``"หญิง"``.

    Returns:
        The premium as a number, or a Thai error-message string when an
        argument is invalid or the plan has no rate for the given age.
        Callers must check the return type.
    """
    # Translate caller-facing labels into the keys used by PREMIUM_RATES.
    plan_map = {"จ่าย 8 ปี": "pay_8_years", "จ่ายถึง 60": "pay_until_60"}
    # Both spellings are accepted: the English labels are what the existing
    # code matched on, the Thai labels are what the documentation promised.
    gender_map = {
        "Male": "male",
        "Female": "female",
        "ชาย": "male",
        "หญิง": "female",
    }

    # --- Basic validation (order determines which message is returned) ---
    if age not in PREMIUM_RATES:
        return f"ขออภัย ไม่พบข้อมูลสำหรับอายุ {age} ปี (รับอายุ 20-55 ปี)"
    if plan_name not in plan_map:
        return f"ขออภัย ไม่พบแผนประกันที่ชื่อว่า '{plan_name}'"
    if gender not in gender_map:
        return f"ขออภัย ไม่พบข้อมูลสำหรับเพศ '{gender}'"
    if sum_insured <= 0:
        return "ทุนประกันต้องเป็นค่ามากกว่า 0"

    # --- Look up the rate ---
    internal_plan_key = plan_map[plan_name]
    internal_gender_key = gender_map[gender]
    rate_per_1000 = PREMIUM_RATES[age][internal_plan_key][internal_gender_key]

    # A None rate means the plan is not offered at this age.
    if rate_per_1000 is None:
        return f"แผน '{plan_name}' ไม่สามารถทำได้สำหรับผู้ที่มีอายุ {age} ปี"

    # Premium = (sum insured / 1000) * rate per 1,000.
    final_premium = (sum_insured / 1000) * rate_per_1000
    return final_premium
| # --- Device Configuration --- | |
def get_device():
    """Return the name of the preferred PyTorch device.

    Returns:
        ``"cuda"`` when CUDA is available, otherwise ``"mps"`` when the MPS
        backend exists and is available, otherwise ``"cpu"``. Also ``"cpu"``
        when torch could not be imported (``torch is None``).
    """
    if torch is None:
        return "cpu"
    if torch.cuda.is_available():
        return "cuda"
    # Some torch builds have no ``backends.mps`` attribute, so probe for it
    # before asking whether it is available.
    mps_ready = hasattr(torch.backends, "mps") and torch.backends.mps.is_available()
    return "mps" if mps_ready else "cpu"
# Resolve the compute device once at import time.
device = get_device()
# The .env file is loaded (overriding existing variables) only on MPS,
# presumably a local development machine; confirm with the deployment setup.
if device == "mps":
    load_dotenv(override=True)
print(f"Using device: {device}")
| # --- Cloud Credentials --- | |
async def get_async_credentials():
    """Fetch Cloudflare TURN credentials asynchronously.

    The token is read from the ``HF_TOKEN`` environment variable and passed
    to ``get_cloudflare_turn_credentials_async``; its result is returned
    unchanged.
    """
    hf_token = os.getenv('HF_TOKEN')
    return await get_cloudflare_turn_credentials_async(hf_token=hf_token)
def get_sync_credentials(ttl=360_000):
    """Fetch Cloudflare TURN credentials synchronously.

    Args:
        ttl: Passed unchanged to ``get_cloudflare_turn_credentials``
            (presumably a lifetime in seconds; confirm against fastrtc).

    Returns:
        Whatever ``get_cloudflare_turn_credentials`` returns.
    """
    credentials = get_cloudflare_turn_credentials(ttl=ttl)
    return credentials
def setup_gcp_credentials():
    """Expose Google Cloud credentials stored in an environment variable.

    Reads the service-account JSON from ``GCP_SERVICE_ACCOUNT_JSON``, writes
    it to a temporary ``.json`` file and points
    ``GOOGLE_APPLICATION_CREDENTIALS`` at that file. The file is removed
    when the interpreter exits so the key does not stay on disk.

    This is best-effort: failures are printed, never raised. When the
    variable is unset or empty, only a warning is printed.
    """
    import atexit  # Local import: only needed here.

    gcp_service_account_json_str = os.getenv("GCP_SERVICE_ACCOUNT_JSON")
    if not gcp_service_account_json_str:
        print("Warning: GCP_SERVICE_ACCOUNT_JSON secret not found. Google Cloud services may fail.")
        return

    try:
        # delete=False because the file must outlive this ``with`` block.
        # JSON is written as UTF-8 rather than the locale's default encoding.
        with tempfile.NamedTemporaryFile(
            mode='w', delete=False, suffix=".json", encoding="utf-8"
        ) as temp_file:
            temp_file.write(gcp_service_account_json_str)
            gcp_credential_path = temp_file.name

        def _remove_credentials_file(path=gcp_credential_path):
            # Exit-time cleanup; a file that is already gone is not an error.
            try:
                os.remove(path)
            except OSError:
                pass

        atexit.register(_remove_credentials_file)

        # Set the environment variable that Google Cloud libraries expect.
        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = gcp_credential_path
        print(f"Google Cloud credentials set from secret to: {gcp_credential_path}")
    except (OSError, ValueError) as e:
        # OSError: the file could not be created or written.
        # ValueError: the value could not be encoded or stored in os.environ.
        print(f"Error setting up Google Cloud credentials: {e}")
| # if gcp_service_account_json_str: | |
| # print("GCP service account JSON loaded from environment variable.") | |
| # else: | |
| # print("Warning: GCP_SERVICE_ACCOUNT_JSON is not set; Google Cloud clients may fail.") | |
| # return gcp_service_account_json_str | |
| # --- Audio Processing --- | |
| # def audiosegment_to_numpy(audio, target_sample_rate=16000): | |
| # samples = np.array(audio.get_array_of_samples(), dtype=np.float32) | |
| # if audio.channels > 1: | |
| # samples = samples.reshape((-1, audio.channels)).mean(axis=1) | |
| # if audio.frame_rate != target_sample_rate: | |
| # samples = librosa.resample(samples, orig_sr=audio.frame_rate, target_sr=target_sample_rate) | |
| # samples /= np.iinfo(audio.array_type).max | |
| # return samples | |
def audiosegment_to_numpy(audio, target_sample_rate=16000):
    """Convert a pydub ``AudioSegment`` to a mono float32 array in [-1, 1].

    Args:
        audio: Object exposing ``get_array_of_samples()``, ``channels``,
            ``frame_rate`` and ``array_type`` (as ``pydub.AudioSegment``
            does).
        target_sample_rate: Sample rate of the returned array; the audio is
            resampled with librosa when ``audio.frame_rate`` differs.

    Returns:
        1-D ``np.float32`` array. Non-silent audio is peak-normalized so
        its largest absolute sample is 1.0; silent or empty audio is
        returned without rescaling.
    """
    samples = np.array(audio.get_array_of_samples(), dtype=np.float32)

    # Samples are interleaved per frame; average the channels to get mono.
    if audio.channels > 1:
        samples = samples.reshape((-1, audio.channels)).mean(axis=1)

    # Nothing to scale or resample, and np.max would raise on an empty array.
    if samples.size == 0:
        return samples.astype(np.float32)

    # Scale integer PCM to [-1, 1] using the sample type's maximum value.
    samples /= np.iinfo(audio.array_type).max

    if audio.frame_rate != target_sample_rate:
        samples = librosa.resample(samples, orig_sr=audio.frame_rate, target_sr=target_sample_rate)

    # Final peak normalization; skipped for all-zero audio to avoid 0/0.
    max_val = np.max(np.abs(samples))
    if max_val > 0:
        samples = samples / max_val
    return samples.astype(np.float32)
def preprocess_audio(audio, target_channels=1, target_sr=16000, debug_dump_path=None):
    """Convert raw audio to the target channel count and sample rate.

    The array is converted to 16-bit PCM, passed through a pydub
    ``AudioSegment`` to adjust channels and frame rate, then handed to
    ``audiosegment_to_numpy`` for conversion back to a normalized array.

    Args:
        audio: Tuple ``(sample_rate, audio_array)`` where ``audio_array`` is
            a numpy array. Non-int16 input is treated as float audio in
            [-1, 1] and clipped to that range before conversion.
        target_channels: Channel count requested from pydub.
        target_sr: Output sample rate.
        debug_dump_path: Optional path; when given, the incoming array is
            saved there with ``np.save``. Defaults to ``None`` (no dump), so
            normal calls do not write a file to the working directory on
            every audio chunk.

    Returns:
        Tuple ``(target_sr, samples)`` with ``samples`` as returned by
        ``audiosegment_to_numpy``.
    """
    sample_rate, audio_array = audio

    if debug_dump_path is not None:
        # Opt-in diagnostic dump of the untouched input.
        with open(debug_dump_path, "wb") as f:
            np.save(f, audio_array)

    # pydub is fed 16-bit PCM below, so convert anything else first.
    if audio_array.dtype != np.int16:
        # NOTE(review): this assumes non-int16 input is float audio in
        # [-1, 1]; other integer dtypes would be clipped to -1/0/1 here.
        audio_array = np.clip(audio_array, -1.0, 1.0)
        audio_array_int16 = (audio_array * 32767).astype(np.int16)
    else:
        audio_array_int16 = audio_array

    # Wrap the raw PCM bytes so pydub can read them as a file-like object.
    audio_io = io.BytesIO(audio_array_int16.tobytes())
    segment = AudioSegment.from_raw(
        audio_io,
        sample_width=2,  # 2 bytes per sample = int16
        frame_rate=sample_rate,
        channels=1,  # the input is interpreted as a single channel
    )

    # Let pydub adjust the channel count and frame rate.
    segment = segment.set_channels(target_channels)
    segment = segment.set_frame_rate(target_sr)

    # Convert back to a normalized numpy array.
    samples = audiosegment_to_numpy(segment, target_sample_rate=target_sr)
    return (target_sr, samples)
def preprocess_audio_simplified(audio, target_sr=16000):
    """Return mono, peak-normalized float32 audio at ``target_sr``.

    Args:
        audio: Tuple ``(original_sr, audio_array)`` where ``audio_array`` is
            a numpy array. Integer arrays are scaled by their dtype's
            maximum value; floating-point arrays of any precision are used
            as they are. A 2-D array is assumed to have channels on the
            first axis.
        target_sr: Output sample rate; librosa resamples when it differs
            from ``original_sr``.

    Returns:
        Tuple ``(target_sr, samples)`` with ``samples`` a 1-D
        ``np.float32`` array. Non-silent audio has a peak absolute value of
        1.0; silent or empty audio is returned without rescaling.
    """
    original_sr, audio_array = audio

    # Scale integer PCM to [-1, 1]. Every floating dtype (including
    # float16) is left alone, because np.iinfo rejects float dtypes.
    if not np.issubdtype(audio_array.dtype, np.floating):
        int_max = np.iinfo(audio_array.dtype).max
        audio_array = audio_array.astype(np.float32) / int_max

    # Mix down to mono, assuming channels are on the first axis.
    if audio_array.ndim > 1 and audio_array.shape[0] > 1:
        audio_array = np.mean(audio_array, axis=0)
    # Collapse shapes such as (1, N) to (N,).
    audio_array = audio_array.flatten()

    # Nothing to resample or normalize, and np.max would raise on an
    # empty array.
    if audio_array.size == 0:
        return (target_sr, audio_array.astype(np.float32))

    if original_sr != target_sr:
        audio_array = librosa.resample(y=audio_array, orig_sr=original_sr, target_sr=target_sr)

    # Peak normalization; skipped for all-zero audio to avoid 0/0.
    max_val = np.max(np.abs(audio_array))
    if max_val > 0:
        audio_array = audio_array / max_val

    return (target_sr, audio_array.astype(np.float32))
def is_valid_turn(turn: dict) -> bool:
    """Decide whether a conversation turn may be included in the LLM history.

    Rules by role:
    - ``user``: ``content`` must be a string with non-whitespace text.
    - ``assistant``: either such ``content`` or a ``tool_calls`` value
      that is not ``None``.
    - ``tool``: both the ``tool_call_id`` and ``content`` keys must be
      present (their values are not inspected).

    Anything else (a non-dict, a missing role, an unknown role) is invalid.
    """

    def _has_text(value) -> bool:
        # True only for a string that still has characters after stripping.
        return bool(value and isinstance(value, str) and value.strip())

    if not (isinstance(turn, dict) and "role" in turn):
        return False

    role = turn["role"]
    if role == "user":
        return _has_text(turn.get("content"))
    if role == "assistant":
        # Text alone, tool calls alone, or both are all acceptable.
        return _has_text(turn.get("content")) or turn.get("tool_calls") is not None
    if role == "tool":
        return "tool_call_id" in turn and "content" in turn
    # Unknown roles are rejected.
    return False