From 67f35e5c8a18ec70af0557f4e92349a899118100 Mon Sep 17 00:00:00 2001 From: Shivaram Lingamneni Date: Tue, 12 May 2020 12:05:40 -0400 Subject: [PATCH] fix #858 and #383 --- conventional.yaml | 18 +- irc/accounts.go | 24 +++ irc/channel.go | 48 +++-- irc/chanserv.go | 6 +- irc/client.go | 2 +- irc/config.go | 11 ++ irc/handlers.go | 57 ++---- irc/history/history.go | 26 +++ irc/histserv.go | 242 +++++++++++++++++++++++++ irc/mysql/config.go | 3 +- irc/mysql/history.go | 391 ++++++++++++++++++++++++++++++++++++++--- irc/nickname.go | 2 +- irc/roleplay.go | 2 +- irc/server.go | 70 ++++++++ irc/services.go | 7 + oragono.yaml | 18 +- 16 files changed, 819 insertions(+), 108 deletions(-) create mode 100644 irc/histserv.go diff --git a/conventional.yaml b/conventional.yaml index 6fcdded1..255dd4e8 100644 --- a/conventional.yaml +++ b/conventional.yaml @@ -259,6 +259,10 @@ server: secure-nets: # - "10.0.0.0/8" + # oragono will write files to disk under certain circumstances, e.g., + # CPU profiling or data export. by default, these files will be written + # to the working directory. set this to customize: + # output-path: "/home/oragono/out" # account options accounts: @@ -556,6 +560,7 @@ oper-classes: - "samode" - "vhosts" - "chanreg" + - "history" # ircd operators opers: @@ -751,7 +756,8 @@ roleplay: # add the real nickname, in parentheses, to the end of every roleplay message? add-suffix: true -# message history tracking, for the RESUME extension and possibly other uses in future +# history message storage: this is used by CHATHISTORY, HISTORY, znc.in/playback, +# various autoreplay features, and the resume extension history: # should we store messages for later playback? # by default, messages are stored in RAM only; they do not persist @@ -820,3 +826,13 @@ history: # if you enable this, strict nickname reservation is strongly recommended # as well. direct-messages: "opt-out" + + # options to control how messages are stored and deleted: + retention: + # allow users to delete their own messages from history? + allow-individual-delete: false + + # if persistent history is enabled, create additional index tables, + # allowing deletion of JSON export of an account's messages. this + # may be needed for compliance with data privacy regulations. + enable-account-indexing: false diff --git a/irc/accounts.go b/irc/accounts.go index 99e1239a..d4ecd93c 100644 --- a/irc/accounts.go +++ b/irc/accounts.go @@ -1103,6 +1103,30 @@ func (am *AccountManager) LoadAccount(accountName string) (result ClientAccount, return } +// look up the unfolded version of an account name, possibly after deletion +func (am *AccountManager) AccountToAccountName(account string) (result string) { + casefoldedAccount, err := CasefoldName(account) + if err != nil { + return + } + + unregisteredKey := fmt.Sprintf(keyAccountUnregistered, casefoldedAccount) + accountNameKey := fmt.Sprintf(keyAccountName, casefoldedAccount) + + am.server.store.View(func(tx *buntdb.Tx) error { + if name, err := tx.Get(accountNameKey); err == nil { + result = name + return nil + } + if name, err := tx.Get(unregisteredKey); err == nil { + result = name + } + return nil + }) + + return +} + func (am *AccountManager) deserializeRawAccount(raw rawClientAccount, cfName string) (result ClientAccount, err error) { result.Name = raw.Name result.NameCasefolded = cfName diff --git a/irc/channel.go b/irc/channel.go index b0bb276b..8c689744 100644 --- a/irc/channel.go +++ b/irc/channel.go @@ -20,10 +20,6 @@ import ( "github.com/oragono/oragono/irc/utils" ) -const ( - histServMask = "HistServ!HistServ@localhost" -) - type ChannelSettings struct { History HistoryStatus } @@ -641,14 +637,14 @@ func channelHistoryStatus(config *Config, registered bool, storedStatus HistoryS } } -func (channel *Channel) AddHistoryItem(item history.Item) (err error) { +func (channel *Channel) AddHistoryItem(item history.Item, account string) (err error) { if !item.IsStorable() { return } status, target := channel.historyStatus(channel.server.Config()) if status == HistoryPersistent { - err = channel.server.historyDB.AddChannelItem(target, item) + err = channel.server.historyDB.AddChannelItem(target, item, account) } else if status == HistoryEphemeral { channel.history.Add(item) } @@ -746,7 +742,7 @@ func (channel *Channel) Join(client *Client, key string, isSajoin bool, rb *Resp Message: message, } histItem.Params[0] = details.realname - channel.AddHistoryItem(histItem) + channel.AddHistoryItem(histItem, details.account) } client.addChannel(channel, rb == nil) @@ -902,7 +898,7 @@ func (channel *Channel) Part(client *Client, message string, rb *ResponseBuffer) Nick: details.nickMask, AccountName: details.accountName, Message: splitMessage, - }) + }, details.account) client.server.logger.Debug("part", fmt.Sprintf("%s left channel %s", details.nick, chname)) } @@ -1165,7 +1161,7 @@ func (channel *Channel) SetTopic(client *Client, topic string, rb *ResponseBuffe Nick: details.nickMask, AccountName: details.accountName, Message: message, - }) + }, details.account) channel.MarkDirty(IncludeTopic) } @@ -1222,8 +1218,7 @@ func (channel *Channel) SendSplitMessage(command string, minPrefixMode modes.Mod return } - nickmask := client.NickMaskString() - account := client.AccountName() + details := client.Details() chname := channel.Name() // STATUSMSG targets are prefixed with the supplied min-prefix, e.g., @#channel @@ -1238,9 +1233,9 @@ func (channel *Channel) SendSplitMessage(command string, minPrefixMode modes.Mod tagsToUse = clientOnlyTags } if histType == history.Tagmsg && rb.session.capabilities.Has(caps.MessageTags) { - rb.AddFromClient(message.Time, message.Msgid, nickmask, account, tagsToUse, command, chname) + rb.AddFromClient(message.Time, message.Msgid, details.nickMask, details.accountName, tagsToUse, command, chname) } else { - rb.AddSplitMessageFromClient(nickmask, account, tagsToUse, command, chname, message) + rb.AddSplitMessageFromClient(details.nickMask, details.accountName, tagsToUse, command, chname, message) } } // send echo-message to other connected sessions @@ -1253,9 +1248,9 @@ func (channel *Channel) SendSplitMessage(command string, minPrefixMode modes.Mod tagsToUse = clientOnlyTags } if histType == history.Tagmsg && session.capabilities.Has(caps.MessageTags) { - session.sendFromClientInternal(false, message.Time, message.Msgid, nickmask, account, tagsToUse, command, chname) + session.sendFromClientInternal(false, message.Time, message.Msgid, details.nickMask, details.accountName, tagsToUse, command, chname) } else if histType != history.Tagmsg { - session.sendSplitMsgFromClientInternal(false, nickmask, account, tagsToUse, command, chname, message) + session.sendSplitMsgFromClientInternal(false, details.nickMask, details.accountName, tagsToUse, command, chname, message) } } @@ -1282,9 +1277,9 @@ func (channel *Channel) SendSplitMessage(command string, minPrefixMode modes.Mod } if histType == history.Tagmsg { - session.sendFromClientInternal(false, message.Time, message.Msgid, nickmask, account, tagsToUse, command, chname) + session.sendFromClientInternal(false, message.Time, message.Msgid, details.nickMask, details.accountName, tagsToUse, command, chname) } else { - session.sendSplitMsgFromClientInternal(false, nickmask, account, tagsToUse, command, chname, message) + session.sendSplitMsgFromClientInternal(false, details.nickMask, details.accountName, tagsToUse, command, chname, message) } } } @@ -1294,10 +1289,10 @@ func (channel *Channel) SendSplitMessage(command string, minPrefixMode modes.Mod channel.AddHistoryItem(history.Item{ Type: histType, Message: message, - Nick: nickmask, - AccountName: account, + Nick: details.nickMask, + AccountName: details.accountName, Tags: clientOnlyTags, - }) + }, details.account) } } @@ -1391,28 +1386,27 @@ func (channel *Channel) Kick(client *Client, target *Client, comment string, rb } message := utils.MakeMessage(comment) - clientMask := client.NickMaskString() - clientAccount := client.AccountName() + details := client.Details() targetNick := target.Nick() chname := channel.Name() for _, member := range channel.Members() { for _, session := range member.Sessions() { if session != rb.session { - session.sendFromClientInternal(false, message.Time, message.Msgid, clientMask, clientAccount, nil, "KICK", chname, targetNick, comment) + session.sendFromClientInternal(false, message.Time, message.Msgid, details.nickMask, details.accountName, nil, "KICK", chname, targetNick, comment) } } } - rb.Add(nil, clientMask, "KICK", chname, targetNick, comment) + rb.AddFromClient(message.Time, message.Msgid, details.nickMask, details.accountName, nil, "KICK", chname, targetNick, comment) histItem := history.Item{ Type: history.Kick, - Nick: clientMask, - AccountName: target.AccountName(), + Nick: details.nickMask, + AccountName: details.accountName, Message: message, } histItem.Params[0] = targetNick - channel.AddHistoryItem(histItem) + channel.AddHistoryItem(histItem, details.account) channel.Quit(target) } diff --git a/irc/chanserv.go b/irc/chanserv.go index ce225da7..b61d5756 100644 --- a/irc/chanserv.go +++ b/irc/chanserv.go @@ -255,7 +255,7 @@ func csAmodeHandler(server *Server, client *Client, command string, params []str if member.Account() == change.Arg { applied, change := channel.applyModeToMember(client, change, rb) if applied { - announceCmodeChanges(channel, modes.ModeChanges{change}, chanservMask, "*", rb) + announceCmodeChanges(channel, modes.ModeChanges{change}, chanservMask, "*", "", rb) } } } @@ -302,7 +302,7 @@ func csOpHandler(server *Server, client *Client, command string, params []string }, rb) if applied { - announceCmodeChanges(channelInfo, modes.ModeChanges{change}, chanservMask, "*", rb) + announceCmodeChanges(channelInfo, modes.ModeChanges{change}, chanservMask, "*", "", rb) } csNotice(rb, fmt.Sprintf(client.t("Successfully op'd in channel %s"), channelName)) @@ -354,7 +354,7 @@ func csRegisterHandler(server *Server, client *Client, command string, params [] }, rb) if applied { - announceCmodeChanges(channelInfo, modes.ModeChanges{change}, chanservMask, "*", rb) + announceCmodeChanges(channelInfo, modes.ModeChanges{change}, chanservMask, "*", "", rb) } } diff --git a/irc/client.go b/irc/client.go index 57d839d6..8465fa6b 100644 --- a/irc/client.go +++ b/irc/client.go @@ -1277,7 +1277,7 @@ func (client *Client) destroy(session *Session) { // use a defer here to avoid writing to mysql while holding the destroy semaphore: defer func() { for _, channel := range channels { - channel.AddHistoryItem(quitItem) + channel.AddHistoryItem(quitItem, details.account) } }() diff --git a/irc/config.go b/irc/config.go index 982afc3a..228bdeb3 100644 --- a/irc/config.go +++ b/irc/config.go @@ -13,6 +13,7 @@ import ( "log" "net" "os" + "path/filepath" "regexp" "sort" "strconv" @@ -511,6 +512,7 @@ type Config struct { supportedCaps *caps.Set capValues caps.Values Casemapping Casemapping + OutputPath string `yaml:"output-path"` } Roleplay struct { @@ -590,6 +592,10 @@ type Config struct { RegisteredChannels PersistentStatus `yaml:"registered-channels"` DirectMessages PersistentStatus `yaml:"direct-messages"` } + Retention struct { + AllowIndividualDelete bool `yaml:"allow-individual-delete"` + EnableAccountIndexing bool `yaml:"enable-account-indexing"` + } } Filename string @@ -1111,6 +1117,7 @@ func LoadConfig(filename string) (config *Config, err error) { config.Roleplay.addSuffix = utils.BoolDefaultTrue(config.Roleplay.AddSuffix) config.Datastore.MySQL.ExpireTime = time.Duration(config.History.Restrictions.ExpireTime) + config.Datastore.MySQL.TrackAccountMessages = config.History.Retention.EnableAccountIndexing config.Server.Cloaks.Initialize() if config.Server.Cloaks.Enabled { @@ -1133,6 +1140,10 @@ func LoadConfig(filename string) (config *Config, err error) { return config, nil } +func (config *Config) getOutputPath(filename string) string { + return filepath.Join(config.Server.OutputPath, filename) +} + // setISupport sets up our RPL_ISUPPORT reply. func (config *Config) generateISupport() (err error) { maxTargetsString := strconv.Itoa(maxTargets) diff --git a/irc/handlers.go b/irc/handlers.go index 64d16cde..995172ec 100644 --- a/irc/handlers.go +++ b/irc/handlers.go @@ -677,7 +677,7 @@ func debugHandler(server *Server, client *Client, msg ircmsg.IrcMessage, rb *Res rb.Notice(fmt.Sprintf("num goroutines: %d", count)) case "PROFILEHEAP": - profFile := "oragono.mprof" + profFile := server.Config().getOutputPath("oragono.mprof") file, err := os.Create(profFile) if err != nil { rb.Notice(fmt.Sprintf("error: %s", err)) @@ -688,7 +688,7 @@ func debugHandler(server *Server, client *Client, msg ircmsg.IrcMessage, rb *Res rb.Notice(fmt.Sprintf("written to %s", profFile)) case "STARTCPUPROFILE": - profFile := "oragono.prof" + profFile := server.Config().getOutputPath("oragono.prof") file, err := os.Create(profFile) if err != nil { rb.Notice(fmt.Sprintf("error: %s", err)) @@ -935,50 +935,17 @@ func historyHandler(server *Server, client *Client, msg ircmsg.IrcMessage, rb *R return false } - target := msg.Params[0] - if strings.ToLower(target) == "me" { - target = "*" - } - channel, sequence, err := server.GetHistorySequence(nil, client, target) + items, channel, err := easySelectHistory(server, client, msg.Params) - if sequence == nil || err != nil { - // whatever - rb.Add(nil, server.name, ERR_NOSUCHCHANNEL, client.Nick(), utils.SafeErrorParam(target), client.t("No such channel")) + if err == errNoSuchChannel { + rb.Add(nil, server.name, ERR_NOSUCHCHANNEL, client.Nick(), utils.SafeErrorParam(msg.Params[0]), client.t("No such channel")) + return false + } else if err != nil { + rb.Add(nil, server.name, ERR_UNKNOWNERROR, client.Nick(), msg.Command, client.t("Could not retrieve history")) return false } - var duration time.Duration - maxChathistoryLimit := config.History.ChathistoryMax - limit := 100 - if maxChathistoryLimit < limit { - limit = maxChathistoryLimit - } - if len(msg.Params) > 1 { - providedLimit, err := strconv.Atoi(msg.Params[1]) - if err == nil && providedLimit != 0 { - limit = providedLimit - if maxChathistoryLimit < limit { - limit = maxChathistoryLimit - } - } else if err != nil { - duration, err = time.ParseDuration(msg.Params[1]) - if err == nil { - limit = maxChathistoryLimit - } - } - } - - var items []history.Item - if duration == 0 { - items, _, err = sequence.Between(history.Selector{}, history.Selector{}, limit) - } else { - now := time.Now().UTC() - start := history.Selector{Time: now} - end := history.Selector{Time: now.Add(-duration)} - items, _, err = sequence.Between(start, end, limit) - } - - if err == nil && len(items) != 0 { + if len(items) != 0 { if channel != nil { channel.replayHistoryItems(rb, items, false) } else { @@ -1530,12 +1497,12 @@ func cmodeHandler(server *Server, client *Client, msg ircmsg.IrcMessage, rb *Res // process mode changes, include list operations (an empty set of changes does a list) applied := channel.ApplyChannelModeChanges(client, msg.Command == "SAMODE", changes, rb) details := client.Details() - announceCmodeChanges(channel, applied, details.nickMask, details.accountName, rb) + announceCmodeChanges(channel, applied, details.nickMask, details.accountName, details.account, rb) return false } -func announceCmodeChanges(channel *Channel, applied modes.ModeChanges, source, accountName string, rb *ResponseBuffer) { +func announceCmodeChanges(channel *Channel, applied modes.ModeChanges, source, accountName, account string, rb *ResponseBuffer) { // send out changes if len(applied) > 0 { message := utils.MakeMessage("") @@ -1557,7 +1524,7 @@ func announceCmodeChanges(channel *Channel, applied modes.ModeChanges, source, a Nick: source, AccountName: accountName, Message: message, - }) + }, account) } } diff --git a/irc/history/history.go b/irc/history/history.go index 5a1ece64..bc5b45ed 100644 --- a/irc/history/history.go +++ b/irc/history/history.go @@ -284,6 +284,32 @@ func (list *Buffer) matchInternal(predicate Predicate, ascending bool, limit int return } +// Delete deletes messages matching some predicate. +func (list *Buffer) Delete(predicate Predicate) (count int) { + list.Lock() + defer list.Unlock() + + if list.start == -1 || len(list.buffer) == 0 { + return + } + + pos := list.start + stop := list.prev(list.end) + + for { + if predicate(&list.buffer[pos]) { + list.buffer[pos] = Item{} + count++ + } + if pos == stop { + break + } + pos = list.next(pos) + } + + return +} + // latest returns the items most recently added, up to `limit`. If `limit` is 0, // it returns all items. func (list *Buffer) latest(limit int) (results []Item) { diff --git a/irc/histserv.go b/irc/histserv.go new file mode 100644 index 00000000..11099d94 --- /dev/null +++ b/irc/histserv.go @@ -0,0 +1,242 @@ +// Copyright (c) 2020 Shivaram Lingamneni +// released under the MIT license + +package irc + +import ( + "bufio" + "fmt" + "os" + "runtime/debug" + "strconv" + "strings" + "time" + + "github.com/oragono/oragono/irc/history" +) + +const ( + histservHelp = `HistServ provides commands related to history.` + histServMask = "HistServ!HistServ@localhost" +) + +func histservEnabled(config *Config) bool { + return config.History.Enabled +} + +func historyComplianceEnabled(config *Config) bool { + return config.History.Enabled && config.History.Persistent.Enabled && config.History.Retention.EnableAccountIndexing +} + +var ( + histservCommands = map[string]*serviceCommand{ + "forget": { + handler: histservForgetHandler, + help: `Syntax: $bFORGET $b + +FORGET deletes all history messages sent by an account.`, + helpShort: `$bFORGET$b deletes all history messages sent by an account.`, + capabs: []string{"history"}, + enabled: histservEnabled, + minParams: 1, + maxParams: 1, + }, + "delete": { + handler: histservDeleteHandler, + help: `Syntax: $bDELETE [target] $b + +DELETE deletes an individual message by its msgid. The target is a channel +name or nickname; depending on the history implementation, this may or may not +be necessary to locate the message.`, + helpShort: `$bDELETE$b deletes an individual message by its msgid.`, + enabled: histservEnabled, + minParams: 1, + maxParams: 2, + }, + "export": { + handler: histservExportHandler, + help: `Syntax: $bEXPORT $b + +EXPORT exports all messages sent by an account as JSON. This can be used +for regulatory compliance, e.g., article 15 of the GDPR.`, + helpShort: `$bEXPORT$b exports all messages sent by an account as JSON.`, + enabled: historyComplianceEnabled, + capabs: []string{"history"}, + minParams: 1, + maxParams: 1, + }, + "play": { + handler: histservPlayHandler, + help: `Syntax: $bPLAY [limit]$b + +PLAY plays back history messages, rendering them into direct messages from +HistServ. 'target' is a channel name (or 'me' for direct messages), and 'limit' +is a message count or a time duration. Note that message playback may be +incomplete or degraded, relative to direct playback from /HISTORY or +CHATHISTORY.`, + helpShort: `$bPLAY$b plays back history messages.`, + enabled: histservEnabled, + minParams: 1, + maxParams: 2, + }, + } +) + +// histNotice sends the client a notice from HistServ +func histNotice(rb *ResponseBuffer, text string) { + rb.Add(nil, histServMask, "NOTICE", rb.target.Nick(), text) +} + +func histservForgetHandler(server *Server, client *Client, command string, params []string, rb *ResponseBuffer) { + accountName := server.accounts.AccountToAccountName(params[0]) + if accountName == "" { + histNotice(rb, client.t("Could not look up account name, proceeding anyway")) + accountName = params[0] + } + + server.ForgetHistory(accountName) + + histNotice(rb, fmt.Sprintf(client.t("Enqueued account %s for message deletion"), accountName)) +} + +func histservDeleteHandler(server *Server, client *Client, command string, params []string, rb *ResponseBuffer) { + var target, msgid string + if len(params) == 1 { + msgid = params[0] + } else { + target, msgid = params[0], params[1] + } + + accountName := "*" + hasPrivs := client.HasRoleCapabs("history") + if !hasPrivs { + accountName = client.AccountName() + if !(server.Config().History.Retention.AllowIndividualDelete && accountName != "*") { + hsNotice(rb, client.t("Insufficient privileges")) + return + } + } + + err := server.DeleteMessage(target, msgid, accountName) + if err == nil { + hsNotice(rb, client.t("Successfully deleted message")) + } else { + if hasPrivs { + hsNotice(rb, fmt.Sprintf(client.t("Error deleting message: %v"), err)) + } else { + hsNotice(rb, client.t("Could not delete message")) + } + } +} + +func histservExportHandler(server *Server, client *Client, command string, params []string, rb *ResponseBuffer) { + cfAccount, err := CasefoldName(params[0]) + if err != nil { + histNotice(rb, client.t("Invalid account name")) + return + } + + config := server.Config() + filename := fmt.Sprintf("%s@%s.json", cfAccount, time.Now().UTC().Format(IRCv3TimestampFormat)) + pathname := config.getOutputPath(filename) + outfile, err := os.Create(pathname) + if err != nil { + hsNotice(rb, fmt.Sprintf(client.t("Error opening export file: %v"), err)) + } else { + hsNotice(rb, fmt.Sprintf(client.t("Started exporting account data to file %s"), pathname)) + } + + go histservExportAndNotify(server, cfAccount, outfile, client.Nick()) +} + +func histservExportAndNotify(server *Server, cfAccount string, outfile *os.File, alertNick string) { + defer func() { + if r := recover(); r != nil { + server.logger.Error("history", + fmt.Sprintf("Panic in history export routine: %v\n%s", r, debug.Stack())) + } + }() + + defer outfile.Close() + writer := bufio.NewWriter(outfile) + defer writer.Flush() + + server.historyDB.Export(cfAccount, writer) + + client := server.clients.Get(alertNick) + if client != nil && client.HasRoleCapabs("history") { + client.Send(nil, histServMask, "NOTICE", client.Nick(), fmt.Sprintf(client.t("Data export for %[1]s completed and written to %[2]s"), cfAccount, outfile.Name())) + } +} + +func histservPlayHandler(server *Server, client *Client, command string, params []string, rb *ResponseBuffer) { + items, _, err := easySelectHistory(server, client, params) + if err != nil { + hsNotice(rb, client.t("Could not retrieve history")) + return + } + + playMessage := func(timestamp time.Time, nick, message string) { + hsNotice(rb, fmt.Sprintf("%s <%s> %s", timestamp.Format("15:04:05"), stripMaskFromNick(nick), message)) + } + + for _, item := range items { + // TODO: support a few more of these, maybe JOIN/PART/QUIT + if item.Type != history.Privmsg && item.Type != history.Notice { + continue + } + if len(item.Message.Split) == 0 { + playMessage(item.Message.Time, item.Nick, item.Message.Message) + } else { + for _, pair := range item.Message.Split { + playMessage(item.Message.Time, item.Nick, pair.Message) + } + } + } + + hsNotice(rb, client.t("End of history playback")) +} + +// handles parameter parsing and history queries for /HISTORY and /HISTSERV PLAY +func easySelectHistory(server *Server, client *Client, params []string) (items []history.Item, channel *Channel, err error) { + target := params[0] + if strings.ToLower(target) == "me" { + target = "*" + } + channel, sequence, err := server.GetHistorySequence(nil, client, target) + + if sequence == nil || err != nil { + return nil, nil, errNoSuchChannel + } + + var duration time.Duration + maxChathistoryLimit := server.Config().History.ChathistoryMax + limit := 100 + if maxChathistoryLimit < limit { + limit = maxChathistoryLimit + } + if len(params) > 1 { + providedLimit, err := strconv.Atoi(params[1]) + if err == nil && providedLimit != 0 { + limit = providedLimit + if maxChathistoryLimit < limit { + limit = maxChathistoryLimit + } + } else if err != nil { + duration, err = time.ParseDuration(params[1]) + if err == nil { + limit = maxChathistoryLimit + } + } + } + + if duration == 0 { + items, _, err = sequence.Between(history.Selector{}, history.Selector{}, limit) + } else { + now := time.Now().UTC() + start := history.Selector{Time: now} + end := history.Selector{Time: now.Add(-duration)} + items, _, err = sequence.Between(start, end, limit) + } + return +} diff --git a/irc/mysql/config.go b/irc/mysql/config.go index c9e39437..c4a19ae5 100644 --- a/irc/mysql/config.go +++ b/irc/mysql/config.go @@ -18,5 +18,6 @@ type Config struct { Timeout time.Duration // XXX these are copied from elsewhere in the config: - ExpireTime time.Duration + ExpireTime time.Duration + TrackAccountMessages bool } diff --git a/irc/mysql/history.go b/irc/mysql/history.go index 0081ccce..b1b86a23 100644 --- a/irc/mysql/history.go +++ b/irc/mysql/history.go @@ -7,7 +7,10 @@ import ( "bytes" "context" "database/sql" + "encoding/json" + "errors" "fmt" + "io" "runtime/debug" "sync" "sync/atomic" @@ -19,6 +22,10 @@ import ( "github.com/oragono/oragono/irc/utils" ) +var ( + ErrDisallowed = errors.New("disallowed") +) + const ( // maximum length in bytes of any message target (nickname or channel name) in its // canonicalized (i.e., casefolded) state: @@ -27,30 +34,46 @@ const ( // latest schema of the db latestDbSchema = "2" keySchemaVersion = "db.version" - cleanupRowLimit = 50 - cleanupPauseTime = 10 * time.Minute + // minor version indicates rollback-safe upgrades, i.e., + // you can downgrade oragono and everything will work + latestDbMinorVersion = "1" + keySchemaMinorVersion = "db.minorversion" + cleanupRowLimit = 50 + cleanupPauseTime = 10 * time.Minute ) -type MySQL struct { - timeout int64 - db *sql.DB - logger *logger.Manager +type e struct{} - insertHistory *sql.Stmt - insertSequence *sql.Stmt - insertConversation *sql.Stmt +type MySQL struct { + timeout int64 + trackAccountMessages uint32 + db *sql.DB + logger *logger.Manager + + insertHistory *sql.Stmt + insertSequence *sql.Stmt + insertConversation *sql.Stmt + insertAccountMessage *sql.Stmt stateMutex sync.Mutex config Config + + wakeForgetter chan e } func (mysql *MySQL) Initialize(logger *logger.Manager, config Config) { mysql.logger = logger + mysql.wakeForgetter = make(chan e, 1) mysql.SetConfig(config) } func (mysql *MySQL) SetConfig(config Config) { atomic.StoreInt64(&mysql.timeout, int64(config.Timeout)) + var trackAccountMessages uint32 + if config.TrackAccountMessages { + trackAccountMessages = 1 + } + atomic.StoreUint32(&mysql.trackAccountMessages, trackAccountMessages) mysql.stateMutex.Lock() mysql.config = config mysql.stateMutex.Unlock() @@ -85,6 +108,7 @@ func (m *MySQL) Open() (err error) { } go m.cleanupLoop() + go m.forgetLoop() return nil } @@ -109,14 +133,35 @@ func (mysql *MySQL) fixSchemas() (err error) { if err != nil { return } + _, err = mysql.db.Exec(`insert into metadata (key_name, value) values (?, ?);`, keySchemaMinorVersion, latestDbMinorVersion) + if err != nil { + return + } + return } else if err == nil && schema != latestDbSchema { // TODO figure out what to do about schema changes return &utils.IncompatibleSchemaError{CurrentVersion: schema, RequiredVersion: latestDbSchema} - } else { + } else if err != nil { return err } - return nil + var minorVersion string + err = mysql.db.QueryRow(`select value from metadata where key_name = ?;`, keySchemaMinorVersion).Scan(&minorVersion) + if err == sql.ErrNoRows { + // XXX for now, the only minor version upgrade is the account tracking tables + err = mysql.createComplianceTables() + if err != nil { + return + } + _, err = mysql.db.Exec(`insert into metadata (key_name, value) values (?, ?);`, keySchemaMinorVersion, latestDbMinorVersion) + if err != nil { + return + } + } else if err == nil && minorVersion != latestDbMinorVersion { + // TODO: if minorVersion < latestDbMinorVersion, upgrade, + // if latestDbMinorVersion < minorVersion, ignore because backwards compatible + } + return } func (mysql *MySQL) createTables() (err error) { @@ -155,6 +200,32 @@ func (mysql *MySQL) createTables() (err error) { return err } + err = mysql.createComplianceTables() + if err != nil { + return err + } + + return nil +} + +func (mysql *MySQL) createComplianceTables() (err error) { + _, err = mysql.db.Exec(fmt.Sprintf(`CREATE TABLE account_messages ( + history_id BIGINT UNSIGNED NOT NULL PRIMARY KEY, + account VARBINARY(%[1]d) NOT NULL, + KEY (account, history_id) + ) CHARSET=ascii COLLATE=ascii_bin;`, MaxTargetLength)) + if err != nil { + return err + } + + _, err = mysql.db.Exec(fmt.Sprintf(`CREATE TABLE forget ( + id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, + account VARBINARY(%[1]d) NOT NULL + ) CHARSET=ascii COLLATE=ascii_bin;`, MaxTargetLength)) + if err != nil { + return err + } + return nil } @@ -191,7 +262,10 @@ func (mysql *MySQL) cleanupLoop() { } func (mysql *MySQL) doCleanup(age time.Duration) (count int, err error) { - ids, maxNanotime, err := mysql.selectCleanupIDs(age) + ctx, cancel := context.WithTimeout(context.Background(), cleanupPauseTime) + defer cancel() + + ids, maxNanotime, err := mysql.selectCleanupIDs(ctx, age) if len(ids) == 0 { mysql.logger.Debug("mysql", "found no rows to clean up") return @@ -199,6 +273,10 @@ func (mysql *MySQL) doCleanup(age time.Duration) (count int, err error) { mysql.logger.Debug("mysql", fmt.Sprintf("deleting %d history rows, max age %s", len(ids), utils.NanoToTimestamp(maxNanotime))) + return len(ids), mysql.deleteHistoryIDs(ctx, ids) +} + +func (mysql *MySQL) deleteHistoryIDs(ctx context.Context, ids []uint64) (err error) { // can't use ? binding for a variable number of arguments, build the IN clause manually var inBuf bytes.Buffer inBuf.WriteByte('(') @@ -210,25 +288,30 @@ func (mysql *MySQL) doCleanup(age time.Duration) (count int, err error) { } inBuf.WriteRune(')') - _, err = mysql.db.Exec(fmt.Sprintf(`DELETE FROM conversations WHERE history_id in %s;`, inBuf.Bytes())) + _, err = mysql.db.ExecContext(ctx, fmt.Sprintf(`DELETE FROM conversations WHERE history_id in %s;`, inBuf.Bytes())) if err != nil { return } - _, err = mysql.db.Exec(fmt.Sprintf(`DELETE FROM sequence WHERE history_id in %s;`, inBuf.Bytes())) + _, err = mysql.db.ExecContext(ctx, fmt.Sprintf(`DELETE FROM sequence WHERE history_id in %s;`, inBuf.Bytes())) if err != nil { return } - _, err = mysql.db.Exec(fmt.Sprintf(`DELETE FROM history WHERE id in %s;`, inBuf.Bytes())) + if mysql.isTrackingAccountMessages() { + _, err = mysql.db.ExecContext(ctx, fmt.Sprintf(`DELETE FROM account_messages WHERE history_id in %s;`, inBuf.Bytes())) + if err != nil { + return + } + } + _, err = mysql.db.ExecContext(ctx, fmt.Sprintf(`DELETE FROM history WHERE id in %s;`, inBuf.Bytes())) if err != nil { return } - count = len(ids) return } -func (mysql *MySQL) selectCleanupIDs(age time.Duration) (ids []uint64, maxNanotime int64, err error) { - rows, err := mysql.db.Query(` +func (mysql *MySQL) selectCleanupIDs(ctx context.Context, age time.Duration) (ids []uint64, maxNanotime int64, err error) { + rows, err := mysql.db.QueryContext(ctx, ` SELECT history.id, sequence.nanotime FROM history LEFT JOIN sequence ON history.id = sequence.history_id @@ -266,6 +349,109 @@ func (mysql *MySQL) selectCleanupIDs(age time.Duration) (ids []uint64, maxNanoti return } +// wait for forget queue items and process them one by one +func (mysql *MySQL) forgetLoop() { + defer func() { + if r := recover(); r != nil { + mysql.logger.Error("mysql", + fmt.Sprintf("Panic in forget routine: %v\n%s", r, debug.Stack())) + time.Sleep(cleanupPauseTime) + go mysql.forgetLoop() + } + }() + + for { + for { + found, err := mysql.doForget() + mysql.logError("error processing forget", err) + if err != nil { + time.Sleep(cleanupPauseTime) + } + if !found { + break + } + } + + <-mysql.wakeForgetter + } +} + +// dequeue an item from the forget queue and process it +func (mysql *MySQL) doForget() (found bool, err error) { + id, account, err := func() (id int64, account string, err error) { + ctx, cancel := context.WithTimeout(context.Background(), cleanupPauseTime) + defer cancel() + + row := mysql.db.QueryRowContext(ctx, + `SELECT forget.id, forget.account FROM forget LIMIT 1;`) + err = row.Scan(&id, &account) + if err == sql.ErrNoRows { + return 0, "", nil + } + return + }() + + if err != nil || account == "" { + return false, err + } + + found = true + + var count int + for { + start := time.Now() + count, err = mysql.doForgetIteration(account) + elapsed := time.Since(start) + if err != nil { + return true, err + } + if count == 0 { + break + } + time.Sleep(elapsed) + } + + mysql.logger.Debug("mysql", "forget complete for account", account) + + ctx, cancel := context.WithTimeout(context.Background(), cleanupPauseTime) + defer cancel() + _, err = mysql.db.ExecContext(ctx, `DELETE FROM forget where id = ?;`, id) + return +} + +func (mysql *MySQL) doForgetIteration(account string) (count int, err error) { + ctx, cancel := context.WithTimeout(context.Background(), cleanupPauseTime) + defer cancel() + + rows, err := mysql.db.QueryContext(ctx, ` + SELECT account_messages.history_id + FROM account_messages + WHERE account_messages.account = ? + LIMIT ?;`, account, cleanupRowLimit) + if err != nil { + return + } + defer rows.Close() + + var ids []uint64 + for rows.Next() { + var id uint64 + err = rows.Scan(&id) + if err != nil { + return + } + ids = append(ids, id) + } + + if len(ids) == 0 { + return + } + + mysql.logger.Debug("mysql", fmt.Sprintf("deleting %d history rows from account %s", len(ids), account)) + err = mysql.deleteHistoryIDs(ctx, ids) + return len(ids), err +} + func (mysql *MySQL) prepareStatements() (err error) { mysql.insertHistory, err = mysql.db.Prepare(`INSERT INTO history (data, msgid) VALUES (?, ?);`) @@ -282,6 +468,11 @@ func (mysql *MySQL) prepareStatements() (err error) { if err != nil { return } + mysql.insertAccountMessage, err = mysql.db.Prepare(`INSERT INTO account_messages + (history_id, account) VALUES (?, ?);`) + if err != nil { + return + } return } @@ -290,6 +481,10 @@ func (mysql *MySQL) getTimeout() time.Duration { return time.Duration(atomic.LoadInt64(&mysql.timeout)) } +func (mysql *MySQL) isTrackingAccountMessages() bool { + return atomic.LoadUint32(&mysql.trackAccountMessages) != 0 +} + func (mysql *MySQL) logError(context string, err error) (quit bool) { if err != nil { mysql.logger.Error("mysql", context, err.Error()) @@ -298,7 +493,27 @@ func (mysql *MySQL) logError(context string, err error) (quit bool) { return false } -func (mysql *MySQL) AddChannelItem(target string, item history.Item) (err error) { +func (mysql *MySQL) Forget(account string) { + if mysql.db == nil || account == "" { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), mysql.getTimeout()) + defer cancel() + + _, err := mysql.db.ExecContext(ctx, `INSERT INTO forget (account) VALUES (?);`, account) + if mysql.logError("can't insert into forget table", err) { + return + } + + // wake up the forget goroutine if it's blocked: + select { + case mysql.wakeForgetter <- e{}: + default: + } +} + +func (mysql *MySQL) AddChannelItem(target string, item history.Item, account string) (err error) { if mysql.db == nil { return } @@ -316,6 +531,15 @@ func (mysql *MySQL) AddChannelItem(target string, item history.Item) (err error) } err = mysql.insertSequenceEntry(ctx, target, item.Message.Time.UnixNano(), id) + if err != nil { + return + } + + err = mysql.insertAccountMessageEntry(ctx, id, account) + if err != nil { + return + } + return } @@ -354,6 +578,15 @@ func (mysql *MySQL) insertBase(ctx context.Context, item history.Item) (id int64 return } +func (mysql *MySQL) insertAccountMessageEntry(ctx context.Context, id int64, account string) (err error) { + if account == "" || !mysql.isTrackingAccountMessages() { + return + } + _, err = mysql.insertAccountMessage.ExecContext(ctx, id, account) + mysql.logError("could not insert account-message entry", err) + return +} + func (mysql *MySQL) AddDirectMessage(sender, senderAccount, recipient, recipientAccount string, item history.Item) (err error) { if mysql.db == nil { return @@ -399,10 +632,102 @@ func (mysql *MySQL) AddDirectMessage(sender, senderAccount, recipient, recipient } } + err = mysql.insertAccountMessageEntry(ctx, id, senderAccount) + if err != nil { + return + } + return } -func (mysql *MySQL) msgidToTime(ctx context.Context, msgid string) (result time.Time, err error) { +// note that accountName is the unfolded name +func (mysql *MySQL) DeleteMsgid(msgid, accountName string) (err error) { + if mysql.db == nil { + return nil + } + + ctx, cancel := context.WithTimeout(context.Background(), mysql.getTimeout()) + defer cancel() + + _, id, data, err := mysql.lookupMsgid(ctx, msgid, true) + if err != nil { + return + } + + if accountName != "*" { + var item history.Item + err = unmarshalItem(data, &item) + // delete if the entry is corrupt + if err == nil && item.AccountName != accountName { + return ErrDisallowed + } + } + + err = mysql.deleteHistoryIDs(ctx, []uint64{id}) + mysql.logError("couldn't delete msgid", err) + return +} + +func (mysql *MySQL) Export(account string, writer io.Writer) { + if mysql.db == nil { + return + } + + var err error + var lastSeen uint64 + for { + rows := func() (count int) { + ctx, cancel := context.WithTimeout(context.Background(), cleanupPauseTime) + defer cancel() + + rows, rowsErr := mysql.db.QueryContext(ctx, ` + SELECT account_messages.history_id, history.data, sequence.target FROM account_messages + INNER JOIN history ON history.id = account_messages.history_id + INNER JOIN sequence ON account_messages.history_id = sequence.history_id + WHERE account_messages.account = ? AND account_messages.history_id > ? + LIMIT ?`, account, lastSeen, cleanupRowLimit) + if rowsErr != nil { + err = rowsErr + return + } + defer rows.Close() + for rows.Next() { + var id uint64 + var blob, jsonBlob []byte + var target string + var item history.Item + err = rows.Scan(&id, &blob, &target) + if err != nil { + return + } + err = unmarshalItem(blob, &item) + if err != nil { + return + } + item.CfCorrespondent = target + jsonBlob, err = json.Marshal(item) + if err != nil { + return + } + count++ + if lastSeen < id { + lastSeen = id + } + writer.Write(jsonBlob) + writer.Write([]byte{'\n'}) + } + return + }() + if rows == 0 || err != nil { + break + } + } + + mysql.logError("could not export history", err) + return +} + +func (mysql *MySQL) lookupMsgid(ctx context.Context, msgid string, includeData bool) (result time.Time, id uint64, data []byte, err error) { // in theory, we could optimize out a roundtrip to the database by using a subquery instead: // sequence.nanotime > ( // SELECT sequence.nanotime FROM sequence, history @@ -415,15 +740,27 @@ func (mysql *MySQL) msgidToTime(ctx context.Context, msgid string) (result time. if err != nil { return } - row := mysql.db.QueryRowContext(ctx, ` - SELECT sequence.nanotime FROM sequence + cols := `sequence.nanotime` + if includeData { + cols = `sequence.nanotime, sequence.history_id, history.data` + } + row := mysql.db.QueryRowContext(ctx, fmt.Sprintf(` + SELECT %s FROM sequence INNER JOIN history ON history.id = sequence.history_id - WHERE history.msgid = ? LIMIT 1;`, decoded) + WHERE history.msgid = ? LIMIT 1;`, cols), decoded) var nanotime int64 - err = row.Scan(&nanotime) - if mysql.logError("could not resolve msgid to time", err) { + if !includeData { + err = row.Scan(&nanotime) + } else { + err = row.Scan(&nanotime, &id, &data) + } + if err != sql.ErrNoRows { + mysql.logError("could not resolve msgid to time", err) + } + if err != nil { return } + result = time.Unix(0, nanotime).UTC() return } @@ -519,14 +856,14 @@ func (s *mySQLHistorySequence) Between(start, end history.Selector, limit int) ( startTime := start.Time if start.Msgid != "" { - startTime, err = s.mysql.msgidToTime(ctx, start.Msgid) + startTime, _, _, err = s.mysql.lookupMsgid(ctx, start.Msgid, false) if err != nil { return nil, false, err } } endTime := end.Time if end.Msgid != "" { - endTime, err = s.mysql.msgidToTime(ctx, end.Msgid) + endTime, _, _, err = s.mysql.lookupMsgid(ctx, end.Msgid, false) if err != nil { return nil, false, err } diff --git a/irc/nickname.go b/irc/nickname.go index 59d7beb2..793a5895 100644 --- a/irc/nickname.go +++ b/irc/nickname.go @@ -80,7 +80,7 @@ func performNickChange(server *Server, client *Client, target *Client, session * } for _, channel := range client.Channels() { - channel.AddHistoryItem(histItem) + channel.AddHistoryItem(histItem, details.account) } if target.Registered() { diff --git a/irc/roleplay.go b/irc/roleplay.go index 3e89d11c..3c1f750e 100644 --- a/irc/roleplay.go +++ b/irc/roleplay.go @@ -91,7 +91,7 @@ func sendRoleplayMessage(server *Server, client *Client, source string, targetSt Type: history.Privmsg, Message: splitMessage, Nick: source, - }) + }, client.Account()) } else { target, err := CasefoldName(targetString) user := server.clients.Get(target) diff --git a/irc/server.go b/irc/server.go index 06016124..e3016453 100644 --- a/irc/server.go +++ b/irc/server.go @@ -879,6 +879,76 @@ func (server *Server) GetHistorySequence(providedChannel *Channel, client *Clien return } +func (server *Server) ForgetHistory(accountName string) { + // sanity check + if accountName == "*" { + return + } + + config := server.Config() + if !config.History.Enabled { + return + } + + if cfAccount, err := CasefoldName(accountName); err == nil { + server.historyDB.Forget(cfAccount) + } + + persistent := config.History.Persistent + if persistent.Enabled && persistent.UnregisteredChannels && persistent.RegisteredChannels == PersistentMandatory && persistent.DirectMessages == PersistentMandatory { + return + } + + predicate := func(item *history.Item) bool { return item.AccountName == accountName } + + for _, channel := range server.channels.Channels() { + channel.history.Delete(predicate) + } + + for _, client := range server.clients.AllClients() { + client.history.Delete(predicate) + } +} + +// deletes a message. target is a hint about what buffer it's in (not required for +// persistent history, where all the msgids are indexed together). if accountName +// is anything other than "*", it must match the recorded AccountName of the message +func (server *Server) DeleteMessage(target, msgid, accountName string) (err error) { + config := server.Config() + var hist *history.Buffer + + if target != "" { + if target[0] == '#' { + channel := server.channels.Get(target) + if channel != nil { + if status, _ := channel.historyStatus(config); status == HistoryEphemeral { + hist = &channel.history + } + } + } else { + client := server.clients.Get(target) + if client != nil { + if status, _ := client.historyStatus(config); status == HistoryEphemeral { + hist = &client.history + } + } + } + } + + if hist == nil { + err = server.historyDB.DeleteMsgid(msgid, accountName) + } else { + count := hist.Delete(func(item *history.Item) bool { + return item.Message.Msgid == msgid && (accountName == "*" || item.AccountName == accountName) + }) + if count == 0 { + err = errNoop + } + } + + return +} + // elistMatcher takes and matches ELIST conditions type elistMatcher struct { MinClientsActive bool diff --git a/irc/services.go b/irc/services.go index 0bcde401..e2dfcd0a 100644 --- a/irc/services.go +++ b/irc/services.go @@ -82,6 +82,13 @@ var OragonoServices = map[string]*ircService{ Commands: hostservCommands, HelpBanner: hostservHelp, }, + "histserv": { + Name: "HistServ", + ShortName: "HISTSERV", + CommandAliases: []string{"HISTSERV"}, + Commands: histservCommands, + HelpBanner: histservHelp, + }, } // all service commands at the protocol level, by uppercase command name diff --git a/oragono.yaml b/oragono.yaml index 7e29c9ba..a3a1582d 100644 --- a/oragono.yaml +++ b/oragono.yaml @@ -280,6 +280,10 @@ server: secure-nets: # - "10.0.0.0/8" + # oragono will write files to disk under certain circumstances, e.g., + # CPU profiling or data export. by default, these files will be written + # to the working directory. set this to customize: + # output-path: "/home/oragono/out" # account options accounts: @@ -577,6 +581,7 @@ oper-classes: - "samode" - "vhosts" - "chanreg" + - "history" # ircd operators opers: @@ -772,7 +777,8 @@ roleplay: # add the real nickname, in parentheses, to the end of every roleplay message? add-suffix: true -# message history tracking, for the RESUME extension and possibly other uses in future +# history message storage: this is used by CHATHISTORY, HISTORY, znc.in/playback, +# various autoreplay features, and the resume extension history: # should we store messages for later playback? # by default, messages are stored in RAM only; they do not persist @@ -841,3 +847,13 @@ history: # if you enable this, strict nickname reservation is strongly recommended # as well. direct-messages: "opt-out" + + # options to control how messages are stored and deleted: + retention: + # allow users to delete their own messages from history? + allow-individual-delete: false + + # if persistent history is enabled, create additional index tables, + # allowing deletion of JSON export of an account's messages. this + # may be needed for compliance with data privacy regulations. + enable-account-indexing: false