From 163f55f9c27e9e8a75774424d22598799e8306c6 Mon Sep 17 00:00:00 2001 From: Wim Date: Tue, 14 Feb 2017 21:12:02 +0100 Subject: [PATCH 1/3] Refactor to handle disconnects/reconnects better. Now try to reconnect every 60 seconds until forever. --- bridge/bridge.go | 24 +++++++++++++++++--- bridge/config/config.go | 1 + bridge/discord/discord.go | 4 ++++ bridge/gitter/gitter.go | 5 +++++ bridge/irc/irc.go | 14 ++++++++++-- bridge/mattermost/mattermost.go | 4 ++++ bridge/rocketchat/rocketchat.go | 5 +++++ bridge/slack/slack.go | 5 +++++ bridge/telegram/telegram.go | 5 +++++ bridge/xmpp/xmpp.go | 16 ++++++++----- gateway/gateway.go | 40 ++++++++++++++++++++++----------- 11 files changed, 99 insertions(+), 24 deletions(-) diff --git a/bridge/bridge.go b/bridge/bridge.go index b387812a..c8c6ac4e 100644 --- a/bridge/bridge.go +++ b/bridge/bridge.go @@ -10,6 +10,8 @@ import ( "github.com/42wim/matterbridge/bridge/slack" "github.com/42wim/matterbridge/bridge/telegram" "github.com/42wim/matterbridge/bridge/xmpp" + log "github.com/Sirupsen/logrus" + "strings" ) @@ -17,14 +19,18 @@ type Bridger interface { Send(msg config.Message) error Connect() error JoinChannel(channel string) error + Disconnect() error } type Bridge struct { Config config.Protocol Bridger - Name string - Account string - Protocol string + Name string + Account string + Protocol string + ChannelsOut []string + ChannelsIn []string + ChannelOptions config.ChannelOptions } func New(cfg *config.Config, bridge *config.Bridge, c chan config.Message) *Bridge { @@ -66,3 +72,15 @@ func New(cfg *config.Config, bridge *config.Bridge, c chan config.Message) *Brid } return b } + +func (b *Bridge) JoinChannels() error { + exists := make(map[string]bool) + for _, channel := range append(b.ChannelsIn, b.ChannelsOut...) { + if !exists[channel] { + log.Infof("%s: joining %s", b.Account, channel) + b.JoinChannel(channel) + exists[channel] = true + } + } + return nil +} diff --git a/bridge/config/config.go b/bridge/config/config.go index ac3e939b..811c97ae 100644 --- a/bridge/config/config.go +++ b/bridge/config/config.go @@ -10,6 +10,7 @@ import ( const ( EVENT_JOIN_LEAVE = "join_leave" + EVENT_FAILURE = "failure" ) type Message struct { diff --git a/bridge/discord/discord.go b/bridge/discord/discord.go index 620493b2..06508b84 100644 --- a/bridge/discord/discord.go +++ b/bridge/discord/discord.go @@ -80,6 +80,10 @@ func (b *bdiscord) Connect() error { return nil } +func (b *bdiscord) Disconnect() error { + return nil +} + func (b *bdiscord) JoinChannel(channel string) error { idcheck := strings.Split(channel, "ID:") if len(idcheck) > 1 { diff --git a/bridge/gitter/gitter.go b/bridge/gitter/gitter.go index 0400b4ba..d1f4b401 100644 --- a/bridge/gitter/gitter.go +++ b/bridge/gitter/gitter.go @@ -45,6 +45,11 @@ func (b *Bgitter) Connect() error { return nil } +func (b *Bgitter) Disconnect() error { + return nil + +} + func (b *Bgitter) JoinChannel(channel string) error { room := channel roomID := b.getRoomID(room) diff --git a/bridge/irc/irc.go b/bridge/irc/irc.go index fe8dc741..db430800 100644 --- a/bridge/irc/irc.go +++ b/bridge/irc/irc.go @@ -46,7 +46,6 @@ func New(cfg config.Protocol, account string, c chan config.Message) *Birc { if b.Config.MessageQueue == 0 { b.Config.MessageQueue = 30 } - b.Local = make(chan config.Message, b.Config.MessageQueue+10) return b } @@ -61,6 +60,7 @@ func (b *Birc) Command(msg *config.Message) string { } func (b *Birc) Connect() error { + b.Local = make(chan config.Message, b.Config.MessageQueue+10) flog.Infof("Connecting %s", b.Config.Server) i := irc.IRC(b.Config.Nick, b.Config.Nick) if log.GetLevel() == log.DebugLevel { @@ -91,6 +91,12 @@ func (b *Birc) Connect() error { return nil } +func (b *Birc) Disconnect() error { + b.i.Disconnect() + close(b.Local) + return nil +} + func (b *Birc) JoinChannel(channel string) error { b.i.Join(channel) return nil @@ -170,7 +176,11 @@ func (b *Birc) handleJoinPart(event *irc.Event) { flog.Debugf("Sending JOIN_LEAVE event from %s to gateway", b.Account) channel := event.Arguments[0] if event.Code == "QUIT" { - channel = "" + if event.Nick == b.Nick && strings.Contains(event.Raw, "Ping timeout") { + flog.Infof("%s reconnecting ..", b.Account) + b.Remote <- config.Message{Username: "system", Text: "reconnect", Channel: channel, Account: b.Account, Event: config.EVENT_FAILURE} + return + } } b.Remote <- config.Message{Username: "system", Text: event.Nick + " " + strings.ToLower(event.Code) + "s", Channel: channel, Account: b.Account, Event: config.EVENT_JOIN_LEAVE} flog.Debugf("handle %#v", event) diff --git a/bridge/mattermost/mattermost.go b/bridge/mattermost/mattermost.go index e2bf228d..126bab43 100644 --- a/bridge/mattermost/mattermost.go +++ b/bridge/mattermost/mattermost.go @@ -77,6 +77,10 @@ func (b *Bmattermost) Connect() error { return nil } +func (b *Bmattermost) Disconnect() error { + return nil +} + func (b *Bmattermost) JoinChannel(channel string) error { // we can only join channels using the API if b.Config.UseAPI { diff --git a/bridge/rocketchat/rocketchat.go b/bridge/rocketchat/rocketchat.go index d87450ec..4590a895 100644 --- a/bridge/rocketchat/rocketchat.go +++ b/bridge/rocketchat/rocketchat.go @@ -49,6 +49,11 @@ func (b *Brocketchat) Connect() error { return nil } +func (b *Brocketchat) Disconnect() error { + return nil + +} + func (b *Brocketchat) JoinChannel(channel string) error { return nil } diff --git a/bridge/slack/slack.go b/bridge/slack/slack.go index 763231d8..0f8806a2 100644 --- a/bridge/slack/slack.go +++ b/bridge/slack/slack.go @@ -65,6 +65,11 @@ func (b *Bslack) Connect() error { return nil } +func (b *Bslack) Disconnect() error { + return nil + +} + func (b *Bslack) JoinChannel(channel string) error { // we can only join channels using the API if b.Config.UseAPI { diff --git a/bridge/telegram/telegram.go b/bridge/telegram/telegram.go index 38d7fd12..aa637457 100644 --- a/bridge/telegram/telegram.go +++ b/bridge/telegram/telegram.go @@ -51,6 +51,11 @@ func (b *Btelegram) Connect() error { return nil } +func (b *Btelegram) Disconnect() error { + return nil + +} + func (b *Btelegram) JoinChannel(channel string) error { return nil } diff --git a/bridge/xmpp/xmpp.go b/bridge/xmpp/xmpp.go index 8899e718..4dcb8ef7 100644 --- a/bridge/xmpp/xmpp.go +++ b/bridge/xmpp/xmpp.go @@ -1,10 +1,10 @@ package bxmpp import ( + "crypto/tls" "github.com/42wim/matterbridge/bridge/config" log "github.com/Sirupsen/logrus" "github.com/mattn/go-xmpp" - "crypto/tls" "strings" "time" @@ -47,6 +47,10 @@ func (b *Bxmpp) Connect() error { return nil } +func (b *Bxmpp) Disconnect() error { + return nil +} + func (b *Bxmpp) JoinChannel(channel string) error { b.xc.JoinMUCNoHistory(channel+"@"+b.Config.Muc, b.Config.Nick) return nil @@ -63,11 +67,11 @@ func (b *Bxmpp) createXMPP() (*xmpp.Client, error) { tc.InsecureSkipVerify = b.Config.SkipTLSVerify tc.ServerName = strings.Split(b.Config.Server, ":")[0] options := xmpp.Options{ - Host: b.Config.Server, - User: b.Config.Jid, - Password: b.Config.Password, - NoTLS: true, - StartTLS: true, + Host: b.Config.Server, + User: b.Config.Jid, + Password: b.Config.Password, + NoTLS: true, + StartTLS: true, TLSConfig: tc, //StartTLS: false, diff --git a/gateway/gateway.go b/gateway/gateway.go index 82a76ef8..19571776 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -7,6 +7,7 @@ import ( log "github.com/Sirupsen/logrus" "reflect" "strings" + "time" ) type Gateway struct { @@ -39,24 +40,16 @@ func (gw *Gateway) AddBridge(cfg *config.Bridge) error { } log.Infof("Starting bridge: %s ", cfg.Account) br := bridge.New(gw.Config, cfg, gw.Message) + br.ChannelsOut = gw.ChannelsOut[br.Account] + br.ChannelsIn = gw.ChannelsIn[br.Account] + br.ChannelOptions = gw.ChannelOptions[br.Account] + gw.Bridges[cfg.Account] = br err := br.Connect() if err != nil { return fmt.Errorf("Bridge %s failed to start: %v", br.Account, err) } - exists := make(map[string]bool) - for _, channel := range append(gw.ChannelsOut[br.Account], gw.ChannelsIn[br.Account]...) { - if !exists[br.Account+channel] { - mychannel := channel - log.Infof("%s: joining %s", br.Account, channel) - if br.Protocol == "irc" && gw.ChannelOptions[br.Account+channel].Key != "" { - log.Debugf("using key %s for channel %s", gw.ChannelOptions[br.Account+channel].Key, channel) - mychannel = mychannel + " " + gw.ChannelOptions[br.Account+channel].Key - } - br.JoinChannel(mychannel) - exists[br.Account+channel] = true - } - } + br.JoinChannels() return nil } @@ -76,6 +69,13 @@ func (gw *Gateway) handleReceive() { for { select { case msg := <-gw.Message: + if msg.Event == config.EVENT_FAILURE { + for _, br := range gw.Bridges { + if msg.Account == br.Account { + go gw.reconnectBridge(br) + } + } + } if !gw.ignoreMessage(&msg) { for _, br := range gw.Bridges { gw.handleMessage(msg, br) @@ -85,6 +85,20 @@ func (gw *Gateway) handleReceive() { } } +func (gw *Gateway) reconnectBridge(br *bridge.Bridge) { + br.Disconnect() + time.Sleep(time.Second * 5) +RECONNECT: + log.Infof("Reconnecting %s", br.Account) + err := br.Connect() + if err != nil { + log.Errorf("Reconnection failed: %s. Trying again in 60 seconds", err) + time.Sleep(time.Second * 60) + goto RECONNECT + } + br.JoinChannels() +} + func (gw *Gateway) mapChannels() error { options := make(map[string]config.ChannelOptions) m := make(map[string][]string) From dc3723210076d7f7fecdfbf98b5de2f4540900ea Mon Sep 17 00:00:00 2001 From: Wim Date: Tue, 14 Feb 2017 23:52:45 +0100 Subject: [PATCH 2/3] Refactor. Make extra options easier for other protocols --- bridge/bridge.go | 29 +++++++++++++++++++++-------- gateway/gateway.go | 16 ++++++++++++---- 2 files changed, 33 insertions(+), 12 deletions(-) diff --git a/bridge/bridge.go b/bridge/bridge.go index c8c6ac4e..db26c422 100644 --- a/bridge/bridge.go +++ b/bridge/bridge.go @@ -25,16 +25,17 @@ type Bridger interface { type Bridge struct { Config config.Protocol Bridger - Name string - Account string - Protocol string - ChannelsOut []string - ChannelsIn []string - ChannelOptions config.ChannelOptions + Name string + Account string + Protocol string + ChannelsIn map[string]config.ChannelOptions + ChannelsOut map[string]config.ChannelOptions } func New(cfg *config.Config, bridge *config.Bridge, c chan config.Message) *Bridge { b := new(Bridge) + b.ChannelsIn = make(map[string]config.ChannelOptions) + b.ChannelsOut = make(map[string]config.ChannelOptions) accInfo := strings.Split(bridge.Account, ".") protocol := accInfo[0] name := accInfo[1] @@ -75,10 +76,22 @@ func New(cfg *config.Config, bridge *config.Bridge, c chan config.Message) *Brid func (b *Bridge) JoinChannels() error { exists := make(map[string]bool) - for _, channel := range append(b.ChannelsIn, b.ChannelsOut...) { + b.joinChannels(b.ChannelsIn, exists) + b.joinChannels(b.ChannelsOut, exists) + return nil +} + +func (b *Bridge) joinChannels(cMap map[string]config.ChannelOptions, exists map[string]bool) error { + mychannel := "" + for channel, info := range cMap { if !exists[channel] { + mychannel = channel log.Infof("%s: joining %s", b.Account, channel) - b.JoinChannel(channel) + if b.Protocol == "irc" && info.Key != "" { + log.Debugf("using key %s for channel %s", info.Key, channel) + mychannel = mychannel + " " + info.Key + } + b.JoinChannel(mychannel) exists[channel] = true } } diff --git a/gateway/gateway.go b/gateway/gateway.go index 19571776..95bf902a 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -40,10 +40,8 @@ func (gw *Gateway) AddBridge(cfg *config.Bridge) error { } log.Infof("Starting bridge: %s ", cfg.Account) br := bridge.New(gw.Config, cfg, gw.Message) - br.ChannelsOut = gw.ChannelsOut[br.Account] - br.ChannelsIn = gw.ChannelsIn[br.Account] - br.ChannelOptions = gw.ChannelOptions[br.Account] - + gw.mapChannelsToBridge(br, gw.ChannelsOut) + gw.mapChannelsToBridge(br, gw.ChannelsIn) gw.Bridges[cfg.Account] = br err := br.Connect() if err != nil { @@ -53,6 +51,16 @@ func (gw *Gateway) AddBridge(cfg *config.Bridge) error { return nil } +func (gw *Gateway) mapChannelsToBridge(br *bridge.Bridge, cMap map[string][]string) { + for _, channel := range cMap[br.Account] { + if _, ok := gw.ChannelOptions[br.Account+channel]; ok { + br.ChannelsOut[channel] = gw.ChannelOptions[br.Account+channel] + } else { + br.ChannelsOut[channel] = config.ChannelOptions{} + } + } +} + func (gw *Gateway) Start() error { gw.mapChannels() for _, br := range append(gw.MyConfig.In, append(gw.MyConfig.InOut, gw.MyConfig.Out...)...) { From 62b165c0b4052f96ab89c358301bf246d239eba7 Mon Sep 17 00:00:00 2001 From: Wim Date: Fri, 17 Feb 2017 22:08:30 +0100 Subject: [PATCH 3/3] Refactor samechannelgateway --- gateway/gateway.go | 19 ++--- gateway/samechannel/samechannel.go | 114 ++++++++--------------------- matterbridge.go | 11 ++- 3 files changed, 44 insertions(+), 100 deletions(-) diff --git a/gateway/gateway.go b/gateway/gateway.go index 95bf902a..f965f8a6 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -12,14 +12,14 @@ import ( type Gateway struct { *config.Config - MyConfig *config.Gateway - //Bridges []*bridge.Bridge - Bridges map[string]*bridge.Bridge - ChannelsOut map[string][]string - ChannelsIn map[string][]string - ChannelOptions map[string]config.ChannelOptions - Name string - Message chan config.Message + MyConfig *config.Gateway + Bridges map[string]*bridge.Bridge + ChannelsOut map[string][]string + ChannelsIn map[string][]string + ChannelOptions map[string]config.ChannelOptions + Name string + Message chan config.Message + DestChannelFunc func(msg *config.Message, dest string) []string } func New(cfg *config.Config, gateway *config.Gateway) *Gateway { @@ -29,6 +29,7 @@ func New(cfg *config.Config, gateway *config.Gateway) *Gateway { gw.MyConfig = gateway gw.Message = make(chan config.Message) gw.Bridges = make(map[string]*bridge.Bridge) + gw.DestChannelFunc = gw.getDestChannel return gw } @@ -151,7 +152,7 @@ func (gw *Gateway) handleMessage(msg config.Message, dest *bridge.Bridge) { return } originchannel := msg.Channel - channels := gw.getDestChannel(&msg, dest.Account) + channels := gw.DestChannelFunc(&msg, dest.Account) for _, channel := range channels { // do not send the message to the bridge we come from if also the channel is the same if msg.Account == dest.Account && channel == originchannel { diff --git a/gateway/samechannel/samechannel.go b/gateway/samechannel/samechannel.go index bd8e3607..47bdfca1 100644 --- a/gateway/samechannel/samechannel.go +++ b/gateway/samechannel/samechannel.go @@ -1,105 +1,49 @@ package samechannelgateway import ( - "github.com/42wim/matterbridge/bridge" "github.com/42wim/matterbridge/bridge/config" - log "github.com/Sirupsen/logrus" - "strings" + "github.com/42wim/matterbridge/gateway" ) type SameChannelGateway struct { *config.Config - MyConfig *config.SameChannelGateway - Bridges map[string]*bridge.Bridge - Channels []string - ignoreNicks map[string][]string - Name string + MyConfig *config.SameChannelGateway + Channels []string + Name string } -func New(cfg *config.Config, gateway *config.SameChannelGateway) error { - c := make(chan config.Message) - gw := &SameChannelGateway{} - gw.Bridges = make(map[string]*bridge.Bridge) - gw.Name = gateway.Name - gw.Config = cfg - gw.MyConfig = gateway - gw.Channels = gateway.Channels - for _, account := range gateway.Accounts { - br := config.Bridge{Account: account} - log.Infof("Starting bridge: %s", account) - gw.Bridges[account] = bridge.New(cfg, &br, c) - } - for _, br := range gw.Bridges { - err := br.Connect() - if err != nil { - log.Fatalf("Bridge %s failed to start: %v", br.Account, err) - } - for _, channel := range gw.Channels { - log.Infof("%s: joining %s", br.Account, channel) - br.JoinChannel(channel) +func New(cfg *config.Config, gatewayCfg *config.SameChannelGateway) *SameChannelGateway { + return &SameChannelGateway{ + MyConfig: gatewayCfg, + Channels: gatewayCfg.Channels, + Name: gatewayCfg.Name, + Config: cfg} +} + +func (sgw *SameChannelGateway) Start() error { + gw := gateway.New(sgw.Config, &config.Gateway{Name: sgw.Name}) + gw.DestChannelFunc = sgw.getDestChannel + for _, account := range sgw.MyConfig.Accounts { + for _, channel := range sgw.Channels { + br := config.Bridge{Account: account, Channel: channel} + gw.MyConfig.InOut = append(gw.MyConfig.InOut, br) } } - gw.handleReceive(c) - return nil + return gw.Start() } -func (gw *SameChannelGateway) handleReceive(c chan config.Message) { - for { - select { - case msg := <-c: - if !gw.ignoreMessage(&msg) { - for _, br := range gw.Bridges { - gw.handleMessage(msg, br) - } - } - } - } -} - -func (gw *SameChannelGateway) handleMessage(msg config.Message, dest *bridge.Bridge) { - // is this a configured channel - if !gw.validChannel(msg.Channel) { - return - } - // do not send the message to the bridge we come from if also the channel is the same - if msg.Account == dest.Account { - return - } - gw.modifyUsername(&msg, dest) - log.Debugf("Sending %#v from %s (%s) to %s (%s)", msg, msg.Account, msg.Channel, dest.Account, msg.Channel) - err := dest.Send(msg) - if err != nil { - log.Error(err) - } -} - -func (gw *SameChannelGateway) ignoreMessage(msg *config.Message) bool { - for _, entry := range strings.Fields(gw.Bridges[msg.Account].Config.IgnoreNicks) { - if msg.Username == entry { - log.Debugf("ignoring %s from %s", msg.Username, msg.Account) - return true - } - } - return false -} - -func (gw *SameChannelGateway) modifyUsername(msg *config.Message, dest *bridge.Bridge) { - br := gw.Bridges[msg.Account] - nick := gw.Config.General.RemoteNickFormat - if nick == "" { - nick = dest.Config.RemoteNickFormat - } - nick = strings.Replace(nick, "{NICK}", msg.Username, -1) - nick = strings.Replace(nick, "{BRIDGE}", br.Name, -1) - nick = strings.Replace(nick, "{PROTOCOL}", br.Protocol, -1) - msg.Username = nick -} - -func (gw *SameChannelGateway) validChannel(channel string) bool { - for _, c := range gw.Channels { +func (sgw *SameChannelGateway) validChannel(channel string) bool { + for _, c := range sgw.Channels { if c == channel { return true } } return false } + +func (sgw *SameChannelGateway) getDestChannel(msg *config.Message, dest string) []string { + if sgw.validChannel(msg.Channel) { + return []string{msg.Channel} + } + return []string{} +} diff --git a/matterbridge.go b/matterbridge.go index bb125c82..32bc4285 100644 --- a/matterbridge.go +++ b/matterbridge.go @@ -36,12 +36,11 @@ func main() { continue } fmt.Printf("starting samechannel gateway %#v\n", gw.Name) - go func(gw config.SameChannelGateway) { - err := samechannelgateway.New(cfg, &gw) - if err != nil { - log.Fatalf("starting gateway failed %#v", err) - } - }(gw) + g := samechannelgateway.New(cfg, &gw) + err := g.Start() + if err != nil { + log.Fatalf("starting gateway failed %#v", err) + } } for _, gw := range cfg.Gateway {