mirror of
https://github.com/ergochat/ergo.git
synced 2025-01-23 10:44:11 +01:00
Merge pull request #320 from slingamn/replay.1
history replay enhancements
This commit is contained in:
commit
cd339281e4
@ -129,7 +129,7 @@ Make sure to setup [SASL](https://freenode.net/kb/answer/sasl) in your client to
|
||||
* Niels Freier, added WebSocket support to Ergonomadic, <https://github.com/stumpyfr>
|
||||
* Daniel Oakley, maintainer of Oragono, <https://github.com/DanielOaks>
|
||||
* Euan Kemp, contributor to Oragono and lots of useful fixes, <https://github.com/euank>
|
||||
* Shivaram Lingamneni, has contributed a ton of fixes, refactoring, and general improvements, <https://github.com/slingamn>
|
||||
* Shivaram Lingamneni, co-maintainer of Oragono, <https://github.com/slingamn>
|
||||
* James Mills, contributed Docker support, <https://github.com/prologic>
|
||||
* Vegax, implementing some commands and helping when Oragono was just getting started, <https://github.com/vegax87>
|
||||
* Sean Enck, transitioned us from using a custom script to a proper Makefile, <https://github.com/enckse>
|
||||
|
78
irc/batch.go
78
irc/batch.go
@ -1,78 +0,0 @@
|
||||
// Copyright (c) 2017 Daniel Oaks <daniel@danieloaks.net>
|
||||
// released under the MIT license
|
||||
|
||||
package irc
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/goshuirc/irc-go/ircmsg"
|
||||
"github.com/oragono/oragono/irc/caps"
|
||||
)
|
||||
|
||||
const (
|
||||
// maxBatchID is the maximum ID the batch counter can get to before it rotates.
|
||||
//
|
||||
// Batch IDs are made up of the current unix timestamp plus a rolling int ID that's
|
||||
// incremented for every new batch. It's an alright solution and will work unless we get
|
||||
// more than maxId batches per nanosecond. Later on when we have S2S linking, the batch
|
||||
// ID will also contain the server ID to ensure they stay unique.
|
||||
maxBatchID uint64 = 60000
|
||||
)
|
||||
|
||||
// BatchManager helps generate new batches and new batch IDs.
|
||||
type BatchManager struct {
|
||||
idCounter uint64
|
||||
}
|
||||
|
||||
// NewBatchManager returns a new Manager.
|
||||
func NewBatchManager() *BatchManager {
|
||||
return &BatchManager{}
|
||||
}
|
||||
|
||||
// NewID returns a new batch ID that should be unique.
|
||||
func (bm *BatchManager) NewID() string {
|
||||
bm.idCounter++
|
||||
if maxBatchID < bm.idCounter {
|
||||
bm.idCounter = 0
|
||||
}
|
||||
|
||||
return strconv.FormatInt(time.Now().UnixNano(), 36) + strconv.FormatUint(bm.idCounter, 36)
|
||||
}
|
||||
|
||||
// Batch represents an IRCv3 batch.
|
||||
type Batch struct {
|
||||
ID string
|
||||
Type string
|
||||
Params []string
|
||||
}
|
||||
|
||||
// New returns a new batch.
|
||||
func (bm *BatchManager) New(batchType string, params ...string) *Batch {
|
||||
newBatch := Batch{
|
||||
ID: bm.NewID(),
|
||||
Type: batchType,
|
||||
Params: params,
|
||||
}
|
||||
|
||||
return &newBatch
|
||||
}
|
||||
|
||||
// Start sends the batch start message to this client
|
||||
func (b *Batch) Start(client *Client, tags *map[string]ircmsg.TagValue) {
|
||||
if client.capabilities.Has(caps.Batch) {
|
||||
params := []string{"+" + b.ID, b.Type}
|
||||
for _, param := range b.Params {
|
||||
params = append(params, param)
|
||||
}
|
||||
client.Send(tags, client.server.name, "BATCH", params...)
|
||||
}
|
||||
}
|
||||
|
||||
// End sends the batch end message to this client
|
||||
func (b *Batch) End(client *Client) {
|
||||
if client.capabilities.Has(caps.Batch) {
|
||||
client.Send(nil, client.server.name, "BATCH", "-"+b.ID)
|
||||
}
|
||||
}
|
108
irc/channel.go
108
irc/channel.go
@ -38,7 +38,7 @@ type Channel struct {
|
||||
topic string
|
||||
topicSetBy string
|
||||
topicSetTime time.Time
|
||||
userLimit uint64
|
||||
userLimit int
|
||||
accountToUMode map[string]modes.Mode
|
||||
history history.Buffer
|
||||
}
|
||||
@ -332,25 +332,12 @@ func (channel *Channel) modeStrings(client *Client) (result []string) {
|
||||
result = append(result, channel.key)
|
||||
}
|
||||
if showUserLimit {
|
||||
result = append(result, strconv.FormatUint(channel.userLimit, 10))
|
||||
result = append(result, strconv.Itoa(channel.userLimit))
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// IsFull returns true if this channel is at its' members limit.
|
||||
func (channel *Channel) IsFull() bool {
|
||||
channel.stateMutex.RLock()
|
||||
defer channel.stateMutex.RUnlock()
|
||||
return (channel.userLimit > 0) && (uint64(len(channel.members)) >= channel.userLimit)
|
||||
}
|
||||
|
||||
// CheckKey returns true if the key is not set or matches the given key.
|
||||
func (channel *Channel) CheckKey(key string) bool {
|
||||
chkey := channel.Key()
|
||||
return chkey == "" || utils.SecretTokensMatch(chkey, key)
|
||||
}
|
||||
|
||||
func (channel *Channel) IsEmpty() bool {
|
||||
channel.stateMutex.RLock()
|
||||
defer channel.stateMutex.RUnlock()
|
||||
@ -359,26 +346,31 @@ func (channel *Channel) IsEmpty() bool {
|
||||
|
||||
// Join joins the given client to this channel (if they can be joined).
|
||||
func (channel *Channel) Join(client *Client, key string, isSajoin bool, rb *ResponseBuffer) {
|
||||
if channel.hasClient(client) {
|
||||
// already joined, no message needs to be sent
|
||||
return
|
||||
}
|
||||
|
||||
channel.stateMutex.RLock()
|
||||
chname := channel.name
|
||||
chcfname := channel.nameCasefolded
|
||||
founder := channel.registeredFounder
|
||||
chkey := channel.key
|
||||
limit := channel.userLimit
|
||||
chcount := len(channel.members)
|
||||
_, alreadyJoined := channel.members[client]
|
||||
channel.stateMutex.RUnlock()
|
||||
|
||||
if alreadyJoined {
|
||||
// no message needs to be sent
|
||||
return
|
||||
}
|
||||
|
||||
account := client.Account()
|
||||
nickMaskCasefolded := client.NickMaskCasefolded()
|
||||
hasPrivs := isSajoin || (founder != "" && founder == account)
|
||||
|
||||
if !hasPrivs && channel.IsFull() {
|
||||
if !hasPrivs && limit != 0 && chcount >= limit {
|
||||
rb.Add(nil, client.server.name, ERR_CHANNELISFULL, chname, fmt.Sprintf(client.t("Cannot join channel (+%s)"), "l"))
|
||||
return
|
||||
}
|
||||
|
||||
if !hasPrivs && !channel.CheckKey(key) {
|
||||
if !hasPrivs && chkey != "" && !utils.SecretTokensMatch(chkey, key) {
|
||||
rb.Add(nil, client.server.name, ERR_BADCHANNELKEY, chname, fmt.Sprintf(client.t("Cannot join channel (+%s)"), "k"))
|
||||
return
|
||||
}
|
||||
@ -469,7 +461,18 @@ func (channel *Channel) Join(client *Client, key string, isSajoin bool, rb *Resp
|
||||
Type: history.Join,
|
||||
Nick: nickmask,
|
||||
AccountName: accountName,
|
||||
Msgid: realname,
|
||||
})
|
||||
|
||||
// TODO #259 can be implemented as Flush(false) (i.e., nonblocking) while holding joinPartMutex
|
||||
rb.Flush(true)
|
||||
|
||||
replayLimit := channel.server.Config().History.AutoreplayOnJoin
|
||||
if replayLimit > 0 {
|
||||
items := channel.history.Latest(replayLimit)
|
||||
channel.replayHistoryItems(rb, items)
|
||||
rb.Flush(true)
|
||||
}
|
||||
}
|
||||
|
||||
// Part parts the given client from this channel, with the given message.
|
||||
@ -506,7 +509,7 @@ func (channel *Channel) Resume(newClient, oldClient *Client, timestamp time.Time
|
||||
now := time.Now()
|
||||
channel.resumeAndAnnounce(newClient, oldClient)
|
||||
if !timestamp.IsZero() {
|
||||
channel.replayHistory(newClient, timestamp, now)
|
||||
channel.replayHistoryForResume(newClient, timestamp, now)
|
||||
}
|
||||
}
|
||||
|
||||
@ -560,7 +563,6 @@ func (channel *Channel) resumeAndAnnounce(newClient, oldClient *Client) {
|
||||
|
||||
rb := NewResponseBuffer(newClient)
|
||||
// use blocking i/o to synchronize with the later history replay
|
||||
rb.SetBlocking(true)
|
||||
if newClient.capabilities.Has(caps.ExtendedJoin) {
|
||||
rb.Add(nil, nickMask, "JOIN", channel.name, accountName, realName)
|
||||
} else {
|
||||
@ -571,41 +573,55 @@ func (channel *Channel) resumeAndAnnounce(newClient, oldClient *Client) {
|
||||
if 0 < len(oldModes) {
|
||||
rb.Add(nil, newClient.server.name, "MODE", channel.name, oldModes, nick)
|
||||
}
|
||||
rb.Send()
|
||||
rb.Send(true)
|
||||
}
|
||||
|
||||
func (channel *Channel) replayHistory(newClient *Client, after time.Time, before time.Time) {
|
||||
chname := channel.Name()
|
||||
extendedJoin := newClient.capabilities.Has(caps.ExtendedJoin)
|
||||
|
||||
func (channel *Channel) replayHistoryForResume(newClient *Client, after time.Time, before time.Time) {
|
||||
items, complete := channel.history.Between(after, before)
|
||||
rb := NewResponseBuffer(newClient)
|
||||
channel.replayHistoryItems(rb, items)
|
||||
if !complete && !newClient.resumeDetails.HistoryIncomplete {
|
||||
// warn here if we didn't warn already
|
||||
rb.Add(nil, "HistServ", "NOTICE", channel.Name(), newClient.t("Some additional message history may have been lost"))
|
||||
}
|
||||
rb.Send(true)
|
||||
}
|
||||
|
||||
func (channel *Channel) replayHistoryItems(rb *ResponseBuffer, items []history.Item) {
|
||||
chname := channel.Name()
|
||||
client := rb.target
|
||||
extendedJoin := client.capabilities.Has(caps.ExtendedJoin)
|
||||
serverTime := client.capabilities.Has(caps.ServerTime)
|
||||
|
||||
for _, item := range items {
|
||||
var tags Tags
|
||||
if serverTime {
|
||||
tags = ensureTag(tags, "time", item.Time.Format(IRCv3TimestampFormat))
|
||||
}
|
||||
|
||||
switch item.Type {
|
||||
case history.Privmsg:
|
||||
newClient.sendSplitMsgFromClientInternal(true, item.Time, item.Msgid, item.Nick, item.AccountName, nil, "PRIVMSG", chname, item.Message)
|
||||
rb.AddSplitMessageFromClient(item.Msgid, item.Nick, item.AccountName, tags, "PRIVMSG", chname, item.Message)
|
||||
case history.Notice:
|
||||
newClient.sendSplitMsgFromClientInternal(true, item.Time, item.Msgid, item.Nick, item.AccountName, nil, "NOTICE", chname, item.Message)
|
||||
rb.AddSplitMessageFromClient(item.Msgid, item.Nick, item.AccountName, tags, "NOTICE", chname, item.Message)
|
||||
case history.Join:
|
||||
if extendedJoin {
|
||||
newClient.sendInternal(true, item.Time, nil, item.Nick, "JOIN", chname, item.AccountName, "")
|
||||
// XXX Msgid is the realname in this case
|
||||
rb.Add(tags, item.Nick, "JOIN", chname, item.AccountName, item.Msgid)
|
||||
} else {
|
||||
newClient.sendInternal(true, item.Time, nil, item.Nick, "JOIN", chname)
|
||||
rb.Add(tags, item.Nick, "JOIN", chname)
|
||||
}
|
||||
case history.Quit:
|
||||
// XXX: send QUIT as PART to avoid having to correctly deduplicate and synchronize
|
||||
// QUIT messages across channels
|
||||
fallthrough
|
||||
case history.Part:
|
||||
newClient.sendInternal(true, item.Time, nil, item.Nick, "PART", chname, item.Message.Original)
|
||||
rb.Add(tags, item.Nick, "PART", chname, item.Message.Original)
|
||||
case history.Kick:
|
||||
newClient.sendInternal(true, item.Time, nil, item.Nick, "KICK", chname, item.Msgid, item.Message.Original)
|
||||
// XXX Msgid is the kick target
|
||||
rb.Add(tags, item.Nick, "KICK", chname, item.Msgid, item.Message.Original)
|
||||
}
|
||||
}
|
||||
|
||||
if !complete && !newClient.resumeDetails.HistoryIncomplete {
|
||||
// warn here if we didn't warn already
|
||||
newClient.sendInternal(true, time.Time{}, nil, "HistServ", "NOTICE", chname, newClient.t("Some additional message history may have been lost"))
|
||||
}
|
||||
}
|
||||
|
||||
// SendTopic sends the channel topic to the given client.
|
||||
@ -707,10 +723,12 @@ func (channel *Channel) sendMessage(msgid, cmd string, requiredCaps []caps.Capab
|
||||
messageTagsToUse = clientOnlyTags
|
||||
}
|
||||
|
||||
nickMaskString := client.NickMaskString()
|
||||
accountName := client.AccountName()
|
||||
if message == nil {
|
||||
rb.AddFromClient(msgid, client, messageTagsToUse, cmd, channel.name)
|
||||
rb.AddFromClient(msgid, nickMaskString, accountName, messageTagsToUse, cmd, channel.name)
|
||||
} else {
|
||||
rb.AddFromClient(msgid, client, messageTagsToUse, cmd, channel.name, *message)
|
||||
rb.AddFromClient(msgid, nickMaskString, accountName, messageTagsToUse, cmd, channel.name, *message)
|
||||
}
|
||||
}
|
||||
for _, member := range channel.Members() {
|
||||
@ -773,10 +791,12 @@ func (channel *Channel) sendSplitMessage(msgid, cmd string, histType history.Ite
|
||||
if client.capabilities.Has(caps.MessageTags) {
|
||||
tagsToUse = clientOnlyTags
|
||||
}
|
||||
nickMaskString := client.NickMaskString()
|
||||
accountName := client.AccountName()
|
||||
if message == nil {
|
||||
rb.AddFromClient(msgid, client, tagsToUse, cmd, channel.name)
|
||||
rb.AddFromClient(msgid, nickMaskString, accountName, tagsToUse, cmd, channel.name)
|
||||
} else {
|
||||
rb.AddSplitMessageFromClient(msgid, client, tagsToUse, cmd, channel.name, *message)
|
||||
rb.AddSplitMessageFromClient(msgid, nickMaskString, accountName, tagsToUse, cmd, channel.name, *message)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -446,17 +446,19 @@ func (client *Client) TryResume() {
|
||||
}
|
||||
}
|
||||
}
|
||||
personalHistory := oldClient.history.All()
|
||||
privmsgMatcher := func(item history.Item) bool {
|
||||
return item.Type == history.Privmsg || item.Type == history.Notice
|
||||
}
|
||||
privmsgHistory := oldClient.history.Match(privmsgMatcher, 0)
|
||||
lastDiscarded := oldClient.history.LastDiscarded()
|
||||
if lastDiscarded.Before(oldestLostMessage) {
|
||||
oldestLostMessage = lastDiscarded
|
||||
}
|
||||
for _, item := range personalHistory {
|
||||
if item.Type == history.Privmsg || item.Type == history.Notice {
|
||||
sender := server.clients.Get(item.Nick)
|
||||
if sender != nil {
|
||||
friends.Add(sender)
|
||||
}
|
||||
for _, item := range privmsgHistory {
|
||||
// TODO this is the nickmask, fix that
|
||||
sender := server.clients.Get(item.Nick)
|
||||
if sender != nil {
|
||||
friends.Add(sender)
|
||||
}
|
||||
}
|
||||
|
||||
@ -482,10 +484,10 @@ func (client *Client) TryResume() {
|
||||
}
|
||||
|
||||
if client.resumeDetails.HistoryIncomplete {
|
||||
client.Send(nil, "RESUME", "WARN", fmt.Sprintf(client.t("Resume may have lost up to %d seconds of history"), gapSeconds))
|
||||
client.Send(nil, client.server.name, "RESUME", "WARN", fmt.Sprintf(client.t("Resume may have lost up to %d seconds of history"), gapSeconds))
|
||||
}
|
||||
|
||||
client.Send(nil, "RESUME", "SUCCESS", oldNick)
|
||||
client.Send(nil, client.server.name, "RESUME", "SUCCESS", oldNick)
|
||||
|
||||
// after we send the rest of the registration burst, we'll try rejoining channels
|
||||
}
|
||||
|
@ -47,7 +47,7 @@ func (cmd *Command) Run(server *Server, client *Client, msg ircmsg.IrcMessage) b
|
||||
rb := NewResponseBuffer(client)
|
||||
rb.Label = GetLabel(msg)
|
||||
exiting := cmd.handler(server, client, msg, rb)
|
||||
rb.Send()
|
||||
rb.Send(true)
|
||||
|
||||
// after each command, see if we can send registration to the client
|
||||
if !client.registered {
|
||||
|
@ -268,9 +268,10 @@ type Config struct {
|
||||
Fakelag FakelagConfig
|
||||
|
||||
History struct {
|
||||
Enabled bool
|
||||
ChannelLength int `yaml:"channel-length"`
|
||||
ClientLength int `yaml:"client-length"`
|
||||
Enabled bool
|
||||
ChannelLength int `yaml:"channel-length"`
|
||||
ClientLength int `yaml:"client-length"`
|
||||
AutoreplayOnJoin int `yaml:"autoreplay-on-join"`
|
||||
}
|
||||
|
||||
Filename string
|
||||
|
@ -259,13 +259,7 @@ func (channel *Channel) Members() (result []*Client) {
|
||||
return channel.membersCache
|
||||
}
|
||||
|
||||
func (channel *Channel) UserLimit() uint64 {
|
||||
channel.stateMutex.RLock()
|
||||
defer channel.stateMutex.RUnlock()
|
||||
return channel.userLimit
|
||||
}
|
||||
|
||||
func (channel *Channel) setUserLimit(limit uint64) {
|
||||
func (channel *Channel) setUserLimit(limit int) {
|
||||
channel.stateMutex.Lock()
|
||||
channel.userLimit = limit
|
||||
channel.stateMutex.Unlock()
|
||||
|
@ -933,7 +933,7 @@ func sajoinHandler(server *Server, client *Client, msg ircmsg.IrcMessage, rb *Re
|
||||
server.channels.Join(target, chname, "", true, rb)
|
||||
}
|
||||
if client != target {
|
||||
rb.Send()
|
||||
rb.Send(false)
|
||||
}
|
||||
return false
|
||||
}
|
||||
@ -1706,16 +1706,18 @@ func noticeHandler(server *Server, client *Client, msg ircmsg.IrcMessage, rb *Re
|
||||
if !user.HasMode(modes.RegisteredOnly) || client.LoggedIntoAccount() {
|
||||
user.SendSplitMsgFromClient(msgid, client, clientOnlyTags, "NOTICE", user.nick, splitMsg)
|
||||
}
|
||||
nickMaskString := client.NickMaskString()
|
||||
accountName := client.AccountName()
|
||||
if client.capabilities.Has(caps.EchoMessage) {
|
||||
rb.AddSplitMessageFromClient(msgid, client, clientOnlyTags, "NOTICE", user.nick, splitMsg)
|
||||
rb.AddSplitMessageFromClient(msgid, nickMaskString, accountName, clientOnlyTags, "NOTICE", user.nick, splitMsg)
|
||||
}
|
||||
|
||||
user.history.Add(history.Item{
|
||||
Type: history.Notice,
|
||||
Msgid: msgid,
|
||||
Message: splitMsg,
|
||||
Nick: client.NickMaskString(),
|
||||
AccountName: client.AccountName(),
|
||||
Nick: nickMaskString,
|
||||
AccountName: accountName,
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -1916,8 +1918,10 @@ func privmsgHandler(server *Server, client *Client, msg ircmsg.IrcMessage, rb *R
|
||||
if !user.HasMode(modes.RegisteredOnly) || client.LoggedIntoAccount() {
|
||||
user.SendSplitMsgFromClient(msgid, client, clientOnlyTags, "PRIVMSG", user.nick, splitMsg)
|
||||
}
|
||||
nickMaskString := client.NickMaskString()
|
||||
accountName := client.AccountName()
|
||||
if client.capabilities.Has(caps.EchoMessage) {
|
||||
rb.AddSplitMessageFromClient(msgid, client, clientOnlyTags, "PRIVMSG", user.nick, splitMsg)
|
||||
rb.AddSplitMessageFromClient(msgid, nickMaskString, accountName, clientOnlyTags, "PRIVMSG", user.nick, splitMsg)
|
||||
}
|
||||
if user.HasMode(modes.Away) {
|
||||
//TODO(dan): possibly implement cooldown of away notifications to users
|
||||
@ -1928,8 +1932,8 @@ func privmsgHandler(server *Server, client *Client, msg ircmsg.IrcMessage, rb *R
|
||||
Type: history.Privmsg,
|
||||
Msgid: msgid,
|
||||
Message: splitMsg,
|
||||
Nick: client.NickMaskString(),
|
||||
AccountName: client.AccountName(),
|
||||
Nick: nickMaskString,
|
||||
AccountName: accountName,
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -2150,7 +2154,7 @@ func tagmsgHandler(server *Server, client *Client, msg ircmsg.IrcMessage, rb *Re
|
||||
}
|
||||
user.SendFromClient(msgid, client, clientOnlyTags, "TAGMSG", user.nick)
|
||||
if client.capabilities.Has(caps.EchoMessage) {
|
||||
rb.AddFromClient(msgid, client, clientOnlyTags, "TAGMSG", user.nick)
|
||||
rb.AddFromClient(msgid, client.NickMaskString(), client.AccountName(), clientOnlyTags, "TAGMSG", user.nick)
|
||||
}
|
||||
if user.HasMode(modes.Away) {
|
||||
//TODO(dan): possibly implement cooldown of away notifications to users
|
||||
|
@ -32,9 +32,12 @@ type Item struct {
|
||||
// this is the uncasefolded account name, if there's no account it should be set to "*"
|
||||
AccountName string
|
||||
Message utils.SplitMessage
|
||||
Msgid string
|
||||
// for non-privmsg items, we may stuff some other data in here
|
||||
Msgid string
|
||||
}
|
||||
|
||||
type Predicate func(item Item) (matches bool)
|
||||
|
||||
// Buffer is a ring buffer holding message/event history for a channel or user
|
||||
type Buffer struct {
|
||||
sync.RWMutex
|
||||
@ -85,7 +88,7 @@ func (list *Buffer) Add(item Item) {
|
||||
}
|
||||
|
||||
if item.Time.IsZero() {
|
||||
item.Time = time.Now()
|
||||
item.Time = time.Now().UTC()
|
||||
}
|
||||
|
||||
list.Lock()
|
||||
@ -112,6 +115,12 @@ func (list *Buffer) Add(item Item) {
|
||||
list.buffer[pos] = item
|
||||
}
|
||||
|
||||
func reverse(results []Item) {
|
||||
for i, j := 0, len(results)-1; i < j; i, j = i+1, j-1 {
|
||||
results[i], results[j] = results[j], results[i]
|
||||
}
|
||||
}
|
||||
|
||||
// Between returns all history items with a time `after` <= time <= `before`,
|
||||
// with an indication of whether the results are complete or are missing items
|
||||
// because some of that period was discarded. A zero value of `before` is considered
|
||||
@ -126,51 +135,55 @@ func (list *Buffer) Between(after, before time.Time) (results []Item, complete b
|
||||
|
||||
complete = after.Equal(list.lastDiscarded) || after.After(list.lastDiscarded)
|
||||
|
||||
satisfies := func(item Item) bool {
|
||||
return (after.IsZero() || item.Time.After(after)) && (before.IsZero() || item.Time.Before(before))
|
||||
}
|
||||
|
||||
return list.matchInternal(satisfies, 0), complete
|
||||
}
|
||||
|
||||
// Match returns all history items such that `predicate` returns true for them.
|
||||
// Items are considered in reverse insertion order, up to a total of `limit` matches.
|
||||
// `predicate` MAY be a closure that maintains its own state across invocations;
|
||||
// it MUST NOT acquire any locks or otherwise do anything weird.
|
||||
func (list *Buffer) Match(predicate Predicate, limit int) (results []Item) {
|
||||
if !list.Enabled() {
|
||||
return
|
||||
}
|
||||
|
||||
list.RLock()
|
||||
defer list.RUnlock()
|
||||
|
||||
return list.matchInternal(predicate, limit)
|
||||
}
|
||||
|
||||
// you must be holding the read lock to call this
|
||||
func (list *Buffer) matchInternal(predicate Predicate, limit int) (results []Item) {
|
||||
if list.start == -1 {
|
||||
return
|
||||
}
|
||||
|
||||
satisfies := func(itime time.Time) bool {
|
||||
return (after.IsZero() || itime.After(after)) && (before.IsZero() || itime.Before(before))
|
||||
}
|
||||
|
||||
// TODO: if we can guarantee that the insertion order is also the monotonic clock order,
|
||||
// then this can do a single allocation and use binary search and 1-2 copy calls
|
||||
|
||||
pos := list.prev(list.end)
|
||||
for {
|
||||
if satisfies(list.buffer[pos].Time) {
|
||||
if predicate(list.buffer[pos]) {
|
||||
results = append(results, list.buffer[pos])
|
||||
}
|
||||
if pos == list.start {
|
||||
if pos == list.start || (limit != 0 && len(results) == limit) {
|
||||
break
|
||||
}
|
||||
pos = list.prev(pos)
|
||||
}
|
||||
|
||||
// reverse the results
|
||||
for i, j := 0, len(results)-1; i < j; i, j = i+1, j-1 {
|
||||
results[i], results[j] = results[j], results[i]
|
||||
}
|
||||
// TODO sort by time instead?
|
||||
reverse(results)
|
||||
return
|
||||
}
|
||||
|
||||
// All returns all available history items as a slice
|
||||
func (list *Buffer) All() (results []Item) {
|
||||
list.RLock()
|
||||
defer list.RUnlock()
|
||||
|
||||
if list.start == -1 {
|
||||
return
|
||||
}
|
||||
results = make([]Item, list.length())
|
||||
if list.start < list.end {
|
||||
copy(results, list.buffer[list.start:list.end])
|
||||
} else {
|
||||
initialSegment := copy(results, list.buffer[list.start:])
|
||||
copy(results[initialSegment:], list.buffer[:list.end])
|
||||
}
|
||||
return
|
||||
// Latest returns the items most recently added, up to `limit`. If `limit` is 0,
|
||||
// it returns all items.
|
||||
func (list *Buffer) Latest(limit int) (results []Item) {
|
||||
matchAll := func(item Item) bool { return true }
|
||||
return list.Match(matchAll, limit)
|
||||
}
|
||||
|
||||
// LastDiscarded returns the latest time of any entry that was evicted
|
||||
|
@ -67,7 +67,8 @@ func TestEmptyBuffer(t *testing.T) {
|
||||
if since[0].Nick != "testnick2" {
|
||||
t.Error("retrieved junk data")
|
||||
}
|
||||
assertEqual(toNicks(buf.All()), []string{"testnick2"}, t)
|
||||
matchAll := func(item Item) bool { return true }
|
||||
assertEqual(toNicks(buf.Match(matchAll, 0)), []string{"testnick2"}, t)
|
||||
}
|
||||
|
||||
func toNicks(items []Item) (result []string) {
|
||||
|
@ -189,7 +189,7 @@ func (channel *Channel) ApplyChannelModeChanges(client *Client, isSamode bool, c
|
||||
case modes.UserLimit:
|
||||
switch change.Op {
|
||||
case modes.Add:
|
||||
val, err := strconv.ParseUint(change.Arg, 10, 64)
|
||||
val, err := strconv.Atoi(change.Arg)
|
||||
if err == nil {
|
||||
channel.setUserLimit(val)
|
||||
applied = append(applied, change)
|
||||
|
@ -83,7 +83,7 @@ func (server *Server) RandomlyRename(client *Client) {
|
||||
nick := fmt.Sprintf("%s%s", prefix, hex.EncodeToString(buf))
|
||||
rb := NewResponseBuffer(client)
|
||||
performNickChange(server, client, client, nick, rb)
|
||||
rb.Send()
|
||||
rb.Send(false)
|
||||
// technically performNickChange can fail to change the nick,
|
||||
// but if they're still delinquent, the timer will get them later
|
||||
}
|
||||
|
@ -4,6 +4,7 @@
|
||||
package irc
|
||||
|
||||
import (
|
||||
"runtime/debug"
|
||||
"time"
|
||||
|
||||
"github.com/goshuirc/irc-go/ircmsg"
|
||||
@ -11,16 +12,22 @@ import (
|
||||
"github.com/oragono/oragono/irc/utils"
|
||||
)
|
||||
|
||||
const (
|
||||
// https://ircv3.net/specs/extensions/labeled-response.html
|
||||
batchType = "draft/labeled-response"
|
||||
)
|
||||
|
||||
// ResponseBuffer - put simply - buffers messages and then outputs them to a given client.
|
||||
//
|
||||
// Using a ResponseBuffer lets you really easily implement labeled-response, since the
|
||||
// buffer will silently create a batch if required and label the outgoing messages as
|
||||
// necessary (or leave it off and simply tag the outgoing message).
|
||||
type ResponseBuffer struct {
|
||||
Label string
|
||||
target *Client
|
||||
messages []ircmsg.IrcMessage
|
||||
blocking bool
|
||||
Label string
|
||||
batchID string
|
||||
target *Client
|
||||
messages []ircmsg.IrcMessage
|
||||
finalized bool
|
||||
}
|
||||
|
||||
// GetLabel returns the label from the given message.
|
||||
@ -35,96 +42,125 @@ func NewResponseBuffer(target *Client) *ResponseBuffer {
|
||||
}
|
||||
}
|
||||
|
||||
func (rb *ResponseBuffer) SetBlocking(blocking bool) {
|
||||
rb.blocking = blocking
|
||||
}
|
||||
|
||||
// Add adds a standard new message to our queue.
|
||||
func (rb *ResponseBuffer) Add(tags *map[string]ircmsg.TagValue, prefix string, command string, params ...string) {
|
||||
message := ircmsg.MakeMessage(tags, prefix, command, params...)
|
||||
if rb.finalized {
|
||||
rb.target.server.logger.Error("message added to finalized ResponseBuffer, undefined behavior")
|
||||
debug.PrintStack()
|
||||
return
|
||||
}
|
||||
|
||||
message := ircmsg.MakeMessage(tags, prefix, command, params...)
|
||||
rb.messages = append(rb.messages, message)
|
||||
}
|
||||
|
||||
// AddFromClient adds a new message from a specific client to our queue.
|
||||
func (rb *ResponseBuffer) AddFromClient(msgid string, from *Client, tags *map[string]ircmsg.TagValue, command string, params ...string) {
|
||||
func (rb *ResponseBuffer) AddFromClient(msgid string, fromNickMask string, fromAccount string, tags *map[string]ircmsg.TagValue, command string, params ...string) {
|
||||
// attach account-tag
|
||||
if rb.target.capabilities.Has(caps.AccountTag) && from.LoggedIntoAccount() {
|
||||
if tags == nil {
|
||||
tags = ircmsg.MakeTags("account", from.AccountName())
|
||||
} else {
|
||||
(*tags)["account"] = ircmsg.MakeTagValue(from.AccountName())
|
||||
if rb.target.capabilities.Has(caps.AccountTag) {
|
||||
if fromAccount != "*" {
|
||||
tags = ensureTag(tags, "account", fromAccount)
|
||||
}
|
||||
}
|
||||
// attach message-id
|
||||
if len(msgid) > 0 && rb.target.capabilities.Has(caps.MessageTags) {
|
||||
if tags == nil {
|
||||
tags = ircmsg.MakeTags("draft/msgid", msgid)
|
||||
} else {
|
||||
(*tags)["draft/msgid"] = ircmsg.MakeTagValue(msgid)
|
||||
}
|
||||
tags = ensureTag(tags, "draft/msgid", msgid)
|
||||
}
|
||||
|
||||
rb.Add(tags, from.nickMaskString, command, params...)
|
||||
rb.Add(tags, fromNickMask, command, params...)
|
||||
}
|
||||
|
||||
// AddSplitMessageFromClient adds a new split message from a specific client to our queue.
|
||||
func (rb *ResponseBuffer) AddSplitMessageFromClient(msgid string, from *Client, tags *map[string]ircmsg.TagValue, command string, target string, message utils.SplitMessage) {
|
||||
func (rb *ResponseBuffer) AddSplitMessageFromClient(msgid string, fromNickMask string, fromAccount string, tags *map[string]ircmsg.TagValue, command string, target string, message utils.SplitMessage) {
|
||||
if rb.target.capabilities.Has(caps.MaxLine) || message.Wrapped == nil {
|
||||
rb.AddFromClient(msgid, from, tags, command, target, message.Original)
|
||||
rb.AddFromClient(msgid, fromNickMask, fromAccount, tags, command, target, message.Original)
|
||||
} else {
|
||||
for _, str := range message.Wrapped {
|
||||
rb.AddFromClient(msgid, from, tags, command, target, str)
|
||||
rb.AddFromClient(msgid, fromNickMask, fromAccount, tags, command, target, str)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Send sends the message to our target client.
|
||||
func (rb *ResponseBuffer) Send() error {
|
||||
// fall out if no messages to send
|
||||
if len(rb.messages) == 0 {
|
||||
return nil
|
||||
func (rb *ResponseBuffer) sendBatchStart(blocking bool) {
|
||||
if rb.batchID != "" {
|
||||
// batch already initialized
|
||||
return
|
||||
}
|
||||
|
||||
// make batch and all if required
|
||||
var batch *Batch
|
||||
useLabel := rb.target.capabilities.Has(caps.LabeledResponse) && rb.Label != ""
|
||||
if useLabel && 1 < len(rb.messages) && rb.target.capabilities.Has(caps.Batch) {
|
||||
batch = rb.target.server.batches.New("draft/labeled-response")
|
||||
// formerly this combined time.Now.UnixNano() in base 36 with an incrementing counter,
|
||||
// also in base 36. but let's just use a uuidv4-alike (26 base32 characters):
|
||||
rb.batchID = utils.GenerateSecretToken()
|
||||
|
||||
message := ircmsg.MakeMessage(nil, rb.target.server.name, "BATCH", "+"+rb.batchID, batchType)
|
||||
message.Tags[caps.LabelTagName] = ircmsg.MakeTagValue(rb.Label)
|
||||
rb.target.SendRawMessage(message, blocking)
|
||||
}
|
||||
|
||||
func (rb *ResponseBuffer) sendBatchEnd(blocking bool) {
|
||||
if rb.batchID == "" {
|
||||
// we are not sending a batch, skip this
|
||||
return
|
||||
}
|
||||
|
||||
message := ircmsg.MakeMessage(nil, rb.target.server.name, "BATCH", "-"+rb.batchID)
|
||||
rb.target.SendRawMessage(message, blocking)
|
||||
}
|
||||
|
||||
// Send sends all messages in the buffer to the client.
|
||||
// Afterwards, the buffer is in an undefined state and MUST NOT be used further.
|
||||
// If `blocking` is true you MUST be sending to the client from its own goroutine.
|
||||
func (rb *ResponseBuffer) Send(blocking bool) error {
|
||||
return rb.flushInternal(true, blocking)
|
||||
}
|
||||
|
||||
// Flush sends all messages in the buffer to the client.
|
||||
// Afterwards, the buffer can still be used. Client code MUST subsequently call Send()
|
||||
// to ensure that the final `BATCH -` message is sent.
|
||||
// If `blocking` is true you MUST be sending to the client from its own goroutine.
|
||||
func (rb *ResponseBuffer) Flush(blocking bool) error {
|
||||
return rb.flushInternal(false, blocking)
|
||||
}
|
||||
|
||||
// flushInternal sends the contents of the buffer, either blocking or nonblocking
|
||||
// It sends the `BATCH +` message if the client supports it and it hasn't been sent already.
|
||||
// If `final` is true, it also sends `BATCH -` (if necessary).
|
||||
func (rb *ResponseBuffer) flushInternal(final bool, blocking bool) error {
|
||||
useLabel := rb.target.capabilities.Has(caps.LabeledResponse) && rb.Label != ""
|
||||
// use a batch if we have a label, and we either currently have multiple messages,
|
||||
// or we are doing a Flush() and we have to assume that there will be more messages
|
||||
// in the future.
|
||||
useBatch := useLabel && (len(rb.messages) > 1 || !final)
|
||||
|
||||
// if label but no batch, add label to first message
|
||||
if useLabel && batch == nil {
|
||||
message := rb.messages[0]
|
||||
message.Tags[caps.LabelTagName] = ircmsg.MakeTagValue(rb.Label)
|
||||
rb.messages[0] = message
|
||||
}
|
||||
|
||||
// start batch if required
|
||||
if batch != nil {
|
||||
batch.Start(rb.target, ircmsg.MakeTags(caps.LabelTagName, rb.Label))
|
||||
if useLabel && !useBatch && len(rb.messages) == 1 {
|
||||
rb.messages[0].Tags[caps.LabelTagName] = ircmsg.MakeTagValue(rb.Label)
|
||||
} else if useBatch {
|
||||
rb.sendBatchStart(blocking)
|
||||
}
|
||||
|
||||
// send each message out
|
||||
for _, message := range rb.messages {
|
||||
// attach server-time if needed
|
||||
if rb.target.capabilities.Has(caps.ServerTime) {
|
||||
t := time.Now().UTC().Format(IRCv3TimestampFormat)
|
||||
message.Tags["time"] = ircmsg.MakeTagValue(t)
|
||||
if !message.Tags["time"].HasValue {
|
||||
t := time.Now().UTC().Format(IRCv3TimestampFormat)
|
||||
message.Tags["time"] = ircmsg.MakeTagValue(t)
|
||||
}
|
||||
}
|
||||
|
||||
// attach batch ID
|
||||
if batch != nil {
|
||||
message.Tags["batch"] = ircmsg.MakeTagValue(batch.ID)
|
||||
if rb.batchID != "" {
|
||||
message.Tags["batch"] = ircmsg.MakeTagValue(rb.batchID)
|
||||
}
|
||||
|
||||
// send message out
|
||||
rb.target.SendRawMessage(message, rb.blocking)
|
||||
rb.target.SendRawMessage(message, blocking)
|
||||
}
|
||||
|
||||
// end batch if required
|
||||
if batch != nil {
|
||||
batch.End(rb.target)
|
||||
if final {
|
||||
rb.sendBatchEnd(blocking)
|
||||
rb.finalized = true
|
||||
}
|
||||
|
||||
// clear out any existing messages
|
||||
|
@ -67,7 +67,6 @@ type ListenerWrapper struct {
|
||||
// Server is the main Oragono server.
|
||||
type Server struct {
|
||||
accounts *AccountManager
|
||||
batches *BatchManager
|
||||
channels *ChannelManager
|
||||
channelRegistry *ChannelRegistry
|
||||
clients *ClientManager
|
||||
@ -116,7 +115,6 @@ type clientConn struct {
|
||||
func NewServer(config *Config, logger *logger.Manager) (*Server, error) {
|
||||
// initialize data structures
|
||||
server := &Server{
|
||||
batches: NewBatchManager(),
|
||||
channels: NewChannelManager(),
|
||||
clients: NewClientManager(),
|
||||
connectionLimiter: connection_limits.NewLimiter(),
|
||||
@ -406,7 +404,7 @@ func (server *Server) tryRegister(c *Client) {
|
||||
|
||||
rb := NewResponseBuffer(c)
|
||||
nickAssigned := performNickChange(server, c, c, preregNick, rb)
|
||||
rb.Send()
|
||||
rb.Send(true)
|
||||
if !nickAssigned {
|
||||
c.SetPreregNick("")
|
||||
return
|
||||
@ -446,7 +444,7 @@ func (server *Server) tryRegister(c *Client) {
|
||||
rb = NewResponseBuffer(c)
|
||||
c.RplISupport(rb)
|
||||
server.MOTD(c, rb)
|
||||
rb.Send()
|
||||
rb.Send(true)
|
||||
|
||||
modestring := c.ModeString()
|
||||
if modestring != "+" {
|
||||
|
@ -6,7 +6,12 @@ package utils
|
||||
import (
|
||||
"crypto/rand"
|
||||
"crypto/subtle"
|
||||
"encoding/hex"
|
||||
"encoding/base32"
|
||||
)
|
||||
|
||||
var (
|
||||
// standard b32 alphabet, but in lowercase for silly aesthetic reasons
|
||||
b32encoder = base32.NewEncoding("abcdefghijklmnopqrstuvwxyz234567").WithPadding(base32.NoPadding)
|
||||
)
|
||||
|
||||
// generate a secret token that cannot be brute-forced via online attacks
|
||||
@ -14,8 +19,8 @@ func GenerateSecretToken() string {
|
||||
// 128 bits of entropy are enough to resist any online attack:
|
||||
var buf [16]byte
|
||||
rand.Read(buf[:])
|
||||
// 32 ASCII characters, should be fine for most purposes
|
||||
return hex.EncodeToString(buf[:])
|
||||
// 26 ASCII characters, should be fine for most purposes
|
||||
return b32encoder.EncodeToString(buf[:])
|
||||
}
|
||||
|
||||
// securely check if a supplied token matches a stored token
|
||||
|
@ -16,7 +16,7 @@ const (
|
||||
|
||||
func TestGenerateSecretToken(t *testing.T) {
|
||||
token := GenerateSecretToken()
|
||||
if len(token) != 32 {
|
||||
if len(token) < 22 {
|
||||
t.Errorf("bad token: %v", token)
|
||||
}
|
||||
}
|
||||
@ -46,3 +46,9 @@ func TestTokenCompare(t *testing.T) {
|
||||
t.Error("the empty token should not match anything")
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkGenerateSecretToken(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
GenerateSecretToken()
|
||||
}
|
||||
}
|
||||
|
28
oragono.go
28
oragono.go
@ -51,17 +51,7 @@ Options:
|
||||
|
||||
arguments, _ := docopt.ParseArgs(usage, nil, version)
|
||||
|
||||
configfile := arguments["--conf"].(string)
|
||||
config, err := irc.LoadConfig(configfile)
|
||||
if err != nil {
|
||||
log.Fatal("Config file did not load successfully: ", err.Error())
|
||||
}
|
||||
|
||||
logman, err := logger.NewManager(config.Logging)
|
||||
if err != nil {
|
||||
log.Fatal("Logger did not load successfully:", err.Error())
|
||||
}
|
||||
|
||||
// don't require a config file for genpasswd
|
||||
if arguments["genpasswd"].(bool) {
|
||||
fmt.Print("Enter Password: ")
|
||||
password := getPassword()
|
||||
@ -77,7 +67,21 @@ Options:
|
||||
log.Fatal("encoding error:", err.Error())
|
||||
}
|
||||
fmt.Println(string(hash))
|
||||
} else if arguments["initdb"].(bool) {
|
||||
return
|
||||
}
|
||||
|
||||
configfile := arguments["--conf"].(string)
|
||||
config, err := irc.LoadConfig(configfile)
|
||||
if err != nil {
|
||||
log.Fatal("Config file did not load successfully: ", err.Error())
|
||||
}
|
||||
|
||||
logman, err := logger.NewManager(config.Logging)
|
||||
if err != nil {
|
||||
log.Fatal("Logger did not load successfully:", err.Error())
|
||||
}
|
||||
|
||||
if arguments["initdb"].(bool) {
|
||||
irc.InitDB(config.Datastore.Path)
|
||||
if !arguments["--quiet"].(bool) {
|
||||
log.Println("database initialized: ", config.Datastore.Path)
|
||||
|
@ -454,3 +454,6 @@ history:
|
||||
|
||||
# how many direct messages and notices should be tracked per user?
|
||||
client-length: 64
|
||||
|
||||
# number of messages to automatically play back on channel join (0 to disable):
|
||||
autoreplay-on-join: 0
|
||||
|
Loading…
Reference in New Issue
Block a user