# NOTE: the original upload carried HuggingFace Spaces page residue
# ("Spaces: Sleeping") above the code; kept here as a comment so the file parses.
| import gradio as gr | |
| import os | |
| import uuid | |
| import threading | |
| import pandas as pd | |
| import numpy as np | |
| from transformers import AutoModelForCausalLM, AutoTokenizer | |
| import torch | |
# Process-wide cache for the lazily-loaded model/tokenizer.  The lock
# serializes first-time initialization across concurrent requests.
MODEL_CACHE = {
    "model": None,
    "tokenizer": None,
    "init_lock": threading.Lock(),
}

# Root directory for per-session user uploads.
os.makedirs("user_data", exist_ok=True)
def initialize_model_once():
    """Lazily load Phi-4-mini and its tokenizer, at most once per process.

    The lock in MODEL_CACHE guards the first load so concurrent requests
    cannot initialize the model twice.

    Returns:
        (model, tokenizer) from the shared MODEL_CACHE.
    """
    with MODEL_CACHE["init_lock"]:
        if MODEL_CACHE["model"] is None:
            MODEL_CACHE["tokenizer"] = AutoTokenizer.from_pretrained(
                "microsoft/Phi-4-mini-instruct"
            )
            MODEL_CACHE["model"] = AutoModelForCausalLM.from_pretrained(
                "microsoft/Phi-4-mini-instruct",
                torch_dtype=torch.float16,  # half precision to cut memory use
                device_map="auto",          # let accelerate place the weights
            )
        return MODEL_CACHE["model"], MODEL_CACHE["tokenizer"]
def generate_pandas_code(prompt, max_new_tokens=512):
    """Generate Python pandas code from ``prompt`` with Phi-4-mini.

    Args:
        prompt: Full instruction prompt for the model.
        max_new_tokens: Generation budget for the completion.

    Returns:
        The code inside a ```python fenced block of the completion if one
        is present, otherwise the raw completion text (both stripped).
    """
    import re

    model, tokenizer = initialize_model_once()
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            do_sample=True,
            temperature=0.2,  # low temperature: near-deterministic code output
            top_p=0.9,
        )
    # FIX: strip the prompt at the token level.  The previous approach sliced
    # the decoded string by len(decode(prompt)), which misaligns whenever the
    # tokenizer normalizes whitespace/special characters during decoding.
    generated_ids = outputs[0][inputs.input_ids.shape[1]:]
    generated_text = tokenizer.decode(generated_ids, skip_special_tokens=True)
    # Prefer code fenced between ```python and ``` if the model emitted one.
    code_match = re.search(r'```python\s*(.*?)\s*```', generated_text, re.DOTALL)
    if code_match:
        return code_match.group(1).strip()
    # Fallback: return the raw generated text.
    return generated_text.strip()
class ChatBot:
    """Per-session CSV analysis bot.

    Loads an uploaded CSV, answers cheap metadata questions directly, and
    otherwise asks the LLM to produce pandas code which is then executed
    against the loaded DataFrame.
    """

    def __init__(self, session_id):
        self.session_id = session_id
        self.csv_info = None      # metadata dict for the loaded CSV (set in process_file)
        self.df = None            # the loaded DataFrame
        self.chat_history = []    # list of (message, response) tuples
        self.user_dir = f"user_data/{session_id}"
        os.makedirs(self.user_dir, exist_ok=True)

    def process_file(self, file):
        """Load an uploaded CSV, save a per-session copy, record metadata.

        Returns a user-facing (Indonesian) status string; never raises.
        """
        if file is None:
            return "Mohon upload file CSV terlebih dahulu."
        try:
            # Gradio may hand us a tempfile-like object or a plain path string.
            file_path = file.name if hasattr(file, 'name') else str(file)
            file_name = os.path.basename(file_path)
            try:
                self.df = pd.read_csv(file_path)
                user_file_path = f"{self.user_dir}/uploaded.csv"
                self.df.to_csv(user_file_path, index=False)
                # Cache cheap metadata so simple questions can skip the LLM.
                self.csv_info = {
                    "filename": file_name,
                    "rows": self.df.shape[0],
                    "columns": self.df.shape[1],
                    "column_names": self.df.columns.tolist(),
                }
                print(f"CSV verified: {self.df.shape[0]} rows, {len(self.df.columns)} columns")
            except Exception as e:
                return f"Error membaca CSV: {str(e)}"
            # Record the successful load in the chat history.
            file_info = f"CSV berhasil dimuat: {file_name} dengan {self.df.shape[0]} baris dan {len(self.df.columns)} kolom. Kolom: {', '.join(self.df.columns.tolist())}"
            self.chat_history.append(("System", file_info))
            return f"File CSV '{file_name}' berhasil diproses! Anda dapat mulai mengajukan pertanyaan tentang data."
        except Exception as e:
            import traceback
            print(traceback.format_exc())
            return f"Error pemrosesan file: {str(e)}"

    def execute_query(self, code):
        """Execute generated pandas code against the loaded DataFrame.

        NOTE(security): exec() of LLM-generated code is inherently unsafe;
        tolerable only because each session runs in its own isolated Space.

        Raises:
            Exception: wrapping any error raised by the generated code.
        """
        try:
            # FIX: use ONE namespace as both globals and locals.  With separate
            # dicts, comprehensions inside the generated code could not see
            # `df` (comprehension scopes only consult globals), causing
            # spurious NameErrors.
            env = {"df": self.df, "pd": pd, "np": np}
            exec(code, env)
            if "result" in env:
                return env["result"]
            # No `result` variable: fall back to the last name the code bound
            # (dicts preserve insertion order).
            last_var = None
            for var_name, var_value in env.items():
                if var_name not in ("df", "pd", "np", "__builtins__"):
                    last_var = var_value
            # Default to the whole DataFrame when nothing usable was produced.
            return last_var if last_var is not None else self.df
        except Exception as e:
            raise Exception(f"Gagal menjalankan kode: {str(e)}") from e

    def chat(self, message, history):
        """Answer a natural-language question about the loaded CSV."""
        if self.df is None:
            return "Mohon upload file CSV terlebih dahulu."
        try:
            # Answer common metadata questions directly to save model calls.
            message_lower = message.lower()
            if "nama file" in message_lower:
                return f"Nama file CSV adalah: {self.csv_info['filename']}"
            elif "nama kolom" in message_lower:
                return f"Kolom dalam CSV: {', '.join(self.csv_info['column_names'])}"
            elif "jumlah baris" in message_lower or "berapa baris" in message_lower:
                return f"Jumlah baris dalam CSV: {self.csv_info['rows']}"
            elif "jumlah kolom" in message_lower or "berapa kolom" in message_lower:
                return f"Jumlah kolom dalam CSV: {self.csv_info['columns']}"
            # Build the LLM prompt from a small sample plus schema info.
            sample_df = self.df.head(5)
            sample_str = sample_df.to_string()
            data_types = {col: str(dtype) for col, dtype in self.df.dtypes.items()}
            prompt = f"""
You are a data analyst that translates natural language questions into Python pandas code.
DataFrame information:
- Column names: {', '.join(self.csv_info['column_names'])}
- Data types: {data_types}
- Number of rows: {self.csv_info['rows']}
- Sample data:
{sample_str}
User question: {message}
Write a short Python code using pandas to answer the user's question.
The code must use the 'df' variable as the DataFrame name.
The code should assign the final result to a variable named 'result'.
Only return the Python code without any explanation.
```python
"""
            try:
                code = generate_pandas_code(prompt)
                # Ensure the snippet assigns to `result` so execute_query can
                # find it.  (The original had two branches that did the same
                # prefixing; merged into one condition.)
                if not any(line.strip().startswith("result =") for line in code.split("\n")):
                    if code.startswith("df.") or "result" not in code:
                        code = "result = " + code
            except Exception as e:
                print(f"Error generating code: {str(e)}")
                # Heuristic fallbacks for basic aggregate questions.
                if "rata-rata" in message_lower or "mean" in message_lower:
                    code = "result = df.describe()"
                elif "jumlah" in message_lower or "count" in message_lower:
                    code = "result = df.count()"
                else:
                    return f"Maaf, saya tidak dapat menghasilkan kode untuk pertanyaan ini. Error: {str(e)}"
            try:
                print(f"Executing code: {code}")
                result = self.execute_query(code)
                # Empty results mean the question had no answer in this CSV.
                if result is None or (isinstance(result, pd.DataFrame) and result.empty):
                    return "Maaf, kita tidak bisa mendapatkan informasi terkait pertanyaan anda di dalam file CSV anda."
                # Render the result as text, truncating large outputs.
                if isinstance(result, pd.DataFrame):
                    if len(result) > 5:
                        result_str = result.head(5).to_string() + f"\n\n[Total {len(result)} baris]"
                    else:
                        result_str = result.to_string()
                elif isinstance(result, (pd.Series, np.ndarray)):
                    if len(result) > 10:
                        result_str = str(result[:10]) + f"\n\n[Total {len(result)} item]"
                    else:
                        result_str = str(result)
                elif hasattr(result, "__len__") and not isinstance(result, (str, int, float)):
                    result_str = str(result)
                    if len(result) > 0:
                        result_str += f"\n\n[Total {len(result)} item]"
                else:
                    result_str = str(result)
                response = f"Hasil analisis:\n\n{result_str}\n\nKode yang dijalankan:\n```python\n{code}\n```"
                self.chat_history.append((message, response))
                return response
            except Exception as e:
                return f"Error saat menganalisis data: {str(e)}\n\nKode yang dicoba:\n```python\n{code}\n```"
        except Exception as e:
            import traceback
            print(traceback.format_exc())
            return f"Error: {str(e)}"
# UI code (same as before)
def create_gradio_interface():
    """Build and return the Gradio Blocks UI: upload panel left, chat right."""
    with gr.Blocks(title="CSV Data Analyzer") as interface:
        # Per-browser-session state: a unique id plus the ChatBot instance.
        session_id = gr.State(lambda: str(uuid.uuid4()))
        chatbot_state = gr.State(lambda: None)

        gr.HTML("<h1 style='text-align: center;'>CSV Data Analyzer</h1>")
        gr.HTML("<h3 style='text-align: center;'>Ajukan pertanyaan tentang data CSV Anda</h3>")

        with gr.Row():
            with gr.Column(scale=1):
                file_input = gr.File(label="Upload CSV Anda", file_types=[".csv"])
                process_button = gr.Button("Proses CSV")
                with gr.Accordion("Contoh Pertanyaan", open=False):
                    gr.Markdown("""
                    - "Berapa jumlah data yang memiliki nilai Glucose di atas 150?"
                    - "Hitung nilai rata-rata setiap kolom numerik"
                    - "Berapa banyak data untuk setiap kelompok dalam kolom Outcome?"
                    - "Berapa jumlah baris dalam dataset ini?"
                    - "Berapa jumlah kolom dalam dataset ini?"
                    """)
            with gr.Column(scale=2):
                chatbot_interface = gr.Chatbot(label="Riwayat Chat", height=400)
                message_input = gr.Textbox(
                    label="Ketik pertanyaan Anda",
                    placeholder="Contoh: Berapa jumlah data yang memiliki nilai Glucose di atas 150?",
                    lines=2,
                )
                submit_button = gr.Button("Kirim")
                clear_button = gr.Button("Bersihkan Chat")

        # ---- event handlers ------------------------------------------------

        def handle_process_file(file, sess_id):
            # New ChatBot per upload; show the load status as the first entry.
            bot = ChatBot(sess_id)
            status = bot.process_file(file)
            return bot, [(None, status)]

        process_button.click(
            fn=handle_process_file,
            inputs=[file_input, session_id],
            outputs=[chatbot_state, chatbot_interface],
        )

        def user_message_submitted(message, history, chatbot, sess_id):
            # Echo the user's message right away; the reply arrives via .then().
            return history + [(message, None)], "", chatbot, sess_id

        def bot_response(history, chatbot, sess_id):
            if chatbot is None:
                # No CSV processed yet: create a bot and ask the user to upload.
                chatbot = ChatBot(sess_id)
                history[-1] = (history[-1][0], "Mohon upload file CSV terlebih dahulu.")
                return chatbot, history
            user_message = history[-1][0]
            reply = chatbot.chat(user_message, history[:-1])
            history[-1] = (user_message, reply)
            return chatbot, history

        submit_button.click(
            fn=user_message_submitted,
            inputs=[message_input, chatbot_interface, chatbot_state, session_id],
            outputs=[chatbot_interface, message_input, chatbot_state, session_id],
        ).then(
            fn=bot_response,
            inputs=[chatbot_interface, chatbot_state, session_id],
            outputs=[chatbot_state, chatbot_interface],
        )

        message_input.submit(
            fn=user_message_submitted,
            inputs=[message_input, chatbot_interface, chatbot_state, session_id],
            outputs=[chatbot_interface, message_input, chatbot_state, session_id],
        ).then(
            fn=bot_response,
            inputs=[chatbot_interface, chatbot_state, session_id],
            outputs=[chatbot_state, chatbot_interface],
        )

        def handle_clear_chat(chatbot):
            # Reset both the stored history and the visible chat window.
            if chatbot is not None:
                chatbot.chat_history = []
            return chatbot, []

        clear_button.click(
            fn=handle_clear_chat,
            inputs=[chatbot_state],
            outputs=[chatbot_state, chatbot_interface],
        )
    return interface
# Launch the interface
if __name__ == "__main__":
    demo = create_gradio_interface()
    # share=True asks Gradio for a public link when run outside a hosted env.
    demo.launch(share=True)