diff --git a/broker/broker.go b/broker/broker.go index 13d2575..3edfe84 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -97,14 +97,16 @@ func (mh MetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Proxies may poll for client offers concurrently. type ProxyPoll struct { id string + ptype string offerChannel chan []byte } // Registers a Snowflake and waits for some Client to send an offer, // as part of the polling logic of the proxy handler. -func (ctx *BrokerContext) RequestOffer(id string) []byte { +func (ctx *BrokerContext) RequestOffer(id string, ptype string) []byte { request := new(ProxyPoll) request.id = id + request.ptype = ptype request.offerChannel = make(chan []byte) ctx.proxyPolls <- request // Block until an offer is available, or timeout which sends a nil offer. @@ -117,7 +119,7 @@ func (ctx *BrokerContext) RequestOffer(id string) []byte { // client offer or nil on timeout / none are available. func (ctx *BrokerContext) Broker() { for request := range ctx.proxyPolls { - snowflake := ctx.AddSnowflake(request.id) + snowflake := ctx.AddSnowflake(request.id, request.ptype) // Wait for a client to avail an offer to the snowflake. go func(request *ProxyPoll) { select { @@ -137,10 +139,11 @@ func (ctx *BrokerContext) Broker() { // Create and add a Snowflake to the heap. // Required to keep track of proxies between providing them // with an offer and awaiting their second POST with an answer. -func (ctx *BrokerContext) AddSnowflake(id string) *Snowflake { +func (ctx *BrokerContext) AddSnowflake(id string, ptype string) *Snowflake { snowflake := new(Snowflake) snowflake.id = id snowflake.clients = 0 + snowflake.ptype = ptype snowflake.offerChannel = make(chan []byte) snowflake.answerChannel = make(chan []byte) heap.Push(ctx.snowflakes, snowflake) @@ -159,7 +162,7 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { return } - sid, err := messages.DecodePollRequest(body) + sid, ptype, err := messages.DecodePollRequest(body) if err != nil { w.WriteHeader(http.StatusBadRequest) return @@ -174,7 +177,7 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { } // Wait for a client to avail an offer to the snowflake, or timeout if nil. - offer := ctx.RequestOffer(sid) + offer := ctx.RequestOffer(sid, ptype) var b []byte if nil == offer { ctx.metrics.proxyIdleCount++ @@ -286,16 +289,23 @@ func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { func debugHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { s := fmt.Sprintf("current snowflakes available: %d\n", ctx.snowflakes.Len()) - var browsers, standalones int + var webexts, browsers, standalones, unknowns int for _, snowflake := range ctx.idToSnowflake { - if len(snowflake.id) < 16 { + if snowflake.ptype == "badge" { browsers++ - } else { + } else if snowflake.ptype == "webext" { + webexts++ + } else if snowflake.ptype == "standalone" { standalones++ + } else { + unknowns++ } + } s += fmt.Sprintf("\tstandalone proxies: %d", standalones) s += fmt.Sprintf("\n\tbrowser proxies: %d", browsers) + s += fmt.Sprintf("\n\twebext proxies: %d", webexts) + s += fmt.Sprintf("\n\tunknown proxies: %d", unknowns) if _, err := w.Write([]byte(s)); err != nil { log.Printf("writing proxy information returned error: %v ", err) } diff --git a/broker/snowflake-broker_test.go b/broker/snowflake-broker_test.go index c35c1d6..cb5f34f 100644 --- a/broker/snowflake-broker_test.go +++ b/broker/snowflake-broker_test.go @@ -29,7 +29,7 @@ func TestBroker(t *testing.T) { Convey("Adds Snowflake", func() { So(ctx.snowflakes.Len(), ShouldEqual, 0) So(len(ctx.idToSnowflake), ShouldEqual, 0) - ctx.AddSnowflake("foo") + ctx.AddSnowflake("foo", "") So(ctx.snowflakes.Len(), ShouldEqual, 1) So(len(ctx.idToSnowflake), ShouldEqual, 1) }) @@ -55,7 +55,7 @@ func TestBroker(t *testing.T) { Convey("Request an offer from the Snowflake Heap", func() { done := make(chan []byte) go func() { - offer := ctx.RequestOffer("test") + offer := ctx.RequestOffer("test", "") done <- offer }() request := <-ctx.proxyPolls @@ -79,7 +79,7 @@ func TestBroker(t *testing.T) { Convey("with a proxy answer if available.", func() { done := make(chan bool) // Prepare a fake proxy to respond with. - snowflake := ctx.AddSnowflake("fake") + snowflake := ctx.AddSnowflake("fake", "") go func() { clientOffers(ctx, w, r) done <- true @@ -97,7 +97,7 @@ func TestBroker(t *testing.T) { return } done := make(chan bool) - snowflake := ctx.AddSnowflake("fake") + snowflake := ctx.AddSnowflake("fake", "") go func() { clientOffers(ctx, w, r) // Takes a few seconds here... @@ -147,7 +147,7 @@ func TestBroker(t *testing.T) { }) Convey("Responds to proxy answers...", func() { - s := ctx.AddSnowflake("test") + s := ctx.AddSnowflake("test", "") w := httptest.NewRecorder() data := bytes.NewReader([]byte(`{"Version":"1.0","Sid":"test","Answer":"test"}`)) @@ -211,7 +211,7 @@ func TestBroker(t *testing.T) { // Manually do the Broker goroutine action here for full control. p := <-ctx.proxyPolls So(p.id, ShouldEqual, "ymbcCMto7KHNGYlp") - s := ctx.AddSnowflake(p.id) + s := ctx.AddSnowflake(p.id, "") go func() { offer := <-s.offerChannel p.offerChannel <- offer @@ -449,7 +449,7 @@ func TestMetrics(t *testing.T) { So(err, ShouldBeNil) // Prepare a fake proxy to respond with. - snowflake := ctx.AddSnowflake("fake") + snowflake := ctx.AddSnowflake("fake", "") go func() { clientOffers(ctx, w, r) done <- true diff --git a/broker/snowflake-heap.go b/broker/snowflake-heap.go index 419956f..cf209ec 100644 --- a/broker/snowflake-heap.go +++ b/broker/snowflake-heap.go @@ -10,6 +10,7 @@ over the offer and answer channels. */ type Snowflake struct { id string + ptype string offerChannel chan []byte answerChannel chan []byte clients int diff --git a/common/messages/proxy.go b/common/messages/proxy.go index 042caf9..7ebab1d 100644 --- a/common/messages/proxy.go +++ b/common/messages/proxy.go @@ -6,16 +6,18 @@ package messages import ( "encoding/json" "fmt" + "strings" ) -const version = "1.0" +const version = "1.1" -/* Version 1.0 specification: +/* Version 1.1 specification: == ProxyPollRequest == { - Sid: [generated session id of proxy] - Version: 1.0 + Sid: [generated session id of proxy], + Version: 1.1, + Type: [badge|webext|standalone] } == ProxyPollResponse == @@ -41,11 +43,11 @@ HTTP 400 BadRequest == ProxyAnswerRequest == { - Sid: [generated session id of proxy] - Version: 1.0 + Sid: [generated session id of proxy], + Version: 1.1, Answer: { - type: answer + type: answer, sdp: [WebRTC SDP] } } @@ -73,34 +75,38 @@ HTTP 400 BadRequest type ProxyPollRequest struct { Sid string Version string + Type string } -func EncodePollRequest(sid string) ([]byte, error) { +func EncodePollRequest(sid string, ptype string) ([]byte, error) { return json.Marshal(ProxyPollRequest{ Sid: sid, Version: version, + Type: ptype, }) } // Decodes a poll message from a snowflake proxy and returns the // sid of the proxy on success and an error if it failed -func DecodePollRequest(data []byte) (string, error) { +func DecodePollRequest(data []byte) (string, string, error) { var message ProxyPollRequest err := json.Unmarshal(data, &message) if err != nil { - return "", err - } - if message.Version != "1.0" { - return "", fmt.Errorf("using unknown version") + return "", "", err } - // Version 1.0 requires an Sid + majorVersion := strings.Split(message.Version, ".")[0] + if majorVersion != "1" { + return "", "", fmt.Errorf("using unknown version") + } + + // Version 1.x requires an Sid if message.Sid == "" { - return "", fmt.Errorf("no supplied session id") + return "", "", fmt.Errorf("no supplied session id") } - return message.Sid, nil + return message.Sid, message.Type, nil } type ProxyPollResponse struct { @@ -153,7 +159,7 @@ type ProxyAnswerRequest struct { func EncodeAnswerRequest(answer string, sid string) ([]byte, error) { return json.Marshal(ProxyAnswerRequest{ - Version: "1.0", + Version: "1.1", Sid: sid, Answer: answer, }) @@ -167,7 +173,9 @@ func DecodeAnswerRequest(data []byte) (string, string, error) { if err != nil { return "", "", err } - if message.Version != "1.0" { + + majorVersion := strings.Split(message.Version, ".")[0] + if majorVersion != "1" { return "", "", fmt.Errorf("using unknown version") } diff --git a/common/messages/proxy_test.go b/common/messages/proxy_test.go index f2f006e..83553a0 100644 --- a/common/messages/proxy_test.go +++ b/common/messages/proxy_test.go @@ -11,45 +11,60 @@ import ( func TestDecodeProxyPollRequest(t *testing.T) { Convey("Context", t, func() { for _, test := range []struct { - sid string - data string - err error + sid string + ptype string + data string + err error }{ { //Version 1.0 proxy message "ymbcCMto7KHNGYlp", + "", `{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0"}`, nil, }, + { + //Version 1.1 proxy message + "ymbcCMto7KHNGYlp", + "standalone", + `{"Sid":"ymbcCMto7KHNGYlp","Version":"1.1","Type":"standalone"}`, + nil, + }, { //Version 0.X proxy message: "", + "", "ymbcCMto7KHNGYlp", &json.SyntaxError{}, }, { + "", "", `{"Sid":"ymbcCMto7KHNGYlp"}`, fmt.Errorf(""), }, { + "", "", "{}", fmt.Errorf(""), }, { + "", "", `{"Version":"1.0"}`, fmt.Errorf(""), }, { + "", "", `{"Version":"2.0"}`, fmt.Errorf(""), }, } { - sid, err := DecodePollRequest([]byte(test.data)) + sid, ptype, err := DecodePollRequest([]byte(test.data)) So(sid, ShouldResemble, test.sid) + So(ptype, ShouldResemble, test.ptype) So(err, ShouldHaveSameTypeAs, test.err) } @@ -58,10 +73,11 @@ func TestDecodeProxyPollRequest(t *testing.T) { func TestEncodeProxyPollRequests(t *testing.T) { Convey("Context", t, func() { - b, err := EncodePollRequest("ymbcCMto7KHNGYlp") + b, err := EncodePollRequest("ymbcCMto7KHNGYlp", "standalone") So(err, ShouldEqual, nil) - sid, err := DecodePollRequest(b) + sid, ptype, err := DecodePollRequest(b) So(sid, ShouldEqual, "ymbcCMto7KHNGYlp") + So(ptype, ShouldEqual, "standalone") So(err, ShouldEqual, nil) }) } diff --git a/proxy-go/snowflake.go b/proxy-go/snowflake.go index c10093a..dce7b70 100644 --- a/proxy-go/snowflake.go +++ b/proxy-go/snowflake.go @@ -175,7 +175,7 @@ func (b *Broker) pollOffer(sid string) *webrtc.SessionDescription { timeOfNextPoll = now } - body, err := messages.EncodePollRequest(sid) + body, err := messages.EncodePollRequest(sid, "standalone") if err != nil { log.Printf("Error encoding poll message: %s", err.Error()) return nil