Spaces:
Running
on
Zero
Running
on
Zero
File size: 23,058 Bytes
9012453 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 |
"""
Gemini AI analysis module for deceptive pattern detection.
Updated to match gemini_prompting_to_make_dp_csvs_genai.py structure.
"""
import pandas as pd
import os
import time
import csv
from io import StringIO
import json
from glob import glob
from tqdm.auto import tqdm
import gradio as gr
# Optional dependency guard: the google-genai SDK may be absent in some
# deployments.  Callers can check GENAI_AVAILABLE before invoking any
# Gemini-backed function.
try:
    from google import genai
    from google.genai import types
    from google.genai.errors import ServerError
    GENAI_AVAILABLE = True
except ImportError:
    # SDK not installed; Gemini analysis features are unavailable.
    GENAI_AVAILABLE = False
def check_csv_format(df: pd.DataFrame) -> str:
    """
    Check if the csv file generated is in the correct format as is expected.

    Expectation is that the csv file has 10 columns, the index is the default
    integer RangeIndex, and the ``Text`` column (when present) holds strings.
    If the csv file has only one column, it is considered as a bad file
    (usually the PIPE (|) separator was not applied).

    Args:
        df: pandas DataFrame object that is read from the csv file.

    Returns:
        str: A string that indicates the status of the csv — the first
        problem found, or a confirmation that the file is well formed.
    """
    n_cols = len(df.columns)
    # Report the degenerate single-column case first so it is not masked by
    # the index check or the generic "less than 10 columns" message.
    if n_cols == 1:
        return "The CSV file has only one column."
    if n_cols < 10:
        return "The CSV file has less than 10 columns."
    if n_cols > 10:
        return "The CSV file has more than 10 columns."
    # Use the public pd.RangeIndex rather than the private
    # pd.core.indexes.range.RangeIndex path.
    if not isinstance(df.index, pd.RangeIndex):
        return "The CSV file has an incorrect index. Probably issue with the PIPE (|) separation variable."
    # df['Text'].dtype is a numpy dtype instance, and everything is an
    # instance of `object`, so the original `isinstance(..., object)` test
    # could never fail.  Compare the dtype itself against object instead.
    if 'Text' in df.columns and df['Text'].dtype != object:
        return "The CSV file has non-string values in the Text column."
    return "The CSV file is in the correct format."
# analyze_with_gemini function removed - using few_shots_generator instead
def few_shots_generator(eval_dir='./eval', files=None, api_key=None):
    """
    Generator version of few_shots that yields notifications in real-time.

    Sends each YOLO-annotated CSV to Gemini for deceptive-pattern
    classification, merges the model's three output columns back onto the
    input rows, saves the result under ``<eval_dir>/gemini_fs`` and, when any
    category other than 'non-deceptive' is present, chains into
    ``thinking_generator`` for a refined second pass.

    Args:
        eval_dir: Base evaluation directory.  Inputs default to
            ``<eval_dir>/csv_with_yolo/*.csv``; outputs are written to
            ``<eval_dir>/gemini_fs``.
        files: A single CSV path or a list of paths to analyse; defaults to
            the glob above.
        api_key: Google GenAI API key (required).

    Yields:
        tuple: (status, message) where status is 'notification' or 'result'.
            The final yield is ('result', DataFrame) on success, or
            ('result', None) when no file was processed.

    Raises:
        gr.Error: On missing API key, prompt/client setup failure, unreadable
            input, repeated server errors, or an unparseable model response.
    """
    print(f"[CONSOLE] few_shots_generator: Starting analysis...")
    print(f"[CONSOLE] eval_dir: {eval_dir}")
    print(f"[CONSOLE] files: {files}")
    print(f"[CONSOLE] API key provided: {'Yes' if api_key else 'No'}")
    if not api_key:
        print(f"[CONSOLE] No API key provided, returning None")
        yield ('notification', "β No API key provided for analysis")
        raise gr.Error("No API key provided for analysis")
    # Read system prompt from gradio-demo directory
    try:
        system_prompt_path = os.path.join(os.path.dirname(__file__), '..', 'system_prompt.txt')
        with open(system_prompt_path, 'r', encoding='utf-8') as f:
            textsi_1 = f.read()
        print(f"[CONSOLE] System prompt loaded from: {system_prompt_path}")
    except Exception as e:
        print(f"[CONSOLE] Failed to load system prompt: {e}")
        yield ('notification', "β Failed to load system prompt")
        raise gr.Error(f"Failed to load system prompt: {str(e)}")
    os.makedirs(f"{eval_dir}/gemini_fs", exist_ok=True)
    print(f"[CONSOLE] Created gemini_fs directory: {eval_dir}/gemini_fs")
    try:
        client = genai.Client(api_key=api_key)
        print(f"[CONSOLE] Gemini client initialized")
    except Exception as e:
        error_msg = f"β Failed to initialize Gemini client: {str(e)}"
        yield ('notification', error_msg)
        print(f"[CONSOLE] Client initialization failed: {e}")
        raise gr.Error(f"Failed to initialize Gemini client: {str(e)}")
    if files is None:
        files = glob(os.path.join(f"{eval_dir}/csv_with_yolo", "*.csv"))
    # Accept a single path as well as a list.
    if not isinstance(files, list):
        files = [files]
    print(f"[CONSOLE] Processing {len(files)} files")
    for f in files:
        print(f"[CONSOLE] Processing file: {f}")
        try:
            data = pd.read_csv(f, index_col=0)
            # Strip pipes from the index so they cannot collide with the
            # pipe-separated CSV the model is asked to return.
            data.index = data.index.str.replace('|', '', regex=False)
            data = data.to_csv()
            print(f"[CONSOLE] Data loaded and converted to CSV format")
        except Exception as e:
            print(f"[CONSOLE] Failed to read the file: {f}, error: {e}")
            raise gr.Error(f"Failed to read input file: {str(e)}")
        try_cnt = 0
        # NOTE(review): the loop allows at most 2 attempts, so the
        # `try_cnt > 3` guard in the ServerError handler can never fire; if
        # both attempts raise ServerError the loop simply exits and
        # `response` below is unbound (NameError) — confirm intended limits.
        while try_cnt < 2:
            try:
                try_cnt += 1
                yield ('notification', f"π€ Calling Gemini AI for pattern analysis (attempt {try_cnt})...")
                if try_cnt == 1:
                    gr.Info("π€ Starting Gemini analysis...")
                print(f"[CONSOLE] Attempt {try_cnt} - Calling Gemini API...")
                # Deterministic generation (temperature 0, top_k 1) with all
                # safety filters disabled so flagged UI text is not blocked.
                response = client.models.generate_content(
                    model='gemini-2.5-pro',
                    contents=data,
                    config=types.GenerateContentConfig(
                        system_instruction=textsi_1,
                        temperature=0,
                        top_p=0.1,
                        top_k=1,
                        max_output_tokens=12288,
                        safety_settings=[
                            types.SafetySetting(category='HARM_CATEGORY_HARASSMENT', threshold='BLOCK_NONE'),
                            types.SafetySetting(category='HARM_CATEGORY_HATE_SPEECH', threshold='BLOCK_NONE'),
                            types.SafetySetting(category='HARM_CATEGORY_SEXUALLY_EXPLICIT', threshold='BLOCK_NONE'),
                            types.SafetySetting(category='HARM_CATEGORY_DANGEROUS_CONTENT', threshold='BLOCK_NONE'),
                            types.SafetySetting(category='HARM_CATEGORY_CIVIC_INTEGRITY', threshold='BLOCK_NONE')
                        ]
                    )
                )
                yield ('notification', f"β Gemini API call successful! Processing results...")
                gr.Info("β Gemini analysis successful!")
                print(f"[CONSOLE] Gemini API call successful")
                break
            except ServerError as e:
                if try_cnt > 3:
                    error_msg = f"β Failed to get response after {try_cnt} attempts"
                    yield ('notification', error_msg)
                    print(f"[CONSOLE] Failed to get response for {f} after {try_cnt} attempts")
                    raise gr.Error(f"Analysis failed after {try_cnt} attempts")
                wait_msg = f"β οΈ Server error occurred. Retrying attempt {try_cnt + 1}/2 in 60 seconds..."
                yield ('notification', wait_msg)
                gr.Warning(f"β οΈ Server error. Retrying in 60 seconds... (attempt {try_cnt + 1}/2)")
                print(f"[CONSOLE] Server error: {e.message}, sleeping for 60 seconds")
                print(e)
                time.sleep(60)
                continue
            except Exception as e:
                # Handle non-server errors (API key issues, quota errors, etc.)
                # NOTE(review): a generic Exception may not carry a `.message`
                # attribute, so `str(e.message)` can itself raise — verify.
                error_msg = f"β Gemini API error: {str(e.message)}"
                print(f"[CONSOLE] Non-server error in Gemini API call: {e}")
                yield 'notification', error_msg
                raise gr.Error(f"Gemini API error: {str(e.message)}")
        try:
            # Process the response
            _f = os.path.join(f"{eval_dir}", "gemini_fs", os.path.basename(f))
            # The model may wrap its answer in a ```csv fence; strip it and
            # try the pipe separator first.
            df = pd.read_csv(StringIO(response.text.replace("```csv", '').replace("```", '').strip()), sep='|')
            csv_with_yolo = pd.read_csv(f, index_col=0)
            gemini_cols = df[["Deceptive Design Category", "Deceptive Design Subtype", "Reasoning"]]
            csv_with_yolo.reset_index(inplace=True)
            # Positional concat — assumes the model returned one row per
            # input row, in order.
            final_df = pd.concat([csv_with_yolo, gemini_cols], axis=1)
            final_df.to_csv(_f, index=False, quoting=csv.QUOTE_ALL)
            print(f"[CONSOLE] Results saved to: {_f}")
            # Check if thinking is needed (if any deceptive patterns found)
            if set(final_df['Deceptive Design Category'].tolist()) != {'non-deceptive'}:
                yield ('notification', "π§ Deceptive patterns detected! Running advanced thinking analysis...")
                gr.Info("π§ Deceptive patterns found! Running advanced analysis...")
                print(f"[CONSOLE] Deceptive patterns found, running thinking analysis...")
                # Use generator version of thinking
                thinking_result = None
                for thinking_status, thinking_data in thinking_generator(eval_dir, files=[_f], api_key=api_key):
                    if thinking_status == 'notification':
                        # Forward sub-generator notifications to our caller.
                        yield ('notification', thinking_data)
                    elif thinking_status == 'result':
                        thinking_result = thinking_data
                        break
                if thinking_result is not None:
                    yield ('notification', "β Advanced thinking analysis completed successfully!")
                    gr.Info("β Advanced analysis completed!")
                    print(f"[CONSOLE] Thinking analysis completed, using refined results")
                    final_df = thinking_result
                else:
                    yield ('notification', "β οΈ Advanced thinking analysis failed, using original results")
                    gr.Warning("β οΈ Advanced analysis failed, using basic results")
                    print(f"[CONSOLE] Thinking analysis failed, using original results")
            else:
                yield ('notification', "β No deceptive patterns found, analysis complete!")
                gr.Info("β No deceptive patterns detected!")
                print(f"[CONSOLE] No deceptive patterns found, skipping thinking analysis")
            yield 'result', final_df
            # NOTE(review): returning here means only the first file in
            # `files` is ever processed — confirm that is intended.
            return
        except Exception as e:
            print(f"[CONSOLE] Error parsing with pipe separator, trying comma: {e}")
            try:
                # Fallback: parse the model output as comma-separated.
                df = pd.read_csv(StringIO(response.text.replace("```csv", '').replace("```", '').strip()), sep=',')
                csv_with_yolo = pd.read_csv(f, index_col=0)
                gemini_cols = df[["Deceptive Design Category", "Deceptive Design Subtype", "Reasoning"]]
                csv_with_yolo.reset_index(inplace=True)
                final_df = pd.concat([csv_with_yolo, gemini_cols], axis=1)
                final_df.to_csv(_f, index=False, quoting=csv.QUOTE_ALL)
                print(f"[CONSOLE] Results saved to: {_f} (comma separated)")
                # Check if thinking is needed
                if set(final_df['Deceptive Design Category'].tolist()) != {'non-deceptive'}:
                    yield ('notification', "π§ Deceptive patterns detected! Running advanced thinking analysis...")
                    gr.Info("π§ Deceptive patterns found! Running advanced analysis...")
                    print(f"[CONSOLE] Deceptive patterns found, running thinking analysis...")
                    # Use generator version of thinking
                    thinking_result = None
                    for thinking_status, thinking_data in thinking_generator(eval_dir, files=[_f], api_key=api_key):
                        if thinking_status == 'notification':
                            yield ('notification', thinking_data)
                        elif thinking_status == 'result':
                            thinking_result = thinking_data
                            break
                    if thinking_result is not None:
                        yield ('notification', "β Advanced thinking analysis completed successfully!")
                        gr.Info("β Advanced analysis completed!")
                        print(f"[CONSOLE] Thinking analysis completed, using refined results")
                        final_df = thinking_result
                    else:
                        yield ('notification', "β οΈ Advanced thinking analysis failed, using original results")
                        gr.Warning("β οΈ Advanced analysis failed, using basic results")
                        print(f"[CONSOLE] Thinking analysis failed, using original results")
                else:
                    yield ('notification', "β No deceptive patterns found, analysis complete!")
                    gr.Info("β No deceptive patterns detected!")
                    print(f"[CONSOLE] No deceptive patterns found, skipping thinking analysis")
                yield ('result', final_df)
                return
            except Exception as e2:
                error_msg = f"β Error parsing Gemini response with both separators: {str(e2)}"
                yield ('notification', error_msg)
                print(f"[CONSOLE] FEW_SHOT Error with both separators: {e2}")
                try:
                    # Persist the raw model output for post-mortem debugging.
                    error_file = _f.replace(".csv", "e1.txt")
                    with open(error_file, 'w') as _fs:
                        _fs.write(response.text)
                    print(f"[CONSOLE] Error response saved to: {error_file}")
                except Exception as e3:
                    print(f"[CONSOLE] Failed to save error response: {e3}")
                raise gr.Error(f"Failed to parse response: {str(e2)}")
    # Reached only when `files` is empty (the loop returns on success).
    yield ('result', None)
def thinking_generator(eval_dir="./eval", files=None, api_key=None):
    """
    Generator version of thinking that yields notifications in real-time.

    Re-evaluates previously classified CSVs with Gemini's thinking mode
    (v1beta API surface), saving the model's thought text next to the CSV as
    ``*_thinking.txt`` and writing the refined classification back into
    ``<eval_dir>/gemini_fs``.

    Args:
        eval_dir: Base evaluation directory.  Inputs default to
            ``<eval_dir>/gemini_fs/*.csv``; outputs overwrite those files.
        files: A single CSV path or list of paths; defaults to the glob above.
        api_key: Google GenAI API key (required).

    Yields:
        tuple: (status, payload) where status is 'notification' (progress
            string) or 'result' (refined DataFrame, or None when no file was
            processed).

    Raises:
        gr.Error: On missing API key, prompt/client setup failure, API
            errors, or an unparseable thinking response.
    """
    print(f"[CONSOLE] thinking_generator: Starting thinking analysis...")
    print(f"[CONSOLE] eval_dir: {eval_dir}")
    print(f"[CONSOLE] files: {files}")
    if not api_key:
        print(f"[CONSOLE] No API key provided for thinking analysis")
        raise gr.Error("No API key provided for thinking analysis")
    # Read thinking system prompt from gradio-demo directory
    try:
        thinking_prompt_path = os.path.join(os.path.dirname(__file__), '..', 'system_prompt_thinking.txt')
        with open(thinking_prompt_path, 'r', encoding='utf-8') as f:
            textsi_1 = f.read()
        print(f"[CONSOLE] Thinking system prompt loaded from: {thinking_prompt_path}")
    except Exception as e:
        print(f"[CONSOLE] Failed to load thinking system prompt: {e}")
        raise gr.Error(f"Failed to load thinking system prompt: {str(e)}")
    os.makedirs(f"{eval_dir}/gemini_fs", exist_ok=True)
    try:
        # v1beta is required for thought parts in the response.
        client = genai.Client(api_key=api_key, http_options={'api_version':'v1beta'})
        print(f"[CONSOLE] Thinking client initialized with v1beta")
    except Exception as e:
        error_msg = f"β Failed to initialize thinking client: {str(e)}"
        print(f"[CONSOLE] Thinking client initialization failed: {e}")
        raise gr.Error(f"Failed to initialize thinking client: {str(e)}")
    if files is None:
        files = glob(os.path.join(f"{eval_dir}/gemini_fs", "*.csv"))
    # Accept a single path as well as a list.
    if not isinstance(files, list):
        files = [files]
    print(f"[CONSOLE] Processing {len(files)} files for thinking analysis")
    for f in files:
        print(f"[CONSOLE] Thinking analysis for file: {f}")
        try:
            data = pd.read_csv(f, index_col=0)
            # Strip pipes from the index so they cannot collide with the
            # pipe-separated CSV the model is asked to return.
            data.index = data.index.str.replace('|', '', regex=False)
            data = data.to_csv()
            print(f"[CONSOLE] Data prepared for thinking analysis")
            # Make API call to Gemini with retry logic for thinking analysis
            try_cnt = 0
            response = None
            # NOTE(review): the loop allows at most 2 attempts, so the
            # `try_cnt > 3` guard below can never fire; if both attempts hit
            # a ServerError, `response` stays None and the parts access below
            # raises AttributeError (caught by the outer handler) — verify.
            while try_cnt < 2:
                try:
                    try_cnt += 1
                    yield ('notification', f"π§ Running advanced thinking analysis (attempt {try_cnt})...")
                    print(f"[CONSOLE] Attempt {try_cnt} - Calling Gemini API for thinking...")
                    # Deterministic generation with a large output budget and
                    # all safety filters disabled so flagged UI text is kept.
                    response = client.models.generate_content(
                        model='gemini-2.5-pro',
                        contents=data,
                        config=types.GenerateContentConfig(
                            system_instruction=textsi_1,
                            temperature=0,
                            top_p=0.1,
                            top_k=1,
                            max_output_tokens=65536,
                            safety_settings=[
                                types.SafetySetting(category='HARM_CATEGORY_HARASSMENT', threshold='BLOCK_NONE'),
                                types.SafetySetting(category='HARM_CATEGORY_HATE_SPEECH', threshold='BLOCK_NONE'),
                                types.SafetySetting(category='HARM_CATEGORY_SEXUALLY_EXPLICIT', threshold='BLOCK_NONE'),
                                types.SafetySetting(category='HARM_CATEGORY_DANGEROUS_CONTENT', threshold='BLOCK_NONE'),
                                types.SafetySetting(category='HARM_CATEGORY_CIVIC_INTEGRITY', threshold='BLOCK_NONE')
                            ]
                        )
                    )
                    yield ('notification', f"β Advanced thinking analysis API call successful!")
                    print(f"[CONSOLE] Thinking API call successful")
                    break
                except ServerError as e:
                    if try_cnt > 3:
                        error_msg = f"β Failed to complete thinking analysis after {try_cnt} attempts"
                        yield ('notification', error_msg)
                        print(f"[CONSOLE] Failed to get thinking response after {try_cnt} attempts")
                        raise gr.Error(f"Advanced analysis failed after {try_cnt} attempts")
                    wait_msg = f"β οΈ Server error in thinking analysis. Retrying attempt {try_cnt + 1}/2 in 60 seconds..."
                    yield ('notification', wait_msg)
                    gr.Warning(f"β οΈ Thinking server error. Retrying in 60s... (attempt {try_cnt + 1}/2)")
                    print(f"[CONSOLE] Server error in thinking analysis: {e.message}, sleeping for 60 seconds")
                    print(e)
                    time.sleep(60)
                    continue
                except Exception as e:
                    # Handle non-server errors in thinking analysis
                    error_msg = f"β Thinking analysis API error: {str(e)}"
                    yield ('notification', error_msg)
                    print(f"[CONSOLE] Non-server error in thinking API call: {e}")
                    raise gr.Error(f"Thinking analysis API error: {str(e)}")
            output_csv = ""
            thought_txt = ""
            # Thinking responses interleave thought parts with the final
            # answer; keep the last part of each kind.
            for part in response.candidates[0].content.parts:
                if part.thought == True:
                    thought_txt = part.text
                    print(f"[CONSOLE] Extracted thought text ({len(thought_txt)} chars)")
                else:
                    output_csv = part.text
                    print(f"[CONSOLE] Extracted output CSV ({len(output_csv)} chars)")
            _f = os.path.join(f"{eval_dir}", "gemini_fs", os.path.basename(f))
            _f_thought = os.path.join(f"{eval_dir}", "gemini_fs", os.path.basename(f).replace(".csv", "_thinking.txt"))
            # Save thinking text
            with open(_f_thought, 'w', encoding='utf-8') as _f_thought_file:
                _f_thought_file.write(thought_txt)
            print(f"[CONSOLE] Thinking text saved to: {_f_thought}")
            # Parse and save updated CSV with similar process as main analysis
            try:
                # Parse the thinking response CSV
                df_thinking = pd.read_csv(StringIO(output_csv), sep='|')
                # Read the original CSV file to get the base data, dropping
                # the previous classification columns so they are replaced.
                csv_with_yolo = pd.read_csv(f, index_col=0).drop(columns=["Deceptive Design Category", "Deceptive Design Subtype", "Reasoning"], errors='ignore')
                # Extract the thinking analysis columns (similar to main process)
                thinking_cols = df_thinking[["Deceptive Design Category", "Deceptive Design Subtype", "Reasoning"]]
                # Reset index and concatenate with original data
                csv_with_yolo.reset_index(inplace=True)
                final_df = pd.concat([csv_with_yolo, thinking_cols], axis=1)
                # Save the updated dataframe
                final_df.to_csv(_f, index=False, quoting=csv.QUOTE_ALL)
                print(f"[CONSOLE] Thinking results saved to: {_f} (pipe separated)")
                yield ('result', final_df)  # Return the updated dataframe
                # NOTE(review): only the first file is processed — the
                # function returns here; confirm that is intended.
                return
            except Exception as e:
                print(f"[CONSOLE] Error with pipe separator, trying comma: {e}")
                try:
                    # Parse the thinking response CSV with comma separator
                    df_thinking = pd.read_csv(StringIO(output_csv), sep=',')
                    # Read the original CSV file to get the base data
                    csv_with_yolo = pd.read_csv(f, index_col=0).drop(columns=["Deceptive Design Category", "Deceptive Design Subtype", "Reasoning"], errors='ignore')
                    # Extract the thinking analysis columns (similar to main process)
                    thinking_cols = df_thinking[["Deceptive Design Category", "Deceptive Design Subtype", "Reasoning"]]
                    # Reset index and concatenate with original data
                    csv_with_yolo.reset_index(inplace=True)
                    final_df = pd.concat([csv_with_yolo, thinking_cols], axis=1)
                    # Save the updated dataframe
                    final_df.to_csv(_f, index=False, quoting=csv.QUOTE_ALL)
                    print(f"[CONSOLE] Thinking results saved to: {_f} (comma separated)")
                    yield ('result', final_df)  # Return the updated dataframe
                    return
                except Exception as e2:
                    error_msg = f"β Error parsing thinking analysis response with both separators: {str(e2)}"
                    yield ('notification', error_msg)
                    print(f"[CONSOLE] THINKING ERROR with both separators: {e2}")
                    try:
                        # Persist the raw model output for post-mortem debugging.
                        error_file = _f.replace(".csv", "e2.txt")
                        with open(error_file, 'w') as _fs:
                            _fs.write(output_csv)
                        print(f"[CONSOLE] Thinking error response saved to: {error_file}")
                    except Exception as e3:
                        print(f"[CONSOLE] Failed to save thinking error response: {e3}")
                    raise gr.Error(f"Failed to parse thinking response: {str(e2)}")
        except Exception as e:
            error_msg = f"β Error in thinking analysis: {str(e)}"
            yield ('notification', error_msg)
            print(f"[CONSOLE] Error in thinking analysis for {f}: {e}")
            raise gr.Error(f"Thinking analysis error: {str(e)}")
    yield ('result', None)  # Return None if no files processed