From c78253fd938300df4cfa17ccff206511d74aff27 Mon Sep 17 00:00:00 2001 From: Shivaram Lingamneni Date: Fri, 7 Aug 2020 01:10:46 -0400 Subject: [PATCH 1/2] more memory-efficient implementation of line reading --- irc/history/history.go | 14 +------ irc/history/history_test.go | 11 ------ irc/ircconn.go | 77 +++++++++++++++++++++++++++++-------- irc/socket.go | 4 -- irc/utils/math.go | 16 ++++++++ irc/utils/math_test.go | 19 +++++++++ 6 files changed, 96 insertions(+), 45 deletions(-) create mode 100644 irc/utils/math.go create mode 100644 irc/utils/math_test.go diff --git a/irc/history/history.go b/irc/history/history.go index 9d09c92f..c9af505a 100644 --- a/irc/history/history.go +++ b/irc/history/history.go @@ -321,18 +321,6 @@ func (list *Buffer) next(index int) int { } } -// return n such that v <= n and n == 2**i for some i -func roundUpToPowerOfTwo(v int) int { - // http://graphics.stanford.edu/~seander/bithacks.html - v -= 1 - v |= v >> 1 - v |= v >> 2 - v |= v >> 4 - v |= v >> 8 - v |= v >> 16 - return v + 1 -} - func (list *Buffer) maybeExpand() { if list.window == 0 { return // autoresize is disabled @@ -352,7 +340,7 @@ func (list *Buffer) maybeExpand() { return // oldest element is old enough to overwrite } - newSize := roundUpToPowerOfTwo(length + 1) + newSize := utils.RoundUpToPowerOfTwo(length + 1) if list.maximumSize < newSize { newSize = list.maximumSize } diff --git a/irc/history/history_test.go b/irc/history/history_test.go index 514827d3..18ee965c 100644 --- a/irc/history/history_test.go +++ b/irc/history/history_test.go @@ -241,17 +241,6 @@ func TestDisabledByResize(t *testing.T) { assertEqual(len(items), 0, t) } -func TestRoundUp(t *testing.T) { - assertEqual(roundUpToPowerOfTwo(2), 2, t) - assertEqual(roundUpToPowerOfTwo(3), 4, t) - assertEqual(roundUpToPowerOfTwo(64), 64, t) - assertEqual(roundUpToPowerOfTwo(65), 128, t) - assertEqual(roundUpToPowerOfTwo(100), 128, t) - assertEqual(roundUpToPowerOfTwo(1000), 1024, t) - assertEqual(roundUpToPowerOfTwo(1025), 2048, t) - assertEqual(roundUpToPowerOfTwo(269435457), 536870912, t) -} - func BenchmarkInsert(b *testing.B) { buf := NewHistoryBuffer(1024, 0) b.ResetTimer() diff --git a/irc/ircconn.go b/irc/ircconn.go index 903c5261..2f8f6984 100644 --- a/irc/ircconn.go +++ b/irc/ircconn.go @@ -1,9 +1,9 @@ package irc import ( - "bufio" "bytes" "errors" + "io" "net" "unicode/utf8" @@ -14,7 +14,8 @@ import ( ) const ( - maxReadQBytes = ircmsg.MaxlenTagsFromClient + MaxLineLen + 1024 + maxReadQBytes = ircmsg.MaxlenTagsFromClient + MaxLineLen + 1024 + initialBufferSize = 1024 ) var ( @@ -41,8 +42,13 @@ type IRCConn interface { // IRCStreamConn is an IRCConn over a regular stream connection. type IRCStreamConn struct { - conn *utils.WrappedConn - reader *bufio.Reader + conn *utils.WrappedConn + + buf []byte + start int // start of valid (i.e., read but not yet consumed) data in the buffer + end int // end of valid data in the buffer + searchFrom int // start of valid data in the buffer not yet searched for \n + eof bool } func NewIRCStreamConn(conn *utils.WrappedConn) *IRCStreamConn { @@ -67,21 +73,58 @@ func (cc *IRCStreamConn) WriteLines(buffers [][]byte) (err error) { return } -func (cc *IRCStreamConn) ReadLine() (line []byte, err error) { - // lazy initialize the reader in case the IP is banned - if cc.reader == nil { - cc.reader = bufio.NewReaderSize(cc.conn, maxReadQBytes) - } +func (cc *IRCStreamConn) ReadLine() ([]byte, error) { + for { + // try to find a terminated line in the buffered data already read + nlidx := bytes.IndexByte(cc.buf[cc.searchFrom:cc.end], '\n') + if nlidx != -1 { + // got a complete line + line := cc.buf[cc.start : cc.searchFrom+nlidx] + cc.start = cc.searchFrom + nlidx + 1 + cc.searchFrom = cc.start + if globalUtf8EnforcementSetting && !utf8.Valid(line) { + return line, errInvalidUtf8 + } else { + return line, nil + } + } - var isPrefix bool - line, isPrefix, err = cc.reader.ReadLine() - if isPrefix { - return nil, errReadQ + if cc.start == 0 && len(cc.buf) == maxReadQBytes { + return nil, errReadQ // out of space, can't expand or slide + } + + if cc.eof { + return nil, io.EOF + } + + if len(cc.buf) < maxReadQBytes && (len(cc.buf)-(cc.end-cc.start) < initialBufferSize/2) { + // allocate a new buffer, copy any remaining data + newLen := utils.RoundUpToPowerOfTwo(len(cc.buf) + 1) + if newLen > maxReadQBytes { + newLen = maxReadQBytes + } else if newLen < initialBufferSize { + newLen = initialBufferSize + } + newBuf := make([]byte, newLen) + copy(newBuf, cc.buf[cc.start:cc.end]) + cc.buf = newBuf + } else if cc.start != 0 { + // slide remaining data back to the front of the buffer + copy(cc.buf, cc.buf[cc.start:cc.end]) + } + cc.end = cc.end - cc.start + cc.start = 0 + + cc.searchFrom = cc.end + n, err := cc.conn.Read(cc.buf[cc.end:]) + cc.end += n + if n != 0 && err == io.EOF { + // we may have received new \n-terminated lines, try to parse them + cc.eof = true + } else if err != nil { + return nil, err + } } - if globalUtf8EnforcementSetting && !utf8.Valid(line) { - err = errInvalidUtf8 - } - return } func (cc *IRCStreamConn) Close() (err error) { diff --git a/irc/socket.go b/irc/socket.go index f4d6844e..c4cc3e45 100644 --- a/irc/socket.go +++ b/irc/socket.go @@ -69,10 +69,6 @@ func (socket *Socket) Read() (string, error) { if err == io.EOF { socket.Close() - // process last message properly (such as ERROR/QUIT/etc), just fail next reads/writes - if line != "" { - err = nil - } } return line, err diff --git a/irc/utils/math.go b/irc/utils/math.go new file mode 100644 index 00000000..45fca833 --- /dev/null +++ b/irc/utils/math.go @@ -0,0 +1,16 @@ +// Copyright (c) 2020 Shivaram Lingamneni +// released under the MIT license + +package utils + +// return n such that v <= n and n == 2**i for some i +func RoundUpToPowerOfTwo(v int) int { + // http://graphics.stanford.edu/~seander/bithacks.html + v -= 1 + v |= v >> 1 + v |= v >> 2 + v |= v >> 4 + v |= v >> 8 + v |= v >> 16 + return v + 1 +} diff --git a/irc/utils/math_test.go b/irc/utils/math_test.go new file mode 100644 index 00000000..c0e5ec0e --- /dev/null +++ b/irc/utils/math_test.go @@ -0,0 +1,19 @@ +// Copyright (c) 2020 Shivaram Lingamneni +// released under the MIT license + +package utils + +import ( + "testing" +) + +func TestRoundUp(t *testing.T) { + assertEqual(RoundUpToPowerOfTwo(2), 2, t) + assertEqual(RoundUpToPowerOfTwo(3), 4, t) + assertEqual(RoundUpToPowerOfTwo(64), 64, t) + assertEqual(RoundUpToPowerOfTwo(65), 128, t) + assertEqual(RoundUpToPowerOfTwo(100), 128, t) + assertEqual(RoundUpToPowerOfTwo(1000), 1024, t) + assertEqual(RoundUpToPowerOfTwo(1025), 2048, t) + assertEqual(RoundUpToPowerOfTwo(269435457), 536870912, t) +} From a34918e729067e0d6a98e94facc63f31d5407a3c Mon Sep 17 00:00:00 2001 From: Shivaram Lingamneni Date: Mon, 30 Nov 2020 02:08:47 -0500 Subject: [PATCH 2/2] add a fuzz test for IRCStreamConn changes --- irc/ircconn_test.go | 135 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 135 insertions(+) create mode 100644 irc/ircconn_test.go diff --git a/irc/ircconn_test.go b/irc/ircconn_test.go new file mode 100644 index 00000000..13a9930c --- /dev/null +++ b/irc/ircconn_test.go @@ -0,0 +1,135 @@ +// Copyright (c) 2020 Shivaram Lingamneni +// released under the MIT license + +package irc + +import ( + "io" + "math/rand" + "net" + "reflect" + "testing" + "time" + + "github.com/oragono/oragono/irc/utils" +) + +// mockConn is a fake net.Conn / io.Reader that yields len(counts) lines, +// each consisting of counts[i] 'a' characters and a terminating '\n' +type mockConn struct { + counts []int +} + +func min(i, j int) (m int) { + if i < j { + return i + } else { + return j + } +} + +func (c *mockConn) Read(b []byte) (n int, err error) { + for len(b) > 0 { + if len(c.counts) == 0 { + return n, io.EOF + } + if c.counts[0] == 0 { + b[0] = '\n' + c.counts = c.counts[1:] + b = b[1:] + n += 1 + continue + } + size := min(c.counts[0], len(b)) + for i := 0; i < size; i++ { + b[i] = 'a' + } + c.counts[0] -= size + b = b[size:] + n += size + } + return n, nil +} + +func (c *mockConn) Write(b []byte) (n int, err error) { + return +} + +func (c *mockConn) Close() error { + c.counts = nil + return nil +} + +func (c *mockConn) LocalAddr() net.Addr { + return nil +} + +func (c *mockConn) RemoteAddr() net.Addr { + return nil +} + +func (c *mockConn) SetDeadline(t time.Time) error { + return nil +} + +func (c *mockConn) SetReadDeadline(t time.Time) error { + return nil +} + +func (c *mockConn) SetWriteDeadline(t time.Time) error { + return nil +} + +func newMockConn(counts []int) *utils.WrappedConn { + cpCounts := make([]int, len(counts)) + copy(cpCounts, counts) + c := &mockConn{ + counts: cpCounts, + } + return &utils.WrappedConn{ + Conn: c, + } +} + +// construct a mock reader with some number of \n-terminated lines, +// verify that IRCStreamConn can read and split them as expected +func doLineReaderTest(counts []int, t *testing.T) { + c := newMockConn(counts) + r := NewIRCStreamConn(c) + var readCounts []int + for { + line, err := r.ReadLine() + if err == nil { + readCounts = append(readCounts, len(line)) + } else if err == io.EOF { + break + } else { + panic(err) + } + } + + if !reflect.DeepEqual(counts, readCounts) { + t.Errorf("expected %#v, got %#v", counts, readCounts) + } +} + +const ( + maxMockReaderLen = 100 + maxMockReaderLineLen = 4096 + 511 +) + +func TestLineReader(t *testing.T) { + counts := []int{44, 428, 3, 0, 200, 2000, 0, 4044, 33, 3, 2, 1, 0, 1, 2, 3, 48, 555} + doLineReaderTest(counts, t) + + // fuzz + r := rand.New(rand.NewSource(time.Now().UnixNano())) + for i := 0; i < 1000; i++ { + countsLen := r.Intn(maxMockReaderLen) + 1 + counts := make([]int, countsLen) + for i := 0; i < countsLen; i++ { + counts[i] = r.Intn(maxMockReaderLineLen) + } + doLineReaderTest(counts, t) + } +}