Spaces:
Sleeping
Sleeping
| import logging | |
| import os | |
| from datetime import datetime, timedelta | |
| import gradio as gr | |
| import json | |
| from log_reader import RemoteLogReader | |
| # logging.basicConfig(level=logging.INFO) | |
| # log = logging.getLogger(__name__) | |
def _parse_json_object(line: str):
    """Decode *line* as JSON and return it only when it is a JSON object.

    Returns ``None`` when the line is not valid JSON or decodes to something
    other than a ``dict`` (list, string, number, ...), so callers can safely
    use ``.get`` on any non-``None`` result.
    """
    try:
        parsed = json.loads(line)
    except json.JSONDecodeError:
        return None
    return parsed if isinstance(parsed, dict) else None


def get_file_data(content: str) -> "tuple[str | None, str, bool]":
    """Extract the IP, username, and vote-validity flag from a conversation log.

    ``content`` is treated as newline-delimited JSON; blank lines are ignored
    and lines that are not JSON objects are skipped instead of aborting the
    whole parse.

    Args:
        content: Raw log text, one JSON document per line.

    Returns:
        A ``(ip, username, vote_conditions_met)`` tuple:

        * ``ip`` -- the ``'ip'`` value of the first non-blank line, or ``None``
          when that line is not a JSON object or has no ``'ip'`` key.
        * ``username`` -- the ``'username'`` value of the first line whose
          ``'type'`` is ``'vote'``; ``""`` when there is no vote line, the key
          is absent, or its value is null.
        * ``vote_conditions_met`` -- ``True`` only when the vote line's
          ``'feedback'`` is a dict with exactly six truthy values and the vote
          line is at least the fourth non-blank line.

        ``(None, "", False)`` is returned for empty content and, after logging
        the error, for any unexpected failure (e.g. ``content`` is not a str).
    """
    try:
        lines = [line.strip() for line in content.split('\n') if line.strip()]
        if not lines:
            return None, "", False

        ip = None
        vote_index = -1
        vote_record = None

        # Single pass: read the IP off the first line and stop at the first
        # vote line. Each line is decoded once.
        for index, line in enumerate(lines):
            record = _parse_json_object(line)
            if record is None:
                continue
            if index == 0:
                ip = record.get('ip')
            if record.get('type') == 'vote':
                vote_index, vote_record = index, record
                break

        if vote_record is None:
            return ip, "", False

        username = vote_record.get('username')
        if username is None:
            # Missing key or explicit null: keep the documented "" default.
            username = ""

        feedback = vote_record.get('feedback')
        # Only lines up to and including the vote line count towards the
        # minimum of four lines.
        vote_conditions_met = (
            isinstance(feedback, dict)
            and sum(1 for value in feedback.values() if value) == 6
            and vote_index + 1 >= 4
        )
        return ip, username, vote_conditions_met
    except Exception as e:
        # Deliberate best-effort boundary: callers rely on this never raising.
        logging.error(f"Error processing file content: {e}")
        return None, "", False
def _message_matches(message, search_query: str) -> bool:
    """Return True when *search_query* is found in *message*.

    Uses the ``in`` operator. Values that do not support it (for example
    ``None``) are treated as non-matching instead of raising ``TypeError``.
    """
    try:
        return search_query in message
    except TypeError:
        return False


def _conversation_contains(logs, search_query: str) -> bool:
    """Return True when any message in any log entry's state matches the query.

    Only dict entries with a dict ``"state"`` holding ``"messages"`` are
    inspected. Each message entry must be a two-element list/tuple, and only
    its second element is searched.
    """
    for log in logs:
        if not isinstance(log, dict):
            continue
        state = log.get("state")
        if not isinstance(state, dict):
            continue
        for entry in state.get("messages") or ():
            if (
                isinstance(entry, (list, tuple))
                and len(entry) == 2
                and _message_matches(entry[1], search_query)
            ):
                return True
    return False


def search_battle_anony(date_str: str, search_query: str) -> list[list]:
    """Search battle_anony conversations for a specific date and query.

    Args:
        date_str: Date key passed unchanged to ``RemoteLogReader.get_conv_logs``.
        search_query: Text to look for in each conversation's messages.

    Returns:
        One row per matching conversation, in the form
        ``[ip, username, conv_id, 'Yes' | 'No', 'Yes']``. The fourth column is
        the validity flag from ``get_file_data`` and the fifth is always
        ``'Yes'`` because only matching conversations are returned.
        Conversations for which ``get_file_data`` yields no IP are skipped.

    Errors never propagate. A failure while fetching the logs is logged and
    ends the search with the rows gathered so far. A failure inside a single
    conversation is logged and only that conversation is skipped, so one
    malformed record no longer truncates the results.
    """
    results = []
    try:
        reader = RemoteLogReader()
        conv_logs = reader.get_conv_logs(date_str)
        battle_anony_logs = conv_logs.get('battle_anony', {})

        for conv_id, logs in battle_anony_logs.items():
            try:
                # get_file_data expects newline-delimited JSON text, so
                # re-serialise the entries before the validation check.
                content = '\n'.join(json.dumps(msg) for msg in logs)
                ip, username, is_valid = get_file_data(content)
                if not ip:
                    continue
                if not _conversation_contains(logs, search_query):
                    continue
                results.append([
                    ip,
                    username,
                    conv_id,
                    'Yes' if is_valid else 'No',
                    'Yes',
                ])
            except Exception as e:
                logging.error(
                    f"Error searching conversation {conv_id} for date {date_str}: {e}"
                )
    except Exception as e:
        logging.error(f"Error searching logs for date {date_str}: {e}")
    return results
def create_ui():
    """Build and return the Gradio "Battle Search" Blocks application.

    The UI offers year, month, and day dropdowns (defaulting to today's date)
    and a free-text query box. Pressing "Search" runs
    ``search_battle_anony`` for the selected date and shows the resulting rows
    in a read-only table.

    Returns:
        The constructed ``gr.Blocks`` app, not yet launched.
    """
    # Earliest year offered in the Year dropdown. This is the year the date
    # string was previously hard-coded to.
    first_year = 2025

    def process_search(year, month, day, query):
        """Run the search for the selected date; a blank query yields no rows."""
        if not query or not query.strip():
            return []
        # Date key in YYYY_MM_DD format
        date_str = f"{year}_{month}_{day}"
        return search_battle_anony(date_str, query)

    with gr.Blocks(title="Battle Search") as app:
        gr.Markdown("# Battle Search")
        gr.Markdown("Search for specific content in battle_anony conversations by date and view IP addresses with validation status.")
        with gr.Row():
            with gr.Column():
                # Read the clock once so the default year, month, and day
                # always describe the same instant.
                now = datetime.now()
                year_choices = [
                    str(year)
                    for year in range(min(first_year, now.year), now.year + 1)
                ]
                year_input = gr.Dropdown(
                    choices=year_choices,
                    label="Year",
                    value=str(now.year),
                )
                month_input = gr.Dropdown(
                    choices=[f"{i:02d}" for i in range(1, 13)],
                    label="Month",
                    value=now.strftime("%m"),
                )
                day_input = gr.Dropdown(
                    choices=[f"{i:02d}" for i in range(1, 32)],
                    label="Day",
                    value=now.strftime("%d"),
                )
                query_input = gr.Textbox(
                    label="Search Query",
                    placeholder="Enter text to search for in conversations...",
                    lines=1,
                )
        with gr.Row():
            search_button = gr.Button("Search")
        with gr.Row():
            table_output = gr.DataFrame(
                headers=['IP', 'Username', 'Conversation ID', 'Is Valid', 'Contains Query'],
                label="Results Table",
                interactive=False,
                value=[],  # Start with an empty table
            )
        search_button.click(
            fn=process_search,
            inputs=[year_input, month_input, day_input, query_input],
            outputs=[table_output],
        )
    return app
if __name__ == "__main__":
    # Script entry point: build the Blocks app and start serving it.
    app = create_ui()
    app.launch()