diff --git a/irc/channel.go b/irc/channel.go index 8aafe13a..2cb4e385 100644 --- a/irc/channel.go +++ b/irc/channel.go @@ -47,10 +47,10 @@ type Channel struct { userLimit int accountToUMode map[string]modes.Mode history history.Buffer - stateMutex sync.RWMutex // tier 1 - writerSemaphore utils.Semaphore // tier 1.5 - joinPartMutex sync.Mutex // tier 3 - ensureLoaded utils.Once // manages loading stored registration info from the database + stateMutex sync.RWMutex // tier 1 + writebackLock sync.Mutex // tier 1.5 + joinPartMutex sync.Mutex // tier 3 + ensureLoaded utils.Once // manages loading stored registration info from the database dirtyBits uint settings ChannelSettings } @@ -61,12 +61,11 @@ func NewChannel(s *Server, name, casefoldedName string, registered bool) *Channe config := s.Config() channel := &Channel{ - createdTime: time.Now().UTC(), // may be overwritten by applyRegInfo - members: make(MemberSet), - name: name, - nameCasefolded: casefoldedName, - server: s, - writerSemaphore: utils.NewSemaphore(1), + createdTime: time.Now().UTC(), // may be overwritten by applyRegInfo + members: make(MemberSet), + name: name, + nameCasefolded: casefoldedName, + server: s, } channel.initializeLists() @@ -209,11 +208,11 @@ func (channel *Channel) MarkDirty(dirtyBits uint) { // ChannelManager's lock (that way, no one can join and make the channel dirty again // between this method exiting and the actual deletion). func (channel *Channel) IsClean() bool { - if !channel.writerSemaphore.TryAcquire() { + if !channel.writebackLock.TryLock() { // a database write (which may fail) is in progress, the channel cannot be cleaned up return false } - defer channel.writerSemaphore.Release() + defer channel.writebackLock.Unlock() channel.stateMutex.RLock() defer channel.stateMutex.RUnlock() @@ -225,7 +224,7 @@ func (channel *Channel) IsClean() bool { } func (channel *Channel) wakeWriter() { - if channel.writerSemaphore.TryAcquire() { + if channel.writebackLock.TryLock() { go channel.writeLoop() } } @@ -235,7 +234,7 @@ func (channel *Channel) writeLoop() { for { // TODO(#357) check the error value of this and implement timed backoff channel.performWrite(0) - channel.writerSemaphore.Release() + channel.writebackLock.Unlock() channel.stateMutex.RLock() isDirty := channel.dirtyBits != 0 @@ -249,7 +248,7 @@ func (channel *Channel) writeLoop() { return // nothing to do } // else: isDirty, so we need to write again - if !channel.writerSemaphore.TryAcquire() { + if !channel.writebackLock.TryLock() { return } } @@ -272,8 +271,8 @@ func (channel *Channel) Store(dirtyBits uint) (err error) { } }() - channel.writerSemaphore.Acquire() - defer channel.writerSemaphore.Release() + channel.writebackLock.Lock() + defer channel.writebackLock.Unlock() return channel.performWrite(dirtyBits) } diff --git a/irc/client.go b/irc/client.go index 55d3d1fd..b98a8501 100644 --- a/irc/client.go +++ b/irc/client.go @@ -113,7 +113,7 @@ type Client struct { vhost string history history.Buffer dirtyBits uint - writerSemaphore utils.Semaphore // tier 1.5 + writebackLock sync.Mutex // tier 1.5 } type saslStatus struct { @@ -335,16 +335,15 @@ func (server *Server) RunClient(conn IRCConn) { Duration: config.Accounts.LoginThrottling.Duration, Limit: config.Accounts.LoginThrottling.MaxAttempts, }, - server: server, - accountName: "*", - nick: "*", // * is used until actual nick is given - nickCasefolded: "*", - nickMaskString: "*", // * is used until actual nick is given - realIP: realIP, - proxiedIP: proxiedIP, - requireSASL: requireSASL, - nextSessionID: 1, - writerSemaphore: utils.NewSemaphore(1), + server: server, + accountName: "*", + nick: "*", // * is used until actual nick is given + nickCasefolded: "*", + nickMaskString: "*", // * is used until actual nick is given + realIP: realIP, + proxiedIP: proxiedIP, + requireSASL: requireSASL, + nextSessionID: 1, } if requireSASL { client.requireSASLMessage = banMsg @@ -424,8 +423,6 @@ func (server *Server) AddAlwaysOnClient(account ClientAccount, channelToStatus m realname: realname, nextSessionID: 1, - - writerSemaphore: utils.NewSemaphore(1), } if client.checkAlwaysOnExpirationNoMutex(config, true) { @@ -1772,7 +1769,7 @@ func (client *Client) markDirty(dirtyBits uint) { } func (client *Client) wakeWriter() { - if client.writerSemaphore.TryAcquire() { + if client.writebackLock.TryLock() { go client.writeLoop() } } @@ -1780,13 +1777,13 @@ func (client *Client) wakeWriter() { func (client *Client) writeLoop() { for { client.performWrite(0) - client.writerSemaphore.Release() + client.writebackLock.Unlock() client.stateMutex.RLock() isDirty := client.dirtyBits != 0 client.stateMutex.RUnlock() - if !isDirty || !client.writerSemaphore.TryAcquire() { + if !isDirty || !client.writebackLock.TryLock() { return } } @@ -1844,8 +1841,8 @@ func (client *Client) Store(dirtyBits uint) (err error) { } }() - client.writerSemaphore.Acquire() - defer client.writerSemaphore.Release() + client.writebackLock.Lock() + defer client.writebackLock.Unlock() client.performWrite(dirtyBits) return nil } diff --git a/irc/socket.go b/irc/socket.go index 3a55bf5f..c27ccd52 100644 --- a/irc/socket.go +++ b/irc/socket.go @@ -8,8 +8,6 @@ import ( "errors" "io" "sync" - - "github.com/ergochat/ergo/irc/utils" ) var ( @@ -27,7 +25,7 @@ type Socket struct { maxSendQBytes int // this is a trylock enforcing that only one goroutine can write to `conn` at a time - writerSemaphore utils.Semaphore + writeLock sync.Mutex buffers [][]byte totalLength int @@ -40,9 +38,8 @@ type Socket struct { // NewSocket returns a new Socket. func NewSocket(conn IRCConn, maxSendQBytes int) *Socket { result := Socket{ - conn: conn, - maxSendQBytes: maxSendQBytes, - writerSemaphore: utils.NewSemaphore(1), + conn: conn, + maxSendQBytes: maxSendQBytes, } return &result } @@ -128,8 +125,8 @@ func (socket *Socket) BlockingWrite(data []byte) (err error) { }() // blocking acquire of the trylock - socket.writerSemaphore.Acquire() - defer socket.writerSemaphore.Release() + socket.writeLock.Lock() + defer socket.writeLock.Unlock() // first, flush any buffered data, to preserve the ordering guarantees closed := socket.performWrite() @@ -146,7 +143,7 @@ func (socket *Socket) BlockingWrite(data []byte) (err error) { // wakeWriter starts the goroutine that actually performs the write, without blocking func (socket *Socket) wakeWriter() { - if socket.writerSemaphore.TryAcquire() { + if socket.writeLock.TryLock() { // acquired the trylock; send() will release it go socket.send() } @@ -182,12 +179,12 @@ func (socket *Socket) send() { socket.performWrite() // surrender the trylock, avoiding a race where a write comes in after we've // checked readyToWrite() and it returned false, but while we still hold the trylock: - socket.writerSemaphore.Release() + socket.writeLock.Unlock() // check if more data came in while we held the trylock: if !socket.readyToWrite() { return } - if !socket.writerSemaphore.TryAcquire() { + if !socket.writeLock.TryLock() { // failed to acquire; exit and wait for the holder to observe readyToWrite() // after releasing it return