Spaces:
Runtime error
Runtime error
| import re | |
| from typing import List, Dict | |
| from langchain.prompts import PromptTemplate | |
| from langchain.chat_models import ChatOpenAI | |
| from langchain.callbacks import get_openai_callback | |
| from pydantic import BaseModel | |
| from collections import defaultdict | |
| from langchain.schema import ( | |
| AIMessage, | |
| HumanMessage, | |
| SystemMessage | |
| ) | |
| import random | |
| import inflect | |
| from .bidder_base import Bidder | |
| from .human_bidder import HumanBidder | |
| from .item_base import Item | |
| from .prompt_base import PARSE_BID_INSTRUCTION | |
# Module-level inflect engine shared by Auctioneer; its ordinal() method is
# used to render bidding-round numbers in messages and in the auction log.
p = inflect.engine()
class Auctioneer(BaseModel):
    '''
    State and message generation for an auction that sells items one by one.

    Per-item state (cur_item, highest_bidder, highest_bid, bidding_history,
    prev_round_max_bid, fail_to_sell) is reset by hammer_fall(); everything,
    including the item lists and the auction log, is reset by finish_auction().
    '''
    # NOTE: the field declarations below are kept exactly as they were, since
    # pydantic derives validation/type inference from them.
    enable_discount: bool = False  # lower the starting price when nobody bids (see check_hammer)
    items: List[Item] = []  # every item of the auction
    cur_item: Item = None  # item currently on sale
    highest_bidder: Bidder = None  # holder of the highest bid on cur_item, None if nobody bid yet
    highest_bid: int = -1  # highest bid on cur_item, -1 if nobody bid yet
    bidding_history = defaultdict(list)  # history about the bidding war of one item: round -> bid infos
    items_queue: List[Item] = []  # items not presented yet; updates when an item is taken.
    auction_logs = defaultdict(list)  # history about the bidding war of all items: item desc -> entries
    openai_cost = 0  # accumulated cost reported by the OpenAI callback in parse_bid()
    prev_round_max_bid: int = -1  # highest bid seen by the previous check_hammer(), -1 before that
    min_bid: int = 0
    fail_to_sell = False  # set by check_hammer() when nobody has bid on cur_item
    min_markup_pct = 0.1  # minimum raise, as a fraction of the item's price (see ask_for_bid)

    class Config:
        arbitrary_types_allowed = True

    def init_items(self, items: List[Item]):
        '''
        Register the items of a new auction and fill the presentation queue.
        '''
        for item in items:
            # reset discounted price
            item.reset_price()
        self.items = items
        self.items_queue = items.copy()

    def summarize_items_info(self):
        '''
        Return a bullet list ("- <desc>" per line) describing all items.
        '''
        return "\n".join(f"- {item.get_desc()}" for item in self.items).strip()

    def present_item(self):
        '''
        Pop the next item from the queue, make it the current item and return it.

        Raises IndexError when the queue is empty; check end_auction() first.
        '''
        self.cur_item = self.items_queue.pop(0)
        return self.cur_item

    def shuffle_items(self):
        '''
        Shuffle the items in place and rebuild the queue in the new order.
        '''
        random.shuffle(self.items)
        self.items_queue = self.items.copy()

    def record_bid(self, bid_info: dict, bid_round: int):
        '''
        Save the bidding history for each round, log the highest bidder and highest bidding

        bid_info: {'bidder': xxx, 'bid': xxx, 'raw_msg': xxx}. Only bids > 0
        compete for the highest bid.

        Ties are broken uniformly at random among the distinct tied bidders.
        (Previously every call re-flipped a coin for each tied entry already
        seen in the round, which skewed the outcome towards later bidders.)
        '''
        round_bids = self.bidding_history[bid_round]
        round_bids.append(bid_info)

        offers = [hist for hist in round_bids if hist['bid'] > 0]
        if offers:
            top_bid = max(hist['bid'] for hist in offers)
            if top_bid >= self.highest_bid:
                contenders = []
                if top_bid == self.highest_bid and self.highest_bidder is not None:
                    # the current holder stays in the draw when merely tied
                    contenders.append(self.highest_bidder)
                for hist in offers:
                    already_in = any(hist['bidder'] is c for c in contenders)
                    if hist['bid'] == top_bid and not already_in:
                        contenders.append(hist['bidder'])
                self.highest_bid = top_bid
                if len(contenders) == 1:
                    self.highest_bidder = contenders[0]
                else:
                    # random if there's a tie
                    self.highest_bidder = random.choice(contenders)

        self.auction_logs[f"{self.cur_item.get_desc()}"].append(
            {'bidder': bid_info['bidder'],
             'bid': bid_info['bid'],
             'bid_round': bid_round})

    def _biddings_to_string(self, bid_round: int):
        '''
        Return a string that summarizes the bidding history in a round
        '''
        lines = []
        for js in self.bidding_history[bid_round]:
            if js['bid'] < 0:
                lines.append(f"- {js['bidder']} withdrew")
            else:
                lines.append(f"- {js['bidder']}: ${js['bid']}")
        return "\n".join(lines).strip()

    def all_bidding_history_to_string(self):
        '''
        Return the summaries of all recorded rounds of the current item.
        '''
        sections = [
            f"Round {bid_round}:\n{self._biddings_to_string(bid_round)}\n\n"
            for bid_round in self.bidding_history
        ]
        return "".join(sections).strip()

    def ask_for_bid(self, bid_round: int):
        '''
        Ask for bid, return the message to be sent to bidders
        '''
        item = self.cur_item

        if self.highest_bidder is None:
            if bid_round > 0:
                # nobody bid in an earlier round: announce the (lowered) price
                return (
                    f"Seeing as we've had no takers at the initial price, "
                    f"we're going to lower the starting bid to ${item.price} "
                    f"for {item.name} to spark some interest! Do I have any takers?"
                )
            remaining_items = [item.name] + [queued.name for queued in self.items_queue]
            return (
                f"Attention, bidders! {len(remaining_items)} item(s) left, "
                f"they are: {', '.join(remaining_items)}.\n\n"
                f"Now, please bid on {item}. "
                f"The starting price for bidding for {item} is ${item.price}. "
                f"Anyone interested in this item?"
            )

        bidding_history = self._biddings_to_string(bid_round - 1)
        min_increase = int(item.price * self.min_markup_pct)
        return (
            f"Thank you! This is the {p.ordinal(bid_round)} round of bidding for this item:\n"
            f"{bidding_history}\n\n"
            f"Now we have ${self.highest_bid} from {self.highest_bidder.name} for {item.name}. "
            f"The minimum increase over this highest bid is ${min_increase}. "
            f"Do I have any advance on ${self.highest_bid}?"
        )

    def ask_for_rebid(self, fail_msg: str, bid_price: int):
        '''
        Return the message telling a bidder why a bid was rejected.
        '''
        return f"Your bid of ${bid_price} failed, because {fail_msg}: You must reconsider your bid."

    def get_hammer_msg(self):
        '''
        Return the closing announcement for the current item.
        '''
        if self.highest_bidder is None:
            return f"Since no one bid on {self.cur_item.name}, we'll move on to the next item."
        return (
            f"Sold! {self.cur_item} to {self.highest_bidder} at ${self.highest_bid}! "
            f"The true value for {self.cur_item} is ${self.cur_item.true_value}."
        )

    def check_hammer(self, bid_round: int):
        '''
        Decide whether bidding on the current item ends after `bid_round`.

        Returns True when bidding is over, False when another round is needed.
        Sets fail_to_sell when nobody has bid on the item so far.

        Raises ValueError if the round holds bids but no highest bidder is set.
        '''
        self.fail_to_sell = False
        num_bid = self._num_bids_in_round(bid_round)

        # highest_bidder has already been updated in record_bid().
        # so when num_bid == 0 & highest_bidder is None, it means no one bid on this item
        if self.highest_bidder is None:
            if num_bid != 0:
                # won't happen
                raise ValueError(f"highest_bidder is None but num_bid is {num_bid}")
            # failed to sell, as there is no highest bidder
            self.fail_to_sell = True
            if self.enable_discount and bid_round < 3:
                # lower the starting price by 50%. discount only applies to the first 3 rounds
                self.cur_item.lower_price(0.5)
                return False
            return True

        if self.prev_round_max_bid < 0 and num_bid == 1:
            # only one bidder in the first round
            return True

        self.prev_round_max_bid = self.highest_bid
        # the hammer falls once a round passes without any new bid
        return num_bid == 0

    def _num_bids_in_round(self, bid_round: int):
        '''
        Count the bids > 0 placed in a round (withdrawals are not counted).
        '''
        return sum(1 for hist in self.bidding_history[bid_round] if hist['bid'] > 0)

    def _reset_item_state(self):
        '''
        Clear everything that belongs to the item currently on sale.
        '''
        self.cur_item = None
        self.highest_bidder = None
        self.highest_bid = -1
        self.bidding_history = defaultdict(list)
        self.prev_round_max_bid = -1
        self.fail_to_sell = False

    def hammer_fall(self):
        '''
        Print the result for the current item, log the hammer price and
        reset the per-item state.
        '''
        print(f'* Sold! {self.cur_item} (${self.cur_item.true_value}) goes to {self.highest_bidder} at ${self.highest_bid}.')
        self.auction_logs[f"{self.cur_item.get_desc()}"].append({
            'bidder': self.highest_bidder,
            'bid': f"{self.highest_bid} (${self.cur_item.true_value})",  # no need for the first $, as it will be added in the self.log()
            'bid_round': 'Hammer price (true value)'})
        self._reset_item_state()

    def end_auction(self):
        '''
        Return True when no item is left in the queue.
        '''
        return not self.items_queue

    def gather_all_status(self, bidders: List[Bidder]):
        '''
        Return {bidder name: {'profit': ..., 'items_won': ...}} for the given bidders.
        '''
        return {
            bidder.name: {'profit': bidder.profit, 'items_won': bidder.items_won}
            for bidder in bidders
        }

    def parse_bid(self, text: str):
        '''
        Extract the bid from a bidder's free-text response with an LLM call.

        Returns -1 when the parser output contains the marker -1 (negative bids
        are shown as withdrawals elsewhere in this class), otherwise the last
        number found in the output, or None when there is no number at all.
        The cost of the call is added to openai_cost.
        '''
        prompt = PARSE_BID_INSTRUCTION.format(response=text)
        with get_openai_callback() as cb:
            llm = ChatOpenAI(model='gpt-3.5-turbo-0613', temperature=0)
            result = llm([HumanMessage(content=prompt)]).content
            self.openai_cost += cb.total_cost

        # A plain substring test would also fire on ranges or dates such as
        # "$100-150" or "2023-10-01"; only accept "-1" that is not glued to a
        # preceding word character/digit.
        if re.search(r'(?<!\w)-1', result):
            return -1

        # find number in the result
        bid_number = re.findall(r'\$?\d+', result.replace(',', ''))
        if bid_number:
            return int(bid_number[-1].replace('$', ''))

        print('* Rebid:', text)
        return None

    def log(self, bidder_personal_reports: list = None, show_model_name=True):
        ''' Render the auction log (and optional personal reports) as markdown.

        example
        Apparatus H, starting at $1000.
        1st bid:
        Bidder 1 (gpt-3.5-turbo-16k-0613): $1200
        Bidder 2 (gpt-3.5-turbo-16k-0613): $1100
        Bidder 3 (gpt-3.5-turbo-16k-0613): Withdrawn
        Bidder 4 (gpt-3.5-turbo-16k-0613): $1200
        2nd bid:
        Bidder 1 (gpt-3.5-turbo-16k-0613): Withdrawn
        Bidder 2 (gpt-3.5-turbo-16k-0613): Withdrawn
        Hammer price:
        Bidder 4 (gpt-3.5-turbo-16k-0613): $1200
        '''
        # None instead of a shared mutable [] default; omitted means "no reports"
        reports = [] if bidder_personal_reports is None else bidder_personal_reports

        parts = ["## Auction Log\n\n"]
        for item_idx, (item, bids) in enumerate(self.auction_logs.items()):
            parts.append(f"### {item_idx+1}. {item}\n\n")
            cur_bid_round = -1
            for bid in bids:
                if bid['bid_round'] != cur_bid_round:
                    # first entry of a new round: emit the round heading
                    cur_bid_round = bid['bid_round']
                    if isinstance(cur_bid_round, int):
                        parts.append(f"\n#### {p.ordinal(cur_bid_round+1)} bid:\n\n")
                    else:
                        # non-numeric label, e.g. the hammer-price entry
                        parts.append(f"\n#### {cur_bid_round}:\n\n")

                bid_price = f"${bid['bid']}" if bid['bid'] != -1 else 'Withdrew'
                bidder = bid['bidder']
                if isinstance(bidder, (Bidder, HumanBidder)):
                    if show_model_name:
                        parts.append(f"* {bidder} ({bidder.model_name}): {bid_price}\n")
                    else:
                        parts.append(f"* {bidder}: {bid_price}\n")
                else:
                    # e.g. the hammer entry of an item nobody bid on
                    parts.append("* None bid\n")
            parts.append("\n")

        if len(reports) != 0:
            parts.append("\n## Personal Report")
        for report in reports:
            parts.append(f"\n\n{report}")

        return "".join(parts).strip()

    def finish_auction(self):
        '''
        Reset the auctioneer completely so it can host a new auction.
        '''
        self._reset_item_state()
        self.auction_logs = defaultdict(list)
        self.items_queue = []
        self.items = []
        self.min_bid = 0