diff --git a/irc/channel.go b/irc/channel.go index 753f1220..b231ff46 100644 --- a/irc/channel.go +++ b/irc/channel.go @@ -779,6 +779,9 @@ func (channel *Channel) Join(client *Client, key string, isSajoin bool, rb *Resp modestr = fmt.Sprintf("+%v", givenMode) } + // cache the most common case (JOIN without extended-join) + var cache MessageCache + cache.Initialize(channel.server, message.Time, message.Msgid, details.nickMask, details.accountName, nil, "JOIN", chname) isAway, awayMessage := client.Away() for _, member := range channel.Members() { if respectAuditorium { @@ -799,7 +802,7 @@ func (channel *Channel) Join(client *Client, key string, isSajoin bool, rb *Resp if session.capabilities.Has(caps.ExtendedJoin) { session.sendFromClientInternal(false, message.Time, message.Msgid, details.nickMask, details.accountName, nil, "JOIN", chname, details.accountName, details.realname) } else { - session.sendFromClientInternal(false, message.Time, message.Msgid, details.nickMask, details.accountName, nil, "JOIN", chname) + cache.Send(session) } if givenMode != 0 { session.Send(nil, client.server.name, "MODE", chname, modestr, details.nick) @@ -933,6 +936,8 @@ func (channel *Channel) Part(client *Client, message string, rb *ResponseBuffer) } respectAuditorium := channel.flags.HasMode(modes.Auditorium) && clientModes.HighestChannelUserMode() == modes.Mode(0) + var cache MessageCache + cache.Initialize(channel.server, splitMessage.Time, splitMessage.Msgid, details.nickMask, details.accountName, nil, "PART", params...) for _, member := range channel.Members() { if respectAuditorium { channel.stateMutex.RLock() @@ -942,7 +947,9 @@ func (channel *Channel) Part(client *Client, message string, rb *ResponseBuffer) continue } } - member.sendFromClientInternal(false, splitMessage.Time, splitMessage.Msgid, details.nickMask, details.accountName, nil, "PART", params...) + for _, session := range member.Sessions() { + cache.Send(session) + } } rb.AddFromClient(splitMessage.Time, splitMessage.Msgid, details.nickMask, details.accountName, nil, "PART", params...) for _, session := range client.Sessions() { @@ -1322,6 +1329,8 @@ func (channel *Channel) SendSplitMessage(command string, minPrefixMode modes.Mod // send echo-message rb.addEchoMessage(clientOnlyTags, details.nickMask, details.accountName, command, chname, message) + var cache MessageCache + cache.InitializeSplitMessage(channel.server, details.nickMask, details.accountName, clientOnlyTags, command, chname, message) for _, member := range channel.Members() { if minPrefixMode != modes.Mode(0) && !channel.ClientIsAtLeast(member, minPrefixMode) { // STATUSMSG or OpModerated @@ -1337,18 +1346,7 @@ func (channel *Channel) SendSplitMessage(command string, minPrefixMode modes.Mod continue // #753 } - var tagsToUse map[string]string - if session.capabilities.Has(caps.MessageTags) { - tagsToUse = clientOnlyTags - } else if histType == history.Tagmsg { - continue - } - - if histType == history.Tagmsg { - session.sendFromClientInternal(false, message.Time, message.Msgid, details.nickMask, details.accountName, tagsToUse, command, chname) - } else { - session.sendSplitMsgFromClientInternal(false, details.nickMask, details.accountName, tagsToUse, command, chname, message) - } + cache.Send(session) } } diff --git a/irc/client.go b/irc/client.go index 693d14b8..a6fa6096 100644 --- a/irc/client.go +++ b/irc/client.go @@ -1551,8 +1551,12 @@ func (client *Client) destroy(session *Session) { if quitMessage == "" { quitMessage = "Exited" } + var cache MessageCache + cache.Initialize(client.server, splitQuitMessage.Time, splitQuitMessage.Msgid, details.nickMask, details.accountName, nil, "QUIT", quitMessage) for friend := range friends { - friend.sendFromClientInternal(false, splitQuitMessage.Time, splitQuitMessage.Msgid, details.nickMask, details.accountName, nil, "QUIT", quitMessage) + for _, session := range friend.Sessions() { + cache.Send(session) + } } if registered { @@ -1567,7 +1571,7 @@ func (session *Session) sendSplitMsgFromClientInternal(blocking bool, nickmask, session.sendFromClientInternal(blocking, message.Time, message.Msgid, nickmask, accountName, tags, command, target, message.Message) } else { if session.capabilities.Has(caps.Multiline) { - for _, msg := range session.composeMultilineBatch(nickmask, accountName, tags, command, target, message) { + for _, msg := range composeMultilineBatch(session.generateBatchID(), nickmask, accountName, tags, command, target, message) { session.SendRawMessage(msg, blocking) } } else { @@ -1614,12 +1618,11 @@ func (session *Session) sendFromClientInternal(blocking bool, serverTime time.Ti return session.SendRawMessage(msg, blocking) } -func (session *Session) composeMultilineBatch(fromNickMask, fromAccount string, tags map[string]string, command, target string, message utils.SplitMessage) (result []ircmsg.IrcMessage) { - batchID := session.generateBatchID() +func composeMultilineBatch(batchID, fromNickMask, fromAccount string, tags map[string]string, command, target string, message utils.SplitMessage) (result []ircmsg.IrcMessage) { batchStart := ircmsg.MakeMessage(tags, fromNickMask, "BATCH", "+"+batchID, caps.MultilineBatchType, target) batchStart.SetTag("time", message.Time.Format(IRCv3TimestampFormat)) batchStart.SetTag("msgid", message.Msgid) - if session.capabilities.Has(caps.AccountTag) && fromAccount != "*" { + if fromAccount != "*" { batchStart.SetTag("account", fromAccount) } result = append(result, batchStart) @@ -1680,6 +1683,10 @@ func (session *Session) SendRawMessage(message ircmsg.IrcMessage, blocking bool) return err } + return session.sendBytes(line, blocking) +} + +func (session *Session) sendBytes(line []byte, blocking bool) (err error) { if session.client.server.logger.IsLoggingRawIO() { logline := string(line[:len(line)-2]) // strip "\r\n" session.client.server.logger.Debug("useroutput", session.client.Nick(), " ->", logline) diff --git a/irc/message_cache.go b/irc/message_cache.go new file mode 100644 index 00000000..3bc59a10 --- /dev/null +++ b/irc/message_cache.go @@ -0,0 +1,197 @@ +// Copyright (c) 2020 Shivaram Lingamneni +// released under the MIT license + +package irc + +import ( + "time" + + "github.com/goshuirc/irc-go/ircmsg" + + "github.com/oragono/oragono/irc/caps" + "github.com/oragono/oragono/irc/utils" +) + +// MessageCache caches serialized IRC messages. +// First call Initialize or InitializeSplitMessage, which records +// the parameters and builds the cache. Then call Send, which will +// either send a cached version of the message or dispatch to another +// send routine that can synthesize the necessary version on the fly. +type MessageCache struct { + // these cache a single-line message (e.g., JOIN, or PRIVMSG with a 512-byte message) + // one version is "plain" (legacy clients with no tags) and one is "full" (client has + // the message-tags cap) + plain []byte + fullTags []byte + // these cache a multiline message (a PRIVMSG that was sent as a multiline batch) + // one version is "plain" (legacy clients with no tags) and one is "full" (client has + // the multiline cap) + plainMultiline [][]byte + fullTagsMultiline [][]byte + + time time.Time + msgid string + accountName string + tags map[string]string + source string + command string + + params []string + + target string + splitMessage utils.SplitMessage +} + +func addAllTags(msg *ircmsg.IrcMessage, tags map[string]string, serverTime time.Time, msgid, accountName string) { + msg.UpdateTags(tags) + msg.SetTag("time", serverTime.Format(IRCv3TimestampFormat)) + if accountName != "*" { + msg.SetTag("account", accountName) + } + if msgid != "" { + msg.SetTag("msgid", msgid) + } +} + +func (m *MessageCache) handleErr(server *Server, err error) bool { + if err != nil { + server.logger.Error("internal", "Error assembling message for sending", err.Error()) + // blank these out so Send will be a no-op + m.fullTags = nil + m.fullTagsMultiline = nil + return true + } + return false +} + +func (m *MessageCache) Initialize(server *Server, serverTime time.Time, msgid string, nickmask, accountName string, tags map[string]string, command string, params ...string) (err error) { + m.time = serverTime + m.msgid = msgid + m.source = nickmask + m.accountName = accountName + m.tags = tags + m.command = command + m.params = params + + var msg ircmsg.IrcMessage + config := server.Config() + if config.Server.Compatibility.forceTrailing && commandsThatMustUseTrailing[command] { + msg.ForceTrailing() + } + msg.Prefix = nickmask + msg.Command = command + msg.Params = make([]string, len(params)) + copy(msg.Params, params) + m.plain, err = msg.LineBytesStrict(false, MaxLineLen) + if m.handleErr(server, err) { + return + } + + addAllTags(&msg, tags, serverTime, msgid, accountName) + m.fullTags, err = msg.LineBytesStrict(false, MaxLineLen) + if m.handleErr(server, err) { + return + } + return +} + +func (m *MessageCache) InitializeSplitMessage(server *Server, nickmask, accountName string, tags map[string]string, command, target string, message utils.SplitMessage) (err error) { + m.time = message.Time + m.msgid = message.Msgid + m.source = nickmask + m.accountName = accountName + m.tags = tags + m.command = command + m.target = target + m.splitMessage = message + + config := server.Config() + forceTrailing := config.Server.Compatibility.forceTrailing && commandsThatMustUseTrailing[command] + + if message.Is512() { + isTagmsg := command == "TAGMSG" + var msg ircmsg.IrcMessage + if forceTrailing { + msg.ForceTrailing() + } + + msg.Prefix = nickmask + msg.Command = command + if isTagmsg { + msg.Params = []string{target} + } else { + msg.Params = []string{target, message.Message} + } + m.params = msg.Params + if !isTagmsg { + m.plain, err = msg.LineBytesStrict(false, MaxLineLen) + if m.handleErr(server, err) { + return + } + } + + addAllTags(&msg, tags, message.Time, message.Msgid, accountName) + m.fullTags, err = msg.LineBytesStrict(false, MaxLineLen) + if m.handleErr(server, err) { + return + } + } else { + var msg ircmsg.IrcMessage + if forceTrailing { + msg.ForceTrailing() + } + msg.Prefix = nickmask + msg.Command = command + msg.Params = make([]string, 2) + msg.Params[0] = target + m.plainMultiline = make([][]byte, len(message.Split)) + for i, pair := range message.Split { + msg.Params[1] = pair.Message + m.plainMultiline[i], err = msg.LineBytesStrict(false, MaxLineLen) + if m.handleErr(server, err) { + return + } + } + + // we need to send the same batch ID to all recipient sessions; + // use a uuidv4-alike to ensure that it won't collide + batch := composeMultilineBatch(utils.GenerateSecretToken(), nickmask, accountName, tags, command, target, message) + m.fullTagsMultiline = make([][]byte, len(batch)) + for i, msg := range batch { + if forceTrailing { + msg.ForceTrailing() + } + m.fullTagsMultiline[i], err = msg.LineBytesStrict(false, MaxLineLen) + if m.handleErr(server, err) { + return + } + } + } + return +} + +func (m *MessageCache) Send(session *Session) { + if m.fullTags != nil { + if session.capabilities.Has(caps.MessageTags) { + session.sendBytes(m.fullTags, false) + } else if !(session.capabilities.Has(caps.ServerTime) || session.capabilities.Has(caps.AccountTag)) { + if m.plain != nil { + session.sendBytes(m.plain, false) + } + } else { + session.sendFromClientInternal(false, m.time, m.msgid, m.source, m.accountName, nil, m.command, m.params...) + } + } else if m.fullTagsMultiline != nil { + if session.capabilities.Has(caps.Multiline) { + for _, line := range m.fullTagsMultiline { + session.sendBytes(line, false) + } + } else if !(session.capabilities.Has(caps.ServerTime) || session.capabilities.Has(caps.AccountTag)) { + for _, line := range m.plainMultiline { + session.sendBytes(line, false) + } + } else { + session.sendSplitMsgFromClientInternal(false, m.source, m.accountName, m.tags, m.command, m.target, m.splitMessage) + } + } +} diff --git a/irc/responsebuffer.go b/irc/responsebuffer.go index ab1fa551..f981d618 100644 --- a/irc/responsebuffer.go +++ b/irc/responsebuffer.go @@ -127,7 +127,7 @@ func (rb *ResponseBuffer) AddSplitMessageFromClient(fromNickMask string, fromAcc } } else { if rb.session.capabilities.Has(caps.Multiline) { - batch := rb.session.composeMultilineBatch(fromNickMask, fromAccount, tags, command, target, message) + batch := composeMultilineBatch(rb.session.generateBatchID(), fromNickMask, fromAccount, tags, command, target, message) rb.setNestedBatchTag(&batch[0]) rb.setNestedBatchTag(&batch[len(batch)-1]) rb.messages = append(rb.messages, batch...) diff --git a/irc/utils/bitset.go b/irc/utils/bitset.go index cd73377e..2e478a69 100644 --- a/irc/utils/bitset.go +++ b/irc/utils/bitset.go @@ -17,6 +17,14 @@ func BitsetGet(set []uint32, position uint) bool { return (block & (1 << bit)) != 0 } +// BitsetGetLocal returns whether a given bit of the bitset is set, +// without synchronization. +func BitsetGetLocal(set []uint32, position uint) bool { + idx := position / 32 + bit := position % 32 + return (set[idx] & (1 << bit)) != 0 +} + // BitsetSet sets a given bit of the bitset to 0 or 1, returning whether it changed. func BitsetSet(set []uint32, position uint, on bool) (changed bool) { idx := position / 32 @@ -79,6 +87,15 @@ func BitsetCopy(set []uint32, other []uint32) { } } +// BitsetCopyLocal copies the contents of `other` over `set`, +// without synchronizing the writes to `set`. +func BitsetCopyLocal(set []uint32, other []uint32) { + for i := 0; i < len(set); i++ { + data := atomic.LoadUint32(&other[i]) + set[i] = data + } +} + // BitsetSubtract modifies `set` to subtract the contents of `other`. // Similar caveats about race conditions as with `BitsetUnion` apply. func BitsetSubtract(set []uint32, other []uint32) { diff --git a/irc/utils/bitset_test.go b/irc/utils/bitset_test.go index 8d8f1e2a..dc50cccd 100644 --- a/irc/utils/bitset_test.go +++ b/irc/utils/bitset_test.go @@ -80,4 +80,14 @@ func TestSets(t *testing.T) { if !BitsetGet(t3s, 0) || BitsetGet(t3s, 72) || !BitsetGet(t3s, 74) || BitsetGet(t3s, 71) { t.Error("subtract doesn't work") } + + var tlocal testBitset + tlocals := tlocal[:] + BitsetCopyLocal(tlocals, t1s) + for i = 0; i < 128; i++ { + expected := (i != 72) + if BitsetGetLocal(tlocals, i) != expected { + t.Error("all bits should be set except 72") + } + } }