3
0
mirror of https://github.com/ergochat/ergo.git synced 2024-11-29 23:49:25 +01:00

Merge pull request #135 from slingamn/listeners_refactor_again.7

refactor create/update/destroy of listeners
This commit is contained in:
Daniel Oaks 2017-09-25 10:48:22 +10:00 committed by GitHub
commit 989fea18f9
2 changed files with 78 additions and 114 deletions

View File

@ -368,6 +368,7 @@ func (conf *Config) TLSListeners() map[string]*tls.Config {
tlsListeners := make(map[string]*tls.Config) tlsListeners := make(map[string]*tls.Config)
for s, tlsListenersConf := range conf.Server.TLSListeners { for s, tlsListenersConf := range conf.Server.TLSListeners {
config, err := tlsListenersConf.Config() config, err := tlsListenersConf.Config()
config.ClientAuth = tls.RequestClientCert
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }

View File

@ -57,26 +57,15 @@ type LineLenLimits struct {
Rest int Rest int
} }
// ListenerInterface represents an interface for a listener. // ListenerWrapper wraps a listener so it can be safely reconfigured or stopped
type ListenerInterface struct { type ListenerWrapper struct {
Listener net.Listener listener net.Listener
Events chan ListenerEvent tlsConfig *tls.Config
} shouldStop bool
// lets the ListenerWrapper inform the server that it has stopped:
const ( stopEvent chan bool
// DestroyListener instructs the listener to destroy itself. // protects atomic update of tlsConfig and shouldStop:
DestroyListener ListenerEventType = iota configMutex sync.Mutex
// UpdateListener instructs the listener to update itself (grab new certs, etc).
UpdateListener = iota
)
// ListenerEventType is the type of event this is.
type ListenerEventType int
// ListenerEvent is an event that's passed to the listener.
type ListenerEvent struct {
Type ListenerEventType
NewConfig *tls.Config
} }
// Server is the main Oragono server. // Server is the main Oragono server.
@ -102,9 +91,7 @@ type Server struct {
isupport *ISupportList isupport *ISupportList
klines *KLineManager klines *KLineManager
limits Limits limits Limits
listenerEventActMutex sync.Mutex listeners map[string]*ListenerWrapper
listeners map[string]ListenerInterface
listenerUpdateMutex sync.Mutex
logger *logger.Manager logger *logger.Manager
MaxSendQBytes uint64 MaxSendQBytes uint64
monitoring map[string][]*Client monitoring map[string][]*Client
@ -220,7 +207,7 @@ func NewServer(configFilename string, config *Config, logger *logger.Manager) (*
Rest: config.Limits.LineLen.Rest, Rest: config.Limits.LineLen.Rest,
}, },
}, },
listeners: make(map[string]ListenerInterface), listeners: make(map[string]*ListenerWrapper),
logger: logger, logger: logger,
MaxSendQBytes: config.Server.MaxSendQBytes, MaxSendQBytes: config.Server.MaxSendQBytes,
monitoring: make(map[string][]*Client), monitoring: make(map[string][]*Client),
@ -316,7 +303,7 @@ func NewServer(configFilename string, config *Config, logger *logger.Manager) (*
tlsListeners := config.TLSListeners() tlsListeners := config.TLSListeners()
for _, addr := range config.Server.Listen { for _, addr := range config.Server.Listen {
server.createListener(addr, tlsListeners) server.listeners[addr] = server.createListener(addr, tlsListeners[addr])
} }
if len(tlsListeners) == 0 { if len(tlsListeners) == 0 {
@ -437,10 +424,12 @@ func (server *Server) Run() {
case <-server.rehashSignal: case <-server.rehashSignal:
server.logger.Info("rehash", "Rehashing due to SIGHUP") server.logger.Info("rehash", "Rehashing due to SIGHUP")
go func() {
err := server.rehash() err := server.rehash()
if err != nil { if err != nil {
server.logger.Error("rehash", fmt.Sprintln("Failed to rehash:", err.Error())) server.logger.Error("rehash", fmt.Sprintln("Failed to rehash:", err.Error()))
} }
}()
case conn := <-server.newConns: case conn := <-server.newConns:
// check connection limits // check connection limits
@ -509,97 +498,63 @@ func (server *Server) Run() {
// //
// createListener starts the given listeners. // createListener starts the given listeners.
func (server *Server) createListener(addr string, tlsMap map[string]*tls.Config) { func (server *Server) createListener(addr string, tlsConfig *tls.Config) *ListenerWrapper {
config, listenTLS := tlsMap[addr]
_, alreadyExists := server.listeners[addr]
if alreadyExists {
log.Fatal(server, "listener already exists:", addr)
}
// make listener event channel
listenerEventChannel := make(chan ListenerEvent, 1)
// make listener // make listener
listener, err := net.Listen("tcp", addr) listener, err := net.Listen("tcp", addr)
if err != nil { if err != nil {
log.Fatal(server, "listen error: ", err) log.Fatal(server, "listen error: ", err)
} }
// throw our details to the server so we can be modified/killed later
wrapper := ListenerWrapper{
listener: listener,
tlsConfig: tlsConfig,
shouldStop: false,
stopEvent: make(chan bool, 1),
}
// TODO(slingamn) move all logging of listener status to rehash()
tlsString := "plaintext" tlsString := "plaintext"
if listenTLS { if tlsConfig != nil {
config.ClientAuth = tls.RequestClientCert
listener = tls.NewListener(listener, config)
tlsString = "TLS" tlsString = "TLS"
} }
// throw our details to the server so we can be modified/killed later
li := ListenerInterface{
Events: listenerEventChannel,
Listener: listener,
}
server.listeners[addr] = li
// start listening
server.logger.Info("listeners", fmt.Sprintf("listening on %s using %s.", addr, tlsString)) server.logger.Info("listeners", fmt.Sprintf("listening on %s using %s.", addr, tlsString))
var shouldStop bool
// setup accept goroutine // setup accept goroutine
go func() { go func() {
for { for {
conn, err := listener.Accept() conn, err := listener.Accept()
// synchronously access config data:
// whether TLS is enabled and whether we should stop listening
wrapper.configMutex.Lock()
shouldStop = wrapper.shouldStop
tlsConfig = wrapper.tlsConfig
wrapper.configMutex.Unlock()
if err == nil { if err == nil {
if tlsConfig != nil {
conn = tls.Server(conn, tlsConfig)
}
newConn := clientConn{ newConn := clientConn{
Conn: conn, Conn: conn,
IsTLS: listenTLS, IsTLS: tlsConfig != nil,
} }
// hand off the connection
server.newConns <- newConn server.newConns <- newConn
} }
select { if shouldStop {
case event := <-server.listeners[addr].Events:
// this is used to confirm that whoever passed us this event has closed the existing listener correctly (in an attempt to get us to notice the event).
// this is required to keep REHASH from having a very small race possibility of killing the primary listener
server.listenerEventActMutex.Lock()
server.listenerEventActMutex.Unlock()
if event.Type == DestroyListener {
// listener should already be closed, this is just for safety
listener.Close() listener.Close()
wrapper.stopEvent <- true
return return
} else if event.Type == UpdateListener {
// close old listener
listener.Close()
// make new listener
listener, err = net.Listen("tcp", addr)
if err != nil {
log.Fatal(server, "listen error: ", err)
}
tlsString := "plaintext"
if event.NewConfig != nil {
config = event.NewConfig
config.ClientAuth = tls.RequestClientCert
listener = tls.NewListener(listener, config)
tlsString = "TLS"
}
// update server ListenerInterface
li.Listener = listener
server.listenerUpdateMutex.Lock()
server.listeners[addr] = li
server.listenerUpdateMutex.Unlock()
// print notice
server.logger.Info("listeners", fmt.Sprintf("updated listener %s using %s.", addr, tlsString))
}
default:
// no events waiting for us, fall-through and continue
} }
} }
}() }()
return &wrapper
} }
// //
@ -1651,41 +1606,49 @@ func (server *Server) rehash() error {
} }
server.clients.ByNickMutex.RUnlock() server.clients.ByNickMutex.RUnlock()
// destroy old listeners // update or destroy all existing listeners
tlsListeners := config.TLSListeners() tlsListeners := config.TLSListeners()
for addr := range server.listeners { for addr := range server.listeners {
var exists bool currentListener := server.listeners[addr]
var stillConfigured bool
for _, newaddr := range config.Server.Listen { for _, newaddr := range config.Server.Listen {
if newaddr == addr { if newaddr == addr {
exists = true stillConfigured = true
break break
} }
} }
server.listenerEventActMutex.Lock() // pass new config information to the listener, to be picked up after
if exists { // its next Accept(). this is like sending over a buffered channel of
// update old listener // size 1, but where sending a second item overwrites the buffered item
server.listeners[addr].Events <- ListenerEvent{ // instead of blocking.
Type: UpdateListener, currentListener.configMutex.Lock()
NewConfig: tlsListeners[addr], currentListener.shouldStop = !stillConfigured
} currentListener.tlsConfig = tlsListeners[addr]
currentListener.configMutex.Unlock()
if stillConfigured {
server.logger.Info("rehash",
fmt.Sprintf("now listening on %s, tls=%t.", addr, (currentListener.tlsConfig != nil)),
)
} else { } else {
// destroy nonexistent listener // tell the listener it should stop by interrupting its Accept() call:
server.listeners[addr].Events <- ListenerEvent{ currentListener.listener.Close()
Type: DestroyListener, // XXX there is no guarantee from the API when the address will actually
// free for bind(2) again; this solution "seems to work". See here:
// https://github.com/golang/go/issues/21833
<-currentListener.stopEvent
delete(server.listeners, addr)
server.logger.Info("rehash", fmt.Sprintf("stopped listening on %s.", addr))
} }
} }
// force listener to apply the event right away
server.listeners[addr].Listener.Close()
server.listenerEventActMutex.Unlock()
}
// create new listeners that were not previously configured
for _, newaddr := range config.Server.Listen { for _, newaddr := range config.Server.Listen {
_, exists := server.listeners[newaddr] _, exists := server.listeners[newaddr]
if !exists { if !exists {
// make new listener // make new listener
server.createListener(newaddr, tlsListeners) server.listeners[newaddr] = server.createListener(newaddr, tlsListeners[newaddr])
} }
} }