Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| from dotenv import load_dotenv | |
| import fitz # PyMuPDF | |
| from langchain_openai import ChatOpenAI # Correct import from langchain-openai | |
| from langchain.schema import HumanMessage, SystemMessage # For creating structured chat messages | |
# File that save_questions() writes the flat list of generated questions to.
QUESTIONS_PATH = "questions.json"
# Load environment variables (generate_questions_from_text reads
# OPENAI_API_KEY from the environment and expects it to come from a .env file).
load_dotenv()
def split_text_into_chunks(text: str, chunk_size: int) -> list:
    """Split *text* into whitespace-normalised chunks of at most *chunk_size* characters.

    Runs of whitespace are collapsed to single spaces and whole words are
    packed greedily into each chunk; words are never broken. A single word
    longer than ``chunk_size`` becomes a chunk of its own and therefore
    exceeds the limit.

    Args:
        text: Raw text to split.
        chunk_size: Maximum chunk length, counting the joining spaces.

    Returns:
        The chunks in reading order, or an empty list when *text* contains
        no words.
    """
    chunks = []
    current_words = []
    current_length = 0  # Always equals len(" ".join(current_words)).
    for word in text.split():
        # A separating space is only needed once the chunk already has a word.
        projected = current_length + len(word) + (1 if current_words else 0)
        # Never flush an empty chunk: an oversized first word must not
        # produce a leading "" entry.
        if current_words and projected > chunk_size:
            chunks.append(" ".join(current_words))
            current_words = [word]
            current_length = len(word)
        else:
            current_words.append(word)
            current_length = projected
    if current_words:
        chunks.append(" ".join(current_words))
    return chunks
def distribute_questions_across_chunks(n_chunks: int, n_questions: int) -> list:
    """Distribute *n_questions* as evenly as possible across *n_chunks*.

    Earlier chunks receive the extra questions when the split is uneven, and
    chunks beyond the number of questions receive 0. The counts always sum to
    ``n_questions`` (treated as 0 when negative).

    Args:
        n_chunks: Number of chunks to spread the questions over.
        n_questions: Total number of questions to hand out.

    Returns:
        A list of length ``n_chunks`` with the question count for each chunk,
        or an empty list when ``n_chunks`` is not positive.
    """
    if n_chunks <= 0:
        return []
    # divmod keeps the total exact even when there are more than two
    # questions per chunk.
    base, extra = divmod(max(n_questions, 0), n_chunks)
    return [base + 1 if index < extra else base for index in range(n_chunks)]
def extract_text_from_pdf(pdf_path):
    """Return the concatenated text of every page of the PDF at *pdf_path*.

    The document is opened with ``fitz`` (PyMuPDF) and each page's
    ``get_text()`` output is joined in page order.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        The extracted text as a single string.

    Raises:
        RuntimeError: If opening or reading the PDF fails for any reason; the
            original exception is attached as ``__cause__``.
    """
    print(f"[DEBUG] Opening PDF: {pdf_path}")
    try:
        with fitz.open(pdf_path) as pdf:
            print(f"[DEBUG] Extracting text from PDF: {pdf_path}")
            # join() avoids the quadratic cost of repeated string +=.
            return "".join(page.get_text() for page in pdf)
    except Exception as e:
        print(f"Error reading PDF: {e}")
        # Chain the cause so the underlying failure stays visible to callers.
        raise RuntimeError("Unable to extract text from PDF.") from e
def generate_questions_from_text(text, n_questions=5):
    """Ask the chat model for technical interview questions about *text*.

    A ``ChatOpenAI`` client (model ``gpt-4``, temperature 0.7, at most 750
    completion tokens) is prompted to produce ``n_questions`` questions. The
    reply is split into one question per non-empty line and each is prefixed
    with ``"Question N: "``.

    Args:
        text: Source content the questions should be based on.
        n_questions: Number of questions requested from the model.

    Returns:
        A list of ``"Question N: ..."`` strings. If the request or the parsing
        fails, the single-item list
        ``["An error occurred while generating questions."]`` is returned
        instead of raising.

    Raises:
        RuntimeError: If the ``OPENAI_API_KEY`` environment variable is unset.
    """
    openai_api_key = os.getenv("OPENAI_API_KEY")
    if not openai_api_key:
        raise RuntimeError(
            "OpenAI API key not found. Please add it to your .env file as OPENAI_API_KEY."
        )
    chat = ChatOpenAI(
        openai_api_key=openai_api_key, model="gpt-4", temperature=0.7, max_tokens=750
    )
    messages = [
        SystemMessage(
            content="You are an expert interviewer who generates concise technical interview questions. Do not enumerate the questions. Answer only with questions."
        ),
        HumanMessage(
            content=f"Based on the following content, generate {n_questions} technical interview questions:\n{text}"
        ),
    ]
    try:
        print(f"[DEBUG] Sending request to OpenAI with {n_questions} questions.")
        response = chat.invoke(messages)
        # Split on every line break, not only blank lines: replies that put
        # one question per line would otherwise be fused into a single item.
        questions = [
            line.strip() for line in response.content.splitlines() if line.strip()
        ]
        print(f"[DEBUG] Raw questions from LLM: {questions}")
        formatted_questions = [
            f"Question {number}: {question}"
            for number, question in enumerate(questions, start=1)
        ]
        print(f"[DEBUG] Formatted questions: {formatted_questions}")
        return formatted_questions
    except Exception as e:
        print(f"[ERROR] Failed to generate questions: {e}")
        # Callers rely on this sentinel list rather than an exception.
        return ["An error occurred while generating questions."]
def save_questions(questions, path=None):
    """Write *questions* to disk as indented JSON.

    Args:
        questions: JSON-serialisable data to store (callers in this module
            pass a list of question strings).
        path: Destination file. Defaults to the module-level
            ``QUESTIONS_PATH`` when omitted; an existing file is overwritten.
    """
    target = QUESTIONS_PATH if path is None else path
    # Explicit encoding so the result does not depend on the platform default.
    with open(target, "w", encoding="utf-8") as f:
        json.dump(questions, f, indent=4)
| import os | |
| import json | |
| import re | |
def generate_and_save_questions_from_pdf3(pdf_path, total_questions=5):
    """Generate interview questions from a PDF, yielding progress updates.

    The PDF text is extracted, split into 2000-character chunks, and
    ``total_questions`` is spread as evenly as possible over those chunks.
    Each chunk is sent to ``generate_questions_from_text`` and the replies are
    normalised into individual questions. The result is written to
    ``generated_questions_from_pdf.json`` (as ``{"questions": [...]}``) and,
    via ``save_questions``, to ``QUESTIONS_PATH``.

    Args:
        pdf_path: Path of the PDF to process.
        total_questions: Number of questions to request in total.

    Yields:
        ``(status_message, questions)`` tuples. ``questions`` is an empty list
        for every progress, warning and error update, and the full list of
        generated questions on the final success update. Errors are reported
        as a status update rather than raised.
    """
    print(f"[INFO] Generating questions from PDF: {pdf_path}")
    print(f"[DEBUG] Number of total questions to generate: {total_questions}")
    if not os.path.exists(pdf_path):
        yield "❌ Error: PDF file not found.", []
        return
    yield "📄 PDF uploaded successfully. Processing started...", []

    question_prefix = re.compile(r'Question\s*\d+\s*:\s*', flags=re.IGNORECASE)
    enumeration = re.compile(r'\d+\.\s+')
    # Sentinel that generate_questions_from_text returns when the request fails.
    failure_marker = "An error occurred while generating questions."

    def split_into_individual_questions(text_block):
        """Split a block that may hold several questions into separate items.

        Removes "Question N:" prefixes, splits on "1. ", "2. " style
        enumeration and then on line breaks, and drops empty pieces.
        """
        text_block = question_prefix.sub('', text_block)
        pieces = []
        for fragment in enumeration.split(text_block.strip()):
            # Un-numbered questions separated only by newlines are distinct too.
            pieces.extend(
                line.strip() for line in fragment.splitlines() if line.strip()
            )
        return pieces

    try:
        # 1. Extract text from the PDF
        pdf_text = extract_text_from_pdf(pdf_path)
        if not pdf_text.strip():
            yield "❌ Error: The PDF content is empty or could not be read.", []
            return

        # 2. Split the PDF content into chunks
        chunk_size = 2000  # Adjust as necessary
        chunks = split_text_into_chunks(pdf_text, chunk_size)
        n_chunks = len(chunks)
        yield f"🔄 Splitting text into {n_chunks} chunks...", []

        # 3. Distribute total_questions evenly; the first `remainder` chunks
        #    take one extra question each.
        base, remainder = divmod(total_questions, n_chunks)
        questions_per_chunk = [
            base + 1 if index < remainder else base for index in range(n_chunks)
        ]
        print(f"[DEBUG] Questions per chunk distribution: {questions_per_chunk}")

        # 4. Process each chunk and generate questions
        combined_questions = []
        for i, (chunk, n_questions) in enumerate(zip(chunks, questions_per_chunk)):
            yield f"🔄 Processing chunk {i+1} of {n_chunks} with {n_questions} questions...", []
            if n_questions <= 0:
                continue
            questions_output = generate_questions_from_text(chunk, n_questions=n_questions)
            if questions_output == [failure_marker]:
                # Do not store the error sentinel as if it were a question.
                yield f"⚠️ Warning: Question generation failed for chunk {i+1}; skipping it.", []
                continue
            # The output may be a list of questions or one string holding several.
            items = questions_output if isinstance(questions_output, list) else [questions_output]
            for item in items:
                combined_questions.extend(split_into_individual_questions(str(item)))

        # 5. Check if the number of generated questions matches the desired total
        if len(combined_questions) != total_questions:
            yield f"⚠️ Warning: Expected {total_questions}, but generated {len(combined_questions)}.", []
        yield f"✅ Total {len(combined_questions)} questions generated. Saving questions...", []

        # 6. Save the combined questions in `generated_questions_from_pdf.json`
        detailed_save_path = "generated_questions_from_pdf.json"
        with open(detailed_save_path, "w", encoding="utf-8") as f:
            json.dump({"questions": combined_questions}, f, indent=4, ensure_ascii=False)

        # 7. Save only the questions (overwrites the file at QUESTIONS_PATH)
        save_questions(combined_questions)
        print(f"[INFO] Questions saved to {QUESTIONS_PATH}")
        yield "✅ PDF processing complete. Questions saved successfully!", combined_questions
    except Exception as e:
        error_message = f"❌ Error during question generation: {str(e)}"
        print(f"[ERROR] {error_message}")
        yield error_message, []
def generate_questions_from_job_description_old(job_description, num_questions):
    """Generate questions from a job description in a single model request.

    Older, un-chunked variant: the whole description is passed to
    ``generate_questions_from_text`` in one call and nothing is saved to disk.

    Args:
        job_description: Text of the job description.
        num_questions: Number of questions to request.

    Returns:
        A ``(status_message, questions)`` tuple; ``questions`` is an empty
        list whenever the status reports an error.
    """
    print(f"[DEBUG] Generating {num_questions} questions from job description.")
    if not job_description or not job_description.strip():
        return "❌ Error: Job description is empty.", []
    try:
        # The generator's keyword is `n_questions`; passing `num_questions=`
        # raised TypeError on every call.
        questions = generate_questions_from_text(job_description, n_questions=num_questions)
        if not questions:
            return "❌ Error: No questions generated.", []
        return "✅ Questions generated successfully!", questions
    except Exception as e:
        error_message = f"❌ Error during question generation: {str(e)}"
        print(f"[ERROR] {error_message}")
        return error_message, []
| import os | |
| import json | |
| import math | |
| import re | |
| import os | |
| import json | |
| import math | |
| import re | |
def distribute_questions_evenly(total_questions, n_chunks):
    """Split *total_questions* into *n_chunks* counts that differ by at most one.

    Every chunk receives ``total_questions // n_chunks`` questions and the
    first ``total_questions % n_chunks`` chunks receive one more.

    Args:
        total_questions: Number of questions to hand out; negative values are
            treated as 0.
        n_chunks: Number of chunks to spread them over.

    Returns:
        A list of ``n_chunks`` counts summing to ``total_questions``, or an
        empty list when ``n_chunks`` is not positive.
    """
    if n_chunks <= 0:
        # Nothing to distribute over; avoids dividing by zero.
        return []
    base, remainder = divmod(max(total_questions, 0), n_chunks)
    return [base + 1 if index < remainder else base for index in range(n_chunks)]
def generate_questions_from_job_description(job_description, total_questions=5):
    """Generate interview questions from a job description and save them.

    The description is split into 2000-character chunks, ``total_questions``
    is spread over the chunks with ``distribute_questions_evenly``, and each
    chunk is sent to ``generate_questions_from_text``. The normalised
    questions are written to ``generated_questions_from_job_description.json``
    (as ``{"questions": [...]}``) and, via ``save_questions``, to
    ``QUESTIONS_PATH``.

    Args:
        job_description: Text of the job description.
        total_questions: Number of questions to request in total.

    Returns:
        A ``(status_message, questions)`` tuple; ``questions`` is an empty
        list whenever the status reports an error. Errors are reported through
        the status rather than raised.
    """
    print(f"[DEBUG] Generating {total_questions} questions from job description.")
    if not job_description or not job_description.strip():
        return "❌ Error: Job description is empty.", []

    question_prefix = re.compile(r'Question\s*\d+\s*:\s*', flags=re.IGNORECASE)
    enumeration = re.compile(r'\d+\.\s+')
    # Sentinel that generate_questions_from_text returns when the request fails.
    failure_marker = "An error occurred while generating questions."

    def split_into_individual_questions(text_block):
        """Split a block that may hold several questions into separate items.

        Removes "Question N:" prefixes, splits on "1. ", "2. " style
        enumeration and then on line breaks, and drops empty pieces.
        """
        text_block = question_prefix.sub('', text_block)
        pieces = []
        for fragment in enumeration.split(text_block.strip()):
            # Un-numbered questions separated only by newlines are distinct too.
            pieces.extend(
                line.strip() for line in fragment.splitlines() if line.strip()
            )
        return pieces

    try:
        # 1. Split the job description into chunks
        chunk_size = 2000  # Adjust as necessary
        chunks = split_text_into_chunks(job_description, chunk_size)
        n_chunks = len(chunks)
        print(f"[DEBUG] Splitting text into {n_chunks} chunks...")

        # 2. Distribute total_questions evenly across the chunks
        questions_per_chunk = distribute_questions_evenly(total_questions, n_chunks)
        print(f"[DEBUG] Questions per chunk distribution: {questions_per_chunk}")

        # 3. Process each chunk and generate questions
        combined_questions = []
        for i, (chunk, n_questions) in enumerate(zip(chunks, questions_per_chunk)):
            print(f"[DEBUG] Processing chunk {i+1} of {n_chunks} with {n_questions} questions...")
            if n_questions <= 0:
                continue
            questions_output = generate_questions_from_text(chunk, n_questions=n_questions)
            if questions_output == [failure_marker]:
                # Do not store the error sentinel as if it were a question.
                print(f"⚠️ Warning: Question generation failed for chunk {i+1}; skipping it.")
                continue
            # The output may be a list of questions or one string holding several.
            items = questions_output if isinstance(questions_output, list) else [questions_output]
            for item in items:
                combined_questions.extend(split_into_individual_questions(str(item)))

        if len(combined_questions) != total_questions:
            print(f"⚠️ Warning: Expected {total_questions}, but generated {len(combined_questions)}.")
        print(f"✅ Total {len(combined_questions)} questions generated. Saving questions...")

        # Save the combined questions in `generated_questions_from_job_description.json`
        detailed_save_path = "generated_questions_from_job_description.json"
        with open(detailed_save_path, "w", encoding="utf-8") as f:
            json.dump({"questions": combined_questions}, f, indent=4, ensure_ascii=False)

        # Save only the questions (overwrites the file at QUESTIONS_PATH)
        save_questions(combined_questions)
        print(f"[INFO] Questions saved to {QUESTIONS_PATH}")
        return "✅ Job description processing complete. Questions saved successfully!", combined_questions
    except Exception as e:
        error_message = f"❌ Error during question generation: {str(e)}"
        print(f"[ERROR] {error_message}")
        return error_message, []
if __name__ == "__main__":
    import sys

    # Script entry point: run the PDF pipeline and echo each progress update.
    # An optional first command-line argument overrides the sample PDF path.
    default_pdf_path = "professional_machine_learning_engineer_exam_guide_english.pdf"
    pdf_path = sys.argv[1] if len(sys.argv) > 1 else default_pdf_path
    try:
        # The pipeline is a generator of (status, questions) updates.
        for status, questions in generate_and_save_questions_from_pdf3(pdf_path, total_questions=5):
            print(status)
            if questions:
                # Only the final success update carries a non-empty list.
                print(json.dumps(questions, indent=2))
    except Exception as e:
        print(f"Failed to generate questions: {e}")