import datetime
import os
import re
import statistics
import warnings
from dataclasses import dataclass
from statistics import mode
from typing import List, Optional

import cv2
import dateparser
import fitz
import numpy as np
import ocrmypdf
import pandas as pd
import pytesseract
import spacy
from Levenshtein import distance
from textblob import Word
| |
|
# Transformer-based English spaCy pipeline used module-wide for sentence
# splitting and person NER (requires the en_core_web_trf model to be installed).
nlp = spacy.load('en_core_web_trf')
| |
|
def parse_doc(folderpath):
    """Parse every page of <folderpath>/opinion.pdf into header/body/footer text.

    Pages whose header contains "preliminary print" are skipped entirely.

    Returns
    -------
    tuple
        (header_texts, body_texts, footer_texts, paginated_dict) where
        paginated_dict maps the original page index to its (header, body,
        footer) triple.
    """
    doc = fitz.open(folderpath + '/opinion.pdf')
    headers = []
    bodies = []
    footers = []
    by_page = {}
    for page_index in range(len(doc)):
        head, body, foot = parse_page(folderpath, page_index)
        if "preliminary print" in head.lower():
            continue
        bodies.append(body)
        headers.append(head)
        footers.append(foot)
        by_page[page_index] = (head, body, foot)
    return headers, bodies, footers, by_page
| |
|
| |
|
def parse_page(folderpath, pg_ind):
    """Extract header/body/footer text of one PDF page using the crop
    rectangles stored in <folderpath>/data.csv.

    Parameters
    ----------
    folderpath : str
        Directory containing 'opinion.pdf' and 'data.csv'.
    pg_ind : int
        Zero-based page index; must match a 'Pg Ind' row in data.csv.

    Returns
    -------
    tuple
        (header_text, body_text, footer_text); footer_text stays None when
        the CSV has no footer rectangle (NaN coordinates) for this page.
    """
    df = pd.read_csv(folderpath + '/data.csv')

    header_text, body_text, footer_text = None, None, None
    page_df = df[df['Pg Ind'] == pg_ind].to_dict('records')[0]
    header = [page_df['Header X1'], page_df['Header Y1'], page_df['Header X2'], page_df['Header Y2']]
    body = [page_df['Body X1'], page_df['Body Y1'], page_df['Body X2'], page_df['Body Y2']]
    footer = [page_df['Footer X1'], page_df['Footer Y1'], page_df['Footer X2'], page_df['Footer Y2']]
    body_rect = fitz.Rect(body[0], body[1], body[2], body[3])
    header_rect = fitz.Rect(header[0], header[1], header[2], header[3])
    footer_rect = fitz.Rect(footer[0], footer[1], footer[2], footer[3])

    doc = fitz.open(folderpath + '/opinion.pdf')
    try:
        page = doc[pg_ind]
        header_text = page.get_text("text", clip=header_rect).strip().replace('Page Proof Pending Publication', '')
        body_text = page.get_text("text", clip=body_rect).strip().replace('Page Proof Pending Publication', '')
        # A missing footer rectangle shows up as NaN coordinates in the CSV;
        # pd.isna replaces the original fragile str(...) != "nan" comparison.
        if not pd.isna(footer[0]):
            footer_text = page.get_text("text", clip=footer_rect).strip().replace('Page Proof Pending Publication', '')
    finally:
        # The original leaked the open document handle on every call.
        doc.close()
    return header_text, body_text, footer_text
| |
|
| |
|
def get_splits(folderpath):
    """Split the opinion PDF's full body text into opinion-level chunks.

    Returns
    -------
    tuple
        (splits, paginated_dict). splits[0] is the preamble before the first
        opinion marker and each later entry begins at a marker; splits is
        empty when no marker is found.
    """
    header_texts, body_texts, footer_texts, paginated_dict = parse_doc(folderpath)
    # De-hyphenate line-broken words, then repair OCR misspellings of "justice".
    full_body_text = "\n".join(body_texts).replace('-', '')
    full_body_text = correct(full_body_text, "justice")

    # Markers: "Per Curiam.", "Justice X delivered the opinion", and
    # "(Mr.) Justice X ... concurring/dissenting ...".
    # BUG FIX: the original used [A-z], which also matches the characters
    # [ \ ] ^ _ ` lying between 'Z' and 'a' in ASCII.
    split_p = re.compile(
        r'((\n|^)\s*Per Curiam\.\s*\n)'
        r'|(Justice[A-Za-z\s\n,]*delivered the opinion)'
        r'|((\n|^)\s*(mr\.\s*)?justice[A-Za-z\n\s,–-]*(concurring|dissenting)[A-Za-z\n\s,–]*\.)',
        re.IGNORECASE)

    splits_m = list(re.finditer(split_p, full_body_text))
    splits = []

    if splits_m:
        print("---Found split---")
        # Section i runs from the previous marker (or the start of the text)
        # up to marker i; the final section runs to the end of the text.
        boundaries = [m.span()[0] for m in splits_m]
        starts = [0] + boundaries
        ends = boundaries + [len(full_body_text)]
        splits = [full_body_text[a:b].strip() for a, b in zip(starts, ends)]
    return splits, paginated_dict
| |
|
| |
|
def get_split_data(split):
    """Classify one text split as pre/majority/concurrence/dissent.

    Parameters
    ----------
    split : str
        One chunk produced by get_splits().

    Returns
    -------
    tuple
        (opinion_type, author, joining): opinion_type is one of "pre",
        "majority", "concurrence", "dissent"; author is a justice surname,
        "Per Curiam", or None; joining is a list of joining surnames.
    """
    txt = split[0:300]
    d = nlp(txt)
    first_sent = list(d.sents)[0]
    first_sent_text = " ".join([t.text for t in first_sent])
    ents = nlp(first_sent_text).ents
    # "Justice Breyer" / "jusTICE breyer" -> bare capitalized surname.
    person_ents = [e.text.lower().split('tice')[-1].strip().capitalize()
                   for e in ents if e.label_ == "PERSON"]
    if "chief justice" in first_sent_text.lower():
        person_ents.append("Chief")
    # ROBUSTNESS FIX: the original indexed person_ents[0] unconditionally,
    # raising IndexError whenever NER found no person in the sentence.
    lead_author = person_ents[0] if person_ents else None
    opinion_type, author, joining = None, None, []
    # Deliberately independent ifs (not elif): a later keyword overrides an
    # earlier one, preserving the original precedence order.
    if "delivered" in first_sent_text:
        author = lead_author
        joining = []
        opinion_type = "majority"
    if "per curiam" in first_sent_text.lower():
        author = "Per Curiam"
        joining = []
        opinion_type = "majority"
    if "concurring" in first_sent_text:
        author = lead_author
        joining = person_ents[1:]
        opinion_type = "concurrence"
    if "dissenting" in first_sent_text:
        author = lead_author
        joining = person_ents[1:]
        opinion_type = "dissent"
    if opinion_type is None:  # no marker matched: treat as front matter
        opinion_type = "pre"
    return opinion_type, author, joining
| |
|
def court_from_year(date_time):
    """Return the composition of the Supreme Court on a given date.

    Reads 'Justices Table.csv' (columns: Name, Start, End; dates formatted
    YYYY-MM-DD, End empty/NaN for sitting justices).

    Parameters
    ----------
    date_time : datetime.datetime
        The date on which the court composition is wanted.

    Returns
    -------
    dict
        {'Associate': [surname, ...], 'Chief': name-or-None}
    """
    df = pd.read_csv('Justices Table.csv')
    court_year = {'Associate': [], 'Chief': None}
    for j in df.to_dict('records'):
        start = datetime.datetime.strptime(j['Start'], '%Y-%m-%d')
        # A missing End date (NaN) means the justice is still serving;
        # pd.isna replaces the fragile str(...) != "nan" comparison.
        if pd.isna(j['End']):
            end = datetime.datetime.now()
        else:
            end = datetime.datetime.strptime(j['End'], '%Y-%m-%d')
        if start < date_time < end:
            name = j['Name']
            if "Associate" in name:
                # e.g. "Breyer, Stephen (Associate Justice)" -> "Breyer"
                court_year['Associate'].append(
                    name.split('(Associate Justice)')[0].split(', ')[0].strip().split(' ')[-1])
            if "Chief" in name:
                court_year['Chief'] = name.split('(Chief Justice)')[0].split(', ')[0].strip()
    return court_year
| |
|
def correct(corpus, keyword):
    """Spell-correct space-separated tokens that are one edit away from
    `keyword`, using TextBlob's spellchecker.

    A correction is applied only when the checker is confident (>0.9) and the
    suggestion actually differs from the token; a leading newline embedded in
    the token is preserved.
    """
    words = corpus.split(' ')
    # An integer Levenshtein distance strictly between 0 and 2 is exactly 1.
    candidates = [(i, w) for i, w in enumerate(words)
                  if distance(keyword, w.lower()) == 1]

    for ind, token in candidates:
        suggestion, confidence = Word(token.lower()).spellcheck()[0]
        if confidence > 0.9 and suggestion.lower() != token.lower():
            words[ind] = ("\n" + suggestion) if "\n" in token else suggestion
    return " ".join(words)
| |
|
def closest_justice(name, datetime):
    """Resolve an extracted surname to a member of the court sitting on the
    given date.

    If `name` (capitalized) already appears on the roster it is returned
    unchanged; otherwise the roster member with the smallest Levenshtein
    distance is chosen, with the chief justice normalized to the label
    "Chief".

    NOTE(review): the second parameter shadows the stdlib `datetime` module;
    the name is kept for interface compatibility with existing callers.
    """
    court = court_from_year(datetime)
    roster = list(court['Associate'])
    if court['Chief'] is not None:
        roster.append(court['Chief'])
    if name.capitalize() in roster:
        return name
    gaps = [distance(member, name) for member in roster]
    best = roster[np.argmin(gaps)]
    if best.capitalize() == court['Chief']:
        best = "Chief"
    return best
| |
|
@dataclass
class Opinion:
    """Container for a single opinion extracted from a slip-opinion PDF.

    Converted from a hand-written attribute-assignment class to a dataclass:
    same positional/keyword constructor, plus a useful __repr__ and
    field-wise __eq__ for free.
    """
    opinion_type: str            # "majority" | "concurrence" | "dissent" | "pre"
    author: Optional[str]        # justice surname, "Per Curiam", or None
    joining: List[str]           # surnames of joining justices
    body_text: str               # opinion body text
    fn_text: Optional[str]       # footnote text (None where not extracted)
    header_text: Optional[str]   # running-header text (None where not extracted)
| |
|
class Case:
    """Aggregates the opinions, metadata, and pagination of one case.

    Populated incrementally: run() attaches Opinion objects, then process()
    derives the date, author names, recusals, majority joiners, cert info,
    case name, citation, and reporter page numbers.
    """

    def __init__(self, paginated_dict):
        # paginated_dict: page index -> (header_text, body_text, footer_text)
        self.paginated_dict = paginated_dict
        self.majority, self.concurrences, self.dissents, self.pre = None, [], [], None
        self.date, self.case_name, self.case_citation, self.page_numbers = None, "", None, []
        self.recused = []
        self.cert_info = None

    def get_date(self):
        """Parse the 'Decided <Month> <day>, <year>' date from the pre section."""
        print("Extracting Date")
        if self.pre is None:
            # BUG FIX: the original printed the dict and then fell through to
            # self.pre.body_text, crashing with AttributeError. Fail loudly
            # with a clear message instead.
            print(self.paginated_dict)
            raise ValueError("No 'pre' section was found; cannot extract the decision date.")
        doc = nlp(self.pre.body_text[0:2000])
        for s in doc.sents:
            if "Decided" in s.text:
                pattern = re.compile(r'Decided\s*\w*\s*[0-9]{1,2}[\.,]\s?[0-9]{4}')
                match = re.search(pattern, s.text)
                if match is None:
                    # ROBUSTNESS FIX: the original dereferenced match.span()
                    # unconditionally; skip sentences that mention "Decided"
                    # without a parseable date.
                    continue
                date_extract = s.text[match.span()[0]:match.span()[1]].split('Decided')[-1].strip()
                self.date = dateparser.parse(date_extract)
                return

    def update_recused(self):
        """Record justices who 'took no part' per the majority opinion text."""
        print("Identifying recused")
        p = re.compile(r'(?:justice )[\w\s]*(?: took no part)', re.IGNORECASE)
        m = re.search(p, self.majority.body_text)
        if m is not None:
            recused_span = self.majority.body_text[m.span()[0]:m.span()[1]].lower()
            doc = nlp(recused_span)
            # normalize e.g. "justice breyer" -> "Breyer"
            self.recused = [e.text.split('justice')[-1].upper().strip().capitalize()
                            for e in doc.ents if e.label_ == "PERSON"]
            if "chief justice" in recused_span:
                self.recused.append("Chief")

    def update_majority_joining(self):
        """Majority joiners = sitting court minus every known author/joiner/recusal."""
        print("Getting updated list")
        cy = court_from_year(self.date)
        known = (
            [j for d in self.dissents for j in d.joining]
            + [d.author for d in self.dissents]
            + [j for c in self.concurrences for j in c.joining]
            + [c.author for c in self.concurrences]
            + [self.majority.author]
            + list(self.recused)
        )
        all_justices = list(cy['Associate'])
        if cy['Chief'] is not None:
            all_justices.append('Chief')
        self.majority.joining = [aj for aj in all_justices if aj not in known]

    def get_cert_info(self):
        """Extract the certiorari/appeal provenance lines from the pre text."""
        print("Extracting Cert Info")
        lines = self.pre.body_text.split('\n')
        start, end = -1, -1
        for i, l in enumerate(lines):
            lower = l.lower()
            if ("petition" in lower or "cert" in lower or "error" in lower
                    or "appeal" in lower or "on" in lower.split(' ')):
                start = i
            if "no." in lower or "nos." in lower or "argued" in lower or "decided" in lower:
                end = i
                break
        # NOTE(review): when no start marker is found, start stays -1 and the
        # slice lines[-1:end] is usually empty -- behavior preserved from the
        # original; verify whether a fallback to 0 is wanted.
        self.cert_info = " ".join(lines[start:end]).strip().upper().replace('  ', ' ').replace('.', '')

    def get_case_name_cite_pns(self):
        """Derive case name, U.S. citation, and per-page reporter numbers
        from the running headers in paginated_dict."""
        # Every header line except the trailing fragment after the final newline.
        lines_total = [l for p in self.paginated_dict
                       for l in self.paginated_dict[p][0].split('\n')[:-1]]
        # Drop boilerplate headers and bare page numbers; the most frequent
        # surviving line is the case name.
        boiler = re.compile(r'(october|per curiam|opinion of|concur|dissent|statement of|argument|syllabus|[0-9] ?U.)', re.IGNORECASE)
        lines_selected = []
        for l in lines_total:
            if re.search(boiler, l) is None and not l.lower().strip().isnumeric():
                lines_selected.append(l)
        self.case_name = mode(lines_selected)

        # First "NNN U. S. NNN"-shaped span found anywhere in the headers.
        cite_p = re.compile(r'[0-9]*\s?U\.\s?S\. ?([0-9]|_)*', re.IGNORECASE)
        for l in lines_total:
            m = re.search(cite_p, l)
            if m is not None:
                self.case_citation = l[m.span()[0]:m.span()[1]]
                break

        # Per-page reporter numbers: a bare integer header line if present,
        # otherwise previous + 1 (or 1 for the very first page).
        num_p = re.compile(r'^\s?[0-9]+\s?$', re.IGNORECASE)
        page_lines = [self.paginated_dict[k][0].split('\n') for k in self.paginated_dict]
        self.page_numbers = []
        for pl in page_lines:
            numeric_on_page = [int(l[m.span()[0]:m.span()[1]].strip())
                               for l in pl for m in re.finditer(num_p, l)]
            if numeric_on_page:
                candidate = max(numeric_on_page)
                if self.page_numbers:
                    # page numbers must be strictly increasing
                    candidate = max(candidate, self.page_numbers[-1] + 1)
                self.page_numbers.append(candidate)
            elif self.page_numbers:
                self.page_numbers.append(self.page_numbers[-1] + 1)
            else:
                self.page_numbers.append(1)

        # A citation with no trailing number (e.g. "598 U. S.") gets the
        # first page number appended.
        if self.case_citation is not None and self.case_citation.lower().split('s.')[-1].strip() == "":
            self.case_citation = self.case_citation.strip() + ' ' + str(self.page_numbers[0])

    def update_justice_names(self):
        """Snap each extracted author name onto the closest sitting justice."""
        if self.majority.author.lower() != "per curiam":
            self.majority.author = closest_justice(self.majority.author, self.date)
        for con in self.concurrences:
            con.author = closest_justice(con.author, self.date)
        for dis in self.dissents:
            dis.author = closest_justice(dis.author, self.date)

    def process(self):
        """Run the full post-split extraction pipeline in dependency order."""
        self.get_date()
        self.update_justice_names()
        self.update_recused()
        self.update_majority_joining()
        self.get_cert_info()
        self.get_case_name_cite_pns()
| |
|
def run(folderpath):
    """End-to-end driver: split the opinion PDF into sections, classify each
    one, and return a fully processed Case object."""
    splits, paginated_dict = get_splits(folderpath)
    case = Case(paginated_dict=paginated_dict)
    ops = []
    for section in splits:
        opinion_type, author, joining = get_split_data(section)
        if opinion_type is None:
            continue
        op = Opinion(opinion_type, author, joining, section, fn_text=None, header_text=None)
        # The four types are mutually exclusive, so elif dispatch is
        # equivalent to the original chain of independent ifs.
        if opinion_type == "majority":
            case.majority = op
        elif opinion_type == "concurrence":
            case.concurrences.append(op)
        elif opinion_type == "dissent":
            case.dissents.append(op)
        elif opinion_type == "pre":
            case.pre = op
        ops.append(op)

    case.process()
    return case