change the teardown process

This commit is contained in:
Zhi Wang 2022-01-10 09:35:05 -05:00
parent 598e01c1e9
commit 38f8f279a0
2 changed files with 57 additions and 44 deletions

27
reg.go
View File

@ -11,33 +11,32 @@ import (
// a simple registry for actors and their channels. It is possible to
// design this using channels, but it is simple enough with mutex
type Registry struct {
mtx sync.Mutex
doers map[string]*TermConn
mtx sync.Mutex
players map[string]*TermConn
}
var registry Registry
func (reg *Registry) init() {
reg.doers = make(map[string]*TermConn)
reg.players = make(map[string]*TermConn)
}
func (d *Registry) addDoer(name string, tc *TermConn) {
func (d *Registry) addPlayer(name string, tc *TermConn) {
d.mtx.Lock()
if val, ok := d.doers[name]; ok {
log.Println(name, "already exist in the dispatcher", val, tc)
delete(d.doers, name)
val.release(false) // do not unregister in release, otherwise it is a deadlock
if _, ok := d.players[name]; ok {
log.Println(name, "already exist in the dispatcher, skip registration")
} else {
d.players[name] = tc
}
d.doers[name] = tc
d.mtx.Unlock()
}
func (d *Registry) delDoer(name string) error {
func (d *Registry) removePlayer(name string) error {
d.mtx.Lock()
var err error = errors.New("not found")
if _, ok := d.doers[name]; ok {
delete(d.doers, name)
if _, ok := d.players[name]; ok {
delete(d.players, name)
err = nil
}
@ -46,9 +45,9 @@ func (d *Registry) delDoer(name string) error {
}
// we do not want to return the channel to viewer so it won't be used out of the critical section
func (d *Registry) sendToDoer(name string, ws *websocket.Conn) bool {
func (d *Registry) sendToPlayer(name string, ws *websocket.Conn) bool {
d.mtx.Lock()
tc, ok := d.doers[name]
tc, ok := d.players[name]
if ok {
tc.vchan <- ws

View File

@ -15,10 +15,12 @@ import (
const (
// Time allowed to write a message to the peer.
writeWait = 5 * time.Second
readWait = 10 * time.Second
writeWait = 10 * time.Second
viewWait = 3 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 30 * time.Second
pongWait = 10 * time.Second
// Maximum message size allowed from peer.
maxMessageSize = 8192
@ -102,10 +104,11 @@ func (tc *TermConn) ping() {
if err != nil {
log.Println("Failed to write ping message:", err)
return
}
case <-tc.done:
log.Println("Exit ping routine as stdout is going away")
log.Println("Exit ping routine as pty/ws is going away")
return
}
}
@ -147,7 +150,9 @@ func (tc *TermConn) wsToPtyStdin() {
func (tc *TermConn) ptyStdoutToWs() {
var viewers []*websocket.Conn
readBuf := make([]byte, 4096)
closed := false
out:
for {
n, err := tc.ptmx.Read(readBuf)
@ -158,17 +163,22 @@ func (tc *TermConn) ptyStdoutToWs() {
// handle viewers, we want to use non-blocking receive
select {
case watcher := <-tc.vchan:
log.Println("Received viewer", watcher)
viewers = append(viewers, watcher)
default:
//log.Println("no viewer received")
case viewer := <-tc.vchan:
log.Println("Received viewer")
viewers = append(viewers, viewer)
case <-tc.done:
log.Println("Websocket is closed by main goroutine")
closed = true
break out
default: // do not block on these two channels
}
// We could add ws to viewers as well (then we can use io.MultiWriter),
// but we want to handle errors differently
tc.ws.SetWriteDeadline(time.Now().Add(writeWait))
if tc.ws.WriteMessage(websocket.BinaryMessage, readBuf[0:n]); err != nil {
if err = tc.ws.WriteMessage(websocket.BinaryMessage, readBuf[0:n]); err != nil {
log.Println("Failed to write message: ", err)
break
}
@ -179,11 +189,12 @@ func (tc *TermConn) ptyStdoutToWs() {
}
// if the viewer exits, we will just ignore the error
w.SetWriteDeadline(time.Now().Add(viewWait))
if err = w.WriteMessage(websocket.BinaryMessage, readBuf[0:n]); err != nil {
log.Println("Failed to write message to watcher: ", err)
log.Println("Failed to write message to viewer: ", err)
viewers[i] = nil
w.Close()
w.Close() // we own the socket and need to close it
}
}
}
@ -195,19 +206,19 @@ func (tc *TermConn) ptyStdoutToWs() {
}
}
close(tc.done)
tc.ws.SetWriteDeadline(time.Now().Add(writeWait))
tc.ws.WriteMessage(websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, "Pty closed"))
time.Sleep(closeGracePeriod)
if !closed { // If the error is caused by pty, try to close the socket
tc.ws.SetWriteDeadline(time.Now().Add(writeWait))
tc.ws.WriteMessage(websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, "Pty closed"))
time.Sleep(closeGracePeriod)
}
}
func (tc *TermConn) release(unreg bool) {
log.Println("releasing", tc.name)
// this function should be executed by the main goroutine for the connection
func (tc *TermConn) release() {
log.Println("Releasing terminal connection", tc.name)
if unreg {
registry.delDoer(tc.name)
}
registry.removePlayer(tc.name)
if tc.ptmx != nil {
// cleanup the pty and its related process
@ -236,14 +247,14 @@ func (tc *TermConn) release(unreg bool) {
}
close(tc.vchan)
close(tc.done)
}
tc.ws.Close()
}
// handle websockets
func wsHandleDoer(w http.ResponseWriter, r *http.Request) {
func wsHandlePlayer(w http.ResponseWriter, r *http.Request) {
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
@ -256,7 +267,7 @@ func wsHandleDoer(w http.ResponseWriter, r *http.Request) {
name: "main",
}
defer tc.release(true)
defer tc.release()
log.Println("\n\nCreated the websocket")
if err := tc.createPty(cmdToExec); err != nil {
@ -267,13 +278,16 @@ func wsHandleDoer(w http.ResponseWriter, r *http.Request) {
tc.done = make(chan struct{})
tc.vchan = make(chan *websocket.Conn)
registry.addDoer("main", &tc)
registry.addPlayer("main", &tc)
// main event loop to shovel data between ws and pty
// do not call ptyStdoutToWs in this goroutine, otherwise
// the websocket will not close. This is because ptyStdoutToWs
// is usually blocked in the pty.Read
go tc.ping()
go tc.wsToPtyStdin()
go tc.ptyStdoutToWs()
tc.ptyStdoutToWs()
tc.wsToPtyStdin()
}
// handle websockets
@ -286,15 +300,15 @@ func wsHandleViewer(w http.ResponseWriter, r *http.Request) {
}
log.Println("\n\nCreated the websocket")
if !registry.sendToDoer("main", ws) {
log.Println("Failed to send websocket to doer, close it")
if !registry.sendToPlayer("main", ws) {
log.Println("Failed to send websocket to player, close it")
ws.Close()
}
}
func wsHandler(w http.ResponseWriter, r *http.Request, isViewer bool) {
if !isViewer {
wsHandleDoer(w, r)
wsHandlePlayer(w, r)
} else {
wsHandleViewer(w, r)
}