3
0
mirror of https://github.com/ergochat/ergo.git synced 2024-11-25 21:39:25 +01:00

autoresizing of history buffers (#349)

This commit is contained in:
Shivaram Lingamneni 2019-05-19 16:34:52 -04:00
parent 7761323f01
commit 6e9a728354
7 changed files with 190 additions and 31 deletions

View File

@ -76,7 +76,7 @@ func NewChannel(s *Server, name string, registered bool) *Channel {
config := s.Config() config := s.Config()
channel.writerSemaphore.Initialize(1) channel.writerSemaphore.Initialize(1)
channel.history.Initialize(config.History.ChannelLength) channel.history.Initialize(config.History.ChannelLength, config.History.AutoresizeWindow)
if !registered { if !registered {
for _, mode := range config.Channels.defaultModes { for _, mode := range config.Channels.defaultModes {

View File

@ -232,7 +232,7 @@ func (server *Server) RunClient(conn clientConn) {
nickCasefolded: "*", nickCasefolded: "*",
nickMaskString: "*", // * is used until actual nick is given nickMaskString: "*", // * is used until actual nick is given
} }
client.history.Initialize(config.History.ClientLength) client.history.Initialize(config.History.ClientLength, config.History.AutoresizeWindow)
client.brbTimer.Initialize(client) client.brbTimer.Initialize(client)
session := &Session{ session := &Session{
client: client, client: client,

View File

@ -347,10 +347,11 @@ type Config struct {
History struct { History struct {
Enabled bool Enabled bool
ChannelLength int `yaml:"channel-length"` ChannelLength int `yaml:"channel-length"`
ClientLength int `yaml:"client-length"` ClientLength int `yaml:"client-length"`
AutoreplayOnJoin int `yaml:"autoreplay-on-join"` AutoresizeWindow time.Duration `yaml:"autoresize-window"`
ChathistoryMax int `yaml:"chathistory-maxmessages"` AutoreplayOnJoin int `yaml:"autoreplay-on-join"`
ChathistoryMax int `yaml:"chathistory-maxmessages"`
} }
Filename string Filename string

View File

@ -25,6 +25,10 @@ const (
Nick Nick
) )
const (
initialAutoSize = 32
)
// a Tagmsg that consists entirely of transient tags is not stored // a Tagmsg that consists entirely of transient tags is not stored
var transientTags = map[string]bool{ var transientTags = map[string]bool{
"+draft/typing": true, "+draft/typing": true,
@ -77,25 +81,39 @@ type Buffer struct {
sync.RWMutex sync.RWMutex
// ring buffer, see irc/whowas.go for conventions // ring buffer, see irc/whowas.go for conventions
buffer []Item buffer []Item
start int start int
end int end int
maximumSize int
window time.Duration
lastDiscarded time.Time lastDiscarded time.Time
enabled uint32 enabled uint32
nowFunc func() time.Time
} }
func NewHistoryBuffer(size int) (result *Buffer) { func NewHistoryBuffer(size int, window time.Duration) (result *Buffer) {
result = new(Buffer) result = new(Buffer)
result.Initialize(size) result.Initialize(size, window)
return return
} }
func (hist *Buffer) Initialize(size int) { func (hist *Buffer) Initialize(size int, window time.Duration) {
hist.buffer = make([]Item, size) initialSize := size
if window != 0 {
initialSize = initialAutoSize
if size < initialSize {
initialSize = size // min(initialAutoSize, size)
}
}
hist.buffer = make([]Item, initialSize)
hist.start = -1 hist.start = -1
hist.end = -1 hist.end = -1
hist.window = window
hist.maximumSize = size
hist.nowFunc = time.Now
hist.setEnabled(size) hist.setEnabled(size)
} }
@ -132,6 +150,8 @@ func (list *Buffer) Add(item Item) {
list.Lock() list.Lock()
defer list.Unlock() defer list.Unlock()
list.maybeExpand()
var pos int var pos int
if list.start == -1 { // empty if list.start == -1 { // empty
pos = 0 pos = 0
@ -269,12 +289,68 @@ func (list *Buffer) next(index int) int {
} }
} }
// return n such that v <= n and n == 2**i for some i
func roundUpToPowerOfTwo(v int) int {
// http://graphics.stanford.edu/~seander/bithacks.html
v -= 1
v |= v >> 1
v |= v >> 2
v |= v >> 4
v |= v >> 8
v |= v >> 16
return v + 1
}
func (list *Buffer) maybeExpand() {
if list.window == 0 {
return // autoresize is disabled
}
length := list.length()
if length < len(list.buffer) {
return // we have spare capacity already
}
if len(list.buffer) == list.maximumSize {
return // cannot expand any further
}
wouldDiscard := list.buffer[list.start].Message.Time
if list.window < list.nowFunc().Sub(wouldDiscard) {
return // oldest element is old enough to overwrite
}
newSize := roundUpToPowerOfTwo(length + 1)
if list.maximumSize < newSize {
newSize = list.maximumSize
}
list.resize(newSize)
}
// Resize shrinks or expands the buffer // Resize shrinks or expands the buffer
func (list *Buffer) Resize(size int) { func (list *Buffer) Resize(maximumSize int, window time.Duration) {
newbuffer := make([]Item, size)
list.Lock() list.Lock()
defer list.Unlock() defer list.Unlock()
if list.maximumSize == maximumSize && list.window == window {
return // no-op
}
list.maximumSize = maximumSize
list.window = window
// if we're not autoresizing, we need to resize now;
// if we are autoresizing, we may need to shrink the buffer down to maximumSize,
// but we don't need to grow it now (we can just grow it on the next Add)
// TODO make it possible to shrink the buffer so that it only contains `window`
if window == 0 || maximumSize < len(list.buffer) {
list.resize(maximumSize)
}
}
func (list *Buffer) resize(size int) {
newbuffer := make([]Item, size)
list.setEnabled(size) list.setEnabled(size)
if list.start == -1 { if list.start == -1 {

View File

@ -5,6 +5,7 @@ package history
import ( import (
"reflect" "reflect"
"strconv"
"testing" "testing"
"time" "time"
) )
@ -16,7 +17,7 @@ const (
func TestEmptyBuffer(t *testing.T) { func TestEmptyBuffer(t *testing.T) {
pastTime := easyParse(timeFormat) pastTime := easyParse(timeFormat)
buf := NewHistoryBuffer(0) buf := NewHistoryBuffer(0, 0)
if buf.Enabled() { if buf.Enabled() {
t.Error("the buffer of size 0 must be considered disabled") t.Error("the buffer of size 0 must be considered disabled")
} }
@ -33,7 +34,7 @@ func TestEmptyBuffer(t *testing.T) {
t.Error("the empty/disabled buffer should report results as incomplete") t.Error("the empty/disabled buffer should report results as incomplete")
} }
buf.Resize(1) buf.Resize(1, 0)
if !buf.Enabled() { if !buf.Enabled() {
t.Error("the buffer of size 1 must be considered enabled") t.Error("the buffer of size 1 must be considered enabled")
} }
@ -102,7 +103,7 @@ func assertEqual(supplied, expected interface{}, t *testing.T) {
func TestBuffer(t *testing.T) { func TestBuffer(t *testing.T) {
start := easyParse("2006-01-01 00:00:00Z") start := easyParse("2006-01-01 00:00:00Z")
buf := NewHistoryBuffer(3) buf := NewHistoryBuffer(3, 0)
buf.Add(easyItem("testnick0", "2006-01-01 15:04:05Z")) buf.Add(easyItem("testnick0", "2006-01-01 15:04:05Z"))
buf.Add(easyItem("testnick1", "2006-01-02 15:04:05Z")) buf.Add(easyItem("testnick1", "2006-01-02 15:04:05Z"))
@ -128,12 +129,12 @@ func TestBuffer(t *testing.T) {
assertEqual(toNicks(since), []string{"testnick1"}, t) assertEqual(toNicks(since), []string{"testnick1"}, t)
// shrink the buffer, cutting off testnick1 // shrink the buffer, cutting off testnick1
buf.Resize(2) buf.Resize(2, 0)
since, complete = buf.Between(easyParse("2006-01-02 00:00:00Z"), time.Now(), false, 0) since, complete = buf.Between(easyParse("2006-01-02 00:00:00Z"), time.Now(), false, 0)
assertEqual(complete, false, t) assertEqual(complete, false, t)
assertEqual(toNicks(since), []string{"testnick2", "testnick3"}, t) assertEqual(toNicks(since), []string{"testnick2", "testnick3"}, t)
buf.Resize(5) buf.Resize(5, 0)
buf.Add(easyItem("testnick4", "2006-01-05 15:04:05Z")) buf.Add(easyItem("testnick4", "2006-01-05 15:04:05Z"))
buf.Add(easyItem("testnick5", "2006-01-06 15:04:05Z")) buf.Add(easyItem("testnick5", "2006-01-06 15:04:05Z"))
buf.Add(easyItem("testnick6", "2006-01-07 15:04:05Z")) buf.Add(easyItem("testnick6", "2006-01-07 15:04:05Z"))
@ -145,3 +146,80 @@ func TestBuffer(t *testing.T) {
since, _ = buf.Between(easyParse("2006-01-03 00:00:00Z"), time.Now(), true, 2) since, _ = buf.Between(easyParse("2006-01-03 00:00:00Z"), time.Now(), true, 2)
assertEqual(toNicks(since), []string{"testnick2", "testnick3"}, t) assertEqual(toNicks(since), []string{"testnick2", "testnick3"}, t)
} }
func autoItem(id int, t time.Time) (result Item) {
result.Message.Time = t
result.Nick = strconv.Itoa(id)
return
}
func atoi(s string) int {
result, err := strconv.Atoi(s)
if err != nil {
panic(err)
}
return result
}
func TestAutoresize(t *testing.T) {
now := easyParse("2006-01-01 00:00:00Z")
nowFunc := func() time.Time {
return now
}
buf := NewHistoryBuffer(128, time.Hour)
buf.nowFunc = nowFunc
// add items slowly (one every 10 minutes): the buffer should not expand
// beyond initialAutoSize
id := 0
for i := 0; i < 72; i += 1 {
buf.Add(autoItem(id, now))
if initialAutoSize < buf.length() {
t.Errorf("buffer incorrectly resized above %d to %d", initialAutoSize, buf.length())
}
now = now.Add(time.Minute * 10)
id += 1
}
items := buf.Latest(0)
assertEqual(len(items), initialAutoSize, t)
assertEqual(atoi(items[0].Nick), 40, t)
assertEqual(atoi(items[len(items)-1].Nick), 71, t)
// dump 100 items in very fast:
for i := 0; i < 100; i += 1 {
buf.Add(autoItem(id, now))
now = now.Add(time.Second)
id += 1
}
// ok, 5 items from the first batch are still in the 1-hour window;
// we should overwrite until only those 5 are left, then start expanding
// the buffer so that it retains those 5 and the 100 new items
items = buf.Latest(0)
assertEqual(len(items), 105, t)
assertEqual(atoi(items[0].Nick), 67, t)
assertEqual(atoi(items[len(items)-1].Nick), 171, t)
// another 100 items very fast:
for i := 0; i < 100; i += 1 {
buf.Add(autoItem(id, now))
now = now.Add(time.Second)
id += 1
}
// should fill up to the maximum size of 128 and start overwriting
items = buf.Latest(0)
assertEqual(len(items), 128, t)
assertEqual(atoi(items[0].Nick), 144, t)
assertEqual(atoi(items[len(items)-1].Nick), 271, t)
}
func TestRoundUp(t *testing.T) {
assertEqual(roundUpToPowerOfTwo(2), 2, t)
assertEqual(roundUpToPowerOfTwo(3), 4, t)
assertEqual(roundUpToPowerOfTwo(64), 64, t)
assertEqual(roundUpToPowerOfTwo(65), 128, t)
assertEqual(roundUpToPowerOfTwo(100), 128, t)
assertEqual(roundUpToPowerOfTwo(1000), 1024, t)
assertEqual(roundUpToPowerOfTwo(1025), 2048, t)
assertEqual(roundUpToPowerOfTwo(269435457), 536870912, t)
}

View File

@ -672,16 +672,12 @@ func (server *Server) applyConfig(config *Config, initial bool) (err error) {
} }
// resize history buffers as needed // resize history buffers as needed
if oldConfig != nil { if oldConfig != nil && oldConfig.History != config.History {
if oldConfig.History.ChannelLength != config.History.ChannelLength { for _, channel := range server.channels.Channels() {
for _, channel := range server.channels.Channels() { channel.history.Resize(config.History.ChannelLength, config.History.AutoresizeWindow)
channel.history.Resize(config.History.ChannelLength)
}
} }
if oldConfig.History.ClientLength != config.History.ClientLength { for _, client := range server.clients.AllClients() {
for _, client := range server.clients.AllClients() { client.history.Resize(config.History.ClientLength, config.History.AutoresizeWindow)
client.history.Resize(config.History.ClientLength)
}
} }
} }

View File

@ -592,10 +592,18 @@ history:
enabled: false enabled: false
# how many channel-specific events (messages, joins, parts) should be tracked per channel? # how many channel-specific events (messages, joins, parts) should be tracked per channel?
channel-length: 256 channel-length: 1024
# how many direct messages and notices should be tracked per user? # how many direct messages and notices should be tracked per user?
client-length: 64 client-length: 256
# how long should we try to preserve messages?
# if `autoresize-window` is 0, the in-memory message buffers are preallocated to
# their maximum length. if it is nonzero, the buffers are initially small and
# are dynamically expanded up to the maximum length. if the buffer is full
# and the oldest message is older than `autoresize-window`, then it will overwrite
# the oldest message rather than resize; otherwise, it will expand if possible.
autoresize-window: 1h
# number of messages to automatically play back on channel join (0 to disable): # number of messages to automatically play back on channel join (0 to disable):
autoreplay-on-join: 0 autoreplay-on-join: 0