|
|
import json
|
|
|
from pathlib import Path
|
|
|
from sentence_transformers import SentenceTransformer, util
|
|
|
import re
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def normalize_subject(subject: str) -> str:
    """Canonicalize a subject name for reliable comparison.

    Lowercases, trims, collapses runs of whitespace to single spaces,
    and strips punctuation, so cosmetic differences between papers
    (case, spacing, dots/dashes) do not register as distinct subjects.

    Returns "" for empty/falsy input.
    """
    if not subject:
        return ""
    # split()/join() both trims the ends and collapses internal whitespace.
    collapsed = " ".join(subject.lower().split())
    # Drop everything that is not a word character or a space.
    return re.sub(r"[^\w\s]", "", collapsed)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _collect_questions(subquestions, bucket):
    """Append valid subquestions to bucket as {"text", "images"} dicts.

    A subquestion is kept only when its "question" value is a non-empty
    string after stripping; "images" holds the single "image" value in a
    list when it is truthy, else an empty list.
    """
    for sq in subquestions:
        question = sq.get("question")
        if not isinstance(question, str):
            continue
        question = question.strip()
        if not question:
            continue
        bucket.append({
            "text": question,
            "images": [sq.get("image")] if sq.get("image") else [],
        })


def load_all_questions(json_files):
    """Load question-paper JSON files and split them into parts.

    Parameters:
        json_files: iterable of paths to JSON files, each with optional
            "subject", "PART_A" (list of subquestion dicts) and "PART_B"
            (list of blocks, each with a "subquestions" list).

    Returns:
        (subject, part_a, part_b) where subject is the normalized common
        subject (or None if none of the files carried one) and part_a /
        part_b are lists of {"text", "images"} dicts.

    Raises:
        ValueError: if two files carry different normalized subjects.
    """
    part_a = []
    part_b = []
    subject = None

    print("from all loaded question papers splitting into 3 parts subject, part a and part b")

    for file in json_files:
        data = json.loads(Path(file).read_text(encoding="utf-8"))

        current_subject = normalize_subject(data.get("subject") or "")
        if current_subject:
            if subject is None:
                subject = current_subject
            elif subject != current_subject:
                raise ValueError(
                    f"Mixed subjects detected: '{subject}' vs '{current_subject}'"
                )

        _collect_questions(data.get("PART_A", []), part_a)

        # Fix: use .get() so a file without PART_B no longer raises
        # KeyError (PART_A was already handled leniently above).
        for block in data.get("PART_B", []):
            _collect_questions(block.get("subquestions", []), part_b)

    print("data split into 3 parts subject, part a and part b successfully")
    return subject, part_a, part_b
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def semantic_frequency(questions, threshold=0.70):
    """Cluster semantically similar questions and count their frequency.

    Greedy single-pass clustering: each unvisited question seeds a
    cluster that absorbs every other unvisited question whose cosine
    similarity to the seed is >= threshold.

    Parameters:
        questions: list of {"text": str, "images": list} dicts.
        threshold: cosine-similarity cutoff for cluster membership.

    Returns:
        List of {"question", "frequency", "images"} dicts — the seed
        text, cluster size, and a de-duplicated image list (None when
        the cluster has no images).
    """
    # Robustness fix: skip model loading/encoding when there is nothing
    # to cluster (model.encode on an empty list is wasted work at best).
    if not questions:
        return []

    print("clustering started for part using all-mpnet-base-v2 model")

    # NOTE(review): the model is re-loaded on every call; consider a
    # module-level cache if this becomes a hot path.
    model = SentenceTransformer("all-mpnet-base-v2")

    texts = [q["text"] for q in questions]
    # Normalized embeddings so cos_sim reduces to a dot product.
    embeddings = model.encode(texts, convert_to_tensor=True, normalize_embeddings=True)

    visited = set()
    results = []

    for i in range(len(texts)):
        if i in visited:
            continue

        cluster = [i]
        visited.add(i)

        # Similarities of the seed against every question at once.
        sims = util.cos_sim(embeddings[i], embeddings)[0]
        for j in range(len(texts)):
            if j not in visited and sims[j] >= threshold:
                cluster.append(j)
                visited.add(j)

        # The seed question represents the whole cluster.
        rep_question = texts[cluster[0]]

        # De-duplicate images across the cluster.
        images = set()
        for idx in cluster:
            images.update(questions[idx]["images"])

        results.append({
            "question": rep_question,
            "frequency": len(cluster),
            "images": list(images) if images else None,
        })

    return results
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run_semantic_frequency_multiple(input_jsons, output_json):
    """End-to-end pipeline: load papers, cluster each part, write report.

    Parameters:
        input_jsons: iterable of paths to question-paper JSON files.
        output_json: path where the result JSON is written.

    Returns:
        The output dict {"subject", "PART_A", "PART_B"} that was also
        serialized to output_json.
    """
    print("in semantic frequency multiple function")

    subject, part_a, part_b = load_all_questions(input_jsons)

    output = {
        "subject": subject,
        "PART_A": semantic_frequency(part_a),
        "PART_B": semantic_frequency(part_b),
    }

    # Fix: the original status messages were unterminated string
    # literals split across lines (mojibake'd emoji) — a syntax error.
    print("Questions clustered and frequency calculated")

    Path(output_json).write_text(
        json.dumps(output, indent=2),
        encoding="utf-8",
    )

    print("Semantic Frequency Analysis Completed")
    print(f"Output saved at: {output_json}")
    return output
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|