Spaces:
Paused
Paused
| # llm_processor.py - LLM processing module | |
| import os | |
| import re | |
| import time | |
| from datetime import datetime | |
| import logging | |
| # HuggingFace-related imports | |
| try: | |
| from transformers import ( | |
| AutoModelForCausalLM, | |
| AutoTokenizer, | |
| LlamaConfig, | |
| LlamaForCausalLM, | |
| BitsAndBytesConfig | |
| ) | |
| import torch | |
| TRANSFORMERS_AVAILABLE = True | |
| except ImportError: | |
| print("โ ๏ธ Transformers ๋ผ์ด๋ธ๋ฌ๋ฆฌ๊ฐ ์ค์น๋์ง ์์์ต๋๋ค") | |
| TRANSFORMERS_AVAILABLE = False | |
class TaxRuleEngine:
    """Rule-based housing acquisition-tax calculator.

    Every rate handled here is expressed in per mille: ``10`` means 1%,
    ``120`` means 12%.

    NOTE(review): the non-ASCII string literals below look mis-encoded
    (mojibake) in this copy of the file. They are reproduced unchanged and
    should be restored from the original UTF-8 source.
    """

    # Label and rate (per mille) used for the special acquisition types that
    # are handled at the top of determine_multi_housing_heavy_tax.
    _SPECIAL_HEAVY_TAX_TYPE = '์กฐ์ ์ง์ญ๊ณ ๊ฐ์ฃผํ์ฆ์ฌ'
    _SPECIAL_HEAVY_TAX_RATE = 120  # 12%

    # Acquisition values at or below this amount produce a tax of zero.
    _EXEMPTION_LIMIT = 500000

    def __init__(self):
        """Set up the adjustment-area list and the heavy-tax rate table."""
        # Districts treated as adjustment areas (matched by substring).
        self.adjustment_areas = [
            "๊ฐ๋จ๊ตฌ", "์์ด๊ตฌ", "์กํ๊ตฌ", "์ฉ์ฐ๊ตฌ"
        ]
        # Heavy-tax rates for multi-home owners, in per mille.
        self.multi_housing_rates = {
            "1์ธ๋2์ฃผํ_์กฐ์ ๋์": 80,  # 8%
            "1์ธ๋3์ฃผํ_์กฐ์ ๋์": 120,  # 12%
            "1์ธ๋4์ฃผํ์ด์_์กฐ์ ๋์": 120,  # 12%
            "1์ธ๋3์ฃผํ_์กฐ์ ๋์์ธ": 80,  # 8%
            "1์ธ๋4์ฃผํ์ด์_์กฐ์ ๋์์ธ": 120,  # 12%
        }

    def calculate_housing_tax_rate(self, acquisition_value):
        """Return the base housing rate in per mille.

        The rate is 10 up to 600,000,000, rises linearly to 30 at
        900,000,000 (rounded to four decimals) and stays at 30 above that.
        """
        if acquisition_value <= 600000000:
            return 10
        elif acquisition_value <= 900000000:
            excess = acquisition_value - 600000000
            rate = (excess / 300000000) * 20 + 10
            return round(rate, 4)
        else:
            return 30

    def is_adjustment_area(self, location):
        """Return True when ``location`` contains any adjustment-area name."""
        return any(area in location for area in self.adjustment_areas)

    def determine_multi_housing_heavy_tax(self, total_housing_count, is_adjustment_area, acquisition_type="๋งค๋งค"):
        """Return the heavy-tax label that applies, or ``None``.

        Args:
            total_housing_count: Homes owned including the one being acquired.
            is_adjustment_area: Whether the property is in an adjustment area.
            acquisition_type: Acquisition kind; three values are special-cased
                (presumably inheritance/gift-style acquisitions — confirm
                once the literals are restored).

        Returns:
            A key of ``multi_housing_rates``, the special label, or ``None``
            when no heavy tax applies.
        """
        if acquisition_type in ['์์', '์ฆ์ฌ', '๋ฌด์์ทจ๋']:
            if is_adjustment_area and total_housing_count >= 2:
                return self._SPECIAL_HEAVY_TAX_TYPE  # 12%
            return None
        if total_housing_count <= 1:
            return None
        elif total_housing_count == 2:
            return '1์ธ๋2์ฃผํ_์กฐ์ ๋์' if is_adjustment_area else None
        elif total_housing_count == 3:
            return '1์ธ๋3์ฃผํ_์กฐ์ ๋์' if is_adjustment_area else '1์ธ๋3์ฃผํ_์กฐ์ ๋์์ธ'
        else:  # four or more homes
            return '1์ธ๋4์ฃผํ์ด์_์กฐ์ ๋์' if is_adjustment_area else '1์ธ๋4์ฃผํ์ด์_์กฐ์ ๋์์ธ'

    def calculate_comprehensive_tax(self, property_info):
        """Compute the acquisition tax for ``property_info``.

        Args:
            property_info: Mapping read for ``acquisition_value`` (required),
                ``housing_list``, ``location``, ``heavy_tax_type`` and
                ``acquisition_type`` (all optional).

        Returns:
            ``None`` when ``acquisition_value`` is missing or falsy, otherwise
            a dict with ``tax_amount``, ``base_rate``, ``final_rate``,
            ``heavy_tax_type``, ``is_adjustment_area``,
            ``total_housing_count`` and ``acquisition_value``.
        """
        acquisition_value = property_info.get('acquisition_value')
        if not acquisition_value:
            return None

        base_rate = self.calculate_housing_tax_rate(acquisition_value)

        # ``or`` also covers keys that are present but explicitly None, which
        # ``.get(key, default)`` alone would pass through and crash on.
        existing_homes = property_info.get('housing_list') or []
        location = property_info.get('location') or ''

        # Existing homes plus the one being acquired.
        total_housing_count = len(existing_homes) + 1
        is_adjustment_area = self.is_adjustment_area(location)

        # A caller-supplied label wins; otherwise derive it from the counts.
        heavy_tax_type = property_info.get('heavy_tax_type')
        if not heavy_tax_type:
            heavy_tax_type = self.determine_multi_housing_heavy_tax(
                total_housing_count,
                is_adjustment_area,
                property_info.get('acquisition_type', '๋งค๋งค')
            )

        final_rate = base_rate
        if heavy_tax_type and heavy_tax_type in self.multi_housing_rates:
            final_rate = self.multi_housing_rates[heavy_tax_type]
        elif heavy_tax_type == self._SPECIAL_HEAVY_TAX_TYPE:
            final_rate = self._SPECIAL_HEAVY_TAX_RATE

        # Exemption threshold: no tax at or below the limit.
        if acquisition_value <= self._EXEMPTION_LIMIT:
            tax_amount = 0
        else:
            tax_amount = int(acquisition_value * (final_rate / 1000))

        return {
            'tax_amount': tax_amount,
            'base_rate': base_rate,
            'final_rate': final_rate,
            'heavy_tax_type': heavy_tax_type,
            'is_adjustment_area': is_adjustment_area,
            'total_housing_count': total_housing_count,
            'acquisition_value': acquisition_value
        }
class LLMProcessor:
    """HyperCLOVA X based LLM front end for acquisition-tax questions.

    Combines the rule-based result from ``TaxRuleEngine`` with text generated
    by a causal language model loaded through ``transformers``.

    NOTE(review): the non-ASCII string literals below (prompts, messages,
    regex patterns) look mis-encoded (mojibake) in this copy of the file.
    They are reproduced unchanged and should be restored from the original
    UTF-8 source; until then the regex extraction cannot match real input.
    """

    def __init__(self):
        """Create an uninitialised processor; the model is loaded lazily."""
        self.model = None
        self.tokenizer = None
        self.tax_engine = TaxRuleEngine()
        self.is_initialized = False
        self.device = 'cpu'
        # System prompt placed at the top of every generation request.
        self.system_prompt = """๋น์ ์ ๋ํ๋ฏผ๊ตญ ์ง๋ฐฉ์ธ๋ฒ ์ทจ๋์ธ ์ ๋ฌธ๊ฐ์ ๋๋ค.
์ฃผ์ ์ญํ :
1. ์ทจ๋์ธ ๊ด๋ จ ์ง๋ฌธ์ ์ ํํ๊ณ ์์ธํ ๋ต๋ณ ์ ๊ณต
2. ์ง๋ฐฉ์ธ๋ฒ ์ 2์ฅ ์ทจ๋์ธ ๊ท์ ๊ธฐ์ค ํด์
3. ๋ค์ฃผํ ๋ณด์ ์ ์ค๊ณผ์ธ ๊ณ์ฐ ๋ฐ ์ค๋ช
4. ์กฐ์ ๋์์ง์ญ ์ฌ๋ถ์ ๋ฐ๋ฅธ ์ธ์จ ์ฐจ์ด ์ค๋ช
5. ์ฃผํ์ ์ฐ์ ๊ธฐ์ค (์ํ๋ น ์ 28์กฐ์4) ์ ์ฉ
๋ต๋ณ ํ์:
- ํด๋น ๋ฒ๋ น ์กฐํญ ๋ช ์
- ๊ตฌ์ฒด์ ์ธ ๊ณ์ฐ ๊ณผ์ ์ค๋ช
- ์ ์ธ ๋ฐฉ์ ์ ์ (ํฉ๋ฒ์ ๋ฒ์ ๋ด)
- ์ ๊ณ ๊ธฐํ ๋ฐ ์ ์์ฌํญ ์๋ด
์ ๋ฌธ์ ์ด๊ณ ์น์ ํ ํค์ผ๋ก ๋ต๋ณํ์ธ์."""

    def initialize_model(self, force_cpu=False):
        """Load the tokenizer and model.

        Args:
            force_cpu: When true, run on CPU even if CUDA is available.

        Returns:
            ``True`` when the model is ready (or was already initialised),
            ``False`` when ``transformers`` is unavailable, no Hugging Face
            token is set in the environment, or loading raised.
        """
        if not TRANSFORMERS_AVAILABLE:
            print("โ Transformers ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ค์นํด์ฃผ์ธ์: pip install transformers torch")
            return False
        if self.is_initialized:
            return True
        print("๐ HyperCLOVA X 1.5B ๋ชจ๋ธ ์ด๊ธฐํ ์ค...")
        try:
            # A Hugging Face token is required; read it from the environment.
            hf_token = os.getenv('HF_TOKEN') or os.getenv('HUGGINGFACE_HUB_TOKEN')
            if not hf_token:
                print("โ ๏ธ HuggingFace ํ ํฐ์ด ํ์ํฉ๋๋ค")
                return False

            # Device selection.
            if force_cpu or not torch.cuda.is_available():
                self.device = 'cpu'
                print("๐ป CPU ๋ชจ๋๋ก ์คํ")
            else:
                self.device = 'cuda'
                print(f"๐ฅ GPU ๋ชจ๋๋ก ์คํ: {torch.cuda.get_device_name()}")

            model_name = "naver-hyperclovax/HyperCLOVAX-SEED-Text-Instruct-1.5B"

            # Load the config.
            config = LlamaConfig.from_pretrained(model_name, token=hf_token)

            # Load the tokenizer.
            self.tokenizer = AutoTokenizer.from_pretrained(
                model_name,
                token=hf_token,
                legacy=False,
                add_eos_token=True,
                add_bos_token=True
            )
            # generate() needs a pad token; fall back to EOS when none is set.
            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token

            # Load the model.
            if self.device == 'cuda':
                # GPU: 8-bit quantisation.
                quantization_config = BitsAndBytesConfig(
                    load_in_8bit=True,
                    llm_int8_enable_fp32_cpu_offload=True,
                    llm_int8_threshold=6.0
                )
                self.model = LlamaForCausalLM.from_pretrained(
                    model_name,
                    config=config,
                    quantization_config=quantization_config,
                    torch_dtype=torch.float16,
                    device_map="auto",
                    token=hf_token,
                    low_cpu_mem_usage=True
                )
            else:
                # CPU: float32.
                self.model = LlamaForCausalLM.from_pretrained(
                    model_name,
                    config=config,
                    torch_dtype=torch.float32,
                    token=hf_token,
                    low_cpu_mem_usage=True
                )
                self.model = self.model.to('cpu')

            self.is_initialized = True
            print(f"โ HyperCLOVA X ๋ชจ๋ธ ์ด๊ธฐํ ์๋ฃ ({self.device})")
            return True
        except Exception as e:
            # Best-effort boundary: report the failure and signal it via False.
            print(f"โ ๋ชจ๋ธ ์ด๊ธฐํ ์คํจ: {e}")
            return False

    def extract_property_info(self, user_input):
        """Extract property details from free-form user text with regexes.

        Returns:
            A dict with ``property_type``, ``acquisition_type``,
            ``acquisition_value`` (``None`` when no amount matched),
            ``location`` and ``housing_list`` (placeholder entries for the
            homes already owned).
        """
        property_info = {
            'property_type': '์ฃผํ',
            'acquisition_type': '๋งค๋งค',
            'acquisition_value': None,
            'location': '',
            'housing_list': []
        }

        # Amount: (pattern, multiplier) pairs tried in order; first hit wins.
        amount_patterns = [
            (r'(\d+(?:\.\d+)?)์ต', 100000000),
            (r'(\d+(?:,\d+)?)๋ง์', 10000),
        ]
        for pattern, multiplier in amount_patterns:
            amounts = re.findall(pattern, user_input)
            if amounts:
                amount_str = amounts[0].replace(',', '')
                property_info['acquisition_value'] = int(float(amount_str) * multiplier)
                break

        # Location: match a known area with or without its trailing suffix.
        for area in self.tax_engine.adjustment_areas:
            area_name = area.replace('๊ตฌ', '')
            if area_name in user_input or area in user_input:
                property_info['location'] = f'์์ธํน๋ณ์ {area}'
                break

        # Home count: the matched number includes the home being acquired,
        # so one fewer placeholder entry is recorded as already owned.
        housing_patterns = [r'(\d+)์ฃผํ', r'๊ธฐ์กด.*?(\d+).*?์ฃผํ', r'(\d+).*?๋ณด์ ']
        for pattern in housing_patterns:
            matches = re.findall(pattern, user_input)
            if matches:
                existing_count = max(0, int(matches[0]) - 1)
                property_info['housing_list'].extend(
                    {
                        'id': f'existing_house_{i+1}',
                        'type': '์ฃผํ',
                        'acquisition_type': '๋งค๋งค',
                        'value': 500000000
                    }
                    for i in range(existing_count)
                )
                break
        return property_info

    def format_tax_result(self, result, property_info):
        """Render a calculation result as user-facing markdown text.

        Args:
            result: Dict produced by the tax engine, or a falsy value.
            property_info: Accepted for interface compatibility; unused.

        Returns:
            The formatted summary, or a short prompt asking for the price
            when ``result`` is falsy.
        """
        if not result:
            return "๐ ์ ํํ ๊ณ์ฐ์ ์ํด ๋ถ๋์ฐ ๊ฐ๊ฒฉ์ ๊ตฌ์ฒด์ ์ผ๋ก ์๋ ค์ฃผ์๋ฉด ๋์์ด ๋ฉ๋๋ค."
        output = f"""๐ **์ทจ๋์ธ ๊ณ์ฐ ๊ฒฐ๊ณผ**
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
๐ **์ทจ๋๊ฐ์ก**: {result['acquisition_value']:,}์
๐๏ธ **์ด ์ฃผํ์**: {result['total_housing_count']}์ฃผํ
๐ **์กฐ์ ๋์์ง์ญ**: {'์' if result['is_adjustment_area'] else '์๋์ค'}
๐ฐ **์ธ์จ ์ ๋ณด**
โข ๊ธฐ๋ณธ์ธ์จ: {result['base_rate']}โฐ ({result['base_rate']/10:.1f}%)
โข ์ต์ข ์ธ์จ: {result['final_rate']}โฐ ({result['final_rate']/10:.1f}%)
๐ธ **์ทจ๋์ธ์ก**: {result['tax_amount']:,}์"""
        if result['heavy_tax_type']:
            output += f"\nโ ๏ธ **์ค๊ณผ์ธ ์ ์ฉ**: {result['heavy_tax_type']}"
        output += f"""\n\n๐ **๋ฒ๋ น ๊ทผ๊ฑฐ**
โข ์ง๋ฐฉ์ธ๋ฒ ์ 11์กฐ (๋ถ๋์ฐ ์ทจ๋์ธ)
โข ์ง๋ฐฉ์ธ๋ฒ ์ 13์กฐ (์ค๊ณผ์ธ)
โข ์ง๋ฐฉ์ธ๋ฒ ์ํ๋ น ์ 28์กฐ์4 (์ฃผํ์ ์ฐ์ )
โข ์ ๊ณ ๊ธฐํ: ์ทจ๋์ผ๋ก๋ถํฐ 60์ผ ์ด๋ด"""
        return output

    def generate_ai_response(self, user_input, rag_context="", max_length=300):
        """Generate an answer, optionally grounded in RAG context.

        Args:
            user_input: The user's question.
            rag_context: Reference text to include in the prompt.
            max_length: Passed to ``generate`` as ``max_new_tokens``.

        Returns:
            The final response text. On any failure an error message is
            returned instead of raising, followed by the tax summary when one
            was computed before the failure.
        """
        if not self.is_initialized:
            print("โ ๏ธ ๋ชจ๋ธ์ด ์ด๊ธฐํ๋์ง ์์์ต๋๋ค. ์ด๊ธฐํ๋ฅผ ์๋ํฉ๋๋ค...")
            if not self.initialize_model():
                return "โ AI ๋ชจ๋ธ ์ด๊ธฐํ์ ์คํจํ์ต๋๋ค."

        # Bound before the try block so the except handler can always read it,
        # even when extraction itself is what raised.
        tax_summary = ""
        try:
            # 1. Rule-based calculation.
            property_info = self.extract_property_info(user_input)
            if property_info.get('acquisition_value'):
                property_info['acquisition_date'] = datetime.now().strftime('%Y-%m-%d')
                tax_result = self.tax_engine.calculate_comprehensive_tax(property_info)
                tax_summary = self.format_tax_result(tax_result, property_info)

            # 2. Build the prompt for the model.
            context_parts = []
            if rag_context:
                context_parts.append(f"์ฐธ๊ณ ์๋ฃ:\n{rag_context}")
            if tax_summary:
                context_parts.append(f"์๋ ๊ณ์ฐ ๊ฒฐ๊ณผ:\n{tax_summary}")
            context_prompt = f"""{self.system_prompt}
์ฌ์ฉ์ ์ง๋ฌธ: {user_input}
{chr(10).join(context_parts)}
์ ์ ๋ณด๋ฅผ ๋ฐํ์ผ๋ก ์ ๋ฌธ๊ฐ๋ก์ ์์ธํ๊ณ ์ดํดํ๊ธฐ ์ฌ์ด ์ค๋ช ์ ์ ๊ณตํด์ฃผ์ธ์:"""

            # 3. Tokenise (prompt truncated to 1800 tokens).
            inputs = self.tokenizer(
                context_prompt,
                return_tensors="pt",
                max_length=1800,
                truncation=True
            ).to(self.model.device)

            # 4. Generate.
            with torch.no_grad():
                outputs = self.model.generate(
                    inputs.input_ids,
                    attention_mask=inputs.attention_mask,
                    max_new_tokens=max_length,
                    do_sample=True,
                    temperature=0.6,
                    top_p=0.85,
                    repetition_penalty=1.15,
                    pad_token_id=self.tokenizer.pad_token_id,
                    eos_token_id=self.tokenizer.eos_token_id
                )

            # 5. Decode only the newly generated tokens (skip the prompt).
            generated_response = self.tokenizer.decode(
                outputs[0][inputs.input_ids.shape[1]:],
                skip_special_tokens=True
            ).strip()

            # 6. Assemble the final response.
            final_response = ""
            if tax_summary:
                final_response += f"{tax_summary}\n\n"
            final_response += f"""๐ค **AI ์ ๋ฌธ๊ฐ ์์ธ ์ค๋ช **
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
{generated_response}
---
๐ก **์ถ๊ฐ ๋ฌธ์๋ ๋ค๋ฅธ ์ํฉ์ ๋ํ ์๋ด์ด ํ์ํ์๋ฉด ์ธ์ ๋ ๋ง์ํด ์ฃผ์ธ์!**"""
            return final_response
        except Exception as e:
            error_response = f"โ AI ์๋ต ์์ฑ ์ค ์ค๋ฅ๊ฐ ๋ฐ์ํ์ต๋๋ค: {str(e)}\n\n"
            if tax_summary:
                return error_response + tax_summary
            return error_response + "๊ธฐ๋ณธ์ ์ธ ์ทจ๋์ธ ์ ๋ณด๋ ์ง๋ฐฉ์ธ๋ฒ ์ 11์กฐ๋ฅผ ์ฐธ๊ณ ํ์ธ์."

    def process_with_rag(self, user_input, rag_documents):
        """Answer ``user_input`` using the ``content`` of each RAG document.

        Args:
            user_input: The user's question.
            rag_documents: Iterable of dicts; each ``content`` value (empty
                string when missing) is joined with blank lines. May be
                empty or ``None``.
        """
        if rag_documents:
            rag_context = "\n\n".join(doc.get('content', '') for doc in rag_documents)
        else:
            rag_context = ""
        return self.generate_ai_response(user_input, rag_context)
| # ์ ์ญ ์ธ์คํด์ค | |
_llm_processor = None


def get_llm_processor():
    """Return the shared LLMProcessor, creating it on first use."""
    global _llm_processor
    if _llm_processor is not None:
        return _llm_processor
    _llm_processor = LLMProcessor()
    return _llm_processor
def is_llm_available():
    """Report whether transformers was imported and CUDA is available."""
    if not TRANSFORMERS_AVAILABLE:
        # torch may not have been imported, so it must not be touched here.
        return TRANSFORMERS_AVAILABLE
    return torch.cuda.is_available()
def process_with_llm(user_input, rag_documents=None):
    """Convenience wrapper: answer ``user_input`` with the shared processor.

    Uses the RAG path when ``rag_documents`` is non-empty, otherwise plain
    generation.
    """
    llm = get_llm_processor()
    if not rag_documents:
        return llm.generate_ai_response(user_input)
    return llm.process_with_rag(user_input, rag_documents)
if __name__ == "__main__":
    # Manual smoke test: load the model on CPU and run one sample question.
    print("๐งช LLM ํ๋ก์ธ์ ํ ์คํธ")
    processor = LLMProcessor()
    if not processor.initialize_model(force_cpu=True):
        print("โ ๋ชจ๋ธ ์ด๊ธฐํ ์คํจ")
    else:
        print("โ ๋ชจ๋ธ ์ด๊ธฐํ ์ฑ๊ณต")
        # Only the first 100 characters of the answer are shown.
        sample_question = "๊ฐ๋จ๊ตฌ 10์ต์ ์ํํธ 3์ฃผํ์ ์ทจ๋์ธ"
        answer = processor.generate_ai_response(sample_question)
        print(f"์๋ต: {answer[:100]}...")