| import fitz |
| import numpy as np |
| import os |
| import pandas as pd |
| import re |
| import datetime |
| import pytesseract |
| import cv2 |
| import warnings |
| import ocrmypdf |
| import spacy |
| import dateparser |
| import statistics |
| from statistics import mode |
| from textblob import Word |
| from Levenshtein import distance |
|
|
# Load the transformer-based English spaCy pipeline once at import time.
# Used throughout this module for sentence segmentation and PERSON entity
# extraction (justice names). Loading is slow, so it must happen only once.
nlp = spacy.load('en_core_web_trf')
|
|
def parse_doc(folderpath):
    """Parse every page of ``<folderpath>/opinion.pdf`` into header/body/footer text.

    Pages whose header contains "preliminary print" are skipped entirely
    (they carry no opinion content).

    Args:
        folderpath: Directory containing ``opinion.pdf`` and ``data.csv``.

    Returns:
        tuple: ``(header_texts, body_texts, footer_texts, paginated_dict)``
        where the three lists hold per-kept-page text and ``paginated_dict``
        maps the original page index to its ``(header, body, footer)`` triple.
    """
    header_texts, body_texts, footer_texts = [], [], []
    paginated_dict = {}
    # Context manager closes the PDF handle deterministically (the original
    # leaked it), even if a page fails to parse.
    with fitz.open(folderpath + '/opinion.pdf') as doc:
        for i in range(len(doc)):
            ht, bt, ft = parse_page(folderpath, i)
            if "preliminary print" in ht.lower():
                continue
            header_texts.append(ht)
            body_texts.append(bt)
            footer_texts.append(ft)
            paginated_dict[i] = (ht, bt, ft)
    return header_texts, body_texts, footer_texts, paginated_dict
|
|
|
|
def parse_page(folderpath, pg_ind):
    """Extract header/body/footer text for one page of the opinion PDF.

    Region coordinates come from ``<folderpath>/data.csv`` (one row per page,
    keyed by the 'Pg Ind' column). Footer coordinates may be blank/NaN when a
    page has no footer, in which case ``footer_text`` stays ``None``.

    Args:
        folderpath: Directory containing ``opinion.pdf`` and ``data.csv``.
        pg_ind: Zero-based page index.

    Returns:
        tuple: ``(header_text, body_text, footer_text)``; footer may be None.
    """
    df = pd.read_csv(folderpath + '/data.csv')
    page_df = df[df['Pg Ind'] == pg_ind].to_dict('records')[0]

    header_rect = fitz.Rect(page_df['Header X1'], page_df['Header Y1'],
                            page_df['Header X2'], page_df['Header Y2'])
    body_rect = fitz.Rect(page_df['Body X1'], page_df['Body Y1'],
                          page_df['Body X2'], page_df['Body Y2'])

    def _clip_text(page, rect):
        # Slip opinions carry a watermark line; strip it from every region.
        return page.get_text("text", clip=rect).strip().replace(
            'Page Proof Pending Publication', '')

    footer_text = None
    # Close the document deterministically instead of leaking the handle.
    with fitz.open(folderpath + '/opinion.pdf') as doc:
        page = doc[pg_ind]
        header_text = _clip_text(page, header_rect)
        body_text = _clip_text(page, body_rect)
        # A NaN 'Footer X1' marks "no footer on this page" in data.csv; test
        # the raw value instead of building a Rect from NaN coordinates and
        # string-comparing it afterwards.
        if not pd.isna(page_df['Footer X1']):
            footer_rect = fitz.Rect(page_df['Footer X1'], page_df['Footer Y1'],
                                    page_df['Footer X2'], page_df['Footer Y2'])
            footer_text = _clip_text(page, footer_rect)
    return header_text, body_text, footer_text
|
|
|
|
def get_splits(folderpath):
    """Split the full opinion body text into per-opinion chunks.

    Split points are "Per Curiam", "Justice ... delivered the opinion", and
    "Justice ... concurring/dissenting" markers. The text before the first
    marker (syllabus/preamble) is returned as the first chunk.

    Args:
        folderpath: Case folder passed through to :func:`parse_doc`.

    Returns:
        tuple: ``(splits, paginated_dict)``; ``splits`` is empty when no
        marker is found.
    """
    header_texts, body_texts, footer_texts, paginated_dict = parse_doc(folderpath)
    # Remove hyphenation artifacts, then OCR-correct the word "justice".
    full_body_text = "\n".join(body_texts).replace('-', '')
    full_body_text = correct(full_body_text, "justice")

    # NOTE: the original used [A-z], which also matches '[', '\', ']', '^',
    # '_' and '`'; [A-Za-z] is the intended letter range. Raw strings avoid
    # invalid-escape-sequence warnings.
    split_p = re.compile(
        r'((\n|^)\s*Per Curiam\.\s*\n)'
        r'|(Justice[A-Za-z\s\n,]*delivered the opinion)'
        r'|((\n|^)\s*(mr\.\s*)?justice[A-Za-z\n\s,–-]*(concurring|dissenting)[A-Za-z\n\s,–]*\.)',
        re.IGNORECASE)

    splits_m = list(re.finditer(split_p, full_body_text))
    splits = []

    if splits_m:
        print("---Found split---")
        # Chunks are [0, m0), [m0, m1), ..., [m_last, end) — the pre-marker
        # preamble first, then one chunk per marker.
        starts = [0] + [m.span()[0] for m in splits_m]
        ends = [m.span()[0] for m in splits_m] + [len(full_body_text)]
        for s, e in zip(starts, ends):
            splits.append(full_body_text[s:e].strip())
    return splits, paginated_dict
|
|
|
|
def get_split_data(split):
    """Classify one opinion chunk from its opening sentence.

    Uses the first sentence of the chunk to determine the opinion type,
    the authoring justice, and any justices joining.

    Args:
        split: Full text of one opinion chunk.

    Returns:
        tuple: ``(opinion_type, author, joining)`` — ``opinion_type`` is one
        of "majority", "concurrence", "dissent", or "pre" (preamble);
        ``author`` is a capitalized surname, "Per Curiam", or None;
        ``joining`` is a list of surnames.
    """
    txt = split[0:300]
    d = nlp(txt)
    first_sent = list(d.sents)[0]
    first_sent_text = " ".join([t.text for t in first_sent])
    ents = nlp(first_sent_text).ents
    # "Justice Kagan" / "JUSTICE KAGAN" -> "Kagan"; splitting on 'tice'
    # keeps the surname even when the title is glued to the name by OCR.
    person_ents = [e.text.lower().split('tice')[-1].strip().capitalize()
                   for e in ents if e.label_ == "PERSON"]
    if "chief justice" in first_sent_text.lower():
        person_ents.append("Chief")
    # Guard against NER finding no names at all (was an IndexError).
    first_person = person_ents[0] if person_ents else None

    opinion_type, author, joining = None, None, []
    # Later matches deliberately override earlier ones (e.g. a sentence
    # containing both "delivered" and "dissenting" classifies as dissent).
    if "delivered" in first_sent_text:
        author = first_person
        joining = []
        opinion_type = "majority"
    if "per curiam" in first_sent_text.lower():
        author = "Per Curiam"
        joining = []
        opinion_type = "majority"
    if "concurring" in first_sent_text:
        author = first_person
        joining = person_ents[1:]
        opinion_type = "concurrence"
    if "dissenting" in first_sent_text:
        author = first_person
        joining = person_ents[1:]
        opinion_type = "dissent"
    if opinion_type is None:
        opinion_type = "pre"
    return opinion_type, author, joining
|
|
def court_from_year(date_time):
    """Return the justices seated on the Court at ``date_time``.

    Reads 'Justices Table.csv' from the working directory (columns: Name,
    Start, End; End blank for sitting justices).

    Args:
        date_time: A ``datetime.datetime`` to test tenure against
            (strictly between a justice's Start and End).

    Returns:
        dict: ``{'Associate': [surname, ...], 'Chief': surname or None}``.
    """
    roster = pd.read_csv('Justices Table.csv').to_dict('records')
    court = {'Associate': [], 'Chief': None}
    for row in roster:
        began = datetime.datetime.strptime(row['Start'], '%Y-%m-%d')
        raw_end = row['End']
        # A blank End cell reads back from pandas as NaN -> still serving.
        if str(raw_end) == "nan":
            ended = datetime.datetime.now()
        else:
            ended = datetime.datetime.strptime(raw_end, '%Y-%m-%d')
        if not (began < date_time < ended):
            continue
        full_name = row['Name']
        if "Associate" in full_name:
            # 'Breyer, Stephen (Associate Justice)' -> 'Breyer'
            surname = full_name.split('(Associate Justice)')[0].split(', ')[0].strip().split(' ')[-1]
            court['Associate'].append(surname)
        if "Chief" in full_name:
            court['Chief'] = full_name.split('(Chief Justice)')[0].split(', ')[0].strip()
    return court
|
|
def correct(corpus, keyword):
    """Spell-correct near-misses of ``keyword`` throughout ``corpus``.

    Any space-delimited token at Levenshtein distance exactly 1 from
    ``keyword`` is run through TextBlob's spellchecker; it is replaced when
    the top suggestion is both confident (> 0.9) and different from the
    token. A token that contained a newline keeps a leading newline after
    replacement so line structure survives.

    Args:
        corpus: Text to scan.
        keyword: Target word (lowercase) to correct toward.

    Returns:
        str: The corpus with corrections applied.
    """
    tokens = corpus.split(' ')
    candidates = [(idx, tok) for idx, tok in enumerate(tokens)
                  if 0 < distance(keyword, tok.lower()) < 2]

    for idx, tok in candidates:
        suggestion, confidence = Word(tok.lower()).spellcheck()[0]
        if confidence > 0.9 and suggestion.lower() != tok.lower():
            # Re-attach the newline the original token carried, if any.
            tokens[idx] = ("\n" + suggestion) if "\n" in tok else suggestion
    return " ".join(tokens)
|
|
class Opinion:
    """Container for a single opinion within a case.

    Attributes:
        opinion_type: "majority", "concurrence", "dissent", or "pre".
        author: Authoring justice's surname, "Per Curiam", or None.
        joining: List of surnames of justices joining the opinion.
        body_text: Main text of the opinion.
        fn_text: Footnote text (may be None).
        header_text: Page-header text (may be None).
    """

    def __init__(self, opinion_type, author, joining, body_text, fn_text, header_text):
        self.opinion_type = opinion_type
        self.author = author
        self.joining = joining
        self.body_text = body_text
        self.fn_text = fn_text
        self.header_text = header_text
|
|
class Case:
    """A parsed Supreme Court case: its opinions plus extracted metadata.

    Populated in two phases: the caller assigns the :class:`Opinion` objects
    (``majority``, ``concurrences``, ``dissents``, ``pre``), then
    :meth:`process` extracts date, recusals, majority joiners, cert info,
    case name, citation, and printed page numbers.
    """

    def __init__(self, paginated_dict):
        # Map of PDF page index -> (header_text, body_text, footer_text).
        self.paginated_dict = paginated_dict
        self.majority, self.concurrences, self.dissents, self.pre = None, [], [], None
        self.date, self.case_name, self.case_citation, self.page_numbers = None, "", None, []
        self.recused = []
        self.cert_info = None

    def get_date(self):
        """Find the decision date in the preamble and set ``self.date``."""
        print("Extracting Date")
        doc = nlp(self.pre.body_text[0:2000])
        pattern = re.compile(r'Decided\s*\w*\s*[0-9]{1,2}[\.,]\s?[0-9]{4}')
        for s in doc.sents:
            if "Decided" not in s.text:
                continue
            match = re.search(pattern, s.text)
            if match is not None:
                date_extract = s.text[match.span()[0]:match.span()[1]].split('Decided')[-1].strip()
            else:
                # Fallback when the strict pattern misses (the original
                # computed this then unconditionally overwrote it, crashing
                # on match.span() whenever the regex failed).
                date_extract = s.text.replace('\n', '').split('Decided')[-1].strip().replace('.', '')
            self.date = dateparser.parse(date_extract)
            return

    def update_recused(self):
        """Detect "took no part" recusal language in the majority opinion."""
        print("Identifying recused")
        p = re.compile(r'(?:justice )[\w\s]*(?: took no part)', re.IGNORECASE)
        m = re.search(p, self.majority.body_text)
        if m is not None:
            recused_span = self.majority.body_text[m.span()[0]:m.span()[1]].lower()
            doc = nlp(recused_span)
            # "justice kagan" -> "Kagan"
            self.recused = [e.text.split('justice')[-1].upper().strip().capitalize()
                            for e in doc.ents if e.label_ == "PERSON"]
            if "chief justice" in recused_span:
                self.recused.append("Chief")

    def update_majority_joining(self):
        """Infer who joined the majority: everyone seated minus known names."""
        print("Getting updated list")
        cy = court_from_year(self.date)
        # Everyone already accounted for: dissent/concurrence authors and
        # joiners, the majority author, and any recused justices.
        known = ([j for d in self.dissents for j in d.joining]
                 + [d.author for d in self.dissents]
                 + [j for c in self.concurrences for j in c.joining]
                 + [c.author for c in self.concurrences]
                 + [self.majority.author]
                 + list(self.recused))
        all_justices = list(cy['Associate'])
        if cy['Chief'] is not None:
            all_justices.append('Chief')
        self.majority.joining = [aj for aj in all_justices if aj not in known]

    def get_cert_info(self):
        """Extract the 'on writ of certiorari ...' block from the preamble."""
        print("Extracting Cert Info")
        lines = self.pre.body_text.split('\n')
        start, end = -1, -1
        for i, l in enumerate(lines):
            low = l.lower()
            # The last line that looks like the start of the cert paragraph
            # wins; the docket number / argument date line marks the end.
            if ("petition" in low or "cert" in low or "error" in low
                    or "appeals" in low or "on" in low.split(' ')):
                start = i
            if "no." in low or "no.s" in low or "argued" in low or "decided" in low:
                end = i
                break
        # Collapse doubled spaces and drop periods from the block.
        self.cert_info = " ".join(lines[start:end]).strip().upper().replace('  ', ' ').replace('.', '')

    def get_case_name_cite_pns(self):
        """Derive case name, U.S. citation, and printed page numbers from headers."""
        # Header lines across all pages, dropping each header's last line
        # (usually the printed page number).
        lines_total = [l for p in self.paginated_dict
                       for l in self.paginated_dict[p][0].split('\n')[:-1]]
        # Header boilerplate to exclude when voting on the case name. The
        # dot in "U\." is escaped (the original matched any character).
        boiler_p = re.compile(
            r'(october|per curiam|opinion of|concur|dissent|statement of|argument|syllabus|[0-9] ?U\.)',
            re.IGNORECASE)
        candidates = [l for l in lines_total
                      if re.search(boiler_p, l) is None and not l.lower().strip().isnumeric()]
        # The most frequent non-boilerplate header line is the case name;
        # mode() raises on an empty sequence, so guard it.
        if candidates:
            self.case_name = mode(candidates)

        cite_p = re.compile(r'[0-9]*\s?U\.\s?S\. ?([0-9]|_)*', re.IGNORECASE)
        for l in lines_total:
            m = re.search(cite_p, l)
            if m is not None:
                self.case_citation = l[m.span()[0]:m.span()[1]]
                break

        num_p = re.compile(r'^\s?[0-9]+\s?$', re.IGNORECASE)
        page_lines = [self.paginated_dict[p][0].split('\n') for p in self.paginated_dict]
        self.page_numbers = []
        for pl in page_lines:
            numeric_on_page = [int(l[m.span()[0]:m.span()[1]].strip())
                               for l in pl for m in re.finditer(num_p, l)]
            if numeric_on_page:
                page_number = max(numeric_on_page)
                if self.page_numbers:
                    # Printed page numbers must strictly increase.
                    page_number = max(page_number, self.page_numbers[-1] + 1)
                self.page_numbers.append(page_number)
            elif self.page_numbers:
                # No number found on this page: continue the sequence.
                self.page_numbers.append(self.page_numbers[-1] + 1)
            else:
                self.page_numbers.append(1)

        # "___ U. S. ___"-style citation with no page: fill in the first page.
        if self.case_citation is not None and self.case_citation.lower().split('s.')[-1].strip() == "":
            self.case_citation = self.case_citation.strip() + ' ' + str(self.page_numbers[0])

    def process(self):
        """Run all metadata extraction steps in dependency order."""
        self.get_date()
        self.update_recused()
        self.update_majority_joining()
        self.get_cert_info()
        self.get_case_name_cite_pns()
|
|
def run(folderpath):
    """Parse one case folder end-to-end and return a populated Case.

    Args:
        folderpath: Directory containing ``opinion.pdf`` and ``data.csv``.

    Returns:
        Case: With opinions attached and all metadata extracted.
    """
    splits, paginated_dict = get_splits(folderpath)
    case = Case(paginated_dict=paginated_dict)
    opinions = []
    for chunk in splits:
        opinion_type, author, joining = get_split_data(chunk)
        if opinion_type is None:
            continue
        op = Opinion(opinion_type, author, joining, chunk, fn_text=None, header_text=None)
        # Route the opinion to its slot on the Case.
        if opinion_type == "majority":
            case.majority = op
        elif opinion_type == "concurrence":
            case.concurrences.append(op)
        elif opinion_type == "dissent":
            case.dissents.append(op)
        elif opinion_type == "pre":
            case.pre = op
        opinions.append(op)

    case.process()
    return case