3
0
mirror of https://github.com/ergochat/ergo.git synced 2026-05-12 18:38:10 +02:00

ensure always-on state is flushed

* Fix a race condition in persisting channel memberships for always-on
  clients (the asynchronous write of the client's channel memberships
  could precede the update to the channel's member list, resulting in
  the membership not being observed and written)

* Ensure always-on state is flushed on shutdown (we were already
  flushing timestamps, because those writes are heavily debounced, but
  we were relying on immediate asynchronous writeback for channel
  memberships and similar state).
This commit is contained in:
Shivaram Lingamneni 2026-05-08 04:25:09 +00:00
parent 2874e181aa
commit 6ccd4e2313
3 changed files with 34 additions and 17 deletions

View File

@ -255,7 +255,7 @@ func (channel *Channel) writeLoop() {
// Store writes part (or all) of the channel's data back to the database,
// blocking until the write is complete. This is the equivalent of
// Socket.BlockingWrite.
func (channel *Channel) Store(dirtyBits uint) (err error) {
func (channel *Channel) Store(additionalDirtyBits uint) (err error) {
defer func() {
channel.stateMutex.Lock()
isDirty := channel.dirtyBits != 0
@ -271,7 +271,7 @@ func (channel *Channel) Store(dirtyBits uint) (err error) {
channel.writebackLock.Lock()
defer channel.writebackLock.Unlock()
return channel.performWrite(dirtyBits)
return channel.performWrite(additionalDirtyBits)
}
// do an individual write; equivalent of Socket.send()
@ -802,7 +802,8 @@ func (channel *Channel) Join(client *Client, key string, isSajoin bool, rb *Resp
}
}
if joinErr := client.addChannel(channel, rb == nil); joinErr != nil {
alwaysOn, joinErr := client.addChannel(channel)
if joinErr != nil {
return joinErr, ""
}
@ -834,6 +835,13 @@ func (channel *Channel) Join(client *Client, key string, isSajoin bool, rb *Resp
return
}()
if alwaysOn {
// skip this for simulated join of always-on clients on server startup:
if rb != nil {
client.markDirty(IncludeChannels)
}
}
var message utils.SplitMessage
respectAuditorium := givenMode == modes.Mode(0) && channel.flags.HasMode(modes.Auditorium)
message = utils.MakeMessage("")

View File

@ -1639,11 +1639,12 @@ func (session *Session) Notice(text string) {
// `simulated` is for the fake join of an always-on client
// (we just read the channel name from the database, there's no need to write it back)
func (client *Client) addChannel(channel *Channel, simulated bool) (err error) {
func (client *Client) addChannel(channel *Channel) (alwaysOn bool, err error) {
config := client.server.Config()
client.stateMutex.Lock()
alwaysOn := client.alwaysOn
defer client.stateMutex.Unlock()
alwaysOn = client.alwaysOn
if client.destroyed {
err = errClientDestroyed
} else if client.oper == nil && len(client.channels) >= config.Channels.MaxChannelsPerClient {
@ -1651,11 +1652,8 @@ func (client *Client) addChannel(channel *Channel, simulated bool) (err error) {
} else {
client.channels.Add(channel) // success
}
client.stateMutex.Unlock()
if err == nil && alwaysOn && !simulated {
client.markDirty(IncludeChannels)
}
// XXX don't markDirty here; we need to wait for the change to go through
// on the channel side, so we can correctly record whatever mode was granted
return
}
@ -1967,8 +1965,11 @@ func (client *Client) performWrite(additionalDirtyBits uint) {
}
// Blocking store; see Channel.Store and Socket.BlockingWrite
func (client *Client) Store(dirtyBits uint) (err error) {
func (client *Client) Store(dirtyBits uint, shutdown bool) (err error) {
defer func() {
if shutdown {
return // no need to restart the loop if we're shutting down
}
client.stateMutex.Lock()
isDirty := client.dirtyBits != 0
client.stateMutex.Unlock()

View File

@ -163,7 +163,7 @@ func (server *Server) Shutdown() {
}
// flush data associated with always-on clients:
server.performAlwaysOnMaintenance(false, true)
server.performAlwaysOnMaintenance(true)
if err := server.store.Close(); err != nil {
server.logger.Error("shutdown", "Could not close datastore", err.Error())
@ -285,20 +285,28 @@ func (server *Server) periodicAlwaysOnMaintenance() {
defer server.HandlePanic(nil)
server.logger.Info("accounts", "Performing periodic always-on client checks")
server.performAlwaysOnMaintenance(true, true)
server.performAlwaysOnMaintenance(false)
}
func (server *Server) performAlwaysOnMaintenance(checkExpiration, flushTimestamps bool) {
func (server *Server) performAlwaysOnMaintenance(shutdown bool) {
config := server.Config()
for _, client := range server.clients.AllClients() {
if checkExpiration && client.IsExpiredAlwaysOn(config) {
if !shutdown && client.IsExpiredAlwaysOn(config) {
// TODO save the channels list, use it for autojoin if/when they return?
server.logger.Info("accounts", "Expiring always-on client", client.AccountName())
client.destroy(nil)
continue
}
if flushTimestamps && client.shouldFlushTimestamps() {
// synchronously flush channel memberships, etc., avoiding a race between
// immediate but asynchronous writeback of those fields and server shutdown
if shutdown && client.AlwaysOn() {
client.Store(0, shutdown)
}
// flush the timestamps (which are not written back immediately, for debouncing
// reasons), either as periodic maintenance or on shutdown
if client.shouldFlushTimestamps() {
account := client.Account()
server.accounts.saveLastSeen(account, client.copyLastSeen())
server.accounts.saveReadMarkers(account, client.copyReadMarkers())
@ -343,7 +351,7 @@ func (server *Server) performPushMaintenance() {
}
}
// persist all push subscriptions on the assumption that the timestamps have changed
client.Store(IncludePushSubscriptions)
client.Store(IncludePushSubscriptions, false)
}
}