mirror of
https://github.com/ergochat/ergo.git
synced 2025-01-10 12:12:37 +01:00
Merge pull request #1400 from slingamn/issue1387_messagecaching.4
fix #1387
This commit is contained in:
commit
0fcaf778e0
@ -779,6 +779,9 @@ func (channel *Channel) Join(client *Client, key string, isSajoin bool, rb *Resp
|
|||||||
modestr = fmt.Sprintf("+%v", givenMode)
|
modestr = fmt.Sprintf("+%v", givenMode)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cache the most common case (JOIN without extended-join)
|
||||||
|
var cache MessageCache
|
||||||
|
cache.Initialize(channel.server, message.Time, message.Msgid, details.nickMask, details.accountName, nil, "JOIN", chname)
|
||||||
isAway, awayMessage := client.Away()
|
isAway, awayMessage := client.Away()
|
||||||
for _, member := range channel.Members() {
|
for _, member := range channel.Members() {
|
||||||
if respectAuditorium {
|
if respectAuditorium {
|
||||||
@ -799,7 +802,7 @@ func (channel *Channel) Join(client *Client, key string, isSajoin bool, rb *Resp
|
|||||||
if session.capabilities.Has(caps.ExtendedJoin) {
|
if session.capabilities.Has(caps.ExtendedJoin) {
|
||||||
session.sendFromClientInternal(false, message.Time, message.Msgid, details.nickMask, details.accountName, nil, "JOIN", chname, details.accountName, details.realname)
|
session.sendFromClientInternal(false, message.Time, message.Msgid, details.nickMask, details.accountName, nil, "JOIN", chname, details.accountName, details.realname)
|
||||||
} else {
|
} else {
|
||||||
session.sendFromClientInternal(false, message.Time, message.Msgid, details.nickMask, details.accountName, nil, "JOIN", chname)
|
cache.Send(session)
|
||||||
}
|
}
|
||||||
if givenMode != 0 {
|
if givenMode != 0 {
|
||||||
session.Send(nil, client.server.name, "MODE", chname, modestr, details.nick)
|
session.Send(nil, client.server.name, "MODE", chname, modestr, details.nick)
|
||||||
@ -933,6 +936,8 @@ func (channel *Channel) Part(client *Client, message string, rb *ResponseBuffer)
|
|||||||
}
|
}
|
||||||
respectAuditorium := channel.flags.HasMode(modes.Auditorium) &&
|
respectAuditorium := channel.flags.HasMode(modes.Auditorium) &&
|
||||||
clientModes.HighestChannelUserMode() == modes.Mode(0)
|
clientModes.HighestChannelUserMode() == modes.Mode(0)
|
||||||
|
var cache MessageCache
|
||||||
|
cache.Initialize(channel.server, splitMessage.Time, splitMessage.Msgid, details.nickMask, details.accountName, nil, "PART", params...)
|
||||||
for _, member := range channel.Members() {
|
for _, member := range channel.Members() {
|
||||||
if respectAuditorium {
|
if respectAuditorium {
|
||||||
channel.stateMutex.RLock()
|
channel.stateMutex.RLock()
|
||||||
@ -942,7 +947,9 @@ func (channel *Channel) Part(client *Client, message string, rb *ResponseBuffer)
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
member.sendFromClientInternal(false, splitMessage.Time, splitMessage.Msgid, details.nickMask, details.accountName, nil, "PART", params...)
|
for _, session := range member.Sessions() {
|
||||||
|
cache.Send(session)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
rb.AddFromClient(splitMessage.Time, splitMessage.Msgid, details.nickMask, details.accountName, nil, "PART", params...)
|
rb.AddFromClient(splitMessage.Time, splitMessage.Msgid, details.nickMask, details.accountName, nil, "PART", params...)
|
||||||
for _, session := range client.Sessions() {
|
for _, session := range client.Sessions() {
|
||||||
@ -1322,6 +1329,8 @@ func (channel *Channel) SendSplitMessage(command string, minPrefixMode modes.Mod
|
|||||||
// send echo-message
|
// send echo-message
|
||||||
rb.addEchoMessage(clientOnlyTags, details.nickMask, details.accountName, command, chname, message)
|
rb.addEchoMessage(clientOnlyTags, details.nickMask, details.accountName, command, chname, message)
|
||||||
|
|
||||||
|
var cache MessageCache
|
||||||
|
cache.InitializeSplitMessage(channel.server, details.nickMask, details.accountName, clientOnlyTags, command, chname, message)
|
||||||
for _, member := range channel.Members() {
|
for _, member := range channel.Members() {
|
||||||
if minPrefixMode != modes.Mode(0) && !channel.ClientIsAtLeast(member, minPrefixMode) {
|
if minPrefixMode != modes.Mode(0) && !channel.ClientIsAtLeast(member, minPrefixMode) {
|
||||||
// STATUSMSG or OpModerated
|
// STATUSMSG or OpModerated
|
||||||
@ -1337,18 +1346,7 @@ func (channel *Channel) SendSplitMessage(command string, minPrefixMode modes.Mod
|
|||||||
continue // #753
|
continue // #753
|
||||||
}
|
}
|
||||||
|
|
||||||
var tagsToUse map[string]string
|
cache.Send(session)
|
||||||
if session.capabilities.Has(caps.MessageTags) {
|
|
||||||
tagsToUse = clientOnlyTags
|
|
||||||
} else if histType == history.Tagmsg {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if histType == history.Tagmsg {
|
|
||||||
session.sendFromClientInternal(false, message.Time, message.Msgid, details.nickMask, details.accountName, tagsToUse, command, chname)
|
|
||||||
} else {
|
|
||||||
session.sendSplitMsgFromClientInternal(false, details.nickMask, details.accountName, tagsToUse, command, chname, message)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1551,8 +1551,12 @@ func (client *Client) destroy(session *Session) {
|
|||||||
if quitMessage == "" {
|
if quitMessage == "" {
|
||||||
quitMessage = "Exited"
|
quitMessage = "Exited"
|
||||||
}
|
}
|
||||||
|
var cache MessageCache
|
||||||
|
cache.Initialize(client.server, splitQuitMessage.Time, splitQuitMessage.Msgid, details.nickMask, details.accountName, nil, "QUIT", quitMessage)
|
||||||
for friend := range friends {
|
for friend := range friends {
|
||||||
friend.sendFromClientInternal(false, splitQuitMessage.Time, splitQuitMessage.Msgid, details.nickMask, details.accountName, nil, "QUIT", quitMessage)
|
for _, session := range friend.Sessions() {
|
||||||
|
cache.Send(session)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if registered {
|
if registered {
|
||||||
@ -1567,7 +1571,7 @@ func (session *Session) sendSplitMsgFromClientInternal(blocking bool, nickmask,
|
|||||||
session.sendFromClientInternal(blocking, message.Time, message.Msgid, nickmask, accountName, tags, command, target, message.Message)
|
session.sendFromClientInternal(blocking, message.Time, message.Msgid, nickmask, accountName, tags, command, target, message.Message)
|
||||||
} else {
|
} else {
|
||||||
if session.capabilities.Has(caps.Multiline) {
|
if session.capabilities.Has(caps.Multiline) {
|
||||||
for _, msg := range session.composeMultilineBatch(nickmask, accountName, tags, command, target, message) {
|
for _, msg := range composeMultilineBatch(session.generateBatchID(), nickmask, accountName, tags, command, target, message) {
|
||||||
session.SendRawMessage(msg, blocking)
|
session.SendRawMessage(msg, blocking)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -1614,12 +1618,11 @@ func (session *Session) sendFromClientInternal(blocking bool, serverTime time.Ti
|
|||||||
return session.SendRawMessage(msg, blocking)
|
return session.SendRawMessage(msg, blocking)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (session *Session) composeMultilineBatch(fromNickMask, fromAccount string, tags map[string]string, command, target string, message utils.SplitMessage) (result []ircmsg.IrcMessage) {
|
func composeMultilineBatch(batchID, fromNickMask, fromAccount string, tags map[string]string, command, target string, message utils.SplitMessage) (result []ircmsg.IrcMessage) {
|
||||||
batchID := session.generateBatchID()
|
|
||||||
batchStart := ircmsg.MakeMessage(tags, fromNickMask, "BATCH", "+"+batchID, caps.MultilineBatchType, target)
|
batchStart := ircmsg.MakeMessage(tags, fromNickMask, "BATCH", "+"+batchID, caps.MultilineBatchType, target)
|
||||||
batchStart.SetTag("time", message.Time.Format(IRCv3TimestampFormat))
|
batchStart.SetTag("time", message.Time.Format(IRCv3TimestampFormat))
|
||||||
batchStart.SetTag("msgid", message.Msgid)
|
batchStart.SetTag("msgid", message.Msgid)
|
||||||
if session.capabilities.Has(caps.AccountTag) && fromAccount != "*" {
|
if fromAccount != "*" {
|
||||||
batchStart.SetTag("account", fromAccount)
|
batchStart.SetTag("account", fromAccount)
|
||||||
}
|
}
|
||||||
result = append(result, batchStart)
|
result = append(result, batchStart)
|
||||||
@ -1680,6 +1683,10 @@ func (session *Session) SendRawMessage(message ircmsg.IrcMessage, blocking bool)
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return session.sendBytes(line, blocking)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (session *Session) sendBytes(line []byte, blocking bool) (err error) {
|
||||||
if session.client.server.logger.IsLoggingRawIO() {
|
if session.client.server.logger.IsLoggingRawIO() {
|
||||||
logline := string(line[:len(line)-2]) // strip "\r\n"
|
logline := string(line[:len(line)-2]) // strip "\r\n"
|
||||||
session.client.server.logger.Debug("useroutput", session.client.Nick(), " ->", logline)
|
session.client.server.logger.Debug("useroutput", session.client.Nick(), " ->", logline)
|
||||||
|
197
irc/message_cache.go
Normal file
197
irc/message_cache.go
Normal file
@ -0,0 +1,197 @@
|
|||||||
|
// Copyright (c) 2020 Shivaram Lingamneni <slingamn@cs.stanford.edu>
|
||||||
|
// released under the MIT license
|
||||||
|
|
||||||
|
package irc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/goshuirc/irc-go/ircmsg"
|
||||||
|
|
||||||
|
"github.com/oragono/oragono/irc/caps"
|
||||||
|
"github.com/oragono/oragono/irc/utils"
|
||||||
|
)
|
||||||
|
|
||||||
|
// MessageCache caches serialized IRC messages.
|
||||||
|
// First call Initialize or InitializeSplitMessage, which records
|
||||||
|
// the parameters and builds the cache. Then call Send, which will
|
||||||
|
// either send a cached version of the message or dispatch to another
|
||||||
|
// send routine that can synthesize the necessary version on the fly.
|
||||||
|
type MessageCache struct {
|
||||||
|
// these cache a single-line message (e.g., JOIN, or PRIVMSG with a 512-byte message)
|
||||||
|
// one version is "plain" (legacy clients with no tags) and one is "full" (client has
|
||||||
|
// the message-tags cap)
|
||||||
|
plain []byte
|
||||||
|
fullTags []byte
|
||||||
|
// these cache a multiline message (a PRIVMSG that was sent as a multiline batch)
|
||||||
|
// one version is "plain" (legacy clients with no tags) and one is "full" (client has
|
||||||
|
// the multiline cap)
|
||||||
|
plainMultiline [][]byte
|
||||||
|
fullTagsMultiline [][]byte
|
||||||
|
|
||||||
|
time time.Time
|
||||||
|
msgid string
|
||||||
|
accountName string
|
||||||
|
tags map[string]string
|
||||||
|
source string
|
||||||
|
command string
|
||||||
|
|
||||||
|
params []string
|
||||||
|
|
||||||
|
target string
|
||||||
|
splitMessage utils.SplitMessage
|
||||||
|
}
|
||||||
|
|
||||||
|
func addAllTags(msg *ircmsg.IrcMessage, tags map[string]string, serverTime time.Time, msgid, accountName string) {
|
||||||
|
msg.UpdateTags(tags)
|
||||||
|
msg.SetTag("time", serverTime.Format(IRCv3TimestampFormat))
|
||||||
|
if accountName != "*" {
|
||||||
|
msg.SetTag("account", accountName)
|
||||||
|
}
|
||||||
|
if msgid != "" {
|
||||||
|
msg.SetTag("msgid", msgid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MessageCache) handleErr(server *Server, err error) bool {
|
||||||
|
if err != nil {
|
||||||
|
server.logger.Error("internal", "Error assembling message for sending", err.Error())
|
||||||
|
// blank these out so Send will be a no-op
|
||||||
|
m.fullTags = nil
|
||||||
|
m.fullTagsMultiline = nil
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MessageCache) Initialize(server *Server, serverTime time.Time, msgid string, nickmask, accountName string, tags map[string]string, command string, params ...string) (err error) {
|
||||||
|
m.time = serverTime
|
||||||
|
m.msgid = msgid
|
||||||
|
m.source = nickmask
|
||||||
|
m.accountName = accountName
|
||||||
|
m.tags = tags
|
||||||
|
m.command = command
|
||||||
|
m.params = params
|
||||||
|
|
||||||
|
var msg ircmsg.IrcMessage
|
||||||
|
config := server.Config()
|
||||||
|
if config.Server.Compatibility.forceTrailing && commandsThatMustUseTrailing[command] {
|
||||||
|
msg.ForceTrailing()
|
||||||
|
}
|
||||||
|
msg.Prefix = nickmask
|
||||||
|
msg.Command = command
|
||||||
|
msg.Params = make([]string, len(params))
|
||||||
|
copy(msg.Params, params)
|
||||||
|
m.plain, err = msg.LineBytesStrict(false, MaxLineLen)
|
||||||
|
if m.handleErr(server, err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
addAllTags(&msg, tags, serverTime, msgid, accountName)
|
||||||
|
m.fullTags, err = msg.LineBytesStrict(false, MaxLineLen)
|
||||||
|
if m.handleErr(server, err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MessageCache) InitializeSplitMessage(server *Server, nickmask, accountName string, tags map[string]string, command, target string, message utils.SplitMessage) (err error) {
|
||||||
|
m.time = message.Time
|
||||||
|
m.msgid = message.Msgid
|
||||||
|
m.source = nickmask
|
||||||
|
m.accountName = accountName
|
||||||
|
m.tags = tags
|
||||||
|
m.command = command
|
||||||
|
m.target = target
|
||||||
|
m.splitMessage = message
|
||||||
|
|
||||||
|
config := server.Config()
|
||||||
|
forceTrailing := config.Server.Compatibility.forceTrailing && commandsThatMustUseTrailing[command]
|
||||||
|
|
||||||
|
if message.Is512() {
|
||||||
|
isTagmsg := command == "TAGMSG"
|
||||||
|
var msg ircmsg.IrcMessage
|
||||||
|
if forceTrailing {
|
||||||
|
msg.ForceTrailing()
|
||||||
|
}
|
||||||
|
|
||||||
|
msg.Prefix = nickmask
|
||||||
|
msg.Command = command
|
||||||
|
if isTagmsg {
|
||||||
|
msg.Params = []string{target}
|
||||||
|
} else {
|
||||||
|
msg.Params = []string{target, message.Message}
|
||||||
|
}
|
||||||
|
m.params = msg.Params
|
||||||
|
if !isTagmsg {
|
||||||
|
m.plain, err = msg.LineBytesStrict(false, MaxLineLen)
|
||||||
|
if m.handleErr(server, err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
addAllTags(&msg, tags, message.Time, message.Msgid, accountName)
|
||||||
|
m.fullTags, err = msg.LineBytesStrict(false, MaxLineLen)
|
||||||
|
if m.handleErr(server, err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
var msg ircmsg.IrcMessage
|
||||||
|
if forceTrailing {
|
||||||
|
msg.ForceTrailing()
|
||||||
|
}
|
||||||
|
msg.Prefix = nickmask
|
||||||
|
msg.Command = command
|
||||||
|
msg.Params = make([]string, 2)
|
||||||
|
msg.Params[0] = target
|
||||||
|
m.plainMultiline = make([][]byte, len(message.Split))
|
||||||
|
for i, pair := range message.Split {
|
||||||
|
msg.Params[1] = pair.Message
|
||||||
|
m.plainMultiline[i], err = msg.LineBytesStrict(false, MaxLineLen)
|
||||||
|
if m.handleErr(server, err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// we need to send the same batch ID to all recipient sessions;
|
||||||
|
// use a uuidv4-alike to ensure that it won't collide
|
||||||
|
batch := composeMultilineBatch(utils.GenerateSecretToken(), nickmask, accountName, tags, command, target, message)
|
||||||
|
m.fullTagsMultiline = make([][]byte, len(batch))
|
||||||
|
for i, msg := range batch {
|
||||||
|
if forceTrailing {
|
||||||
|
msg.ForceTrailing()
|
||||||
|
}
|
||||||
|
m.fullTagsMultiline[i], err = msg.LineBytesStrict(false, MaxLineLen)
|
||||||
|
if m.handleErr(server, err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MessageCache) Send(session *Session) {
|
||||||
|
if m.fullTags != nil {
|
||||||
|
if session.capabilities.Has(caps.MessageTags) {
|
||||||
|
session.sendBytes(m.fullTags, false)
|
||||||
|
} else if !(session.capabilities.Has(caps.ServerTime) || session.capabilities.Has(caps.AccountTag)) {
|
||||||
|
if m.plain != nil {
|
||||||
|
session.sendBytes(m.plain, false)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
session.sendFromClientInternal(false, m.time, m.msgid, m.source, m.accountName, nil, m.command, m.params...)
|
||||||
|
}
|
||||||
|
} else if m.fullTagsMultiline != nil {
|
||||||
|
if session.capabilities.Has(caps.Multiline) {
|
||||||
|
for _, line := range m.fullTagsMultiline {
|
||||||
|
session.sendBytes(line, false)
|
||||||
|
}
|
||||||
|
} else if !(session.capabilities.Has(caps.ServerTime) || session.capabilities.Has(caps.AccountTag)) {
|
||||||
|
for _, line := range m.plainMultiline {
|
||||||
|
session.sendBytes(line, false)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
session.sendSplitMsgFromClientInternal(false, m.source, m.accountName, m.tags, m.command, m.target, m.splitMessage)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -127,7 +127,7 @@ func (rb *ResponseBuffer) AddSplitMessageFromClient(fromNickMask string, fromAcc
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if rb.session.capabilities.Has(caps.Multiline) {
|
if rb.session.capabilities.Has(caps.Multiline) {
|
||||||
batch := rb.session.composeMultilineBatch(fromNickMask, fromAccount, tags, command, target, message)
|
batch := composeMultilineBatch(rb.session.generateBatchID(), fromNickMask, fromAccount, tags, command, target, message)
|
||||||
rb.setNestedBatchTag(&batch[0])
|
rb.setNestedBatchTag(&batch[0])
|
||||||
rb.setNestedBatchTag(&batch[len(batch)-1])
|
rb.setNestedBatchTag(&batch[len(batch)-1])
|
||||||
rb.messages = append(rb.messages, batch...)
|
rb.messages = append(rb.messages, batch...)
|
||||||
|
@ -17,6 +17,14 @@ func BitsetGet(set []uint32, position uint) bool {
|
|||||||
return (block & (1 << bit)) != 0
|
return (block & (1 << bit)) != 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BitsetGetLocal returns whether a given bit of the bitset is set,
|
||||||
|
// without synchronization.
|
||||||
|
func BitsetGetLocal(set []uint32, position uint) bool {
|
||||||
|
idx := position / 32
|
||||||
|
bit := position % 32
|
||||||
|
return (set[idx] & (1 << bit)) != 0
|
||||||
|
}
|
||||||
|
|
||||||
// BitsetSet sets a given bit of the bitset to 0 or 1, returning whether it changed.
|
// BitsetSet sets a given bit of the bitset to 0 or 1, returning whether it changed.
|
||||||
func BitsetSet(set []uint32, position uint, on bool) (changed bool) {
|
func BitsetSet(set []uint32, position uint, on bool) (changed bool) {
|
||||||
idx := position / 32
|
idx := position / 32
|
||||||
@ -79,6 +87,15 @@ func BitsetCopy(set []uint32, other []uint32) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BitsetCopyLocal copies the contents of `other` over `set`,
|
||||||
|
// without synchronizing the writes to `set`.
|
||||||
|
func BitsetCopyLocal(set []uint32, other []uint32) {
|
||||||
|
for i := 0; i < len(set); i++ {
|
||||||
|
data := atomic.LoadUint32(&other[i])
|
||||||
|
set[i] = data
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// BitsetSubtract modifies `set` to subtract the contents of `other`.
|
// BitsetSubtract modifies `set` to subtract the contents of `other`.
|
||||||
// Similar caveats about race conditions as with `BitsetUnion` apply.
|
// Similar caveats about race conditions as with `BitsetUnion` apply.
|
||||||
func BitsetSubtract(set []uint32, other []uint32) {
|
func BitsetSubtract(set []uint32, other []uint32) {
|
||||||
|
@ -80,4 +80,14 @@ func TestSets(t *testing.T) {
|
|||||||
if !BitsetGet(t3s, 0) || BitsetGet(t3s, 72) || !BitsetGet(t3s, 74) || BitsetGet(t3s, 71) {
|
if !BitsetGet(t3s, 0) || BitsetGet(t3s, 72) || !BitsetGet(t3s, 74) || BitsetGet(t3s, 71) {
|
||||||
t.Error("subtract doesn't work")
|
t.Error("subtract doesn't work")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var tlocal testBitset
|
||||||
|
tlocals := tlocal[:]
|
||||||
|
BitsetCopyLocal(tlocals, t1s)
|
||||||
|
for i = 0; i < 128; i++ {
|
||||||
|
expected := (i != 72)
|
||||||
|
if BitsetGetLocal(tlocals, i) != expected {
|
||||||
|
t.Error("all bits should be set except 72")
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user