package hub import ( "encoding/json" "time" "github.com/gorilla/websocket" ) // Peer represents an individual peer / connection into a room. type Peer struct { // Peer's chat handle. ID string Handle string ws *websocket.Conn // Channel for outbound messages. dataQ chan []byte // Peer's room. room *Room // Rate limiting. numMessages int lastMessage time.Time } type peerInfo struct { ID string `json:"id"` Handle string `json:"handle"` } // newPeer returns a new instance of Peer. func newPeer(id, handle string, ws *websocket.Conn, room *Room) *Peer { return &Peer{ ID: id, Handle: handle, ws: ws, dataQ: make(chan []byte, 100), room: room, } } // RunListener is a blocking function that reads incoming messages from a peer's // WS connection until its dropped or there's an error. This should be invoked // as a goroutine. func (p *Peer) RunListener() { p.ws.SetReadLimit(int64(p.room.hub.cfg.MaxMessageLen)) for { _, m, err := p.ws.ReadMessage() if err != nil { break } p.processMessage(m) } // WS connection is closed. p.ws.Close() p.room.queuePeerReq(TypePeerLeave, p) } // RunWriter is a blocking function that writes messages in a peer's queue to the // peer's WS connection. This should be invoked as a goroutine. func (p *Peer) RunWriter() { defer p.ws.Close() for { select { // Wait for outgoing message to appear in the channel. case message, ok := <-p.dataQ: if !ok { p.writeWSData(websocket.CloseMessage, []byte{}) return } if err := p.writeWSData(websocket.TextMessage, message); err != nil { return } } } } // SendData queues a message to be written to the peer's WS. func (p *Peer) SendData(b []byte) { p.dataQ <- b } // writeWSData writes the given payload to the peer's WS connection. func (p *Peer) writeWSData(msgType int, payload []byte) error { p.ws.SetWriteDeadline(time.Now().Add(p.room.hub.cfg.WSTimeout)) return p.ws.WriteMessage(msgType, payload) } // writeWSControl writes the given control payload to the peer's WS connection. func (p *Peer) writeWSControl(control int, payload []byte) error { return p.ws.WriteControl(websocket.CloseMessage, payload, time.Time{}) } // processMessage processes incoming messages from peers. func (p *Peer) processMessage(b []byte) { var m payloadMsgWrap if err := json.Unmarshal(b, &m); err != nil { // TODO: Respond return } switch m.Type { // Message to the room. case TypeMessage: // Check rate limits and update counters. now := time.Now() if p.numMessages > 0 { if (p.numMessages%p.room.hub.cfg.RateLimitMessages+1) >= p.room.hub.cfg.RateLimitMessages && time.Since(p.lastMessage) < p.room.hub.cfg.RateLimitInterval { p.room.hub.Store.RemoveSession(p.ID, p.room.ID) p.writeWSControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, TypePeerRateLimited)) p.ws.Close() return } } p.lastMessage = now p.numMessages++ msg, ok := m.Data.(string) if !ok { // TODO: Respond return } p.room.Broadcast(p.room.makeMessagePayload(msg, p), true) // "Typing" status. case TypeTyping: p.room.Broadcast(p.room.makePeerUpdatePayload(p, TypeTyping), false) // Request for peers list case TypePeerList: p.room.sendPeerList(p) // Dipose of a room. case TypeRoomDispose: p.room.Dispose() default: } }