Spaces:
Sleeping
Sleeping
import gc
import logging
import os
import pickle
import re
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from functools import partial
from pathlib import Path
from typing import Optional

import pandas as pd
from tqdm import tqdm
from web3 import Web3

from markets import (
    etl as mkt_etl,
    DEFAULT_FILENAME as MARKETS_FILENAME,
)
from profitability import run_profitability_analysis
from tools import (
    etl as tools_etl,
    DEFAULT_FILENAME as TOOLS_FILENAME,
)
# Emit INFO-level progress messages from all loggers in this script.
logging.basicConfig(level=logging.INFO)
# Resolve the project layout relative to this file: the script lives in a
# scripts/ directory and data files are kept in a sibling data/ directory.
SCRIPTS_DIR = Path(__file__).parent
ROOT_DIR = SCRIPTS_DIR.parent
DATA_DIR = ROOT_DIR / "data"
def get_question(text: str) -> Optional[str]:
    """Extract the question embedded in a prompt string.

    The question is assumed to be the first double-quoted substring of
    *text*.

    :param text: raw prompt text.
    :return: the first quoted substring, or ``None`` when *text* contains
        no double-quoted span.
    """
    # re.search stops at the first match instead of materializing every
    # quoted substring the way findall did; behavior for callers is the
    # same (first match or None). Return type fixed: the original was
    # annotated ``-> str`` but returned None when no quotes were present.
    match = re.search(r'"([^"]*)"', text)
    return match.group(1) if match else None
def current_answer(text: str, fpmms: pd.DataFrame) -> Optional[str]:
    """Return the ``currentAnswer`` of the market whose title equals *text*.

    :param text: market title to look up.
    :param fpmms: markets frame with ``title`` and ``currentAnswer`` columns.
    :return: the first matching answer, or ``None`` when no title matches.
    """
    answers = fpmms.loc[fpmms['title'] == text, 'currentAnswer']
    if answers.empty:
        return None
    return answers.iloc[0]
def block_number_to_timestamp(block_number: int, web3: "Web3") -> str:
    """Convert a block number to a UTC timestamp string.

    :param block_number: on-chain block number to resolve (one RPC call).
    :param web3: connected Web3 client used for the ``eth.get_block`` lookup.
    :return: block time formatted as ``'YYYY-MM-DD HH:MM:SS'`` in UTC.
    """
    block = web3.eth.get_block(block_number)
    # fromtimestamp with an explicit UTC tz replaces utcfromtimestamp,
    # which is deprecated since Python 3.12; the formatted output is
    # identical (strftime ignores the tzinfo for these fields).
    timestamp = datetime.fromtimestamp(block['timestamp'], tz=timezone.utc)
    return timestamp.strftime('%Y-%m-%d %H:%M:%S')
def parallelize_timestamp_conversion(df: pd.DataFrame, function: callable) -> list:
    """Apply *function* to every ``request_block`` value concurrently.

    :param df: frame holding a ``request_block`` column of block numbers.
    :param function: callable mapping a block number to a timestamp string.
    :return: results in the same order as ``df['request_block']``.
    """
    blocks = df['request_block'].tolist()
    # Block lookups are network-bound, so a thread pool overlaps the
    # round-trips; tqdm reports progress as each future completes.
    with ThreadPoolExecutor(max_workers=10) as pool:
        converted = list(
            tqdm(pool.map(function, blocks), total=len(blocks))
        )
    return converted
def weekly_analysis():
    """Run the weekly analysis pipeline for the FPMMS project.

    Orchestrates the markets and tools ETLs and the profitability
    analysis, then enriches the tools data with question/answer and
    timestamp columns, and persists the results under ``DATA_DIR``.
    """
    rpc = "https://lb.nodies.app/v1/406d8dcc043f4cb3959ed7d6673d311a"
    web3 = Web3(Web3.HTTPProvider(rpc))

    # Run markets ETL
    logging.info("Running markets ETL")
    mkt_etl(MARKETS_FILENAME)
    logging.info("Markets ETL completed")

    # Run tools ETL
    logging.info("Running tools ETL")
    tools_etl(
        rpcs=[rpc],
        filename=TOOLS_FILENAME,
        full_contents=True,
    )
    logging.info("Tools ETL completed")

    # Run profitability analysis; drop any stale trades file first so the
    # analysis regenerates it from scratch.
    logging.info("Running profitability analysis")
    if os.path.exists(DATA_DIR / "fpmmTrades.csv"):
        os.remove(DATA_DIR / "fpmmTrades.csv")
    run_profitability_analysis(
        rpc=rpc,
    )
    logging.info("Profitability analysis completed")

    # Join the markets' currentAnswer onto the tools rows via the question
    # text embedded in each prompt.
    fpmms = pd.read_csv(DATA_DIR / MARKETS_FILENAME)
    tools = pd.read_csv(DATA_DIR / TOOLS_FILENAME)
    logging.info("Getting the question and current answer for the tools")
    tools['title'] = tools['prompt_request'].apply(get_question)
    tools['currentAnswer'] = tools['title'].apply(lambda x: current_answer(x, fpmms))
    # Normalize answer casing (substring replace, as before).
    tools['currentAnswer'] = tools['currentAnswer'].str.replace('yes', 'Yes')
    tools['currentAnswer'] = tools['currentAnswer'].str.replace('no', 'No')

    # Convert block numbers to timestamps, using the cached block->time map
    # first and falling back to RPC lookups for anything not yet cached.
    logging.info("Converting block number to timestamp")
    # Context manager closes the pickle file (the original leaked the handle).
    with open(DATA_DIR / "t_map.pkl", "rb") as f:
        t_map = pickle.load(f)
    tools['request_time'] = tools['request_block'].map(t_map)

    # BUG FIX: the original concatenated the resolved rows back onto the
    # full frame without removing the unresolved originals, duplicating
    # every previously-missing row (once with NaN, once filled). Assigning
    # through .loc on the mask updates the rows in place and also avoids
    # the SettingWithCopyWarning of writing to a filtered slice.
    missing_mask = tools['request_time'].isna()
    if missing_mask.any():
        partial_block_number_to_timestamp = partial(block_number_to_timestamp, web3=web3)
        missing_timestamps = parallelize_timestamp_conversion(
            tools[missing_mask], partial_block_number_to_timestamp
        )
        tools.loc[missing_mask, 'request_time'] = missing_timestamps

    tools['request_month_year'] = pd.to_datetime(tools['request_time']).dt.strftime('%Y-%m')
    tools['request_month_year_week'] = pd.to_datetime(tools['request_time']).dt.to_period('W').astype(str)

    # Save the tools
    tools.to_csv(DATA_DIR / TOOLS_FILENAME, index=False)

    # Fold the newly resolved timestamps back into the cache and persist it.
    t_map.update(tools[['request_block', 'request_time']].set_index('request_block').to_dict()['request_time'])
    with open(DATA_DIR / "t_map.pkl", "wb") as f:
        pickle.dump(t_map, f)

    # Release the large frames before the process idles.
    del tools
    del fpmms
    del t_map
    gc.collect()
    logging.info("Weekly analysis files generated and saved")
| if __name__ == "__main__": | |
| weekly_analysis() | |