|
|
from bs4 import BeautifulSoup |
|
|
import re |
|
|
from models import FinancialResult,FinancialEntry,Financials,RatioEntry,CompanyData, CircuitBreakerRow |
|
|
from typing import List, Optional, Dict, Any |
|
|
|
|
|
class PsxScraper(object):
    """Scrapes company data (announcements, financials, ratios, circuit
    breakers) out of PSX (Pakistan Stock Exchange) HTML pages.

    The HTML is parsed once in __init__; every extract_* method then
    queries the shared BeautifulSoup tree.
    """

    def __init__(self, html_content: str):
        # Parse up front so all extractors share a single DOM tree.
        self.soup = BeautifulSoup(html_content, 'html.parser')

    def _clean_number(self, text: str) -> float:
        """Strip display formatting (commas, spaces, 'Rs.') and return the
        first numeric token in *text* as a float, or 0.0 when none exists."""
        if not text:
            return 0.0

        text = str(text).replace(',', '').replace(' ', '').replace('Rs.', '')

        match = re.search(r'[-+]?\d*\.?\d+', text)
        return float(match.group()) if match else 0.0

    def _extract_range(self, range_text: str) -> Dict[str, float]:
        """Split an em-dash-separated 'low — high' string into a dict with
        'low', 'high' and 'current' keys ('current' is always 0.0 here).

        Returns an all-zero dict when the text is not a two-part range.
        """
        parts = range_text.split('—')
        if len(parts) == 2:
            return {
                'low': self._clean_number(parts[0]),
                'high': self._clean_number(parts[1]),
                'current': 0.0,
            }
        return {'low': 0.0, 'high': 0.0, 'current': 0.0}

    def _parse_table(self, table):
        """Return (headers, rows) for a <thead>/<tbody> table.

        ``headers`` is the list of <th> texts; ``rows`` is a list of dicts
        keyed by header text. Rows whose cell count differs from the header
        count (spacer/colspan rows) are skipped.
        """
        header_row = table.find('thead').find('tr')
        headers = [th.text.strip() for th in header_row.find_all('th')]

        rows = []
        for tr in table.find('tbody').find_all('tr'):
            cells = tr.find_all('td')
            if len(cells) == len(headers):
                rows.append({headers[i]: cell.text.strip()
                             for i, cell in enumerate(cells)})
        return headers, rows

    def _entries_from_financials_tab(self, tab) -> list:
        """Build FinancialEntry objects from one financials tab panel.

        The table is metric-per-row, period-per-column: column 0 names the
        metric and every remaining column header is a reporting period.
        Returns [] when the tab, its table, or its data is missing.
        """
        if not tab:
            return []
        table = tab.find('table')
        if not table:
            return []

        headers, rows = self._parse_table(table)
        if not headers or not rows:
            return []

        entries = []
        for period in headers[1:]:
            entry = FinancialEntry(period=period)
            for row in rows:
                metric = row[headers[0]]
                value = row[period]

                if 'Sales' in metric:
                    entry.sales = self._clean_number(value)
                elif 'Profit after Taxation' in metric:
                    entry.profit_after_tax = self._clean_number(value)
                elif 'EPS' in metric:
                    entry.eps = self._clean_number(value)

            entries.append(entry)
        return entries

    def extract_announcements(self) -> "List[FinancialResult]":
        """Extract financial-results announcements from the
        'Financial Results' tab panel; [] when the panel or table is absent."""
        announcements = []

        financial_results_tab = self.soup.find(
            'div', class_='tabs__panel', attrs={'data-name': 'Financial Results'})
        if not financial_results_tab:
            return announcements

        table = financial_results_tab.find('table')
        if not table:
            return announcements

        # First <tr> is the header; data rows follow.
        for row in table.find_all('tr')[1:]:
            cols = row.find_all('td')
            if len(cols) < 3:
                continue

            date = cols[0].text.strip()
            title = cols[1].text.strip()

            # Column 3 may carry a viewer link (javascript:) and/or a raw PDF.
            document_link = None
            pdf_link = None
            for link in cols[2].find_all('a'):
                href = link.get('href', '')
                if 'javascript:' in href:
                    document_link = href
                elif '.pdf' in href:
                    pdf_link = href

            announcements.append(FinancialResult(
                date=date,
                title=title,
                documentLink=document_link,
                pdfLink=pdf_link,
            ))

        return announcements

    def extract_financials(self) -> "Financials":
        """Extract annual and quarterly financial data from the #financials
        section; returns an empty Financials when the section is absent."""
        financials_section = self.soup.find('div', id='financials')
        if not financials_section:
            return Financials(annual=[], quarterly=[])

        annual_tab = financials_section.find(
            'div', class_='tabs__panel', attrs={'data-name': 'Annual'})
        quarterly_tab = financials_section.find(
            'div', class_='tabs__panel', attrs={'data-name': 'Quarterly'})

        return Financials(
            annual=self._entries_from_financials_tab(annual_tab),
            quarterly=self._entries_from_financials_tab(quarterly_tab),
        )

    def extract_ratios(self) -> "List[RatioEntry]":
        """Extract financial ratios (one RatioEntry per period column) from
        the #ratios section; [] when the section or table is absent."""
        ratios = []

        ratios_section = self.soup.find('div', id='ratios')
        if not ratios_section:
            return ratios

        table = ratios_section.find('table')
        if not table:
            return ratios

        headers, rows = self._parse_table(table)
        if not headers or not rows:
            return ratios

        for period in headers[1:]:
            entry = RatioEntry(period=period)
            for row in rows:
                ratio_name = row[headers[0]]
                value = row[period]

                # NOTE(review): parentheses are stripped outright, so
                # accounting-style negatives like "(5.2)" parse as positive
                # 5.2 — confirm this is the intended behavior.
                clean_value = value.replace('(', '').replace(')', '')

                if 'Gross Profit Margin' in ratio_name:
                    entry.gross_profit_margin = self._clean_number(clean_value)
                elif 'Net Profit Margin' in ratio_name:
                    entry.net_profit_margin = self._clean_number(clean_value)
                elif 'EPS Growth' in ratio_name:
                    entry.eps_growth = self._clean_number(clean_value)
                elif 'PEG' in ratio_name:
                    entry.peg = self._clean_number(clean_value)

            ratios.append(entry)

        return ratios

    def scrape_all_data(self) -> "CompanyData":
        """Scrape announcements, financials and ratios into one CompanyData."""
        return CompanyData(
            announcements=self.extract_announcements(),
            financials=self.extract_financials(),
            ratios=self.extract_ratios(),
        )

    def fetch_circuit_breaker_table(self, table_id: str) -> "list[CircuitBreakerRow]":
        """Parse the circuit-breaker table with id *table_id* into
        CircuitBreakerRow records; [] when the table or its tbody is missing.

        Rows with fewer than the 9 expected cells are skipped instead of
        raising IndexError (fixes a crash on spacer/malformed rows).
        """
        table = self.soup.find("table", id=table_id)
        if not table or not table.tbody:
            return []

        records = []
        for row in table.tbody.find_all("tr"):
            cols = [td.get_text(strip=True) for td in row.find_all("td")]

            # Defensive: skip rows that do not carry all nine columns.
            if len(cols) < 9:
                continue

            records.append(
                CircuitBreakerRow(
                    symbol=cols[0],
                    ldcp=cols[1],
                    open=cols[2],
                    high=cols[3],
                    low=cols[4],
                    current=cols[5],
                    change=cols[6],
                    change_percent=cols[7],
                    volume=cols[8],
                )
            )

        return records
|
|
|
|
|
|