Spaces:
Sleeping
Sleeping
| import re | |
def parse_transcript_into_qa(transcript_string: str):
    """
    Parse a speaker-labelled transcript into a structured Q&A format.

    The speaker on the first line is assumed to be the interviewer; every
    other labelled speaker is treated as the one answering. Consecutive
    utterances belonging to the same role are joined with single spaces, and
    a line without a speaker label is appended to the utterance of the most
    recently seen speaker. A pair is emitted only once both its question and
    its answer are non-empty.

    Recognised labels (case-insensitive, at the start of a line and followed
    by a colon) are ``Speaker <digits>``, ``Interviewer`` and ``Candidate``.

    Args:
        transcript_string: A multi-line string from the STT service, one
            utterance per line, e.g. the three lines ``Speaker 0: Hello``,
            ``Speaker 1: Hi there``, ``Speaker 0: How are you?``.

    Returns:
        A list of dictionaries, one per Q&A pair, in transcript order, e.g.
        ``[{"qa_id": 1, "question": "...", "answer": "..."}]`` where
        ``qa_id`` is a 1-based running index. An empty list is returned when
        the input is empty, is not a string, or its first line does not start
        with a recognised speaker label.
    """
    if not transcript_string or not isinstance(transcript_string, str):
        return []

    # str.split always yields at least one element, so lines[0] is safe.
    lines = transcript_string.strip().split('\n')

    # Define a flexible regex to find speaker labels
    speaker_regex = re.compile(r'^(Speaker \d+|Interviewer|Candidate):', re.IGNORECASE)

    # Find the first speaker to identify the interviewer
    first_speaker_match = speaker_regex.search(lines[0])
    if not first_speaker_match:
        # If the format is unexpected, we can't proceed reliably.
        return []
    interviewer_id = first_speaker_match.group(1).lower()

    qa_pairs = []
    current_question = ""
    current_answer = ""
    # None until the first labelled line has been seen.
    last_was_interviewer = None

    def flush_pair():
        """Record the pending pair if both halves are present."""
        if current_question and current_answer:
            qa_pairs.append({
                "qa_id": len(qa_pairs) + 1,
                "question": current_question.strip(),
                "answer": current_answer.strip()
            })
            return True
        return False

    for raw_line in lines:
        line = raw_line.strip()
        if not line:
            continue

        speaker_match = speaker_regex.search(line)
        if not speaker_match:
            # If a line doesn't have a speaker, append it to the last utterance
            if last_was_interviewer is None:
                continue
            if last_was_interviewer:
                current_question += " " + line
            else:
                current_answer += " " + line
            continue

        text = line[speaker_match.end():].strip()
        # Compare labels for equality, not containment: with a substring
        # test "Speaker 1" would also match "Speaker 10", "Speaker 11", ...
        is_interviewer = speaker_match.group(1).lower() == interviewer_id

        if is_interviewer:
            # New question starts. If we have a completed Q&A, save it.
            if flush_pair():
                current_answer = ""
                current_question = text
            else:
                # This handles consecutive questions or the very first question
                current_question = (current_question + " " + text).strip()
        else:
            # This is an answer from the other speaker
            current_answer = (current_answer + " " + text).strip()

        last_was_interviewer = is_interviewer

    # Add the last Q&A pair to the list
    flush_pair()
    return qa_pairs
def test_parse_transcript_into_qa():
    """Exercise parse_transcript_into_qa against a fixed set of transcripts.

    Each case asserts the parser's exact output and prints a progress line
    once the case has passed; a failing case raises AssertionError with a
    message naming the case.
    """
    print("Running tests for parse_transcript_into_qa...")

    def expect(label, transcript, wanted):
        # Single comparison point so every case fails with a uniform message.
        assert parse_transcript_into_qa(transcript) == wanted, f"Test Case {label} Failed"

    # Test Case 1: Basic 'Speaker' format
    expect(
        "1",
        "\n"
        "Speaker 0: What is your greatest strength?\n"
        "Speaker 1: My greatest strength is my persistence.\n",
        [{'qa_id': 1,
          'question': 'What is your greatest strength?',
          'answer': 'My greatest strength is my persistence.'}],
    )
    print("Test Case 1 Passed")

    # Test Case 2: 'Interviewer/Candidate' format, case-insensitive
    expect(
        "2",
        "\n"
        "interviewer: Tell me about a challenge you faced.\n"
        "Candidate: I once had to meet a very tight deadline.\n",
        [{'qa_id': 1,
          'question': 'Tell me about a challenge you faced.',
          'answer': 'I once had to meet a very tight deadline.'}],
    )
    print("Test Case 2 Passed")

    # Test Case 3: Multi-line answer and question
    expect(
        "3",
        "\n"
        "Speaker 0: Can you describe your experience with Python?\n"
        "It is a key requirement for this role.\n"
        "Speaker 1: Certainly. I have over five years of experience.\n"
        "I have used it for web development and data analysis.\n",
        [{'qa_id': 1,
          'question': 'Can you describe your experience with Python? It is a key requirement for this role.',
          'answer': 'Certainly. I have over five years of experience. I have used it for web development and data analysis.'}],
    )
    print("Test Case 3 Passed")

    # Test Case 4: Multiple Q&A pairs
    expect(
        "4",
        "\n"
        "Interviewer: First question?\n"
        "Candidate: First answer.\n"
        "Interviewer: Second question?\n"
        "Candidate: Second answer.\n",
        [
            {'qa_id': 1, 'question': 'First question?', 'answer': 'First answer.'},
            {'qa_id': 2, 'question': 'Second question?', 'answer': 'Second answer.'},
        ],
    )
    print("Test Case 4 Passed")

    # Test Case 5: Edge case - Empty and whitespace strings
    expect("5.1", "", [])
    expect("5.2", " \n ", [])
    print("Test Case 5 Passed")

    # Test Case 6: Edge case - No speaker labels
    expect("6", "This is just a block of text without any speakers.", [])
    print("Test Case 6 Passed")

    # Test Case 7: Transcript starts with Candidate
    # The first speaker is taken to be the interviewer, so the Candidate line
    # becomes the question and the Interviewer line becomes the answer.
    expect(
        "7",
        "\n"
        "Candidate: I have a question first.\n"
        "Interviewer: Please go ahead.\n",
        [{'qa_id': 1,
          'question': 'I have a question first.',
          'answer': 'Please go ahead.'}],
    )
    print("Test Case 7 Passed")

    print("\nAll tests passed successfully!")
if __name__ == '__main__':
    # Manual smoke run: parse an example transcript file and dump the
    # resulting Q&A pairs to test_output.json in the working directory.
    #
    # The built-in test suite is not run by default; uncomment to enable it.
    # test_parse_transcript_into_qa()

    import json
    import sys
    from pathlib import Path

    print("\n" + "-"*50)
    print("--- Testing with example_interview_transcipt.txt ---")
    print("-"*50)

    # The script is in core/, the example is in examples/, so go up one level
    # from this file's directory and then into examples/. Resolving against
    # __file__ keeps the script runnable from any checkout location instead
    # of relying on a machine-specific absolute path. A different transcript
    # can be supplied as the first command-line argument.
    default_path = (
        Path(__file__).resolve().parent.parent
        / 'examples'
        / 'example_interview_transcipt.txt'
    )
    file_path = Path(sys.argv[1]) if len(sys.argv) > 1 else default_path

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            transcript_content = f.read()

        qa_pairs = parse_transcript_into_qa(transcript_content)
        print(f"\nFound {len(qa_pairs)} Q&A pairs.")

        output_file_path = 'test_output.json'
        print(f"Writing output to {output_file_path}...")
        with open(output_file_path, 'w', encoding='utf-8') as f_out:
            # ensure_ascii=False keeps non-ASCII transcript text readable.
            json.dump(qa_pairs, f_out, indent=2, ensure_ascii=False)
        print(f"Successfully wrote results to {output_file_path}")
    except FileNotFoundError:
        print(f"\nError: Test file not found at {file_path}")
    except Exception as e:
        # Top-level boundary of a manual script: report and exit quietly.
        print(f"\nAn error occurred: {e}")