Spaces:
Sleeping
Sleeping
| import logging | |
| import os | |
| from datetime import datetime, timedelta | |
| import gradio as gr | |
| import json | |
| from log_reader import RemoteLogReader | |
| # logging.basicConfig(level=logging.INFO) | |
| # log = logging.getLogger(__name__) | |
def _load_json_object(line: str) -> dict:
    """Decode one log line, returning ``{}`` unless it is a JSON object.

    Lines that fail to decode, or that decode to something other than a
    dict (``null``, a list, a bare string, ...), are treated alike so the
    caller can use ``.get`` without further type checks.
    """
    try:
        parsed = json.loads(line)
    except json.JSONDecodeError:
        return {}
    return parsed if isinstance(parsed, dict) else {}


# The return annotation is quoted so the ``|`` union is never evaluated at
# definition time (keeps the module importable on Python 3.9).
def get_file_data(content: str) -> "tuple[str | None, bool]":
    """Extract the IP and the vote-condition status from log file content.

    The content is split on newlines and blank lines are ignored. The IP is
    read from the ``ip`` key of the first non-blank line; the vote conditions
    are evaluated on the last non-blank line.

    Args:
        content: Newline-separated JSON records.

    Returns:
        A ``(ip, vote_conditions_met)`` pair. ``ip`` is ``None`` when there
        are no non-blank lines, when the first line is not a JSON object, or
        when it has no ``ip`` key. ``vote_conditions_met`` is ``True`` only
        when the last line is a JSON object whose ``type`` is ``'vote'`` and
        whose ``feedback`` is a dict with exactly 6 entries.

    Any unexpected error is logged and reported as ``(None, False)``; this
    function does not raise.
    """
    try:
        lines = [line.strip() for line in content.split('\n') if line.strip()]
        if not lines:
            return None, False

        # Get IP from first line
        ip = _load_json_object(lines[0]).get('ip')

        # Check vote conditions from last line
        last_line_data = _load_json_object(lines[-1])
        feedback = last_line_data.get('feedback')
        vote_conditions_met = (
            last_line_data.get('type') == 'vote'
            and isinstance(feedback, dict)
            and len(feedback) == 6
        )
        return ip, vote_conditions_met
    except Exception as e:
        # Best-effort boundary: e.g. ``content`` is not a string.
        logging.error("Error processing file content: %s", e)
        return None, False
def _conversation_mentions(logs, search_query: str) -> bool:
    """Return True if any message under a log's ``state.messages`` contains the query.

    Each entry of ``messages`` is unpacked as a two-item pair and the query is
    tested against the second item with ``in``. Logs and entries that do not
    have that structure are skipped instead of raising.
    """
    for log in logs:
        try:
            if "state" not in log or "messages" not in log["state"]:
                continue
            messages = log["state"]["messages"] or ()
        except TypeError:
            # ``log`` or ``log["state"]`` is not a container (e.g. None).
            continue

        for entry in messages:
            try:
                _, message = entry
                if search_query in message:
                    return True
            except (TypeError, ValueError):
                # Entry is not a pair, or the message does not support ``in``
                # (e.g. None); nothing to match here.
                continue
    return False


def search_battle_anony(date_str: str, search_query: str) -> list[list]:
    """Search battle_anony conversations for a specific date and query.

    Args:
        date_str: Date key passed to ``RemoteLogReader.get_conv_logs``.
        search_query: Text to look for in the conversation messages.

    Returns:
        One row per matching conversation, in the form
        ``[ip, conv_id, 'Yes' | 'No', 'Yes']``. The third column reports
        whether ``get_file_data`` found the vote conditions met; the last
        column is always ``'Yes'`` because only matching conversations are
        returned. Conversations for which no IP is found are skipped.

    Errors are logged rather than raised. A failure while fetching the logs
    yields the rows collected so far (normally none); a failure inside one
    conversation is logged and the remaining conversations are still searched.
    """
    results = []
    try:
        # Initialize RemoteLogReader
        reader = RemoteLogReader()

        # Get conversation logs for battle_anony mode
        conv_logs = reader.get_conv_logs(date_str)
        battle_anony_logs = conv_logs.get('battle_anony', {})

        # Process each conversation
        for conv_id, logs in battle_anony_logs.items():
            try:
                # Convert messages to file content format for validation check
                content = '\n'.join(json.dumps(msg) for msg in logs)
                ip, is_valid = get_file_data(content)
                if not ip:
                    continue

                if _conversation_mentions(logs, search_query):
                    # List format (not dict) so the rows feed a table directly
                    results.append([
                        ip,
                        conv_id,
                        'Yes' if is_valid else 'No',
                        'Yes',
                    ])
            except Exception as e:
                # Isolate the failure so one bad conversation does not
                # truncate the results for the whole date.
                logging.error(
                    "Error processing conversation %s for date %s: %s",
                    conv_id, date_str, e,
                )
    except Exception as e:
        logging.error("Error searching logs for date %s: %s", date_str, e)
    return results
def create_ui(year: str = "2025"):
    """Build the "Battle Search" Gradio interface.

    The interface offers month and day dropdowns, a free-text query box and a
    search button; results are shown in a read-only table with the columns
    IP, Conversation ID, Is Valid and Contains Query.

    Args:
        year: Year component of the ``YYYY_MM_DD`` date string passed to
            ``search_battle_anony``. Defaults to ``"2025"``, the value that
            was previously hard-coded.

    Returns:
        The ``gr.Blocks`` app; the caller is responsible for launching it.
    """

    def process_search(month, day, query):
        """Run the search for the selected date; a blank query yields no rows."""
        if not query or not query.strip():
            return []

        # Create date string in YYYY_MM_DD format
        date_str = f"{year}_{month}_{day}"
        # Normalise any falsy result to an empty table.
        return search_battle_anony(date_str, query) or []

    with gr.Blocks(title="Battle Search") as app:
        gr.Markdown("# Battle Search")
        gr.Markdown("Search for specific content in battle_anony conversations by date and view IP addresses with validation status.")

        with gr.Row():
            with gr.Column():
                month_input = gr.Dropdown(
                    choices=[f"{i:02d}" for i in range(1, 13)],
                    label="Month",
                    value="01",
                )
                day_input = gr.Dropdown(
                    choices=[f"{i:02d}" for i in range(1, 32)],
                    label="Day",
                    value="01",
                )
                query_input = gr.Textbox(
                    label="Search Query",
                    placeholder="Enter text to search for in conversations...",
                    lines=1,
                )

        with gr.Row():
            search_button = gr.Button("Search")

        with gr.Row():
            table_output = gr.DataFrame(
                headers=['IP', 'Conversation ID', 'Is Valid', 'Contains Query'],
                label="Results Table",
                interactive=False,
                value=[],  # Initialize with empty list
            )

        search_button.click(
            fn=process_search,
            inputs=[month_input, day_input, query_input],
            outputs=[table_output],
        )

    return app
if __name__ == "__main__":
    # Only build and serve the interface when run as a script, not on import.
    app = create_ui()  # keep the module-level name ``app`` bound
    app.launch()