Keep connection state. Fixes #856

Actually check if we're connected when trying to Send() a message.
Messages now will get dropped when not connected.

TODO: Ideally this should be in a ring buffer to retransmit when the
connection comes back up.
This commit is contained in:
Wim 2019-06-30 18:34:38 +02:00
parent c52664f22e
commit cf3cddafab

View File

@ -2,7 +2,9 @@ package bxmpp
import ( import (
"crypto/tls" "crypto/tls"
"fmt"
"strings" "strings"
"sync"
"time" "time"
"github.com/42wim/matterbridge/bridge" "github.com/42wim/matterbridge/bridge"
@ -19,6 +21,8 @@ type Bxmpp struct {
startTime time.Time startTime time.Time
xc *xmpp.Client xc *xmpp.Client
xmppMap map[string]string xmppMap map[string]string
connected bool
sync.RWMutex
} }
func New(cfg *bridge.Config) bridge.Bridger { func New(cfg *bridge.Config) bridge.Bridger {
@ -55,6 +59,10 @@ func (b *Bxmpp) JoinChannel(channel config.ChannelInfo) error {
} }
func (b *Bxmpp) Send(msg config.Message) (string, error) { func (b *Bxmpp) Send(msg config.Message) (string, error) {
// should be fixed by using a cache instead of dropping
if !b.Connected() {
return "", fmt.Errorf("bridge %s not connected, dropping message %#v to bridge", b.Account, msg)
}
// ignore delete messages // ignore delete messages
if msg.Event == config.EventMsgDelete { if msg.Event == config.EventMsgDelete {
return "", nil return "", nil
@ -124,6 +132,7 @@ func (b *Bxmpp) createXMPP() error {
} }
func (b *Bxmpp) manageConnection() { func (b *Bxmpp) manageConnection() {
b.setConnected(true)
initial := true initial := true
bf := &backoff.Backoff{ bf := &backoff.Backoff{
Min: time.Second, Min: time.Second,
@ -148,6 +157,7 @@ func (b *Bxmpp) manageConnection() {
if err := b.handleXMPP(); err != nil { if err := b.handleXMPP(); err != nil {
b.Log.WithError(err).Error("Disconnected.") b.Log.WithError(err).Error("Disconnected.")
b.setConnected(false)
} }
// Reconnection loop using an exponential back-off strategy. We // Reconnection loop using an exponential back-off strategy. We
@ -159,6 +169,7 @@ func (b *Bxmpp) manageConnection() {
b.Log.Infof("Reconnecting now.") b.Log.Infof("Reconnecting now.")
if err := b.createXMPP(); err == nil { if err := b.createXMPP(); err == nil {
b.setConnected(true)
bf.Reset() bf.Reset()
break break
} }
@ -334,3 +345,15 @@ func (b *Bxmpp) skipMessage(message xmpp.Chat) bool {
// skip delayed messages // skip delayed messages
return !message.Stamp.IsZero() && time.Since(message.Stamp).Minutes() > 5 return !message.Stamp.IsZero() && time.Since(message.Stamp).Minutes() > 5
} }
func (b *Bxmpp) setConnected(state bool) {
b.Lock()
b.connected = state
defer b.Unlock()
}
func (b *Bxmpp) Connected() bool {
b.RLock()
defer b.RUnlock()
return b.connected
}