# File size: 7,744 Bytes
# 7f99b5c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
import asyncio
import json
import logging
from datetime import datetime, timezone
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession

# Setup configuration overrides for testing.
# NOTE(review): the override is placed before the `db` import below, presumably
# because `db` reads DATABASE_URL at import time - confirm before reordering
# these statements or moving the imports to the top of the file.
import os
os.environ["DATABASE_URL"] = "sqlite+aiosqlite:///:memory:"  # Use in-memory SQLite for testing

# Project-local modules under test (imported after the environment override above).
from db import init_db, async_session_maker, engine
from models import ContentSubmission, ModerationLog, ResolutionRequest, FeedbackRequest
from moderation.detector import ModerationDetector
from services.gemini_service import GeminiService
from api.moderation import check_submission, get_moderation_queue, resolve_item, submit_feedback

# Configure root logging at INFO so the per-test progress messages are emitted,
# and use a dedicated named logger for this script.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("test_backend")

async def test_all():
    """Run the backend smoke tests sequentially against the test database.

    The stages share one session and build on each other's rows, so their
    order matters:

    1. Spam heuristics on a clean and a spammy text.
    2. Toxicity analysis of a submission and persistence of the resulting log.
    3. Cosine-similarity maths and duplicate detection against a stored post.
    4. Thread escalation check (result is logged only, see note in the body).
    5. Moderation queue, resolution and feedback API functions.

    Raises:
        AssertionError: if any stage's expectation is not met.

    The engine is disposed in a ``finally`` clause so that a failing stage
    does not leave the connection pool open.
    """
    logger.info("Initializing in-memory test database...")
    try:
        await init_db()

        # Open session
        async with async_session_maker() as session:
            # --- TEST 1: Spam Heuristics ---
            logger.info("Running Test 1: Spam Heuristics...")
            clean_text = "This is a wonderful discussion about Reddit Devvit. I love building tools."
            spam_text = "Join my telegram group now! Click here to get free giveaway bonus and make money quick!"

            is_spam_clean, score_clean, _ = ModerationDetector.check_spam_heuristics(clean_text)
            is_spam_dirty, score_dirty, reason_dirty = ModerationDetector.check_spam_heuristics(spam_text)

            assert not is_spam_clean, "Clean text flagged as spam incorrectly"
            assert is_spam_dirty, "Spam text was not flagged as spam"
            assert score_dirty > 0.6, f"Expected higher spam score for dirty text, got {score_dirty}"
            logger.info(f"Test 1 passed! Spam detected successfully. Reason: {reason_dirty}")

            # --- TEST 2: Toxicity and Submission Logic ---
            logger.info("Running Test 2: Toxicity Detection and DB logging...")
            toxic_post = ContentSubmission(
                id="t3_toxic1",
                type="post",
                subreddit="testsub",
                title="I hate this place",
                content="This is the worst forum ever, you all are complete idiots. Shut up!",
                author="angry_user",
            )

            # Analyze submission
            log = await ModerationDetector.analyze_submission(session, toxic_post)
            assert log.is_toxic or log.is_spam or log.toxicity_score > 0.7, "Expected toxicity flag for insulting text"
            logger.info(f"Toxicity detected: Score={log.toxicity_score}, Reason: {log.toxicity_reason}")

            # Save to DB
            session.add(log)
            await session.commit()
            await session.refresh(log)

            # Query from DB
            db_log = await session.get(ModerationLog, "t3_toxic1")
            assert db_log is not None, "Failed to retrieve log from database"
            assert db_log.author == "angry_user", "Database values corrupted"
            logger.info("Test 2 passed! Toxicity and DB persistence verified.")

            # --- TEST 3: Duplicate detection ---
            logger.info("Running Test 3: Duplicate Post Detection...")
            # Since we're in mock mode if API key is missing, get_embedding returns [0.0]*768
            # Let's manually set identical embeddings in the DB to test the cosine similarity logic
            base_embedding = [1.0, 0.5, 0.0]
            post1 = ModerationLog(
                id="t3_post1",
                type="post",
                subreddit="testsub",
                title="First Post Title",
                content="A very unique post content about development.",
                author="dev1",
                embedding_json=json.dumps(base_embedding),
            )
            session.add(post1)
            await session.commit()

            # Check cosine similarity logic
            v1 = list(base_embedding)
            v2 = list(base_embedding)  # Exact match
            v3 = [0.0, 0.0, 1.0]  # Orthogonal

            sim_match = ModerationDetector.calculate_cosine_similarity(v1, v2)
            sim_ortho = ModerationDetector.calculate_cosine_similarity(v1, v3)

            assert abs(sim_match - 1.0) < 1e-5, f"Expected 1.0 similarity, got {sim_match}"
            assert abs(sim_ortho - 0.0) < 1e-5, f"Expected 0.0 similarity, got {sim_ortho}"

            # Run detector duplicate check with the same embedding as the stored post
            is_dup, score, reason, parent = await ModerationDetector.detect_duplicates(
                session,
                ContentSubmission(
                    id="t3_post2",
                    type="post",
                    subreddit="testsub",
                    title="First Post Title",
                    content="Duplicate",
                    author="dev2",
                ),
                embedding=list(base_embedding),
            )
            assert is_dup, "Duplicate was not identified"
            assert parent == "t3_post1", "Failed to identify correct original post parent"
            logger.info(f"Test 3 passed! Similarity match: {score:.1%}. Reason: {reason}")

            # --- TEST 4: Escalation detection ---
            logger.info("Running Test 4: Thread Escalation Heuristics...")
            # Populate thread comments
            comment1 = ModerationLog(
                id="t1_c1",
                type="comment",
                subreddit="testsub",
                content="I disagree with you.",
                author="user1",
            )
            comment2 = ModerationLog(
                id="t1_c2",
                type="comment",
                subreddit="testsub",
                content="You are stupid for disagreeing, idiot.",
                author="user2",
                is_toxic=True,
                toxicity_score=0.85,
            )
            session.add(comment1)
            session.add(comment2)
            await session.commit()

            new_comment = ContentSubmission(
                id="t1_c3",
                type="comment",
                subreddit="testsub",
                content="No you are the idiot, shut up and go away!",
                author="user1",
                parent_id="toxic1",  # Belongs to post 'toxic1'
            )

            # Set up mock escalation if in mock mode
            # The new comment plus comment2 triggers mock escalation
            # NOTE(review): the outcome is only logged, never asserted, so this
            # stage passes as long as detect_escalation does not raise. Add an
            # assertion once the expected result in mock mode is confirmed.
            is_esc, esc_score, esc_reason = await ModerationDetector.detect_escalation(session, new_comment)
            logger.info(f"Escalation check complete. Is Escalating: {is_esc}, Score: {esc_score}, Reason: {esc_reason}")
            logger.info("Test 4 passed!")

            # --- TEST 5: API Endpoint Operations ---
            logger.info("Running Test 5: Queue, Resolution, and Feedback API logic...")
            # Get moderation queue (should return the flagged toxic post t3_toxic1)
            queue = await get_moderation_queue(subreddit="testsub", db=session)
            assert len(queue) >= 1, "Expected at least 1 flagged item in queue"
            # Membership rather than position: other rows added above may also
            # be queued, and the queue's ordering is not something this test fixes.
            assert any(item.id == "t3_toxic1" for item in queue), "Expected t3_toxic1 in the queue"

            # Resolve item
            res = await resolve_item(
                request=ResolutionRequest(id="t3_toxic1", action="remove", moderator="mod_hero"),
                db=session,
            )
            assert res["status"] == "success", f"Resolution failed: {res}"
            assert res["new_status"] == "removed", f"Unexpected status after resolution: {res}"

            # Verify it's no longer in the flagged queue
            queue_after = await get_moderation_queue(subreddit="testsub", db=session)
            assert not any(item.id == "t3_toxic1" for item in queue_after), "Resolved item still in queue"

            # Submit feedback
            fb = await submit_feedback(
                request=FeedbackRequest(id="t3_toxic1", is_correct=True, reason="Definitely toxic content"),
                db=session,
            )
            assert fb["status"] == "success", f"Feedback submission failed: {fb}"

            db_log_after = await session.get(ModerationLog, "t3_toxic1")
            # Guard against a missing row so the failure is a clear assertion
            # rather than an AttributeError on None.
            assert db_log_after is not None, "Log missing from database after feedback"
            assert db_log_after.feedback_correct is True, "Feedback not saved"
            logger.info("Test 5 passed! API queue and resolution workflow verified.")
    finally:
        # Clean up: always release the engine, even when a stage fails.
        await engine.dispose()

    logger.info("All backend tests completed successfully!")

# Script entry point: run the whole async test sequence on a fresh event loop
# when this file is executed directly (not when it is imported).
if __name__ == "__main__":
    asyncio.run(test_all())