import os
import csv
import sys
import time
import threading
import concurrent.futures
from datetime import datetime

import requests
import pandas as pd
from tqdm import tqdm
from dotenv import load_dotenv

load_dotenv("key.env")
API_KEY = os.getenv("GLM_key")
if not API_KEY:
    raise ValueError("GLM_key is not set. Please check your key.env file.")


DOMAIN_AGENT = "https://api.staging.ctxl.dev/v1"

# API_KEY is guaranteed non-empty at this point (the check above raises
# otherwise), so just confirm it with a masked preview.
masked_api_key = API_KEY[:4] + "..." + API_KEY[-4:] if len(API_KEY) > 8 else "****"
print(f"API Key loaded: {masked_api_key}")
print(f"API key length: {len(API_KEY)} characters")


def generate(params: dict):
    """POST `params` to the /generate endpoint and return the parsed JSON.

    On a non-200 status or a request exception, returns a dict containing an
    "error" key instead of raising.
    """
    headers = {
        "authorization": f"Bearer {API_KEY}",
        "content-type": "application/json",
        "accept": "application/json",
    }
    try:
        # Timeout added (not in the original call) so a hung connection cannot
        # stall a worker thread indefinitely; 60 s is an assumption.
        r = requests.post(DOMAIN_AGENT + "/generate", json=params, headers=headers, timeout=60)
        if r.status_code != 200:
            print(f"API error status code: {r.status_code}")
            print(f"API error response: {r.text[:200]}")
            return {"error": f"API returned status code {r.status_code}"}
        return r.json()
    except Exception as e:
        print(f"Exception in API call: {e}")
        return {"error": f"Exception: {e}"}


def call_glm_internal_v4(user_input, system_prompt=None):
    """Send one user message to the internal_v4 model and return its text
    response, or an "Error: ..." string on failure."""
    params = {
        "model": "internal_v4",
        "messages": [{"role": "user", "content": user_input}],
        "knowledge": [],
        "avoid_commentary": True,
    }

    if system_prompt:
        params["system_prompt"] = system_prompt

    print("Calling GLM internal_v4 model...")
    result = generate(params)

    if "error" in result:
        return f"Error: {result['error']}"
    elif "response" in result:
        return result["response"]
    else:
        return "Error: No response field in returned JSON."


class ContextualAPIUtils:
    """Thin wrapper that sends content-rating prompts to the GLM model."""

    def __init__(self, api_key):
        self.api_key = api_key

    def chat(self, prompt, knowledge=None, avoid_commentary=True):
        """Rate a single piece of content.

        Returns a (response_content, response_time, retrieval_text,
        error_message) tuple. `knowledge` and `avoid_commentary` are accepted
        for interface compatibility, but the underlying call hardcodes both.
        """
        if knowledge is None:
            knowledge = []

        system_prompt = """Social Media Content Evaluation
You will be provided with the following structured input:
Content to rate: {user content}
Your task is to evaluate the content based on the "Hate Speech Policy" documentation and classify it as:
"Within Policy" – If it adheres to the policy guidelines.
"Out of Policy" – If it violates the policy guidelines.
Output Requirements
Return your classification in the exact format below:
Policy Rating: Within Policy or Out of Policy
Policy Category: [IF "Out of Policy"]
Explanation: [A clear and concise justification.]
Strict Formatting Rules:
- DO NOT include any preamble, additional text, or explanations outside this structure.
- DO NOT quote or reveal the exact text of the "Hate Speech Policy." Instead, summarize or reference the relevant policy principle without disclosing internal content.
Core Evaluation Principles:
1. Mandatory Classification
- ALWAYS PROVIDE A CLASSIFICATION. If the policy documentation does not explicitly cover the content, apply general policy principles rather than refusing to classify.
- Under NO circumstances should you state that you cannot provide a policy rating.
2. Borderline Responses
When encountering borderline or ambiguous content that does not clearly violate policy, classify the content as Within Policy and provide a brief cautionary note rather than labeling it Out of Policy. Only mark something Out of Policy when there is a clear violation.
3. Handling Unclear or Incomplete Policy Retrieval
- If the retrieved policy is incomplete or unclear, classify it based on the most relevant or general policy principle.
- Never refuse to classify due to retrieval limitations.
Final Guidelines:
Every user content must be classified as either "Within Policy" or "Out of Policy."
You are not the agent—do not generate, refuse, or modify content—only classify it."""

        start_time = time.time()
        try:
            response_content = call_glm_internal_v4(prompt, system_prompt)
            retrieval_text = ""
            error_message = ""
        except Exception as e:
            response_content = "API Request Failed"
            retrieval_text = ""
            error_message = str(e)
            print(f"API request error: {e}")

        response_time = round(time.time() - start_time, 4)

        return response_content, response_time, retrieval_text, error_message
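
# The model's reply, per the system prompt above, is expected to look like:
# Policy Rating: Out of Policy
# Policy Category: <category>
# Explanation: <concise justification>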


class RateLimiter:
    """Enforce a minimum interval between API requests across worker threads."""

    def __init__(self, max_per_second=1):
        self.lock = threading.Lock()
        self.last_request_time = 0
        self.min_interval = 1.0 / max_per_second

    def wait(self):
        """Block until at least `min_interval` seconds since the last request."""
        with self.lock:
            current_time = time.time()
            elapsed = current_time - self.last_request_time
            if elapsed < self.min_interval:
                time.sleep(self.min_interval - elapsed)
            self.last_request_time = time.time()
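
# Example: a single RateLimiter(max_per_second=2) shared by every worker
# spaces successive wait() returns at least 0.5 s apart, whatever the
# thread count.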


class TimestampTracker:
    """Thread-safe aggregate of per-row timing: earliest start, latest end,
    processed-row count, and total time spent inside API calls."""

    def __init__(self):
        self.lock = threading.Lock()
        self.first_timestamp = None
        self.last_timestamp = None
        self.processed_rows = 0
        self.total_api_time = 0

    def update(self, start_time, end_time, api_time):
        with self.lock:
            if not self.first_timestamp or start_time < self.first_timestamp:
                self.first_timestamp = start_time
            if not self.last_timestamp or end_time > self.last_timestamp:
                self.last_timestamp = end_time

            self.processed_rows += 1
            self.total_api_time += api_time
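
# Note: (last_timestamp - first_timestamp) measures the wall-clock span of the
# whole batch, while total_api_time sums individual call latencies, so with
# parallel workers the latter can exceed the former.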


def find_input_column(df):
    """Find the column that contains user input based on known column name options."""
    USER_INPUT_COLUMN_OPTIONS = ["user input", "user_input", "prompt", "input", "text", "content"]

    for col in USER_INPUT_COLUMN_OPTIONS:
        if col in df.columns:
            return col

    # No known name matched; ask the user to pick a column interactively.
    print("Could not automatically detect user input column. Available columns:")
    for i, col in enumerate(df.columns):
        print(f"{i + 1}. {col}")

    while True:
        try:
            choice = int(input("Please enter the number of the column containing user input: "))
            if 1 <= choice <= len(df.columns):
                return df.columns[choice - 1]
            else:
                print("Invalid selection. Please try again.")
        except ValueError:
            print("Please enter a valid number.")


def generate_output_filename(input_filename):
    """Build a sanitized, timestamped output filename from the input filename."""
    base_name = os.path.splitext(os.path.basename(input_filename))[0]
    safe_name = ''.join(c if c.isalnum() or c in '-_.' else '_' for c in base_name)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    return f"{safe_name}_rated_{timestamp}.csv"


def create_output_file(output_path, all_columns, input_column_index):
    """Create the output CSV file and write its header row."""
    with open(output_path, mode='w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)

        header_row = all_columns.copy()

        # Place the two model-output columns immediately after the input column.
        input_pos = input_column_index
        header_row.insert(input_pos + 1, "model_rating")
        header_row.insert(input_pos + 2, "retrieval_text")

        # Bookkeeping columns go at the end.
        header_row.extend([
            "start_timestamp",
            "end_timestamp",
            "avg_time_per_row",
            "original_row_index",
            "error",
        ])

        writer.writerow(header_row)

    return header_row
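
# Example header for input columns ["id", "prompt", "label"] with "prompt" as
# the input column:
# id, prompt, model_rating, retrieval_text, label,
# start_timestamp, end_timestamp, avg_time_per_row, original_row_index, error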


# A module-level lock serializes appends from the worker threads; without it,
# concurrent writerow() calls could interleave partial lines.
csv_write_lock = threading.Lock()


def write_row_to_csv(output_path, row_data, avg_time=None):
    """Append a single row to the CSV file, serialized by csv_write_lock."""
    with csv_write_lock:
        with open(output_path, mode='a', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)

            if avg_time is not None and isinstance(row_data, list) and row_data:
                # Insert avg_time just before the last two bookkeeping columns
                # (original_row_index, error).
                avg_time_pos = -2
                row_data = row_data[:avg_time_pos] + [avg_time] + row_data[avg_time_pos:]

            writer.writerow(row_data)


def update_avg_time(output_path, avg_time):
    """Fill in avg_time_per_row for every row and sort by original row index."""
    print("Reading CSV to update average time and sort rows...")
    df = pd.read_csv(output_path)

    df['avg_time_per_row'] = avg_time

    print("Sorting output by original row index...")
    df = df.sort_values(by='original_row_index')

    print("Writing sorted data back to CSV...")
    df.to_csv(output_path, index=False)


def process_row(row_index, row, input_column, api_utils, rate_limiter,
                timestamp_tracker, output_csv_path, all_columns, input_column_index):
    """Rate one row and append it, in the output header's column order, to the CSV."""
    original_row_index = row_index
    original_values = [row.get(col, "") for col in all_columns]
    user_input = str(row.get(input_column, "")).strip()

    if not user_input:
        model_rating = "Skipped: Missing input"
        api_response_time = 0.0
        retrieval_text = "N/A"
        error_message = "Missing input"
        start_time = end_time = datetime.now()
    else:
        # Respect the global rate limit before each API call.
        rate_limiter.wait()
        start_time = datetime.now()
        prompt = f"Content to rate: {user_input}"
        model_rating, api_response_time, retrieval_text, error_message = api_utils.chat(prompt)
        end_time = datetime.now()

    timestamp_tracker.update(start_time, end_time, api_response_time)

    # Assemble the row to match the header written by create_output_file:
    # original columns up to and including the input column, the two model
    # columns, any remaining original columns, then the bookkeeping columns.
    # avg_time_per_row is left as None and filled in later by update_avg_time().
    result = original_values[:input_column_index + 1]
    result.extend([model_rating, retrieval_text])
    result.extend(original_values[input_column_index + 1:])
    result.extend([
        start_time.isoformat(),
        end_time.isoformat(),
        None,
        original_row_index,
        error_message,
    ])

    write_row_to_csv(output_csv_path, result)

    return original_row_index


def process_csv(input_csv_path, api_utils, output_csv_path, max_workers=5, requests_per_second=1):
    """Rate every row of the input CSV in parallel and write results to output_csv_path."""
    try:
        df = pd.read_csv(input_csv_path)
        df = df.reset_index(drop=True)
        total_rows = len(df)
        print(f"Input file has {total_rows} rows")

        input_column = find_input_column(df)
        input_column_index = list(df.columns).index(input_column)
        print(f"Using '{input_column}' as the user input column (index {input_column_index})")

        all_columns = list(df.columns)

        create_output_file(output_csv_path, all_columns, input_column_index)
        print(f"Created output file: {output_csv_path}")

        print(f"Processing {total_rows} rows with parallel execution (rate limited to {requests_per_second} requests/sec)...")

        rate_limiter = RateLimiter(max_per_second=requests_per_second)
        timestamp_tracker = TimestampTracker()

        overall_start_time = time.time()

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {}
            for i, row in df.iterrows():
                future = executor.submit(
                    process_row,
                    i,
                    row,
                    input_column,
                    api_utils,
                    rate_limiter,
                    timestamp_tracker,
                    output_csv_path,
                    all_columns,
                    input_column_index,
                )
                futures[future] = i

            # Drain futures as they complete so the progress bar advances in
            # completion order rather than submission order.
            with tqdm(total=total_rows, desc="Processing rows", unit="row") as pbar:
                for future in concurrent.futures.as_completed(futures):
                    future.result()
                    pbar.update(1)

        total_processing_time = time.time() - overall_start_time
        # Guard against an empty input file to avoid division by zero.
        avg_time_per_row = total_processing_time / total_rows if total_rows else 0.0

        if timestamp_tracker.first_timestamp and timestamp_tracker.last_timestamp:
            timestamp_diff = (timestamp_tracker.last_timestamp - timestamp_tracker.first_timestamp).total_seconds()
            avg_timestamp_time = timestamp_diff / total_rows if total_rows else 0.0
        else:
            timestamp_diff = 0
            avg_timestamp_time = 0

        print(f"\nTotal processing time: {total_processing_time:.2f} seconds ({total_processing_time/60:.2f} minutes)")
        print(f"Average time per row: {avg_time_per_row:.2f} seconds")
        print(f"Time between first and last timestamps: {timestamp_diff:.2f} seconds")
        print(f"Average time based on timestamps: {avg_timestamp_time:.2f} seconds")

        print(f"Updating average time per row in CSV ({avg_time_per_row:.2f} seconds) and sorting rows...")
        update_avg_time(output_csv_path, avg_time_per_row)

        print(f"Results saved to {output_csv_path}")
        return output_csv_path

    except Exception as e:
        print("Error during processing:", e)
        raise
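
# Example:
# process_csv("input.csv", ContextualAPIUtils(API_KEY), "input_rated.csv",
#             max_workers=5, requests_per_second=1)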


# Tuning knobs for the worker pool and the API rate limit.
MAX_WORKERS = 5
REQUESTS_PER_SECOND = 1.0


def process_file(input_filename):
    """Drive the full pipeline for one input CSV and return the output path."""
    contextual_api = ContextualAPIUtils(API_KEY)

    df = pd.read_csv(input_filename)
    print(f"\nPreview of the uploaded file ({len(df)} rows total):")
    print(df.head())

    output_csv_path = generate_output_filename(input_filename)
    print(f"Output will be saved to: {output_csv_path}")

    try:
        output_path = process_csv(
            input_filename,
            contextual_api,
            output_csv_path,
            max_workers=MAX_WORKERS,
            requests_per_second=REQUESTS_PER_SECOND,
        )
        print("Processing complete!")

        result_df = pd.read_csv(output_path)
        print("Preview of results:")
        print(result_df.head())

        return output_path

    except KeyboardInterrupt:
        print("\nProcess interrupted by user. Partial results may be saved in the output file.")
    except Exception as e:
        print(f"\nError during processing: {e}")
        print(f"Check if partial results were saved in {output_csv_path}")
    return None
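
# Usage (the script name is whatever this file is saved as):
#   python rate_content.py input.csv
# With no argument, the script prompts for a path interactively.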


if __name__ == "__main__":
    if len(sys.argv) > 1:
        input_file = sys.argv[1]
    else:
        input_file = input("Enter the path to your CSV file: ")

    if not os.path.exists(input_file):
        print(f"Error: File '{input_file}' does not exist.")
        sys.exit(1)

    print(f"Processing file: {input_file}")
    output_file = process_file(input_file)

    if output_file:
        print(f"Successfully processed file. Results saved to: {output_file}")
    else:
        print("Processing failed.")