# Azure OCR helper utilities: submit documents to the Azure Computer Vision
# Read API, poll the async operation for results, split large PDFs into
# page chunks, and clean the extracted text.
| import time | |
| import os | |
| import requests | |
| import mimetypes | |
| from PyPDF2 import PdfReader, PdfWriter | |
| import tempfile | |
| import re | |
# --- Azure credentials ---------------------------------------------------
# Read from the environment (e.g. populated from a .env file by the host);
# fail fast at import time when either value is missing.
AZURE_ENDPOINT = os.environ.get("AZURE_ENDPOINT")
AZURE_KEY = os.environ.get("AZURE_KEY")
if not (AZURE_ENDPOINT and AZURE_KEY):
    raise RuntimeError("Set AZURE_ENDPOINT and AZURE_KEY in .env")
# Normalize so URL building below can always append "/vision/...".
AZURE_ENDPOINT = AZURE_ENDPOINT.rstrip("/")
def read_file_bytes(path):
    """Return the entire contents of the file at *path* as bytes."""
    with open(path, "rb") as handle:
        return handle.read()
def detect_content_type(file_path: str):
    """Guess the MIME type from *file_path*'s extension.

    Falls back to the generic binary type when the extension is unknown.
    """
    guessed, _encoding = mimetypes.guess_type(file_path)
    if guessed is None:
        return "application/octet-stream"
    return guessed
def submit_read_api(file_path, *, timeout=30):
    """Submit a file to the Azure Computer Vision Read (OCR) API.

    Parameters
    ----------
    file_path : str
        Path of the document/image to analyze.
    timeout : float, optional
        Per-request timeout in seconds (keyword-only, default 30).

    Returns
    -------
    str
        The ``Operation-Location`` URL to poll for the async result.

    Raises
    ------
    requests.HTTPError
        If Azure rejects the request.
    RuntimeError
        If the response lacks an ``Operation-Location`` header.
    """
    url = f"{AZURE_ENDPOINT}/vision/v3.2/read/analyze"
    headers = {
        "Ocp-Apim-Subscription-Key": AZURE_KEY,
        "Content-Type": "application/octet-stream"
    }
    data = read_file_bytes(file_path)
    # Without an explicit timeout a stalled connection would hang forever.
    resp = requests.post(url, headers=headers, data=data, timeout=timeout)
    print("Azure OCR request URL:", url)
    print("Azure OCR response status:", resp.status_code)
    print("Azure OCR response headers:", resp.headers)
    resp.raise_for_status()
    # The Read API is asynchronous: the result location comes back in a header.
    op_location = resp.headers.get("Operation-Location")
    if not op_location:
        raise RuntimeError(f"No Operation-Location header. Response: {resp.text}")
    return op_location
# NOTE(review): this function is re-defined later in this file; at import
# time the later definition wins, so this variant (which parses the v3.2
# ``readResults`` schema and returns plain joined lines) is currently dead
# code. Confirm which variant is intended and remove the other.
def poll_read_result(operation_location, timeout=180, interval=2.0):
    """Poll until Computer Vision OCR completes"""
    headers = {"Ocp-Apim-Subscription-Key": AZURE_KEY}
    deadline = time.time() + timeout
    while time.time() < deadline:
        r = requests.get(operation_location, headers=headers)
        r.raise_for_status()
        j = r.json()
        # Azure reports e.g. "notStarted" / "running" / "succeeded" / "failed".
        status = j.get("status", "").lower()
        if status in ("succeeded", "failed"):
            break
        time.sleep(interval)
    # NOTE(review): if the while loop never runs (timeout <= 0), ``status``
    # and ``j`` are unbound here and this raises NameError, not RuntimeError.
    if status != "succeeded":
        raise RuntimeError(f"OCR failed. Status={status}, Response={j}")
    analyze_result = j.get("analyzeResult", {})
    # Flatten all recognized lines across all pages, in reading order.
    lines = []
    for read_result in analyze_result.get("readResults", []):
        for line in read_result.get("lines", []):
            lines.append(line["text"])
    print(f"✅ Extracted {len(lines)} lines of text")
    return "\n".join(lines)
def split_pdf_into_chunks(pdf_path, chunk_size=2):
    """Split a PDF into temporary files of at most *chunk_size* pages each.

    Parameters
    ----------
    pdf_path : str
        Path of the source PDF.
    chunk_size : int, optional
        Maximum pages per chunk (default 2).

    Returns
    -------
    list[str]
        Paths of the temporary chunk PDFs; the caller is responsible for
        deleting them.

    Raises
    ------
    ValueError
        If ``chunk_size`` is less than 1.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be >= 1")
    reader = PdfReader(pdf_path)
    total_pages = len(reader.pages)
    chunk_files = []
    for start in range(0, total_pages, chunk_size):
        writer = PdfWriter()
        for page_index in range(start, min(start + chunk_size, total_pages)):
            writer.add_page(reader.pages[page_index])
        # mkstemp + fdopen instead of NamedTemporaryFile(delete=False):
        # the original never closed the NamedTemporaryFile handle (one
        # leaked descriptor per chunk) and re-opening the still-open file
        # by name fails on Windows.
        fd, chunk_path = tempfile.mkstemp(suffix=".pdf")
        with os.fdopen(fd, "wb") as f:
            writer.write(f)
        chunk_files.append(chunk_path)
    return chunk_files
def clean_extracted_text(text: str) -> str:
    """Strip OCR artifacts from *text* and normalize whitespace.

    Removes page/chunk markers and known junk tokens, collapses runs of
    whitespace within each line, and drops lines that end up empty.
    """
    # (pattern, flags) pairs of artifacts to delete, applied in order.
    artifact_patterns = (
        (r"--- Page.*?---", 0),                                       # page markers
        (r"\(chunk\)", 0),                                            # chunk markers
        (r"\b(?:stone|Stegaumen|studystone\.in)\b", re.IGNORECASE),   # junk words
        (r"Z-\d+", 0),                                                # roll numbers / codes
        (r"P\.T\.O\.", re.IGNORECASE),                                # "please turn over"
    )
    for pattern, flags in artifact_patterns:
        text = re.sub(pattern, "", text, flags=flags)
    # Collapse intra-line whitespace but keep the line structure.
    cleaned_lines = []
    for raw_line in text.splitlines():
        collapsed = re.sub(r"\s+", " ", raw_line).strip()
        if collapsed:
            cleaned_lines.append(collapsed)
    return "\n".join(cleaned_lines)
def poll_read_result(operation_location, timeout=180, interval=2.0):
    """Poll an Azure OCR operation until it completes and return its text.

    NOTE(review): this duplicates an earlier ``poll_read_result`` in this
    file; this later definition is the one in effect at import time.

    Handles both result schemas so it works with the endpoint that
    ``submit_read_api`` actually calls:

    * Computer Vision Read v3.2 — ``analyzeResult.readResults[].lines[]``
      (the previous implementation ignored this schema and returned an
      empty string for real v3.2 responses);
    * Document Intelligence style — ``analyzeResult.pages[].spans`` giving
      offsets into ``analyzeResult.content``.

    Parameters
    ----------
    operation_location : str
        The ``Operation-Location`` URL returned by submission.
    timeout : float, optional
        Maximum seconds to keep polling (default 180).
    interval : float, optional
        Seconds to sleep between polls (default 2.0).

    Returns
    -------
    str
        Extracted text with ``--- Page N ---`` markers between pages.

    Raises
    ------
    RuntimeError
        If the operation fails or does not succeed before the deadline.
    """
    headers = {"Ocp-Apim-Subscription-Key": AZURE_KEY}
    deadline = time.time() + timeout
    # Pre-initialize so a non-positive timeout raises the RuntimeError
    # below instead of a NameError.
    status = ""
    j = {}
    while time.time() < deadline:
        r = requests.get(operation_location, headers=headers, timeout=30)
        r.raise_for_status()
        j = r.json()
        status = j.get("status", "").lower()
        if status in ("succeeded", "failed"):
            break
        time.sleep(interval)
    if status != "succeeded":
        raise RuntimeError(f"OCR failed. Status={status}, Response={j}")
    analyze_result = j.get("analyzeResult", {})

    # --- Computer Vision Read v3.2 schema --------------------------------
    read_results = analyze_result.get("readResults")
    if read_results:
        pages_text = []
        for read_result in read_results:
            page_num = read_result.get("page", "?")
            page_lines = [line["text"] for line in read_result.get("lines", [])]
            joined = "\n".join(page_lines).strip() or "(No text detected)"
            pages_text.append(f"--- Page {page_num} ---\n{joined}")
        print(f"✅ Processed {len(read_results)} pages successfully")
        return "\n\n".join(pages_text)

    # --- Document Intelligence schema (content + per-page spans) ---------
    pages = analyze_result.get("pages", [])
    content = analyze_result.get("content", "")
    pages_text = []
    for page in pages:
        page_num = page.get("pageNumber", "?")
        spans = page.get("spans", [])
        text_parts = [content[s["offset"]: s["offset"] + s["length"]] for s in spans]
        joined = "\n".join(text_parts).strip() or "(No text detected)"
        pages_text.append(f"--- Page {page_num} ---\n{joined}")
    print(f"✅ Processed {len(pages)} pages successfully")
    return "\n\n".join(pages_text)