From b97ae00fe379c55a92f5c2124d22e5d008fdce6d Mon Sep 17 00:00:00 2001 From: Shivaram Lingamneni Date: Tue, 24 Apr 2018 03:11:11 -0400 Subject: [PATCH 1/4] refactor channel join and part --- irc/channel.go | 172 ++++++++++++++++++++++++------------------------- irc/client.go | 14 ++-- 2 files changed, 96 insertions(+), 90 deletions(-) diff --git a/irc/channel.go b/irc/channel.go index 4c626fb5..fd93df0e 100644 --- a/irc/channel.go +++ b/irc/channel.go @@ -25,7 +25,6 @@ type Channel struct { key string members MemberSet membersCache []*Client // allow iteration over channel members without holding the lock - membersCacheMutex sync.Mutex // tier 2; see `regenerateMembersCache` name string nameCasefolded string server *Server @@ -33,6 +32,7 @@ type Channel struct { registeredFounder string registeredTime time.Time stateMutex sync.RWMutex // tier 1 + joinPartMutex sync.Mutex // tier 3 topic string topicSetBy string topicSetTime time.Time @@ -163,33 +163,19 @@ func (channel *Channel) IsRegistered() bool { return channel.registeredFounder != "" } -func (channel *Channel) regenerateMembersCache(noLocksNeeded bool) { - // this is eventually consistent even without holding stateMutex.Lock() - // throughout the update; all updates to `members` while holding Lock() - // have a serial order, so the call to `regenerateMembersCache` that - // happens-after the last one will see *all* the updates. then, - // `membersCacheMutex` ensures that this final read is correctly paired - // with the final write to `membersCache`. - if !noLocksNeeded { - channel.membersCacheMutex.Lock() - defer channel.membersCacheMutex.Unlock() - channel.stateMutex.RLock() - } - +func (channel *Channel) regenerateMembersCache() { + channel.stateMutex.RLock() result := make([]*Client, len(channel.members)) i := 0 for client := range channel.members { result[i] = client i++ } - if !noLocksNeeded { - channel.stateMutex.RUnlock() - channel.stateMutex.Lock() - } + channel.stateMutex.RUnlock() + + channel.stateMutex.Lock() channel.membersCache = result - if !noLocksNeeded { - channel.stateMutex.Unlock() - } + channel.stateMutex.Unlock() } // Names sends the list of users joined to the channel to the given client. @@ -381,110 +367,119 @@ func (channel *Channel) Join(client *Client, key string, rb *ResponseBuffer) { return } + chname := channel.Name() + if channel.IsFull() { - rb.Add(nil, client.server.name, ERR_CHANNELISFULL, channel.name, fmt.Sprintf(client.t("Cannot join channel (+%s)"), "l")) + rb.Add(nil, client.server.name, ERR_CHANNELISFULL, chname, fmt.Sprintf(client.t("Cannot join channel (+%s)"), "l")) return } if !channel.CheckKey(key) { - rb.Add(nil, client.server.name, ERR_BADCHANNELKEY, channel.name, fmt.Sprintf(client.t("Cannot join channel (+%s)"), "k")) + rb.Add(nil, client.server.name, ERR_BADCHANNELKEY, chname, fmt.Sprintf(client.t("Cannot join channel (+%s)"), "k")) return } isInvited := channel.lists[modes.InviteMask].Match(client.nickMaskCasefolded) if channel.flags.HasMode(modes.InviteOnly) && !isInvited { - rb.Add(nil, client.server.name, ERR_INVITEONLYCHAN, channel.name, fmt.Sprintf(client.t("Cannot join channel (+%s)"), "i")) + rb.Add(nil, client.server.name, ERR_INVITEONLYCHAN, chname, fmt.Sprintf(client.t("Cannot join channel (+%s)"), "i")) return } if channel.lists[modes.BanMask].Match(client.nickMaskCasefolded) && !isInvited && !channel.lists[modes.ExceptMask].Match(client.nickMaskCasefolded) { - rb.Add(nil, client.server.name, ERR_BANNEDFROMCHAN, channel.name, fmt.Sprintf(client.t("Cannot join channel (+%s)"), "b")) + rb.Add(nil, client.server.name, ERR_BANNEDFROMCHAN, chname, fmt.Sprintf(client.t("Cannot join channel (+%s)"), "b")) return } - client.server.logger.Debug("join", fmt.Sprintf("%s joined channel %s", client.nick, channel.name)) + client.server.logger.Debug("join", fmt.Sprintf("%s joined channel %s", client.nick, chname)) - for _, member := range channel.Members() { - if member == client { - if member.capabilities.Has(caps.ExtendedJoin) { - rb.Add(nil, client.nickMaskString, "JOIN", channel.name, client.AccountName(), client.realname) - } else { - rb.Add(nil, client.nickMaskString, "JOIN", channel.name) - } - } else { - if member.capabilities.Has(caps.ExtendedJoin) { - member.Send(nil, client.nickMaskString, "JOIN", channel.name, client.AccountName(), client.realname) - } else { - member.Send(nil, client.nickMaskString, "JOIN", channel.name) - } - } - } + newChannel, givenMode := func() (newChannel bool, givenMode modes.Mode) { + channel.joinPartMutex.Lock() + defer channel.joinPartMutex.Unlock() - channel.stateMutex.Lock() - channel.members.Add(client) - firstJoin := len(channel.members) == 1 - channel.stateMutex.Unlock() - channel.regenerateMembersCache(false) + func() { + account := client.Account() + channel.stateMutex.Lock() + defer channel.stateMutex.Unlock() + + channel.members.Add(client) + firstJoin := len(channel.members) == 1 + newChannel = firstJoin && channel.registeredFounder == "" + if newChannel { + givenMode = modes.ChannelOperator + } else { + givenMode = channel.accountToUMode[account] + } + if givenMode != 0 { + channel.members[client].SetMode(givenMode, true) + } + }() + + channel.regenerateMembersCache() + return + }() client.addChannel(channel) - account := client.Account() + nick := client.Nick() + nickmask := client.NickMaskString() + realname := client.Realname() + accountName := client.AccountName() + var modestr string + if givenMode != 0 { + modestr = fmt.Sprintf("+%v", givenMode) + } - // give channel mode if necessary - channel.stateMutex.Lock() - newChannel := firstJoin && channel.registeredFounder == "" - mode, persistentModeExists := channel.accountToUMode[account] - var givenMode *modes.Mode - if persistentModeExists { - givenMode = &mode - } else if newChannel { - givenMode = &modes.ChannelOperator + for _, member := range channel.Members() { + if member == client { + continue + } + if member.capabilities.Has(caps.ExtendedJoin) { + member.Send(nil, nickmask, "JOIN", chname, accountName, realname) + } else { + member.Send(nil, nickmask, "JOIN", chname) + } + if givenMode != 0 { + member.Send(nil, client.server.name, "MODE", chname, modestr, nick) + } } - if givenMode != nil { - channel.members[client].SetMode(*givenMode, true) - } - channel.stateMutex.Unlock() if client.capabilities.Has(caps.ExtendedJoin) { - rb.Add(nil, client.nickMaskString, "JOIN", channel.name, client.AccountName(), client.realname) + rb.Add(nil, nickmask, "JOIN", chname, accountName, realname) } else { - rb.Add(nil, client.nickMaskString, "JOIN", channel.name) + rb.Add(nil, nickmask, "JOIN", chname) } + // don't send topic when it's an entirely new channel if !newChannel { channel.SendTopic(client, rb) } + channel.Names(client, rb) - if givenMode != nil { - for _, member := range channel.Members() { - if member == client { - rb.Add(nil, client.server.name, "MODE", channel.name, fmt.Sprintf("+%v", *givenMode), client.nick) - } else { - member.Send(nil, client.server.name, "MODE", channel.name, fmt.Sprintf("+%v", *givenMode), client.nick) - } - } + + if givenMode != 0 { + rb.Add(nil, client.server.name, "MODE", chname, modestr, nick) } } // Part parts the given client from this channel, with the given message. func (channel *Channel) Part(client *Client, message string, rb *ResponseBuffer) { + chname := channel.Name() if !channel.hasClient(client) { - rb.Add(nil, client.server.name, ERR_NOTONCHANNEL, channel.name, client.t("You're not on that channel")) + rb.Add(nil, client.server.name, ERR_NOTONCHANNEL, chname, client.t("You're not on that channel")) return } - for _, member := range channel.Members() { - if member == client { - rb.Add(nil, client.nickMaskString, "PART", channel.name, message) - } else { - member.Send(nil, client.nickMaskString, "PART", channel.name, message) - } - } channel.Quit(client) - client.server.logger.Debug("part", fmt.Sprintf("%s left channel %s", client.nick, channel.name)) + nickmask := client.NickMaskString() + for _, member := range channel.Members() { + member.Send(nil, nickmask, "PART", chname, message) + } + rb.Add(nil, nickmask, "PART", chname, message) + + client.server.logger.Debug("part", fmt.Sprintf("%s left channel %s", client.nick, chname)) } // SendTopic sends the channel topic to the given client. @@ -762,17 +757,22 @@ func (channel *Channel) applyModeMask(client *Client, mode modes.Mode, op modes. // Quit removes the given client from the channel func (channel *Channel) Quit(client *Client) { - channel.stateMutex.Lock() - channel.members.Remove(client) - empty := len(channel.members) == 0 - channel.stateMutex.Unlock() - channel.regenerateMembersCache(false) + channelEmpty := func() bool { + channel.joinPartMutex.Lock() + defer channel.joinPartMutex.Unlock() - client.removeChannel(channel) + channel.stateMutex.Lock() + channel.members.Remove(client) + channelEmpty := len(channel.members) == 0 + channel.stateMutex.Unlock() + channel.regenerateMembersCache() + return channelEmpty + }() - if empty { + if channelEmpty { client.server.channels.Cleanup(channel) } + client.removeChannel(channel) } func (channel *Channel) Kick(client *Client, target *Client, comment string, rb *ResponseBuffer) { diff --git a/irc/client.go b/irc/client.go index 9d808ff2..d45598ce 100644 --- a/irc/client.go +++ b/irc/client.go @@ -409,16 +409,20 @@ func (client *Client) TryResume() { client.nick = oldClient.nick client.updateNickMaskNoMutex() - for channel := range oldClient.channels { - channel.stateMutex.Lock() + rejoinChannel := func(channel *Channel) { + channel.joinPartMutex.Lock() + defer channel.joinPartMutex.Unlock() + channel.stateMutex.Lock() client.channels[channel] = true client.resumeDetails.SendFakeJoinsFor = append(client.resumeDetails.SendFakeJoinsFor, channel.name) oldModeSet := channel.members[oldClient] channel.members.Remove(oldClient) channel.members[client] = oldModeSet - channel.regenerateMembersCache(true) + channel.stateMutex.Unlock() + + channel.regenerateMembersCache() // construct fake modestring if necessary oldModes := oldModeSet.String() @@ -447,8 +451,10 @@ func (client *Client) TryResume() { member.Send(nil, server.name, "MODE", params...) } } + } - channel.stateMutex.Unlock() + for channel := range oldClient.channels { + rejoinChannel(channel) } server.clients.byNick[oldnick] = client From ef35c587fc1528050ce306848127bb79a51ba7ba Mon Sep 17 00:00:00 2001 From: Shivaram Lingamneni Date: Tue, 24 Apr 2018 20:23:01 -0400 Subject: [PATCH 2/4] remove redundant friends computation --- irc/client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/irc/client.go b/irc/client.go index d45598ce..0e1b15de 100644 --- a/irc/client.go +++ b/irc/client.go @@ -684,8 +684,6 @@ func (client *Client) destroy(beingResumed bool) { // send quit/error message to client if they haven't been sent already client.Quit("Connection closed") - friends := client.Friends() - friends.Remove(client) if !beingResumed { client.server.whoWas.Append(client) } @@ -703,6 +701,7 @@ func (client *Client) destroy(beingResumed bool) { client.server.monitorManager.RemoveAll(client) // clean up channels + friends := make(ClientSet) for _, channel := range client.Channels() { if !beingResumed { channel.Quit(client) @@ -711,6 +710,7 @@ func (client *Client) destroy(beingResumed bool) { friends.Add(member) } } + friends.Remove(client) // clean up server if !beingResumed { From 65338938630a29e07fac660d78db6be9f9ddbd4c Mon Sep 17 00:00:00 2001 From: Shivaram Lingamneni Date: Tue, 24 Apr 2018 05:46:01 -0400 Subject: [PATCH 3/4] optimized implementation of Channel.Names() --- irc/channel.go | 86 ++++++++++++++++++++++---------------------------- 1 file changed, 38 insertions(+), 48 deletions(-) diff --git a/irc/channel.go b/irc/channel.go index fd93df0e..a48e7d8b 100644 --- a/irc/channel.go +++ b/irc/channel.go @@ -6,6 +6,7 @@ package irc import ( + "bytes" "crypto/subtle" "fmt" "strconv" @@ -24,7 +25,7 @@ type Channel struct { lists map[modes.Mode]*UserMaskSet key string members MemberSet - membersCache []*Client // allow iteration over channel members without holding the lock + membersCache []*Client // allow iteration over channel members without holding the lock name string nameCasefolded string server *Server @@ -180,27 +181,47 @@ func (channel *Channel) regenerateMembersCache() { // Names sends the list of users joined to the channel to the given client. func (channel *Channel) Names(client *Client, rb *ResponseBuffer) { - currentNicks := channel.nicks(client) - // assemble and send replies - maxNamLen := 480 - len(client.server.name) - len(client.nick) - var buffer string - for _, nick := range currentNicks { - if buffer == "" { - buffer += nick + isMultiPrefix := client.capabilities.Has(caps.MultiPrefix) + isUserhostInNames := client.capabilities.Has(caps.UserhostInNames) + + maxNamLen := 480 - len(client.server.name) - len(client.Nick()) + var namesLines []string + var buffer bytes.Buffer + for _, target := range channel.Members() { + var nick string + if isUserhostInNames { + nick = target.NickMaskString() + } else { + nick = target.Nick() + } + channel.stateMutex.RLock() + modes := channel.members[target] + channel.stateMutex.RUnlock() + if modes == nil { continue } - - if len(buffer)+1+len(nick) > maxNamLen { - rb.Add(nil, client.server.name, RPL_NAMREPLY, client.nick, "=", channel.name, buffer) - buffer = nick - continue + prefix := modes.Prefixes(isMultiPrefix) + if buffer.Len()+len(nick)+len(prefix)+1 > maxNamLen { + namesLines = append(namesLines, buffer.String()) + // memset(&buffer, 0, sizeof(bytes.Buffer)); + var newBuffer bytes.Buffer + buffer = newBuffer } - - buffer += " " - buffer += nick + if buffer.Len() > 0 { + buffer.WriteString(" ") + } + buffer.WriteString(prefix) + buffer.WriteString(nick) + } + if buffer.Len() > 0 { + namesLines = append(namesLines, buffer.String()) } - rb.Add(nil, client.server.name, RPL_NAMREPLY, client.nick, "=", channel.name, buffer) + for _, line := range namesLines { + if buffer.Len() > 0 { + rb.Add(nil, client.server.name, RPL_NAMREPLY, client.nick, "=", channel.name, line) + } + } rb.Add(nil, client.server.name, RPL_ENDOFNAMES, client.nick, channel.name, client.t("End of NAMES list")) } @@ -263,37 +284,6 @@ func (channel *Channel) ClientHasPrivsOver(client *Client, target *Client) bool return result } -func (channel *Channel) nicks(target *Client) []string { - isMultiPrefix := (target != nil) && target.capabilities.Has(caps.MultiPrefix) - isUserhostInNames := (target != nil) && target.capabilities.Has(caps.UserhostInNames) - - // slightly cumbersome: get the mutex and copy both the client pointers and - // the mode prefixes - channel.stateMutex.RLock() - length := len(channel.members) - clients := make([]*Client, length) - result := make([]string, length) - i := 0 - for client, modes := range channel.members { - clients[i] = client - result[i] = modes.Prefixes(isMultiPrefix) - i++ - } - channel.stateMutex.RUnlock() - - i = 0 - for i < length { - if isUserhostInNames { - result[i] += clients[i].NickMaskString() - } else { - result[i] += clients[i].Nick() - } - i++ - } - - return result -} - func (channel *Channel) hasClient(client *Client) bool { channel.stateMutex.RLock() defer channel.stateMutex.RUnlock() From ebfef1e848eb72af83139ed04cf361cf5276315d Mon Sep 17 00:00:00 2001 From: Shivaram Lingamneni Date: Tue, 24 Apr 2018 20:34:28 -0400 Subject: [PATCH 4/4] add Semaphore and ServerSemaphores --- irc/client.go | 5 +++ irc/semaphores.go | 81 +++++++++++++++++++++++++++++++++++++++++++++++ irc/server.go | 2 ++ irc/socket.go | 26 ++++++--------- 4 files changed, 98 insertions(+), 16 deletions(-) create mode 100644 irc/semaphores.go diff --git a/irc/client.go b/irc/client.go index 0e1b15de..ef15e237 100644 --- a/irc/client.go +++ b/irc/client.go @@ -675,6 +675,11 @@ func (client *Client) destroy(beingResumed bool) { return } + // see #235: deduplicating the list of PART recipients uses (comparatively speaking) + // a lot of RAM, so limit concurrency to avoid thrashing + client.server.semaphores.ClientDestroy.Acquire() + defer client.server.semaphores.ClientDestroy.Release() + if beingResumed { client.server.logger.Debug("quit", fmt.Sprintf("%s is being resumed", client.nick)) } else { diff --git a/irc/semaphores.go b/irc/semaphores.go new file mode 100644 index 00000000..a9ce309f --- /dev/null +++ b/irc/semaphores.go @@ -0,0 +1,81 @@ +// Copyright (c) 2018 Shivaram Lingamneni + +package irc + +import ( + "log" + "runtime" + "runtime/debug" +) + +// See #237 for context. Operations that might allocate large amounts of temporary +// garbage, or temporarily tie up some other resource, may cause thrashing unless +// their concurrency is artificially restricted. We use `chan bool` as a +// (regrettably, unary-encoded) counting semaphore to enforce these restrictions. + +const ( + // this is a tradeoff between exploiting CPU-level parallelism (higher values better) + // and not thrashing the allocator (lower values better). really this is all just + // guesswork. oragono *can* make use of cores beyond this limit --- just not for + // the protected operations. + MaxServerSemaphoreCapacity = 32 +) + +// Semaphore is a counting semaphore. Note that a capacity of n requires O(n) storage. +type Semaphore (chan bool) + +// ServerSemaphores includes a named Semaphore corresponding to each concurrency-limited +// sever operation. +type ServerSemaphores struct { + // each distinct operation MUST have its own semaphore; + // methods that acquire a semaphore MUST NOT call methods that acquire another + ClientDestroy Semaphore +} + +// NewServerSemaphores creates a new ServerSemaphores. +func NewServerSemaphores() (result *ServerSemaphores) { + capacity := runtime.NumCPU() + if capacity > MaxServerSemaphoreCapacity { + capacity = MaxServerSemaphoreCapacity + } + result = new(ServerSemaphores) + result.ClientDestroy.Initialize(capacity) + return +} + +// Initialize initializes a semaphore to a given capacity. +func (semaphore *Semaphore) Initialize(capacity int) { + *semaphore = make(chan bool, capacity) + for i := 0; i < capacity; i++ { + (*semaphore) <- true + } +} + +// Acquire acquires a semaphore, blocking if necessary. +func (semaphore *Semaphore) Acquire() { + <-(*semaphore) +} + +// TryAcquire tries to acquire a semaphore, returning whether the acquire was +// successful. It never blocks. +func (semaphore *Semaphore) TryAcquire() (acquired bool) { + select { + case <-(*semaphore): + return true + default: + return false + } +} + +// Release releases a semaphore. It never blocks. (This is not a license +// to program spurious releases.) +func (semaphore *Semaphore) Release() { + select { + case (*semaphore) <- true: + // good + default: + // spurious release + log.Printf("spurious semaphore release (full to capacity %d)", cap(*semaphore)) + debug.PrintStack() + } +} diff --git a/irc/server.go b/irc/server.go index c0342f18..625bfd43 100644 --- a/irc/server.go +++ b/irc/server.go @@ -131,6 +131,7 @@ type Server struct { webirc []webircConfig whoWas *WhoWasList stats *Stats + semaphores *ServerSemaphores } var ( @@ -165,6 +166,7 @@ func NewServer(config *Config, logger *logger.Manager) (*Server, error) { snomasks: NewSnoManager(), whoWas: NewWhoWasList(config.Limits.WhowasEntries), stats: NewStats(), + semaphores: NewServerSemaphores(), } if err := server.applyConfig(config, true); err != nil { diff --git a/irc/socket.go b/irc/socket.go index 5b0d2e19..6382e498 100644 --- a/irc/socket.go +++ b/irc/socket.go @@ -32,7 +32,7 @@ type Socket struct { maxSendQBytes int // this is a trylock enforcing that only one goroutine can write to `conn` at a time - writerSlotOpen chan bool + writerSemaphore Semaphore buffer []byte closed bool @@ -44,12 +44,11 @@ type Socket struct { // NewSocket returns a new Socket. func NewSocket(conn net.Conn, maxReadQBytes int, maxSendQBytes int) *Socket { result := Socket{ - conn: conn, - reader: bufio.NewReaderSize(conn, maxReadQBytes), - maxSendQBytes: maxSendQBytes, - writerSlotOpen: make(chan bool, 1), + conn: conn, + reader: bufio.NewReaderSize(conn, maxReadQBytes), + maxSendQBytes: maxSendQBytes, } - result.writerSlotOpen <- true + result.writerSemaphore.Initialize(1) return &result } @@ -140,14 +139,11 @@ func (socket *Socket) Write(data string) (err error) { // wakeWriter starts the goroutine that actually performs the write, without blocking func (socket *Socket) wakeWriter() { - // attempt to acquire the trylock - select { - case <-socket.writerSlotOpen: + if socket.writerSemaphore.TryAcquire() { // acquired the trylock; send() will release it go socket.send() - default: - // failed to acquire; the holder will check for more data after releasing it } + // else: do nothing, the holder will check for more data after releasing it } // SetFinalData sets the final data to send when the SocketWriter closes. @@ -179,19 +175,17 @@ func (socket *Socket) send() { socket.performWrite() // surrender the trylock, avoiding a race where a write comes in after we've // checked readyToWrite() and it returned false, but while we still hold the trylock: - socket.writerSlotOpen <- true + socket.writerSemaphore.Release() // check if more data came in while we held the trylock: if !socket.readyToWrite() { return } - select { - case <-socket.writerSlotOpen: - // got the trylock, loop back around and write - default: + if !socket.writerSemaphore.TryAcquire() { // failed to acquire; exit and wait for the holder to observe readyToWrite() // after releasing it return } + // got the lock again, loop back around and write } }