mirror of
https://github.com/ergochat/ergo.git
synced 2024-11-15 08:29:31 +01:00
Merge pull request #1995 from slingamn/trylock.1
replace some utils.Semaphore with (*sync.Mutex).TryLock
This commit is contained in:
commit
57a213123f
@ -47,10 +47,10 @@ type Channel struct {
|
|||||||
userLimit int
|
userLimit int
|
||||||
accountToUMode map[string]modes.Mode
|
accountToUMode map[string]modes.Mode
|
||||||
history history.Buffer
|
history history.Buffer
|
||||||
stateMutex sync.RWMutex // tier 1
|
stateMutex sync.RWMutex // tier 1
|
||||||
writerSemaphore utils.Semaphore // tier 1.5
|
writebackLock sync.Mutex // tier 1.5
|
||||||
joinPartMutex sync.Mutex // tier 3
|
joinPartMutex sync.Mutex // tier 3
|
||||||
ensureLoaded utils.Once // manages loading stored registration info from the database
|
ensureLoaded utils.Once // manages loading stored registration info from the database
|
||||||
dirtyBits uint
|
dirtyBits uint
|
||||||
settings ChannelSettings
|
settings ChannelSettings
|
||||||
}
|
}
|
||||||
@ -61,12 +61,11 @@ func NewChannel(s *Server, name, casefoldedName string, registered bool) *Channe
|
|||||||
config := s.Config()
|
config := s.Config()
|
||||||
|
|
||||||
channel := &Channel{
|
channel := &Channel{
|
||||||
createdTime: time.Now().UTC(), // may be overwritten by applyRegInfo
|
createdTime: time.Now().UTC(), // may be overwritten by applyRegInfo
|
||||||
members: make(MemberSet),
|
members: make(MemberSet),
|
||||||
name: name,
|
name: name,
|
||||||
nameCasefolded: casefoldedName,
|
nameCasefolded: casefoldedName,
|
||||||
server: s,
|
server: s,
|
||||||
writerSemaphore: utils.NewSemaphore(1),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
channel.initializeLists()
|
channel.initializeLists()
|
||||||
@ -209,11 +208,11 @@ func (channel *Channel) MarkDirty(dirtyBits uint) {
|
|||||||
// ChannelManager's lock (that way, no one can join and make the channel dirty again
|
// ChannelManager's lock (that way, no one can join and make the channel dirty again
|
||||||
// between this method exiting and the actual deletion).
|
// between this method exiting and the actual deletion).
|
||||||
func (channel *Channel) IsClean() bool {
|
func (channel *Channel) IsClean() bool {
|
||||||
if !channel.writerSemaphore.TryAcquire() {
|
if !channel.writebackLock.TryLock() {
|
||||||
// a database write (which may fail) is in progress, the channel cannot be cleaned up
|
// a database write (which may fail) is in progress, the channel cannot be cleaned up
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
defer channel.writerSemaphore.Release()
|
defer channel.writebackLock.Unlock()
|
||||||
|
|
||||||
channel.stateMutex.RLock()
|
channel.stateMutex.RLock()
|
||||||
defer channel.stateMutex.RUnlock()
|
defer channel.stateMutex.RUnlock()
|
||||||
@ -225,7 +224,7 @@ func (channel *Channel) IsClean() bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (channel *Channel) wakeWriter() {
|
func (channel *Channel) wakeWriter() {
|
||||||
if channel.writerSemaphore.TryAcquire() {
|
if channel.writebackLock.TryLock() {
|
||||||
go channel.writeLoop()
|
go channel.writeLoop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -235,7 +234,7 @@ func (channel *Channel) writeLoop() {
|
|||||||
for {
|
for {
|
||||||
// TODO(#357) check the error value of this and implement timed backoff
|
// TODO(#357) check the error value of this and implement timed backoff
|
||||||
channel.performWrite(0)
|
channel.performWrite(0)
|
||||||
channel.writerSemaphore.Release()
|
channel.writebackLock.Unlock()
|
||||||
|
|
||||||
channel.stateMutex.RLock()
|
channel.stateMutex.RLock()
|
||||||
isDirty := channel.dirtyBits != 0
|
isDirty := channel.dirtyBits != 0
|
||||||
@ -249,7 +248,7 @@ func (channel *Channel) writeLoop() {
|
|||||||
return // nothing to do
|
return // nothing to do
|
||||||
} // else: isDirty, so we need to write again
|
} // else: isDirty, so we need to write again
|
||||||
|
|
||||||
if !channel.writerSemaphore.TryAcquire() {
|
if !channel.writebackLock.TryLock() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -272,8 +271,8 @@ func (channel *Channel) Store(dirtyBits uint) (err error) {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
channel.writerSemaphore.Acquire()
|
channel.writebackLock.Lock()
|
||||||
defer channel.writerSemaphore.Release()
|
defer channel.writebackLock.Unlock()
|
||||||
return channel.performWrite(dirtyBits)
|
return channel.performWrite(dirtyBits)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -113,7 +113,7 @@ type Client struct {
|
|||||||
vhost string
|
vhost string
|
||||||
history history.Buffer
|
history history.Buffer
|
||||||
dirtyBits uint
|
dirtyBits uint
|
||||||
writerSemaphore utils.Semaphore // tier 1.5
|
writebackLock sync.Mutex // tier 1.5
|
||||||
}
|
}
|
||||||
|
|
||||||
type saslStatus struct {
|
type saslStatus struct {
|
||||||
@ -335,16 +335,15 @@ func (server *Server) RunClient(conn IRCConn) {
|
|||||||
Duration: config.Accounts.LoginThrottling.Duration,
|
Duration: config.Accounts.LoginThrottling.Duration,
|
||||||
Limit: config.Accounts.LoginThrottling.MaxAttempts,
|
Limit: config.Accounts.LoginThrottling.MaxAttempts,
|
||||||
},
|
},
|
||||||
server: server,
|
server: server,
|
||||||
accountName: "*",
|
accountName: "*",
|
||||||
nick: "*", // * is used until actual nick is given
|
nick: "*", // * is used until actual nick is given
|
||||||
nickCasefolded: "*",
|
nickCasefolded: "*",
|
||||||
nickMaskString: "*", // * is used until actual nick is given
|
nickMaskString: "*", // * is used until actual nick is given
|
||||||
realIP: realIP,
|
realIP: realIP,
|
||||||
proxiedIP: proxiedIP,
|
proxiedIP: proxiedIP,
|
||||||
requireSASL: requireSASL,
|
requireSASL: requireSASL,
|
||||||
nextSessionID: 1,
|
nextSessionID: 1,
|
||||||
writerSemaphore: utils.NewSemaphore(1),
|
|
||||||
}
|
}
|
||||||
if requireSASL {
|
if requireSASL {
|
||||||
client.requireSASLMessage = banMsg
|
client.requireSASLMessage = banMsg
|
||||||
@ -424,8 +423,6 @@ func (server *Server) AddAlwaysOnClient(account ClientAccount, channelToStatus m
|
|||||||
realname: realname,
|
realname: realname,
|
||||||
|
|
||||||
nextSessionID: 1,
|
nextSessionID: 1,
|
||||||
|
|
||||||
writerSemaphore: utils.NewSemaphore(1),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if client.checkAlwaysOnExpirationNoMutex(config, true) {
|
if client.checkAlwaysOnExpirationNoMutex(config, true) {
|
||||||
@ -1772,7 +1769,7 @@ func (client *Client) markDirty(dirtyBits uint) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (client *Client) wakeWriter() {
|
func (client *Client) wakeWriter() {
|
||||||
if client.writerSemaphore.TryAcquire() {
|
if client.writebackLock.TryLock() {
|
||||||
go client.writeLoop()
|
go client.writeLoop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1780,13 +1777,13 @@ func (client *Client) wakeWriter() {
|
|||||||
func (client *Client) writeLoop() {
|
func (client *Client) writeLoop() {
|
||||||
for {
|
for {
|
||||||
client.performWrite(0)
|
client.performWrite(0)
|
||||||
client.writerSemaphore.Release()
|
client.writebackLock.Unlock()
|
||||||
|
|
||||||
client.stateMutex.RLock()
|
client.stateMutex.RLock()
|
||||||
isDirty := client.dirtyBits != 0
|
isDirty := client.dirtyBits != 0
|
||||||
client.stateMutex.RUnlock()
|
client.stateMutex.RUnlock()
|
||||||
|
|
||||||
if !isDirty || !client.writerSemaphore.TryAcquire() {
|
if !isDirty || !client.writebackLock.TryLock() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1844,8 +1841,8 @@ func (client *Client) Store(dirtyBits uint) (err error) {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
client.writerSemaphore.Acquire()
|
client.writebackLock.Lock()
|
||||||
defer client.writerSemaphore.Release()
|
defer client.writebackLock.Unlock()
|
||||||
client.performWrite(dirtyBits)
|
client.performWrite(dirtyBits)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -8,8 +8,6 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/ergochat/ergo/irc/utils"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -27,7 +25,7 @@ type Socket struct {
|
|||||||
maxSendQBytes int
|
maxSendQBytes int
|
||||||
|
|
||||||
// this is a trylock enforcing that only one goroutine can write to `conn` at a time
|
// this is a trylock enforcing that only one goroutine can write to `conn` at a time
|
||||||
writerSemaphore utils.Semaphore
|
writeLock sync.Mutex
|
||||||
|
|
||||||
buffers [][]byte
|
buffers [][]byte
|
||||||
totalLength int
|
totalLength int
|
||||||
@ -40,9 +38,8 @@ type Socket struct {
|
|||||||
// NewSocket returns a new Socket.
|
// NewSocket returns a new Socket.
|
||||||
func NewSocket(conn IRCConn, maxSendQBytes int) *Socket {
|
func NewSocket(conn IRCConn, maxSendQBytes int) *Socket {
|
||||||
result := Socket{
|
result := Socket{
|
||||||
conn: conn,
|
conn: conn,
|
||||||
maxSendQBytes: maxSendQBytes,
|
maxSendQBytes: maxSendQBytes,
|
||||||
writerSemaphore: utils.NewSemaphore(1),
|
|
||||||
}
|
}
|
||||||
return &result
|
return &result
|
||||||
}
|
}
|
||||||
@ -128,8 +125,8 @@ func (socket *Socket) BlockingWrite(data []byte) (err error) {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
// blocking acquire of the trylock
|
// blocking acquire of the trylock
|
||||||
socket.writerSemaphore.Acquire()
|
socket.writeLock.Lock()
|
||||||
defer socket.writerSemaphore.Release()
|
defer socket.writeLock.Unlock()
|
||||||
|
|
||||||
// first, flush any buffered data, to preserve the ordering guarantees
|
// first, flush any buffered data, to preserve the ordering guarantees
|
||||||
closed := socket.performWrite()
|
closed := socket.performWrite()
|
||||||
@ -146,7 +143,7 @@ func (socket *Socket) BlockingWrite(data []byte) (err error) {
|
|||||||
|
|
||||||
// wakeWriter starts the goroutine that actually performs the write, without blocking
|
// wakeWriter starts the goroutine that actually performs the write, without blocking
|
||||||
func (socket *Socket) wakeWriter() {
|
func (socket *Socket) wakeWriter() {
|
||||||
if socket.writerSemaphore.TryAcquire() {
|
if socket.writeLock.TryLock() {
|
||||||
// acquired the trylock; send() will release it
|
// acquired the trylock; send() will release it
|
||||||
go socket.send()
|
go socket.send()
|
||||||
}
|
}
|
||||||
@ -182,12 +179,12 @@ func (socket *Socket) send() {
|
|||||||
socket.performWrite()
|
socket.performWrite()
|
||||||
// surrender the trylock, avoiding a race where a write comes in after we've
|
// surrender the trylock, avoiding a race where a write comes in after we've
|
||||||
// checked readyToWrite() and it returned false, but while we still hold the trylock:
|
// checked readyToWrite() and it returned false, but while we still hold the trylock:
|
||||||
socket.writerSemaphore.Release()
|
socket.writeLock.Unlock()
|
||||||
// check if more data came in while we held the trylock:
|
// check if more data came in while we held the trylock:
|
||||||
if !socket.readyToWrite() {
|
if !socket.readyToWrite() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if !socket.writerSemaphore.TryAcquire() {
|
if !socket.writeLock.TryLock() {
|
||||||
// failed to acquire; exit and wait for the holder to observe readyToWrite()
|
// failed to acquire; exit and wait for the holder to observe readyToWrite()
|
||||||
// after releasing it
|
// after releasing it
|
||||||
return
|
return
|
||||||
|
Loading…
Reference in New Issue
Block a user