Refactor and clean-up handlers. (slack) (#533)

This commit is contained in:
Duco van Amstel 2018-11-07 20:35:59 +00:00 committed by Wim
parent 06d66a0b2b
commit d2a1dc792f
4 changed files with 255 additions and 222 deletions

View File

@ -56,13 +56,10 @@ func SplitStringLength(input string, length int) string {
func HandleExtra(msg *config.Message, general *config.Protocol) []config.Message { func HandleExtra(msg *config.Message, general *config.Protocol) []config.Message {
extra := msg.Extra extra := msg.Extra
rmsg := []config.Message{} rmsg := []config.Message{}
if len(extra[config.EVENT_FILE_FAILURE_SIZE]) > 0 { for _, f := range extra[config.EVENT_FILE_FAILURE_SIZE] {
for _, f := range extra[config.EVENT_FILE_FAILURE_SIZE] { fi := f.(config.FileInfo)
fi := f.(config.FileInfo) text := fmt.Sprintf("file %s too big to download (%#v > allowed size: %#v)", fi.Name, fi.Size, general.MediaDownloadSize)
text := fmt.Sprintf("file %s too big to download (%#v > allowed size: %#v)", fi.Name, fi.Size, general.MediaDownloadSize) rmsg = append(rmsg, config.Message{Text: text, Username: "<system> ", Channel: msg.Channel, Account: msg.Account})
rmsg = append(rmsg, config.Message{Text: text, Username: "<system> ", Channel: msg.Channel, Account: msg.Account})
}
return rmsg
} }
return rmsg return rmsg
} }

View File

@ -1,7 +1,6 @@
package bslack package bslack
import ( import (
"bytes"
"fmt" "fmt"
"html" "html"
"regexp" "regexp"
@ -61,7 +60,15 @@ func (b *Bslack) handleSlackClient(messages chan *config.Message) {
case *slack.OutgoingErrorEvent: case *slack.OutgoingErrorEvent:
b.Log.Debugf("%#v", ev.Error()) b.Log.Debugf("%#v", ev.Error())
case *slack.ChannelJoinedEvent: case *slack.ChannelJoinedEvent:
// When we join a channel we update the full list of users as
// well as the information for the channel that we joined as this
// should now tell that we are a member of it.
b.populateUsers() b.populateUsers()
b.channelsMutex.Lock()
b.channelsByID[ev.Channel.ID] = &ev.Channel
b.channelsByName[ev.Channel.Name] = &ev.Channel
b.channelsMutex.Unlock()
case *slack.ConnectedEvent: case *slack.ConnectedEvent:
b.si = ev.Info b.si = ev.Info
b.populateChannels() b.populateChannels()
@ -90,108 +97,115 @@ func (b *Bslack) handleMatterHook(messages chan *config.Message) {
} }
} }
var commentRE = regexp.MustCompile(`.*?commented: (.*)`) // skipMessageEvent skips event that need to be skipped :-)
func (b *Bslack) skipMessageEvent(ev *slack.MessageEvent) bool {
switch ev.SubType {
case sChannelLeave, sChannelJoin:
return b.GetBool(noSendJoinConfig)
case sPinnedItem, sUnpinnedItem:
return true
}
// handleDownloadFile handles file download // Skip any messages that we made ourselves or from 'slackbot' (see #527).
func (b *Bslack) handleDownloadFile(rmsg *config.Message, file *slack.File) error { if ev.Username == sSlackBotUser ||
// if we have a file attached, download it (in memory) and put a pointer to it in msg.Extra (b.rtm != nil && ev.Username == b.si.User.Name) ||
// limit to 1MB for now (len(ev.Attachments) > 0 && ev.Attachments[0].CallbackID == "matterbridge_"+b.uuid) {
comment := "" return true
results := commentRE.FindAllStringSubmatch(rmsg.Text, -1)
if len(results) > 0 {
comment = results[0][1]
} }
err := helper.HandleDownloadSize(b.Log, rmsg, file.Name, int64(file.Size), b.General)
if err != nil { // It seems ev.SubMessage.Edited == nil when slack unfurls.
return err // Do not forward these messages. See Github issue #266.
if ev.SubMessage != nil &&
ev.SubMessage.ThreadTimestamp != ev.SubMessage.Timestamp &&
ev.SubMessage.Edited == nil {
return true
} }
// actually download the file return false
data, err := helper.DownloadFileAuth(file.URLPrivateDownload, "Bearer "+b.GetString(tokenConfig))
if err != nil {
return fmt.Errorf("download %s failed %#v", file.URLPrivateDownload, err)
}
// add the downloaded data to the message
helper.HandleDownloadData(b.Log, rmsg, file.Name, comment, file.URLPrivateDownload, data, b.General)
return nil
} }
// handleUploadFile handles native upload of files // handleMessageEvent handles the message events. Together with any called sub-methods,
func (b *Bslack) handleUploadFile(msg *config.Message, channelID string) { // this method implements the following event processing pipeline:
for _, f := range msg.Extra["file"] { //
fi := f.(config.FileInfo) // 1. Check if the message should be ignored.
if msg.Text == fi.Comment { // NOTE: This is not actually part of the method below but is done just before it
msg.Text = "" // is called via the 'skipMessageEvent()' method.
} // 2. Populate the Matterbridge message that will be sent to the router based on the
/* because the result of the UploadFile is slower than the MessageEvent from slack // received event and logic that is common to all events that are not skipped.
we can't match on the file ID yet, so we have to match on the filename too // 3. Detect and handle any message that is "status" related (think join channel, etc.).
*/ // This might result in an early exit from the pipeline and passing of the
b.Log.Debugf("Adding file %s to cache %s", fi.Name, time.Now().String()) // pre-populated message to the Matterbridge router.
b.cache.Add("filename"+fi.Name, time.Now()) // 4. Handle the specific case of messages that edit existing messages depending on
res, err := b.sc.UploadFile(slack.FileUploadParameters{ // configuration.
Reader: bytes.NewReader(*fi.Data), // 5. Handle any attachments of the received event.
Filename: fi.Name, // 6. Check that the Matterbridge message that we end up with after at the end of the
Channels: []string{channelID}, // pipeline is valid before sending it to the Matterbridge router.
InitialComment: fi.Comment,
})
if res.ID != "" {
b.Log.Debugf("Adding fileid %s to cache %s", res.ID, time.Now().String())
b.cache.Add("file"+res.ID, time.Now())
}
if err != nil {
b.Log.Errorf("uploadfile %#v", err)
}
}
}
// handleMessageEvent handles the message events
func (b *Bslack) handleMessageEvent(ev *slack.MessageEvent) (*config.Message, error) { func (b *Bslack) handleMessageEvent(ev *slack.MessageEvent) (*config.Message, error) {
var err error rmsg, err := b.populateReceivedMessage(ev)
// update the userlist on a channel_join
if ev.SubType == sChannelJoin {
b.populateUsers()
}
// Edit message
if !b.GetBool(editDisableConfig) && ev.SubMessage != nil && ev.SubMessage.ThreadTimestamp != ev.SubMessage.Timestamp {
b.Log.Debugf("SubMessage %#v", ev.SubMessage)
ev.User = ev.SubMessage.User
ev.Text = ev.SubMessage.Text + b.GetString(editSuffixConfig)
}
// use our own func because rtm.GetChannelInfo doesn't work for private channels
channelInfo, err := b.getChannelByID(ev.Channel)
if err != nil { if err != nil {
return nil, err return nil, err
} }
rmsg := config.Message{ // Handle some message types early.
Text: ev.Text, if b.handleStatusEvent(ev, rmsg) {
Channel: channelInfo.Name, return rmsg, nil
Account: b.Account,
ID: "slack " + ev.Timestamp,
Extra: map[string][]interface{}{},
ParentID: ev.ThreadTimestamp,
} }
if b.useChannelID { // Handle 'edit' messages.
rmsg.Channel = "ID:" + channelInfo.ID if ev.SubMessage != nil && !b.GetBool(editDisableConfig) {
} rmsg.ID = "slack " + ev.SubMessage.Timestamp
if ev.SubMessage.ThreadTimestamp != ev.SubMessage.Timestamp {
// find the user id and name b.Log.Debugf("SubMessage %#v", ev.SubMessage)
if ev.User != "" && ev.SubType != sMessageDeleted && ev.SubType != sFileComment { rmsg.Username = ev.SubMessage.User
user, err := b.rtm.GetUserInfo(ev.User) rmsg.Text = ev.SubMessage.Text + b.GetString(editSuffixConfig)
if err != nil {
return nil, err
}
rmsg.UserID = user.ID
rmsg.Username = user.Name
if user.Profile.DisplayName != "" {
rmsg.Username = user.Profile.DisplayName
} }
} }
// See if we have some text in the attachments b.handleAttachments(ev, rmsg)
// Verify that we have the right information and the message
// is well-formed before sending it out to the router.
if len(ev.Files) == 0 && (rmsg.Text == "" || rmsg.Username == "") {
if ev.BotID != "" {
// This is probably a webhook we couldn't resolve.
return nil, fmt.Errorf("message handling resulted in an empty bot message (probably an incoming webhook we couldn't resolve): %#v", ev)
}
return nil, fmt.Errorf("message handling resulted in an empty message: %#v", ev)
}
return rmsg, nil
}
func (b *Bslack) handleStatusEvent(ev *slack.MessageEvent, rmsg *config.Message) bool {
switch ev.SubType {
case sChannelJoined, sMemberJoined:
b.populateUsers()
// There's no further processing needed on channel events
// so we return 'true'.
return true
case sChannelJoin, sChannelLeave:
rmsg.Username = sSystemUser
rmsg.Event = config.EVENT_JOIN_LEAVE
case sChannelTopic, sChannelPurpose:
rmsg.Event = config.EVENT_TOPIC_CHANGE
case sMessageDeleted:
rmsg.Text = config.EVENT_MSG_DELETE
rmsg.Event = config.EVENT_MSG_DELETE
rmsg.ID = "slack " + ev.DeletedTimestamp
// If a message is being deleted we do not need to process
// the event any further so we return 'true'.
return true
case sMeMessage:
rmsg.Event = config.EVENT_USER_ACTION
}
return false
}
func (b *Bslack) handleAttachments(ev *slack.MessageEvent, rmsg *config.Message) {
// File comments are set by the system (because there is no username given).
if ev.SubType == sFileComment {
rmsg.Username = sSystemUser
}
// See if we have some text in the attachments.
if rmsg.Text == "" { if rmsg.Text == "" {
for _, attach := range ev.Attachments { for _, attach := range ev.Attachments {
if attach.Text != "" { if attach.Text != "" {
@ -205,134 +219,55 @@ func (b *Bslack) handleMessageEvent(ev *slack.MessageEvent) (*config.Message, er
} }
} }
// when using webhookURL we can't check if it's our webhook or not for now // Save the attachments, so that we can send them to other slack (compatible) bridges.
if rmsg.Username == "" && ev.BotID != "" && b.GetString(outgoingWebhookConfig) == "" {
bot, err := b.rtm.GetBotInfo(ev.BotID)
if err != nil {
return nil, err
}
if bot.Name != "" {
rmsg.Username = bot.Name
if ev.Username != "" {
rmsg.Username = ev.Username
}
rmsg.UserID = bot.ID
}
// fixes issues with matterircd users
if bot.Name == "Slack API Tester" {
user, err := b.rtm.GetUserInfo(ev.User)
if err != nil {
return nil, err
}
rmsg.UserID = user.ID
rmsg.Username = user.Name
if user.Profile.DisplayName != "" {
rmsg.Username = user.Profile.DisplayName
}
}
}
// file comments are set by the system (because there is no username given)
if ev.SubType == sFileComment {
rmsg.Username = sSystemUser
}
// do we have a /me action
if ev.SubType == sMeMessage {
rmsg.Event = config.EVENT_USER_ACTION
}
// Handle join/leave
if ev.SubType == sChannelLeave || ev.SubType == sChannelJoin {
rmsg.Username = sSystemUser
rmsg.Event = config.EVENT_JOIN_LEAVE
}
// edited messages have a submessage, use this timestamp
if ev.SubMessage != nil {
rmsg.ID = "slack " + ev.SubMessage.Timestamp
}
// deleted message event
if ev.SubType == sMessageDeleted {
rmsg.Text = config.EVENT_MSG_DELETE
rmsg.Event = config.EVENT_MSG_DELETE
rmsg.ID = "slack " + ev.DeletedTimestamp
}
// topic change event
if ev.SubType == sChannelTopic || ev.SubType == sChannelPurpose {
rmsg.Event = config.EVENT_TOPIC_CHANGE
}
// Only deleted messages can have a empty username and text
if (rmsg.Text == "" || rmsg.Username == "") && ev.SubType != sMessageDeleted && len(ev.Files) == 0 {
// this is probably a webhook we couldn't resolve
if ev.BotID != "" {
return nil, fmt.Errorf("probably an incoming webhook we couldn't resolve (maybe ourselves)")
}
return nil, fmt.Errorf("empty message and not a deleted message")
}
// save the attachments, so that we can send them to other slack (compatible) bridges
if len(ev.Attachments) > 0 { if len(ev.Attachments) > 0 {
rmsg.Extra[sSlackAttachment] = append(rmsg.Extra[sSlackAttachment], ev.Attachments) rmsg.Extra[sSlackAttachment] = append(rmsg.Extra[sSlackAttachment], ev.Attachments)
} }
// if we have a file attached, download it (in memory) and put a pointer to it in msg.Extra // If we have files attached, download them (in memory) and put a pointer to it in msg.Extra.
for _, f := range ev.Files { for _, f := range ev.Files {
err := b.handleDownloadFile(&rmsg, &f) err := b.handleDownloadFile(rmsg, &f)
if err != nil { if err != nil {
b.Log.Errorf("download failed: %s", err) b.Log.Errorf("Could not download incoming file: %#v", err)
} }
} }
return &rmsg, nil
} }
// skipMessageEvent skips event that need to be skipped :-) var commentRE = regexp.MustCompile(`.*?commented: (.*)`)
func (b *Bslack) skipMessageEvent(ev *slack.MessageEvent) bool {
if ev.SubType == sChannelLeave || ev.SubType == sChannelJoin { // handleDownloadFile handles file download
return b.GetBool(noSendJoinConfig) func (b *Bslack) handleDownloadFile(rmsg *config.Message, file *slack.File) error {
if b.fileIsAvailable(file) {
return nil
} }
// ignore pinned items // Check that the file is neither too large nor blacklisted.
if ev.SubType == sPinnedItem || ev.SubType == sUnpinnedItem { if err := helper.HandleDownloadSize(b.Log, rmsg, file.Name, int64(file.Size), b.General); err != nil {
b.Log.WithError(err).Infof("Skipping download of incoming file.")
return nil
}
// Actually download the file.
data, err := helper.DownloadFileAuth(file.URLPrivateDownload, "Bearer "+b.GetString(tokenConfig))
if err != nil {
return fmt.Errorf("download %s failed %#v", file.URLPrivateDownload, err)
}
// Add the downloaded data to the message.
var comment string
if results := commentRE.FindAllStringSubmatch(rmsg.Text, -1); len(results) > 0 {
comment = results[0][1]
}
helper.HandleDownloadData(b.Log, rmsg, file.Name, comment, file.URLPrivateDownload, data, b.General)
return nil
}
func (b *Bslack) fileIsAvailable(file *slack.File) bool {
// Only download a file if it is not in the cache or if it has been entered more than a minute ago.
if ts, ok := b.cache.Get("file" + file.ID); ok && time.Since(ts.(time.Time)) > time.Minute {
return true
} else if ts, ok = b.cache.Get("filename" + file.Name); ok && time.Since(ts.(time.Time)) > 10*time.Second {
return true return true
} }
// do not send messages from ourself
if b.GetString(outgoingWebhookConfig) == "" && b.GetString(incomingWebhookConfig) == "" && ev.Username == b.si.User.Name {
return true
}
// skip messages we made ourselves
if len(ev.Attachments) > 0 {
if ev.Attachments[0].CallbackID == "matterbridge_"+b.uuid {
return true
}
}
if !b.GetBool(editDisableConfig) && ev.SubMessage != nil && ev.SubMessage.ThreadTimestamp != ev.SubMessage.Timestamp {
// it seems ev.SubMessage.Edited == nil when slack unfurls
// do not forward these messages #266
if ev.SubMessage.Edited == nil {
return true
}
}
for _, f := range ev.Files {
// if the file is in the cache and isn't older then a minute, skip it
if ts, ok := b.cache.Get("file" + f.ID); ok && time.Since(ts.(time.Time)) < time.Minute {
b.Log.Debugf("Not downloading file id %s which we uploaded", f.ID)
return true
} else if ts, ok := b.cache.Get("filename" + f.Name); ok && time.Since(ts.(time.Time)) < 10*time.Second {
b.Log.Debugf("Not downloading file name %s which we uploaded", f.Name)
return true
}
b.Log.Debugf("Not skipping %s %s", f.Name, time.Now().String())
}
return false return false
} }

View File

@ -6,27 +6,31 @@ import (
"strings" "strings"
"time" "time"
"github.com/42wim/matterbridge/bridge/config"
"github.com/nlopes/slack" "github.com/nlopes/slack"
) )
func (b *Bslack) getUser(id string) *slack.User {
b.usersMutex.RLock()
defer b.usersMutex.RUnlock()
return b.users[id]
}
func (b *Bslack) getUsername(id string) string { func (b *Bslack) getUsername(id string) string {
for _, u := range b.users { if user := b.getUser(id); user != nil {
if u.ID == id { if user.Profile.DisplayName != "" {
if u.Profile.DisplayName != "" { return user.Profile.DisplayName
return u.Profile.DisplayName
}
return u.Name
} }
return user.Name
} }
b.Log.Warnf("Could not find user with ID '%s'", id) b.Log.Warnf("Could not find user with ID '%s'", id)
return "" return ""
} }
func (b *Bslack) getAvatar(userid string) string { func (b *Bslack) getAvatar(id string) string {
for _, u := range b.users { if user := b.getUser(id); user != nil {
if userid == u.ID { return user.Profile.Image48
return u.Profile.Image48
}
} }
return "" return ""
} }
@ -142,6 +146,66 @@ func (b *Bslack) populateChannels() {
b.refreshInProgress = false b.refreshInProgress = false
} }
// populateReceivedMessage shapes the initial Matterbridge message that we will forward to the
// router before we apply message-dependent modifications.
func (b *Bslack) populateReceivedMessage(ev *slack.MessageEvent) (*config.Message, error) {
// Use our own func because rtm.GetChannelInfo doesn't work for private channels.
channel, err := b.getChannelByID(ev.Channel)
if err != nil {
return nil, err
}
rmsg := &config.Message{
Text: ev.Text,
Channel: channel.Name,
Account: b.Account,
ID: "slack " + ev.Timestamp,
Extra: make(map[string][]interface{}),
ParentID: ev.ThreadTimestamp,
}
if b.useChannelID {
rmsg.Channel = "ID:" + channel.ID
}
if err = b.populateMessageWithUserInfo(ev, rmsg); err != nil {
return nil, err
}
return rmsg, err
}
func (b *Bslack) populateMessageWithUserInfo(ev *slack.MessageEvent, rmsg *config.Message) error {
if ev.SubType == sMessageDeleted || ev.SubType == sFileComment {
return nil
}
if ev.BotID != "" && b.GetString(outgoingWebhookConfig) == "" {
bot, err := b.rtm.GetBotInfo(ev.BotID)
if err != nil {
return err
}
if bot.Name != "" && bot.Name != "Slack API Tester" {
rmsg.Username = bot.Name
if ev.Username != "" {
rmsg.Username = ev.Username
}
rmsg.UserID = bot.ID
}
}
if ev.User != "" {
user := b.getUser(ev.User)
if user == nil {
return fmt.Errorf("could not find information for user with id %s", ev.User)
}
rmsg.UserID = user.ID
rmsg.Username = user.Name
if user.Profile.DisplayName != "" {
rmsg.Username = user.Profile.DisplayName
}
}
return nil
}
var ( var (
mentionRE = regexp.MustCompile(`<@([a-zA-Z0-9]+)>`) mentionRE = regexp.MustCompile(`<@([a-zA-Z0-9]+)>`)
channelRE = regexp.MustCompile(`<#[a-zA-Z0-9]+\|(.+?)>`) channelRE = regexp.MustCompile(`<#[a-zA-Z0-9]+\|(.+?)>`)

View File

@ -1,6 +1,7 @@
package bslack package bslack
import ( import (
"bytes"
"errors" "errors"
"fmt" "fmt"
"strings" "strings"
@ -45,6 +46,8 @@ type Bslack struct {
const ( const (
sChannelJoin = "channel_join" sChannelJoin = "channel_join"
sChannelLeave = "channel_leave" sChannelLeave = "channel_leave"
sChannelJoined = "channel_joined"
sMemberJoined = "member_joined_channel"
sMessageDeleted = "message_deleted" sMessageDeleted = "message_deleted"
sSlackAttachment = "slack_attachment" sSlackAttachment = "slack_attachment"
sPinnedItem = "pinned_item" sPinnedItem = "pinned_item"
@ -56,6 +59,7 @@ const (
sUserTyping = "user_typing" sUserTyping = "user_typing"
sLatencyReport = "latency_report" sLatencyReport = "latency_report"
sSystemUser = "system" sSystemUser = "system"
sSlackBotUser = "slackbot"
tokenConfig = "Token" tokenConfig = "Token"
incomingWebhookConfig = "WebhookBindAddress" incomingWebhookConfig = "WebhookBindAddress"
@ -295,7 +299,7 @@ func (b *Bslack) sendRTM(msg config.Message) (string, error) {
messageParameters := b.prepareMessageParameters(&msg) messageParameters := b.prepareMessageParameters(&msg)
// Upload a file if it exists // Upload a file if it exists.
if msg.Extra != nil { if msg.Extra != nil {
for _, rmsg := range helper.HandleExtra(&msg, b.General) { for _, rmsg := range helper.HandleExtra(&msg, b.General) {
_, _, err = b.rtm.PostMessage(channelInfo.ID, rmsg.Username+rmsg.Text, *messageParameters) _, _, err = b.rtm.PostMessage(channelInfo.ID, rmsg.Username+rmsg.Text, *messageParameters)
@ -315,6 +319,39 @@ func (b *Bslack) sendRTM(msg config.Message) (string, error) {
return "slack " + id, nil return "slack " + id, nil
} }
// handleUploadFile handles native upload of files
func (b *Bslack) handleUploadFile(msg *config.Message, channelID string) {
for _, f := range msg.Extra["file"] {
fi := f.(config.FileInfo)
if msg.Text == fi.Comment {
msg.Text = ""
}
// Because the result of the UploadFile is slower than the MessageEvent from slack
// we can't match on the file ID yet, so we have to match on the filename too.
ts := time.Now()
b.Log.Debugf("Adding file %s to cache at %s with timestamp", fi.Name, ts.String())
if !b.cache.Add("filename"+fi.Name, ts) {
b.Log.Warnf("Failed to add file %s to cache at %s with timestamp", fi.Name, ts.String())
}
res, err := b.sc.UploadFile(slack.FileUploadParameters{
Reader: bytes.NewReader(*fi.Data),
Filename: fi.Name,
Channels: []string{channelID},
InitialComment: fi.Comment,
})
if err != nil {
b.Log.Errorf("uploadfile %#v", err)
return
}
if res.ID != "" {
b.Log.Debugf("Adding file ID %s to cache with timestamp %s", res.ID, ts.String())
if !b.cache.Add("file"+res.ID, ts) {
b.Log.Warnf("Failed to add file ID %s to cache with timestamp %s", res.ID, ts.String())
}
}
}
}
func (b *Bslack) prepareMessageParameters(msg *config.Message) *slack.PostMessageParameters { func (b *Bslack) prepareMessageParameters(msg *config.Message) *slack.PostMessageParameters {
params := slack.NewPostMessageParameters() params := slack.NewPostMessageParameters()
if b.GetBool(useNickPrefixConfig) { if b.GetBool(useNickPrefixConfig) {