Fix race-condition in populateUser() (#767)

Fix the root-cause of #759 by introducing synchronisation points for
individual user fetches.
This commit is contained in:
Duco van Amstel 2019-03-20 21:54:31 +00:00 committed by Wim
parent a27600046e
commit 8b754017ca

View File

@ -18,8 +18,9 @@ type users struct {
log *logrus.Entry log *logrus.Entry
sc *slack.Client sc *slack.Client
users map[string]*slack.User users map[string]*slack.User
usersMutex sync.RWMutex usersMutex sync.RWMutex
usersSyncPoints map[string]chan struct{}
refreshInProgress bool refreshInProgress bool
earliestRefresh time.Time earliestRefresh time.Time
@ -31,6 +32,7 @@ func newUserManager(log *logrus.Entry, sc *slack.Client) *users {
log: log, log: log,
sc: sc, sc: sc,
users: make(map[string]*slack.User), users: make(map[string]*slack.User),
usersSyncPoints: make(map[string]chan struct{}),
earliestRefresh: time.Now(), earliestRefresh: time.Now(),
} }
} }
@ -68,14 +70,32 @@ func (b *users) getAvatar(id string) string {
} }
func (b *users) populateUser(userID string) { func (b *users) populateUser(userID string) {
b.usersMutex.RLock() for {
_, exists := b.users[userID] b.usersMutex.Lock()
b.usersMutex.RUnlock() _, exists := b.users[userID]
if exists { if exists {
// already in cache // already in cache
return b.usersMutex.Unlock()
return
}
if syncPoint, ok := b.usersSyncPoints[userID]; ok {
// Another goroutine is already populating this user for us so wait on it to finish.
b.usersMutex.Unlock()
<-syncPoint
// We do not return and iterate again to check that the entry does indeed exist
// in case the previous query failed for some reason.
} else {
b.usersSyncPoints[userID] = make(chan struct{})
b.usersMutex.Unlock()
break
}
} }
// Do not hold the lock while fetching information from Slack
// as this might take an unbounded amount of time.
b.usersMutex.Unlock()
user, err := b.sc.GetUserInfo(userID) user, err := b.sc.GetUserInfo(userID)
if err != nil { if err != nil {
b.log.Debugf("GetUserInfo failed for %v: %v", userID, err) b.log.Debugf("GetUserInfo failed for %v: %v", userID, err)
@ -83,15 +103,20 @@ func (b *users) populateUser(userID string) {
} }
b.usersMutex.Lock() b.usersMutex.Lock()
defer b.usersMutex.Unlock()
// Register user information.
b.users[userID] = user b.users[userID] = user
b.usersMutex.Unlock()
// Wake up any waiting goroutines and remove the synchronization point.
close(b.usersSyncPoints[userID])
delete(b.usersSyncPoints, userID)
} }
func (b *users) populateUsers(wait bool) { func (b *users) populateUsers(wait bool) {
b.refreshMutex.Lock() b.refreshMutex.Lock()
if !wait && (time.Now().Before(b.earliestRefresh) || b.refreshInProgress) { if !wait && (time.Now().Before(b.earliestRefresh) || b.refreshInProgress) {
b.log.Debugf("Not refreshing user list as it was done less than %v ago.", b.log.Debugf("Not refreshing user list as it was done less than %v ago.", minimumRefreshInterval)
minimumRefreshInterval)
b.refreshMutex.Unlock() b.refreshMutex.Unlock()
return return
@ -230,9 +255,10 @@ func (b *channels) getChannelMembers(users *users) config.ChannelMembers {
func (b *channels) registerChannel(channel slack.Channel) { func (b *channels) registerChannel(channel slack.Channel) {
b.channelsMutex.Lock() b.channelsMutex.Lock()
defer b.channelsMutex.Unlock()
b.channelsByID[channel.ID] = &channel b.channelsByID[channel.ID] = &channel
b.channelsByName[channel.Name] = &channel b.channelsByName[channel.Name] = &channel
b.channelsMutex.Unlock()
} }
func (b *channels) populateChannels(wait bool) { func (b *channels) populateChannels(wait bool) {