mirror of
https://github.com/ergochat/ergo.git
synced 2024-11-11 06:29:29 +01:00
fix #1615
Remove the CHATHISTORY * and znc.in/playback *self targets, clean up associated database code, add new mechanisms to play all missed DMs.
This commit is contained in:
parent
d2278faf75
commit
0d05ab4ff4
@ -989,16 +989,6 @@ func (session *Session) playResume() {
|
|||||||
oldestLostMessage = lastDiscarded
|
oldestLostMessage = lastDiscarded
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_, privmsgSeq, _ := server.GetHistorySequence(nil, client, "*")
|
|
||||||
if privmsgSeq != nil {
|
|
||||||
privmsgs, _ := privmsgSeq.Between(history.Selector{}, history.Selector{}, config.History.ClientLength)
|
|
||||||
for _, item := range privmsgs {
|
|
||||||
sender := server.clients.Get(NUHToNick(item.Nick))
|
|
||||||
if sender != nil {
|
|
||||||
friends.Add(sender)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
timestamp := session.resumeDetails.Timestamp
|
timestamp := session.resumeDetails.Timestamp
|
||||||
gap := oldestLostMessage.Sub(timestamp)
|
gap := oldestLostMessage.Sub(timestamp)
|
||||||
@ -1054,7 +1044,8 @@ func (session *Session) playResume() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// replay direct PRIVSMG history
|
// replay direct PRIVSMG history
|
||||||
if !timestamp.IsZero() && privmsgSeq != nil {
|
_, privmsgSeq, err := server.GetHistorySequence(nil, client, "")
|
||||||
|
if !timestamp.IsZero() && err == nil && privmsgSeq != nil {
|
||||||
after := history.Selector{Time: timestamp}
|
after := history.Selector{Time: timestamp}
|
||||||
items, _ := privmsgSeq.Between(after, history.Selector{}, config.History.ZNCMax)
|
items, _ := privmsgSeq.Between(after, history.Selector{}, config.History.ZNCMax)
|
||||||
if len(items) != 0 {
|
if len(items) != 0 {
|
||||||
@ -1957,7 +1948,7 @@ func (client *Client) listTargets(start, end history.Selector, limit int) (resul
|
|||||||
extras = append(extras, persistentExtras...)
|
extras = append(extras, persistentExtras...)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, cSeq, err := client.server.GetHistorySequence(nil, client, "*")
|
_, cSeq, err := client.server.GetHistorySequence(nil, client, "")
|
||||||
if err == nil && cSeq != nil {
|
if err == nil && cSeq != nil {
|
||||||
correspondents, err := cSeq.ListCorrespondents(start, end, limit)
|
correspondents, err := cSeq.ListCorrespondents(start, end, limit)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
@ -1969,6 +1960,31 @@ func (client *Client) listTargets(start, end history.Selector, limit int) (resul
|
|||||||
return results, nil
|
return results, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// latest PRIVMSG from all DM targets
|
||||||
|
func (client *Client) privmsgsBetween(startTime, endTime time.Time, targetLimit, messageLimit int) (results []history.Item, err error) {
|
||||||
|
start := history.Selector{Time: startTime}
|
||||||
|
end := history.Selector{Time: endTime}
|
||||||
|
targets, err := client.listTargets(start, end, targetLimit)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, target := range targets {
|
||||||
|
if strings.HasPrefix(target.CfName, "#") {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
_, seq, err := client.server.GetHistorySequence(nil, client, target.CfName)
|
||||||
|
if err == nil && seq != nil {
|
||||||
|
items, err := seq.Between(start, end, messageLimit)
|
||||||
|
if err == nil {
|
||||||
|
results = append(results, items...)
|
||||||
|
} else {
|
||||||
|
client.server.logger.Error("internal", "error querying privmsg history", client.Nick(), target.CfName, err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func (client *Client) handleRegisterTimeout() {
|
func (client *Client) handleRegisterTimeout() {
|
||||||
client.Quit(fmt.Sprintf("Registration timeout: %v", RegisterTimeout), nil)
|
client.Quit(fmt.Sprintf("Registration timeout: %v", RegisterTimeout), nil)
|
||||||
client.destroy(nil)
|
client.destroy(nil)
|
||||||
|
@ -1080,7 +1080,7 @@ Get an explanation of <argument>, or "index" for a list of help topics.`), rb)
|
|||||||
|
|
||||||
// HISTORY <target> [<limit>]
|
// HISTORY <target> [<limit>]
|
||||||
// e.g., HISTORY #ubuntu 10
|
// e.g., HISTORY #ubuntu 10
|
||||||
// HISTORY me 15
|
// HISTORY alice 15
|
||||||
// HISTORY #darwin 1h
|
// HISTORY #darwin 1h
|
||||||
func historyHandler(server *Server, client *Client, msg ircmsg.Message, rb *ResponseBuffer) bool {
|
func historyHandler(server *Server, client *Client, msg ircmsg.Message, rb *ResponseBuffer) bool {
|
||||||
config := server.Config()
|
config := server.Config()
|
||||||
|
@ -9,7 +9,6 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/oragono/oragono/irc/history"
|
"github.com/oragono/oragono/irc/history"
|
||||||
@ -71,7 +70,7 @@ the request of the account holder.`,
|
|||||||
help: `Syntax: $bPLAY <target> [limit]$b
|
help: `Syntax: $bPLAY <target> [limit]$b
|
||||||
|
|
||||||
PLAY plays back history messages, rendering them into direct messages from
|
PLAY plays back history messages, rendering them into direct messages from
|
||||||
HistServ. 'target' is a channel name (or 'me' for direct messages), and 'limit'
|
HistServ. 'target' is a channel name or nickname to query, and 'limit'
|
||||||
is a message count or a time duration. Note that message playback may be
|
is a message count or a time duration. Note that message playback may be
|
||||||
incomplete or degraded, relative to direct playback from /HISTORY or
|
incomplete or degraded, relative to direct playback from /HISTORY or
|
||||||
CHATHISTORY.`,
|
CHATHISTORY.`,
|
||||||
@ -206,11 +205,7 @@ func histservPlayHandler(service *ircService, server *Server, client *Client, co
|
|||||||
|
|
||||||
// handles parameter parsing and history queries for /HISTORY and /HISTSERV PLAY
|
// handles parameter parsing and history queries for /HISTORY and /HISTSERV PLAY
|
||||||
func easySelectHistory(server *Server, client *Client, params []string) (items []history.Item, channel *Channel, err error) {
|
func easySelectHistory(server *Server, client *Client, params []string) (items []history.Item, channel *Channel, err error) {
|
||||||
target := params[0]
|
channel, sequence, err := server.GetHistorySequence(nil, client, params[0])
|
||||||
if strings.ToLower(target) == "me" {
|
|
||||||
target = "*"
|
|
||||||
}
|
|
||||||
channel, sequence, err := server.GetHistorySequence(nil, client, target)
|
|
||||||
|
|
||||||
if sequence == nil || err != nil {
|
if sequence == nil || err != nil {
|
||||||
return nil, nil, errNoSuchChannel
|
return nil, nil, errNoSuchChannel
|
||||||
|
@ -193,16 +193,25 @@ func (mysql *MySQL) createTables() (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
_, err = mysql.db.Exec(fmt.Sprintf(`CREATE TABLE sequence (
|
_, err = mysql.db.Exec(fmt.Sprintf(`CREATE TABLE sequence (
|
||||||
|
history_id BIGINT UNSIGNED NOT NULL PRIMARY KEY,
|
||||||
|
target VARBINARY(%[1]d) NOT NULL,
|
||||||
|
nanotime BIGINT UNSIGNED NOT NULL,
|
||||||
|
KEY (target, nanotime)
|
||||||
|
) CHARSET=ascii COLLATE=ascii_bin;`, MaxTargetLength))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
/* XXX: this table used to be:
|
||||||
|
CREATE TABLE sequence (
|
||||||
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
|
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
|
||||||
target VARBINARY(%[1]d) NOT NULL,
|
target VARBINARY(%[1]d) NOT NULL,
|
||||||
nanotime BIGINT UNSIGNED NOT NULL,
|
nanotime BIGINT UNSIGNED NOT NULL,
|
||||||
history_id BIGINT NOT NULL,
|
history_id BIGINT NOT NULL,
|
||||||
KEY (target, nanotime),
|
KEY (target, nanotime),
|
||||||
KEY (history_id)
|
KEY (history_id)
|
||||||
) CHARSET=ascii COLLATE=ascii_bin;`, MaxTargetLength))
|
) CHARSET=ascii COLLATE=ascii_bin;
|
||||||
if err != nil {
|
Some users may still be using the old schema.
|
||||||
return err
|
*/
|
||||||
}
|
|
||||||
|
|
||||||
_, err = mysql.db.Exec(fmt.Sprintf(`CREATE TABLE conversations (
|
_, err = mysql.db.Exec(fmt.Sprintf(`CREATE TABLE conversations (
|
||||||
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
|
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
|
||||||
@ -352,31 +361,32 @@ func (mysql *MySQL) deleteHistoryIDs(ctx context.Context, ids []uint64) (err err
|
|||||||
|
|
||||||
func (mysql *MySQL) selectCleanupIDs(ctx context.Context, age time.Duration) (ids []uint64, maxNanotime int64, err error) {
|
func (mysql *MySQL) selectCleanupIDs(ctx context.Context, age time.Duration) (ids []uint64, maxNanotime int64, err error) {
|
||||||
rows, err := mysql.db.QueryContext(ctx, `
|
rows, err := mysql.db.QueryContext(ctx, `
|
||||||
SELECT history.id, sequence.nanotime
|
SELECT history.id, sequence.nanotime, conversations.nanotime
|
||||||
FROM history
|
FROM history
|
||||||
LEFT JOIN sequence ON history.id = sequence.history_id
|
LEFT JOIN sequence ON history.id = sequence.history_id
|
||||||
|
LEFT JOIN conversations on history.id = conversations.history_id
|
||||||
ORDER BY history.id LIMIT ?;`, cleanupRowLimit)
|
ORDER BY history.id LIMIT ?;`, cleanupRowLimit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer rows.Close()
|
defer rows.Close()
|
||||||
|
|
||||||
// a history ID may have 0-2 rows in sequence: 1 for a channel entry,
|
|
||||||
// 2 for a DM, 0 if the data is inconsistent. therefore, deduplicate
|
|
||||||
// and delete anything that doesn't have a sequence entry:
|
|
||||||
idset := make(map[uint64]struct{}, cleanupRowLimit)
|
idset := make(map[uint64]struct{}, cleanupRowLimit)
|
||||||
threshold := time.Now().Add(-age).UnixNano()
|
threshold := time.Now().Add(-age).UnixNano()
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var id uint64
|
var id uint64
|
||||||
var nanotime sql.NullInt64
|
var seqNano, convNano sql.NullInt64
|
||||||
err = rows.Scan(&id, &nanotime)
|
err = rows.Scan(&id, &seqNano, &convNano)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if !nanotime.Valid || nanotime.Int64 < threshold {
|
nanotime := extractNanotime(seqNano, convNano)
|
||||||
|
// returns 0 if not found; in that case the data is inconsistent
|
||||||
|
// and we should delete the entry
|
||||||
|
if nanotime < threshold {
|
||||||
idset[id] = struct{}{}
|
idset[id] = struct{}{}
|
||||||
if nanotime.Valid && nanotime.Int64 > maxNanotime {
|
if nanotime > maxNanotime {
|
||||||
maxNanotime = nanotime.Int64
|
maxNanotime = nanotime
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -675,10 +685,6 @@ func (mysql *MySQL) AddDirectMessage(sender, senderAccount, recipient, recipient
|
|||||||
nanotime := item.Message.Time.UnixNano()
|
nanotime := item.Message.Time.UnixNano()
|
||||||
|
|
||||||
if senderAccount != "" {
|
if senderAccount != "" {
|
||||||
err = mysql.insertSequenceEntry(ctx, senderAccount, nanotime, id)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
err = mysql.insertConversationEntry(ctx, senderAccount, recipient, nanotime, id)
|
err = mysql.insertConversationEntry(ctx, senderAccount, recipient, nanotime, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
@ -690,10 +696,6 @@ func (mysql *MySQL) AddDirectMessage(sender, senderAccount, recipient, recipient
|
|||||||
}
|
}
|
||||||
|
|
||||||
if recipientAccount != "" && sender != recipient {
|
if recipientAccount != "" && sender != recipient {
|
||||||
err = mysql.insertSequenceEntry(ctx, recipientAccount, nanotime, id)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
err = mysql.insertConversationEntry(ctx, recipientAccount, sender, nanotime, id)
|
err = mysql.insertConversationEntry(ctx, recipientAccount, sender, nanotime, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
@ -800,31 +802,24 @@ func (mysql *MySQL) Export(account string, writer io.Writer) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (mysql *MySQL) lookupMsgid(ctx context.Context, msgid string, includeData bool) (result time.Time, id uint64, data []byte, err error) {
|
func (mysql *MySQL) lookupMsgid(ctx context.Context, msgid string, includeData bool) (result time.Time, id uint64, data []byte, err error) {
|
||||||
// in theory, we could optimize out a roundtrip to the database by using a subquery instead:
|
|
||||||
// sequence.nanotime > (
|
|
||||||
// SELECT sequence.nanotime FROM sequence, history
|
|
||||||
// WHERE sequence.history_id = history.id AND history.msgid = ?
|
|
||||||
// LIMIT 1)
|
|
||||||
// however, this doesn't handle the BETWEEN case with one or two msgids, where we
|
|
||||||
// don't initially know whether the interval is going forwards or backwards. to simplify
|
|
||||||
// the logic, resolve msgids to timestamps "manually" in all cases, using a separate query.
|
|
||||||
decoded, err := decodeMsgid(msgid)
|
decoded, err := decodeMsgid(msgid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
cols := `sequence.nanotime`
|
cols := `sequence.nanotime, conversations.nanotime`
|
||||||
if includeData {
|
if includeData {
|
||||||
cols = `sequence.nanotime, sequence.history_id, history.data`
|
cols = `sequence.nanotime, conversations.nanotime, history.id, history.data`
|
||||||
}
|
}
|
||||||
row := mysql.db.QueryRowContext(ctx, fmt.Sprintf(`
|
row := mysql.db.QueryRowContext(ctx, fmt.Sprintf(`
|
||||||
SELECT %s FROM sequence
|
SELECT %s FROM history
|
||||||
INNER JOIN history ON history.id = sequence.history_id
|
LEFT JOIN sequence ON history.id = sequence.history_id
|
||||||
|
LEFT JOIN conversations ON history.id = conversations.history_id
|
||||||
WHERE history.msgid = ? LIMIT 1;`, cols), decoded)
|
WHERE history.msgid = ? LIMIT 1;`, cols), decoded)
|
||||||
var nanotime int64
|
var nanoSeq, nanoConv sql.NullInt64
|
||||||
if !includeData {
|
if !includeData {
|
||||||
err = row.Scan(&nanotime)
|
err = row.Scan(&nanoSeq, &nanoConv)
|
||||||
} else {
|
} else {
|
||||||
err = row.Scan(&nanotime, &id, &data)
|
err = row.Scan(&nanoSeq, &nanoConv, &id, &data)
|
||||||
}
|
}
|
||||||
if err != sql.ErrNoRows {
|
if err != sql.ErrNoRows {
|
||||||
mysql.logError("could not resolve msgid to time", err)
|
mysql.logError("could not resolve msgid to time", err)
|
||||||
@ -832,11 +827,24 @@ func (mysql *MySQL) lookupMsgid(ctx context.Context, msgid string, includeData b
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
nanotime := extractNanotime(nanoSeq, nanoConv)
|
||||||
|
if nanotime == 0 {
|
||||||
|
err = sql.ErrNoRows
|
||||||
|
return
|
||||||
|
}
|
||||||
result = time.Unix(0, nanotime).UTC()
|
result = time.Unix(0, nanotime).UTC()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func extractNanotime(seq, conv sql.NullInt64) (result int64) {
|
||||||
|
if seq.Valid {
|
||||||
|
return seq.Int64
|
||||||
|
} else if conv.Valid {
|
||||||
|
return conv.Int64
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func (mysql *MySQL) selectItems(ctx context.Context, query string, args ...interface{}) (results []history.Item, err error) {
|
func (mysql *MySQL) selectItems(ctx context.Context, query string, args ...interface{}) (results []history.Item, err error) {
|
||||||
rows, err := mysql.db.QueryContext(ctx, query, args...)
|
rows, err := mysql.db.QueryContext(ctx, query, args...)
|
||||||
if mysql.logError("could not select history items", err) {
|
if mysql.logError("could not select history items", err) {
|
||||||
|
@ -862,6 +862,10 @@ func (server *Server) setupListeners(config *Config) (err error) {
|
|||||||
// we may already know the channel we're querying, or we may have
|
// we may already know the channel we're querying, or we may have
|
||||||
// to look it up via a string query. This function is responsible for
|
// to look it up via a string query. This function is responsible for
|
||||||
// privilege checking.
|
// privilege checking.
|
||||||
|
// XXX: call this with providedChannel==nil and query=="" to get a sequence
|
||||||
|
// suitable for ListCorrespondents (i.e., this function is still used to
|
||||||
|
// decide whether the ringbuf or mysql is authoritative about the client's
|
||||||
|
// message history).
|
||||||
func (server *Server) GetHistorySequence(providedChannel *Channel, client *Client, query string) (channel *Channel, sequence history.Sequence, err error) {
|
func (server *Server) GetHistorySequence(providedChannel *Channel, client *Client, query string) (channel *Channel, sequence history.Sequence, err error) {
|
||||||
config := server.Config()
|
config := server.Config()
|
||||||
// 4 cases: {persistent, ephemeral} x {normal, conversation}
|
// 4 cases: {persistent, ephemeral} x {normal, conversation}
|
||||||
@ -901,7 +905,7 @@ func (server *Server) GetHistorySequence(providedChannel *Channel, client *Clien
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
status, target = client.historyStatus(config)
|
status, target = client.historyStatus(config)
|
||||||
if query != "*" {
|
if query != "" {
|
||||||
correspondent, err = CasefoldName(query)
|
correspondent, err = CasefoldName(query)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
12
irc/znc.go
12
irc/znc.go
@ -18,6 +18,8 @@ const (
|
|||||||
zncPlaybackCommandExpiration = time.Second * 30
|
zncPlaybackCommandExpiration = time.Second * 30
|
||||||
|
|
||||||
zncPrefix = "*playback!znc@znc.in"
|
zncPrefix = "*playback!znc@znc.in"
|
||||||
|
|
||||||
|
maxDMTargetsForAutoplay = 128
|
||||||
)
|
)
|
||||||
|
|
||||||
type zncCommandHandler func(client *Client, command string, params []string, rb *ResponseBuffer)
|
type zncCommandHandler func(client *Client, command string, params []string, rb *ResponseBuffer)
|
||||||
@ -150,9 +152,7 @@ func zncPlaybackPlayHandler(client *Client, command string, params []string, rb
|
|||||||
} else {
|
} else {
|
||||||
targets = make(utils.StringSet)
|
targets = make(utils.StringSet)
|
||||||
for _, targetName := range strings.Split(targetString, ",") {
|
for _, targetName := range strings.Split(targetString, ",") {
|
||||||
if targetName == "*self" {
|
if strings.HasPrefix(targetName, "#") {
|
||||||
playPrivmsgs = true
|
|
||||||
} else if strings.HasPrefix(targetName, "#") {
|
|
||||||
if cfTarget, err := CasefoldChannel(targetName); err == nil {
|
if cfTarget, err := CasefoldChannel(targetName); err == nil {
|
||||||
targets.Add(cfTarget)
|
targets.Add(cfTarget)
|
||||||
}
|
}
|
||||||
@ -165,7 +165,7 @@ func zncPlaybackPlayHandler(client *Client, command string, params []string, rb
|
|||||||
}
|
}
|
||||||
|
|
||||||
if playPrivmsgs {
|
if playPrivmsgs {
|
||||||
zncPlayPrivmsgs(client, rb, "*", start, end)
|
zncPlayPrivmsgs(client, rb, "", start, end)
|
||||||
}
|
}
|
||||||
|
|
||||||
rb.session.zncPlaybackTimes = &zncPlaybackTimes{
|
rb.session.zncPlaybackTimes = &zncPlaybackTimes{
|
||||||
@ -188,13 +188,13 @@ func zncPlaybackPlayHandler(client *Client, command string, params []string, rb
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func zncPlayPrivmsgs(client *Client, rb *ResponseBuffer, target string, after, before time.Time) {
|
func zncPlayPrivmsgs(client *Client, rb *ResponseBuffer, target string, start, end time.Time) {
|
||||||
_, sequence, _ := client.server.GetHistorySequence(nil, client, target)
|
_, sequence, _ := client.server.GetHistorySequence(nil, client, target)
|
||||||
if sequence == nil {
|
if sequence == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
zncMax := client.server.Config().History.ZNCMax
|
zncMax := client.server.Config().History.ZNCMax
|
||||||
items, err := sequence.Between(history.Selector{Time: after}, history.Selector{Time: before}, zncMax)
|
items, err := client.privmsgsBetween(start, end, maxDMTargetsForAutoplay, zncMax)
|
||||||
if err == nil && len(items) != 0 {
|
if err == nil && len(items) != 0 {
|
||||||
client.replayPrivmsgHistory(rb, items, "")
|
client.replayPrivmsgHistory(rb, items, "")
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user