import glob
import os
import subprocess
import sys
import tempfile
import time
from pathlib import Path
from random import random

import cloudscraper
import pandas as pd
import polars as pl
import regex as re
import requests
import streamlit as st
from bs4 import BeautifulSoup

base_url = 'https://www.taiwan-panorama.com/'

# Column layout of the result table returned when there is nothing to show.
_RESULT_COLUMNS = ("cosine_distance", "source_language", "target_language")

# Characters that cannot appear in a file name on at least one common OS.
_UNSAFE_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|\x00-\x1f]')


#%%
def getPage(url):
    """
    Fetch ``url`` and return its status code and body text.

    The request is first made through a cloudscraper session; if that raises
    a ``requests`` exception, a plain ``requests.get`` with a fixed
    user-agent header is tried instead. Only when both attempts fail is the
    failure counter increased, and the loop gives up once the counter exceeds
    ``MAXTRIALS``.

    Input:  url of the page to download
    Output: ``(status_code, text)`` on success (the response is decoded as
            UTF-8), or ``None`` when every attempt failed. Callers must check
            for ``None`` before unpacking.
    """
    headers = {'user-agent': 'Chrome/143.0.7499.170'}
    scraper = cloudscraper.create_scraper(
        browser={
            'browser': 'chrome',
            'platform': 'windows',
            'mobile': False,
        }
    )

    MAXTRIALS = 10
    cnt_fail = 0
    res = None
    while res is None and cnt_fail <= MAXTRIALS:
        try:
            res = scraper.get(url)
        except requests.exceptions.RequestException:
            try:
                res = requests.get(url, headers=headers)
            except Exception:
                # Best effort: any failure of the fallback counts as one
                # failed trial (KeyboardInterrupt/SystemExit still propagate).
                cnt_fail += 1
                print(f"failed {cnt_fail} time(s)...[{url}]", flush=True)
        # Pause after every attempt (successful or not), as the original
        # code did, so consecutive requests to the site are spaced out.
        time.sleep(5 + random() * 6)

    if res is None:
        return None
    res.encoding = 'utf-8'
    return res.status_code, res.text


def _fetch_soup(url):
    """Download ``url`` and parse it with lxml; None unless the status is 200."""
    page = getPage(url)
    if page is None:
        return None
    status, html = page
    if status != 200:
        return None
    return BeautifulSoup(html, 'lxml')


def _extract_paragraphs(soup):
    """Return the non-empty h1/h2/p texts of the first <article>, or None."""
    try:
        articles = soup.find_all('article')
        if not articles:
            return None
        texts = [p.text.strip() for p in articles[0].find_all(('h1', 'h2', 'p'))]
        return [t for t in texts if t]
    except Exception as exc:
        # Extraction stays best effort, but the failure is no longer silent.
        print(f"failed to extract article paragraphs: {exc}", flush=True)
        return None


def retrieveTWP(src_url, lang):
    """
    Retrieve the paragraphs of a Taiwan Panorama article and its translation.

    Input:  src_url - URL of the source (zh) article
            lang    - language code of the translation (e.g., 'en', 'ja')
    Output: ``(paras_zh, paras_en)``; each is a list of non-empty paragraph
            strings, or ``None`` when that page could not be downloaded
            (no response or status other than 200), has no <article> element,
            or - for the translation - no link to it was found on the source
            page.
    """
    paras_zh = None
    paras_en = None

    # zh
    soup = _fetch_soup(src_url)
    if soup is None:
        # Without the source page there is nothing to derive the target from.
        return paras_zh, paras_en
    paras_zh = _extract_paragraphs(soup)

    # en (or whichever language "lang" names)
    guid = getURLlang(soup, lang)
    if guid is None:
        return paras_zh, paras_en
    # base_url ends with '/', so strip it to avoid a '//' in the path.
    tgt_url = base_url.rstrip('/') + f"/{lang}/Articles/Details?Guid=" + guid
    tgt_soup = _fetch_soup(tgt_url)
    if tgt_soup is not None:
        paras_en = _extract_paragraphs(tgt_soup)

    return paras_zh, paras_en


def getURLlang(soup, lang):
    """
    Input:  Parsed HTML of zh article
    Output: GUID of the same article in language "lang" (e.g., 'en', 'ja'),
            taken from the first matching link on the page, or None when no
            such link exists. The caller builds the article URL from it.
    """
    # The GUID may be followed by further query parameters or end the href.
    guid_regex = re.compile(r"Guid=([\da-z-]+)(?:&|$)")
    href_regex = re.compile(fr"^/{re.escape(lang)}/Articles/Details\?Guid=")

    for link in soup.find_all('a', {'href': href_regex}):
        guids = guid_regex.findall(link['href'])
        if guids:
            return guids[0]
    return None


def save_uploaded_file(uploaded_file):
    """
    Helper function to save an uploaded file to a temporary directory.

    The temporary file is created with ``delete=False``, so it stays on disk
    after this function returns; removing it is the caller's responsibility.

    Returns the absolute path to the saved file, or None (after reporting the
    error in the Streamlit UI) when saving failed.
    """
    try:
        # Use a safe ASCII suffix
        suffix = os.path.splitext(uploaded_file.name)[1]
        if not suffix:
            suffix = ".pdf"

        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
            tmp_file.write(uploaded_file.getvalue())
            return tmp_file.name
    except Exception as e:
        st.error(f"Error saving file: {e}")
        return None


# Import Defense Digest Processor
try:
    from defense_digest import DefenseDigestProcessor
except ImportError:
    DefenseDigestProcessor = None
    st.error(
        "Could not import DefenseDigestProcessor. Make sure dependencies are installed."
    )


def _empty_result():
    """Return an empty pandas DataFrame with the standard result columns."""
    return pd.DataFrame(columns=list(_RESULT_COLUMNS))


def _safe_stem(title):
    """Turn an article title into a string usable as a file-name stem."""
    return _UNSAFE_FILENAME_CHARS.sub('_', title).strip() or 'untitled'


def _process_defense_digest(files):
    """Run DefenseDigestProcessor over the uploaded PDFs and merge the results."""
    processor = DefenseDigestProcessor()
    all_dfs = []

    progress_bar = st.progress(0)
    status_text = st.empty()
    log_area = st.expander("Processing Log", expanded=True)
    logs = []

    def update_progress(msg):
        status_text.text(msg)
        logs.append(msg)
        # Keep only last 20 lines for display to avoid clutter
        log_area.code("\n".join(logs[-20:]))

    for done, uploaded_file in enumerate(files or [], start=1):
        saved_path = save_uploaded_file(uploaded_file)
        if saved_path:
            update_progress(f"Processing {uploaded_file.name}...")
            try:
                # Save intermediate files to current directory
                # Pass original filename as display_name
                df = processor.process_pdf(
                    saved_path,
                    output_dir=os.getcwd(),
                    progress_callback=update_progress,
                    display_name=uploaded_file.name,
                )
                if df is not None and not df.empty:
                    all_dfs.append(df)
            except Exception as e:
                st.error(f"Error processing {uploaded_file.name}: {e}")
            # NOTE: the temporary copy at saved_path is intentionally left on
            # disk for debugging (the original cleanup was disabled).
        progress_bar.progress(done / len(files))

    if all_dfs:
        return pd.concat(all_dfs, ignore_index=True)
    return _empty_result()


def _run_taiwan_panorama(urls, lang='en'):
    """
    Download a Panorama article pair, write both texts and run the aligner.

    Returns the file-name stem shared by the written text files (also the
    argument given to alignGenericGGUF.py), or None when the inputs were
    missing or the pages could not be retrieved.
    """
    if not urls or len(urls) < 2:
        st.error("Taiwan Panorama requires both a source and a target URL.")
        return None
    src_url, tgt_url = urls[0], urls[1]

    st.success("Retrieving Panorama pages...")
    paras_zh, paras_en = retrieveTWP(src_url, lang)
    if not paras_zh or not paras_en:
        st.error("Could not retrieve the Panorama article text in both languages.")
        return None
    st.success("Completed retrieval of Panorama pages...")

    # The first paragraph (the headline) names the files; strip characters
    # that are not valid in file names so open() cannot fail on them.
    fin = _safe_stem(paras_zh[0])
    fon_src = f'{fin}.zh.txt'
    fon_tgt = f'{fin}.{lang}.txt'
    with open(fon_src, 'w', encoding='utf-8', newline='\n') as fo:
        fo.write('\n'.join(paras_zh) + '\n')
    st.success(f"Written source file: {fon_src}")
    with open(fon_tgt, 'w', encoding='utf-8', newline='\n') as fo:
        fo.write('\n'.join(paras_en) + '\n')
    st.success(f"Written target file: {fon_tgt}")

    st.success(f"Source URL: {src_url}")
    # The target page is derived from the source page; the entered target URL
    # is only echoed back here.
    st.success(f"Target URL: {tgt_url}")

    start_time = time.perf_counter()
    st.success("Begin aligning bitext...")
    # sys.executable runs the aligner with the interpreter (and installed
    # packages) of this app rather than whatever "python" is first on PATH.
    result = subprocess.run(
        [sys.executable, 'alignGenericGGUF.py', fin],
        capture_output=True,
        text=True,
    )
    duration = time.perf_counter() - start_time
    if result.returncode == 0:
        st.success(f"Done aligning bitext in: {duration:.2f} seconds")
    else:
        st.error(
            f"Bitext alignment failed (exit code {result.returncode}) "
            f"after {duration:.2f} seconds"
        )
    print(result.stdout)
    print(result.stderr)
    return fin


def processInputData(files=None, urls=None, input_type=None):
    """
    Process the uploaded files / entered URLs according to ``input_type``.

    * "Defense Digest": every uploaded PDF is passed to
      DefenseDigestProcessor and the resulting frames are concatenated.
    * "Taiwan Panorama": the article pair is downloaded, written to text
      files and aligned by alignGenericGGUF.py; the first matching
      ``<stem>.vecalign*.txt`` file found under the current directory is
      read as a tab-separated table.
    * anything else (or a missing processor / missing alignment output):
      placeholder data is returned.

    Returns a DataFrame with columns cosine_distance, source_language,
    target_language for the pandas paths (possibly empty); the Taiwan
    Panorama path returns a polars DataFrame read from the aligner output,
    whose columns are whatever that file contains.
    """
    fin = None  # file-name stem of the aligner input; set by Taiwan Panorama

    # Handle Defense Digest Logic
    if input_type == "Defense Digest" and DefenseDigestProcessor:
        return _process_defense_digest(files)
    elif input_type == "Taiwan Panorama":
        fin = _run_taiwan_panorama(urls, lang='en')
        if fin is None:
            return _empty_result()
    elif input_type == "Scientific American Taiwan":
        pass

    # Default Dummy Logic for other types or if processor fails
    # Example: Save files to disk for processing
    # NOTE: these temporary files are not used yet and are not removed.
    file_paths = []
    if files:
        for uploaded_file in files:
            saved_path = save_uploaded_file(uploaded_file)
            if saved_path:
                file_paths.append(saved_path)

    # Dummy data
    data = {
        "cosine_distance": [0.1, 0.2, 0.05, 0.3],
        "source_language": [
            "This is a sentence.",
            "Another sentence.",
            "Hello world.",
            "Testing.",
        ],
        "target_language": [
            "C'est une phrase.",
            "Une autre phrase.",
            "Bonjour le monde.",
            "Test.",
        ],
    }

    if fin is not None:
        # Escape the stem so glob metacharacters in a title match literally.
        pattern = f"{glob.escape(fin)}.vecalign*.txt"
        aligned_files = list(Path('./').rglob(pattern))
        if aligned_files:
            return pl.read_csv(
                source=aligned_files[0],
                separator='\t',
                has_header=True,
                null_values='',
            )
        st.warning("No aligned output file was found; showing placeholder data.")

    return pd.DataFrame(data)


def main():
    """Build the Streamlit page: sidebar inputs, Process button, result table."""
    st.set_page_config(page_title="Bitext Aligner", layout="wide")

    st.title("Bitext Alignment Tool")
    st.markdown(""" """, unsafe_allow_html=True)

    # Sidebar for inputs
    with st.sidebar:
        st.header("Input Settings")

        # 3. Nature of input info
        input_type = st.radio(
            "Select Input Nature:",
            ("Defense Digest", "Taiwan Panorama", "Scientific American Taiwan"),
        )

        st.divider()

        # 1. File Upload (PDFs)
        st.subheader("Upload PDFs")
        uploaded_files = st.file_uploader(
            "Upload one or two PDF files", type=["pdf"], accept_multiple_files=True
        )

        st.divider()

        # 2. URLs
        st.subheader("Enter URLs")
        url1 = st.text_input("URL 1 (Source)")
        url2 = st.text_input("URL 2 (Target)")

        process_btn = st.button("Process")

    # Main area
    if process_btn:
        if not uploaded_files and not (url1 and url2):
            st.warning("Please upload files or provide a pair of URLs.")
        else:
            with st.spinner("Processing..."):
                # URLs are only forwarded when both were entered.
                df = processInputData(
                    files=uploaded_files,
                    urls=(url1, url2) if url1 and url2 else None,
                    input_type=input_type,
                )

            st.success("Processing Complete!")

            # 4. Display data in text grid
            st.table(df)

            # Option to download as Excel (implied by requirement to create Excel file)
            # For now, we just show the dataframe as requested.


#%%
if __name__ == "__main__":
    main()