# intelcruit-backend / core/processing.py
# Nam Fam
# add files
# 6e24a3d
import re
def parse_transcript_into_qa(transcript_string: str):
    """
    Parse a speaker-labelled transcript into a list of Q&A pairs.

    The speaker named on the first line is taken to be the interviewer; every
    other labelled speaker is treated as answering. Consecutive utterances by
    the same side are joined with a single space, and a line without a speaker
    label is appended to the utterance of the most recent labelled speaker.
    A pair is emitted only once both a question and an answer have been
    collected, so a trailing question with no answer is dropped.

    Recognised labels (case-insensitive) are ``Speaker <digits>:``,
    ``Interviewer:`` and ``Candidate:`` at the start of a line.

    Args:
        transcript_string: A multi-line string from the STT service,
            e.g. "Speaker 0: Hello\\nSpeaker 1: Hi there\\nSpeaker 0: How are you?"

    Returns:
        A list of dictionaries, one per Q&A pair, numbered from 1 in order of
        appearance, e.g. [{"qa_id": 1, "question": "...", "answer": "..."}].
        An empty list is returned when the input is empty, is not a string,
        or its first line does not start with a recognised speaker label.
    """
    if not transcript_string or not isinstance(transcript_string, str):
        return []

    lines = transcript_string.strip().split('\n')

    # Define a flexible regex to find speaker labels
    speaker_regex = re.compile(r'^(Speaker \d+|Interviewer|Candidate):', re.IGNORECASE)

    # The first speaker identifies the interviewer.
    first_speaker_match = speaker_regex.match(lines[0])
    if not first_speaker_match:
        # If the format is unexpected, we can't proceed reliably.
        return []
    interviewer_id = first_speaker_match.group(1).lower()

    qa_pairs = []
    current_question = ""
    current_answer = ""
    # None until a labelled line has been seen, then True/False.
    last_was_interviewer = None

    for raw_line in lines:
        line = raw_line.strip()
        if not line:
            continue

        speaker_match = speaker_regex.match(line)
        if speaker_match is None:
            # Continuation line: attach it to whoever spoke last.
            if last_was_interviewer is True:
                current_question = (current_question + " " + line).strip()
            elif last_was_interviewer is False:
                current_answer = (current_answer + " " + line).strip()
            continue

        # The pattern is anchored at the start of the line, so everything
        # after the match is the utterance itself.
        text = line[speaker_match.end():].strip()

        # Compare labels for equality, not containment: with a substring test
        # an interviewer labelled "Speaker 1" would also claim "Speaker 10".
        is_interviewer = speaker_match.group(1).lower() == interviewer_id

        if is_interviewer:
            if current_question and current_answer:
                # A new question starts; the previous pair is complete.
                qa_pairs.append({
                    "qa_id": len(qa_pairs) + 1,
                    "question": current_question.strip(),
                    "answer": current_answer.strip()
                })
                current_answer = ""
                current_question = text
            else:
                # Consecutive interviewer lines, or the very first question.
                current_question = (current_question + " " + text).strip()
        else:
            # Anything not said by the interviewer counts as the answer.
            current_answer = (current_answer + " " + text).strip()

        last_was_interviewer = is_interviewer

    # Add the last Q&A pair to the list
    if current_question and current_answer:
        qa_pairs.append({
            "qa_id": len(qa_pairs) + 1,
            "question": current_question.strip(),
            "answer": current_answer.strip()
        })

    return qa_pairs
def test_parse_transcript_into_qa():
    """Run parse_transcript_into_qa against a fixed table of scenarios.

    Each scenario groups one or more (failure message, transcript, expected
    result) checks under a label. A failing check raises AssertionError with
    its failure message; after all checks in a scenario succeed a
    "<label> Passed" line is printed.
    """
    print("Running tests for parse_transcript_into_qa...")

    def qa(question, answer, qa_id=1):
        # Build one expected Q&A record.
        return {'qa_id': qa_id, 'question': question, 'answer': answer}

    scenarios = [
        # Basic 'Speaker' format
        ("Test Case 1", [(
            "Test Case 1 Failed",
            "\n"
            "Speaker 0: What is your greatest strength?\n"
            "Speaker 1: My greatest strength is my persistence.\n",
            [qa('What is your greatest strength?',
                'My greatest strength is my persistence.')],
        )]),
        # 'Interviewer/Candidate' format, case-insensitive
        ("Test Case 2", [(
            "Test Case 2 Failed",
            "\n"
            "interviewer: Tell me about a challenge you faced.\n"
            "Candidate: I once had to meet a very tight deadline.\n",
            [qa('Tell me about a challenge you faced.',
                'I once had to meet a very tight deadline.')],
        )]),
        # Multi-line answer and question
        ("Test Case 3", [(
            "Test Case 3 Failed",
            "\n"
            "Speaker 0: Can you describe your experience with Python?\n"
            "It is a key requirement for this role.\n"
            "Speaker 1: Certainly. I have over five years of experience.\n"
            "I have used it for web development and data analysis.\n",
            [qa('Can you describe your experience with Python? It is a key requirement for this role.',
                'Certainly. I have over five years of experience. I have used it for web development and data analysis.')],
        )]),
        # Multiple Q&A pairs
        ("Test Case 4", [(
            "Test Case 4 Failed",
            "\n"
            "Interviewer: First question?\n"
            "Candidate: First answer.\n"
            "Interviewer: Second question?\n"
            "Candidate: Second answer.\n",
            [qa('First question?', 'First answer.'),
             qa('Second question?', 'Second answer.', qa_id=2)],
        )]),
        # Edge case - empty and whitespace strings
        ("Test Case 5", [
            ("Test Case 5.1 Failed", "", []),
            ("Test Case 5.2 Failed", " \n ", []),
        ]),
        # Edge case - no speaker labels
        ("Test Case 6", [(
            "Test Case 6 Failed",
            "This is just a block of text without any speakers.",
            [],
        )]),
        # Transcript starts with Candidate: the first speaker is assumed to
        # be the interviewer, so Candidate becomes Q and Interviewer becomes A.
        ("Test Case 7", [(
            "Test Case 7 Failed",
            "\n"
            "Candidate: I have a question first.\n"
            "Interviewer: Please go ahead.\n",
            [qa('I have a question first.', 'Please go ahead.')],
        )]),
    ]

    for label, checks in scenarios:
        for failure_message, transcript, expected in checks:
            assert parse_transcript_into_qa(transcript) == expected, failure_message
        print(f"{label} Passed")

    print("\nAll tests passed successfully!")
if __name__ == '__main__':
    # Manual smoke test: parse the bundled example transcript and dump the
    # resulting Q&A pairs to test_output.json in the current directory.
    # The built-in suite is not run here; call test_parse_transcript_into_qa()
    # to run it.
    import json
    from pathlib import Path

    print("\n" + "-"*50)
    print("--- Testing with example_interview_transcipt.txt ---")
    print("-"*50)

    # The script is in core/, the example is in examples/, so go up one level
    # from this file's directory and then into examples/. Resolving from
    # __file__ keeps this independent of the machine and working directory.
    file_path = Path(__file__).resolve().parent.parent / 'examples' / 'example_interview_transcipt.txt'

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            transcript_content = f.read()

        qa_pairs = parse_transcript_into_qa(transcript_content)
        print(f"\nFound {len(qa_pairs)} Q&A pairs.")

        output_file_path = 'test_output.json'
        print(f"Writing output to {output_file_path}...")
        with open(output_file_path, 'w', encoding='utf-8') as f_out:
            json.dump(qa_pairs, f_out, indent=2, ensure_ascii=False)
        print(f"Successfully wrote results to {output_file_path}")
    except FileNotFoundError:
        print(f"\nError: Test file not found at {file_path}")
    except Exception as e:
        # Top-level script boundary: report any other failure instead of a traceback.
        print(f"\nAn error occurred: {e}")