# NOTE(review): the lines "Spaces:" / "Sleeping" / "Sleeping" were Streamlit-hosting
# UI status text accidentally pasted into this source file; they are not Python and
# would break the module at import time, so they are replaced by this comment.
| import os | |
| import tempfile | |
| import time | |
| from random import random | |
| from pathlib import Path | |
| import pandas as pd | |
| import polars as pl | |
| import streamlit as st | |
| import requests | |
| import cloudscraper | |
| from bs4 import BeautifulSoup | |
| import regex as re | |
| import subprocess | |
# Root URL of the Taiwan Panorama magazine site; the zh article pages are scraped
# directly and other-language versions are reached via getURLlang() guids.
base_url = 'https://www.taiwan-panorama.com/'
| #%% | |
def getPage(url):
    """
    Fetch *url* with cloudscraper (Cloudflare-aware session), falling back to a
    plain ``requests.get`` when the scraper raises a network error; retry until a
    response is obtained or MAXTRIALS failures accumulate.

    Returns
    -------
    tuple
        ``(status_code, text)`` on success, or ``(None, None)`` after exhausting
        all retries — callers can always unpack two values safely (the previous
        version returned a bare ``None`` on failure, which crashed every caller
        that did ``status, html = getPage(...)``).
    """
    headers = {'user-agent': 'Chrome/143.0.7499.170'}
    scraper = cloudscraper.create_scraper(
        browser={
            'browser': 'chrome',
            'platform': 'windows',
            'mobile': False,
        }
    )
    MAXTRIALS = 10
    cnt_fail = 0
    res = None
    done = False
    while not done:
        try:
            res = scraper.get(url)
        except requests.exceptions.RequestException:
            try:
                # Fallback: cloudscraper sessions occasionally fail where a
                # vanilla request succeeds.
                res = requests.get(url, headers=headers)
            except requests.exceptions.RequestException:
                cnt_fail += 1
                print(f"failed {cnt_fail} time(s)...[{url}]", flush=True)
        done = res is not None or cnt_fail > MAXTRIALS
        # Politeness delay (5-11 s) between attempts and between successive
        # calls — kept even on success to throttle the overall crawl rate.
        time.sleep(5 + random() * 6)
    if res is None:
        return None, None
    res.encoding = 'utf-8'
    return res.status_code, res.text
def retrieveTWP(src_url, lang):
    """
    Download a Taiwan Panorama article in Chinese plus its *lang* version.

    Parameters
    ----------
    src_url : str
        URL of the Chinese (zh) article page.
    lang : str
        Target language code, e.g. ``'en'`` or ``'ja'``.

    Returns
    -------
    tuple
        ``(paras_zh, paras_en)`` — two lists of non-empty paragraph strings;
        either element may be ``None`` when the corresponding page could not
        be fetched or parsed.
    """
    def _extract_paras(soup):
        # Article text lives in the first <article>; keep h1/h2/p text only.
        articles = soup.find_all('article')
        if not articles:
            return None
        paras = articles[0].find_all(('h1', 'h2', 'p'))
        texts = [p.text.strip() for p in paras]
        return [t for t in texts if t] or None

    paras_zh = None
    paras_en = None
    soup_zh = None

    # --- Chinese source page -------------------------------------------
    result = getPage(src_url)
    # Tolerate both failure shapes of getPage: bare None or (None, None).
    if result is not None:
        status, html = result
        if status == 200:
            soup_zh = BeautifulSoup(html, 'lxml')
            try:
                paras_zh = _extract_paras(soup_zh)
            except Exception:
                pass  # best effort: leave paras_zh as None

    # --- Target-language page (guid discovered from the zh page) --------
    # Previously this ran even when the zh fetch failed (NameError on an
    # unbound `soup`) and concatenated a possible None guid into the URL.
    if soup_zh is not None:
        guid = getURLlang(soup_zh, lang)
        if guid:
            # rstrip('/') avoids a double slash: base_url already ends with '/'.
            tgt_url = base_url.rstrip('/') + f"/{lang}/Articles/Details?Guid=" + guid
            result = getPage(tgt_url)
            if result is not None:
                status, html = result
                if status == 200:
                    soup_tgt = BeautifulSoup(html, 'lxml')
                    try:
                        paras_en = _extract_paras(soup_tgt)
                    except Exception:
                        pass  # best effort: leave paras_en as None

    return paras_zh, paras_en
def getURLlang(soup, lang):
    """
    Find the GUID of the same article in another language.

    Parameters
    ----------
    soup : BeautifulSoup
        Parsed HTML of the zh article page.
    lang : str
        Language code of the wanted version (e.g. ``'en'``, ``'ja'``).

    Returns
    -------
    str or None
        The article GUID extracted from the first matching language link,
        or ``None`` when no such link exists.
    """
    # Greedy match up to the first non-guid character. The previous pattern
    # (r"Guid=([\da-z-]+?)\&") required a trailing '&', so an href that ended
    # right after the guid was silently missed.
    guid_regex = re.compile(r"Guid=([\da-z-]+)")
    urls = soup.find_all('a', {'href': re.compile(fr"^/{lang}/Articles/Details\?Guid=")})
    if urls:
        guids = guid_regex.findall(urls[0]['href'])
        if guids:
            return guids[0]
    return None
def save_uploaded_file(uploaded_file):
    """
    Persist a Streamlit UploadedFile to a named temporary file on disk.

    Returns the absolute path of the temp file, or None when saving failed
    (the error is surfaced in the UI via ``st.error``).
    """
    try:
        # Keep the upload's extension so downstream tools can sniff the type;
        # fall back to .pdf when the name has no suffix.
        ext = Path(uploaded_file.name).suffix or ".pdf"
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=ext)
        try:
            tmp.write(uploaded_file.getvalue())
        finally:
            tmp.close()
        return tmp.name
    except Exception as exc:
        st.error(f"Error saving file: {exc}")
        return None
# Import Defense Digest Processor.
# Optional dependency: when defense_digest (or its own dependencies) is missing,
# DefenseDigestProcessor is set to None and processInputData() silently skips the
# Defense Digest branch; the failure is surfaced once in the UI here.
try:
    from defense_digest import DefenseDigestProcessor
except ImportError:
    DefenseDigestProcessor = None
    st.error(
        "Could not import DefenseDigestProcessor. Make sure dependencies are installed."
    )
def processInputData(files=None, urls=None, input_type=None):
    """
    Route uploaded PDFs and/or a URL pair to the matching alignment pipeline.

    Parameters
    ----------
    files : list of Streamlit UploadedFile, optional
    urls : tuple(str, str), optional
        ``(source_url, target_url)``.
    input_type : str, optional
        One of ``"Defense Digest"``, ``"Taiwan Panorama"``,
        ``"Scientific American Taiwan"``.

    Returns
    -------
    DataFrame
        pandas (or polars, when a vecalign output file is found) with the
        columns ``cosine_distance``, ``source_language``, ``target_language``.
    """
    # File stem of the Panorama bitext; stays None for every other input type
    # so the aligned-output lookup at the bottom is skipped safely (the
    # original read `fin` unconditionally there and raised NameError for any
    # non-Panorama input reaching the fallback path).
    fin = None

    # --- Defense Digest: delegate each PDF to DefenseDigestProcessor ------
    if input_type == "Defense Digest" and DefenseDigestProcessor:
        processor = DefenseDigestProcessor()
        all_dfs = []
        progress_bar = st.progress(0)
        status_text = st.empty()
        log_area = st.expander("Processing Log", expanded=True)
        logs = []

        def update_progress(msg):
            # Stream progress into the status line plus a rolling log window
            # (last 20 lines only, to avoid clutter).
            status_text.text(msg)
            logs.append(msg)
            log_area.code("\n".join(logs[-20:]))

        if files:
            for i, uploaded_file in enumerate(files):
                saved_path = save_uploaded_file(uploaded_file)
                if saved_path:
                    update_progress(f"Processing {uploaded_file.name}...")
                    try:
                        # Intermediate files go to the current directory; the
                        # original upload name is passed for display purposes.
                        df = processor.process_pdf(
                            saved_path,
                            output_dir=os.getcwd(),
                            progress_callback=update_progress,
                            display_name=uploaded_file.name,
                        )
                        if df is not None and not df.empty:
                            all_dfs.append(df)
                    except Exception as e:
                        st.error(f"Error processing {uploaded_file.name}: {e}")
                    # NOTE: the temp file is intentionally NOT deleted here
                    # (kept for debugging); re-enable os.remove(saved_path)
                    # once the pipeline is stable.
                progress_bar.progress((i + 1) / len(files))
        if all_dfs:
            return pd.concat(all_dfs, ignore_index=True)
        return pd.DataFrame(
            columns=["cosine_distance", "source_language", "target_language"]
        )

    # --- Taiwan Panorama: scrape zh/en pages, write bitext, align ---------
    if input_type == "Taiwan Panorama":
        if not urls:
            # Previously urls[0] on None raised TypeError.
            st.warning("Taiwan Panorama mode needs a pair of article URLs.")
        else:
            src_url, tgt_url = urls[0], urls[1]
            lang = 'en'  # target language
            st.success("Retrieving Panorama pages...")
            paras_zh, paras_en = retrieveTWP(src_url, lang)
            st.success("Completed retrieval of Panorama pages...")
            if not paras_zh or not paras_en:
                # Surface scrape failures instead of crashing on paras_zh[0].
                st.error("Could not retrieve both language versions of the article.")
            else:
                fin = paras_zh[0]  # article title doubles as the file stem
                fon_src = f'{fin}.zh.txt'
                fon_tgt = f'{fin}.{lang}.txt'
                with open(fon_src, 'w', encoding='utf-8', newline='\n') as fo:
                    fo.write('\n'.join(paras_zh) + '\n')
                st.success(f"Written source file: {fon_src}")
                with open(fon_tgt, 'w', encoding='utf-8', newline='\n') as fo:
                    fo.write('\n'.join(paras_en) + '\n')
                st.success(f"Written target file: {fon_tgt}")
                st.success(f"Source URL: {src_url}")
                st.success(f"Target URL: {tgt_url}")
                start_time = time.perf_counter()
                st.success("Begin aligning bitext...")
                # List-form argv (shell=False): safe even though `fin` comes
                # from scraped page content.
                result = subprocess.run(
                    ['python', 'alignGenericGGUF.py', fin],
                    capture_output=True, text=True,
                )
                duration = time.perf_counter() - start_time
                st.success(f"Done aligning bitext in: {duration:.2f} seconds")
                print(result.stdout)
                print(result.stderr)
    elif input_type == "Scientific American Taiwan":
        pass  # TODO: not implemented yet

    # --- Fallback: save any uploads, then return either the aligned ------
    # Panorama output (if one was produced) or dummy data.
    file_paths = []
    if files:
        for uploaded_file in files:
            saved_path = save_uploaded_file(uploaded_file)
            if saved_path:
                file_paths.append(saved_path)

    # Dummy data used when no aligned output file exists.
    data = {
        "cosine_distance": [0.1, 0.2, 0.05, 0.3],
        "source_language": [
            "This is a sentence.",
            "Another sentence.",
            "Hello world.",
            "Testing.",
        ],
        "target_language": [
            "C'est une phrase.",
            "Une autre phrase.",
            "Bonjour le monde.",
            "Test.",
        ],
    }
    aligned_files = list(Path('./').rglob(f"{fin}.vecalign*.txt")) if fin else []
    if aligned_files:
        df = pl.read_csv(
            source=aligned_files[0],
            separator='\t',
            has_header=True,
            null_values='',
        )
    else:
        df = pd.DataFrame(data)
    return df
def main():
    """Streamlit entry point: sidebar input controls + alignment results table."""
    st.set_page_config(page_title="Bitext Aligner", layout="wide")
    st.title("Bitext Alignment Tool")

    # Fixed column widths plus word wrapping for the results table.
    st.markdown("""
    <style>
    table {
        width: 100%;
    }
    th:nth-child(1) { width: 50px; }
    th:nth-child(2) { width: 80px; }
    th:nth-child(3) { width: 100px; }
    th:nth-child(4) { width: 500px; }
    th:nth-child(5) { width: 100px; }
    th:nth-child(6) { width: 500px; }
    td {
        word-wrap: break-word;
        min-width: 50px;
        max-width: 400px;
        white-space: normal !important;
    }
    </style>
    """, unsafe_allow_html=True)

    # All inputs are collected in the sidebar.
    with st.sidebar:
        st.header("Input Settings")
        # Nature of the input material.
        source_kind = st.radio(
            "Select Input Nature:",
            ("Defense Digest", "Taiwan Panorama", "Scientific American Taiwan"),
        )
        st.divider()
        # PDF upload.
        st.subheader("Upload PDFs")
        pdf_uploads = st.file_uploader(
            "Upload one or two PDF files", type=["pdf"], accept_multiple_files=True
        )
        st.divider()
        # Article URL pair.
        st.subheader("Enter URLs")
        source_url = st.text_input("URL 1 (Source)")
        target_url = st.text_input("URL 2 (Target)")
        submitted = st.button("Process")

    # Main area: nothing to do until the button is pressed.
    if not submitted:
        return

    have_url_pair = bool(source_url and target_url)
    if not pdf_uploads and not have_url_pair:
        st.warning("Please upload files or provide a pair of URLs.")
        return

    with st.spinner("Processing..."):
        # Dispatch to the business logic.
        result_df = processInputData(
            files=pdf_uploads,
            urls=(source_url, target_url) if have_url_pair else None,
            input_type=source_kind,
        )
        st.success("Processing Complete!")
        # Render the aligned bitext as a static text grid.
        #st.dataframe(result_df, width="stretch")
        st.table(result_df)
        # Option to download as Excel (implied by requirement to create Excel file)
        # For now, we just show the dataframe as requested.
| #%% | |
# Run the Streamlit app when this file is executed directly
# (typically via `streamlit run <file>`).
if __name__ == "__main__":
    main()