diff --git a/irc/client.go b/irc/client.go index d309102a..7e2cdde8 100644 --- a/irc/client.go +++ b/irc/client.go @@ -90,7 +90,6 @@ func NewClient(server *Server, conn net.Conn, isTLS bool) *Client { limits := server.Limits() fullLineLenLimit := limits.LineLen.Tags + limits.LineLen.Rest socket := NewSocket(conn, fullLineLenLimit*2, server.MaxSendQBytes()) - go socket.RunSocketWriter() client := &Client{ atime: now, authorized: server.Password() == nil, diff --git a/irc/socket.go b/irc/socket.go index 0f98a6f9..f4ac6917 100644 --- a/irc/socket.go +++ b/irc/socket.go @@ -31,23 +31,26 @@ type Socket struct { maxSendQBytes int - // coordination system for asynchronous writes - buffer []byte - lineToSendExists chan bool + // this is a trylock enforcing that only one goroutine can write to `conn` at a time + writerSlotOpen chan bool + buffer []byte closed bool sendQExceeded bool finalData string // what to send when we die + finalized bool } // NewSocket returns a new Socket. func NewSocket(conn net.Conn, maxReadQBytes int, maxSendQBytes int) Socket { - return Socket{ - conn: conn, - reader: bufio.NewReaderSize(conn, maxReadQBytes), - maxSendQBytes: maxSendQBytes, - lineToSendExists: make(chan bool, 1), + result := Socket{ + conn: conn, + reader: bufio.NewReaderSize(conn, maxReadQBytes), + maxSendQBytes: maxSendQBytes, + writerSlotOpen: make(chan bool, 1), } + result.writerSlotOpen <- true + return result } // Close stops a Socket from being able to send/receive any more data. @@ -56,7 +59,7 @@ func (socket *Socket) Close() { socket.closed = true socket.Unlock() - socket.wakeWriter() + go socket.send() } // CertFP returns the fingerprint of the certificate provided by the client. @@ -114,7 +117,11 @@ func (socket *Socket) Read() (string, error) { return line, nil } -// Write sends the given string out of Socket. +// Write sends the given string out of Socket. Requirements: +// 1. MUST NOT block for macroscopic amounts of time +// 2. MUST NOT reorder messages +// 3. MUST provide mutual exclusion for socket.conn.Write +// 4. SHOULD NOT tie up additional goroutines, beyond the one blocked on socket.conn.Write func (socket *Socket) Write(data string) (err error) { socket.Lock() if socket.closed { @@ -127,19 +134,10 @@ func (socket *Socket) Write(data string) (err error) { } socket.Unlock() - socket.wakeWriter() + go socket.send() return } -// wakeWriter wakes up the goroutine that actually performs the write, without blocking -func (socket *Socket) wakeWriter() { - // nonblocking send to the channel, no-op if it's full - select { - case socket.lineToSendExists <- true: - default: - } -} - // SetFinalData sets the final data to send when the SocketWriter closes. func (socket *Socket) SetFinalData(data string) { socket.Lock() @@ -154,32 +152,53 @@ func (socket *Socket) IsClosed() bool { return socket.closed } -// RunSocketWriter starts writing messages to the outgoing socket. -func (socket *Socket) RunSocketWriter() { - localBuffer := make([]byte, 0) - shouldStop := false - for !shouldStop { - // wait for new lines +// is there data to write? +func (socket *Socket) readyToWrite() bool { + socket.Lock() + defer socket.Unlock() + // on the first time observing socket.closed, we still have to write socket.finalData + return !socket.finalized && (len(socket.buffer) > 0 || socket.closed || socket.sendQExceeded) +} + +// send actually writes messages to socket.Conn; it may block +func (socket *Socket) send() { + // one of these checks happens-after every call to Write(), so we can't miss writes + for socket.readyToWrite() { select { - case <-socket.lineToSendExists: - // retrieve the buffered data, clear the buffer - socket.Lock() - localBuffer = append(localBuffer, socket.buffer...) - socket.buffer = socket.buffer[:0] - socket.Unlock() - - _, err := socket.conn.Write(localBuffer) - localBuffer = localBuffer[:0] - - socket.Lock() - shouldStop = (err != nil) || socket.closed || socket.sendQExceeded - socket.Unlock() + case <-socket.writerSlotOpen: + // got the trylock: actually do the write + socket.performWrite() + socket.writerSlotOpen <- true + default: + // another goroutine is in progress; exit and wait for them to loop back around + // and observe readyToWrite() again + return } } +} + +// write the contents of the buffer, then see if we need to close +func (socket *Socket) performWrite() { + // retrieve the buffered data, clear the buffer + socket.Lock() + buffer := socket.buffer + socket.buffer = nil + socket.Unlock() + + _, err := socket.conn.Write(buffer) + + socket.Lock() + shouldClose := (err != nil) || socket.closed || socket.sendQExceeded + socket.Unlock() + + if !shouldClose { + return + } // mark the socket closed (if someone hasn't already), then write error lines socket.Lock() socket.closed = true + socket.finalized = true finalData := socket.finalData if socket.sendQExceeded { finalData = "\r\nERROR :SendQ Exceeded\r\n"