From 0d05ab4ff4249f174a68ca61bfe3cbbf221436da Mon Sep 17 00:00:00 2001 From: Shivaram Lingamneni Date: Mon, 19 Apr 2021 08:54:40 -0400 Subject: [PATCH] fix #1615 Remove the CHATHISTORY * and znc.in/playback *self targets, clean up associated database code, add new mechanisms to play all missed DMs. --- irc/client.go | 40 ++++++++++++++------- irc/handlers.go | 2 +- irc/histserv.go | 9 ++--- irc/mysql/history.go | 82 ++++++++++++++++++++++++-------------------- irc/server.go | 6 +++- irc/znc.go | 12 +++---- 6 files changed, 87 insertions(+), 64 deletions(-) diff --git a/irc/client.go b/irc/client.go index 53488bc2..7cf2d0e9 100644 --- a/irc/client.go +++ b/irc/client.go @@ -989,16 +989,6 @@ func (session *Session) playResume() { oldestLostMessage = lastDiscarded } } - _, privmsgSeq, _ := server.GetHistorySequence(nil, client, "*") - if privmsgSeq != nil { - privmsgs, _ := privmsgSeq.Between(history.Selector{}, history.Selector{}, config.History.ClientLength) - for _, item := range privmsgs { - sender := server.clients.Get(NUHToNick(item.Nick)) - if sender != nil { - friends.Add(sender) - } - } - } timestamp := session.resumeDetails.Timestamp gap := oldestLostMessage.Sub(timestamp) @@ -1054,7 +1044,8 @@ func (session *Session) playResume() { } // replay direct PRIVSMG history - if !timestamp.IsZero() && privmsgSeq != nil { + _, privmsgSeq, err := server.GetHistorySequence(nil, client, "") + if !timestamp.IsZero() && err == nil && privmsgSeq != nil { after := history.Selector{Time: timestamp} items, _ := privmsgSeq.Between(after, history.Selector{}, config.History.ZNCMax) if len(items) != 0 { @@ -1957,7 +1948,7 @@ func (client *Client) listTargets(start, end history.Selector, limit int) (resul extras = append(extras, persistentExtras...) } - _, cSeq, err := client.server.GetHistorySequence(nil, client, "*") + _, cSeq, err := client.server.GetHistorySequence(nil, client, "") if err == nil && cSeq != nil { correspondents, err := cSeq.ListCorrespondents(start, end, limit) if err == nil { @@ -1969,6 +1960,31 @@ func (client *Client) listTargets(start, end history.Selector, limit int) (resul return results, nil } +// latest PRIVMSG from all DM targets +func (client *Client) privmsgsBetween(startTime, endTime time.Time, targetLimit, messageLimit int) (results []history.Item, err error) { + start := history.Selector{Time: startTime} + end := history.Selector{Time: endTime} + targets, err := client.listTargets(start, end, targetLimit) + if err != nil { + return + } + for _, target := range targets { + if strings.HasPrefix(target.CfName, "#") { + continue + } + _, seq, err := client.server.GetHistorySequence(nil, client, target.CfName) + if err == nil && seq != nil { + items, err := seq.Between(start, end, messageLimit) + if err == nil { + results = append(results, items...) + } else { + client.server.logger.Error("internal", "error querying privmsg history", client.Nick(), target.CfName, err.Error()) + } + } + } + return +} + func (client *Client) handleRegisterTimeout() { client.Quit(fmt.Sprintf("Registration timeout: %v", RegisterTimeout), nil) client.destroy(nil) diff --git a/irc/handlers.go b/irc/handlers.go index 80a8cf44..a644abc2 100644 --- a/irc/handlers.go +++ b/irc/handlers.go @@ -1080,7 +1080,7 @@ Get an explanation of , or "index" for a list of help topics.`), rb) // HISTORY [] // e.g., HISTORY #ubuntu 10 -// HISTORY me 15 +// HISTORY alice 15 // HISTORY #darwin 1h func historyHandler(server *Server, client *Client, msg ircmsg.Message, rb *ResponseBuffer) bool { config := server.Config() diff --git a/irc/histserv.go b/irc/histserv.go index 3497463c..653e6822 100644 --- a/irc/histserv.go +++ b/irc/histserv.go @@ -9,7 +9,6 @@ import ( "os" "runtime/debug" "strconv" - "strings" "time" "github.com/oragono/oragono/irc/history" @@ -71,7 +70,7 @@ the request of the account holder.`, help: `Syntax: $bPLAY [limit]$b PLAY plays back history messages, rendering them into direct messages from -HistServ. 'target' is a channel name (or 'me' for direct messages), and 'limit' +HistServ. 'target' is a channel name or nickname to query, and 'limit' is a message count or a time duration. Note that message playback may be incomplete or degraded, relative to direct playback from /HISTORY or CHATHISTORY.`, @@ -206,11 +205,7 @@ func histservPlayHandler(service *ircService, server *Server, client *Client, co // handles parameter parsing and history queries for /HISTORY and /HISTSERV PLAY func easySelectHistory(server *Server, client *Client, params []string) (items []history.Item, channel *Channel, err error) { - target := params[0] - if strings.ToLower(target) == "me" { - target = "*" - } - channel, sequence, err := server.GetHistorySequence(nil, client, target) + channel, sequence, err := server.GetHistorySequence(nil, client, params[0]) if sequence == nil || err != nil { return nil, nil, errNoSuchChannel diff --git a/irc/mysql/history.go b/irc/mysql/history.go index 0e24cf36..c8ec2e5f 100644 --- a/irc/mysql/history.go +++ b/irc/mysql/history.go @@ -193,16 +193,25 @@ func (mysql *MySQL) createTables() (err error) { } _, err = mysql.db.Exec(fmt.Sprintf(`CREATE TABLE sequence ( + history_id BIGINT UNSIGNED NOT NULL PRIMARY KEY, + target VARBINARY(%[1]d) NOT NULL, + nanotime BIGINT UNSIGNED NOT NULL, + KEY (target, nanotime) + ) CHARSET=ascii COLLATE=ascii_bin;`, MaxTargetLength)) + if err != nil { + return err + } + /* XXX: this table used to be: + CREATE TABLE sequence ( id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, target VARBINARY(%[1]d) NOT NULL, nanotime BIGINT UNSIGNED NOT NULL, history_id BIGINT NOT NULL, KEY (target, nanotime), KEY (history_id) - ) CHARSET=ascii COLLATE=ascii_bin;`, MaxTargetLength)) - if err != nil { - return err - } + ) CHARSET=ascii COLLATE=ascii_bin; + Some users may still be using the old schema. + */ _, err = mysql.db.Exec(fmt.Sprintf(`CREATE TABLE conversations ( id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, @@ -352,31 +361,32 @@ func (mysql *MySQL) deleteHistoryIDs(ctx context.Context, ids []uint64) (err err func (mysql *MySQL) selectCleanupIDs(ctx context.Context, age time.Duration) (ids []uint64, maxNanotime int64, err error) { rows, err := mysql.db.QueryContext(ctx, ` - SELECT history.id, sequence.nanotime + SELECT history.id, sequence.nanotime, conversations.nanotime FROM history LEFT JOIN sequence ON history.id = sequence.history_id + LEFT JOIN conversations on history.id = conversations.history_id ORDER BY history.id LIMIT ?;`, cleanupRowLimit) if err != nil { return } defer rows.Close() - // a history ID may have 0-2 rows in sequence: 1 for a channel entry, - // 2 for a DM, 0 if the data is inconsistent. therefore, deduplicate - // and delete anything that doesn't have a sequence entry: idset := make(map[uint64]struct{}, cleanupRowLimit) threshold := time.Now().Add(-age).UnixNano() for rows.Next() { var id uint64 - var nanotime sql.NullInt64 - err = rows.Scan(&id, &nanotime) + var seqNano, convNano sql.NullInt64 + err = rows.Scan(&id, &seqNano, &convNano) if err != nil { return } - if !nanotime.Valid || nanotime.Int64 < threshold { + nanotime := extractNanotime(seqNano, convNano) + // returns 0 if not found; in that case the data is inconsistent + // and we should delete the entry + if nanotime < threshold { idset[id] = struct{}{} - if nanotime.Valid && nanotime.Int64 > maxNanotime { - maxNanotime = nanotime.Int64 + if nanotime > maxNanotime { + maxNanotime = nanotime } } } @@ -675,10 +685,6 @@ func (mysql *MySQL) AddDirectMessage(sender, senderAccount, recipient, recipient nanotime := item.Message.Time.UnixNano() if senderAccount != "" { - err = mysql.insertSequenceEntry(ctx, senderAccount, nanotime, id) - if err != nil { - return - } err = mysql.insertConversationEntry(ctx, senderAccount, recipient, nanotime, id) if err != nil { return @@ -690,10 +696,6 @@ func (mysql *MySQL) AddDirectMessage(sender, senderAccount, recipient, recipient } if recipientAccount != "" && sender != recipient { - err = mysql.insertSequenceEntry(ctx, recipientAccount, nanotime, id) - if err != nil { - return - } err = mysql.insertConversationEntry(ctx, recipientAccount, sender, nanotime, id) if err != nil { return @@ -800,31 +802,24 @@ func (mysql *MySQL) Export(account string, writer io.Writer) { } func (mysql *MySQL) lookupMsgid(ctx context.Context, msgid string, includeData bool) (result time.Time, id uint64, data []byte, err error) { - // in theory, we could optimize out a roundtrip to the database by using a subquery instead: - // sequence.nanotime > ( - // SELECT sequence.nanotime FROM sequence, history - // WHERE sequence.history_id = history.id AND history.msgid = ? - // LIMIT 1) - // however, this doesn't handle the BETWEEN case with one or two msgids, where we - // don't initially know whether the interval is going forwards or backwards. to simplify - // the logic, resolve msgids to timestamps "manually" in all cases, using a separate query. decoded, err := decodeMsgid(msgid) if err != nil { return } - cols := `sequence.nanotime` + cols := `sequence.nanotime, conversations.nanotime` if includeData { - cols = `sequence.nanotime, sequence.history_id, history.data` + cols = `sequence.nanotime, conversations.nanotime, history.id, history.data` } row := mysql.db.QueryRowContext(ctx, fmt.Sprintf(` - SELECT %s FROM sequence - INNER JOIN history ON history.id = sequence.history_id + SELECT %s FROM history + LEFT JOIN sequence ON history.id = sequence.history_id + LEFT JOIN conversations ON history.id = conversations.history_id WHERE history.msgid = ? LIMIT 1;`, cols), decoded) - var nanotime int64 + var nanoSeq, nanoConv sql.NullInt64 if !includeData { - err = row.Scan(&nanotime) + err = row.Scan(&nanoSeq, &nanoConv) } else { - err = row.Scan(&nanotime, &id, &data) + err = row.Scan(&nanoSeq, &nanoConv, &id, &data) } if err != sql.ErrNoRows { mysql.logError("could not resolve msgid to time", err) @@ -832,11 +827,24 @@ func (mysql *MySQL) lookupMsgid(ctx context.Context, msgid string, includeData b if err != nil { return } - + nanotime := extractNanotime(nanoSeq, nanoConv) + if nanotime == 0 { + err = sql.ErrNoRows + return + } result = time.Unix(0, nanotime).UTC() return } +func extractNanotime(seq, conv sql.NullInt64) (result int64) { + if seq.Valid { + return seq.Int64 + } else if conv.Valid { + return conv.Int64 + } + return +} + func (mysql *MySQL) selectItems(ctx context.Context, query string, args ...interface{}) (results []history.Item, err error) { rows, err := mysql.db.QueryContext(ctx, query, args...) if mysql.logError("could not select history items", err) { diff --git a/irc/server.go b/irc/server.go index c35ed1a6..448f0a41 100644 --- a/irc/server.go +++ b/irc/server.go @@ -862,6 +862,10 @@ func (server *Server) setupListeners(config *Config) (err error) { // we may already know the channel we're querying, or we may have // to look it up via a string query. This function is responsible for // privilege checking. +// XXX: call this with providedChannel==nil and query=="" to get a sequence +// suitable for ListCorrespondents (i.e., this function is still used to +// decide whether the ringbuf or mysql is authoritative about the client's +// message history). func (server *Server) GetHistorySequence(providedChannel *Channel, client *Client, query string) (channel *Channel, sequence history.Sequence, err error) { config := server.Config() // 4 cases: {persistent, ephemeral} x {normal, conversation} @@ -901,7 +905,7 @@ func (server *Server) GetHistorySequence(providedChannel *Channel, client *Clien } } else { status, target = client.historyStatus(config) - if query != "*" { + if query != "" { correspondent, err = CasefoldName(query) if err != nil { return diff --git a/irc/znc.go b/irc/znc.go index 6f3ceae8..a9d22d7c 100644 --- a/irc/znc.go +++ b/irc/znc.go @@ -18,6 +18,8 @@ const ( zncPlaybackCommandExpiration = time.Second * 30 zncPrefix = "*playback!znc@znc.in" + + maxDMTargetsForAutoplay = 128 ) type zncCommandHandler func(client *Client, command string, params []string, rb *ResponseBuffer) @@ -150,9 +152,7 @@ func zncPlaybackPlayHandler(client *Client, command string, params []string, rb } else { targets = make(utils.StringSet) for _, targetName := range strings.Split(targetString, ",") { - if targetName == "*self" { - playPrivmsgs = true - } else if strings.HasPrefix(targetName, "#") { + if strings.HasPrefix(targetName, "#") { if cfTarget, err := CasefoldChannel(targetName); err == nil { targets.Add(cfTarget) } @@ -165,7 +165,7 @@ func zncPlaybackPlayHandler(client *Client, command string, params []string, rb } if playPrivmsgs { - zncPlayPrivmsgs(client, rb, "*", start, end) + zncPlayPrivmsgs(client, rb, "", start, end) } rb.session.zncPlaybackTimes = &zncPlaybackTimes{ @@ -188,13 +188,13 @@ func zncPlaybackPlayHandler(client *Client, command string, params []string, rb } } -func zncPlayPrivmsgs(client *Client, rb *ResponseBuffer, target string, after, before time.Time) { +func zncPlayPrivmsgs(client *Client, rb *ResponseBuffer, target string, start, end time.Time) { _, sequence, _ := client.server.GetHistorySequence(nil, client, target) if sequence == nil { return } zncMax := client.server.Config().History.ZNCMax - items, err := sequence.Between(history.Selector{Time: after}, history.Selector{Time: before}, zncMax) + items, err := client.privmsgsBetween(start, end, maxDMTargetsForAutoplay, zncMax) if err == nil && len(items) != 0 { client.replayPrivmsgHistory(rb, items, "") }