diff --git a/irc/client.go b/irc/client.go index 0e1b15de..ef15e237 100644 --- a/irc/client.go +++ b/irc/client.go @@ -675,6 +675,11 @@ func (client *Client) destroy(beingResumed bool) { return } + // see #235: deduplicating the list of PART recipients uses (comparatively speaking) + // a lot of RAM, so limit concurrency to avoid thrashing + client.server.semaphores.ClientDestroy.Acquire() + defer client.server.semaphores.ClientDestroy.Release() + if beingResumed { client.server.logger.Debug("quit", fmt.Sprintf("%s is being resumed", client.nick)) } else { diff --git a/irc/semaphores.go b/irc/semaphores.go new file mode 100644 index 00000000..a9ce309f --- /dev/null +++ b/irc/semaphores.go @@ -0,0 +1,81 @@ +// Copyright (c) 2018 Shivaram Lingamneni + +package irc + +import ( + "log" + "runtime" + "runtime/debug" +) + +// See #237 for context. Operations that might allocate large amounts of temporary +// garbage, or temporarily tie up some other resource, may cause thrashing unless +// their concurrency is artificially restricted. We use `chan bool` as a +// (regrettably, unary-encoded) counting semaphore to enforce these restrictions. + +const ( + // this is a tradeoff between exploiting CPU-level parallelism (higher values better) + // and not thrashing the allocator (lower values better). really this is all just + // guesswork. oragono *can* make use of cores beyond this limit --- just not for + // the protected operations. + MaxServerSemaphoreCapacity = 32 +) + +// Semaphore is a counting semaphore. Note that a capacity of n requires O(n) storage. +type Semaphore (chan bool) + +// ServerSemaphores includes a named Semaphore corresponding to each concurrency-limited +// server operation. +type ServerSemaphores struct { + // each distinct operation MUST have its own semaphore; + // methods that acquire a semaphore MUST NOT call methods that acquire another + ClientDestroy Semaphore +} + +// NewServerSemaphores creates a new ServerSemaphores. 
+func NewServerSemaphores() (result *ServerSemaphores) { + capacity := runtime.NumCPU() + if capacity > MaxServerSemaphoreCapacity { + capacity = MaxServerSemaphoreCapacity + } + result = new(ServerSemaphores) + result.ClientDestroy.Initialize(capacity) + return +} + +// Initialize initializes a semaphore to a given capacity. +func (semaphore *Semaphore) Initialize(capacity int) { + *semaphore = make(chan bool, capacity) + for i := 0; i < capacity; i++ { + (*semaphore) <- true + } +} + +// Acquire acquires a semaphore, blocking if necessary. +func (semaphore *Semaphore) Acquire() { + <-(*semaphore) +} + +// TryAcquire tries to acquire a semaphore, returning whether the acquire was +// successful. It never blocks. +func (semaphore *Semaphore) TryAcquire() (acquired bool) { + select { + case <-(*semaphore): + return true + default: + return false + } +} + +// Release releases a semaphore. It never blocks. (This is not a license +// to program spurious releases.) +func (semaphore *Semaphore) Release() { + select { + case (*semaphore) <- true: + // good + default: + // spurious release + log.Printf("spurious semaphore release (full to capacity %d)", cap(*semaphore)) + debug.PrintStack() + } +} diff --git a/irc/server.go b/irc/server.go index c0342f18..625bfd43 100644 --- a/irc/server.go +++ b/irc/server.go @@ -131,6 +131,7 @@ type Server struct { webirc []webircConfig whoWas *WhoWasList stats *Stats + semaphores *ServerSemaphores } var ( @@ -165,6 +166,7 @@ func NewServer(config *Config, logger *logger.Manager) (*Server, error) { snomasks: NewSnoManager(), whoWas: NewWhoWasList(config.Limits.WhowasEntries), stats: NewStats(), + semaphores: NewServerSemaphores(), } if err := server.applyConfig(config, true); err != nil { diff --git a/irc/socket.go b/irc/socket.go index 5b0d2e19..6382e498 100644 --- a/irc/socket.go +++ b/irc/socket.go @@ -32,7 +32,7 @@ type Socket struct { maxSendQBytes int // this is a trylock enforcing that only one goroutine can write to `conn` at a 
time - writerSlotOpen chan bool + writerSemaphore Semaphore buffer []byte closed bool @@ -44,12 +44,11 @@ type Socket struct { // NewSocket returns a new Socket. func NewSocket(conn net.Conn, maxReadQBytes int, maxSendQBytes int) *Socket { result := Socket{ - conn: conn, - reader: bufio.NewReaderSize(conn, maxReadQBytes), - maxSendQBytes: maxSendQBytes, - writerSlotOpen: make(chan bool, 1), + conn: conn, + reader: bufio.NewReaderSize(conn, maxReadQBytes), + maxSendQBytes: maxSendQBytes, } - result.writerSlotOpen <- true + result.writerSemaphore.Initialize(1) return &result } @@ -140,14 +139,11 @@ func (socket *Socket) Write(data string) (err error) { // wakeWriter starts the goroutine that actually performs the write, without blocking func (socket *Socket) wakeWriter() { - // attempt to acquire the trylock - select { - case <-socket.writerSlotOpen: + if socket.writerSemaphore.TryAcquire() { // acquired the trylock; send() will release it go socket.send() - default: - // failed to acquire; the holder will check for more data after releasing it } + // else: do nothing, the holder will check for more data after releasing it } // SetFinalData sets the final data to send when the SocketWriter closes. @@ -179,19 +175,17 @@ func (socket *Socket) send() { socket.performWrite() // surrender the trylock, avoiding a race where a write comes in after we've // checked readyToWrite() and it returned false, but while we still hold the trylock: - socket.writerSlotOpen <- true + socket.writerSemaphore.Release() // check if more data came in while we held the trylock: if !socket.readyToWrite() { return } - select { - case <-socket.writerSlotOpen: - // got the trylock, loop back around and write - default: + if !socket.writerSemaphore.TryAcquire() { // failed to acquire; exit and wait for the holder to observe readyToWrite() // after releasing it return } + // got the lock again, loop back around and write } }