witty/term_conn/relay.go

368 lines
8.7 KiB
Go
Raw Normal View History

2022-01-09 14:56:43 +01:00
//This file contains code to relay traffic between websocket and pty
2022-01-10 23:16:22 +01:00
package term_conn
2022-01-09 14:56:43 +01:00
import (
"log"
"net/http"
"os"
"os/exec"
"sync"
2022-01-09 14:56:43 +01:00
"time"
"github.com/creack/pty"
2022-01-12 03:18:19 +01:00
"github.com/dchest/uniuri"
2022-01-09 14:56:43 +01:00
"github.com/gorilla/websocket"
)
const (
	// I/O deadlines. writeWait bounds writes (data and ping frames) on the
	// player's websocket and viewWait bounds writes to a viewer's websocket.
	// readWait is declared alongside them for reads.
	readWait  = 10 * time.Second
	writeWait = 10 * time.Second
	viewWait  = 3 * time.Second

	// pongWait is how long we wait for the next pong from the peer before
	// the read deadline expires.
	pongWait = 10 * time.Second

	// pingPeriod is the interval between pings sent to the peer. It must be
	// shorter than pongWait.
	pingPeriod = pongWait * 9 / 10

	// maxMessageSize is the largest message accepted from the peer.
	maxMessageSize = 4096

	// Buffer sizes passed to the websocket upgrader.
	readBufferSize  = 1024
	WriteBufferSize = 1024
)
var upgrader = websocket.Upgrader{
ReadBufferSize: readBufferSize,
WriteBufferSize: WriteBufferSize,
CheckOrigin: func(r *http.Request) bool {
2022-01-10 23:16:22 +01:00
return true
},
}
2022-01-10 04:04:23 +01:00
// TermConn represents the connected websocket and pty.
// if isViewer is true
type TermConn struct {
2022-01-12 03:18:19 +01:00
Name string
Ip string
2022-01-10 04:04:23 +01:00
2022-01-12 03:18:19 +01:00
ws *websocket.Conn
ptmx *os.File // the pty that runs the command
cmd *exec.Cmd // represents the process, we need it to terminate the process
vchan chan *websocket.Conn // channel to receive viewers
ws_done chan struct{} // ws is closed, only close this chan in ws reader
pty_done chan struct{} // pty is closed, close this chan in pty reader
2022-01-10 04:04:23 +01:00
}
func (tc *TermConn) createPty(cmdline []string) error {
2022-01-09 14:56:43 +01:00
// Create a shell command.
cmd := exec.Command(cmdline[0], cmdline[1:]...)
// Start the command with a pty.
ptmx, err := pty.Start(cmd)
if err != nil {
2022-01-10 04:04:23 +01:00
return err
2022-01-09 14:56:43 +01:00
}
// Use fixed size, the xterm is initalized as 122x37,
// But we set pty to 120x36. Using fullsize will lead
2022-01-10 04:04:23 +01:00
// some program to misbehave.
2022-01-09 14:56:43 +01:00
pty.Setsize(ptmx, &pty.Winsize{
Cols: 120,
Rows: 36,
})
2022-01-10 04:04:23 +01:00
tc.ptmx = ptmx
tc.cmd = cmd
2022-01-09 14:56:43 +01:00
log.Printf("Create shell process %v (%v)", cmdline, cmd.Process.Pid)
2022-01-10 04:04:23 +01:00
return nil
2022-01-09 14:56:43 +01:00
}
// Periodically send ping message to detect the status of the ws
func (tc *TermConn) ping(wg *sync.WaitGroup) {
defer wg.Done()
2022-01-09 14:56:43 +01:00
ticker := time.NewTicker(pingPeriod)
defer ticker.Stop()
out:
2022-01-09 14:56:43 +01:00
for {
select {
case <-ticker.C:
2022-01-10 04:04:23 +01:00
err := tc.ws.WriteControl(websocket.PingMessage,
2022-01-09 14:56:43 +01:00
[]byte{}, time.Now().Add(writeWait))
if err != nil {
log.Println("Failed to write ping message:", err)
break out
2022-01-09 14:56:43 +01:00
}
case <-tc.pty_done:
log.Println("Exit ping routine as pty is going away")
break out
2022-01-09 14:56:43 +01:00
case <-tc.ws_done:
log.Println("Exit ping routine as ws is going away")
break out
2022-01-09 14:56:43 +01:00
}
}
log.Println("Ping routine exited")
2022-01-09 14:56:43 +01:00
}
// shovel data from websocket to pty stdin
func (tc *TermConn) wsToPtyStdin(wg *sync.WaitGroup) {
defer wg.Done()
2022-01-10 04:04:23 +01:00
tc.ws.SetReadLimit(maxMessageSize)
2022-01-09 14:56:43 +01:00
// set the readdeadline. The idea here is simple,
// as long as we keep receiving pong message,
// the readdeadline will keep updating. Otherwise
// read will timeout.
2022-01-10 04:04:23 +01:00
tc.ws.SetReadDeadline(time.Now().Add(pongWait))
tc.ws.SetPongHandler(func(string) error {
tc.ws.SetReadDeadline(time.Now().Add(pongWait))
2022-01-09 14:56:43 +01:00
return nil
})
bufChan := make(chan []byte)
2022-01-09 14:56:43 +01:00
go func() { //create a goroutine to read from ws
for {
_, buf, err := tc.ws.ReadMessage()
2022-01-09 14:56:43 +01:00
if err != nil {
log.Println("Failed to receive data from ws:", err)
close(bufChan) // close chan by producer
close(tc.ws_done)
break
}
2022-01-09 14:56:43 +01:00
bufChan <- buf
}
}()
// we do not need to forward user input to viewers, only the stdout
out:
for {
select {
case buf, ok := <-bufChan:
if !ok {
log.Println("Exit wsToPtyStdin routine pty stdin error")
break out
}
_, err := tc.ptmx.Write(buf)
if err != nil {
log.Println("Failed to send data to pty stdin: ", err)
break out
}
case <-tc.ws_done:
log.Println("Exit wsToPtyStdin routine as ws is going away")
break out
case <-tc.pty_done:
log.Println("Exit wsToPtyStdin routine as pty is going away")
break out
2022-01-09 14:56:43 +01:00
}
}
log.Println("wsToPtyStdin routine exited")
2022-01-09 14:56:43 +01:00
}
// shovel data from pty Stdout to WS
func (tc *TermConn) ptyStdoutToWs(wg *sync.WaitGroup) {
2022-01-10 04:04:23 +01:00
var viewers []*websocket.Conn
2022-01-09 14:56:43 +01:00
defer wg.Done()
bufChan := make(chan []byte)
2022-01-09 14:56:43 +01:00
go func() { //create a goroutine to read from pty
for {
readBuf := make([]byte, 1024) //pty reads in 1024 blocks
n, err := tc.ptmx.Read(readBuf)
if err != nil {
log.Println("Failed to read from pty stdout: ", err)
close(bufChan)
close(tc.pty_done)
break
}
readBuf = readBuf[:n] // slice the buffer so that it is exact the size of data read.
bufChan <- readBuf
2022-01-09 14:56:43 +01:00
}
}()
2022-01-09 14:56:43 +01:00
out:
for {
2022-01-10 04:04:23 +01:00
// handle viewers, we want to use non-blocking receive
2022-01-09 14:56:43 +01:00
select {
case buf, ok := <-bufChan:
if !ok {
tc.ws.SetWriteDeadline(time.Now().Add(writeWait))
tc.ws.WriteMessage(websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, "Pty closed"))
2022-01-10 15:35:05 +01:00
break out
}
// We could add ws to viewers as well (then we can use io.MultiWriter),
// but we want to handle errors differently
tc.ws.SetWriteDeadline(time.Now().Add(writeWait))
if err := tc.ws.WriteMessage(websocket.BinaryMessage, buf); err != nil {
log.Println("Failed to write message: ", err)
break out
}
2022-01-10 15:35:05 +01:00
//write to the viewer
for i, w := range viewers {
if w == nil {
continue
}
2022-01-09 14:56:43 +01:00
// if the viewer exits, we will just ignore the error
w.SetWriteDeadline(time.Now().Add(viewWait))
if err := w.WriteMessage(websocket.BinaryMessage, buf); err != nil {
log.Println("Failed to write message to viewer: ", err)
2022-01-09 14:56:43 +01:00
viewers[i] = nil
w.Close() // we own the socket and need to close it
}
2022-01-09 14:56:43 +01:00
}
case viewer := <-tc.vchan:
2022-01-12 04:52:59 +01:00
log.Println("Received viewer", viewer.RemoteAddr().String() )
viewers = append(viewers, viewer)
2022-01-09 14:56:43 +01:00
case <-tc.ws_done:
log.Println("Exit ptyStdoutToWs routine as ws is going away")
break out
case <-tc.pty_done:
log.Println("Exit ptyStdoutToWs routine as pty is going away")
break out // do not block on these two channels
2022-01-09 14:56:43 +01:00
}
2022-01-09 14:56:43 +01:00
}
// close the watcher
2022-01-10 04:04:23 +01:00
for _, w := range viewers {
2022-01-09 14:56:43 +01:00
if w != nil {
w.Close()
}
}
log.Println("ptyStdoutToWs routine exited")
2022-01-09 14:56:43 +01:00
}
2022-01-10 15:35:05 +01:00
// this function should be executed by the main goroutine for the connection
func (tc *TermConn) release() {
2022-01-12 03:18:19 +01:00
log.Println("Releasing terminal connection", tc.Name)
2022-01-10 04:23:52 +01:00
2022-01-12 03:18:19 +01:00
registry.removePlayer(tc.Name)
2022-01-10 04:04:23 +01:00
if tc.ptmx != nil {
// cleanup the pty and its related process
tc.ptmx.Close()
// terminate the command line process
proc := tc.cmd.Process
// send an interrupt, this will cause the shell process to
// return from syscalls if any is pending
if err := proc.Signal(os.Interrupt); err != nil {
log.Printf("Failed to send Interrupt to shell process(%v): %v ", proc.Pid, err)
}
// Wait for a second for shell process to interrupt before kill it
time.Sleep(time.Second)
log.Printf("Try to kill the shell process(%v)", proc.Pid)
if err := proc.Signal(os.Kill); err != nil {
log.Printf("Failed to send KILL to shell process(%v): %v", proc.Pid, err)
}
if _, err := proc.Wait(); err != nil {
log.Printf("Failed to wait for shell process(%v): %v", proc.Pid, err)
}
close(tc.vchan)
}
tc.ws.Close()
}
2022-01-09 14:56:43 +01:00
// handle websockets
2022-01-10 23:16:22 +01:00
func handlePlayer(w http.ResponseWriter, r *http.Request, cmdline []string) {
2022-01-09 14:56:43 +01:00
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println("Failed to create websocket: ", err)
return
}
2022-01-10 04:04:23 +01:00
tc := TermConn{
ws: ws,
2022-01-12 03:18:19 +01:00
Name: uniuri.New(),
Ip: ws.RemoteAddr().String(),
2022-01-10 04:04:23 +01:00
}
2022-01-09 14:56:43 +01:00
2022-01-10 15:35:05 +01:00
defer tc.release()
2022-01-12 04:57:23 +01:00
log.Println("Created the websocket to", ws.RemoteAddr().String())
2022-01-09 14:56:43 +01:00
tc.ws_done = make(chan struct{})
tc.pty_done = make(chan struct{})
tc.vchan = make(chan *websocket.Conn)
2022-01-10 23:16:22 +01:00
if err := tc.createPty(cmdline); err != nil {
2022-01-09 14:56:43 +01:00
log.Println("Failed to create PTY: ", err)
return
}
2022-01-12 03:18:19 +01:00
registry.addPlayer(&tc)
2022-01-09 14:56:43 +01:00
2022-01-10 04:04:23 +01:00
// main event loop to shovel data between ws and pty
2022-01-10 15:35:05 +01:00
// do not call ptyStdoutToWs in this goroutine, otherwise
// the websocket will not close. This is because ptyStdoutToWs
// is usually blocked in the pty.Read
var wg sync.WaitGroup
wg.Add(3)
go tc.ping(&wg)
go tc.ptyStdoutToWs(&wg)
go tc.wsToPtyStdin(&wg)
2022-01-09 14:56:43 +01:00
wg.Wait()
log.Println("Wait returned")
2022-01-09 14:56:43 +01:00
}
// handle websockets
2022-01-12 03:18:19 +01:00
func handleViewer(w http.ResponseWriter, r *http.Request, path string) {
2022-01-09 14:56:43 +01:00
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println("Failed to create websocket: ", err)
return
}
2022-01-12 04:57:23 +01:00
log.Println("Created the websocket to", ws.RemoteAddr().String())
2022-01-12 03:18:19 +01:00
if !registry.sendToPlayer(path, ws) {
2022-01-10 15:35:05 +01:00
log.Println("Failed to send websocket to player, close it")
2022-01-09 14:56:43 +01:00
ws.Close()
}
}
2022-01-12 03:18:19 +01:00
func ConnectTerm(w http.ResponseWriter, r *http.Request, isViewer bool, path string, cmdline []string) {
2022-01-10 04:04:23 +01:00
if !isViewer {
2022-01-10 23:16:22 +01:00
handlePlayer(w, r, cmdline)
2022-01-09 14:56:43 +01:00
} else {
2022-01-12 03:18:19 +01:00
handleViewer(w, r, path)
2022-01-09 14:56:43 +01:00
}
}
2022-01-10 23:16:22 +01:00
// Init installs checkOrigin as the upgrader's CheckOrigin callback,
// replacing the accept-everything callback the upgrader is declared with,
// and then calls registry.init().
func Init(checkOrigin func(r *http.Request) bool) {
upgrader.CheckOrigin = checkOrigin
registry.init()
}