import codecs import io import random import requests import time from datetime import date, timedelta, datetime from tqdm import tqdm from typing import Generator, Tuple import numpy as np import pandas as pd import plotly.express as px import gradio as gr # ラップする関数 def date_range( start: date, stop: date, step: timedelta = timedelta(1) ) -> Generator[date, None, None]: """startからendまで日付をstep日ずつループさせるジェネレータ""" current = start while current < stop: yield current current += step def get_url(download_date: date) -> Tuple[str, str]: """ダウンロードするURLと日付の文字列を返す""" month = download_date.strftime("%Y%m") day = download_date.strftime("%Y%m%d") return ( f"https://www.shijou-nippo.metro.tokyo.lg.jp/SN/{month}/{day}/Sui/Sui_K1.csv", day, ) def content_wrap(content): """1行目にヘッダ行が来るまでスキップする""" buffer = "" first = True for line in io.BytesIO(content): line_str = codecs.decode(line, "shift-jis") if first: if "品名" in line_str: first = False buffer = line_str else: continue else: buffer += line_str return io.StringIO(buffer) def insert_data(data, day, low_price, center_price, high_price, quantity): """ "データをリストに追加する""" data["date"].append(day) data["low_price"].append(low_price) data["center_price"].append(center_price) data["high_price"].append(high_price) data["quantity"].append(quantity) def to_numeric(x): """文字列を数値に変換する""" if isinstance(x, str): return float(x) else: return x def get_fish_data(year, month, day, drop): """ 東京卸売市場からデータを引っ張ってくる :param start_date: 開始日 :param end_date: 終了日 :return: さばの値段を結合したデータ """ year = str(year) if len(str(month)) == 1: month = '0' + str(month) else: month = str(month) if len(str(day)) == 1: day = '0' + str(day) else: day = str(day) date_str = year +'-' + month + '-' + day date_title = year + '年' + month + '月' + day + '日' date = datetime.strptime(date_str, '%Y-%m-%d') yesterday = date - timedelta(days=2) tommorow = date + timedelta(days=3) data = { "date": [], "low_price": [], "center_price": [], "high_price": [], "quantity": [], } iterator = tqdm( date_range(yesterday, tommorow), total=(tommorow - yesterday).days ) for download_date in iterator: url, day = get_url(download_date) iterator.set_description(day) response = requests.get(url) # URLが存在しないとき if response.status_code == 404: insert_data(data, day, np.nan, np.nan, np.nan, 0) continue assert ( response.status_code == 200 ), f"Unexpected HTTP response. Please check the website {url}." df = pd.read_csv(content_wrap(response.content)) # 欠損値補完 price_cols = ["安値(円)", "中値(円)", "高値(円)"] for c in price_cols: df[c].mask(df[c] == "-", np.nan, inplace=True) df[c].mask(df[c] == "−", np.nan, inplace=True) df["卸売数量"].mask(df["卸売数量"] == "-", np.nan, inplace=True) df["卸売数量"].mask(df["卸売数量"] == "−", np.nan, inplace=True) # 長崎で獲れたあじの中値と卸売数量 # 品目 == さば の行だけ抽出 df_aji = df.loc[df["品名"] == "さば", ["卸売数量"] + price_cols] # さばの販売がなかったら欠損扱いに if len(df_aji) == 0: insert_data(data, day, np.nan, np.nan, np.nan, 0) continue isnan = lambda x: isinstance(x, float) and np.isnan(x) # 産地ごと(?)のさばの販売実績を調べる low_prices = [] center_prices = [] high_prices = [] quantities = [] for i, row in enumerate(df_aji.iloc): lp, cp, hp, q = row[price_cols + ["卸売数量"]] lp, cp, hp, q = ( to_numeric(lp), to_numeric(cp), to_numeric(hp), to_numeric(q), ) # 中値だけが記録されている -> 価格帯が1個だけなので高値、安値も中値と同じにしておく if isnan(lp) and isnan(hp) and (not isnan(cp)): low_prices.append(cp) center_prices.append(cp) high_prices.append(cp) # 高値・安値があり中値がない -> 価格帯2個、とりあえず両者の平均を中値とする elif (not isnan(lp)) and (not isnan(hp)) and isnan(cp): low_prices.append(lp) center_prices.append((lp + hp) / 2) high_prices.append(hp) else: low_prices.append(lp) center_prices.append(cp) high_prices.append(hp) if isnan(row["卸売数量"]): quantities.append(0) else: quantities.append(q) low_price = int(min(low_prices)) center_price = int(sum(center_prices) / len(center_prices)) high_price = int(max(high_prices)) quantity = int(float(sum(quantities))) # 保存 insert_data(data, day, low_price, center_price, high_price, quantity) # 短期間にアクセスが集中しないようにクールタイムを設定 time.sleep(max(0.5 + random.normalvariate(0, 0.3), 0.1)) # DataFrameを作成 df = pd.DataFrame(data) df['date'] = pd.to_datetime(df['date']) df['日付'] = df['date'].dt.strftime('%m月%d日') df = df.rename(columns={'low_price':"低値(円/kg)", 'center_price':"中値(円/kg)",'high_price':"高値(円/kg)",'quantity': "卸売数量(kg)"}) df = df.fillna(0) quantity = df["卸売数量(kg)"].iloc[-3] high_price = df["高値(円/kg)"].iloc[-3] center_price = df["中値(円/kg)"].iloc[-3] low_price = df["低値(円/kg)"].iloc[-3] fig = px.line(df, x='日付', y=df[drop]) fig.update_layout( title=dict(text=date_title + 'と前後5日間の' + drop, font=dict(family='Times New Roman', size=15)), xaxis_title="日付", yaxis_title=drop, ) if drop == "卸売数量(kg)": return quantity, fig elif drop == "高値(円/kg)": return high_price, fig elif drop == "中値(円/kg)": return center_price, fig else: return low_price, fig def get_tide(year, month, day): tide_url = 'https://api.tide736.net/get_tide.php' year = str(year) if len(str(month)) == 1: month = '0' + str(month) else: month = str(month) if len(str(day)) == 1: day = '0' + str(day) else: day = str(day) date_str = year +'-' + month + '-' + day params = { # 鯖の漁獲量全国1位である茨城県の大津市に設定 'pc': 8, 'hc': 1, 'yr': year, 'mn': month, 'dy': day, 'rg': 'day' } res = requests.get(tide_url, params) info = res.json() tide_chart = info['tide']['chart'][date_str] tide = tide_chart['moon']['title'] if len(tide_chart['flood']) == 1: high_tide1 = tide_chart['flood'][0]['time'] high_tide_height1 = tide_chart['flood'][0]['cm'] high_tide2 = np.nan high_tide_height2 = np.nan else: high_tide1 = tide_chart['flood'][0]['time'] high_tide_height1 = tide_chart['flood'][0]['cm'] high_tide2 = tide_chart['flood'][1]['time'] high_tide_height2 = tide_chart['flood'][1]['cm'] if len(tide_chart['edd']) == 1: low_tide1 = tide_chart['edd'][0]['time'] low_tide_height1 = tide_chart['edd'][0]['cm'] low_tide2 = np.nan low_tide_height2 = np.nan else: low_tide1 = tide_chart['edd'][0]['time'] low_tide_height1 = tide_chart['edd'][0]['cm'] low_tide2 = tide_chart['edd'][1]['time'] low_tide_height2 = tide_chart['edd'][1]['cm'] hinode = tide_chart['sun']['rise'] hinoiri = tide_chart['sun']['set'] data ={'date':date_str, 'tide':tide, 'high_tide1':high_tide1, 'high_tide_height1':high_tide_height1, 'high_tide2':high_tide2, 'high_tide_height2':high_tide_height2, 'low_tide1':low_tide1, 'low_tide_height1':low_tide_height1, 'low_tide2':low_tide2, 'low_tide_height2':low_tide_height2, 'hinode':hinode, 'hinoiri':hinoiri} df_otu = pd.DataFrame(data = [data]) tide= df_otu['tide'].iloc[-1] high_tide1= df_otu['high_tide1'].iloc[-1] high_tide_height1= df_otu['high_tide_height1'].iloc[-1] high_tide2= df_otu['high_tide2'].iloc[-1] high_tide_height2= df_otu['high_tide_height2'].iloc[-1] low_tide1= df_otu['low_tide1'].iloc[-1] low_tide_height1= df_otu['low_tide_height1'].iloc[-1] low_tide2= df_otu['low_tide2'].iloc[-1] low_tide_height2= df_otu['low_tide_height2'].iloc[-1] hinode= df_otu['hinode'].iloc[-1] hinoiri= df_otu['hinoiri'].iloc[-1] return tide, hinode, hinoiri, high_tide1, high_tide_height1, high_tide2, high_tide_height2, low_tide1, low_tide_height1, low_tide2, low_tide_height2, hinode, hinoiri with gr.Blocks() as demo: gr.Markdown( """ # 鯖の市場データ&潮見表 #### 鯖の卸売数量と市場での価格、また鯖がよく獲れる茨城県の潮見表を表示できます。 """) with gr.Accordion("項目の説明", open = False): gr.Markdown(""" **卸売数量(kg)** : 市場で取引された数量 **高値(円/kg)** : 1日の中で最も高い卸売価格 **中値(円/kg)** : 最も総卸売数量の多い卸売価格 **低値(円/kg)** : 中値未満の卸売価格のうち、総卸売数量が最も多い卸売価格 """) with gr.Tabs(): with gr.TabItem("鯖の市場データ"): gr.Markdown( """ 指定した日時と項目の結果を表示します。 """) with gr.Row(): with gr.Column(): year_input_fish = gr.Slider(2010, 2023, step = 1, interactive = True, label="年", info="年を選択して下さい") month_input_fish = gr.Slider(1, 12, step = 1, interactive = True, label="月", info="月を選択して下さい") day_input_fish = gr.Slider(1, 31, step = 1, interactive = True, label="日", info="日を選択して下さい") drop_input_data = gr.Radio(["卸売数量(kg)", "高値(円/kg)", "中値(円/kg)", "低値(円/kg)"], label="卸売数量 or 市場での価格", info="項目をひとつ選択して下さい") with gr.Column(): drop_output_data = gr.Textbox(label = "結果", info="取引がない日は0になります") line_fig = gr.Plot(label="結果") fish_data_button = gr.Button("表示") gr.Examples(examples=[[2015, 12, 18, "卸売数量(kg)"], [2023, 3, 7, "高値(円/kg)"]], label="入力例", inputs=[year_input_fish, month_input_fish, day_input_fish, drop_input_data]) with gr.TabItem("潮見表"): gr.Markdown( """ 指定した日時の潮見表を表示します。 """) with gr.Row(): with gr.Column(): year_input_tide = gr.Slider(2010, 2023, step = 1, interactive = True, label="年", info="年を選択して下さい") month_input_tide = gr.Slider(1, 12, step = 1, interactive = True, label="月", info="月を選択して下さい") day_input_tide = gr.Slider(1, 31, step = 1, interactive = True, label="日", info="日を選択して下さい") tide_button = gr.Button("表示") with gr.Row(): output_tide = gr.Textbox(label = "潮名") output_hinode = gr.Textbox(label = "日の出") output_hinoiri = gr.Textbox(label = "日の入り") with gr.Row(): output_high_tide1 = gr.Textbox(label = "満潮時刻1") output_high_tide_height1 = gr.Textbox(label = "満潮水位1(cm)") output_high_tide2 = gr.Textbox(label = "満潮時刻2") output_high_tide_height2 = gr.Textbox(label = "満潮水位2(cm)") with gr.Row(): output_low_tide1 = gr.Textbox(label = "干潮時刻1") output_low_tide_height1 = gr.Textbox(label = "干潮水位1(cm)") output_low_tide2 = gr.Textbox(label = "干潮時刻2") output_low_tide_height2 = gr.Textbox(label = "干潮水位2(cm)") gr.Examples(examples=[[2015, 12, 18], [2023, 3, 7]], label="入力例", inputs=[year_input_tide, month_input_tide, day_input_tide]) with gr.Accordion("使用データ", open = False): gr.Markdown(""" ##### 東京卸売市場が発表している上記項目、また海上保安庁から潮汐データを使用している。 """) fish_data_button.click(get_fish_data, inputs=[year_input_fish, month_input_fish, day_input_fish, drop_input_data], outputs=[drop_output_data, line_fig]) tide_button.click(get_tide, inputs=[year_input_tide, month_input_tide, day_input_tide], outputs=[output_tide, output_hinode, output_hinoiri, output_high_tide1, output_high_tide_height1, output_high_tide2, output_high_tide_height2, output_low_tide1, output_low_tide_height1, output_low_tide2, output_low_tide_height2]) # 起動 demo.launch()