masterllm / utilities /extract_text.py
redhairedshanks1's picture
Update utilities/extract_text.py
dd3d80f
# import os
# import requests
# EXTRACT_TEXT_API = "https://point9-extract-text-and-table.hf.space/api/text" # Replace with your space URL
# def extract_text_remote(state):
# filename = state["filename"]
# path = state["temp_files"][filename]
# with open(path, "rb") as f:
# files = {"file": (filename, f, "application/pdf")}
# data = {
# "filename": filename,
# "start_page": state.get("start_page", 1),
# "end_page": state.get("end_page", 1)
# }
# headers = {"Authorization": f"Bearer {os.getenv('HUGGINGFACE_API_TOKEN')}"}
# resp = requests.post(EXTRACT_TEXT_API, files=files, data=data, headers=headers)
# if resp.status_code != 200:
# raise RuntimeError(f"Extract text API failed: {resp.text}")
# state["text"] = resp.json().get("text", "")
# return state
import os
import requests
# Hardcoded API URL - DO NOT CHANGE
# Endpoint that extract_text_remote() POSTs the uploaded document to.
EXTRACT_TEXT_API = "https://point9-extract-text-and-table.hf.space/api/text"
def extract_text_remote(state):
    """
    Extract text from a document by uploading it to the extract-text API.

    The file is always sent to the API (no local processing). Progress and
    debugging details are printed to stdout; the bearer token itself is never
    printed.

    Args:
        state: Dict with 'filename' and 'temp_files' (mapping of filename to
            a path on disk), plus optional 'start_page' and 'end_page'
            (both default to 1).

    Returns:
        The same ``state`` dict, with 'text' set to the extracted text
        (an empty string when the API returned none).

    Raises:
        KeyError: If 'filename' or 'temp_files' is missing from ``state``, or
            'temp_files' has no entry for the filename.
        RuntimeError: If the path is not an existing file, the request fails
            or times out, the API answers with a non-200 status, or the
            response body cannot be interpreted.
    """
    filename = state["filename"]
    path = state["temp_files"][filename]

    # isfile() also rejects directories, which would otherwise slip past an
    # exists() check and fail later inside open() with a different exception.
    if not os.path.isfile(path):
        raise RuntimeError(f"File not found: {path}")

    # Size is reported for debugging only.
    file_size = os.path.getsize(path)
    # Send just the filename (not the full path) to match the curl format.
    file_basename = os.path.basename(path)
    token = os.getenv("HUGGINGFACE_API_TOKEN")
    timeout_seconds = 300  # 5 minutes, to leave room for large files
    banner = "=" * 60

    print(f"\n{banner}")
    print("📄 EXTRACT TEXT API CALL")
    print(banner)
    print(f"File: {filename}")
    print(f"Basename: {file_basename}")
    print(f"Path: {path}")
    print(f"Size: {file_size} bytes")
    print(f"Start Page: {state.get('start_page', 1)}")
    print(f"End Page: {state.get('end_page', 1)}")
    print(f"API URL: {EXTRACT_TEXT_API}")
    print(f"Auth Token: {'✓ Set' if token else '✗ Not Set'}")

    # Page numbers are sent as strings, matching what `curl -F` would send.
    data = {
        "filename": file_basename,
        "start_page": str(state.get("start_page", 1)),
        "end_page": str(state.get("end_page", 1)),
    }
    # Only attach the Authorization header when a token exists; otherwise the
    # literal string "Bearer None" would be sent.
    # Content-Type is deliberately left unset so requests can build the
    # multipart/form-data boundary itself.
    headers = {"Authorization": f"Bearer {token}"} if token else {}
    # Redacted copy for logging so the secret never reaches stdout.
    logged_headers = {"Authorization": "Bearer ***"} if token else {}

    print("\n🚀 Sending request to API...")
    print(f"File tuple: ('file', ('{file_basename}', <binary>, 'application/pdf'))")
    print(f"Data params: {data}")
    print(f"Data types: start_page={type(data['start_page'])}, end_page={type(data['end_page'])}")
    print(f"Headers: {logged_headers}")
    print(f"File size in bytes: {file_size}")

    with open(path, "rb") as f:
        files = {"file": (file_basename, f, "application/pdf")}
        try:
            resp = requests.post(
                EXTRACT_TEXT_API,
                files=files,
                data=data,
                headers=headers,
                timeout=timeout_seconds,
            )
        except requests.exceptions.Timeout as e:
            print(f"❌ Request timed out after {timeout_seconds} seconds")
            raise RuntimeError("API request timed out after 5 minutes") from e
        except requests.exceptions.RequestException as e:
            print(f"❌ Request exception: {str(e)}")
            raise RuntimeError(f"API request failed: {str(e)}") from e

    print("\n📥 API Response:")
    print(f"Status Code: {resp.status_code}")
    print(f"Response Headers: {dict(resp.headers)}")

    if resp.status_code != 200:
        print(f"❌ Error Response: {resp.text[:500]}")
        raise RuntimeError(f"Extract text API failed with status {resp.status_code}: {resp.text}")

    # Keep the try body minimal: only JSON decoding can fail here.
    try:
        response_json = resp.json()
    except ValueError as e:
        print(f"❌ Error parsing response: {str(e)}")
        print(f"Raw response: {resp.text[:500]}")
        raise RuntimeError(f"Failed to parse API response: {str(e)}") from e

    if not isinstance(response_json, dict):
        reason = f"expected a JSON object, got {type(response_json).__name__}"
        print(f"❌ Error parsing response: {reason}")
        print(f"Raw response: {resp.text[:500]}")
        raise RuntimeError(f"Failed to parse API response: {reason}")

    print(f"Response JSON keys: {list(response_json.keys())}")
    print(f"Response JSON: {str(response_json)[:500]}")

    # The API returns {"status": "completed", "result": "text here",
    # "process_id": "..."}; "text" is kept as a fallback key.
    extracted_text = response_json.get("result") or response_json.get("text") or ""
    if not isinstance(extracted_text, str):
        reason = f"expected text to be a string, got {type(extracted_text).__name__}"
        print(f"❌ Error parsing response: {reason}")
        print(f"Raw response: {resp.text[:500]}")
        raise RuntimeError(f"Failed to parse API response: {reason}")

    text_length = len(extracted_text)
    print("\n📊 Extraction Result:")
    print(f"API Status: {response_json.get('status', 'unknown')}")
    print(f"Process ID: {response_json.get('process_id', 'none')}")
    print(f"Text Length: {text_length} characters")
    if text_length > 0:
        print(f"First 200 chars: {extracted_text[:200]}")
    else:
        print("⚠️ WARNING: API returned EMPTY text!")
        print(f"Full response: {response_json}")

    state["text"] = extracted_text
    print(f"{banner}\n")
    return state