Clean-up XMPP handling code (#831)

This commit is contained in:
Duco van Amstel 2019-05-30 11:31:54 +01:00 committed by Wim
parent 3418e8c9af
commit 3724cc3a15

View File

@ -14,50 +14,29 @@ import (
) )
type Bxmpp struct { type Bxmpp struct {
xc *xmpp.Client
xmppMap map[string]string
*bridge.Config *bridge.Config
startTime time.Time startTime time.Time
xc *xmpp.Client
xmppMap map[string]string
} }
func New(cfg *bridge.Config) bridge.Bridger { func New(cfg *bridge.Config) bridge.Bridger {
b := &Bxmpp{Config: cfg} return &Bxmpp{
b.xmppMap = make(map[string]string) Config: cfg,
return b xmppMap: make(map[string]string),
}
} }
func (b *Bxmpp) Connect() error { func (b *Bxmpp) Connect() error {
var err error
b.Log.Infof("Connecting %s", b.GetString("Server")) b.Log.Infof("Connecting %s", b.GetString("Server"))
b.xc, err = b.createXMPP() if err := b.createXMPP(); err != nil {
if err != nil {
b.Log.Debugf("%#v", err) b.Log.Debugf("%#v", err)
return err return err
} }
b.Log.Info("Connection succeeded") b.Log.Info("Connection succeeded")
go func() { go b.manageConnection()
initial := true
bf := &backoff.Backoff{
Min: time.Second,
Max: 5 * time.Minute,
Jitter: true,
}
for {
if initial {
b.handleXMPP()
initial = false
}
d := bf.Duration()
b.Log.Infof("Disconnected. Reconnecting in %s", d)
time.Sleep(d)
b.xc, err = b.createXMPP()
if err == nil {
b.Remote <- config.Message{Username: "system", Text: "rejoin", Channel: "", Account: b.Account, Event: config.EventRejoinChannels}
b.handleXMPP()
bf.Reset()
}
}
}()
return nil return nil
} }
@ -82,34 +61,48 @@ func (b *Bxmpp) Send(msg config.Message) (string, error) {
} }
b.Log.Debugf("=> Receiving %#v", msg) b.Log.Debugf("=> Receiving %#v", msg)
// Upload a file (in xmpp case send the upload URL because xmpp has no native upload support) // Upload a file (in XMPP case send the upload URL because XMPP has no native upload support).
if msg.Extra != nil { if msg.Extra != nil {
for _, rmsg := range helper.HandleExtra(&msg, b.General) { for _, rmsg := range helper.HandleExtra(&msg, b.General) {
b.xc.Send(xmpp.Chat{Type: "groupchat", Remote: rmsg.Channel + "@" + b.GetString("Muc"), Text: rmsg.Username + rmsg.Text}) b.Log.Debugf("=> Sending attachement message %#v", rmsg)
if _, err := b.xc.Send(xmpp.Chat{
Type: "groupchat",
Remote: rmsg.Channel + "@" + b.GetString("Muc"),
Text: rmsg.Username + rmsg.Text,
}); err != nil {
b.Log.WithError(err).Error("Unable to send message with share URL.")
}
} }
if len(msg.Extra["file"]) > 0 { if len(msg.Extra["file"]) > 0 {
return b.handleUploadFile(&msg) return "", b.handleUploadFile(&msg)
} }
} }
var msgreplaceid string var msgReplaceID string
msgid := xid.New().String() msgID := xid.New().String()
if msg.ID != "" { if msg.ID != "" {
msgid = msg.ID msgID = msg.ID
msgreplaceid = msg.ID msgReplaceID = msg.ID
} }
// Post normal message // Post normal message.
_, err := b.xc.Send(xmpp.Chat{Type: "groupchat", Remote: msg.Channel + "@" + b.GetString("Muc"), Text: msg.Username + msg.Text, ID: msgid, ReplaceID: msgreplaceid}) b.Log.Debugf("=> Sending message %#v", msg)
if err != nil { if _, err := b.xc.Send(xmpp.Chat{
Type: "groupchat",
Remote: msg.Channel + "@" + b.GetString("Muc"),
Text: msg.Username + msg.Text,
ID: msgID,
ReplaceID: msgReplaceID,
}); err != nil {
return "", err return "", err
} }
return msgid, nil return msgID, nil
} }
func (b *Bxmpp) createXMPP() (*xmpp.Client, error) { func (b *Bxmpp) createXMPP() error {
tc := new(tls.Config) tc := &tls.Config{
tc.InsecureSkipVerify = b.GetBool("SkipTLSVerify") ServerName: strings.Split(b.GetString("Server"), ":")[0],
tc.ServerName = strings.Split(b.GetString("Server"), ":")[0] InsecureSkipVerify: b.GetBool("SkipTLSVerify"), // nolint: gosec
}
options := xmpp.Options{ options := xmpp.Options{
Host: b.GetString("Server"), Host: b.GetString("Server"),
User: b.GetString("Jid"), User: b.GetString("Jid"),
@ -127,7 +120,51 @@ func (b *Bxmpp) createXMPP() (*xmpp.Client, error) {
} }
var err error var err error
b.xc, err = options.NewClient() b.xc, err = options.NewClient()
return b.xc, err return err
}
func (b *Bxmpp) manageConnection() {
initial := true
bf := &backoff.Backoff{
Min: time.Second,
Max: 5 * time.Minute,
Jitter: true,
}
// Main connection loop. Each iteration corresponds to a successful
// connection attempt and the subsequent handling of the connection.
for {
if initial {
initial = false
} else {
b.Remote <- config.Message{
Username: "system",
Text: "rejoin",
Channel: "",
Account: b.Account,
Event: config.EventRejoinChannels,
}
}
if err := b.handleXMPP(); err != nil {
b.Log.WithError(err).Error("Disconnected.")
}
// Reconnection loop using an exponential back-off strategy. We
// only break out of the loop if we have successfully reconnected.
for {
d := bf.Duration()
b.Log.Infof("Reconnecting in %s.", d)
time.Sleep(d)
b.Log.Infof("Reconnecting now.")
if err := b.createXMPP(); err == nil {
bf.Reset()
break
}
b.Log.Warn("Failed to reconnect.")
}
}
} }
func (b *Bxmpp) xmppKeepAlive() chan bool { func (b *Bxmpp) xmppKeepAlive() chan bool {
@ -139,8 +176,7 @@ func (b *Bxmpp) xmppKeepAlive() chan bool {
select { select {
case <-ticker.C: case <-ticker.C:
b.Log.Debugf("PING") b.Log.Debugf("PING")
err := b.xc.PingC2S("", "") if err := b.xc.PingC2S("", ""); err != nil {
if err != nil {
b.Log.Debugf("PING failed %#v", err) b.Log.Debugf("PING failed %#v", err)
} }
case <-done: case <-done:
@ -152,31 +188,35 @@ func (b *Bxmpp) xmppKeepAlive() chan bool {
} }
func (b *Bxmpp) handleXMPP() error { func (b *Bxmpp) handleXMPP() error {
var ok bool
var msgid string
b.startTime = time.Now() b.startTime = time.Now()
done := b.xmppKeepAlive() done := b.xmppKeepAlive()
defer close(done) defer close(done)
for { for {
m, err := b.xc.Recv() m, err := b.xc.Recv()
if err != nil { if err != nil {
return err return err
} }
switch v := m.(type) { switch v := m.(type) {
case xmpp.Chat: case xmpp.Chat:
if v.Type == "groupchat" { if v.Type == "groupchat" {
b.Log.Debugf("== Receiving %#v", v) b.Log.Debugf("== Receiving %#v", v)
event := ""
// skip invalid messages // Skip invalid messages.
if b.skipMessage(v) { if b.skipMessage(v) {
continue continue
} }
var event string
if strings.Contains(v.Text, "has set the subject to:") { if strings.Contains(v.Text, "has set the subject to:") {
event = config.EventTopicChange event = config.EventTopicChange
} }
msgid = v.ID
msgID := v.ID
if v.ReplaceID != "" { if v.ReplaceID != "" {
msgid = v.ReplaceID msgID = v.ReplaceID
} }
rmsg := config.Message{ rmsg := config.Message{
Username: b.parseNick(v.Remote), Username: b.parseNick(v.Remote),
@ -184,21 +224,23 @@ func (b *Bxmpp) handleXMPP() error {
Channel: b.parseChannel(v.Remote), Channel: b.parseChannel(v.Remote),
Account: b.Account, Account: b.Account,
UserID: v.Remote, UserID: v.Remote,
ID: msgid, ID: msgID,
Event: event, Event: event,
} }
// check if we have an action event // Check if we have an action event.
var ok bool
rmsg.Text, ok = b.replaceAction(rmsg.Text) rmsg.Text, ok = b.replaceAction(rmsg.Text)
if ok { if ok {
rmsg.Event = config.EventUserAction rmsg.Event = config.EventUserAction
} }
b.Log.Debugf("<= Sending message from %s on %s to gateway", rmsg.Username, b.Account) b.Log.Debugf("<= Sending message from %s on %s to gateway", rmsg.Username, b.Account)
b.Log.Debugf("<= Message is %#v", rmsg) b.Log.Debugf("<= Message is %#v", rmsg)
b.Remote <- rmsg b.Remote <- rmsg
} }
case xmpp.Presence: case xmpp.Presence:
// do nothing // Do nothing.
} }
} }
} }
@ -211,30 +253,41 @@ func (b *Bxmpp) replaceAction(text string) (string, bool) {
} }
// handleUploadFile handles native upload of files // handleUploadFile handles native upload of files
func (b *Bxmpp) handleUploadFile(msg *config.Message) (string, error) { func (b *Bxmpp) handleUploadFile(msg *config.Message) error {
var urldesc = "" var urlDesc string
for _, f := range msg.Extra["file"] { for _, file := range msg.Extra["file"] {
fi := f.(config.FileInfo) fileInfo := file.(config.FileInfo)
if fi.Comment != "" { if fileInfo.Comment != "" {
msg.Text += fi.Comment + ": " msg.Text += fileInfo.Comment + ": "
} }
if fi.URL != "" { if fileInfo.URL != "" {
msg.Text = fi.URL msg.Text = fileInfo.URL
if fi.Comment != "" { if fileInfo.Comment != "" {
msg.Text = fi.Comment + ": " + fi.URL msg.Text = fileInfo.Comment + ": " + fileInfo.URL
urldesc = fi.Comment urlDesc = fileInfo.Comment
} }
} }
_, err := b.xc.Send(xmpp.Chat{Type: "groupchat", Remote: msg.Channel + "@" + b.GetString("Muc"), Text: msg.Username + msg.Text}) if _, err := b.xc.Send(xmpp.Chat{
if err != nil { Type: "groupchat",
return "", err Remote: msg.Channel + "@" + b.GetString("Muc"),
Text: msg.Username + msg.Text,
}); err != nil {
return err
} }
if fi.URL != "" {
b.xc.SendOOB(xmpp.Chat{Type: "groupchat", Remote: msg.Channel + "@" + b.GetString("Muc"), Ooburl: fi.URL, Oobdesc: urldesc}) if fileInfo.URL != "" {
if _, err := b.xc.SendOOB(xmpp.Chat{
Type: "groupchat",
Remote: msg.Channel + "@" + b.GetString("Muc"),
Ooburl: fileInfo.URL,
Oobdesc: urlDesc,
}); err != nil {
b.Log.WithError(err).Warn("Failed to send share URL.")
}
} }
} }
return "", nil return nil
} }
func (b *Bxmpp) parseNick(remote string) string { func (b *Bxmpp) parseNick(remote string) string {
@ -279,6 +332,5 @@ func (b *Bxmpp) skipMessage(message xmpp.Chat) bool {
} }
// skip delayed messages // skip delayed messages
t := time.Time{} return message.Stamp.IsZero()
return message.Stamp != t
} }