3
0
mirror of https://github.com/ergochat/ergo.git synced 2024-11-29 07:29:31 +01:00

Merge pull request #260 from slingamn/perftesting.12

optimizations related to #237
This commit is contained in:
Daniel Oaks 2018-04-26 21:40:55 +10:00 committed by GitHub
commit 00949442e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 234 additions and 156 deletions

View File

@ -6,6 +6,7 @@
package irc package irc
import ( import (
"bytes"
"crypto/subtle" "crypto/subtle"
"fmt" "fmt"
"strconv" "strconv"
@ -24,8 +25,7 @@ type Channel struct {
lists map[modes.Mode]*UserMaskSet lists map[modes.Mode]*UserMaskSet
key string key string
members MemberSet members MemberSet
membersCache []*Client // allow iteration over channel members without holding the lock membersCache []*Client // allow iteration over channel members without holding the lock
membersCacheMutex sync.Mutex // tier 2; see `regenerateMembersCache`
name string name string
nameCasefolded string nameCasefolded string
server *Server server *Server
@ -33,6 +33,7 @@ type Channel struct {
registeredFounder string registeredFounder string
registeredTime time.Time registeredTime time.Time
stateMutex sync.RWMutex // tier 1 stateMutex sync.RWMutex // tier 1
joinPartMutex sync.Mutex // tier 3
topic string topic string
topicSetBy string topicSetBy string
topicSetTime time.Time topicSetTime time.Time
@ -163,58 +164,64 @@ func (channel *Channel) IsRegistered() bool {
return channel.registeredFounder != "" return channel.registeredFounder != ""
} }
func (channel *Channel) regenerateMembersCache(noLocksNeeded bool) { func (channel *Channel) regenerateMembersCache() {
// this is eventually consistent even without holding stateMutex.Lock() channel.stateMutex.RLock()
// throughout the update; all updates to `members` while holding Lock()
// have a serial order, so the call to `regenerateMembersCache` that
// happens-after the last one will see *all* the updates. then,
// `membersCacheMutex` ensures that this final read is correctly paired
// with the final write to `membersCache`.
if !noLocksNeeded {
channel.membersCacheMutex.Lock()
defer channel.membersCacheMutex.Unlock()
channel.stateMutex.RLock()
}
result := make([]*Client, len(channel.members)) result := make([]*Client, len(channel.members))
i := 0 i := 0
for client := range channel.members { for client := range channel.members {
result[i] = client result[i] = client
i++ i++
} }
if !noLocksNeeded { channel.stateMutex.RUnlock()
channel.stateMutex.RUnlock()
channel.stateMutex.Lock() channel.stateMutex.Lock()
}
channel.membersCache = result channel.membersCache = result
if !noLocksNeeded { channel.stateMutex.Unlock()
channel.stateMutex.Unlock()
}
} }
// Names sends the list of users joined to the channel to the given client. // Names sends the list of users joined to the channel to the given client.
func (channel *Channel) Names(client *Client, rb *ResponseBuffer) { func (channel *Channel) Names(client *Client, rb *ResponseBuffer) {
currentNicks := channel.nicks(client) isMultiPrefix := client.capabilities.Has(caps.MultiPrefix)
// assemble and send replies isUserhostInNames := client.capabilities.Has(caps.UserhostInNames)
maxNamLen := 480 - len(client.server.name) - len(client.nick)
var buffer string maxNamLen := 480 - len(client.server.name) - len(client.Nick())
for _, nick := range currentNicks { var namesLines []string
if buffer == "" { var buffer bytes.Buffer
buffer += nick for _, target := range channel.Members() {
var nick string
if isUserhostInNames {
nick = target.NickMaskString()
} else {
nick = target.Nick()
}
channel.stateMutex.RLock()
modes := channel.members[target]
channel.stateMutex.RUnlock()
if modes == nil {
continue continue
} }
prefix := modes.Prefixes(isMultiPrefix)
if len(buffer)+1+len(nick) > maxNamLen { if buffer.Len()+len(nick)+len(prefix)+1 > maxNamLen {
rb.Add(nil, client.server.name, RPL_NAMREPLY, client.nick, "=", channel.name, buffer) namesLines = append(namesLines, buffer.String())
buffer = nick // memset(&buffer, 0, sizeof(bytes.Buffer));
continue var newBuffer bytes.Buffer
buffer = newBuffer
} }
if buffer.Len() > 0 {
buffer += " " buffer.WriteString(" ")
buffer += nick }
buffer.WriteString(prefix)
buffer.WriteString(nick)
}
if buffer.Len() > 0 {
namesLines = append(namesLines, buffer.String())
} }
rb.Add(nil, client.server.name, RPL_NAMREPLY, client.nick, "=", channel.name, buffer) for _, line := range namesLines {
if buffer.Len() > 0 {
rb.Add(nil, client.server.name, RPL_NAMREPLY, client.nick, "=", channel.name, line)
}
}
rb.Add(nil, client.server.name, RPL_ENDOFNAMES, client.nick, channel.name, client.t("End of NAMES list")) rb.Add(nil, client.server.name, RPL_ENDOFNAMES, client.nick, channel.name, client.t("End of NAMES list"))
} }
@ -277,37 +284,6 @@ func (channel *Channel) ClientHasPrivsOver(client *Client, target *Client) bool
return result return result
} }
func (channel *Channel) nicks(target *Client) []string {
isMultiPrefix := (target != nil) && target.capabilities.Has(caps.MultiPrefix)
isUserhostInNames := (target != nil) && target.capabilities.Has(caps.UserhostInNames)
// slightly cumbersome: get the mutex and copy both the client pointers and
// the mode prefixes
channel.stateMutex.RLock()
length := len(channel.members)
clients := make([]*Client, length)
result := make([]string, length)
i := 0
for client, modes := range channel.members {
clients[i] = client
result[i] = modes.Prefixes(isMultiPrefix)
i++
}
channel.stateMutex.RUnlock()
i = 0
for i < length {
if isUserhostInNames {
result[i] += clients[i].NickMaskString()
} else {
result[i] += clients[i].Nick()
}
i++
}
return result
}
func (channel *Channel) hasClient(client *Client) bool { func (channel *Channel) hasClient(client *Client) bool {
channel.stateMutex.RLock() channel.stateMutex.RLock()
defer channel.stateMutex.RUnlock() defer channel.stateMutex.RUnlock()
@ -381,110 +357,119 @@ func (channel *Channel) Join(client *Client, key string, rb *ResponseBuffer) {
return return
} }
chname := channel.Name()
if channel.IsFull() { if channel.IsFull() {
rb.Add(nil, client.server.name, ERR_CHANNELISFULL, channel.name, fmt.Sprintf(client.t("Cannot join channel (+%s)"), "l")) rb.Add(nil, client.server.name, ERR_CHANNELISFULL, chname, fmt.Sprintf(client.t("Cannot join channel (+%s)"), "l"))
return return
} }
if !channel.CheckKey(key) { if !channel.CheckKey(key) {
rb.Add(nil, client.server.name, ERR_BADCHANNELKEY, channel.name, fmt.Sprintf(client.t("Cannot join channel (+%s)"), "k")) rb.Add(nil, client.server.name, ERR_BADCHANNELKEY, chname, fmt.Sprintf(client.t("Cannot join channel (+%s)"), "k"))
return return
} }
isInvited := channel.lists[modes.InviteMask].Match(client.nickMaskCasefolded) isInvited := channel.lists[modes.InviteMask].Match(client.nickMaskCasefolded)
if channel.flags.HasMode(modes.InviteOnly) && !isInvited { if channel.flags.HasMode(modes.InviteOnly) && !isInvited {
rb.Add(nil, client.server.name, ERR_INVITEONLYCHAN, channel.name, fmt.Sprintf(client.t("Cannot join channel (+%s)"), "i")) rb.Add(nil, client.server.name, ERR_INVITEONLYCHAN, chname, fmt.Sprintf(client.t("Cannot join channel (+%s)"), "i"))
return return
} }
if channel.lists[modes.BanMask].Match(client.nickMaskCasefolded) && if channel.lists[modes.BanMask].Match(client.nickMaskCasefolded) &&
!isInvited && !isInvited &&
!channel.lists[modes.ExceptMask].Match(client.nickMaskCasefolded) { !channel.lists[modes.ExceptMask].Match(client.nickMaskCasefolded) {
rb.Add(nil, client.server.name, ERR_BANNEDFROMCHAN, channel.name, fmt.Sprintf(client.t("Cannot join channel (+%s)"), "b")) rb.Add(nil, client.server.name, ERR_BANNEDFROMCHAN, chname, fmt.Sprintf(client.t("Cannot join channel (+%s)"), "b"))
return return
} }
client.server.logger.Debug("join", fmt.Sprintf("%s joined channel %s", client.nick, channel.name)) client.server.logger.Debug("join", fmt.Sprintf("%s joined channel %s", client.nick, chname))
for _, member := range channel.Members() { newChannel, givenMode := func() (newChannel bool, givenMode modes.Mode) {
if member == client { channel.joinPartMutex.Lock()
if member.capabilities.Has(caps.ExtendedJoin) { defer channel.joinPartMutex.Unlock()
rb.Add(nil, client.nickMaskString, "JOIN", channel.name, client.AccountName(), client.realname)
} else {
rb.Add(nil, client.nickMaskString, "JOIN", channel.name)
}
} else {
if member.capabilities.Has(caps.ExtendedJoin) {
member.Send(nil, client.nickMaskString, "JOIN", channel.name, client.AccountName(), client.realname)
} else {
member.Send(nil, client.nickMaskString, "JOIN", channel.name)
}
}
}
channel.stateMutex.Lock() func() {
channel.members.Add(client) account := client.Account()
firstJoin := len(channel.members) == 1 channel.stateMutex.Lock()
channel.stateMutex.Unlock() defer channel.stateMutex.Unlock()
channel.regenerateMembersCache(false)
channel.members.Add(client)
firstJoin := len(channel.members) == 1
newChannel = firstJoin && channel.registeredFounder == ""
if newChannel {
givenMode = modes.ChannelOperator
} else {
givenMode = channel.accountToUMode[account]
}
if givenMode != 0 {
channel.members[client].SetMode(givenMode, true)
}
}()
channel.regenerateMembersCache()
return
}()
client.addChannel(channel) client.addChannel(channel)
account := client.Account() nick := client.Nick()
nickmask := client.NickMaskString()
realname := client.Realname()
accountName := client.AccountName()
var modestr string
if givenMode != 0 {
modestr = fmt.Sprintf("+%v", givenMode)
}
// give channel mode if necessary for _, member := range channel.Members() {
channel.stateMutex.Lock() if member == client {
newChannel := firstJoin && channel.registeredFounder == "" continue
mode, persistentModeExists := channel.accountToUMode[account] }
var givenMode *modes.Mode if member.capabilities.Has(caps.ExtendedJoin) {
if persistentModeExists { member.Send(nil, nickmask, "JOIN", chname, accountName, realname)
givenMode = &mode } else {
} else if newChannel { member.Send(nil, nickmask, "JOIN", chname)
givenMode = &modes.ChannelOperator }
if givenMode != 0 {
member.Send(nil, client.server.name, "MODE", chname, modestr, nick)
}
} }
if givenMode != nil {
channel.members[client].SetMode(*givenMode, true)
}
channel.stateMutex.Unlock()
if client.capabilities.Has(caps.ExtendedJoin) { if client.capabilities.Has(caps.ExtendedJoin) {
rb.Add(nil, client.nickMaskString, "JOIN", channel.name, client.AccountName(), client.realname) rb.Add(nil, nickmask, "JOIN", chname, accountName, realname)
} else { } else {
rb.Add(nil, client.nickMaskString, "JOIN", channel.name) rb.Add(nil, nickmask, "JOIN", chname)
} }
// don't send topic when it's an entirely new channel // don't send topic when it's an entirely new channel
if !newChannel { if !newChannel {
channel.SendTopic(client, rb) channel.SendTopic(client, rb)
} }
channel.Names(client, rb) channel.Names(client, rb)
if givenMode != nil {
for _, member := range channel.Members() { if givenMode != 0 {
if member == client { rb.Add(nil, client.server.name, "MODE", chname, modestr, nick)
rb.Add(nil, client.server.name, "MODE", channel.name, fmt.Sprintf("+%v", *givenMode), client.nick)
} else {
member.Send(nil, client.server.name, "MODE", channel.name, fmt.Sprintf("+%v", *givenMode), client.nick)
}
}
} }
} }
// Part parts the given client from this channel, with the given message. // Part parts the given client from this channel, with the given message.
func (channel *Channel) Part(client *Client, message string, rb *ResponseBuffer) { func (channel *Channel) Part(client *Client, message string, rb *ResponseBuffer) {
chname := channel.Name()
if !channel.hasClient(client) { if !channel.hasClient(client) {
rb.Add(nil, client.server.name, ERR_NOTONCHANNEL, channel.name, client.t("You're not on that channel")) rb.Add(nil, client.server.name, ERR_NOTONCHANNEL, chname, client.t("You're not on that channel"))
return return
} }
for _, member := range channel.Members() {
if member == client {
rb.Add(nil, client.nickMaskString, "PART", channel.name, message)
} else {
member.Send(nil, client.nickMaskString, "PART", channel.name, message)
}
}
channel.Quit(client) channel.Quit(client)
client.server.logger.Debug("part", fmt.Sprintf("%s left channel %s", client.nick, channel.name)) nickmask := client.NickMaskString()
for _, member := range channel.Members() {
member.Send(nil, nickmask, "PART", chname, message)
}
rb.Add(nil, nickmask, "PART", chname, message)
client.server.logger.Debug("part", fmt.Sprintf("%s left channel %s", client.nick, chname))
} }
// SendTopic sends the channel topic to the given client. // SendTopic sends the channel topic to the given client.
@ -762,17 +747,22 @@ func (channel *Channel) applyModeMask(client *Client, mode modes.Mode, op modes.
// Quit removes the given client from the channel // Quit removes the given client from the channel
func (channel *Channel) Quit(client *Client) { func (channel *Channel) Quit(client *Client) {
channel.stateMutex.Lock() channelEmpty := func() bool {
channel.members.Remove(client) channel.joinPartMutex.Lock()
empty := len(channel.members) == 0 defer channel.joinPartMutex.Unlock()
channel.stateMutex.Unlock()
channel.regenerateMembersCache(false)
client.removeChannel(channel) channel.stateMutex.Lock()
channel.members.Remove(client)
channelEmpty := len(channel.members) == 0
channel.stateMutex.Unlock()
channel.regenerateMembersCache()
return channelEmpty
}()
if empty { if channelEmpty {
client.server.channels.Cleanup(channel) client.server.channels.Cleanup(channel)
} }
client.removeChannel(channel)
} }
func (channel *Channel) Kick(client *Client, target *Client, comment string, rb *ResponseBuffer) { func (channel *Channel) Kick(client *Client, target *Client, comment string, rb *ResponseBuffer) {

View File

@ -409,16 +409,20 @@ func (client *Client) TryResume() {
client.nick = oldClient.nick client.nick = oldClient.nick
client.updateNickMaskNoMutex() client.updateNickMaskNoMutex()
for channel := range oldClient.channels { rejoinChannel := func(channel *Channel) {
channel.stateMutex.Lock() channel.joinPartMutex.Lock()
defer channel.joinPartMutex.Unlock()
channel.stateMutex.Lock()
client.channels[channel] = true client.channels[channel] = true
client.resumeDetails.SendFakeJoinsFor = append(client.resumeDetails.SendFakeJoinsFor, channel.name) client.resumeDetails.SendFakeJoinsFor = append(client.resumeDetails.SendFakeJoinsFor, channel.name)
oldModeSet := channel.members[oldClient] oldModeSet := channel.members[oldClient]
channel.members.Remove(oldClient) channel.members.Remove(oldClient)
channel.members[client] = oldModeSet channel.members[client] = oldModeSet
channel.regenerateMembersCache(true) channel.stateMutex.Unlock()
channel.regenerateMembersCache()
// construct fake modestring if necessary // construct fake modestring if necessary
oldModes := oldModeSet.String() oldModes := oldModeSet.String()
@ -447,8 +451,10 @@ func (client *Client) TryResume() {
member.Send(nil, server.name, "MODE", params...) member.Send(nil, server.name, "MODE", params...)
} }
} }
}
channel.stateMutex.Unlock() for channel := range oldClient.channels {
rejoinChannel(channel)
} }
server.clients.byNick[oldnick] = client server.clients.byNick[oldnick] = client
@ -669,6 +675,11 @@ func (client *Client) destroy(beingResumed bool) {
return return
} }
// see #235: deduplicating the list of PART recipients uses (comparatively speaking)
// a lot of RAM, so limit concurrency to avoid thrashing
client.server.semaphores.ClientDestroy.Acquire()
defer client.server.semaphores.ClientDestroy.Release()
if beingResumed { if beingResumed {
client.server.logger.Debug("quit", fmt.Sprintf("%s is being resumed", client.nick)) client.server.logger.Debug("quit", fmt.Sprintf("%s is being resumed", client.nick))
} else { } else {
@ -678,8 +689,6 @@ func (client *Client) destroy(beingResumed bool) {
// send quit/error message to client if they haven't been sent already // send quit/error message to client if they haven't been sent already
client.Quit("Connection closed") client.Quit("Connection closed")
friends := client.Friends()
friends.Remove(client)
if !beingResumed { if !beingResumed {
client.server.whoWas.Append(client) client.server.whoWas.Append(client)
} }
@ -697,6 +706,7 @@ func (client *Client) destroy(beingResumed bool) {
client.server.monitorManager.RemoveAll(client) client.server.monitorManager.RemoveAll(client)
// clean up channels // clean up channels
friends := make(ClientSet)
for _, channel := range client.Channels() { for _, channel := range client.Channels() {
if !beingResumed { if !beingResumed {
channel.Quit(client) channel.Quit(client)
@ -705,6 +715,7 @@ func (client *Client) destroy(beingResumed bool) {
friends.Add(member) friends.Add(member)
} }
} }
friends.Remove(client)
// clean up server // clean up server
if !beingResumed { if !beingResumed {

81
irc/semaphores.go Normal file
View File

@ -0,0 +1,81 @@
// Copyright (c) 2018 Shivaram Lingamneni
package irc
import (
"log"
"runtime"
"runtime/debug"
)
// See #237 for context. Operations that might allocate large amounts of temporary
// garbage, or temporarily tie up some other resource, may cause thrashing unless
// their concurrency is artificially restricted. We use `chan bool` as a
// (regrettably, unary-encoded) counting semaphore to enforce these restrictions.
const (
// this is a tradeoff between exploiting CPU-level parallelism (higher values better)
// and not thrashing the allocator (lower values better). really this is all just
// guesswork. oragono *can* make use of cores beyond this limit --- just not for
// the protected operations.
MaxServerSemaphoreCapacity = 32
)
// Semaphore is a counting semaphore. Note that a capacity of n requires O(n) storage.
type Semaphore (chan bool)
// ServerSemaphores includes a named Semaphore corresponding to each concurrency-limited
// sever operation.
type ServerSemaphores struct {
// each distinct operation MUST have its own semaphore;
// methods that acquire a semaphore MUST NOT call methods that acquire another
ClientDestroy Semaphore
}
// NewServerSemaphores creates a new ServerSemaphores.
func NewServerSemaphores() (result *ServerSemaphores) {
capacity := runtime.NumCPU()
if capacity > MaxServerSemaphoreCapacity {
capacity = MaxServerSemaphoreCapacity
}
result = new(ServerSemaphores)
result.ClientDestroy.Initialize(capacity)
return
}
// Initialize initializes a semaphore to a given capacity.
func (semaphore *Semaphore) Initialize(capacity int) {
*semaphore = make(chan bool, capacity)
for i := 0; i < capacity; i++ {
(*semaphore) <- true
}
}
// Acquire acquires a semaphore, blocking if necessary.
func (semaphore *Semaphore) Acquire() {
<-(*semaphore)
}
// TryAcquire tries to acquire a semaphore, returning whether the acquire was
// successful. It never blocks.
func (semaphore *Semaphore) TryAcquire() (acquired bool) {
select {
case <-(*semaphore):
return true
default:
return false
}
}
// Release releases a semaphore. It never blocks. (This is not a license
// to program spurious releases.)
func (semaphore *Semaphore) Release() {
select {
case (*semaphore) <- true:
// good
default:
// spurious release
log.Printf("spurious semaphore release (full to capacity %d)", cap(*semaphore))
debug.PrintStack()
}
}

View File

@ -131,6 +131,7 @@ type Server struct {
webirc []webircConfig webirc []webircConfig
whoWas *WhoWasList whoWas *WhoWasList
stats *Stats stats *Stats
semaphores *ServerSemaphores
} }
var ( var (
@ -165,6 +166,7 @@ func NewServer(config *Config, logger *logger.Manager) (*Server, error) {
snomasks: NewSnoManager(), snomasks: NewSnoManager(),
whoWas: NewWhoWasList(config.Limits.WhowasEntries), whoWas: NewWhoWasList(config.Limits.WhowasEntries),
stats: NewStats(), stats: NewStats(),
semaphores: NewServerSemaphores(),
} }
if err := server.applyConfig(config, true); err != nil { if err := server.applyConfig(config, true); err != nil {

View File

@ -32,7 +32,7 @@ type Socket struct {
maxSendQBytes int maxSendQBytes int
// this is a trylock enforcing that only one goroutine can write to `conn` at a time // this is a trylock enforcing that only one goroutine can write to `conn` at a time
writerSlotOpen chan bool writerSemaphore Semaphore
buffer []byte buffer []byte
closed bool closed bool
@ -44,12 +44,11 @@ type Socket struct {
// NewSocket returns a new Socket. // NewSocket returns a new Socket.
func NewSocket(conn net.Conn, maxReadQBytes int, maxSendQBytes int) *Socket { func NewSocket(conn net.Conn, maxReadQBytes int, maxSendQBytes int) *Socket {
result := Socket{ result := Socket{
conn: conn, conn: conn,
reader: bufio.NewReaderSize(conn, maxReadQBytes), reader: bufio.NewReaderSize(conn, maxReadQBytes),
maxSendQBytes: maxSendQBytes, maxSendQBytes: maxSendQBytes,
writerSlotOpen: make(chan bool, 1),
} }
result.writerSlotOpen <- true result.writerSemaphore.Initialize(1)
return &result return &result
} }
@ -140,14 +139,11 @@ func (socket *Socket) Write(data string) (err error) {
// wakeWriter starts the goroutine that actually performs the write, without blocking // wakeWriter starts the goroutine that actually performs the write, without blocking
func (socket *Socket) wakeWriter() { func (socket *Socket) wakeWriter() {
// attempt to acquire the trylock if socket.writerSemaphore.TryAcquire() {
select {
case <-socket.writerSlotOpen:
// acquired the trylock; send() will release it // acquired the trylock; send() will release it
go socket.send() go socket.send()
default:
// failed to acquire; the holder will check for more data after releasing it
} }
// else: do nothing, the holder will check for more data after releasing it
} }
// SetFinalData sets the final data to send when the SocketWriter closes. // SetFinalData sets the final data to send when the SocketWriter closes.
@ -179,19 +175,17 @@ func (socket *Socket) send() {
socket.performWrite() socket.performWrite()
// surrender the trylock, avoiding a race where a write comes in after we've // surrender the trylock, avoiding a race where a write comes in after we've
// checked readyToWrite() and it returned false, but while we still hold the trylock: // checked readyToWrite() and it returned false, but while we still hold the trylock:
socket.writerSlotOpen <- true socket.writerSemaphore.Release()
// check if more data came in while we held the trylock: // check if more data came in while we held the trylock:
if !socket.readyToWrite() { if !socket.readyToWrite() {
return return
} }
select { if !socket.writerSemaphore.TryAcquire() {
case <-socket.writerSlotOpen:
// got the trylock, loop back around and write
default:
// failed to acquire; exit and wait for the holder to observe readyToWrite() // failed to acquire; exit and wait for the holder to observe readyToWrite()
// after releasing it // after releasing it
return return
} }
// got the lock again, loop back around and write
} }
} }