|
|
import fitz |
|
|
import numpy as np |
|
|
import os |
|
|
import pandas as pd |
|
|
import re |
|
|
import datetime |
|
|
import pytesseract |
|
|
import cv2 |
|
|
import warnings |
|
|
import ocrmypdf |
|
|
import spacy |
|
|
import dateparser |
|
|
import statistics |
|
|
from statistics import mode |
|
|
from textblob import Word |
|
|
from Levenshtein import distance |
|
|
|
|
|
# Shared spaCy pipeline, loaded once when this module is imported. The code in
# this file uses it for sentence segmentation (`.sents`) and for PERSON entity
# recognition (`.ents`).
nlp = spacy.load('en_core_web_trf')
|
|
|
|
|
def parse_doc(folderpath):
    """Extract header, body and footer text for every page of ``opinion.pdf``.

    Pages whose header contains "preliminary print" (case-insensitive) are
    skipped entirely and do not appear in any of the returned containers.

    Args:
        folderpath: Directory containing ``opinion.pdf`` (and the ``data.csv``
            layout file that ``parse_page`` reads).

    Returns:
        A 4-tuple ``(header_texts, body_texts, footer_texts, paginated_dict)``.
        The three lists hold one entry per kept page, in page order.
        ``paginated_dict`` maps the zero-based page index to that page's
        ``(header, body, footer)`` tuple.
    """
    # The document is opened here only to learn the page count (parse_page
    # opens the PDF itself), so release the handle as soon as we have it.
    with fitz.open(folderpath + '/opinion.pdf') as doc:
        page_count = len(doc)

    header_texts, body_texts, footer_texts = [], [], []
    paginated_dict = {}
    for page_index in range(page_count):
        ht, bt, ft = parse_page(folderpath, page_index)
        if "preliminary print" in ht.lower():
            continue
        body_texts.append(bt)
        header_texts.append(ht)
        footer_texts.append(ft)
        paginated_dict[page_index] = (ht, bt, ft)
    return header_texts, body_texts, footer_texts, paginated_dict
|
|
|
|
|
|
|
|
def parse_page(folderpath, pg_ind):
    """Return the header, body and footer text of one page of ``opinion.pdf``.

    The clip rectangles come from ``data.csv`` in the same folder: the first
    row whose ``Pg Ind`` equals ``pg_ind`` supplies the ``Header``, ``Body``
    and ``Footer`` ``X1``/``Y1``/``X2``/``Y2`` columns.

    Args:
        folderpath: Directory containing ``opinion.pdf`` and ``data.csv``.
        pg_ind: Zero-based page index.

    Returns:
        ``(header_text, body_text, footer_text)``. Each text is stripped and
        has the 'Page Proof Pending Publication' watermark removed.
        ``footer_text`` is ``None`` when the row's ``Footer X1`` is NaN.

    Raises:
        IndexError: If ``data.csv`` has no row for ``pg_ind``.
    """
    df = pd.read_csv(folderpath + '/data.csv')
    rows = df[df['Pg Ind'] == pg_ind].to_dict('records')
    if not rows:
        raise IndexError(
            f"no layout row for page index {pg_ind} in {folderpath}/data.csv"
        )
    page_row = rows[0]

    def region(prefix):
        # Build the clip rectangle from the four '<prefix> X1..Y2' columns.
        return fitz.Rect(
            page_row[prefix + ' X1'],
            page_row[prefix + ' Y1'],
            page_row[prefix + ' X2'],
            page_row[prefix + ' Y2'],
        )

    def clipped_text(page, rect):
        text = page.get_text("text", clip=rect).strip()
        return text.replace('Page Proof Pending Publication', '')

    header_rect = region('Header')
    body_rect = region('Body')
    # A NaN footer coordinate means the page has no footer region.
    has_footer = not pd.isna(page_row['Footer X1'])

    footer_text = None
    # Close the PDF when done; this function is called once per page.
    with fitz.open(folderpath + '/opinion.pdf') as doc:
        page = doc[pg_ind]
        header_text = clipped_text(page, header_rect)
        body_text = clipped_text(page, body_rect)
        if has_footer:
            footer_text = clipped_text(page, region('Footer'))
    return header_text, body_text, footer_text
|
|
|
|
|
|
|
|
def get_splits(folderpath):
    """Split the opinion's body text into segments at opinion headings.

    The body text of all kept pages is joined with newlines, every hyphen is
    removed, and near-misspellings of "justice" are repaired with ``correct``.
    The text is then cut at the start of every match of the heading pattern
    ("Per Curiam.", "Justice ... delivered the opinion", or
    "[Mr.] Justice ... concurring/dissenting ... .").

    Args:
        folderpath: Directory passed through to ``parse_doc``.

    Returns:
        ``(splits, paginated_dict)``. With N heading matches ``splits`` holds
        N + 1 stripped strings: the text before the first heading followed by
        one segment per heading. When no heading is found ``splits`` is empty.
        ``paginated_dict`` is returned unchanged from ``parse_doc``.
    """
    _, body_texts, _, paginated_dict = parse_doc(folderpath)
    full_body_text = "\n".join(body_texts).replace('-', '')
    full_body_text = correct(full_body_text, "justice")

    # Raw string so the regex escapes are not treated as (invalid) string
    # escapes. The first character class is A-Za-z rather than A-z, which
    # would also admit the punctuation between 'Z' and 'a' ([ \ ] ^ _ `).
    split_p = re.compile(
        r'((\n|^)\s*Per Curiam\.\s*\n)'
        r'|(Justice[A-Za-z\s\n,]*delivered the opinion)'
        r'|((\n|^)\s*(mr\.\s*)?justice[A-Za-z\n\s,–-]*(concurring|dissenting)[A-Za-z\n\s,–]*\.)',
        re.IGNORECASE,
    )

    heading_starts = [m.start() for m in split_p.finditer(full_body_text)]
    splits = []
    if heading_starts:
        print("---Found split---")
        # Consecutive boundaries delimit the segments: text start, every
        # heading start, text end.
        bounds = [0] + heading_starts + [len(full_body_text)]
        splits = [
            full_body_text[lo:hi].strip()
            for lo, hi in zip(bounds, bounds[1:])
        ]
    return splits, paginated_dict
|
|
|
|
|
|
|
|
def get_split_data(split):
    """Classify one text segment and identify its author and joiners.

    Only the first sentence found in the first 300 characters is inspected.
    PERSON entities in that sentence supply the names; the literal "Chief" is
    appended to the names when the sentence mentions "chief justice".

    The keyword checks run in a fixed order and a later hit overrides an
    earlier one: "delivered", "per curiam", "concurring", "dissenting".
    They are case-insensitive.

    Args:
        split: Text of one segment.

    Returns:
        ``(opinion_type, author, joining)`` where ``opinion_type`` is one of
        "majority", "concurrence", "dissent" or "pre" (no keyword found).
        ``author`` is ``None`` for "pre" segments and when no name could be
        recognised; ``joining`` is a list of names. For a segment with no
        sentence at all (e.g. an empty string) ``(None, None, [])`` is
        returned.
    """
    sentences = list(nlp(split[0:300]).sents)
    if not sentences:
        # Nothing to classify; previously this raised IndexError.
        return None, None, []

    first_sent_text = " ".join([t.text for t in sentences[0]])
    lowered = first_sent_text.lower()

    ents = nlp(first_sent_text).ents
    person_ents = [e.text.lower().split('tice')[-1].strip().capitalize() for e in ents if e.label_ == "PERSON"]
    if "chief justice" in lowered:
        person_ents.append("Chief")

    # First recognised name, or None when NER found nobody (previously an
    # IndexError).
    lead = person_ents[0] if person_ents else None

    opinion_type, author, joining = None, None, []
    if "delivered" in lowered:
        author = lead
        joining = []
        opinion_type = "majority"
    if "per curiam" in lowered:
        author = "Per Curiam"
        joining = []
        opinion_type = "majority"
    if "concurring" in lowered:
        author = lead
        joining = person_ents[1:]
        opinion_type = "concurrence"
    if "dissenting" in lowered:
        author = lead
        joining = person_ents[1:]
        opinion_type = "dissent"
    if opinion_type is None:
        opinion_type = "pre"
    return opinion_type, author, joining
|
|
|
|
|
def court_from_year(date_time):
    """Look up who sat on the court at ``date_time``.

    Reads ``Justices Table.csv`` from the current working directory. Each row
    provides ``Name``, ``Start`` and ``End``; the dates use ``YYYY-MM-DD`` and
    a missing ``End`` is treated as "until now". A justice counts as sitting
    when ``date_time`` lies strictly between start and end.

    Args:
        date_time: ``datetime.datetime`` to look up.

    Returns:
        ``{'Associate': [...], 'Chief': ...}``. The list holds one name per
        sitting row whose ``Name`` contains "Associate"; ``'Chief'`` is the
        name from the last sitting row containing "Chief", or ``None``.
    """
    date_format = '%Y-%m-%d'
    associates = []
    chief = None

    for record in pd.read_csv('Justices Table.csv').to_dict('records'):
        took_office = datetime.datetime.strptime(record['Start'], date_format)
        if str(record['End']) == "nan":
            # No end date recorded: still serving.
            left_office = datetime.datetime.now()
        else:
            left_office = datetime.datetime.strptime(record['End'], date_format)

        sitting = date_time > took_office and date_time < left_office
        if not sitting:
            continue

        title = record['Name']
        if "Associate" in title:
            before_role = title.split('(Associate Justice)')[0]
            before_comma = before_role.split(', ')[0].strip()
            associates.append(before_comma.split(' ')[-1])
        if "Chief" in title:
            before_role = title.split('(Chief Justice)')[0]
            chief = before_role.split(', ')[0].strip()

    return {'Associate': associates, 'Chief': chief}
|
|
|
|
|
def correct(corpus, keyword):
    """Spell-correct words in ``corpus`` that are one edit away from ``keyword``.

    The corpus is split on single spaces. A word is a candidate when the
    Levenshtein distance between ``keyword`` and its lowercase form is exactly
    1. A candidate is replaced by TextBlob's top spelling suggestion when that
    suggestion has confidence above 0.9 and differs from the lowercased word.
    If the replaced word contained a newline, the replacement is prefixed with
    a single newline.

    Args:
        corpus: Text to scan.
        keyword: Word whose near-misspellings should be repaired.

    Returns:
        The corpus with the replacements applied, re-joined with spaces.
    """
    tokens = corpus.split(' ')
    for position, token in enumerate(tokens):
        lowered = token.lower()
        if distance(keyword, lowered) != 1:
            continue

        suggestion, confidence = Word(lowered).spellcheck()[0]
        if not (confidence > 0.9 and suggestion.lower() != lowered):
            continue

        tokens[position] = "\n" + suggestion if "\n" in token else suggestion
    return " ".join(tokens)
|
|
|
|
|
def closest_justice(name, datetime):
    """Map an extracted justice name onto the bench sitting at ``datetime``.

    The Chief Justice is always reported as the placeholder ``"Chief"``,
    whether the name matches exactly or only approximately.

    Args:
        name: Extracted name, the placeholder "Chief", or ``None``.
        datetime: Decision date, passed to ``court_from_year``. (The parameter
            name shadows the ``datetime`` module inside this function.)

    Returns:
        * ``None`` when ``name`` is ``None``.
        * ``"Chief"`` when ``name`` is the placeholder or resolves to the
          Chief Justice.
        * ``name`` unchanged when its capitalised form is a sitting associate,
          or when no justices are known for the date.
        * Otherwise the sitting justice with the smallest Levenshtein distance
          to ``name`` (first one on ties).
    """
    if name is None:
        return None
    # "Chief" is a placeholder, not a surname; it must not be fuzzy-matched
    # against the bench.
    if name.lower() == "chief":
        return "Chief"

    court = court_from_year(datetime)
    chief = court['Chief']
    # Copy so the list returned by court_from_year is not modified.
    justices = list(court['Associate'])
    if chief is not None:
        justices.append(chief)

    if not justices:
        # Nobody to compare against (previously np.argmin raised ValueError).
        return name

    if name.capitalize() in justices:
        # Exact hits on the Chief use the same placeholder as fuzzy hits.
        return "Chief" if name.capitalize() == chief else name

    closest_name = min(justices, key=lambda candidate: distance(candidate, name))
    # Compare the stored name directly: capitalize() would lowercase inner
    # capitals ("O'Connor" -> "O'connor") and never equal the stored value.
    if closest_name == chief:
        return "Chief"
    return closest_name
|
|
|
|
|
class Opinion:
    """Plain record for one segment of a case.

    Attributes:
        opinion_type: Label for the segment (``run`` passes the value returned
            by ``get_split_data``).
        author: Author name, or ``None``.
        joining: List of names joining the opinion.
        body_text: Text of the segment.
        fn_text: Stored as given; ``run`` passes ``None``. Presumably footnote
            text -- confirm before relying on it.
        header_text: Stored as given; ``run`` passes ``None``.
    """

    def __init__(self, opinion_type, author, joining, body_text, fn_text, header_text):
        # Classification and attribution.
        self.opinion_type, self.author, self.joining = opinion_type, author, joining
        # Text payloads.
        self.body_text, self.fn_text, self.header_text = body_text, fn_text, header_text
|
|
|
|
|
class Case:
    """Aggregate of everything extracted for one case.

    ``majority`` and ``pre`` are expected to be assigned (as ``Opinion``-like
    objects with ``author``, ``joining`` and ``body_text``) before ``process``
    is called; ``concurrences`` and ``dissents`` are lists of such objects.

    Attributes:
        paginated_dict: Maps page index to ``(header, body, footer)`` text.
        majority, pre: Single opinions, initially ``None``.
        concurrences, dissents: Lists of opinions, initially empty.
        date: Decision date set by ``get_date`` (``None`` until found).
        case_name, case_citation, page_numbers: Set by
            ``get_case_name_cite_pns``.
        recused: Names set by ``update_recused``.
        cert_info: String set by ``get_cert_info``.
    """

    def __init__(self, paginated_dict):
        self.paginated_dict = paginated_dict
        self.majority, self.concurrences, self.dissents, self.pre = None, [], [], None
        self.date, self.case_name, self.case_citation, self.page_numbers = None, "", None, []
        self.recused = []
        self.cert_info = None

    def get_date(self):
        """Set ``self.date`` from a "Decided <month> <day>, <year>" phrase.

        Only the first 2000 characters of ``self.pre.body_text`` are searched,
        sentence by sentence. The first sentence that yields a parseable date
        wins; ``self.date`` is left untouched when none does.

        Raises:
            AttributeError: If ``self.pre`` is ``None`` (``paginated_dict`` is
                printed first to help debugging).
        """
        print("Extracting Date")
        if self.pre is None:
            print(self.paginated_dict)
            raise AttributeError("Case.pre is not set; cannot extract the decision date")

        pattern = re.compile(r'Decided\s*\w*\s*[0-9]{1,2}[\.,]\s?[0-9]{4}')
        doc = nlp(self.pre.body_text[0:2000])
        for sent in doc.sents:
            match = pattern.search(sent.text)
            if match is None:
                # "Decided" may appear without a recognisable date; keep looking
                # instead of dereferencing a missing match.
                continue
            date_extract = match.group(0).split('Decided')[-1].strip()
            parsed = dateparser.parse(date_extract)
            if parsed is not None:
                self.date = parsed
                return

    def update_recused(self):
        """Fill ``self.recused`` from a "Justice ... took no part" phrase.

        Searches ``self.majority.body_text``; when the phrase is found, the
        PERSON entities of its lowercased text become the recused names, and
        "Chief" is added when the phrase mentions "chief justice". Leaves
        ``self.recused`` unchanged when the phrase is absent.
        """
        print("Identifying recused")
        pattern = re.compile(r'(?:justice )[\w\s]*(?: took no part)', re.IGNORECASE)
        match = pattern.search(self.majority.body_text)
        if match is None:
            return

        recused_span = match.group(0).lower()
        doc = nlp(recused_span)
        self.recused = [
            e.text.split('justice')[-1].upper().strip().capitalize()
            for e in doc.ents
            if e.label_ == "PERSON"
        ]
        if "chief justice" in recused_span:
            self.recused.append("Chief")

    def update_majority_joining(self):
        """Set ``self.majority.joining`` to every justice not otherwise placed.

        The bench for ``self.date`` comes from ``court_from_year`` (the Chief
        is represented as "Chief"). Anyone who authored or joined a dissent or
        concurrence, authored the majority, or is recused is excluded.
        """
        print("Getting updated list")
        cy = court_from_year(self.date)

        known = set(self.recused)
        known.add(self.majority.author)
        for opinion in self.dissents + self.concurrences:
            known.add(opinion.author)
            known.update(opinion.joining)

        all_justices = list(cy['Associate'])
        if cy['Chief'] is not None:
            all_justices.append('Chief')
        self.majority.joining = [aj for aj in all_justices if aj not in known]

    def get_cert_info(self):
        """Set ``self.cert_info`` from the lines of ``self.pre.body_text``.

        Lines are scanned in order. The start index follows the most recent
        line mentioning "petition", "cert", "error", "appeal" or the word
        "on"; scanning stops at the first line mentioning "no.", "nos.",
        "argued" or "decided". The lines from start up to (not including) the
        stop line are joined with spaces, stripped, upper-cased, runs of
        spaces are collapsed and periods are removed. The result is ``""``
        when no start line was seen.
        """
        print("Extracting Cert Info")
        lines = self.pre.body_text.split('\n')
        start = None
        stop = len(lines)  # no stop marker: run to the end of the text
        for i, line in enumerate(lines):
            lowered = line.lower()
            if (
                "petition" in lowered
                or "cert" in lowered
                or "error" in lowered
                or "appeal" in lowered
                or "on" in lowered.split(' ')
            ):
                start = i
            if "no." in lowered or "nos." in lowered or "argued" in lowered or "decided" in lowered:
                stop = i
                break

        if start is None:
            self.cert_info = ""
            return

        text = " ".join(lines[start:stop]).strip().upper()
        # NOTE(review): the original call here read .replace(' ', ' '), a no-op
        # as written; presumably it was meant to collapse doubled spaces.
        text = re.sub(r' {2,}', ' ', text)
        self.cert_info = text.replace('.', '')

    def get_case_name_cite_pns(self):
        """Derive case name, citation and page numbers from the page headers.

        * ``case_name``: the most common header line (the last line of every
          header is ignored) that is neither purely numeric nor matches the
          boilerplate pattern. Left unchanged when no line qualifies.
        * ``case_citation``: first "<vol> U. S. <page>"-style match in those
          lines. If it ends right after "S.", the first page number is
          appended.
        * ``page_numbers``: one number per page. The largest standalone number
          in the header is used, but never less than the previous page + 1.
          Pages without a number continue from the previous page (or start
          at 1).
        """
        headers = [self.paginated_dict[key][0] for key in self.paginated_dict]
        lines_total = [line for header in headers for line in header.split('\n')[:-1]]

        # The dot after "U" is escaped so that only "<digit> U." is treated as
        # a citation line, not any digit followed by "U" and one more character.
        boilerplate = re.compile(
            r'(october|per curiam|opinion of|concur|dissent|statement of|argument|syllabus|[0-9] ?U\.)',
            re.IGNORECASE,
        )
        name_candidates = [
            line
            for line in lines_total
            if boilerplate.search(line) is None and not line.lower().strip().isnumeric()
        ]
        if name_candidates:  # mode() raises on an empty sequence
            self.case_name = mode(name_candidates)

        citation = re.compile(r'[0-9]*\s?U\.\s?S\. ?([0-9]|_)*', re.IGNORECASE)
        for line in lines_total:
            match = citation.search(line)
            if match is not None:
                self.case_citation = match.group(0)
                break

        number_line = re.compile(r'^\s?[0-9]+\s?$', re.IGNORECASE)
        self.page_numbers = []
        for header in headers:
            numeric_on_page = [
                int(m.group(0).strip())
                for m in map(number_line.search, header.split('\n'))
                if m is not None
            ]
            previous = self.page_numbers[-1] if self.page_numbers else None
            if numeric_on_page:
                page_number = max(numeric_on_page)
                if previous is not None:
                    # Page numbers must keep increasing.
                    page_number = max(page_number, previous + 1)
            else:
                page_number = 1 if previous is None else previous + 1
            self.page_numbers.append(page_number)

        if self.case_citation is not None and self.case_citation.lower().split('s.')[-1].strip() == "":
            self.case_citation = self.case_citation.strip() + ' ' + str(self.page_numbers[0])

    def _resolve_author(self, author):
        """Return ``author`` matched against the bench; ``None`` stays ``None``."""
        if author is None:
            return None
        return closest_justice(author, self.date)

    def update_justice_names(self):
        """Replace every opinion author with the closest sitting justice.

        A majority authored "Per Curiam" is left as is; authors that are
        ``None`` are skipped.
        """
        majority_author = self.majority.author
        if majority_author is not None and majority_author.lower() != "per curiam":
            self.majority.author = self._resolve_author(majority_author)
        for opinion in self.concurrences:
            opinion.author = self._resolve_author(opinion.author)
        for opinion in self.dissents:
            opinion.author = self._resolve_author(opinion.author)
        return

    def process(self):
        """Run every extraction step in dependency order.

        The date is extracted first because the justice-name steps need it.
        """
        self.get_date()
        self.update_justice_names()
        self.update_recused()
        self.update_majority_joining()
        self.get_cert_info()
        self.get_case_name_cite_pns()
|
|
|
|
|
def run(folderpath):
    """Build and process a ``Case`` from the files in ``folderpath``.

    The text segments from ``get_splits`` are classified with
    ``get_split_data`` and wrapped in ``Opinion`` objects. Majority and "pre"
    segments are stored singly (a later one replaces an earlier one);
    concurrences and dissents accumulate. Segments whose type is ``None`` are
    ignored. Finally ``Case.process`` is run.

    Args:
        folderpath: Directory handed to ``get_splits``.

    Returns:
        The processed ``Case``.
    """
    segments, paginated_dict = get_splits(folderpath)
    case = Case(paginated_dict=paginated_dict)

    for segment in segments:
        kind, author, joining = get_split_data(segment)
        if kind is None:
            continue

        opinion = Opinion(kind, author, joining, segment, fn_text=None, header_text=None)
        if kind == "majority":
            case.majority = opinion
        elif kind == "concurrence":
            case.concurrences.append(opinion)
        elif kind == "dissent":
            case.dissents.append(opinion)
        elif kind == "pre":
            case.pre = opinion

    case.process()
    return case