replace some utils.Semaphore with (*sync.Mutex).TryLock

See #1994
This commit is contained in:
Shivaram Lingamneni 2022-09-02 04:00:38 -04:00
parent 531a1d6864
commit 746309e386
3 changed files with 39 additions and 46 deletions

View File

@ -47,10 +47,10 @@ type Channel struct {
userLimit int
accountToUMode map[string]modes.Mode
history history.Buffer
stateMutex sync.RWMutex // tier 1
writerSemaphore utils.Semaphore // tier 1.5
joinPartMutex sync.Mutex // tier 3
ensureLoaded utils.Once // manages loading stored registration info from the database
stateMutex sync.RWMutex // tier 1
writebackLock sync.Mutex // tier 1.5
joinPartMutex sync.Mutex // tier 3
ensureLoaded utils.Once // manages loading stored registration info from the database
dirtyBits uint
settings ChannelSettings
}
@ -61,12 +61,11 @@ func NewChannel(s *Server, name, casefoldedName string, registered bool) *Channe
config := s.Config()
channel := &Channel{
createdTime: time.Now().UTC(), // may be overwritten by applyRegInfo
members: make(MemberSet),
name: name,
nameCasefolded: casefoldedName,
server: s,
writerSemaphore: utils.NewSemaphore(1),
createdTime: time.Now().UTC(), // may be overwritten by applyRegInfo
members: make(MemberSet),
name: name,
nameCasefolded: casefoldedName,
server: s,
}
channel.initializeLists()
@ -209,11 +208,11 @@ func (channel *Channel) MarkDirty(dirtyBits uint) {
// ChannelManager's lock (that way, no one can join and make the channel dirty again
// between this method exiting and the actual deletion).
func (channel *Channel) IsClean() bool {
if !channel.writerSemaphore.TryAcquire() {
if !channel.writebackLock.TryLock() {
// a database write (which may fail) is in progress, the channel cannot be cleaned up
return false
}
defer channel.writerSemaphore.Release()
defer channel.writebackLock.Unlock()
channel.stateMutex.RLock()
defer channel.stateMutex.RUnlock()
@ -225,7 +224,7 @@ func (channel *Channel) IsClean() bool {
}
func (channel *Channel) wakeWriter() {
if channel.writerSemaphore.TryAcquire() {
if channel.writebackLock.TryLock() {
go channel.writeLoop()
}
}
@ -235,7 +234,7 @@ func (channel *Channel) writeLoop() {
for {
// TODO(#357) check the error value of this and implement timed backoff
channel.performWrite(0)
channel.writerSemaphore.Release()
channel.writebackLock.Unlock()
channel.stateMutex.RLock()
isDirty := channel.dirtyBits != 0
@ -249,7 +248,7 @@ func (channel *Channel) writeLoop() {
return // nothing to do
} // else: isDirty, so we need to write again
if !channel.writerSemaphore.TryAcquire() {
if !channel.writebackLock.TryLock() {
return
}
}
@ -272,8 +271,8 @@ func (channel *Channel) Store(dirtyBits uint) (err error) {
}
}()
channel.writerSemaphore.Acquire()
defer channel.writerSemaphore.Release()
channel.writebackLock.Lock()
defer channel.writebackLock.Unlock()
return channel.performWrite(dirtyBits)
}

View File

@ -113,7 +113,7 @@ type Client struct {
vhost string
history history.Buffer
dirtyBits uint
writerSemaphore utils.Semaphore // tier 1.5
writebackLock sync.Mutex // tier 1.5
}
type saslStatus struct {
@ -335,16 +335,15 @@ func (server *Server) RunClient(conn IRCConn) {
Duration: config.Accounts.LoginThrottling.Duration,
Limit: config.Accounts.LoginThrottling.MaxAttempts,
},
server: server,
accountName: "*",
nick: "*", // * is used until actual nick is given
nickCasefolded: "*",
nickMaskString: "*", // * is used until actual nick is given
realIP: realIP,
proxiedIP: proxiedIP,
requireSASL: requireSASL,
nextSessionID: 1,
writerSemaphore: utils.NewSemaphore(1),
server: server,
accountName: "*",
nick: "*", // * is used until actual nick is given
nickCasefolded: "*",
nickMaskString: "*", // * is used until actual nick is given
realIP: realIP,
proxiedIP: proxiedIP,
requireSASL: requireSASL,
nextSessionID: 1,
}
if requireSASL {
client.requireSASLMessage = banMsg
@ -424,8 +423,6 @@ func (server *Server) AddAlwaysOnClient(account ClientAccount, channelToStatus m
realname: realname,
nextSessionID: 1,
writerSemaphore: utils.NewSemaphore(1),
}
if client.checkAlwaysOnExpirationNoMutex(config, true) {
@ -1772,7 +1769,7 @@ func (client *Client) markDirty(dirtyBits uint) {
}
func (client *Client) wakeWriter() {
if client.writerSemaphore.TryAcquire() {
if client.writebackLock.TryLock() {
go client.writeLoop()
}
}
@ -1780,13 +1777,13 @@ func (client *Client) wakeWriter() {
func (client *Client) writeLoop() {
for {
client.performWrite(0)
client.writerSemaphore.Release()
client.writebackLock.Unlock()
client.stateMutex.RLock()
isDirty := client.dirtyBits != 0
client.stateMutex.RUnlock()
if !isDirty || !client.writerSemaphore.TryAcquire() {
if !isDirty || !client.writebackLock.TryLock() {
return
}
}
@ -1844,8 +1841,8 @@ func (client *Client) Store(dirtyBits uint) (err error) {
}
}()
client.writerSemaphore.Acquire()
defer client.writerSemaphore.Release()
client.writebackLock.Lock()
defer client.writebackLock.Unlock()
client.performWrite(dirtyBits)
return nil
}

View File

@ -8,8 +8,6 @@ import (
"errors"
"io"
"sync"
"github.com/ergochat/ergo/irc/utils"
)
var (
@ -27,7 +25,7 @@ type Socket struct {
maxSendQBytes int
// this is a trylock enforcing that only one goroutine can write to `conn` at a time
writerSemaphore utils.Semaphore
writeLock sync.Mutex
buffers [][]byte
totalLength int
@ -40,9 +38,8 @@ type Socket struct {
// NewSocket returns a new Socket.
func NewSocket(conn IRCConn, maxSendQBytes int) *Socket {
result := Socket{
conn: conn,
maxSendQBytes: maxSendQBytes,
writerSemaphore: utils.NewSemaphore(1),
conn: conn,
maxSendQBytes: maxSendQBytes,
}
return &result
}
@ -128,8 +125,8 @@ func (socket *Socket) BlockingWrite(data []byte) (err error) {
}()
// blocking acquire of the trylock
socket.writerSemaphore.Acquire()
defer socket.writerSemaphore.Release()
socket.writeLock.Lock()
defer socket.writeLock.Unlock()
// first, flush any buffered data, to preserve the ordering guarantees
closed := socket.performWrite()
@ -146,7 +143,7 @@ func (socket *Socket) BlockingWrite(data []byte) (err error) {
// wakeWriter starts the goroutine that actually performs the write, without blocking
func (socket *Socket) wakeWriter() {
if socket.writerSemaphore.TryAcquire() {
if socket.writeLock.TryLock() {
// acquired the trylock; send() will release it
go socket.send()
}
@ -182,12 +179,12 @@ func (socket *Socket) send() {
socket.performWrite()
// surrender the trylock, avoiding a race where a write comes in after we've
// checked readyToWrite() and it returned false, but while we still hold the trylock:
socket.writerSemaphore.Release()
socket.writeLock.Unlock()
// check if more data came in while we held the trylock:
if !socket.readyToWrite() {
return
}
if !socket.writerSemaphore.TryAcquire() {
if !socket.writeLock.TryLock() {
// failed to acquire; exit and wait for the holder to observe readyToWrite()
// after releasing it
return