diff --git a/irc/client.go b/irc/client.go index 6baec8ec..3eabceb2 100644 --- a/irc/client.go +++ b/irc/client.go @@ -90,7 +90,6 @@ func NewClient(server *Server, conn net.Conn, isTLS bool) *Client { limits := server.Limits() fullLineLenLimit := limits.LineLen.Tags + limits.LineLen.Rest socket := NewSocket(conn, fullLineLenLimit*2, server.MaxSendQBytes()) - go socket.RunSocketWriter() client := &Client{ atime: now, authorized: server.Password() == nil, @@ -101,7 +100,7 @@ func NewClient(server *Server, conn net.Conn, isTLS bool) *Client { ctime: now, flags: make(map[modes.Mode]bool), server: server, - socket: &socket, + socket: socket, nick: "*", // * is used until actual nick is given nickCasefolded: "*", nickMaskString: "*", // * is used until actual nick is given diff --git a/irc/socket.go b/irc/socket.go index 0f98a6f9..5b0d2e19 100644 --- a/irc/socket.go +++ b/irc/socket.go @@ -31,23 +31,26 @@ type Socket struct { maxSendQBytes int - // coordination system for asynchronous writes - buffer []byte - lineToSendExists chan bool + // this is a trylock enforcing that only one goroutine can write to `conn` at a time + writerSlotOpen chan bool + buffer []byte closed bool sendQExceeded bool finalData string // what to send when we die + finalized bool } // NewSocket returns a new Socket. -func NewSocket(conn net.Conn, maxReadQBytes int, maxSendQBytes int) Socket { - return Socket{ - conn: conn, - reader: bufio.NewReaderSize(conn, maxReadQBytes), - maxSendQBytes: maxSendQBytes, - lineToSendExists: make(chan bool, 1), +func NewSocket(conn net.Conn, maxReadQBytes int, maxSendQBytes int) *Socket { + result := Socket{ + conn: conn, + reader: bufio.NewReaderSize(conn, maxReadQBytes), + maxSendQBytes: maxSendQBytes, + writerSlotOpen: make(chan bool, 1), } + result.writerSlotOpen <- true + return &result } // Close stops a Socket from being able to send/receive any more data. @@ -114,7 +117,11 @@ func (socket *Socket) Read() (string, error) { return line, nil } -// Write sends the given string out of Socket. +// Write sends the given string out of Socket. Requirements: +// 1. MUST NOT block for macroscopic amounts of time +// 2. MUST NOT reorder messages +// 3. MUST provide mutual exclusion for socket.conn.Write +// 4. SHOULD NOT tie up additional goroutines, beyond the one blocked on socket.conn.Write func (socket *Socket) Write(data string) (err error) { socket.Lock() if socket.closed { @@ -131,12 +138,15 @@ func (socket *Socket) Write(data string) (err error) { return } -// wakeWriter wakes up the goroutine that actually performs the write, without blocking +// wakeWriter starts the goroutine that actually performs the write, without blocking func (socket *Socket) wakeWriter() { - // nonblocking send to the channel, no-op if it's full + // attempt to acquire the trylock select { - case socket.lineToSendExists <- true: + case <-socket.writerSlotOpen: + // acquired the trylock; send() will release it + go socket.send() default: + // failed to acquire; the holder will check for more data after releasing it } } @@ -154,32 +164,59 @@ func (socket *Socket) IsClosed() bool { return socket.closed } -// RunSocketWriter starts writing messages to the outgoing socket. -func (socket *Socket) RunSocketWriter() { - localBuffer := make([]byte, 0) - shouldStop := false - for !shouldStop { - // wait for new lines - select { - case <-socket.lineToSendExists: - // retrieve the buffered data, clear the buffer - socket.Lock() - localBuffer = append(localBuffer, socket.buffer...) - socket.buffer = socket.buffer[:0] - socket.Unlock() +// is there data to write? +func (socket *Socket) readyToWrite() bool { + socket.Lock() + defer socket.Unlock() + // on the first time observing socket.closed, we still have to write socket.finalData + return !socket.finalized && (len(socket.buffer) > 0 || socket.closed || socket.sendQExceeded) +} - _, err := socket.conn.Write(localBuffer) - localBuffer = localBuffer[:0] - - socket.Lock() - shouldStop = (err != nil) || socket.closed || socket.sendQExceeded - socket.Unlock() +// send actually writes messages to socket.Conn; it may block +func (socket *Socket) send() { + for { + // we are holding the trylock: actually do the write + socket.performWrite() + // surrender the trylock, avoiding a race where a write comes in after we've + // checked readyToWrite() and it returned false, but while we still hold the trylock: + socket.writerSlotOpen <- true + // check if more data came in while we held the trylock: + if !socket.readyToWrite() { + return } + select { + case <-socket.writerSlotOpen: + // got the trylock, loop back around and write + default: + // failed to acquire; exit and wait for the holder to observe readyToWrite() + // after releasing it + return + } + } +} + +// write the contents of the buffer, then see if we need to close +func (socket *Socket) performWrite() { + // retrieve the buffered data, clear the buffer + socket.Lock() + buffer := socket.buffer + socket.buffer = nil + socket.Unlock() + + _, err := socket.conn.Write(buffer) + + socket.Lock() + shouldClose := (err != nil) || socket.closed || socket.sendQExceeded + socket.Unlock() + + if !shouldClose { + return } // mark the socket closed (if someone hasn't already), then write error lines socket.Lock() socket.closed = true + socket.finalized = true finalData := socket.finalData if socket.sendQExceeded { finalData = "\r\nERROR :SendQ Exceeded\r\n"