cryogenic22 committed on
Commit
a685f5a
·
verified ·
1 Parent(s): d871011

Update utils/document_processor.py

Browse files
Files changed (1) hide show
  1. utils/document_processor.py +105 -7
utils/document_processor.py CHANGED
@@ -5,10 +5,22 @@ from pdf2image import convert_from_bytes
5
  import pytesseract
6
  from PIL import Image
7
  from typing import Tuple, List, Dict
 
 
 
8
 
9
  class DocumentProcessor:
10
- def process_document(self, file) -> Tuple[str, List[Dict]]:
11
- """Process a document and return its text and chunks."""
 
 
 
 
 
 
 
 
 
12
  file_type = file.name.split(".")[-1].lower()
13
  if file_type == "pdf":
14
  text = self._process_pdf(file)
@@ -18,9 +30,10 @@ class DocumentProcessor:
18
  text = self._process_text(file)
19
  else:
20
  raise ValueError(f"Unsupported file type: {file_type}")
21
-
22
  chunks = self._chunk_text(text)
23
- return text, chunks
 
24
 
25
  def _process_pdf(self, file) -> str:
26
  """Extract text from a PDF, including OCR for scanned PDFs."""
@@ -28,10 +41,9 @@ class DocumentProcessor:
28
  text = ""
29
  for page in reader.pages:
30
  page_text = page.extract_text()
31
- if page_text:
32
  text += page_text
33
  else:
34
- st.warning("Detected a scanned PDF. Performing OCR...")
35
  pdf_bytes = file.read()
36
  text += self._perform_ocr(pdf_bytes)
37
  return text
@@ -56,10 +68,96 @@ class DocumentProcessor:
56
  detected_encoding = chardet.detect(raw_data)["encoding"]
57
  return raw_data.decode(detected_encoding)
58
  except Exception as e:
59
- st.error(f"Error processing text file: {e}")
60
  return ""
61
 
62
  def _chunk_text(self, text: str, chunk_size: int = 500) -> List[Dict]:
63
  """Split text into smaller chunks for vectorization."""
64
  return [{"chunk_id": idx, "text": text[i:i + chunk_size]}
65
  for idx, i in enumerate(range(0, len(text), chunk_size))]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5
  import pytesseract
6
  from PIL import Image
7
  from typing import Tuple, List, Dict
8
+ import json
9
+ import os
10
+
11
 
12
  class DocumentProcessor:
13
def __init__(self, ontology_path: str = "data/legal_ontology.json"):
    """
    Set up the processor and load its legal ontology.

    Args:
        ontology_path (str): Location of the legal ontology JSON file.
    """
    # Loading is delegated to _load_ontology; the result is kept on the
    # instance for later ontology lookups.
    loaded_ontology = self._load_ontology(ontology_path)
    self.ontology = loaded_ontology
21
+
22
+ def process_document(self, file) -> Tuple[str, List[Dict], Dict]:
23
+ """Process a document, extract text, chunks, and metadata."""
24
  file_type = file.name.split(".")[-1].lower()
25
  if file_type == "pdf":
26
  text = self._process_pdf(file)
 
30
  text = self._process_text(file)
31
  else:
32
  raise ValueError(f"Unsupported file type: {file_type}")
33
+
34
  chunks = self._chunk_text(text)
35
+ metadata = self._extract_metadata(text, file.name)
36
+ return text, chunks, metadata
37
 
38
  def _process_pdf(self, file) -> str:
39
  """Extract text from a PDF, including OCR for scanned PDFs."""
 
41
  text = ""
42
  for page in reader.pages:
43
  page_text = page.extract_text()
44
+ if page_text.strip():
45
  text += page_text
46
  else:
 
47
  pdf_bytes = file.read()
48
  text += self._perform_ocr(pdf_bytes)
49
  return text
 
68
  detected_encoding = chardet.detect(raw_data)["encoding"]
69
  return raw_data.decode(detected_encoding)
70
  except Exception as e:
71
+ print(f"Error processing text file: {e}")
72
  return ""
73
 
74
  def _chunk_text(self, text: str, chunk_size: int = 500) -> List[Dict]:
75
  """Split text into smaller chunks for vectorization."""
76
  return [{"chunk_id": idx, "text": text[i:i + chunk_size]}
77
  for idx, i in enumerate(range(0, len(text), chunk_size))]
78
+
79
def _extract_metadata(self, text: str, file_name: str) -> Dict:
    """
    Assemble the metadata record for a processed document.

    Args:
        text (str): Extracted document text.
        file_name (str): Original file name; stored as the title.

    Returns:
        Dict: Mapping with the keys "title", "type", "jurisdiction",
        "key_parties", "effective_dates" and "ontology_links".
    """
    # The helpers run in the same order as the keys they populate.
    document_type = self._infer_document_type(text)
    jurisdiction = self._infer_jurisdiction(text)
    parties = self._extract_key_parties(text)
    dates = self._extract_dates(text)
    links = self._link_to_ontology(text)
    return {
        "title": file_name,
        "type": document_type,
        "jurisdiction": jurisdiction,
        "key_parties": parties,
        "effective_dates": dates,
        "ontology_links": links,
    }
99
+
100
+ def _infer_document_type(self, text: str) -> str:
101
+ """Infer the type of the document based on keywords."""
102
+ document_types = {
103
+ "judgement": ["court", "judge", "judgment", "verdict"],
104
+ "contract": ["agreement", "contract", "clause", "terms"],
105
+ "mou": ["memorandum of understanding", "mou", "collaboration"],
106
+ "will": ["testament", "executor", "bequest", "inheritance"]
107
+ }
108
+ for doc_type, keywords in document_types.items():
109
+ if any(keyword.lower() in text.lower() for keyword in keywords):
110
+ return doc_type
111
+ return "unknown"
112
+
113
+ def _infer_jurisdiction(self, text: str) -> str:
114
+ """Infer the jurisdiction based on keywords in the text."""
115
+ jurisdictions = {
116
+ "US": ["united states", "california", "federal law"],
117
+ "UK": ["united kingdom", "england", "scotland", "british law"],
118
+ "UAE": ["united arab emirates", "dubai", "abu dhabi"],
119
+ "India": ["india", "indian law", "supreme court"]
120
+ }
121
+ for jurisdiction, keywords in jurisdictions.items():
122
+ if any(keyword.lower() in text.lower() for keyword in keywords):
123
+ return jurisdiction
124
+ return "unknown"
125
+
126
+ def _extract_key_parties(self, text: str) -> List[str]:
127
+ """Extract key parties involved in the document."""
128
+ # Simplified logic for extracting parties; regex or NLP can enhance this.
129
+ lines = text.splitlines()
130
+ parties = [line.strip() for line in lines if "party" in line.lower()]
131
+ return parties[:5] # Limit to 5 parties for simplicity
132
+
133
+ def _extract_dates(self, text: str) -> List[str]:
134
+ """Extract dates from the text."""
135
+ # Simplified example using date patterns
136
+ import re
137
+ date_pattern = r"\b(?:\d{1,2}/\d{1,2}/\d{2,4}|\d{1,2} \w+ \d{4})\b"
138
+ return re.findall(date_pattern, text)
139
+
140
+ def _link_to_ontology(self, text: str) -> List[Dict]:
141
+ """
142
+ Link document content to legal ontology for context and relevance.
143
+
144
+ Args:
145
+ text (str): Extracted document text.
146
+
147
+ Returns:
148
+ List[Dict]: Relevant ontology concepts and links.
149
+ """
150
+ relevant_ontology = []
151
+ for concept in self.ontology:
152
+ if concept["keyword"].lower() in text.lower():
153
+ relevant_ontology.append({"concept": concept["name"], "description": concept["description"]})
154
+ return relevant_ontology
155
+
156
+ def _load_ontology(self, path: str) -> List[Dict]:
157
+ """Load the legal ontology from a JSON file."""
158
+ if os.path.exists(path):
159
+ with open(path, "r") as f:
160
+ return json.load(f)
161
+ else:
162
+ print("Ontology file not found. Using an empty ontology.")
163
+ return []