improve performance by using less goroutines

- get rid of command channel in Client
- get rid of loginTimer; use other timers instead
- move debugging code to debug.go
This commit is contained in:
Jeremy Latt 2014-04-15 08:49:52 -07:00
parent cab21782b4
commit 6c10add399
6 changed files with 153 additions and 108 deletions

View File

@ -7,7 +7,6 @@ import (
) )
const ( const (
LOGIN_TIMEOUT = time.Minute / 2 // how long the client has to login
IDLE_TIMEOUT = time.Minute // how long before a client is considered idle IDLE_TIMEOUT = time.Minute // how long before a client is considered idle
QUIT_TIMEOUT = time.Minute // how long after idle before a client is kicked QUIT_TIMEOUT = time.Minute // how long after idle before a client is kicked
) )
@ -19,14 +18,12 @@ type Client struct {
capabilities CapabilitySet capabilities CapabilitySet
capState CapState capState CapState
channels ChannelSet channels ChannelSet
commands chan Command
ctime time.Time ctime time.Time
flags map[UserMode]bool flags map[UserMode]bool
hasQuit bool hasQuit bool
hops uint hops uint
hostname Name hostname Name
idleTimer *time.Timer idleTimer *time.Timer
loginTimer *time.Timer
nick Name nick Name
quitTimer *time.Timer quitTimer *time.Timer
realname Text realname Text
@ -44,13 +41,12 @@ func NewClient(server *Server, conn net.Conn) *Client {
capState: CapNone, capState: CapNone,
capabilities: make(CapabilitySet), capabilities: make(CapabilitySet),
channels: make(ChannelSet), channels: make(ChannelSet),
commands: make(chan Command),
ctime: now, ctime: now,
flags: make(map[UserMode]bool), flags: make(map[UserMode]bool),
server: server, server: server,
socket: NewSocket(conn),
} }
client.socket = NewSocket(conn, client.commands) client.Touch()
client.loginTimer = time.AfterFunc(LOGIN_TIMEOUT, client.connectionTimeout)
go client.run() go client.run()
return client return client
@ -61,8 +57,31 @@ func NewClient(server *Server, conn net.Conn) *Client {
// //
func (client *Client) run() { func (client *Client) run() {
for command := range client.commands { var command Command
if checkPass, ok := command.(checkPasswordCommand); ok { var err error
var line string
// Set the hostname for this client. The client may later send a PROXY
// command from stunnel that sets the hostname to something more accurate.
client.send(NewProxyCommand(AddrLookupHostname(
client.socket.conn.RemoteAddr())))
for err == nil {
if line, err = client.socket.Read(); err != nil {
command = NewQuitCommand("connection closed")
} else if command, err = ParseCommand(line); err != nil {
switch err {
case ErrParseCommand:
client.Reply(RplNotice(client.server, client,
NewText("failed to parse command")))
case NotEnoughArgsError:
// TODO
}
continue
} else if checkPass, ok := command.(checkPasswordCommand); ok {
checkPass.LoadPassword(client.server) checkPass.LoadPassword(client.server)
// Block the client thread while handling a potentially expensive // Block the client thread while handling a potentially expensive
// password bcrypt operation. Since the server is single-threaded // password bcrypt operation. Since the server is single-threaded
@ -71,13 +90,20 @@ func (client *Client) run() {
// completes. This could be a form of DoS if handled naively. // completes. This could be a form of DoS if handled naively.
checkPass.CheckPassword() checkPass.CheckPassword()
} }
command.SetClient(client)
client.server.commands <- command client.send(command)
} }
} }
func (client *Client) send(command Command) {
command.SetClient(client)
client.server.commands <- command
}
// quit timer goroutine
func (client *Client) connectionTimeout() { func (client *Client) connectionTimeout() {
client.commands <- NewQuitCommand("connection timeout") client.send(NewQuitCommand("connection timeout"))
} }
// //
@ -109,7 +135,7 @@ func (client *Client) Touch() {
} }
func (client *Client) Idle() { func (client *Client) Idle() {
client.Reply(RplPing(client)) client.Reply(RplPing(client.server))
if client.quitTimer == nil { if client.quitTimer == nil {
client.quitTimer = time.AfterFunc(QUIT_TIMEOUT, client.connectionTimeout) client.quitTimer = time.AfterFunc(QUIT_TIMEOUT, client.connectionTimeout)
@ -123,7 +149,6 @@ func (client *Client) Register() {
return return
} }
client.registered = true client.registered = true
client.loginTimer.Stop()
client.Touch() client.Touch()
} }
@ -140,9 +165,6 @@ func (client *Client) destroy() {
// clean up self // clean up self
if client.loginTimer != nil {
client.loginTimer.Stop()
}
if client.idleTimer != nil { if client.idleTimer != nil {
client.idleTimer.Stop() client.idleTimer.Stop()
} }

View File

@ -702,7 +702,6 @@ func ParseProxyCommand(args []string) (Command, error) {
type AwayCommand struct { type AwayCommand struct {
BaseCommand BaseCommand
text Text text Text
away bool
} }
func ParseAwayCommand(args []string) (Command, error) { func ParseAwayCommand(args []string) (Command, error) {
@ -710,7 +709,6 @@ func ParseAwayCommand(args []string) (Command, error) {
if len(args) > 0 { if len(args) > 0 {
cmd.text = NewText(args[0]) cmd.text = NewText(args[0])
cmd.away = true
} }
return cmd, nil return cmd, nil

68
irc/debug.go Normal file
View File

@ -0,0 +1,68 @@
package irc
import (
"os"
"runtime"
"runtime/debug"
"runtime/pprof"
"time"
)
func (msg *DebugCommand) HandleServer(server *Server) {
client := msg.Client()
if !client.flags[Operator] {
return
}
switch msg.subCommand {
case "GCSTATS":
stats := debug.GCStats{
Pause: make([]time.Duration, 10),
PauseQuantiles: make([]time.Duration, 5),
}
debug.ReadGCStats(&stats)
server.Replyf(client, "last GC: %s", stats.LastGC.Format(time.RFC1123))
server.Replyf(client, "num GC: %d", stats.NumGC)
server.Replyf(client, "pause total: %s", stats.PauseTotal)
server.Replyf(client, "pause quantiles min%%: %s", stats.PauseQuantiles[0])
server.Replyf(client, "pause quantiles 25%%: %s", stats.PauseQuantiles[1])
server.Replyf(client, "pause quantiles 50%%: %s", stats.PauseQuantiles[2])
server.Replyf(client, "pause quantiles 75%%: %s", stats.PauseQuantiles[3])
server.Replyf(client, "pause quantiles max%%: %s", stats.PauseQuantiles[4])
case "NUMGOROUTINE":
count := runtime.NumGoroutine()
server.Replyf(client, "num goroutines: %d", count)
case "PROFILEHEAP":
profFile := "ergonomadic.mprof"
file, err := os.Create(profFile)
if err != nil {
server.Replyf(client, "error: %s", err)
break
}
defer file.Close()
pprof.Lookup("heap").WriteTo(file, 0)
server.Replyf(client, "written to %s", profFile)
case "STARTCPUPROFILE":
profFile := "ergonomadic.prof"
file, err := os.Create(profFile)
if err != nil {
server.Replyf(client, "error: %s", err)
break
}
if err := pprof.StartCPUProfile(file); err != nil {
defer file.Close()
server.Replyf(client, "error: %s", err)
break
}
server.Replyf(client, "CPU profile writing to %s", profFile)
case "STOPCPUPROFILE":
pprof.StopCPUProfile()
server.Reply(client, "CPU profiling stopped")
}
}

View File

@ -151,8 +151,8 @@ func RplPing(target Identifiable) string {
return NewStringReply(nil, PING, ":%s", target.Nick()) return NewStringReply(nil, PING, ":%s", target.Nick())
} }
func RplPong(client *Client) string { func RplPong(client *Client, msg Text) string {
return NewStringReply(nil, PONG, client.Nick().String()) return NewStringReply(nil, PONG, "%s :%s", client.server, msg.String())
} }
func RplQuit(client *Client, message Text) string { func RplQuit(client *Client, message Text) string {

View File

@ -8,9 +8,6 @@ import (
"net" "net"
"os" "os"
"os/signal" "os/signal"
"runtime"
"runtime/debug"
"runtime/pprof"
"strings" "strings"
"syscall" "syscall"
"time" "time"
@ -121,7 +118,6 @@ func (server *Server) loadChannels() {
func (server *Server) processCommand(cmd Command) { func (server *Server) processCommand(cmd Command) {
client := cmd.Client() client := cmd.Client()
Log.debug.Printf("%s → %+v", client, cmd)
if !client.registered { if !client.registered {
regCmd, ok := cmd.(RegServerCommand) regCmd, ok := cmd.(RegServerCommand)
@ -138,6 +134,7 @@ func (server *Server) processCommand(cmd Command) {
client.ErrUnknownCommand(cmd.Code()) client.ErrUnknownCommand(cmd.Code())
return return
} }
switch srvCmd.(type) { switch srvCmd.(type) {
case *PingCommand, *PongCommand: case *PingCommand, *PongCommand:
client.Touch() client.Touch()
@ -149,6 +146,7 @@ func (server *Server) processCommand(cmd Command) {
client.Active() client.Active()
client.Touch() client.Touch()
} }
srvCmd.HandleServer(server) srvCmd.HandleServer(server)
} }
@ -272,6 +270,14 @@ func (s *Server) Nick() Name {
return s.Id() return s.Id()
} }
func (server *Server) Reply(target *Client, message string) {
target.Reply(RplPrivMsg(server, target, NewText(message)))
}
func (server *Server) Replyf(target *Client, format string, args ...interface{}) {
server.Reply(target, fmt.Sprintf(format, args...))
}
// //
// registration commands // registration commands
// //
@ -344,7 +350,8 @@ func (m *PassCommand) HandleServer(s *Server) {
} }
func (m *PingCommand) HandleServer(s *Server) { func (m *PingCommand) HandleServer(s *Server) {
m.Client().Reply(RplPong(m.Client())) client := m.Client()
client.Reply(RplPong(client, m.server.Text()))
} }
func (m *PongCommand) HandleServer(s *Server) { func (m *PongCommand) HandleServer(s *Server) {
@ -514,23 +521,33 @@ func (msg *OperCommand) HandleServer(server *Server) {
client.flags[Operator] = true client.flags[Operator] = true
client.RplYoureOper() client.RplYoureOper()
client.RplUModeIs(client) client.Reply(RplModeChanges(client, client, ModeChanges{&ModeChange{
mode: Operator,
op: Add,
}}))
} }
func (msg *AwayCommand) HandleServer(server *Server) { func (msg *AwayCommand) HandleServer(server *Server) {
client := msg.Client() client := msg.Client()
if msg.away { if len(msg.text) > 0 {
client.flags[Away] = true client.flags[Away] = true
} else { } else {
delete(client.flags, Away) delete(client.flags, Away)
} }
client.awayMessage = msg.text client.awayMessage = msg.text
var op ModeOp
if client.flags[Away] { if client.flags[Away] {
op = Add
client.RplNowAway() client.RplNowAway()
} else { } else {
op = Remove
client.RplUnAway() client.RplUnAway()
} }
client.Reply(RplModeChanges(client, client, ModeChanges{&ModeChange{
mode: Away,
op: op,
}}))
} }
func (msg *IsOnCommand) HandleServer(server *Server) { func (msg *IsOnCommand) HandleServer(server *Server) {
@ -638,53 +655,6 @@ func (msg *NamesCommand) HandleServer(server *Server) {
} }
} }
func (server *Server) Reply(target *Client, format string, args ...interface{}) {
target.Reply(RplPrivMsg(server, target, NewText(fmt.Sprintf(format, args...))))
}
func (msg *DebugCommand) HandleServer(server *Server) {
client := msg.Client()
if !client.flags[Operator] {
return
}
switch msg.subCommand {
case "GC":
runtime.GC()
server.Reply(client, "OK")
case "GCSTATS":
stats := debug.GCStats{
Pause: make([]time.Duration, 10),
PauseQuantiles: make([]time.Duration, 5),
}
debug.ReadGCStats(&stats)
server.Reply(client, "last GC: %s", stats.LastGC.Format(time.RFC1123))
server.Reply(client, "num GC: %d", stats.NumGC)
server.Reply(client, "pause total: %s", stats.PauseTotal)
server.Reply(client, "pause quantiles min%%: %s", stats.PauseQuantiles[0])
server.Reply(client, "pause quantiles 25%%: %s", stats.PauseQuantiles[1])
server.Reply(client, "pause quantiles 50%%: %s", stats.PauseQuantiles[2])
server.Reply(client, "pause quantiles 75%%: %s", stats.PauseQuantiles[3])
server.Reply(client, "pause quantiles max%%: %s", stats.PauseQuantiles[4])
case "NUMGOROUTINE":
count := runtime.NumGoroutine()
server.Reply(client, "num goroutines: %d", count)
case "PROFILEHEAP":
profFile := "ergonomadic-heap.prof"
file, err := os.Create(profFile)
if err != nil {
log.Printf("error: %s", err)
break
}
defer file.Close()
pprof.Lookup("heap").WriteTo(file, 0)
server.Reply(client, "written to %s", profFile)
}
}
func (msg *VersionCommand) HandleServer(server *Server) { func (msg *VersionCommand) HandleServer(server *Server) {
client := msg.Client() client := msg.Client()
if (msg.target != "") && (msg.target != server.name) { if (msg.target != "") && (msg.target != server.name) {

View File

@ -9,23 +9,20 @@ import (
const ( const (
R = '→' R = '→'
W = '←' W = '←'
EOF = ""
) )
type Socket struct { type Socket struct {
conn net.Conn conn net.Conn
scanner *bufio.Scanner
writer *bufio.Writer writer *bufio.Writer
} }
func NewSocket(conn net.Conn, commands chan<- Command) *Socket { func NewSocket(conn net.Conn) *Socket {
socket := &Socket{ return &Socket{
conn: conn, conn: conn,
scanner: bufio.NewScanner(conn),
writer: bufio.NewWriter(conn), writer: bufio.NewWriter(conn),
} }
go socket.readLines(commands)
return socket
} }
func (socket *Socket) String() string { func (socket *Socket) String() string {
@ -37,32 +34,22 @@ func (socket *Socket) Close() {
Log.debug.Printf("%s closed", socket) Log.debug.Printf("%s closed", socket)
} }
func (socket *Socket) readLines(commands chan<- Command) { func (socket *Socket) Read() (line string, err error) {
commands <- NewProxyCommand(AddrLookupHostname(socket.conn.RemoteAddr())) for socket.scanner.Scan() {
line = socket.scanner.Text()
scanner := bufio.NewScanner(socket.conn)
for scanner.Scan() {
line := scanner.Text()
if len(line) == 0 { if len(line) == 0 {
continue continue
} }
Log.debug.Printf("%s → %s", socket, line) Log.debug.Printf("%s → %s", socket, line)
return
msg, err := ParseCommand(line)
if err != nil {
// TODO error messaging to client
continue
}
commands <- msg
} }
if err := scanner.Err(); err != nil { err = socket.scanner.Err()
Log.debug.Printf("%s error: %s", socket, err) socket.isError(err, R)
if err == nil {
err = io.EOF
} }
return
commands <- NewQuitCommand("connection closed")
close(commands)
} }
func (socket *Socket) Write(line string) (err error) { func (socket *Socket) Write(line string) (err error) {