Spaces:
Running
on
Zero
Running
on
Zero
| """ | |
| Gemini AI analysis module for deceptive pattern detection. | |
| Updated to match gemini_prompting_to_make_dp_csvs_genai.py structure. | |
| """ | |
| import pandas as pd | |
| import os | |
| import time | |
| import csv | |
| from io import StringIO | |
| import json | |
| from glob import glob | |
| from tqdm.auto import tqdm | |
| import gradio as gr | |
| try: | |
| from google import genai | |
| from google.genai import types | |
| from google.genai.errors import ServerError | |
| GENAI_AVAILABLE = True | |
| except ImportError: | |
| GENAI_AVAILABLE = False | |
def check_csv_format(df: pd.DataFrame) -> str:
    """
    Check whether a generated CSV file matches the expected format.

    Expectation: exactly 10 columns, a default integer index (RangeIndex),
    and string values in the 'Text' column. A single-column frame usually
    means the pipe (|) separator was not applied and is reported as bad.

    Args:
        df: pandas DataFrame object that is read from the csv file.

    Returns:
        str: Human-readable status describing the first problem found, or a
        confirmation that the file is in the correct format.
    """
    n_cols = len(df.columns)
    # Check the one-column case first: it signals a separator problem, which
    # is more informative than the generic column-count message. (In the
    # original ordering this branch sat behind the index check.)
    if n_cols == 1:
        return "The CSV file has only one column."
    elif n_cols < 10:
        return "The CSV file has less than 10 columns."
    elif n_cols > 10:
        return "The CSV file has more than 10 columns."
    elif not isinstance(df.index, pd.RangeIndex):
        # pd.RangeIndex is the public alias of pd.core.indexes.range.RangeIndex.
        return "The CSV file has an incorrect index. Probably issue with the PIPE (|) separation variable."
    elif 'Text' in df.columns and not pd.api.types.is_object_dtype(df['Text']):
        # BUG FIX: the original `not isinstance(df.Text.dtype, object)` was
        # always False (every Python value is an instance of `object`), so
        # this check could never fire.
        return "The CSV file has non-string values in the Text column."
    else:
        return "The CSV file is in the correct format."
| # analyze_with_gemini function removed - using few_shots_generator instead | |
def _merge_gemini_output(response_text, src_file, out_path, sep):
    """Parse the model's (possibly code-fenced) CSV reply and merge it with the source CSV.

    The three Gemini analysis columns are appended to the original
    YOLO-annotated frame and the combined result is written to ``out_path``.

    Args:
        response_text: Raw text returned by the model.
        src_file: Path of the original ``csv_with_yolo`` CSV.
        out_path: Destination path for the merged CSV.
        sep: Separator to try when parsing the model reply ('|' or ',').

    Returns:
        pd.DataFrame: The merged frame that was saved.

    Raises:
        Exception: Propagates any parsing / column-selection error so the
            caller can retry with another separator.
    """
    cleaned = response_text.replace("```csv", '').replace("```", '').strip()
    df = pd.read_csv(StringIO(cleaned), sep=sep)
    gemini_cols = df[["Deceptive Design Category", "Deceptive Design Subtype", "Reasoning"]]
    csv_with_yolo = pd.read_csv(src_file, index_col=0)
    csv_with_yolo.reset_index(inplace=True)
    final_df = pd.concat([csv_with_yolo, gemini_cols], axis=1)
    final_df.to_csv(out_path, index=False, quoting=csv.QUOTE_ALL)
    return final_df


def _thinking_followup(final_df, eval_dir, out_path, api_key):
    """Run the advanced "thinking" refinement pass when deceptive patterns were found.

    Yields ('notification', msg) progress tuples and finally one
    ('result', DataFrame) tuple containing the refined frame (or the
    original one if refinement failed or was not needed).
    """
    if set(final_df['Deceptive Design Category'].tolist()) != {'non-deceptive'}:
        yield ('notification', "π§  Deceptive patterns detected! Running advanced thinking analysis...")
        gr.Info("π§  Deceptive patterns found! Running advanced analysis...")
        print(f"[CONSOLE] Deceptive patterns found, running thinking analysis...")
        thinking_result = None
        for thinking_status, thinking_data in thinking_generator(eval_dir, files=[out_path], api_key=api_key):
            if thinking_status == 'notification':
                yield ('notification', thinking_data)
            elif thinking_status == 'result':
                thinking_result = thinking_data
                break
        if thinking_result is not None:
            yield ('notification', "β Advanced thinking analysis completed successfully!")
            gr.Info("β Advanced analysis completed!")
            print(f"[CONSOLE] Thinking analysis completed, using refined results")
            final_df = thinking_result
        else:
            yield ('notification', "β οΈ Advanced thinking analysis failed, using original results")
            gr.Warning("β οΈ Advanced analysis failed, using basic results")
            print(f"[CONSOLE] Thinking analysis failed, using original results")
    else:
        yield ('notification', "β No deceptive patterns found, analysis complete!")
        gr.Info("β No deceptive patterns detected!")
        print(f"[CONSOLE] No deceptive patterns found, skipping thinking analysis")
    yield ('result', final_df)


def few_shots_generator(eval_dir='./eval', files=None, api_key=None):
    """
    Generator version of few_shots that yields notifications in real-time.

    Runs Gemini few-shot analysis over ``csv_with_yolo`` CSV files, writes the
    merged results under ``<eval_dir>/gemini_fs`` and, when deceptive patterns
    are found, chains into ``thinking_generator`` for a refinement pass.

    Args:
        eval_dir: Base evaluation directory.
        files: Optional path or list of CSV paths; defaults to every CSV in
            ``<eval_dir>/csv_with_yolo``.
        api_key: Gemini API key (required).

    Yields:
        tuple: ('notification', message) progress updates, followed by a
        single ('result', DataFrame or None) terminal tuple.

    Raises:
        gr.Error: On missing API key, unreadable prompt/input files, client
            initialisation failure, exhausted retries or unparseable replies.
    """
    print(f"[CONSOLE] few_shots_generator: Starting analysis...")
    print(f"[CONSOLE] eval_dir: {eval_dir}")
    print(f"[CONSOLE] files: {files}")
    print(f"[CONSOLE] API key provided: {'Yes' if api_key else 'No'}")
    if not api_key:
        print(f"[CONSOLE] No API key provided, returning None")
        yield ('notification', "β No API key provided for analysis")
        raise gr.Error("No API key provided for analysis")
    # Read system prompt from gradio-demo directory
    try:
        system_prompt_path = os.path.join(os.path.dirname(__file__), '..', 'system_prompt.txt')
        with open(system_prompt_path, 'r', encoding='utf-8') as f:
            textsi_1 = f.read()
        print(f"[CONSOLE] System prompt loaded from: {system_prompt_path}")
    except Exception as e:
        print(f"[CONSOLE] Failed to load system prompt: {e}")
        yield ('notification', "β Failed to load system prompt")
        raise gr.Error(f"Failed to load system prompt: {str(e)}")
    os.makedirs(f"{eval_dir}/gemini_fs", exist_ok=True)
    print(f"[CONSOLE] Created gemini_fs directory: {eval_dir}/gemini_fs")
    try:
        client = genai.Client(api_key=api_key)
        print(f"[CONSOLE] Gemini client initialized")
    except Exception as e:
        error_msg = f"β Failed to initialize Gemini client: {str(e)}"
        yield ('notification', error_msg)
        print(f"[CONSOLE] Client initialization failed: {e}")
        raise gr.Error(f"Failed to initialize Gemini client: {str(e)}")
    if files is None:
        files = glob(os.path.join(f"{eval_dir}/csv_with_yolo", "*.csv"))
    if not isinstance(files, list):
        files = [files]
    print(f"[CONSOLE] Processing {len(files)} files")
    for f in files:
        print(f"[CONSOLE] Processing file: {f}")
        try:
            data = pd.read_csv(f, index_col=0)
            # Strip pipes from the index so they cannot clash with the
            # pipe-separated CSV the model is instructed to return.
            data.index = data.index.str.replace('|', '', regex=False)
            data = data.to_csv()
            print(f"[CONSOLE] Data loaded and converted to CSV format")
        except Exception as e:
            print(f"[CONSOLE] Failed to read the file: {f}, error: {e}")
            raise gr.Error(f"Failed to read input file: {str(e)}")
        try_cnt = 0
        response = None
        while try_cnt < 2:
            try:
                try_cnt += 1
                yield ('notification', f"π€ Calling Gemini AI for pattern analysis (attempt {try_cnt})...")
                if try_cnt == 1:
                    gr.Info("π€ Starting Gemini analysis...")
                print(f"[CONSOLE] Attempt {try_cnt} - Calling Gemini API...")
                response = client.models.generate_content(
                    model='gemini-2.5-pro',
                    contents=data,
                    config=types.GenerateContentConfig(
                        system_instruction=textsi_1,
                        temperature=0,
                        top_p=0.1,
                        top_k=1,
                        max_output_tokens=12288,
                        safety_settings=[
                            types.SafetySetting(category='HARM_CATEGORY_HARASSMENT', threshold='BLOCK_NONE'),
                            types.SafetySetting(category='HARM_CATEGORY_HATE_SPEECH', threshold='BLOCK_NONE'),
                            types.SafetySetting(category='HARM_CATEGORY_SEXUALLY_EXPLICIT', threshold='BLOCK_NONE'),
                            types.SafetySetting(category='HARM_CATEGORY_DANGEROUS_CONTENT', threshold='BLOCK_NONE'),
                            types.SafetySetting(category='HARM_CATEGORY_CIVIC_INTEGRITY', threshold='BLOCK_NONE')
                        ]
                    )
                )
                yield ('notification', f"β Gemini API call successful! Processing results...")
                gr.Info("β Gemini analysis successful!")
                print(f"[CONSOLE] Gemini API call successful")
                break
            except ServerError as e:
                # BUG FIX: the original checked `try_cnt > 3`, unreachable in
                # a 2-attempt loop; the loop could then exit with `response`
                # unbound and crash in the parsing step below.
                if try_cnt >= 2:
                    error_msg = f"β Failed to get response after {try_cnt} attempts"
                    yield ('notification', error_msg)
                    print(f"[CONSOLE] Failed to get response for {f} after {try_cnt} attempts")
                    raise gr.Error(f"Analysis failed after {try_cnt} attempts")
                wait_msg = f"β οΈ Server error occurred. Retrying attempt {try_cnt + 1}/2 in 60 seconds..."
                yield ('notification', wait_msg)
                gr.Warning(f"β οΈ Server error. Retrying in 60 seconds... (attempt {try_cnt + 1}/2)")
                print(f"[CONSOLE] Server error: {e.message}, sleeping for 60 seconds")
                print(e)
                time.sleep(60)
                continue
            except Exception as e:
                # BUG FIX: generic exceptions have no `.message` attribute;
                # the original `str(e.message)` raised AttributeError and
                # masked the real error.
                error_msg = f"β Gemini API error: {str(e)}"
                print(f"[CONSOLE] Non-server error in Gemini API call: {e}")
                yield ('notification', error_msg)
                raise gr.Error(f"Gemini API error: {str(e)}")
        # Parse the reply, trying the pipe separator first, then comma.
        _f = os.path.join(f"{eval_dir}", "gemini_fs", os.path.basename(f))
        final_df = None
        last_err = None
        for sep in ('|', ','):
            try:
                final_df = _merge_gemini_output(response.text, f, _f, sep)
                print(f"[CONSOLE] Results saved to: {_f} ({'pipe' if sep == '|' else 'comma'} separated)")
                break
            except Exception as e:
                last_err = e
                print(f"[CONSOLE] FEW_SHOT Error parsing with separator {sep!r}: {e}")
        if final_df is None:
            error_msg = f"β Error parsing Gemini response with both separators: {str(last_err)}"
            yield ('notification', error_msg)
            print(f"[CONSOLE] FEW_SHOT Error with both separators: {last_err}")
            try:
                # Keep the raw reply on disk for offline debugging.
                error_file = _f.replace(".csv", "e1.txt")
                with open(error_file, 'w') as _fs:
                    _fs.write(response.text)
                print(f"[CONSOLE] Error response saved to: {error_file}")
            except Exception as e3:
                print(f"[CONSOLE] Failed to save error response: {e3}")
            raise gr.Error(f"Failed to parse response: {str(last_err)}")
        # Optionally refine with the thinking pass, then emit the result.
        # NOTE: as in the original code, only the first file's result is
        # yielded before returning.
        yield from _thinking_followup(final_df, eval_dir, _f, api_key)
        return
    yield ('result', None)
def thinking_generator(eval_dir="./eval", files=None, api_key=None):
    """
    Generator version of thinking that yields notifications in real-time.

    Re-evaluates previously generated ``gemini_fs`` CSVs with the thinking
    system prompt, saves the model's thought text alongside the CSV and
    merges the refined analysis columns back into the file.

    Args:
        eval_dir: Base evaluation directory.
        files: Optional path or list of CSV paths; defaults to every CSV in
            ``<eval_dir>/gemini_fs``.
        api_key: Gemini API key (required).

    Yields:
        tuple: ('notification', message) updates, then one
        ('result', DataFrame or None) terminal tuple.

    Raises:
        gr.Error: On missing API key/prompt, client failure, exhausted
            retries or unparseable replies.
    """
    print(f"[CONSOLE] thinking_generator: Starting thinking analysis...")
    print(f"[CONSOLE] eval_dir: {eval_dir}")
    print(f"[CONSOLE] files: {files}")
    if not api_key:
        print(f"[CONSOLE] No API key provided for thinking analysis")
        raise gr.Error("No API key provided for thinking analysis")
    # Read thinking system prompt from gradio-demo directory
    try:
        thinking_prompt_path = os.path.join(os.path.dirname(__file__), '..', 'system_prompt_thinking.txt')
        with open(thinking_prompt_path, 'r', encoding='utf-8') as f:
            textsi_1 = f.read()
        print(f"[CONSOLE] Thinking system prompt loaded from: {thinking_prompt_path}")
    except Exception as e:
        print(f"[CONSOLE] Failed to load thinking system prompt: {e}")
        raise gr.Error(f"Failed to load thinking system prompt: {str(e)}")
    os.makedirs(f"{eval_dir}/gemini_fs", exist_ok=True)
    try:
        # v1beta API surface is used here (unlike the few-shot client).
        client = genai.Client(api_key=api_key, http_options={'api_version':'v1beta'})
        print(f"[CONSOLE] Thinking client initialized with v1beta")
    except Exception as e:
        print(f"[CONSOLE] Thinking client initialization failed: {e}")
        raise gr.Error(f"Failed to initialize thinking client: {str(e)}")
    if files is None:
        files = glob(os.path.join(f"{eval_dir}/gemini_fs", "*.csv"))
    if not isinstance(files, list):
        files = [files]
    print(f"[CONSOLE] Processing {len(files)} files for thinking analysis")
    for f in files:
        print(f"[CONSOLE] Thinking analysis for file: {f}")
        try:
            data = pd.read_csv(f, index_col=0)
            # Strip pipes from the index so the model's pipe-separated reply
            # cannot be corrupted by them.
            data.index = data.index.str.replace('|', '', regex=False)
            data = data.to_csv()
            print(f"[CONSOLE] Data prepared for thinking analysis")
            # Make API call to Gemini with retry logic for thinking analysis
            try_cnt = 0
            response = None
            while try_cnt < 2:
                try:
                    try_cnt += 1
                    yield ('notification', f"π§  Running advanced thinking analysis (attempt {try_cnt})...")
                    print(f"[CONSOLE] Attempt {try_cnt} - Calling Gemini API for thinking...")
                    response = client.models.generate_content(
                        model='gemini-2.5-pro',
                        contents=data,
                        config=types.GenerateContentConfig(
                            system_instruction=textsi_1,
                            temperature=0,
                            top_p=0.1,
                            top_k=1,
                            max_output_tokens=65536,
                            safety_settings=[
                                types.SafetySetting(category='HARM_CATEGORY_HARASSMENT', threshold='BLOCK_NONE'),
                                types.SafetySetting(category='HARM_CATEGORY_HATE_SPEECH', threshold='BLOCK_NONE'),
                                types.SafetySetting(category='HARM_CATEGORY_SEXUALLY_EXPLICIT', threshold='BLOCK_NONE'),
                                types.SafetySetting(category='HARM_CATEGORY_DANGEROUS_CONTENT', threshold='BLOCK_NONE'),
                                types.SafetySetting(category='HARM_CATEGORY_CIVIC_INTEGRITY', threshold='BLOCK_NONE')
                            ]
                        )
                    )
                    yield ('notification', f"β Advanced thinking analysis API call successful!")
                    print(f"[CONSOLE] Thinking API call successful")
                    break
                except ServerError as e:
                    # BUG FIX: `try_cnt > 3` could never trigger inside a
                    # 2-attempt loop, letting control fall through with
                    # `response` still None and crash on `.candidates` below.
                    if try_cnt >= 2:
                        error_msg = f"β Failed to complete thinking analysis after {try_cnt} attempts"
                        yield ('notification', error_msg)
                        print(f"[CONSOLE] Failed to get thinking response after {try_cnt} attempts")
                        raise gr.Error(f"Advanced analysis failed after {try_cnt} attempts")
                    wait_msg = f"β οΈ Server error in thinking analysis. Retrying attempt {try_cnt + 1}/2 in 60 seconds..."
                    yield ('notification', wait_msg)
                    gr.Warning(f"β οΈ Thinking server error. Retrying in 60s... (attempt {try_cnt + 1}/2)")
                    print(f"[CONSOLE] Server error in thinking analysis: {e.message}, sleeping for 60 seconds")
                    print(e)
                    time.sleep(60)
                    continue
                except Exception as e:
                    # Handle non-server errors in thinking analysis
                    error_msg = f"β Thinking analysis API error: {str(e)}"
                    yield ('notification', error_msg)
                    print(f"[CONSOLE] Non-server error in thinking API call: {e}")
                    raise gr.Error(f"Thinking analysis API error: {str(e)}")
            # Defensive guard: should be unreachable with the retry fix above.
            if response is None:
                raise gr.Error("Thinking analysis produced no response")
            # Split the reply into the thought trace and the CSV payload.
            output_csv = ""
            thought_txt = ""
            for part in response.candidates[0].content.parts:
                if part.thought:
                    thought_txt = part.text
                    print(f"[CONSOLE] Extracted thought text ({len(thought_txt)} chars)")
                else:
                    output_csv = part.text
                    print(f"[CONSOLE] Extracted output CSV ({len(output_csv)} chars)")
            _f = os.path.join(f"{eval_dir}", "gemini_fs", os.path.basename(f))
            _f_thought = os.path.join(f"{eval_dir}", "gemini_fs", os.path.basename(f).replace(".csv", "_thinking.txt"))
            # Save thinking text
            with open(_f_thought, 'w', encoding='utf-8') as _f_thought_file:
                _f_thought_file.write(thought_txt)
            print(f"[CONSOLE] Thinking text saved to: {_f_thought}")
            # Parse the CSV payload, trying pipe separator first, then comma.
            final_df = None
            last_err = None
            for sep in ('|', ','):
                try:
                    df_thinking = pd.read_csv(StringIO(output_csv), sep=sep)
                    # Drop any previous analysis columns before merging the
                    # refined ones so they are not duplicated.
                    csv_with_yolo = pd.read_csv(f, index_col=0).drop(
                        columns=["Deceptive Design Category", "Deceptive Design Subtype", "Reasoning"],
                        errors='ignore')
                    thinking_cols = df_thinking[["Deceptive Design Category", "Deceptive Design Subtype", "Reasoning"]]
                    csv_with_yolo.reset_index(inplace=True)
                    final_df = pd.concat([csv_with_yolo, thinking_cols], axis=1)
                    final_df.to_csv(_f, index=False, quoting=csv.QUOTE_ALL)
                    print(f"[CONSOLE] Thinking results saved to: {_f} ({'pipe' if sep == '|' else 'comma'} separated)")
                    break
                except Exception as e:
                    last_err = e
                    print(f"[CONSOLE] THINKING parse error with separator {sep!r}: {e}")
            if final_df is None:
                error_msg = f"β Error parsing thinking analysis response with both separators: {str(last_err)}"
                yield ('notification', error_msg)
                print(f"[CONSOLE] THINKING ERROR with both separators: {last_err}")
                try:
                    # Keep the raw payload on disk for offline debugging.
                    error_file = _f.replace(".csv", "e2.txt")
                    with open(error_file, 'w') as _fs:
                        _fs.write(output_csv)
                    print(f"[CONSOLE] Thinking error response saved to: {error_file}")
                except Exception as e3:
                    print(f"[CONSOLE] Failed to save thinking error response: {e3}")
                raise gr.Error(f"Failed to parse thinking response: {str(last_err)}")
            yield ('result', final_df)  # Return the updated dataframe
            return
        except gr.Error:
            # BUG FIX: gr.Error raised above was previously caught by the
            # generic handler below and re-wrapped, obscuring the specific
            # failure message. Re-raise it unchanged.
            raise
        except Exception as e:
            error_msg = f"β Error in thinking analysis: {str(e)}"
            yield ('notification', error_msg)
            print(f"[CONSOLE] Error in thinking analysis for {f}: {e}")
            raise gr.Error(f"Thinking analysis error: {str(e)}")
    yield ('result', None)  # Return None if no files processed