diff --git a/bridge/bridge.go b/bridge/bridge.go index 5754ca95..047370be 100644 --- a/bridge/bridge.go +++ b/bridge/bridge.go @@ -3,6 +3,7 @@ package bridge import ( //"fmt" "github.com/42wim/matterbridge/bridge/config" + "github.com/42wim/matterbridge/bridge/gitter" "github.com/42wim/matterbridge/bridge/irc" "github.com/42wim/matterbridge/bridge/mattermost" "github.com/42wim/matterbridge/bridge/xmpp" @@ -38,6 +39,9 @@ func NewBridge(cfg *config.Config) error { if cfg.Xmpp.Enable { b.Bridges = append(b.Bridges, bxmpp.New(cfg, c)) } + if cfg.Gitter.Enable { + b.Bridges = append(b.Bridges, bgitter.New(cfg, c)) + } if len(b.Bridges) < 2 { log.Fatalf("only %d sections enabled. Need at least 2 sections enabled (eg [IRC] and [mattermost]", len(b.Bridges)) } @@ -67,6 +71,7 @@ func (b *Bridge) mapChannels() error { m["irc"] = val.IRC m["mattermost"] = val.Mattermost m["xmpp"] = val.Xmpp + m["gitter"] = val.Gitter b.Channels = append(b.Channels, m) } return nil @@ -76,7 +81,8 @@ func (b *Bridge) mapIgnores() { m := make(map[string][]string) m["irc"] = strings.Fields(b.Config.IRC.IgnoreNicks) m["mattermost"] = strings.Fields(b.Config.Mattermost.IgnoreNicks) - m["xmpp"] = strings.Fields(b.Config.Mattermost.IgnoreNicks) + m["xmpp"] = strings.Fields(b.Config.Xmpp.IgnoreNicks) + m["gitter"] = strings.Fields(b.Config.Gitter.IgnoreNicks) b.ignoreNicks = m } @@ -126,6 +132,8 @@ func (b *Bridge) modifyMessage(msg *config.Message, dest string) { switch dest { case "irc": setNickFormat(msg, b.Config.IRC.RemoteNickFormat) + case "gitter": + setNickFormat(msg, b.Config.Gitter.RemoteNickFormat) case "xmpp": setNickFormat(msg, b.Config.Xmpp.RemoteNickFormat) case "mattermost": diff --git a/bridge/config/config.go b/bridge/config/config.go index 03836b63..cd353fcb 100644 --- a/bridge/config/config.go +++ b/bridge/config/config.go @@ -28,6 +28,14 @@ type Config struct { IgnoreNicks string Enable bool } + Gitter struct { + Enable bool + IgnoreNicks string + Nick string + RemoteNickFormat string + Token string + } + Mattermost struct { URL string ShowJoinPart bool @@ -48,6 +56,7 @@ type Config struct { Enable bool } Xmpp struct { + IgnoreNicks string Jid string Password string Server string @@ -60,6 +69,7 @@ type Config struct { IRC string Mattermost string Xmpp string + Gitter string } General struct { GiphyAPIKey string diff --git a/bridge/gitter/gitter.go b/bridge/gitter/gitter.go new file mode 100644 index 00000000..1d1b0a59 --- /dev/null +++ b/bridge/gitter/gitter.go @@ -0,0 +1,110 @@ +package bgitter + +import ( + "github.com/42wim/matterbridge/bridge/config" + log "github.com/Sirupsen/logrus" + "github.com/sromku/go-gitter" + "strings" +) + +type Bgitter struct { + c *gitter.Gitter + *config.Config + Remote chan config.Message + Rooms []gitter.Room +} + +type Message struct { + Text string + Channel string + Username string +} + +var flog *log.Entry + +func init() { + flog = log.WithFields(log.Fields{"module": "gitter"}) +} + +func New(config *config.Config, c chan config.Message) *Bgitter { + b := &Bgitter{} + b.Config = config + b.Remote = c + return b +} + +func (b *Bgitter) Connect() error { + var err error + flog.Info("Trying Gitter connection") + b.c = gitter.New(b.Config.Gitter.Token) + _, err = b.c.GetUser() + if err != nil { + flog.Debugf("%#v", err) + return err + } + flog.Info("Connection succeeded") + b.setupChannels() + go b.handleGitter() + return nil +} + +func (b *Bgitter) Name() string { + return "gitter" +} + +func (b *Bgitter) Send(msg config.Message) error { + roomID := b.getRoomID(msg.Channel) + if roomID == "" { + flog.Errorf("Could not find roomID for %v", msg.Channel) + return nil + } + // add ZWSP because gitter echoes our own messages + return b.c.SendMessage(roomID, msg.Username+msg.Text+" ​") +} + +func (b *Bgitter) getRoomID(channel string) string { + for _, v := range b.Rooms { + if v.URI == channel { + return v.ID + } + } + return "" +} + +func (b *Bgitter) handleGitter() { + for _, val := range b.Config.Channel { + room := val.Gitter + roomID := b.getRoomID(room) + if roomID == "" { + continue + } + stream := b.c.Stream(roomID) + go b.c.Listen(stream) + + go func(stream *gitter.Stream, room string) { + for { + event := <-stream.Event + switch ev := event.Data.(type) { + case *gitter.MessageReceived: + // check for ZWSP to see if it's not an echo + if !strings.HasSuffix(ev.Message.Text, "​") { + b.Remote <- config.Message{Username: ev.Message.From.Username, Text: ev.Message.Text, Channel: room, Origin: "gitter"} + } + case *gitter.GitterConnectionClosed: + flog.Errorf("connection with gitter closed for room %s", room) + } + } + }(stream, room) + } +} + +func (b *Bgitter) setupChannels() { + b.Rooms, _ = b.c.GetRooms() + for _, val := range b.Config.Channel { + flog.Infof("Joining %s as %s", val.Gitter, b.Gitter.Nick) + _, err := b.c.JoinRoom(val.Gitter) + if err != nil { + log.Errorf("Joining %s failed", val.Gitter) + } + } +} diff --git a/vendor/github.com/mreiferson/go-httpclient/LICENSE b/vendor/github.com/mreiferson/go-httpclient/LICENSE new file mode 100644 index 00000000..ba625cab --- /dev/null +++ b/vendor/github.com/mreiferson/go-httpclient/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2012 Matt Reiferson + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/vendor/github.com/mreiferson/go-httpclient/httpclient.go b/vendor/github.com/mreiferson/go-httpclient/httpclient.go new file mode 100644 index 00000000..89e018bf --- /dev/null +++ b/vendor/github.com/mreiferson/go-httpclient/httpclient.go @@ -0,0 +1,237 @@ +/* +Provides an HTTP Transport that implements the `RoundTripper` interface and +can be used as a built in replacement for the standard library's, providing: + + * connection timeouts + * request timeouts + +This is a thin wrapper around `http.Transport` that sets dial timeouts and uses +Go's internal timer scheduler to call the Go 1.1+ `CancelRequest()` API. +*/ +package httpclient + +import ( + "crypto/tls" + "errors" + "io" + "net" + "net/http" + "net/url" + "sync" + "time" +) + +// returns the current version of the package +func Version() string { + return "0.4.1" +} + +// Transport implements the RoundTripper interface and can be used as a replacement +// for Go's built in http.Transport implementing end-to-end request timeouts. +// +// transport := &httpclient.Transport{ +// ConnectTimeout: 1*time.Second, +// ResponseHeaderTimeout: 5*time.Second, +// RequestTimeout: 10*time.Second, +// } +// defer transport.Close() +// +// client := &http.Client{Transport: transport} +// req, _ := http.NewRequest("GET", "http://127.0.0.1/test", nil) +// resp, err := client.Do(req) +// if err != nil { +// return err +// } +// defer resp.Body.Close() +// +type Transport struct { + // Proxy specifies a function to return a proxy for a given + // *http.Request. If the function returns a non-nil error, the + // request is aborted with the provided error. + // If Proxy is nil or returns a nil *url.URL, no proxy is used. + Proxy func(*http.Request) (*url.URL, error) + + // Dial specifies the dial function for creating TCP + // connections. This will override the Transport's ConnectTimeout and + // ReadWriteTimeout settings. + // If Dial is nil, a dialer is generated on demand matching the Transport's + // options. + Dial func(network, addr string) (net.Conn, error) + + // TLSClientConfig specifies the TLS configuration to use with + // tls.Client. If nil, the default configuration is used. + TLSClientConfig *tls.Config + + // DisableKeepAlives, if true, prevents re-use of TCP connections + // between different HTTP requests. + DisableKeepAlives bool + + // DisableCompression, if true, prevents the Transport from + // requesting compression with an "Accept-Encoding: gzip" + // request header when the Request contains no existing + // Accept-Encoding value. If the Transport requests gzip on + // its own and gets a gzipped response, it's transparently + // decoded in the Response.Body. However, if the user + // explicitly requested gzip it is not automatically + // uncompressed. + DisableCompression bool + + // MaxIdleConnsPerHost, if non-zero, controls the maximum idle + // (keep-alive) to keep per-host. If zero, + // http.DefaultMaxIdleConnsPerHost is used. + MaxIdleConnsPerHost int + + // ConnectTimeout, if non-zero, is the maximum amount of time a dial will wait for + // a connect to complete. + ConnectTimeout time.Duration + + // ResponseHeaderTimeout, if non-zero, specifies the amount of + // time to wait for a server's response headers after fully + // writing the request (including its body, if any). This + // time does not include the time to read the response body. + ResponseHeaderTimeout time.Duration + + // RequestTimeout, if non-zero, specifies the amount of time for the entire + // request to complete (including all of the above timeouts + entire response body). + // This should never be less than the sum total of the above two timeouts. + RequestTimeout time.Duration + + // ReadWriteTimeout, if non-zero, will set a deadline for every Read and + // Write operation on the request connection. + ReadWriteTimeout time.Duration + + // TCPWriteBufferSize, the size of the operating system's write + // buffer associated with the connection. + TCPWriteBufferSize int + + // TCPReadBuffserSize, the size of the operating system's read + // buffer associated with the connection. + TCPReadBufferSize int + + starter sync.Once + transport *http.Transport +} + +// Close cleans up the Transport, currently a no-op +func (t *Transport) Close() error { + return nil +} + +func (t *Transport) lazyStart() { + if t.Dial == nil { + t.Dial = func(netw, addr string) (net.Conn, error) { + c, err := net.DialTimeout(netw, addr, t.ConnectTimeout) + if err != nil { + return nil, err + } + + if t.TCPReadBufferSize != 0 || t.TCPWriteBufferSize != 0 { + if tcpCon, ok := c.(*net.TCPConn); ok { + if t.TCPWriteBufferSize != 0 { + if err = tcpCon.SetWriteBuffer(t.TCPWriteBufferSize); err != nil { + return nil, err + } + } + if t.TCPReadBufferSize != 0 { + if err = tcpCon.SetReadBuffer(t.TCPReadBufferSize); err != nil { + return nil, err + } + } + } else { + err = errors.New("Not Tcp Connection") + return nil, err + } + } + + if t.ReadWriteTimeout > 0 { + timeoutConn := &rwTimeoutConn{ + TCPConn: c.(*net.TCPConn), + rwTimeout: t.ReadWriteTimeout, + } + return timeoutConn, nil + } + return c, nil + } + } + + t.transport = &http.Transport{ + Dial: t.Dial, + Proxy: t.Proxy, + TLSClientConfig: t.TLSClientConfig, + DisableKeepAlives: t.DisableKeepAlives, + DisableCompression: t.DisableCompression, + MaxIdleConnsPerHost: t.MaxIdleConnsPerHost, + ResponseHeaderTimeout: t.ResponseHeaderTimeout, + } +} + +func (t *Transport) CancelRequest(req *http.Request) { + t.starter.Do(t.lazyStart) + + t.transport.CancelRequest(req) +} + +func (t *Transport) CloseIdleConnections() { + t.starter.Do(t.lazyStart) + + t.transport.CloseIdleConnections() +} + +func (t *Transport) RegisterProtocol(scheme string, rt http.RoundTripper) { + t.starter.Do(t.lazyStart) + + t.transport.RegisterProtocol(scheme, rt) +} + +func (t *Transport) RoundTrip(req *http.Request) (resp *http.Response, err error) { + t.starter.Do(t.lazyStart) + + if t.RequestTimeout > 0 { + timer := time.AfterFunc(t.RequestTimeout, func() { + t.transport.CancelRequest(req) + }) + + resp, err = t.transport.RoundTrip(req) + if err != nil { + timer.Stop() + } else { + resp.Body = &bodyCloseInterceptor{ReadCloser: resp.Body, timer: timer} + } + } else { + resp, err = t.transport.RoundTrip(req) + } + + return +} + +type bodyCloseInterceptor struct { + io.ReadCloser + timer *time.Timer +} + +func (bci *bodyCloseInterceptor) Close() error { + bci.timer.Stop() + return bci.ReadCloser.Close() +} + +// A net.Conn that sets a deadline for every Read or Write operation +type rwTimeoutConn struct { + *net.TCPConn + rwTimeout time.Duration +} + +func (c *rwTimeoutConn) Read(b []byte) (int, error) { + err := c.TCPConn.SetDeadline(time.Now().Add(c.rwTimeout)) + if err != nil { + return 0, err + } + return c.TCPConn.Read(b) +} + +func (c *rwTimeoutConn) Write(b []byte) (int, error) { + err := c.TCPConn.SetDeadline(time.Now().Add(c.rwTimeout)) + if err != nil { + return 0, err + } + return c.TCPConn.Write(b) +} diff --git a/vendor/github.com/mrexodia/wray/examples/client.go b/vendor/github.com/mrexodia/wray/examples/client.go new file mode 100644 index 00000000..50edf446 --- /dev/null +++ b/vendor/github.com/mrexodia/wray/examples/client.go @@ -0,0 +1,17 @@ +package main + +import "github.com/pythonandchips/wray" +import "fmt" + +func main() { + wray.RegisterTransports([]wray.Transport{&wray.HttpTransport{}}) + client := wray.NewFayeClient("http://localhost:5000/faye") + + fmt.Println("subscribing") + client.Subscribe("/foo", false, func(message wray.Message) { + fmt.Println("-------------------------------------------") + fmt.Println(message.Data) + }) + + client.Listen() +} diff --git a/vendor/github.com/mrexodia/wray/examples/publish.go b/vendor/github.com/mrexodia/wray/examples/publish.go new file mode 100644 index 00000000..9a08a376 --- /dev/null +++ b/vendor/github.com/mrexodia/wray/examples/publish.go @@ -0,0 +1,15 @@ +package main + +import "github.com/pythonandchips/wray" +import "fmt" + +func main() { + wray.RegisterTransports([]wray.Transport{ &gofaye.HttpTransport{} }) + client := wray.NewFayeClient("http://localhost:5000/faye") + + params := map[string]interface{}{"hello": "from golang"} + fmt.Println("sending") + client.Publish("/foo", params) +} + + diff --git a/vendor/github.com/mrexodia/wray/go_faye.go b/vendor/github.com/mrexodia/wray/go_faye.go new file mode 100644 index 00000000..ab78eed7 --- /dev/null +++ b/vendor/github.com/mrexodia/wray/go_faye.go @@ -0,0 +1,140 @@ +package wray + +import ( + "fmt" + "path/filepath" + "strings" + "time" +) + +const ( + UNCONNECTED = 1 + CONNECTING = 2 + CONNECTED = 3 + DISCONNECTED = 4 + + HANDSHAKE = "handshake" + RETRY = "retry" + NONE = "none" + + CONNECTION_TIMEOUT = 60.0 + DEFAULT_RETRY = 5.0 + MAX_REQUEST_SIZE = 2048 +) + +var ( + MANDATORY_CONNECTION_TYPES = []string{"long-polling"} + registeredTransports = []Transport{} +) + +type FayeClient struct { + state int + url string + subscriptions []Subscription + transport Transport + clientId string + schedular Schedular +} + +type Subscription struct { + channel string + callback func(Message) +} + +type SubscriptionPromise struct { + subscription Subscription +} + +func NewFayeClient(url string) *FayeClient { + schedular := ChannelSchedular{} + client := &FayeClient{url: url, state: UNCONNECTED, schedular: schedular} + return client +} + +func (self *FayeClient) handshake() { + t, err := SelectTransport(self, MANDATORY_CONNECTION_TYPES, []string{}) + if err != nil { + panic("No usable transports available") + } + self.transport = t + self.transport.setUrl(self.url) + self.state = CONNECTING + handshakeParams := map[string]interface{}{"channel": "/meta/handshake", + "version": "1.0", + "supportedConnectionTypes": []string{"long-polling"}} + response, err := self.transport.send(handshakeParams) + if err != nil { + fmt.Println("Handshake failed. Retry in 10 seconds") + self.state = UNCONNECTED + self.schedular.wait(10*time.Second, func() { + fmt.Println("retying handshake") + self.handshake() + }) + return + } + self.clientId = response.clientId + self.state = CONNECTED + self.transport, err = SelectTransport(self, response.supportedConnectionTypes, []string{}) + if err != nil { + panic("Server does not support any available transports. Supported transports: " + strings.Join(response.supportedConnectionTypes, ",")) + } +} + +func (self *FayeClient) Subscribe(channel string, force bool, callback func(Message)) SubscriptionPromise { + if self.state == UNCONNECTED { + self.handshake() + } + subscriptionParams := map[string]interface{}{"channel": "/meta/subscribe", "clientId": self.clientId, "subscription": channel, "id": "1"} + subscription := Subscription{channel: channel, callback: callback} + //TODO: deal with subscription failures + self.transport.send(subscriptionParams) + self.subscriptions = append(self.subscriptions, subscription) + return SubscriptionPromise{subscription} +} + +func (self *FayeClient) handleResponse(response Response) { + for _, message := range response.messages { + for _, subscription := range self.subscriptions { + matched, _ := filepath.Match(subscription.channel, message.Channel) + if matched { + go subscription.callback(message) + } + } + } +} + +func (self *FayeClient) connect() { + connectParams := map[string]interface{}{"channel": "/meta/connect", "clientId": self.clientId, "connectionType": self.transport.connectionType()} + responseChannel := make(chan Response) + go func() { + response, _ := self.transport.send(connectParams) + responseChannel <- response + }() + self.listen(responseChannel) +} + +func (self *FayeClient) listen(responseChannel chan Response) { + response := <-responseChannel + if response.successful == true { + go self.handleResponse(response) + } else { + } +} + +func (self *FayeClient) Listen() { + for { + self.connect() + } +} + +func (self *FayeClient) Publish(channel string, data map[string]interface{}) { + if self.state != CONNECTED { + self.handshake() + } + publishParams := map[string]interface{}{"channel": channel, "data": data, "clientId": self.clientId} + self.transport.send(publishParams) +} + +func RegisterTransports(transports []Transport) { + registeredTransports = transports +} diff --git a/vendor/github.com/mrexodia/wray/http_transport.go b/vendor/github.com/mrexodia/wray/http_transport.go new file mode 100644 index 00000000..f9465832 --- /dev/null +++ b/vendor/github.com/mrexodia/wray/http_transport.go @@ -0,0 +1,55 @@ +package wray + +import ( + "bytes" + "encoding/json" + "errors" + "io/ioutil" + "net/http" + "net/url" +) + +type HttpTransport struct { + url string + SendHook func(data map[string]interface{}) +} + +func (self HttpTransport) isUsable(clientUrl string) bool { + parsedUrl, err := url.Parse(clientUrl) + if err != nil { + return false + } + if parsedUrl.Scheme == "http" || parsedUrl.Scheme == "https" { + return true + } + return false +} + +func (self HttpTransport) connectionType() string { + return "long-polling" +} + +func (self HttpTransport) send(data map[string]interface{}) (Response, error) { + if self.SendHook != nil { + self.SendHook(data) + } + dataBytes, _ := json.Marshal(data) + buffer := bytes.NewBuffer(dataBytes) + responseData, err := http.Post(self.url, "application/json", buffer) + if err != nil { + return Response{}, err + } + if responseData.StatusCode != 200 { + return Response{}, errors.New(responseData.Status) + } + readData, _ := ioutil.ReadAll(responseData.Body) + responseData.Body.Close() + var jsonData []interface{} + json.Unmarshal(readData, &jsonData) + response := newResponse(jsonData) + return response, nil +} + +func (self *HttpTransport) setUrl(url string) { + self.url = url +} diff --git a/vendor/github.com/mrexodia/wray/response.go b/vendor/github.com/mrexodia/wray/response.go new file mode 100644 index 00000000..e9815c3f --- /dev/null +++ b/vendor/github.com/mrexodia/wray/response.go @@ -0,0 +1,61 @@ +package wray + +type Response struct { + id string + channel string + successful bool + clientId string + supportedConnectionTypes []string + messages []Message + error error +} + +type Message struct { + Channel string + Id string + Data map[string]interface{} +} + +func newResponse(data []interface{}) Response { + headerData := data[0].(map[string]interface{}) + messagesData := data[1.:] + messages := parseMessages(messagesData) + var id string + if headerData["id"] != nil { + id = headerData["id"].(string) + } + supportedConnectionTypes := []string{} + if headerData["supportedConnectionTypes"] != nil { + d := headerData["supportedConnectionTypes"].([]interface{}) + for _, sct := range(d) { + supportedConnectionTypes = append(supportedConnectionTypes, sct.(string)) + } + } + var clientId string + if headerData["clientId"] != nil { + clientId = headerData["clientId"].(string) + } + return Response{id: id, + clientId: clientId, + channel: headerData["channel"].(string), + successful: headerData["successful"].(bool), + messages: messages, + supportedConnectionTypes: supportedConnectionTypes} +} + +func parseMessages(data []interface{}) []Message { + messages := []Message{} + for _, messageData := range(data) { + m := messageData.(map[string]interface{}) + var id string + if m["id"] != nil { + id = m["id"].(string) + } + message := Message{Channel: m["channel"].(string), + Id: id, + Data: m["data"].(map[string]interface{})} + messages = append(messages, message) + } + return messages +} + diff --git a/vendor/github.com/mrexodia/wray/schedular.go b/vendor/github.com/mrexodia/wray/schedular.go new file mode 100644 index 00000000..3453fd61 --- /dev/null +++ b/vendor/github.com/mrexodia/wray/schedular.go @@ -0,0 +1,22 @@ +package wray + +import "time" + +type Schedular interface { + wait(time.Duration, func()) + delay() time.Duration +} + +type ChannelSchedular struct { +} + +func (self ChannelSchedular) wait(delay time.Duration, callback func()) { + go func() { + time.Sleep(delay) + callback() + }() +} + +func (self ChannelSchedular) delay() time.Duration { + return (1 * time.Minute) +} diff --git a/vendor/github.com/mrexodia/wray/transport.go b/vendor/github.com/mrexodia/wray/transport.go new file mode 100644 index 00000000..82f4d4d3 --- /dev/null +++ b/vendor/github.com/mrexodia/wray/transport.go @@ -0,0 +1,21 @@ +package wray + +import ( + "errors" +) + +type Transport interface { + isUsable(string) bool + connectionType() string + send(map[string]interface{}) (Response, error) + setUrl(string) +} + +func SelectTransport(client *FayeClient, transportTypes []string, disabled []string) (Transport, error) { + for _, transport := range registeredTransports { + if contains(transport.connectionType(), transportTypes) && transport.isUsable(client.url) { + return transport, nil + } + } + return nil, errors.New("No usable transports available") +} diff --git a/vendor/github.com/mrexodia/wray/utils.go b/vendor/github.com/mrexodia/wray/utils.go new file mode 100644 index 00000000..c587f7c6 --- /dev/null +++ b/vendor/github.com/mrexodia/wray/utils.go @@ -0,0 +1,10 @@ +package wray + +func contains(target string, slice []string) bool { + for _, t := range(slice) { + if t == target { + return true + } + } + return false +} diff --git a/vendor/github.com/sromku/go-gitter/LICENSE b/vendor/github.com/sromku/go-gitter/LICENSE new file mode 100644 index 00000000..8dada3ed --- /dev/null +++ b/vendor/github.com/sromku/go-gitter/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/sromku/go-gitter/faye.go b/vendor/github.com/sromku/go-gitter/faye.go new file mode 100644 index 00000000..dcd3e210 --- /dev/null +++ b/vendor/github.com/sromku/go-gitter/faye.go @@ -0,0 +1,70 @@ +package gitter + +import ( + "encoding/json" + "fmt" + + "github.com/mrexodia/wray" +) + +type Faye struct { + endpoint string + Event chan Event + client *wray.FayeClient + gitter *Gitter +} + +func (gitter *Gitter) Faye(roomID string) *Faye { + wray.RegisterTransports([]wray.Transport{ + &wray.HttpTransport{ + SendHook: func(data map[string]interface{}) { + if channel, ok := data["channel"]; ok && channel == "/meta/handshake" { + data["ext"] = map[string]interface{}{"token": gitter.config.token} + } + }, + }, + }) + return &Faye{ + endpoint: "/api/v1/rooms/" + roomID + "/chatMessages", + Event: make(chan Event), + client: wray.NewFayeClient(fayeBaseURL), + gitter: gitter, + } +} + +func (faye *Faye) Listen() { + defer faye.destroy() + + faye.client.Subscribe(faye.endpoint, false, func(message wray.Message) { + dataBytes, err := json.Marshal(message.Data["model"]) + if err != nil { + fmt.Printf("JSON Marshal error: %v\n", err) + return + } + var gitterMessage Message + err = json.Unmarshal(dataBytes, &gitterMessage) + if err != nil { + fmt.Printf("JSON Unmarshal error: %v\n", err) + return + } + faye.Event <- Event{ + Data: &MessageReceived{ + Message: gitterMessage, + }, + } + }) + + //TODO: this might be needed in the future + /*go func() { + for { + faye.client.Publish("/api/v1/ping2", map[string]interface{}{"reason": "ping"}) + time.Sleep(60 * time.Second) + } + }()*/ + + faye.client.Listen() +} + +func (faye *Faye) destroy() { + close(faye.Event) +} diff --git a/vendor/github.com/sromku/go-gitter/gitter.go b/vendor/github.com/sromku/go-gitter/gitter.go new file mode 100644 index 00000000..375e6e35 --- /dev/null +++ b/vendor/github.com/sromku/go-gitter/gitter.go @@ -0,0 +1,367 @@ +// Package gitter is a Go client library for the Gitter API. +// +// Author: sromku +package gitter + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "log" + "net/http" + "net/url" + "strconv" + "time" + + "github.com/mreiferson/go-httpclient" +) + +var ( + apiBaseURL = "https://api.gitter.im/v1/" + streamBaseURL = "https://stream.gitter.im/v1/" + fayeBaseURL = "https://ws.gitter.im/faye" +) + +type Gitter struct { + config struct { + apiBaseURL string + streamBaseURL string + token string + client *http.Client + } + debug bool + logWriter io.Writer +} + +// New initializes the Gitter API client +// +// For example: +// api := gitter.New("YOUR_ACCESS_TOKEN") +func New(token string) *Gitter { + + transport := &httpclient.Transport{ + ConnectTimeout: 5 * time.Second, + ReadWriteTimeout: 40 * time.Second, + } + defer transport.Close() + + s := &Gitter{} + s.config.apiBaseURL = apiBaseURL + s.config.streamBaseURL = streamBaseURL + s.config.token = token + s.config.client = &http.Client{ + Transport: transport, + } + return s +} + +// SetClient sets a custom http client. Can be useful in App Engine case. +func (gitter *Gitter) SetClient(client *http.Client) { + gitter.config.client = client +} + +// GetUser returns the current user +func (gitter *Gitter) GetUser() (*User, error) { + + var users []User + response, err := gitter.get(gitter.config.apiBaseURL + "user") + if err != nil { + gitter.log(err) + return nil, err + } + + err = json.Unmarshal(response, &users) + if err != nil { + gitter.log(err) + return nil, err + } + + if len(users) > 0 { + return &users[0], nil + } + + err = APIError{What: "Failed to retrieve current user"} + gitter.log(err) + return nil, err +} + +// GetUserRooms returns a list of Rooms the user is part of +func (gitter *Gitter) GetUserRooms(userID string) ([]Room, error) { + + var rooms []Room + response, err := gitter.get(gitter.config.apiBaseURL + "user/" + userID + "/rooms") + if err != nil { + gitter.log(err) + return nil, err + } + + err = json.Unmarshal(response, &rooms) + if err != nil { + gitter.log(err) + return nil, err + } + + return rooms, nil +} + +// GetRooms returns a list of rooms the current user is in +func (gitter *Gitter) GetRooms() ([]Room, error) { + + var rooms []Room + response, err := gitter.get(gitter.config.apiBaseURL + "rooms") + if err != nil { + gitter.log(err) + return nil, err + } + + err = json.Unmarshal(response, &rooms) + if err != nil { + gitter.log(err) + return nil, err + } + + return rooms, nil +} + +// GetRoom returns a room with the passed id +func (gitter *Gitter) GetRoom(roomID string) (*Room, error) { + + var room Room + response, err := gitter.get(gitter.config.apiBaseURL + "rooms/" + roomID) + if err != nil { + gitter.log(err) + return nil, err + } + + err = json.Unmarshal(response, &room) + if err != nil { + gitter.log(err) + return nil, err + } + + return &room, nil +} + +// GetMessages returns a list of messages in a room. +// Pagination is optional. You can pass nil or specific pagination params. +func (gitter *Gitter) GetMessages(roomID string, params *Pagination) ([]Message, error) { + + var messages []Message + url := gitter.config.apiBaseURL + "rooms/" + roomID + "/chatMessages" + if params != nil { + url += "?" + params.encode() + } + response, err := gitter.get(url) + if err != nil { + gitter.log(err) + return nil, err + } + + err = json.Unmarshal(response, &messages) + if err != nil { + gitter.log(err) + return nil, err + } + + return messages, nil +} + +// GetMessage returns a message in a room. +func (gitter *Gitter) GetMessage(roomID, messageID string) (*Message, error) { + + var message Message + response, err := gitter.get(gitter.config.apiBaseURL + "rooms/" + roomID + "/chatMessages/" + messageID) + if err != nil { + gitter.log(err) + return nil, err + } + + err = json.Unmarshal(response, &message) + if err != nil { + gitter.log(err) + return nil, err + } + + return &message, nil +} + +// SendMessage sends a message to a room +func (gitter *Gitter) SendMessage(roomID, text string) error { + + message := Message{Text: text} + body, _ := json.Marshal(message) + err := gitter.post(gitter.config.apiBaseURL+"rooms/"+roomID+"/chatMessages", body) + if err != nil { + gitter.log(err) + return err + } + + return nil +} + +// JoinRoom joins a room +func (gitter *Gitter) JoinRoom(uri string) (*Room, error) { + + message := Room{URI: uri} + body, _ := json.Marshal(message) + err := gitter.post(apiBaseURL+"rooms", body) + if err != nil { + gitter.log(err) + return nil, err + } + + rooms, err := gitter.GetRooms() + if err != nil { + gitter.log(err) + return nil, err + } + + for _, room := range rooms { + if room.URI == uri { + return &room, nil + } + } + + err = APIError{What: fmt.Sprintf("Joined room (%v) not found in list of rooms", uri)} + gitter.log(err) + return nil, err +} + +// SetDebug traces errors if it's set to true. +func (gitter *Gitter) SetDebug(debug bool, logWriter io.Writer) { + gitter.debug = debug + gitter.logWriter = logWriter +} + +// Pagination params +type Pagination struct { + + // Skip n messages + Skip int + + // Get messages before beforeId + BeforeID string + + // Get messages after afterId + AfterID string + + // Maximum number of messages to return + Limit int + + // Search query + Query string +} + +func (messageParams *Pagination) encode() string { + values := url.Values{} + + if messageParams.AfterID != "" { + values.Add("afterId", messageParams.AfterID) + } + + if messageParams.BeforeID != "" { + values.Add("beforeId", messageParams.BeforeID) + } + + if messageParams.Skip > 0 { + values.Add("skip", strconv.Itoa(messageParams.Skip)) + } + + if messageParams.Limit > 0 { + values.Add("limit", strconv.Itoa(messageParams.Limit)) + } + + return values.Encode() +} + +func (gitter *Gitter) getResponse(url string, stream *Stream) (*http.Response, error) { + r, err := http.NewRequest("GET", url, nil) + if err != nil { + gitter.log(err) + return nil, err + } + r.Header.Set("Content-Type", "application/json") + r.Header.Set("Accept", "application/json") + r.Header.Set("Authorization", "Bearer "+gitter.config.token) + if stream != nil { + stream.streamConnection.request = r + } + response, err := gitter.config.client.Do(r) + if err != nil { + gitter.log(err) + return nil, err + } + return response, nil +} + +func (gitter *Gitter) get(url string) ([]byte, error) { + resp, err := gitter.getResponse(url, nil) + if err != nil { + gitter.log(err) + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + err = APIError{What: fmt.Sprintf("Status code: %v", resp.StatusCode)} + gitter.log(err) + return nil, err + } + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + gitter.log(err) + return nil, err + } + + return body, nil +} + +func (gitter *Gitter) post(url string, body []byte) error { + r, err := http.NewRequest("POST", url, bytes.NewBuffer(body)) + if err != nil { + gitter.log(err) + return err + } + + r.Header.Set("Content-Type", "application/json") + r.Header.Set("Accept", "application/json") + r.Header.Set("Authorization", "Bearer "+gitter.config.token) + + resp, err := gitter.config.client.Do(r) + if err != nil { + gitter.log(err) + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + err = APIError{What: fmt.Sprintf("Status code: %v", resp.StatusCode)} + gitter.log(err) + return err + } + + return nil +} + +func (gitter *Gitter) log(a interface{}) { + if gitter.debug { + log.Println(a) + if gitter.logWriter != nil { + timestamp := time.Now().Format(time.RFC3339) + msg := fmt.Sprintf("%v: %v", timestamp, a) + fmt.Fprintln(gitter.logWriter, msg) + } + } +} + +// APIError holds data of errors returned from the API. +type APIError struct { + What string +} + +func (e APIError) Error() string { + return fmt.Sprintf("%v", e.What) +} diff --git a/vendor/github.com/sromku/go-gitter/model.go b/vendor/github.com/sromku/go-gitter/model.go new file mode 100644 index 00000000..7a3d0729 --- /dev/null +++ b/vendor/github.com/sromku/go-gitter/model.go @@ -0,0 +1,142 @@ +package gitter + +import "time" + +// A Room in Gitter can represent a GitHub Organization, a GitHub Repository, a Gitter Channel or a One-to-one conversation. +// In the case of the Organizations and Repositories, the access control policies are inherited from GitHub. +type Room struct { + + // Room ID + ID string `json:"id"` + + // Room name + Name string `json:"name"` + + // Room topic. (default: GitHub repo description) + Topic string `json:"topic"` + + // Room URI on Gitter + URI string `json:"uri"` + + // Indicates if the room is a one-to-one chat + OneToOne bool `json:"oneToOne"` + + // Count of users in the room + UserCount int `json:"userCount"` + + // Number of unread messages for the current user + UnreadItems int `json:"unreadItems"` + + // Number of unread mentions for the current user + Mentions int `json:"mentions"` + + // Last time the current user accessed the room in ISO format + LastAccessTime time.Time `json:"lastAccessTime"` + + // Indicates if the current user has disabled notifications + Lurk bool `json:"lurk"` + + // Path to the room on gitter + URL string `json:"url"` + + // Type of the room + // - ORG: A room that represents a GitHub Organization. + // - REPO: A room that represents a GitHub Repository. + // - ONETOONE: A one-to-one chat. + // - ORG_CHANNEL: A Gitter channel nested under a GitHub Organization. + // - REPO_CHANNEL A Gitter channel nested under a GitHub Repository. + // - USER_CHANNEL A Gitter channel nested under a GitHub User. + GithubType string `json:"githubType"` + + // Tags that define the room + Tags []string `json:"tags"` + + RoomMember bool `json:"roomMember"` + + // Room version. + Version int `json:"v"` +} + +type User struct { + + // Gitter User ID + ID string `json:"id"` + + // Gitter/GitHub username + Username string `json:"username"` + + // Gitter/GitHub user real name + DisplayName string `json:"displayName"` + + // Path to the user on Gitter + URL string `json:"url"` + + // User avatar URI (small) + AvatarURLSmall string `json:"avatarUrlSmall"` + + // User avatar URI (medium) + AvatarURLMedium string `json:"avatarUrlMedium"` +} + +type Message struct { + + // ID of the message + ID string `json:"id"` + + // Original message in plain-text/markdown + Text string `json:"text"` + + // HTML formatted message + HTML string `json:"html"` + + // ISO formatted date of the message + Sent time.Time `json:"sent"` + + // ISO formatted date of the message if edited + EditedAt time.Time `json:"editedAt"` + + // User that sent the message + From User `json:"fromUser"` + + // Boolean that indicates if the current user has read the message. + Unread bool `json:"unread"` + + // Number of users that have read the message + ReadBy int `json:"readBy"` + + // List of URLs present in the message + Urls []URL `json:"urls"` + + // List of @Mentions in the message + Mentions []Mention `json:"mentions"` + + // List of #Issues referenced in the message + Issues []Issue `json:"issues"` + + // Version + Version int `json:"v"` +} + +// Mention holds data about mentioned user in the message +type Mention struct { + + // User's username + ScreenName string `json:"screenName"` + + // Gitter User ID + UserID string `json:"userID"` +} + +// Issue references issue in the message +type Issue struct { + + // Issue number + Number string `json:"number"` +} + +// URL presented in the message +type URL struct { + + // URL + URL string `json:"url"` +} diff --git a/vendor/github.com/sromku/go-gitter/stream.go b/vendor/github.com/sromku/go-gitter/stream.go new file mode 100644 index 00000000..5f1cd78f --- /dev/null +++ b/vendor/github.com/sromku/go-gitter/stream.go @@ -0,0 +1,220 @@ +package gitter + +import ( + "bufio" + "encoding/json" + "fmt" + "net/http" + "time" + + "github.com/mreiferson/go-httpclient" +) + +var defaultConnectionWaitTime time.Duration = 3000 // millis +var defaultConnectionMaxRetries = 5 + +// Stream initialize stream +func (gitter *Gitter) Stream(roomID string) *Stream { + return &Stream{ + url: streamBaseURL + "rooms/" + roomID + "/chatMessages", + Event: make(chan Event), + gitter: gitter, + streamConnection: gitter.newStreamConnection( + defaultConnectionWaitTime, + defaultConnectionMaxRetries), + } +} + +// Implemented to conform with https://developer.gitter.im/docs/streaming-api +func (gitter *Gitter) Listen(stream *Stream) { + + defer stream.destroy() + + var reader *bufio.Reader + var gitterMessage Message + lastKeepalive := time.Now().Unix() + + // connect + stream.connect() + +Loop: + for { + + // if closed then stop trying + if stream.isClosed() { + stream.Event <- Event{ + Data: &GitterConnectionClosed{}, + } + break Loop + } + + resp := stream.getResponse() + if resp.StatusCode != 200 { + gitter.log(fmt.Sprintf("Unexpected response code %v", resp.StatusCode)) + continue + } + + //"The JSON stream returns messages as JSON objects that are delimited by carriage return (\r)" <- Not true crap it's (\n) only + reader = bufio.NewReader(resp.Body) + line, err := reader.ReadBytes('\n') + + //Check if the line only consists of whitespace + onlyWhitespace := true + for _, b := range line { + if b != ' ' && b != '\t' && b != '\r' && b != '\n' { + onlyWhitespace = false + } + } + + if onlyWhitespace { + //"Parsers must be tolerant of occasional extra newline characters placed between messages." + currentKeepalive := time.Now().Unix() //interesting behavior of 100+ keepalives per seconds was observed + if currentKeepalive-lastKeepalive > 10 { + lastKeepalive = currentKeepalive + gitter.log("Keepalive was received") + } + continue + } else if stream.isClosed() { + gitter.log("Stream closed") + continue + } else if err != nil { + gitter.log("ReadBytes error: " + err.Error()) + stream.connect() + continue + } + + // unmarshal the streamed data + err = json.Unmarshal(line, &gitterMessage) + if err != nil { + gitter.log("JSON Unmarshal error: " + err.Error()) + continue + } + + // we are here, then we got the good message. pipe it forward. + stream.Event <- Event{ + Data: &MessageReceived{ + Message: gitterMessage, + }, + } + } + + gitter.log("Listening was completed") +} + +// Stream holds stream data. +type Stream struct { + url string + Event chan Event + streamConnection *streamConnection + gitter *Gitter +} + +func (stream *Stream) destroy() { + close(stream.Event) +} + +type Event struct { + Data interface{} +} + +type GitterConnectionClosed struct { +} + +type MessageReceived struct { + Message Message +} + +// connect and try to reconnect with +func (stream *Stream) connect() { + + if stream.streamConnection.retries == stream.streamConnection.currentRetries { + stream.Close() + stream.gitter.log("Number of retries exceeded the max retries number, we are done here") + return + } + + res, err := stream.gitter.getResponse(stream.url, stream) + if stream.streamConnection.canceled { + // do nothing + } else if err != nil || res.StatusCode != 200 { + stream.gitter.log("Failed to get response, trying reconnect ") + stream.gitter.log(err) + + // sleep and wait + stream.streamConnection.currentRetries++ + time.Sleep(time.Millisecond * stream.streamConnection.wait * time.Duration(stream.streamConnection.currentRetries)) + + // connect again + stream.Close() + stream.connect() + } else { + stream.gitter.log("Response was received") + stream.streamConnection.currentRetries = 0 + stream.streamConnection.closed = false + stream.streamConnection.response = res + } +} + +type streamConnection struct { + + // connection was closed + closed bool + + // canceled + canceled bool + + // wait time till next try + wait time.Duration + + // max tries to recover + retries int + + // current streamed response + response *http.Response + + // current request + request *http.Request + + // current status + currentRetries int +} + +// Close the stream connection and stop receiving streamed data +func (stream *Stream) Close() { + conn := stream.streamConnection + conn.closed = true + if conn.response != nil { + stream.gitter.log("Stream connection close response") + defer conn.response.Body.Close() + } + if conn.request != nil { + stream.gitter.log("Stream connection close request") + switch transport := stream.gitter.config.client.Transport.(type) { + case *httpclient.Transport: + stream.streamConnection.canceled = true + transport.CancelRequest(conn.request) + default: + } + + } + conn.currentRetries = 0 +} + +func (stream *Stream) isClosed() bool { + return stream.streamConnection.closed +} + +func (stream *Stream) getResponse() *http.Response { + return stream.streamConnection.response +} + +// Optional, set stream connection properties +// wait - time in milliseconds of waiting between reconnections. Will grow exponentially. +// retries - number of reconnections retries before dropping the stream. +func (gitter *Gitter) newStreamConnection(wait time.Duration, retries int) *streamConnection { + return &streamConnection{ + closed: true, + wait: wait, + retries: retries, + } +} diff --git a/vendor/github.com/sromku/go-gitter/test_utils.go b/vendor/github.com/sromku/go-gitter/test_utils.go new file mode 100644 index 00000000..6703da2e --- /dev/null +++ b/vendor/github.com/sromku/go-gitter/test_utils.go @@ -0,0 +1,30 @@ +package gitter + +import ( + "net/http" + "net/http/httptest" + "net/url" +) + +var ( + mux *http.ServeMux + gitter *Gitter + server *httptest.Server +) + +func setup() { + mux = http.NewServeMux() + server = httptest.NewServer(mux) + + gitter = New("abc") + + // Fake the API and Stream base URLs by using the test + // server URL instead. + url, _ := url.Parse(server.URL) + gitter.config.apiBaseURL = url.String() + "/" + gitter.config.streamBaseURL = url.String() + "/" +} + +func teardown() { + server.Close() +} diff --git a/vendor/manifest b/vendor/manifest index 1fe265ac..dc2f9253 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -85,6 +85,22 @@ "branch": "master", "notests": true }, + { + "importpath": "github.com/mreiferson/go-httpclient", + "repository": "https://github.com/mreiferson/go-httpclient", + "vcs": "git", + "revision": "31f0106b4474f14bc441575c19d3a5fa21aa1f6c", + "branch": "master", + "notests": true + }, + { + "importpath": "github.com/mrexodia/wray", + "repository": "https://github.com/mrexodia/wray", + "vcs": "git", + "revision": "78a2c1f284ffe6ada7e2dfbd97c644e0d0f23fea", + "branch": "master", + "notests": true + }, { "importpath": "github.com/nicksnyder/go-i18n/i18n", "repository": "https://github.com/nicksnyder/go-i18n", @@ -118,6 +134,14 @@ "branch": "master", "notests": true }, + { + "importpath": "github.com/sromku/go-gitter", + "repository": "https://github.com/sromku/go-gitter", + "vcs": "git", + "revision": "932bf9af423ac2da1544cb73540b3b08b1bdb181", + "branch": "master", + "notests": true + }, { "importpath": "github.com/thoj/go-ircevent", "repository": "https://github.com/thoj/go-ircevent",