File size: 7,516 Bytes
6e24a3d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
import re

def parse_transcript_into_qa(transcript_string: str) -> list:
    """
    Parse a speaker-labelled transcript into a list of question/answer pairs.

    The speaker named on the first line is treated as the interviewer; every
    other labelled speaker is treated as the candidate. Consecutive utterances
    from the same side are joined with single spaces, and a line without a
    recognised label is appended to whichever side spoke last.

    Recognised labels (case-insensitive, at the start of a line and followed
    by a colon) are "Speaker <digits>", "Interviewer" and "Candidate".

    Args:
        transcript_string: A multi-line string from the STT service.
        e.g., "Speaker 0: Hello\nSpeaker 1: Hi there\nSpeaker 0: How are you?"

    Returns:
        A list of dictionaries, one per Q&A pair, in transcript order.
        e.g., [{"qa_id": 1, "question": "...", "answer": "..."}]
        "qa_id" is the 1-based position of the pair in the list.
        An empty list is returned when the input is empty, is not a string,
        or its first line carries no recognised speaker label. A question
        that never receives an answer is not included.
    """
    if not transcript_string or not isinstance(transcript_string, str):
        return []

    # Define a flexible regex to find speaker labels
    speaker_regex = re.compile(r'^(Speaker \d+|Interviewer|Candidate):', re.IGNORECASE)

    # str.split always yields at least one element, so lines[0] is safe.
    lines = transcript_string.strip().split('\n')

    # The first speaker identifies the interviewer.
    first_speaker_match = speaker_regex.match(lines[0])
    if not first_speaker_match:
        # If the format is unexpected, we can't proceed reliably.
        return []
    interviewer_id = first_speaker_match.group(1).lower()

    qa_pairs = []
    current_question = ""
    current_answer = ""
    # None until a labelled line has been seen, then True/False.
    last_was_interviewer = None

    for raw_line in lines:
        line = raw_line.strip()
        if not line:
            continue

        speaker_match = speaker_regex.match(line)
        if speaker_match is None:
            # Unlabelled line: it continues the previous speaker's utterance.
            if last_was_interviewer is None:
                continue
            if last_was_interviewer:
                current_question = (current_question + " " + line).strip()
            else:
                current_answer = (current_answer + " " + line).strip()
            continue

        # Exact (case-insensitive) comparison. A substring test would treat
        # "Speaker 10" as the interviewer whenever the interviewer is
        # "Speaker 1".
        is_interviewer = speaker_match.group(1).lower() == interviewer_id
        # The label is anchored at the start, so the text is what follows it.
        text = line[speaker_match.end():].strip()

        if is_interviewer:
            if current_question and current_answer:
                # A new question starts: save the completed pair first.
                qa_pairs.append({
                    "qa_id": len(qa_pairs) + 1,
                    "question": current_question.strip(),
                    "answer": current_answer.strip(),
                })
                current_answer = ""
                current_question = text
            else:
                # Consecutive interviewer lines, or the very first question.
                current_question = (current_question + " " + text).strip()
        else:
            # This is an answer from the candidate
            current_answer = (current_answer + " " + text).strip()

        last_was_interviewer = is_interviewer

    # Add the last Q&A pair to the list
    if current_question and current_answer:
        qa_pairs.append({
            "qa_id": len(qa_pairs) + 1,
            "question": current_question.strip(),
            "answer": current_answer.strip(),
        })

    return qa_pairs


def test_parse_transcript_into_qa():
    """Run the built-in checks for parse_transcript_into_qa, printing progress.

    Each scenario holds one or more checks; a check feeds a transcript to the
    parser and asserts the exact list it returns. An AssertionError carrying
    the check's failure message is raised on the first mismatch.
    """
    print("Running tests for parse_transcript_into_qa...")

    # (message printed on success, [(failure message, transcript, expected), ...])
    scenarios = [
        # Basic 'Speaker' format
        ("Test Case 1 Passed", [
            (
                "Test Case 1 Failed",
                "\n"
                "Speaker 0: What is your greatest strength?\n"
                "Speaker 1: My greatest strength is my persistence.\n",
                [{'qa_id': 1, 'question': 'What is your greatest strength?', 'answer': 'My greatest strength is my persistence.'}],
            ),
        ]),
        # 'Interviewer/Candidate' format, case-insensitive
        ("Test Case 2 Passed", [
            (
                "Test Case 2 Failed",
                "\n"
                "interviewer: Tell me about a challenge you faced.\n"
                "Candidate: I once had to meet a very tight deadline.\n",
                [{'qa_id': 1, 'question': 'Tell me about a challenge you faced.', 'answer': 'I once had to meet a very tight deadline.'}],
            ),
        ]),
        # Multi-line answer and question
        ("Test Case 3 Passed", [
            (
                "Test Case 3 Failed",
                "\n"
                "Speaker 0: Can you describe your experience with Python?\n"
                "It is a key requirement for this role.\n"
                "Speaker 1: Certainly. I have over five years of experience.\n"
                "I have used it for web development and data analysis.\n",
                [{'qa_id': 1, 'question': 'Can you describe your experience with Python? It is a key requirement for this role.', 'answer': 'Certainly. I have over five years of experience. I have used it for web development and data analysis.'}],
            ),
        ]),
        # Multiple Q&A pairs
        ("Test Case 4 Passed", [
            (
                "Test Case 4 Failed",
                "\n"
                "Interviewer: First question?\n"
                "Candidate: First answer.\n"
                "Interviewer: Second question?\n"
                "Candidate: Second answer.\n",
                [
                    {'qa_id': 1, 'question': 'First question?', 'answer': 'First answer.'},
                    {'qa_id': 2, 'question': 'Second question?', 'answer': 'Second answer.'},
                ],
            ),
        ]),
        # Edge case - empty and whitespace-only strings
        ("Test Case 5 Passed", [
            ("Test Case 5.1 Failed", "", []),
            ("Test Case 5.2 Failed", "   \n   ", []),
        ]),
        # Edge case - no speaker labels
        ("Test Case 6 Passed", [
            (
                "Test Case 6 Failed",
                "This is just a block of text without any speakers.",
                [],
            ),
        ]),
        # Transcript starts with Candidate: the first speaker is taken to be
        # the interviewer, so the Candidate line becomes the question and the
        # Interviewer line becomes the answer.
        ("Test Case 7 Passed", [
            (
                "Test Case 7 Failed",
                "\n"
                "Candidate: I have a question first.\n"
                "Interviewer: Please go ahead.\n",
                [{'qa_id': 1, 'question': 'I have a question first.', 'answer': 'Please go ahead.'}],
            ),
        ]),
    ]

    for passed_message, checks in scenarios:
        for failed_message, transcript, expected in checks:
            assert parse_transcript_into_qa(transcript) == expected, failed_message
        print(passed_message)

    print("\nAll tests passed successfully!")

if __name__ == '__main__':
    # Standard test suite (currently disabled; uncomment to run it first).
    # test_parse_transcript_into_qa()

    # Then, test with the real example file
    import json
    from pathlib import Path

    print("\n" + "-"*50)
    print("--- Testing with example_interview_transcipt.txt ---")
    print("-"*50)

    # The script is in core/, the example is in examples/, so go up one level
    # from this file's directory and then into examples/. Resolving relative
    # to __file__ keeps this working on any machine and from any working
    # directory, instead of relying on a machine-specific absolute path.
    file_path = (
        Path(__file__).resolve().parent.parent
        / 'examples'
        / 'example_interview_transcipt.txt'
    )

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            transcript_content = f.read()

        qa_pairs = parse_transcript_into_qa(transcript_content)

        print(f"\nFound {len(qa_pairs)} Q&A pairs.")
        # Written relative to the current working directory.
        output_file_path = 'test_output.json'
        print(f"Writing output to {output_file_path}...")
        with open(output_file_path, 'w', encoding='utf-8') as f_out:
            json.dump(qa_pairs, f_out, indent=2, ensure_ascii=False)
        print(f"Successfully wrote results to {output_file_path}")

    except FileNotFoundError:
        print(f"\nError: Test file not found at {file_path}")
    except Exception as e:
        # Top-level boundary of a manual test script: report and exit normally.
        print(f"\nAn error occurred: {e}")