3
0
mirror of https://github.com/ergochat/ergo.git synced 2025-01-11 04:32:39 +01:00

Merge pull request #1806 from slingamn/history.2

fix #1676
This commit is contained in:
Shivaram Lingamneni 2021-10-29 05:04:01 -04:00 committed by GitHub
commit 41089b0e16
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 126 additions and 81 deletions

View File

@ -962,7 +962,7 @@ history:
# if `default` is false, store TAGMSG containing any of these tags:
whitelist:
- "+draft/react"
- "react"
- "+react"
# if `default` is true, don't store TAGMSG containing any of these tags:
#blacklist:

View File

@ -917,7 +917,7 @@ func (channel *Channel) autoReplayHistory(client *Client, rb *ResponseBuffer, sk
}
if hasAutoreplayTimestamps {
_, seq, _ := channel.server.GetHistorySequence(channel, client, "")
_, seq, _ := channel.server.GetHistorySequence(channel, client, "", 0)
if seq != nil {
zncMax := channel.server.Config().History.ZNCMax
items, _ = seq.Between(history.Selector{Time: start}, history.Selector{Time: end}, zncMax)
@ -935,7 +935,7 @@ func (channel *Channel) autoReplayHistory(client *Client, rb *ResponseBuffer, sk
replayLimit = channel.server.Config().History.AutoreplayOnJoin
}
if 0 < replayLimit {
_, seq, _ := channel.server.GetHistorySequence(channel, client, "")
_, seq, _ := channel.server.GetHistorySequence(channel, client, "", 0)
if seq != nil {
items, _ = seq.Between(history.Selector{}, history.Selector{}, replayLimit)
}
@ -1084,7 +1084,7 @@ func (channel *Channel) replayHistoryItems(rb *ResponseBuffer, items []history.I
} else {
message = fmt.Sprintf(client.t("%[1]s [account: %[2]s] joined the channel"), nick, item.AccountName)
}
rb.AddFromClient(item.Message.Time, utils.MungeSecretToken(item.Message.Msgid), histservService.prefix, "*", false, nil, "PRIVMSG", chname, message)
rb.AddFromClient(item.Message.Time, history.HistservMungeMsgid(item.Message.Msgid), histservService.prefix, "*", false, nil, "PRIVMSG", chname, message)
}
case history.Part:
if eventPlayback {
@ -1094,14 +1094,14 @@ func (channel *Channel) replayHistoryItems(rb *ResponseBuffer, items []history.I
continue // #474
}
message := fmt.Sprintf(client.t("%[1]s left the channel (%[2]s)"), nick, item.Message.Message)
rb.AddFromClient(item.Message.Time, utils.MungeSecretToken(item.Message.Msgid), histservService.prefix, "*", false, nil, "PRIVMSG", chname, message)
rb.AddFromClient(item.Message.Time, history.HistservMungeMsgid(item.Message.Msgid), histservService.prefix, "*", false, nil, "PRIVMSG", chname, message)
}
case history.Kick:
if eventPlayback {
rb.AddFromClient(item.Message.Time, item.Message.Msgid, item.Nick, item.AccountName, item.IsBot, nil, "KICK", chname, item.Params[0], item.Message.Message)
} else {
message := fmt.Sprintf(client.t("%[1]s kicked %[2]s (%[3]s)"), nick, item.Params[0], item.Message.Message)
rb.AddFromClient(item.Message.Time, utils.MungeSecretToken(item.Message.Msgid), histservService.prefix, "*", false, nil, "PRIVMSG", chname, message)
rb.AddFromClient(item.Message.Time, history.HistservMungeMsgid(item.Message.Msgid), histservService.prefix, "*", false, nil, "PRIVMSG", chname, message)
}
case history.Quit:
if eventPlayback {
@ -1111,21 +1111,21 @@ func (channel *Channel) replayHistoryItems(rb *ResponseBuffer, items []history.I
continue // #474
}
message := fmt.Sprintf(client.t("%[1]s quit (%[2]s)"), nick, item.Message.Message)
rb.AddFromClient(item.Message.Time, utils.MungeSecretToken(item.Message.Msgid), histservService.prefix, "*", false, nil, "PRIVMSG", chname, message)
rb.AddFromClient(item.Message.Time, history.HistservMungeMsgid(item.Message.Msgid), histservService.prefix, "*", false, nil, "PRIVMSG", chname, message)
}
case history.Nick:
if eventPlayback {
rb.AddFromClient(item.Message.Time, item.Message.Msgid, item.Nick, item.AccountName, item.IsBot, nil, "NICK", item.Params[0])
} else {
message := fmt.Sprintf(client.t("%[1]s changed nick to %[2]s"), nick, item.Params[0])
rb.AddFromClient(item.Message.Time, utils.MungeSecretToken(item.Message.Msgid), histservService.prefix, "*", false, nil, "PRIVMSG", chname, message)
rb.AddFromClient(item.Message.Time, history.HistservMungeMsgid(item.Message.Msgid), histservService.prefix, "*", false, nil, "PRIVMSG", chname, message)
}
case history.Topic:
if eventPlayback {
rb.AddFromClient(item.Message.Time, item.Message.Msgid, item.Nick, item.AccountName, item.IsBot, nil, "TOPIC", chname, item.Message.Message)
} else {
message := fmt.Sprintf(client.t("%[1]s set the channel topic to: %[2]s"), nick, item.Message.Message)
rb.AddFromClient(item.Message.Time, utils.MungeSecretToken(item.Message.Msgid), histservService.prefix, "*", false, nil, "PRIVMSG", chname, message)
rb.AddFromClient(item.Message.Time, history.HistservMungeMsgid(item.Message.Msgid), histservService.prefix, "*", false, nil, "PRIVMSG", chname, message)
}
case history.Mode:
params := make([]string, len(item.Message.Split)+1)
@ -1137,7 +1137,7 @@ func (channel *Channel) replayHistoryItems(rb *ResponseBuffer, items []history.I
rb.AddFromClient(item.Message.Time, item.Message.Msgid, item.Nick, item.AccountName, item.IsBot, nil, "MODE", params...)
} else {
message := fmt.Sprintf(client.t("%[1]s set channel modes: %[2]s"), nick, strings.Join(params[1:], " "))
rb.AddFromClient(item.Message.Time, utils.MungeSecretToken(item.Message.Msgid), histservService.prefix, "*", false, nil, "PRIVMSG", chname, message)
rb.AddFromClient(item.Message.Time, history.HistservMungeMsgid(item.Message.Msgid), histservService.prefix, "*", false, nil, "PRIVMSG", chname, message)
}
}
}

View File

@ -883,7 +883,7 @@ func (client *Client) replayPrivmsgHistory(rb *ResponseBuffer, items []history.I
if hasEventPlayback {
rb.AddFromClient(item.Message.Time, item.Message.Msgid, item.Nick, item.AccountName, item.IsBot, nil, "INVITE", nick, item.Message.Message)
} else {
rb.AddFromClient(item.Message.Time, utils.MungeSecretToken(item.Message.Msgid), histservService.prefix, "*", false, nil, "PRIVMSG", fmt.Sprintf(client.t("%[1]s invited you to channel %[2]s"), NUHToNick(item.Nick), item.Message.Message))
rb.AddFromClient(item.Message.Time, history.HistservMungeMsgid(item.Message.Msgid), histservService.prefix, "*", false, nil, "PRIVMSG", fmt.Sprintf(client.t("%[1]s invited you to channel %[2]s"), NUHToNick(item.Nick), item.Message.Message))
}
continue
case history.Privmsg:
@ -1713,7 +1713,7 @@ func (client *Client) listTargets(start, end history.Selector, limit int) (resul
var base, extras []history.TargetListing
var chcfnames []string
for _, channel := range client.Channels() {
_, seq, err := client.server.GetHistorySequence(channel, client, "")
_, seq, err := client.server.GetHistorySequence(channel, client, "", 0)
if seq == nil || err != nil {
continue
}
@ -1734,7 +1734,7 @@ func (client *Client) listTargets(start, end history.Selector, limit int) (resul
extras = append(extras, persistentExtras...)
}
_, cSeq, err := client.server.GetHistorySequence(nil, client, "")
_, cSeq, err := client.server.GetHistorySequence(nil, client, "", 0)
if err == nil && cSeq != nil {
correspondents, err := cSeq.ListCorrespondents(start, end, limit)
if err == nil {
@ -1758,7 +1758,7 @@ func (client *Client) privmsgsBetween(startTime, endTime time.Time, targetLimit,
if strings.HasPrefix(target.CfName, "#") {
continue
}
_, seq, err := client.server.GetHistorySequence(nil, client, target.CfName)
_, seq, err := client.server.GetHistorySequence(nil, client, target.CfName, 0)
if err == nil && seq != nil {
items, err := seq.Between(start, end, messageLimit)
if err == nil {

View File

@ -640,7 +640,7 @@ func chathistoryHandler(server *Server, client *Client, msg ircmsg.Message, rb *
}
identifier, value := strings.ToLower(pieces[0]), pieces[1]
if identifier == "msgid" {
msgid, err = value, nil
msgid, err = history.NormalizeMsgid(value), nil
return
} else if identifier == "timestamp" {
timestamp, err = time.Parse(IRCv3TimestampFormat, value)
@ -725,7 +725,17 @@ func chathistoryHandler(server *Server, client *Client, msg ircmsg.Message, rb *
if listTargets {
targets, err = client.listTargets(start, end, limit)
} else {
channel, sequence, err = server.GetHistorySequence(nil, client, target)
// see #1676; for CHATHISTORY we need to make the paging window as exact as possible,
// hence filtering out undisplayable messages on the backend, in order to send a full
// paging window if possible
var flags history.ExcludeFlags
if !rb.session.capabilities.Has(caps.EventPlayback) {
flags |= history.ExcludeTagmsg
}
if client.AccountSettings().ReplayJoins == ReplayJoinsNever {
flags |= history.ExcludeJoins
}
channel, sequence, err = server.GetHistorySequence(nil, client, target, flags)
if err != nil || sequence == nil {
return
}

View File

@ -53,6 +53,17 @@ func (item *Item) HasMsgid(msgid string) bool {
return item.Message.Msgid == msgid
}
func (item *Item) IsExcluded(excludeFlags ExcludeFlags) bool {
switch item.Type {
case Tagmsg:
return excludeFlags&ExcludeTagmsg != 0
case Join, Part, Quit:
return excludeFlags&ExcludeJoins != 0
default:
return false
}
}
type Predicate func(item *Item) (matches bool)
func Reverse(results []Item) {
@ -155,7 +166,7 @@ func (list *Buffer) lookup(msgid string) (result Item, found bool) {
// with an indication of whether the results are complete or are missing items
// because some of that period was discarded. A zero value of `before` is considered
// higher than all other times.
func (list *Buffer) betweenHelper(start, end Selector, cutoff time.Time, pred Predicate, limit int) (results []Item, complete bool, err error) {
func (list *Buffer) betweenHelper(start, end Selector, cutoff time.Time, pred Predicate, limit int, excludeFlags ExcludeFlags) (results []Item, complete bool, err error) {
var ascending bool
defer func() {
@ -195,7 +206,8 @@ func (list *Buffer) betweenHelper(start, end Selector, cutoff time.Time, pred Pr
satisfies := func(item *Item) bool {
return (after.IsZero() || item.Message.Time.After(after)) &&
(before.IsZero() || item.Message.Time.Before(before)) &&
(pred == nil || pred(item))
(pred == nil || pred(item)) &&
!item.IsExcluded(excludeFlags)
}
return list.matchInternal(satisfies, ascending, limit), complete, nil
@ -279,9 +291,10 @@ type bufferSequence struct {
list *Buffer
pred Predicate
cutoff time.Time
flags ExcludeFlags
}
func (list *Buffer) MakeSequence(correspondent string, cutoff time.Time) Sequence {
func (list *Buffer) MakeSequence(correspondent string, cutoff time.Time, flags ExcludeFlags) Sequence {
var pred Predicate
if correspondent != "" {
pred = func(item *Item) bool {
@ -292,11 +305,12 @@ func (list *Buffer) MakeSequence(correspondent string, cutoff time.Time) Sequenc
list: list,
pred: pred,
cutoff: cutoff,
flags: flags,
}
}
func (seq *bufferSequence) Between(start, end Selector, limit int) (results []Item, err error) {
results, _, err = seq.list.betweenHelper(start, end, seq.cutoff, seq.pred, limit)
results, _, err = seq.list.betweenHelper(start, end, seq.cutoff, seq.pred, limit, seq.flags)
return
}
@ -377,7 +391,7 @@ func (list *Buffer) Delete(predicate Predicate) (count int) {
// latest returns the items most recently added, up to `limit`. If `limit` is 0,
// it returns all items.
func (list *Buffer) latest(limit int) (results []Item) {
results, _, _ = list.betweenHelper(Selector{}, Selector{}, time.Time{}, nil, limit)
results, _, _ = list.betweenHelper(Selector{}, Selector{}, time.Time{}, nil, limit, 0)
return
}

View File

@ -15,7 +15,7 @@ const (
)
func betweenTimestamps(buf *Buffer, start, end time.Time, limit int) (result []Item, complete bool) {
result, complete, _ = buf.betweenHelper(Selector{Time: start}, Selector{Time: end}, time.Time{}, nil, limit)
result, complete, _ = buf.betweenHelper(Selector{Time: start}, Selector{Time: end}, time.Time{}, nil, limit, 0)
return
}
@ -45,7 +45,7 @@ func TestEmptyBuffer(t *testing.T) {
})
since, complete = betweenTimestamps(buf, pastTime, time.Now(), 0)
if len(since) != 1 {
t.Error("should be able to store items in a nonempty buffer")
t.Errorf("should be able to store items in a nonempty buffer: expected %d, got %d", 1, len(since))
}
if !complete {
t.Error("results should be complete")

View File

@ -4,9 +4,17 @@
package history
import (
"strings"
"time"
)
type ExcludeFlags uint
const (
ExcludeTagmsg ExcludeFlags = 1 << iota
ExcludeJoins
)
// Selector represents a parameter to a CHATHISTORY command
type Selector struct {
Msgid string
@ -77,3 +85,16 @@ func MinMaxAsc(after, before, cutoff time.Time) (min, max time.Time, ascending b
}
return after, before, ascending
}
// maps regular msgids from JOIN, etc. to a msgid suitable for attaching
// to a HistServ message describing the JOIN. See #491 for some history.
func HistservMungeMsgid(msgid string) string {
return "_" + msgid
}
// strips munging from a msgid. future schemes may not support a well-defined
// mapping of munged msgids to true msgids, but munged msgids should always contain
// a _, with metadata in front and data (possibly the true msgid) after.
func NormalizeMsgid(msgid string) string {
return strings.TrimPrefix(msgid, "_")
}

View File

@ -199,7 +199,7 @@ func histservPlayHandler(service *ircService, server *Server, client *Client, co
// handles parameter parsing and history queries for /HISTORY and /HISTSERV PLAY
func easySelectHistory(server *Server, client *Client, params []string) (items []history.Item, channel *Channel, err error) {
channel, sequence, err := server.GetHistorySequence(nil, client, params[0])
channel, sequence, err := server.GetHistorySequence(nil, client, params[0], 0)
if sequence == nil || err != nil {
return nil, nil, errNoSuchChannel

View File

@ -40,6 +40,10 @@ const (
keySchemaMinorVersion = "db.minorversion"
cleanupRowLimit = 50
cleanupPauseTime = 10 * time.Minute
// if we don't fill the pagination window due to exclusions,
// retry with an expanded window at most this many times
maxPaginationRetries = 3
)
type e struct{}
@ -1033,9 +1037,18 @@ type mySQLHistorySequence struct {
target string
correspondent string
cutoff time.Time
excludeFlags history.ExcludeFlags
}
func (s *mySQLHistorySequence) Between(start, end history.Selector, limit int) (results []history.Item, err error) {
if s.excludeFlags == 0 {
return s.baseBetween(start, end, limit)
} else {
return s.betweenWithRetries(start, end, limit)
}
}
func (s *mySQLHistorySequence) baseBetween(start, end history.Selector, limit int) (results []history.Item, err error) {
ctx, cancel := context.WithTimeout(context.Background(), s.mysql.getTimeout())
defer cancel()
@ -1058,7 +1071,45 @@ func (s *mySQLHistorySequence) Between(start, end history.Selector, limit int) (
return results, err
}
func (s *mySQLHistorySequence) betweenWithRetries(start, end history.Selector, limit int) (results []history.Item, err error) {
applyExclusions := func(currentResults []history.Item, excludeFlags history.ExcludeFlags, trueLimit int) (filteredResults []history.Item) {
filteredResults = make([]history.Item, 0, len(currentResults))
for _, item := range currentResults {
if !item.IsExcluded(excludeFlags) {
filteredResults = append(filteredResults, item)
}
if len(filteredResults) == trueLimit {
break
}
}
return
}
i := 1
for {
currentLimit := limit * i
currentResults, err := s.baseBetween(start, end, currentLimit)
if err != nil {
return nil, err
}
results = applyExclusions(currentResults, s.excludeFlags, limit)
// we're done in any of these three cases:
// (1) we filled the window (2) we ran out of results on the backend (3) we can't retry anymore
if len(results) == limit || len(currentResults) < currentLimit || i == maxPaginationRetries {
return results, nil
}
i++
}
}
func (s *mySQLHistorySequence) Around(start history.Selector, limit int) (results []history.Item, err error) {
// temporarily clear the exclude flags when running GenericAround, since we don't care about
// the exactness of the paging window at all
oldExcludeFlags := s.excludeFlags
s.excludeFlags = 0
defer func() {
s.excludeFlags = oldExcludeFlags
}()
return history.GenericAround(s, start, limit)
}
@ -1083,11 +1134,12 @@ func (seq *mySQLHistorySequence) Ephemeral() bool {
return false
}
func (mysql *MySQL) MakeSequence(target, correspondent string, cutoff time.Time) history.Sequence {
func (mysql *MySQL) MakeSequence(target, correspondent string, cutoff time.Time, excludeFlags history.ExcludeFlags) history.Sequence {
return &mySQLHistorySequence{
target: target,
correspondent: correspondent,
mysql: mysql,
cutoff: cutoff,
excludeFlags: excludeFlags,
}
}

View File

@ -862,7 +862,7 @@ func (server *Server) setupListeners(config *Config) (err error) {
// suitable for ListCorrespondents (i.e., this function is still used to
// decide whether the ringbuf or mysql is authoritative about the client's
// message history).
func (server *Server) GetHistorySequence(providedChannel *Channel, client *Client, query string) (channel *Channel, sequence history.Sequence, err error) {
func (server *Server) GetHistorySequence(providedChannel *Channel, client *Client, query string, excludeFlags history.ExcludeFlags) (channel *Channel, sequence history.Sequence, err error) {
config := server.Config()
// 4 cases: {persistent, ephemeral} x {normal, conversation}
// with ephemeral history, target is implicit in the choice of `hist`,
@ -940,9 +940,9 @@ func (server *Server) GetHistorySequence(providedChannel *Channel, client *Clien
}
if hist != nil {
sequence = hist.MakeSequence(correspondent, cutoff)
sequence = hist.MakeSequence(correspondent, cutoff, excludeFlags)
} else if target != "" {
sequence = server.historyDB.MakeSequence(target, correspondent, cutoff)
sequence = server.historyDB.MakeSequence(target, correspondent, cutoff, excludeFlags)
}
return
}

View File

@ -42,29 +42,6 @@ func GenerateSecretToken() string {
return B32Encoder.EncodeToString(buf[:])
}
// "munge" a secret token to a new value. requirements:
// 1. MUST be roughly as unlikely to collide with `GenerateSecretToken` outputs
// as those outputs are with each other
// 2. SHOULD be deterministic (motivation: if a JOIN line has msgid x,
// create a deterministic msgid y for the fake HistServ PRIVMSG that "replays" it)
// 3. SHOULD be in the same "namespace" as `GenerateSecretToken` outputs
// (same length and character set)
func MungeSecretToken(token string) (result string) {
bytes, err := B32Encoder.DecodeString(token)
if err != nil {
// this should never happen
return GenerateSecretToken()
}
// add 1 with carrying
for i := len(bytes) - 1; 0 <= i; i -= 1 {
bytes[i] += 1
if bytes[i] != 0 {
break
} // else: overflow, carry to the next place
}
return B32Encoder.EncodeToString(bytes)
}
// securely check if a supplied token matches a stored token
func SecretTokensMatch(storedToken string, suppliedToken string) bool {
// XXX fix a potential gotcha: if the stored token is uninitialized,

View File

@ -47,41 +47,12 @@ func TestTokenCompare(t *testing.T) {
}
}
func TestMunging(t *testing.T) {
count := 131072
set := make(map[string]bool)
var token string
for i := 0; i < count; i++ {
token = GenerateSecretToken()
set[token] = true
}
// all tokens generated thus far should be unique
assertEqual(len(set), count, t)
// iteratively munge the last generated token an additional `count` times
mungedToken := token
for i := 0; i < count; i++ {
mungedToken = MungeSecretToken(mungedToken)
assertEqual(len(mungedToken), len(token), t)
set[mungedToken] = true
}
// munged tokens should not collide with generated tokens, or each other
assertEqual(len(set), count*2, t)
}
func BenchmarkGenerateSecretToken(b *testing.B) {
for i := 0; i < b.N; i++ {
GenerateSecretToken()
}
}
func BenchmarkMungeSecretToken(b *testing.B) {
t := GenerateSecretToken()
for i := 0; i < b.N; i++ {
t = MungeSecretToken(t)
}
}
func TestCertfpComparisons(t *testing.T) {
opensslFP := "3D:6B:11:BF:B4:05:C3:F8:4B:38:CD:30:38:FB:EC:01:71:D5:03:54:79:04:07:88:4C:A5:5D:23:41:85:66:C9"
oragonoFP := "3d6b11bfb405c3f84b38cd3038fbec0171d50354790407884ca55d23418566c9"

View File

@ -189,7 +189,7 @@ func zncPlaybackPlayHandler(client *Client, command string, params []string, rb
}
func zncPlayPrivmsgsFrom(client *Client, rb *ResponseBuffer, target string, start, end time.Time) {
_, sequence, err := client.server.GetHistorySequence(nil, client, target)
_, sequence, err := client.server.GetHistorySequence(nil, client, target, 0)
if sequence == nil || err != nil {
return
}