import json
import re
from pathlib import Path
from typing import Dict, List, Optional, Union

# Speaker labels recognised at the very start of a (stripped) line.
# Group 1 is the label without the trailing colon.
_SPEAKER_LABEL_RE = re.compile(r'^(Speaker \d+|Interviewer|Candidate):', re.IGNORECASE)

_QAPair = Dict[str, Union[int, str]]


def _build_qa_pair(qa_id: int, question_parts: List[str], answer_parts: List[str]) -> _QAPair:
    """Assemble one Q&A record from the collected utterance fragments."""
    return {
        "qa_id": qa_id,
        "question": " ".join(question_parts),
        "answer": " ".join(answer_parts),
    }


def parse_transcript_into_qa(transcript_string: str) -> List[_QAPair]:
    """
    Parse a speaker-labelled transcript into a list of Q&A pairs.

    The speaker on the first line is taken to be the interviewer; every other
    labelled speaker is treated as answering. Consecutive utterances by the
    same side are joined with single spaces, and a line without a speaker
    label is appended to whichever side spoke last.

    Args:
        transcript_string: A multi-line string from the STT service.
            e.g., "Speaker 0: Hello\\nSpeaker 1: Hi there\\nSpeaker 0: How are you?"

    Returns:
        A list of dictionaries, one per Q&A pair, numbered from 1.
        e.g., [{"qa_id": 1, "question": "...", "answer": "..."}]
        An empty list is returned when the input is empty, is not a string,
        or its first line does not start with a recognised speaker label.
    """
    if not transcript_string or not isinstance(transcript_string, str):
        return []

    lines = transcript_string.strip().split('\n')

    # The first speaker defines who the interviewer is. If the first line has
    # no recognisable label, the format is unexpected and we cannot proceed.
    first_label = _SPEAKER_LABEL_RE.match(lines[0])
    if first_label is None:
        return []
    interviewer_id = first_label.group(1).lower()

    qa_pairs: List[_QAPair] = []
    question_parts: List[str] = []
    answer_parts: List[str] = []
    last_was_interviewer: Optional[bool] = None

    for raw_line in lines:
        line = raw_line.strip()
        if not line:
            continue

        label = _SPEAKER_LABEL_RE.match(line)
        if label is None:
            # Unlabelled line: continue the most recent speaker's utterance.
            if last_was_interviewer is not None:
                target = question_parts if last_was_interviewer else answer_parts
                target.append(line)
            continue

        text = line[label.end():].strip()
        # Exact (case-insensitive) comparison: a substring test would make
        # "Speaker 1" also match "Speaker 10", "Speaker 11", ...
        is_interviewer = label.group(1).lower() == interviewer_id

        if is_interviewer:
            # The interviewer speaking after an answer starts a new question,
            # so the completed pair is stored first. Otherwise this is the
            # first question or a follow-up to an unanswered one.
            if question_parts and answer_parts:
                qa_pairs.append(
                    _build_qa_pair(len(qa_pairs) + 1, question_parts, answer_parts)
                )
                question_parts = []
                answer_parts = []
            if text:
                question_parts.append(text)
        elif text:
            answer_parts.append(text)

        last_was_interviewer = is_interviewer

    # Store the trailing pair, if the transcript ended on a complete one.
    if question_parts and answer_parts:
        qa_pairs.append(_build_qa_pair(len(qa_pairs) + 1, question_parts, answer_parts))

    return qa_pairs


def test_parse_transcript_into_qa():
    """Test suite for the parse_transcript_into_qa function."""
    print("Running tests for parse_transcript_into_qa...")

    # Test Case 1: Basic 'Speaker' format
    test_1_input = """
    Speaker 0: What is your greatest strength?
    Speaker 1: My greatest strength is my persistence.
    """
    test_1_expected = [{'qa_id': 1, 'question': 'What is your greatest strength?', 'answer': 'My greatest strength is my persistence.'}]
    assert parse_transcript_into_qa(test_1_input) == test_1_expected, "Test Case 1 Failed"
    print("Test Case 1 Passed")

    # Test Case 2: 'Interviewer/Candidate' format, case-insensitive
    test_2_input = """
    interviewer: Tell me about a challenge you faced.
    Candidate: I once had to meet a very tight deadline.
    """
    test_2_expected = [{'qa_id': 1, 'question': 'Tell me about a challenge you faced.', 'answer': 'I once had to meet a very tight deadline.'}]
    assert parse_transcript_into_qa(test_2_input) == test_2_expected, "Test Case 2 Failed"
    print("Test Case 2 Passed")

    # Test Case 3: Multi-line answer and question (continuation lines carry
    # no speaker label and belong to whoever spoke last)
    test_3_input = """
    Speaker 0: Can you describe your experience with Python?
    It is a key requirement for this role.
    Speaker 1: Certainly. I have over five years of experience.
    I have used it for web development and data analysis.
    """
    test_3_expected = [{'qa_id': 1, 'question': 'Can you describe your experience with Python? It is a key requirement for this role.', 'answer': 'Certainly. I have over five years of experience. I have used it for web development and data analysis.'}]
    assert parse_transcript_into_qa(test_3_input) == test_3_expected, "Test Case 3 Failed"
    print("Test Case 3 Passed")

    # Test Case 4: Multiple Q&A pairs
    test_4_input = """
    Interviewer: First question?
    Candidate: First answer.
    Interviewer: Second question?
    Candidate: Second answer.
    """
    test_4_expected = [
        {'qa_id': 1, 'question': 'First question?', 'answer': 'First answer.'},
        {'qa_id': 2, 'question': 'Second question?', 'answer': 'Second answer.'}
    ]
    assert parse_transcript_into_qa(test_4_input) == test_4_expected, "Test Case 4 Failed"
    print("Test Case 4 Passed")

    # Test Case 5: Edge case - Empty and whitespace strings
    assert parse_transcript_into_qa("") == [], "Test Case 5.1 Failed"
    assert parse_transcript_into_qa(" \n ") == [], "Test Case 5.2 Failed"
    print("Test Case 5 Passed")

    # Test Case 6: Edge case - No speaker labels
    test_6_input = "This is just a block of text without any speakers."
    assert parse_transcript_into_qa(test_6_input) == [], "Test Case 6 Failed"
    print("Test Case 6 Passed")

    # Test Case 7: Transcript starts with Candidate
    test_7_input = """
    Candidate: I have a question first.
    Interviewer: Please go ahead.
    """
    # The first speaker is always treated as the interviewer, so here the
    # Candidate line becomes the question and the Interviewer line the answer.
    test_7_expected = [{'qa_id': 1, 'question': 'I have a question first.', 'answer': 'Please go ahead.'}]
    assert parse_transcript_into_qa(test_7_input) == test_7_expected, "Test Case 7 Failed"
    print("Test Case 7 Passed")

    print("\nAll tests passed successfully!")


def _main() -> None:
    """Parse the bundled example transcript and dump the Q&A pairs as JSON."""
    # NOTE: the assertion suite is not run here; call
    # test_parse_transcript_into_qa() explicitly to run it.
    print("\n" + "-"*50)
    print("--- Testing with example_interview_transcipt.txt ---")
    print("-"*50)

    # The script is in core/, the example is in examples/, so go up one level
    # from this file's directory and then into examples/.
    file_path = Path(__file__).resolve().parent.parent / 'examples' / 'example_interview_transcipt.txt'

    try:
        transcript_content = file_path.read_text(encoding='utf-8')
    except FileNotFoundError:
        print(f"\nError: Test file not found at {file_path}")
        return
    except (OSError, UnicodeDecodeError) as e:
        print(f"\nAn error occurred: {e}")
        return

    qa_pairs = parse_transcript_into_qa(transcript_content)
    print(f"\nFound {len(qa_pairs)} Q&A pairs.")

    output_file_path = 'test_output.json'
    print(f"Writing output to {output_file_path}...")
    try:
        with open(output_file_path, 'w', encoding='utf-8') as f_out:
            json.dump(qa_pairs, f_out, indent=2, ensure_ascii=False)
    except OSError as e:
        print(f"\nAn error occurred: {e}")
        return
    print(f"Successfully wrote results to {output_file_path}")


if __name__ == '__main__':
    _main()