# NUDR / backend / scan_research_dry.py
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import json
import random
from datetime import datetime
from glob import glob
from typing import Any, AsyncGenerator, Dict, List
async def do_research(
    session_key: str,
    query: str,
    mock_directory: str = "mock_instances/stocks_24th_3_sections",
) -> AsyncGenerator[Dict[str, Any], None]:
    """
    Replay pre-recorded research-phase events from a mock instance file with
    random delays, simulating a live research run. Does not save any
    artifacts or events.

    Args:
        session_key: Unique identifier for this research session (ignored in
            dry mode).
        query: The research query to process (ignored in dry mode).
        mock_directory: Directory containing ``*.events.jsonl`` mock instance
            files to replay.

    Yields:
        Dict containing event data from the mock instance, with ``timestamp``
        rewritten to the current time. If no mock files are found, a single
        ``{"type": "error", ...}`` event is yielded and the generator stops.
    """
    # glob() returns results in arbitrary, OS-dependent order; sort so the
    # choice of "first" file is deterministic across runs and platforms.
    mock_files = sorted(glob(f"{mock_directory}/*.events.jsonl"))
    if not mock_files:
        yield {
            "type": "error",
            "description": "No mock instance files found in mock_instances directory",
        }
        return

    # Use the first (lexicographically smallest) mock file found
    mock_file = mock_files[0]

    # Read all newline-delimited JSON events, skipping blank lines.
    # Explicit encoding avoids depending on the platform's locale default.
    with open(mock_file, "r", encoding="utf-8") as f:
        events = [json.loads(line) for line in f if line.strip()]

    # Keep only research-phase events: drop "report_*" and "completed",
    # which belong to the reporting phase (replayed by do_reporting).
    research_events = [
        event
        for event in events
        if not event.get("type", "").startswith("report_")
        and event.get("type", "") != "completed"
    ]

    # Replay each event, pausing 0.5-2.0s between them to mimic real-time
    # progress for consumers of this stream.
    for event in research_events:
        event["timestamp"] = datetime.now().isoformat()
        yield event
        await asyncio.sleep(random.uniform(0.5, 2.0))
async def do_reporting(
    session_key: str, mock_directory: str = "mock_instances/stocks_24th_3_sections"
) -> AsyncGenerator[Dict[str, Any], None]:
    """
    Replay pre-recorded reporting-phase events from a mock instance file with
    random delays. Does not save any artifacts or events.

    Args:
        session_key: Unique identifier for this research session (ignored in
            dry mode).
        mock_directory: Directory containing ``*.events.jsonl`` mock instance
            files to replay.

    Yields:
        Dict containing event data from the mock instance, with ``timestamp``
        rewritten to the current time. If no mock files are found, a single
        ``{"type": "error", ...}`` event is yielded and the generator stops.
    """
    # glob() returns results in arbitrary, OS-dependent order; sort so the
    # choice of "first" file is deterministic across runs and platforms.
    mock_files = sorted(glob(f"{mock_directory}/*.events.jsonl"))
    if not mock_files:
        yield {
            "type": "error",
            "description": "No mock instance files found in mock_instances directory",
        }
        return

    # Use the first (lexicographically smallest) mock file found
    mock_file = mock_files[0]

    # Read all newline-delimited JSON events, skipping blank lines.
    # Explicit encoding avoids depending on the platform's locale default.
    with open(mock_file, "r", encoding="utf-8") as f:
        events = [json.loads(line) for line in f if line.strip()]

    # Keep only reporting-phase events (type "report_*"); research-phase
    # events are replayed separately by do_research.
    reporting_events = [
        event for event in events if event.get("type", "").startswith("report_")
    ]

    # Replay each event, pausing 0.5-2.0s between them to mimic real-time
    # progress for consumers of this stream.
    for event in reporting_events:
        event["timestamp"] = datetime.now().isoformat()
        yield event
        await asyncio.sleep(random.uniform(0.5, 2.0))