import pdfplumber
import re
from pathlib import Path
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
from loguru import logger


@dataclass
class DocumentChunk:
    """A chunk of text extracted from a document."""

    chunk_id: str
    text: str
    page_num: int
    start_char: int
    end_char: int
    metadata: Dict[str, Any]


@dataclass
class ParsedDocument:
    """Structured result of parsing a PDF document."""

    file_name: str
    total_pages: int
    text_content: str
    pages: List[Dict[str, Any]]
    tables: List[Dict[str, Any]]
    chunks: List[DocumentChunk]
    metadata: Dict[str, Any]


class DocumentParser:
    """Parses PDFs into full text, per-page data, tables, and overlapping text chunks."""

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        logger.info(f"Parser initialized - chunk_size={chunk_size}, overlap={chunk_overlap}")

    def parse_pdf(self, pdf_path: str) -> Optional[ParsedDocument]:
        """Parse a PDF file and extract its text, tables, and chunks.

        Returns None if the file is missing or cannot be parsed.
        """
        logger.info(f"Parsing: {Path(pdf_path).name}")

        try:
            with pdfplumber.open(pdf_path) as pdf:
                all_text = []
                pages_data = []
                tables_data = []

                for page_num, page in enumerate(pdf.pages, start=1):
                    try:
                        page_result = self._parse_page(page, page_num)

                        all_text.append(page_result["text"])
                        pages_data.append(page_result["page_data"])
                        tables_data.extend(page_result["tables"])

                        logger.debug(f"Page {page_num}: {len(page_result['text'])} chars, {len(page_result['tables'])} tables")
                    except Exception as e:
                        logger.error(f"Error on page {page_num}: {str(e)}")
                        continue

                full_text = "\n\n".join(all_text)
                chunks = self._create_chunks(full_text, Path(pdf_path).name)

                metadata = {
                    "file_path": pdf_path,
                    "file_name": Path(pdf_path).name,
                    "total_pages": len(pdf.pages),
                    "total_tables": len(tables_data),
                    "total_chunks": len(chunks),
                    "text_length": len(full_text),
                }

                parsed_doc = ParsedDocument(
                    file_name=Path(pdf_path).name,
                    total_pages=len(pdf.pages),
                    text_content=full_text,
                    pages=pages_data,
                    tables=tables_data,
                    chunks=chunks,
                    metadata=metadata,
                )

                logger.success(f"Parsed {len(pdf.pages)} pages, {len(tables_data)} tables, {len(chunks)} chunks")
                return parsed_doc

        except FileNotFoundError:
            logger.error(f"File not found: {pdf_path}")
            return None
        except Exception as e:
            logger.error(f"Failed to parse {pdf_path}: {str(e)}")
            return None

    def _parse_page(self, page, page_num: int) -> Dict[str, Any]:
        """Parse a single page into text, tables, and page metadata."""
        try:
            page_text = page.extract_text() or ""

            tables = []
            raw_tables = page.extract_tables()

            for table_idx, table in enumerate(raw_tables):
                if table and len(table) > 0:
                    try:
                        table_data = {
                            "page": page_num,
                            "table_id": f"p{page_num}_t{table_idx + 1}",
                            "headers": table[0],
                            "rows": table[1:] if len(table) > 1 else [],
                            "raw_data": table,
                        }
                        tables.append(table_data)
                    except Exception as e:
                        logger.warning(f"Table {table_idx} error on page {page_num}: {str(e)}")

            page_data = {
                "page_num": page_num,
                "text": page_text,
                "text_length": len(page_text),
                "tables_count": len(tables),
                "width": page.width,
                "height": page.height,
            }

            return {
                "text": page_text,
                "tables": tables,
                "page_data": page_data,
            }

        except Exception as e:
            logger.error(f"_parse_page error for page {page_num}: {str(e)}")
            return {
                "text": "",
                "tables": [],
                "page_data": {
                    "page_num": page_num,
                    "text": "",
                    "text_length": 0,
                    "tables_count": 0,
                    "width": None,
                    "height": None,
                },
            }

    def _create_chunks(self, text: str, file_name: str) -> List[DocumentChunk]:
        """Split text into paragraph-aligned chunks with character overlap.

        TODO: maybe improve the chunking logic later
        """
        try:
            chunks = []

            if not text:
                logger.warning("Empty text for chunking")
                return chunks

            paragraphs = text.split('\n\n')

            current_chunk = ""
            current_start = 0
            chunk_id = 0

            for para in paragraphs:
                para = para.strip()
                if not para:
                    continue

                # Flush the current chunk once adding this paragraph would
                # push it past chunk_size.
                if len(current_chunk) + len(para) > self.chunk_size and current_chunk:
                    chunk = DocumentChunk(
                        chunk_id=f"chunk_{chunk_id}",
                        text=current_chunk.strip(),
                        page_num=0,
                        start_char=current_start,
                        end_char=current_start + len(current_chunk),
                        metadata={
                            "source_file": file_name,
                            "chunk_length": len(current_chunk),
                        },
                    )
                    chunks.append(chunk)
                    chunk_id += 1

                    # Seed the next chunk with the tail of this one so that
                    # adjacent chunks overlap by up to chunk_overlap chars.
                    if len(current_chunk) > self.chunk_overlap:
                        overlap_text = current_chunk[-self.chunk_overlap:]
                    else:
                        overlap_text = current_chunk
                    current_start = current_start + len(current_chunk) - len(overlap_text)
                    current_chunk = overlap_text + "\n\n" + para
                else:
                    if current_chunk:
                        current_chunk += "\n\n" + para
                    else:
                        current_chunk = para

            # Flush whatever remains after the last paragraph.
            if current_chunk:
                chunk = DocumentChunk(
                    chunk_id=f"chunk_{chunk_id}",
                    text=current_chunk.strip(),
                    page_num=0,
                    start_char=current_start,
                    end_char=current_start + len(current_chunk),
                    metadata={
                        "source_file": file_name,
                        "chunk_length": len(current_chunk),
                    },
                )
                chunks.append(chunk)

            logger.info(f"Created {len(chunks)} chunks")
            return chunks

        except Exception as e:
            logger.error(f"Chunking error: {str(e)}")
            return []

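    # Note on _create_chunks above: with the default chunk_size=1000 and
    # chunk_overlap=200, a chunk ending near char 1000 seeds the next chunk
    # with its final 200 characters, so the next chunk's start_char lands
    # around char 800. A single paragraph longer than chunk_size is never
    # split, so individual chunks can exceed chunk_size.
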
    def extract_bureau_score(self, parsed_doc: ParsedDocument) -> Optional[Dict[str, Any]]:
        """Extract the bureau score from a CRIF report.

        Looks for a pattern like "PERFORM CONSUMER 2.2 300-900 627",
        where the final number is the score.
        """
        try:
            text = parsed_doc.text_content

            pattern = r'PERFORM\s+CONSUMER.*?300-900\s+(\d{3})'
            match = re.search(pattern, text, re.IGNORECASE)

            if match:
                score = int(match.group(1))
                if 300 <= score <= 900:
                    logger.info(f"Found bureau score: {score}")
                    return {
                        "value": score,
                        "source": "CRIF Report – Score Section",
                    }

            # Fallback: scan the first two pages for a three-digit number in
            # the valid score range that appears near a score-related keyword.
            for page in parsed_doc.pages[:2]:
                page_text = page["text"]

                for num_match in re.finditer(r'\b(\d{3})\b', page_text):
                    num = int(num_match.group(1))
                    if 300 <= num <= 900:
                        # Use this match's own offset so the context window
                        # surrounds the occurrence actually being checked.
                        idx = num_match.start()
                        context = page_text[max(0, idx - 100):idx + 100]

                        keywords = ['score', 'cibil', 'credit', 'bureau']
                        if any(kw in context.lower() for kw in keywords):
                            logger.info(f"Found score (fallback): {num}")
                            return {
                                "value": num,
                                "source": f"CRIF Report – Page {page['page_num']}",
                            }

            logger.warning("Bureau score not found")
            return None

        except Exception as e:
            logger.error(f"Error extracting bureau score: {str(e)}")
            return None

    def extract_gst_sales(self, parsed_doc: ParsedDocument) -> Optional[Dict[str, Any]]:
        """Extract outward taxable supplies (sales) from a GSTR-3B table."""
        try:
            text = parsed_doc.text_content
            filename = parsed_doc.file_name

            # Return-period month, taken from a "Period <Month>" marker.
            month_match = re.search(r'Period\s+(\w+)', text)
            month_name = month_match.group(1) if month_match else "Unknown"

            # Prefer the year embedded in the filename, falling back to a
            # "Year YYYY" marker in the text.
            filename_year_match = re.search(r'_(\d{2})(\d{4})\.pdf', filename)
            if filename_year_match:
                year = filename_year_match.group(2)
            else:
                year_match = re.search(r'Year\s+(\d{4})', text)
                year = year_match.group(1) if year_match else "2025"

            formatted_month = f"{month_name} {year}"

            # Table 3.1(a) holds outward taxable supplies; find that row and
            # read the taxable value from its second column.
            for table in parsed_doc.tables:
                rows = table.get("rows", [])

                for row in rows:
                    if row and len(row) > 1:
                        first_cell = str(row[0]).replace('\n', ' ')

                        if "(a)" in first_cell and "Outward taxable supplies" in first_cell:
                            if row[1]:
                                value_str = str(row[1])
                                clean_value = re.sub(r'[^\d.]', '', value_str)

                                if clean_value:
                                    try:
                                        sales = float(clean_value)
                                        logger.info(f"GST sales: {sales} for {formatted_month}")
                                        return {
                                            "month": formatted_month,
                                            "sales": sales,
                                            "source": "GSTR-3B Table 3.1(a)",
                                        }
                                    except ValueError as e:
                                        logger.warning(f"Couldn't parse sales value '{clean_value}': {str(e)}")

            logger.warning(f"Sales data not found for {formatted_month}")
            return None

        except Exception as e:
            logger.error(f"Error extracting GST sales: {str(e)}")
            return None

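    # Filename convention assumed by extract_gst_sales above: an underscore,
    # a two-digit month, a four-digit year, then ".pdf" (e.g. a hypothetical
    # "gstr3b_032025.pdf" yields year "2025").
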
    def get_chunks_text(self, chunks: List[DocumentChunk]) -> List[str]:
        """Return the raw text of each chunk, ready for embedding."""
        try:
            return [chunk.text for chunk in chunks]
        except Exception as e:
            logger.error(f"Error getting chunks text: {str(e)}")
            return []
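

# A minimal usage sketch, assuming a CRIF bureau report at the hypothetical
# path "crif_report.pdf". parse_pdf returns None on failure, so guard before
# using the result.
if __name__ == "__main__":
    parser = DocumentParser(chunk_size=1000, chunk_overlap=200)
    doc = parser.parse_pdf("crif_report.pdf")  # hypothetical input file
    if doc is not None:
        score = parser.extract_bureau_score(doc)
        if score:
            print(f"Bureau score: {score['value']} ({score['source']})")
        texts = parser.get_chunks_text(doc.chunks)
        print(f"{len(texts)} chunks ready for embedding")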