REHASH: Update listeners, fix some bad hangs/locks

This commit is contained in:
Daniel Oaks 2016-10-22 20:54:04 +10:00
parent 149550b453
commit 835187a736
1 changed files with 137 additions and 19 deletions

View File

@ -17,6 +17,7 @@ import (
"os/signal" "os/signal"
"strconv" "strconv"
"strings" "strings"
"sync"
"syscall" "syscall"
"time" "time"
@ -34,6 +35,29 @@ type Limits struct {
TopicLen int TopicLen int
} }
// ListenerInterface represents an interface for a listener.
type ListenerInterface struct {
Listener net.Listener
Events chan ListenerEvent
}
const (
// DestroyListener instructs the listener to destroy itself.
DestroyListener ListenerEventType = iota
// UpdateListener instructs the listener to update itself (grab new certs, etc).
UpdateListener = iota
)
// ListenerEventType is the type of event this is.
type ListenerEventType int
// ListenerEvent is an event that's passed to the listener.
type ListenerEvent struct {
Type ListenerEventType
NewConfig *tls.Config
}
// Server is the main Oragono server.
type Server struct { type Server struct {
accounts map[string]*ClientAccount accounts map[string]*ClientAccount
channels ChannelNameMap channels ChannelNameMap
@ -44,6 +68,8 @@ type Server struct {
store buntdb.DB store buntdb.DB
idle chan *Client idle chan *Client
limits Limits limits Limits
listenerUpdateMutex sync.Mutex
listeners map[string]ListenerInterface
monitoring map[string][]Client monitoring map[string][]Client
motdLines []string motdLines []string
name string name string
@ -53,6 +79,7 @@ type Server struct {
operators map[string][]byte operators map[string][]byte
password []byte password []byte
passwords *PasswordManager passwords *PasswordManager
rehashMutex sync.Mutex
accountRegistration *AccountRegistration accountRegistration *AccountRegistration
signals chan os.Signal signals chan os.Signal
whoWas *WhoWasList whoWas *WhoWasList
@ -98,6 +125,7 @@ func NewServer(configFilename string, config *Config) *Server {
NickLen: int(config.Limits.NickLen), NickLen: int(config.Limits.NickLen),
TopicLen: int(config.Limits.TopicLen), TopicLen: int(config.Limits.TopicLen),
}, },
listeners: make(map[string]ListenerInterface),
monitoring: make(map[string][]Client), monitoring: make(map[string][]Client),
name: config.Server.Name, name: config.Server.Name,
nameCasefolded: casefoldedName, nameCasefolded: casefoldedName,
@ -162,7 +190,7 @@ func NewServer(configFilename string, config *Config) *Server {
} }
for _, addr := range config.Server.Listen { for _, addr := range config.Server.Listen {
server.listen(addr, config.TLSListeners()) server.createListener(addr, config.TLSListeners())
} }
if config.Server.Wslisten != "" { if config.Server.Wslisten != "" {
@ -251,12 +279,7 @@ func (server *Server) Run() {
done = true done = true
case conn := <-server.newConns: case conn := <-server.newConns:
NewClient(server, conn.Conn, conn.IsTLS) go NewClient(server, conn.Conn, conn.IsTLS)
/*TODO(dan): LOOK AT THIS MORE CLOSELY
case cmd := <-server.commands:
server.processCommand(cmd)
*/
case client := <-server.idle: case client := <-server.idle:
client.Idle() client.Idle()
@ -265,13 +288,21 @@ func (server *Server) Run() {
} }
// //
// listen goroutine // IRC protocol listeners
// //
func (s *Server) listen(addr string, tlsMap map[string]*tls.Config) { func (s *Server) createListener(addr string, tlsMap map[string]*tls.Config) {
//TODO(dan): we could casemap this but... eh
config, listenTLS := tlsMap[addr] config, listenTLS := tlsMap[addr]
_, alreadyExists := s.listeners[addr]
if alreadyExists {
log.Fatal(s, "listener already exists:", addr)
}
// make listener event channel
listenerEventChannel := make(chan ListenerEvent, 1)
// make listener
listener, err := net.Listen("tcp", addr) listener, err := net.Listen("tcp", addr)
if err != nil { if err != nil {
log.Fatal(s, "listen error: ", err) log.Fatal(s, "listen error: ", err)
@ -283,23 +314,67 @@ func (s *Server) listen(addr string, tlsMap map[string]*tls.Config) {
listener = tls.NewListener(listener, config) listener = tls.NewListener(listener, config)
tlsString = "TLS" tlsString = "TLS"
} }
// throw our details to the server so we can be modified/killed later
li := ListenerInterface{
Events: listenerEventChannel,
Listener: listener,
}
s.listeners[addr] = li
// start listening
Log.info.Printf("%s listening on %s using %s.", s.name, addr, tlsString) Log.info.Printf("%s listening on %s using %s.", s.name, addr, tlsString)
// setup accept goroutine
go func() { go func() {
for { for {
conn, err := listener.Accept() conn, err := listener.Accept()
if err != nil {
Log.error.Printf("%s accept error: %s", s.name, err)
continue
}
Log.debug.Printf("%s accept: %s", s.name, conn.RemoteAddr())
newConn := clientConn{ if err == nil {
Conn: conn, newConn := clientConn{
IsTLS: listenTLS, Conn: conn,
IsTLS: listenTLS,
}
s.newConns <- newConn
} }
s.newConns <- newConn select {
case event := <-s.listeners[addr].Events:
if event.Type == DestroyListener {
// listener should already be closed, this is just for safety
listener.Close()
return
} else if event.Type == UpdateListener {
// close old listener
listener.Close()
// make new listener
listener, err = net.Listen("tcp", addr)
if err != nil {
log.Fatal(s, "listen error: ", err)
}
tlsString := "plaintext"
if event.NewConfig != nil {
config = event.NewConfig
config.ClientAuth = tls.RequestClientCert
listener = tls.NewListener(listener, config)
tlsString = "TLS"
}
// update server ListenerInterface
li.Listener = listener
s.listenerUpdateMutex.Lock()
s.listeners[addr] = li
s.listenerUpdateMutex.Unlock()
// print notice
Log.info.Printf("%s updated listener %s using %s.", s.name, addr, tlsString)
}
default:
// no events waiting for us, fall-through and continue
}
} }
}() }()
} }
@ -795,6 +870,9 @@ func operHandler(server *Server, client *Client, msg ircmsg.IrcMessage) bool {
// REHASH // REHASH
func rehashHandler(server *Server, client *Client, msg ircmsg.IrcMessage) bool { func rehashHandler(server *Server, client *Client, msg ircmsg.IrcMessage) bool {
// only let one REHASH go on at a time
server.rehashMutex.Lock()
config, err := LoadConfig(server.configFilename) config, err := LoadConfig(server.configFilename)
if err != nil { if err != nil {
@ -833,6 +911,46 @@ func rehashHandler(server *Server, client *Client, msg ircmsg.IrcMessage) bool {
} }
} }
// destroy old listeners
tlsListeners := config.TLSListeners()
for addr := range server.listeners {
var exists bool
for _, newaddr := range config.Server.Listen {
if newaddr == addr {
exists = true
break
}
}
if exists {
// update old listener
fmt.Println("refreshing", addr)
server.listeners[addr].Events <- ListenerEvent{
Type: UpdateListener,
NewConfig: tlsListeners[addr],
}
} else {
// destroy nonexistent listener
fmt.Println("destroying", addr)
server.listeners[addr].Events <- ListenerEvent{
Type: DestroyListener,
}
}
// force listener to apply the event right away
server.listeners[addr].Listener.Close()
}
for _, newaddr := range config.Server.Listen {
_, exists := server.listeners[newaddr]
if !exists {
// make new listener
fmt.Println("creating", newaddr)
server.createListener(newaddr, tlsListeners)
}
}
server.rehashMutex.Unlock()
client.Send(nil, server.name, RPL_REHASHING, client.nick, "ircd.yaml", "Rehashing") client.Send(nil, server.name, RPL_REHASHING, client.nick, "ircd.yaml", "Rehashing")
return false return false
} }