| package handlers |
|
|
| import ( |
| "log" |
| "net" |
| "sync" |
| "time" |
|
|
| "github.com/play-with-docker/play-with-docker/event" |
| "github.com/play-with-docker/play-with-docker/pwd/types" |
|
|
| "golang.org/x/text/encoding" |
| ) |
|
|
| type terminal struct { |
| conn net.Conn |
| write chan []byte |
| instance *types.Instance |
| } |
|
|
| func (t *terminal) Go(ch chan info, ech chan *types.Instance) { |
| go func() { |
| for d := range t.write { |
| _, err := t.conn.Write(d) |
| if err != nil { |
| ech <- t.instance |
| return |
| } |
| } |
| }() |
| go func() { |
| encoder := encoding.Replacement.NewEncoder() |
| buf := make([]byte, 1024) |
| for { |
| n, err := t.conn.Read(buf) |
| if err != nil { |
| ech <- t.instance |
| return |
| } |
| b, err := encoder.Bytes(buf[:n]) |
| if err != nil { |
| ech <- t.instance |
| return |
| } |
| ch <- info{name: t.instance.Name, data: b} |
| } |
| }() |
| } |
|
|
| type info struct { |
| name string |
| data []byte |
| } |
|
|
| type state struct { |
| name string |
| status string |
| } |
|
|
| type manager struct { |
| session *types.Session |
| sendCh chan info |
| receiveCh chan info |
| stateCh chan state |
| terminals map[string]*terminal |
| errorCh chan *types.Instance |
| instances map[string]*types.Instance |
| sync.Mutex |
| } |
|
|
| func (m *manager) Send(name string, data []byte) { |
| m.sendCh <- info{name: name, data: data} |
| } |
| func (m *manager) Receive(cb func(name string, data []byte)) { |
| for i := range m.receiveCh { |
| cb(i.name, i.data) |
| } |
| } |
| func (m *manager) Status(cb func(name, status string)) { |
| for s := range m.stateCh { |
| cb(s.name, s.status) |
| } |
| } |
|
|
| func (m *manager) connect(instance *types.Instance) error { |
| if !m.trackingInstance(instance) { |
| return nil |
| } |
|
|
| return m.connectTerminal(instance) |
| } |
|
|
| func (m *manager) connectTerminal(instance *types.Instance) error { |
| m.Lock() |
| defer m.Unlock() |
|
|
| conn, err := core.InstanceGetTerminal(instance) |
| if err != nil { |
| return err |
| } |
| chw := make(chan []byte, 10) |
| t := terminal{conn: conn, write: chw, instance: instance} |
| m.terminals[instance.Name] = &t |
| t.Go(m.receiveCh, m.errorCh) |
| m.stateCh <- state{name: instance.Name, status: "connect"} |
|
|
| return nil |
| } |
|
|
| func (m *manager) disconnectTerminal(instance *types.Instance) { |
| m.Lock() |
| defer m.Unlock() |
|
|
| t := m.terminals[instance.Name] |
| if t != nil { |
| if t.write != nil { |
| close(t.write) |
| } |
| if t.conn != nil { |
| t.conn.Close() |
| } |
| delete(m.terminals, instance.Name) |
| } |
| } |
|
|
| func (m *manager) getTerminal(instanceName string) *terminal { |
| return m.terminals[instanceName] |
| } |
|
|
| func (m *manager) trackInstance(instance *types.Instance) { |
| m.Lock() |
| defer m.Unlock() |
|
|
| m.instances[instance.Name] = instance |
|
|
| } |
| func (m *manager) untrackInstance(instance *types.Instance) { |
| m.Lock() |
| defer m.Unlock() |
|
|
| delete(m.instances, instance.Name) |
| } |
| func (m *manager) trackingInstance(instance *types.Instance) bool { |
| m.Lock() |
| defer m.Unlock() |
| _, found := m.instances[instance.Name] |
|
|
| return found |
| } |
|
|
| func (m *manager) disconnect(instance *types.Instance) { |
| if !m.trackingInstance(instance) { |
| return |
| } |
|
|
| m.disconnectTerminal(instance) |
| m.untrackInstance(instance) |
| } |
|
|
| func (m *manager) process() { |
| for { |
| select { |
| case i := <-m.sendCh: |
| t := m.getTerminal(i.name) |
| if t != nil { |
| t.write <- i.data |
| } |
| case instance := <-m.errorCh: |
| |
| i := core.InstanceGet(&types.Session{Id: instance.SessionId}, instance.Name) |
| if i == nil { |
| log.Println("Instance doesn't exist anymore. Won't reconnect") |
| continue |
| } |
| m.stateCh <- state{name: instance.Name, status: "reconnect"} |
| time.AfterFunc(time.Second, func() { |
| m.connect(instance) |
| }) |
| } |
| } |
| } |
| func (m *manager) Close() { |
| for _, i := range m.instances { |
| m.disconnect(i) |
| } |
| } |
|
|
| func (m *manager) Start() error { |
| instances, err := core.InstanceFindBySession(m.session) |
| if err != nil { |
| return err |
| } |
| for _, i := range instances { |
| m.instances[i.Name] = i |
| m.connect(i) |
| } |
| go m.process() |
| return nil |
| } |
|
|
| func NewManager(s *types.Session) (*manager, error) { |
| m := &manager{ |
| session: s, |
| sendCh: make(chan info, 10), |
| receiveCh: make(chan info, 10), |
| stateCh: make(chan state, 10), |
| terminals: make(map[string]*terminal), |
| errorCh: make(chan *types.Instance, 10), |
| instances: make(map[string]*types.Instance), |
| } |
|
|
| e.On(event.INSTANCE_NEW, func(sessionId string, args ...interface{}) { |
| if sessionId != s.Id { |
| return |
| } |
|
|
| |
| instanceName := args[0].(string) |
| instance := core.InstanceGet(s, instanceName) |
| if instance == nil { |
| log.Printf("Instance [%s] was not found in session [%s]\n", instanceName, sessionId) |
| return |
| } |
| m.trackInstance(instance) |
| m.connect(instance) |
| }) |
|
|
| e.On(event.INSTANCE_DELETE, func(sessionId string, args ...interface{}) { |
| if sessionId != s.Id { |
| return |
| } |
|
|
| |
| instanceName := args[0].(string) |
| instance := &types.Instance{Name: instanceName} |
| m.disconnect(instance) |
| }) |
|
|
| return m, nil |
| } |
|
|