Spaces:
Running on Zero
Running on Zero
| """Tests for the opt-in telemetry module.""" | |
| import json | |
| import os | |
| import tempfile | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| from unittest.mock import MagicMock, patch | |
| import torch | |
| from obliteratus.telemetry import ( | |
| _ALLOWED_METHOD_CONFIG_KEYS, | |
| _direction_stats, | |
| _extract_excise_details, | |
| _extract_prompt_counts, | |
| _extract_analysis_insights, | |
| _is_mount_point, | |
| _test_writable, | |
| build_report, | |
| disable_telemetry, | |
| enable_telemetry, | |
| is_enabled, | |
| maybe_send_informed_report, | |
| maybe_send_pipeline_report, | |
| restore_from_hub, | |
| send_report, | |
| storage_diagnostic, | |
| ) | |
| def _reset_telemetry(): | |
| import obliteratus.telemetry as t | |
| t._enabled = None | |
| # ββ Enable / disable ββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class TestTelemetryConfig: | |
| """Test telemetry enable/disable logic.""" | |
| def setup_method(self): | |
| _reset_telemetry() | |
| def test_disabled_by_default(self): | |
| with patch.dict(os.environ, {}, clear=True): | |
| _reset_telemetry() | |
| assert not is_enabled() | |
| def test_enabled_by_default_on_hf_spaces(self): | |
| with patch.dict(os.environ, {"SPACE_ID": "user/space"}, clear=True): | |
| import obliteratus.telemetry as t | |
| old_val = t._ON_HF_SPACES | |
| t._ON_HF_SPACES = True | |
| _reset_telemetry() | |
| assert is_enabled() | |
| t._ON_HF_SPACES = old_val | |
| def test_disable_via_env_zero(self): | |
| with patch.dict(os.environ, {"OBLITERATUS_TELEMETRY": "0"}): | |
| _reset_telemetry() | |
| assert not is_enabled() | |
| def test_disable_via_env_false(self): | |
| with patch.dict(os.environ, {"OBLITERATUS_TELEMETRY": "false"}): | |
| _reset_telemetry() | |
| assert not is_enabled() | |
| def test_enable_via_env_explicit(self): | |
| with patch.dict(os.environ, {"OBLITERATUS_TELEMETRY": "1"}): | |
| _reset_telemetry() | |
| assert is_enabled() | |
| def test_enable_programmatically(self): | |
| enable_telemetry() | |
| assert is_enabled() | |
| def test_disable_programmatically(self): | |
| enable_telemetry() | |
| assert is_enabled() | |
| disable_telemetry() | |
| assert not is_enabled() | |
| def test_programmatic_overrides_env(self): | |
| with patch.dict(os.environ, {"OBLITERATUS_TELEMETRY": "1"}): | |
| disable_telemetry() | |
| assert not is_enabled() | |
| # ββ Report building βββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class TestBuildReport: | |
| """Test report payload construction.""" | |
| def _base_kwargs(self, **overrides): | |
| defaults = dict( | |
| architecture="LlamaForCausalLM", | |
| num_layers=32, | |
| num_heads=32, | |
| hidden_size=4096, | |
| total_params=8_000_000_000, | |
| method="advanced", | |
| method_config={"n_directions": 4, "norm_preserve": True}, | |
| quality_metrics={"perplexity": 5.2, "refusal_rate": 0.05}, | |
| ) | |
| defaults.update(overrides) | |
| return defaults | |
| def test_schema_version_2(self): | |
| report = build_report(**self._base_kwargs()) | |
| assert report["schema_version"] == 2 | |
| def test_basic_fields(self): | |
| report = build_report(**self._base_kwargs()) | |
| assert report["model"]["architecture"] == "LlamaForCausalLM" | |
| assert report["model"]["num_layers"] == 32 | |
| assert report["model"]["total_params"] == 8_000_000_000 | |
| assert report["method"] == "advanced" | |
| assert report["quality_metrics"]["refusal_rate"] == 0.05 | |
| assert len(report["session_id"]) == 32 | |
| def test_filters_unknown_config_keys(self): | |
| report = build_report(**self._base_kwargs( | |
| method_config={"n_directions": 1, "secret_flag": True, "nuke": "boom"}, | |
| )) | |
| assert "n_directions" in report["method_config"] | |
| assert "secret_flag" not in report["method_config"] | |
| assert "nuke" not in report["method_config"] | |
| def test_allows_all_valid_config_keys(self): | |
| """Every key in the allowlist should pass through.""" | |
| config = {k: True for k in _ALLOWED_METHOD_CONFIG_KEYS} | |
| report = build_report(**self._base_kwargs(method_config=config)) | |
| for k in _ALLOWED_METHOD_CONFIG_KEYS: | |
| assert k in report["method_config"], f"Missing allowlisted key: {k}" | |
| def test_no_model_name_in_report(self): | |
| report = build_report(**self._base_kwargs()) | |
| report_str = json.dumps(report) | |
| assert "meta-llama" not in report_str | |
| assert "Llama-3" not in report_str | |
| def test_environment_info(self): | |
| report = build_report(**self._base_kwargs()) | |
| env = report["environment"] | |
| assert "python_version" in env | |
| assert "os" in env | |
| assert "arch" in env | |
| def test_stage_durations(self): | |
| durations = {"summon": 2.5, "probe": 10.1, "distill": 3.2} | |
| report = build_report(**self._base_kwargs(stage_durations=durations)) | |
| assert report["stage_durations"] == durations | |
| def test_direction_stats(self): | |
| stats = {"direction_norms": {"10": 0.95}, "mean_direction_persistence": 0.87} | |
| report = build_report(**self._base_kwargs(direction_stats=stats)) | |
| assert report["direction_stats"]["mean_direction_persistence"] == 0.87 | |
| def test_excise_details(self): | |
| details = {"modified_count": 128, "used_techniques": ["head_surgery"]} | |
| report = build_report(**self._base_kwargs(excise_details=details)) | |
| assert report["excise_details"]["modified_count"] == 128 | |
| def test_prompt_counts(self): | |
| counts = {"harmful": 33, "harmless": 33, "jailbreak": 15} | |
| report = build_report(**self._base_kwargs(prompt_counts=counts)) | |
| assert report["prompt_counts"]["harmful"] == 33 | |
| assert report["prompt_counts"]["jailbreak"] == 15 | |
| def test_gpu_memory(self): | |
| mem = {"peak_allocated_gb": 7.2, "peak_reserved_gb": 8.0} | |
| report = build_report(**self._base_kwargs(gpu_memory=mem)) | |
| assert report["gpu_memory"]["peak_allocated_gb"] == 7.2 | |
| def test_analysis_insights_filtered(self): | |
| """Only allowlisted analysis keys should pass through.""" | |
| insights = { | |
| "detected_alignment_method": "DPO", | |
| "alignment_confidence": 0.92, | |
| "secret_internal_data": "should not appear", | |
| } | |
| report = build_report(**self._base_kwargs(analysis_insights=insights)) | |
| assert report["analysis_insights"]["detected_alignment_method"] == "DPO" | |
| assert "secret_internal_data" not in report["analysis_insights"] | |
| def test_informed_extras(self): | |
| extras = {"ouroboros_passes": 3, "final_refusal_rate": 0.02, "total_duration": 120.5} | |
| report = build_report(**self._base_kwargs(informed_extras=extras)) | |
| assert report["informed"]["ouroboros_passes"] == 3 | |
| def test_optional_fields_omitted_when_empty(self): | |
| """Optional fields should not appear when not provided.""" | |
| report = build_report(**self._base_kwargs()) | |
| assert "stage_durations" not in report | |
| assert "direction_stats" not in report | |
| assert "excise_details" not in report | |
| assert "prompt_counts" not in report | |
| assert "gpu_memory" not in report | |
| assert "analysis_insights" not in report | |
| assert "informed" not in report | |
| # ββ Direction stats extraction ββββββββββββββββββββββββββββββββββββββββββ | |
| class TestDirectionStats: | |
| """Test direction quality metric extraction.""" | |
| def test_direction_norms(self): | |
| pipeline = MagicMock() | |
| pipeline.refusal_directions = { | |
| 0: torch.randn(128), | |
| 1: torch.randn(128), | |
| } | |
| pipeline.refusal_subspaces = {} | |
| stats = _direction_stats(pipeline) | |
| assert "direction_norms" in stats | |
| assert "0" in stats["direction_norms"] | |
| assert "1" in stats["direction_norms"] | |
| def test_direction_persistence(self): | |
| """Adjacent layers with similar directions should have high persistence.""" | |
| d = torch.randn(128) | |
| d = d / d.norm() | |
| pipeline = MagicMock() | |
| pipeline.refusal_directions = {0: d, 1: d + 0.01 * torch.randn(128)} | |
| pipeline.refusal_subspaces = {} | |
| stats = _direction_stats(pipeline) | |
| assert "mean_direction_persistence" in stats | |
| assert stats["mean_direction_persistence"] > 0.9 | |
| def test_effective_rank(self): | |
| """Multi-direction subspace should yield effective rank > 1.""" | |
| pipeline = MagicMock() | |
| pipeline.refusal_directions = {0: torch.randn(128)} | |
| # 4-direction subspace with distinct directions | |
| sub = torch.randn(4, 128) | |
| pipeline.refusal_subspaces = {0: sub} | |
| stats = _direction_stats(pipeline) | |
| assert "effective_ranks" in stats | |
| assert float(stats["effective_ranks"]["0"]) > 1.0 | |
| def test_empty_directions(self): | |
| pipeline = MagicMock() | |
| pipeline.refusal_directions = {} | |
| pipeline.refusal_subspaces = {} | |
| stats = _direction_stats(pipeline) | |
| assert stats == {} | |
| # ββ Excise details extraction βββββββββββββββββββββββββββββββββββββββββββ | |
| class TestExciseDetails: | |
| def test_basic_excise_details(self): | |
| pipeline = MagicMock() | |
| pipeline._excise_modified_count = 64 | |
| pipeline._refusal_heads = {10: [(0, 0.9), (3, 0.8)], 11: [(1, 0.7)]} | |
| pipeline._sae_directions = {} | |
| pipeline._expert_safety_scores = {} | |
| pipeline._layer_excise_weights = {} | |
| pipeline._expert_directions = {} | |
| pipeline._steering_hooks = [] | |
| pipeline.invert_refusal = False | |
| pipeline.project_embeddings = False | |
| pipeline.activation_steering = False | |
| pipeline.expert_transplant = False | |
| details = _extract_excise_details(pipeline) | |
| assert details["modified_count"] == 64 | |
| assert details["head_surgery_layers"] == 2 | |
| assert details["total_heads_projected"] == 3 | |
| assert "head_surgery" in details["used_techniques"] | |
| def test_adaptive_weights(self): | |
| pipeline = MagicMock() | |
| pipeline._excise_modified_count = None | |
| pipeline._refusal_heads = {} | |
| pipeline._sae_directions = {} | |
| pipeline._expert_safety_scores = {} | |
| pipeline._layer_excise_weights = {0: 0.2, 1: 0.8, 2: 0.5} | |
| pipeline._expert_directions = {} | |
| pipeline._steering_hooks = [] | |
| pipeline.invert_refusal = False | |
| pipeline.project_embeddings = False | |
| pipeline.activation_steering = False | |
| pipeline.expert_transplant = False | |
| details = _extract_excise_details(pipeline) | |
| assert details["adaptive_weight_min"] == 0.2 | |
| assert details["adaptive_weight_max"] == 0.8 | |
| assert "layer_adaptive" in details["used_techniques"] | |
| # ββ Prompt counts extraction ββββββββββββββββββββββββββββββββββββββββββββ | |
| class TestPromptCounts: | |
| def test_basic_counts(self): | |
| pipeline = MagicMock() | |
| pipeline.harmful_prompts = ["a"] * 33 | |
| pipeline.harmless_prompts = ["b"] * 33 | |
| pipeline.jailbreak_prompts = None | |
| counts = _extract_prompt_counts(pipeline) | |
| assert counts["harmful"] == 33 | |
| assert counts["harmless"] == 33 | |
| assert "jailbreak" not in counts | |
| def test_with_jailbreak(self): | |
| pipeline = MagicMock() | |
| pipeline.harmful_prompts = ["a"] * 33 | |
| pipeline.harmless_prompts = ["b"] * 33 | |
| pipeline.jailbreak_prompts = ["c"] * 10 | |
| counts = _extract_prompt_counts(pipeline) | |
| assert counts["jailbreak"] == 10 | |
| # ββ Send behavior βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class TestSendReport: | |
| def setup_method(self): | |
| _reset_telemetry() | |
| def test_does_not_send_when_disabled(self): | |
| disable_telemetry() | |
| with patch("obliteratus.telemetry._send_sync") as mock_send: | |
| send_report({"test": True}) | |
| mock_send.assert_not_called() | |
| def test_sends_when_enabled(self): | |
| enable_telemetry() | |
| with patch("obliteratus.telemetry._send_sync") as mock_send: | |
| send_report({"test": True}) | |
| import time | |
| time.sleep(0.1) | |
| mock_send.assert_called_once_with({"test": True}) | |
| def test_send_failure_is_silent(self): | |
| enable_telemetry() | |
| with patch("obliteratus.telemetry._send_sync", side_effect=Exception("network down")) as mock_send: | |
| # send_report should not propagate the exception to the caller | |
| send_report({"test": True}) | |
| import time | |
| time.sleep(0.1) # Allow background thread to execute | |
| mock_send.assert_called_once_with({"test": True}) | |
| # ββ Pipeline integration ββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _make_mock_pipeline(): | |
| """Build a mock pipeline with all fields the telemetry module reads.""" | |
| p = MagicMock() | |
| p.handle.summary.return_value = { | |
| "architecture": "LlamaForCausalLM", | |
| "num_layers": 32, | |
| "num_heads": 32, | |
| "hidden_size": 4096, | |
| "total_params": 8_000_000_000, | |
| } | |
| p.method = "advanced" | |
| p.n_directions = 4 | |
| p.norm_preserve = True | |
| p.regularization = 0.1 | |
| p.refinement_passes = 2 | |
| p.project_biases = True | |
| p.use_chat_template = True | |
| p.use_whitened_svd = True | |
| p.true_iterative_refinement = False | |
| p.use_jailbreak_contrast = False | |
| p.layer_adaptive_strength = False | |
| p.attention_head_surgery = True | |
| p.safety_neuron_masking = False | |
| p.per_expert_directions = False | |
| p.use_sae_features = False | |
| p.invert_refusal = False | |
| p.project_embeddings = False | |
| p.embed_regularization = 0.5 | |
| p.activation_steering = False | |
| p.steering_strength = 0.3 | |
| p.expert_transplant = False | |
| p.transplant_blend = 0.3 | |
| p.reflection_strength = 2.0 | |
| p.quantization = None | |
| p._quality_metrics = {"perplexity": 5.2, "coherence": 0.8, "refusal_rate": 0.05} | |
| p._strong_layers = [10, 11, 12, 13] | |
| p._stage_durations = {"summon": 3.0, "probe": 12.5, "distill": 4.1, "excise": 2.0, "verify": 8.3, "rebirth": 5.0} | |
| p._excise_modified_count = 128 | |
| # Direction data | |
| d = torch.randn(4096) | |
| d = d / d.norm() | |
| p.refusal_directions = {10: d, 11: d + 0.01 * torch.randn(4096), 12: d, 13: d} | |
| p.refusal_subspaces = {10: torch.randn(4, 4096)} | |
| # Excise details | |
| p._refusal_heads = {10: [(0, 0.9), (3, 0.8)]} | |
| p._sae_directions = {} | |
| p._expert_safety_scores = {} | |
| p._layer_excise_weights = {} | |
| p._expert_directions = {} | |
| p._steering_hooks = [] | |
| # Prompts | |
| p.harmful_prompts = ["x"] * 33 | |
| p.harmless_prompts = ["y"] * 33 | |
| p.jailbreak_prompts = None | |
| return p | |
| class TestPipelineIntegration: | |
| def setup_method(self): | |
| _reset_telemetry() | |
| def test_does_nothing_when_disabled(self): | |
| disable_telemetry() | |
| with patch("obliteratus.telemetry.send_report") as mock_send: | |
| maybe_send_pipeline_report(_make_mock_pipeline()) | |
| mock_send.assert_not_called() | |
| def test_comprehensive_report(self): | |
| """Verify that all data points are extracted from the pipeline.""" | |
| enable_telemetry() | |
| p = _make_mock_pipeline() | |
| with patch("obliteratus.telemetry.send_report") as mock_send: | |
| maybe_send_pipeline_report(p) | |
| mock_send.assert_called_once() | |
| report = mock_send.call_args[0][0] | |
| # Core fields | |
| assert report["schema_version"] == 2 | |
| assert report["model"]["architecture"] == "LlamaForCausalLM" | |
| assert report["method"] == "advanced" | |
| # Method config β check all keys passed through | |
| cfg = report["method_config"] | |
| assert cfg["n_directions"] == 4 | |
| assert cfg["norm_preserve"] is True | |
| assert cfg["use_whitened_svd"] is True | |
| assert cfg["attention_head_surgery"] is True | |
| # Quality metrics | |
| assert report["quality_metrics"]["perplexity"] == 5.2 | |
| assert report["quality_metrics"]["refusal_rate"] == 0.05 | |
| # Stage durations | |
| assert "stage_durations" in report | |
| assert report["stage_durations"]["summon"] == 3.0 | |
| assert report["stage_durations"]["verify"] == 8.3 | |
| # Strong layers | |
| assert report["strong_layers"] == [10, 11, 12, 13] | |
| # Direction stats | |
| assert "direction_stats" in report | |
| assert "direction_norms" in report["direction_stats"] | |
| assert "mean_direction_persistence" in report["direction_stats"] | |
| # Excise details | |
| assert "excise_details" in report | |
| assert report["excise_details"]["modified_count"] == 128 | |
| assert "head_surgery" in report["excise_details"]["used_techniques"] | |
| # Prompt counts | |
| assert report["prompt_counts"]["harmful"] == 33 | |
| assert report["prompt_counts"]["harmless"] == 33 | |
| # Environment | |
| assert "os" in report["environment"] | |
| assert "python_version" in report["environment"] | |
| # ββ Informed pipeline integration ββββββββββββββββββββββββββββββββββββββββ | |
| class _MockInsights: | |
| detected_alignment_method: str = "DPO" | |
| alignment_confidence: float = 0.92 | |
| alignment_probabilities: dict = field(default_factory=lambda: {"DPO": 0.92, "RLHF": 0.05}) | |
| cone_is_polyhedral: bool = True | |
| cone_dimensionality: float = 3.2 | |
| mean_pairwise_cosine: float = 0.45 | |
| direction_specificity: dict = field(default_factory=lambda: {"violence": 0.8}) | |
| cluster_count: int = 3 | |
| direction_persistence: float = 0.87 | |
| mean_refusal_sparsity_index: float = 0.15 | |
| recommended_sparsity: float = 0.1 | |
| use_sparse_surgery: bool = True | |
| estimated_robustness: str = "medium" | |
| self_repair_estimate: float = 0.3 | |
| entanglement_score: float = 0.2 | |
| entangled_layers: list = field(default_factory=lambda: [15, 16]) | |
| clean_layers: list = field(default_factory=lambda: [10, 11, 12]) | |
| recommended_n_directions: int = 6 | |
| recommended_regularization: float = 0.05 | |
| recommended_refinement_passes: int = 3 | |
| recommended_layers: list = field(default_factory=lambda: [10, 11, 12, 13]) | |
| skip_layers: list = field(default_factory=lambda: [15]) | |
| class _MockInformedReport: | |
| insights: _MockInsights = field(default_factory=_MockInsights) | |
| ouroboros_passes: int = 2 | |
| final_refusal_rate: float = 0.02 | |
| analysis_duration: float = 15.3 | |
| total_duration: float = 85.7 | |
| class TestInformedPipelineIntegration: | |
| def setup_method(self): | |
| _reset_telemetry() | |
| def test_does_nothing_when_disabled(self): | |
| disable_telemetry() | |
| with patch("obliteratus.telemetry.send_report") as mock_send: | |
| maybe_send_informed_report(_make_mock_pipeline(), _MockInformedReport()) | |
| mock_send.assert_not_called() | |
| def test_comprehensive_informed_report(self): | |
| enable_telemetry() | |
| p = _make_mock_pipeline() | |
| report_obj = _MockInformedReport() | |
| with patch("obliteratus.telemetry.send_report") as mock_send: | |
| maybe_send_informed_report(p, report_obj) | |
| mock_send.assert_called_once() | |
| report = mock_send.call_args[0][0] | |
| # All base fields present | |
| assert report["schema_version"] == 2 | |
| assert report["model"]["architecture"] == "LlamaForCausalLM" | |
| assert "direction_stats" in report | |
| assert "excise_details" in report | |
| # Analysis insights | |
| ai = report["analysis_insights"] | |
| assert ai["detected_alignment_method"] == "DPO" | |
| assert ai["alignment_confidence"] == 0.92 | |
| assert ai["cone_is_polyhedral"] is True | |
| assert ai["cone_dimensionality"] == 3.2 | |
| assert ai["cluster_count"] == 3 | |
| assert ai["self_repair_estimate"] == 0.3 | |
| assert ai["entanglement_score"] == 0.2 | |
| assert ai["recommended_n_directions"] == 6 | |
| # Informed extras | |
| inf = report["informed"] | |
| assert inf["ouroboros_passes"] == 2 | |
| assert inf["final_refusal_rate"] == 0.02 | |
| assert inf["analysis_duration"] == 15.3 | |
| assert inf["total_duration"] == 85.7 | |
| def test_analysis_insights_filter_unknown_keys(self): | |
| enable_telemetry() | |
| _make_mock_pipeline() | |
| class _BadInsights(_MockInsights): | |
| secret_sauce: str = "should not appear" | |
| report_obj = _MockInformedReport(insights=_BadInsights()) | |
| insights = _extract_analysis_insights(report_obj) | |
| assert "detected_alignment_method" in insights | |
| assert "secret_sauce" not in insights | |
| # ββ Stage duration tracking on pipeline ββββββββββββββββββββββββββββββββββ | |
| class TestStageDurationTracking: | |
| def test_emit_records_durations(self): | |
| """Verify _emit stores durations in _stage_durations dict.""" | |
| from obliteratus.abliterate import AbliterationPipeline | |
| p = AbliterationPipeline.__new__(AbliterationPipeline) | |
| p._stage_durations = {} | |
| p._excise_modified_count = None | |
| p._on_stage = lambda r: None | |
| p._emit("summon", "done", "loaded", duration=3.5) | |
| p._emit("probe", "done", "probed", duration=10.2) | |
| p._emit("excise", "done", "excised", duration=2.1, modified_count=64) | |
| assert p._stage_durations == {"summon": 3.5, "probe": 10.2, "excise": 2.1} | |
| assert p._excise_modified_count == 64 | |
| def test_running_status_does_not_record(self): | |
| """Only 'done' status should record durations.""" | |
| from obliteratus.abliterate import AbliterationPipeline | |
| p = AbliterationPipeline.__new__(AbliterationPipeline) | |
| p._stage_durations = {} | |
| p._excise_modified_count = None | |
| p._on_stage = lambda r: None | |
| p._emit("summon", "running", "loading...", duration=0) | |
| assert p._stage_durations == {} | |
| # ββ Storage helpers ββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class TestStorageHelpers: | |
| """Test persistent storage helper functions.""" | |
| def test_test_writable_valid_dir(self): | |
| with tempfile.TemporaryDirectory() as d: | |
| assert _test_writable(Path(d) / "subdir") | |
| def test_test_writable_unwritable(self): | |
| # /proc is never writable for arbitrary files | |
| assert not _test_writable(Path("/proc/obliteratus_test")) | |
| def test_is_mount_point_existing_path(self): | |
| # Should return a bool without raising for any existing path | |
| result = _is_mount_point(Path("/")) | |
| assert isinstance(result, bool) | |
| def test_is_mount_point_nonexistent(self): | |
| assert not _is_mount_point(Path("/nonexistent_dir_12345")) | |
| def test_storage_diagnostic_returns_dict(self): | |
| diag = storage_diagnostic() | |
| assert isinstance(diag, dict) | |
| assert "telemetry_dir" in diag | |
| assert "is_persistent" in diag | |
| assert "on_hf_spaces" in diag | |
| assert "telemetry_enabled" in diag | |
| assert "data_dir_exists" in diag | |
| # ββ Hub restore ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class TestHubRestore: | |
| """Test Hub-to-local restore functionality.""" | |
| def setup_method(self): | |
| _reset_telemetry() | |
| # Reset restore state so each test can trigger it | |
| import obliteratus.telemetry as t | |
| t._restore_done = False | |
| def test_restore_skips_when_no_repo(self): | |
| with patch("obliteratus.telemetry._TELEMETRY_REPO", ""): | |
| assert restore_from_hub() == 0 | |
| def test_restore_deduplicates(self): | |
| """Records already in local JSONL should not be re-added.""" | |
| import obliteratus.telemetry as t | |
| with tempfile.TemporaryDirectory() as d: | |
| test_file = Path(d) / "telemetry.jsonl" | |
| existing = {"session_id": "abc", "timestamp": "2025-01-01T00:00:00"} | |
| test_file.write_text(json.dumps(existing) + "\n") | |
| old_file = t.TELEMETRY_FILE | |
| old_repo = t._TELEMETRY_REPO | |
| t.TELEMETRY_FILE = test_file | |
| t._TELEMETRY_REPO = "test/repo" | |
| t._restore_done = False | |
| try: | |
| hub_records = [ | |
| {"session_id": "abc", "timestamp": "2025-01-01T00:00:00"}, # duplicate | |
| {"session_id": "def", "timestamp": "2025-01-02T00:00:00"}, # new | |
| ] | |
| with patch("obliteratus.telemetry.fetch_hub_records", return_value=hub_records): | |
| count = restore_from_hub() | |
| assert count == 1 # Only the new record | |
| # Verify file contents | |
| lines = test_file.read_text().strip().split("\n") | |
| assert len(lines) == 2 # original + 1 new | |
| finally: | |
| t.TELEMETRY_FILE = old_file | |
| t._TELEMETRY_REPO = old_repo | |
| def test_restore_only_runs_once(self): | |
| """Calling restore_from_hub() twice should be a no-op the second time.""" | |
| import obliteratus.telemetry as t | |
| t._restore_done = False | |
| with patch("obliteratus.telemetry._TELEMETRY_REPO", "test/repo"): | |
| with patch("obliteratus.telemetry.fetch_hub_records", return_value=[]): | |
| restore_from_hub() | |
| # Second call should return 0 immediately | |
| assert restore_from_hub() == 0 | |