Update broker--proxy protocol with proxy type

Proxies now include information about what type they are when they poll
for client offers. The broker saves this information along with
snowflake ids and outputs it on the /debug page.
This commit is contained in:
Cecylia Bocovich 2019-11-20 12:41:53 -05:00
parent 7092b2cb2c
commit 7277bb37cd
6 changed files with 75 additions and 40 deletions

View file

@ -97,14 +97,16 @@ func (mh MetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Proxies may poll for client offers concurrently. // Proxies may poll for client offers concurrently.
type ProxyPoll struct { type ProxyPoll struct {
id string id string
ptype string
offerChannel chan []byte offerChannel chan []byte
} }
// Registers a Snowflake and waits for some Client to send an offer, // Registers a Snowflake and waits for some Client to send an offer,
// as part of the polling logic of the proxy handler. // as part of the polling logic of the proxy handler.
func (ctx *BrokerContext) RequestOffer(id string) []byte { func (ctx *BrokerContext) RequestOffer(id string, ptype string) []byte {
request := new(ProxyPoll) request := new(ProxyPoll)
request.id = id request.id = id
request.ptype = ptype
request.offerChannel = make(chan []byte) request.offerChannel = make(chan []byte)
ctx.proxyPolls <- request ctx.proxyPolls <- request
// Block until an offer is available, or timeout which sends a nil offer. // Block until an offer is available, or timeout which sends a nil offer.
@ -117,7 +119,7 @@ func (ctx *BrokerContext) RequestOffer(id string) []byte {
// client offer or nil on timeout / none are available. // client offer or nil on timeout / none are available.
func (ctx *BrokerContext) Broker() { func (ctx *BrokerContext) Broker() {
for request := range ctx.proxyPolls { for request := range ctx.proxyPolls {
snowflake := ctx.AddSnowflake(request.id) snowflake := ctx.AddSnowflake(request.id, request.ptype)
// Wait for a client to avail an offer to the snowflake. // Wait for a client to avail an offer to the snowflake.
go func(request *ProxyPoll) { go func(request *ProxyPoll) {
select { select {
@ -137,10 +139,11 @@ func (ctx *BrokerContext) Broker() {
// Create and add a Snowflake to the heap. // Create and add a Snowflake to the heap.
// Required to keep track of proxies between providing them // Required to keep track of proxies between providing them
// with an offer and awaiting their second POST with an answer. // with an offer and awaiting their second POST with an answer.
func (ctx *BrokerContext) AddSnowflake(id string) *Snowflake { func (ctx *BrokerContext) AddSnowflake(id string, ptype string) *Snowflake {
snowflake := new(Snowflake) snowflake := new(Snowflake)
snowflake.id = id snowflake.id = id
snowflake.clients = 0 snowflake.clients = 0
snowflake.ptype = ptype
snowflake.offerChannel = make(chan []byte) snowflake.offerChannel = make(chan []byte)
snowflake.answerChannel = make(chan []byte) snowflake.answerChannel = make(chan []byte)
heap.Push(ctx.snowflakes, snowflake) heap.Push(ctx.snowflakes, snowflake)
@ -159,7 +162,7 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
return return
} }
sid, err := messages.DecodePollRequest(body) sid, ptype, err := messages.DecodePollRequest(body)
if err != nil { if err != nil {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
return return
@ -174,7 +177,7 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
} }
// Wait for a client to avail an offer to the snowflake, or timeout if nil. // Wait for a client to avail an offer to the snowflake, or timeout if nil.
offer := ctx.RequestOffer(sid) offer := ctx.RequestOffer(sid, ptype)
var b []byte var b []byte
if nil == offer { if nil == offer {
ctx.metrics.proxyIdleCount++ ctx.metrics.proxyIdleCount++
@ -286,16 +289,23 @@ func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
func debugHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { func debugHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
s := fmt.Sprintf("current snowflakes available: %d\n", ctx.snowflakes.Len()) s := fmt.Sprintf("current snowflakes available: %d\n", ctx.snowflakes.Len())
var browsers, standalones int var webexts, browsers, standalones, unknowns int
for _, snowflake := range ctx.idToSnowflake { for _, snowflake := range ctx.idToSnowflake {
if len(snowflake.id) < 16 { if snowflake.ptype == "badge" {
browsers++ browsers++
} else { } else if snowflake.ptype == "webext" {
webexts++
} else if snowflake.ptype == "standalone" {
standalones++ standalones++
} else {
unknowns++
} }
} }
s += fmt.Sprintf("\tstandalone proxies: %d", standalones) s += fmt.Sprintf("\tstandalone proxies: %d", standalones)
s += fmt.Sprintf("\n\tbrowser proxies: %d", browsers) s += fmt.Sprintf("\n\tbrowser proxies: %d", browsers)
s += fmt.Sprintf("\n\twebext proxies: %d", webexts)
s += fmt.Sprintf("\n\tunknown proxies: %d", unknowns)
if _, err := w.Write([]byte(s)); err != nil { if _, err := w.Write([]byte(s)); err != nil {
log.Printf("writing proxy information returned error: %v ", err) log.Printf("writing proxy information returned error: %v ", err)
} }

View file

@ -29,7 +29,7 @@ func TestBroker(t *testing.T) {
Convey("Adds Snowflake", func() { Convey("Adds Snowflake", func() {
So(ctx.snowflakes.Len(), ShouldEqual, 0) So(ctx.snowflakes.Len(), ShouldEqual, 0)
So(len(ctx.idToSnowflake), ShouldEqual, 0) So(len(ctx.idToSnowflake), ShouldEqual, 0)
ctx.AddSnowflake("foo") ctx.AddSnowflake("foo", "")
So(ctx.snowflakes.Len(), ShouldEqual, 1) So(ctx.snowflakes.Len(), ShouldEqual, 1)
So(len(ctx.idToSnowflake), ShouldEqual, 1) So(len(ctx.idToSnowflake), ShouldEqual, 1)
}) })
@ -55,7 +55,7 @@ func TestBroker(t *testing.T) {
Convey("Request an offer from the Snowflake Heap", func() { Convey("Request an offer from the Snowflake Heap", func() {
done := make(chan []byte) done := make(chan []byte)
go func() { go func() {
offer := ctx.RequestOffer("test") offer := ctx.RequestOffer("test", "")
done <- offer done <- offer
}() }()
request := <-ctx.proxyPolls request := <-ctx.proxyPolls
@ -79,7 +79,7 @@ func TestBroker(t *testing.T) {
Convey("with a proxy answer if available.", func() { Convey("with a proxy answer if available.", func() {
done := make(chan bool) done := make(chan bool)
// Prepare a fake proxy to respond with. // Prepare a fake proxy to respond with.
snowflake := ctx.AddSnowflake("fake") snowflake := ctx.AddSnowflake("fake", "")
go func() { go func() {
clientOffers(ctx, w, r) clientOffers(ctx, w, r)
done <- true done <- true
@ -97,7 +97,7 @@ func TestBroker(t *testing.T) {
return return
} }
done := make(chan bool) done := make(chan bool)
snowflake := ctx.AddSnowflake("fake") snowflake := ctx.AddSnowflake("fake", "")
go func() { go func() {
clientOffers(ctx, w, r) clientOffers(ctx, w, r)
// Takes a few seconds here... // Takes a few seconds here...
@ -147,7 +147,7 @@ func TestBroker(t *testing.T) {
}) })
Convey("Responds to proxy answers...", func() { Convey("Responds to proxy answers...", func() {
s := ctx.AddSnowflake("test") s := ctx.AddSnowflake("test", "")
w := httptest.NewRecorder() w := httptest.NewRecorder()
data := bytes.NewReader([]byte(`{"Version":"1.0","Sid":"test","Answer":"test"}`)) data := bytes.NewReader([]byte(`{"Version":"1.0","Sid":"test","Answer":"test"}`))
@ -211,7 +211,7 @@ func TestBroker(t *testing.T) {
// Manually do the Broker goroutine action here for full control. // Manually do the Broker goroutine action here for full control.
p := <-ctx.proxyPolls p := <-ctx.proxyPolls
So(p.id, ShouldEqual, "ymbcCMto7KHNGYlp") So(p.id, ShouldEqual, "ymbcCMto7KHNGYlp")
s := ctx.AddSnowflake(p.id) s := ctx.AddSnowflake(p.id, "")
go func() { go func() {
offer := <-s.offerChannel offer := <-s.offerChannel
p.offerChannel <- offer p.offerChannel <- offer
@ -449,7 +449,7 @@ func TestMetrics(t *testing.T) {
So(err, ShouldBeNil) So(err, ShouldBeNil)
// Prepare a fake proxy to respond with. // Prepare a fake proxy to respond with.
snowflake := ctx.AddSnowflake("fake") snowflake := ctx.AddSnowflake("fake", "")
go func() { go func() {
clientOffers(ctx, w, r) clientOffers(ctx, w, r)
done <- true done <- true

View file

@ -10,6 +10,7 @@ over the offer and answer channels.
*/ */
type Snowflake struct { type Snowflake struct {
id string id string
ptype string
offerChannel chan []byte offerChannel chan []byte
answerChannel chan []byte answerChannel chan []byte
clients int clients int

View file

@ -6,16 +6,18 @@ package messages
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"strings"
) )
const version = "1.0" const version = "1.1"
/* Version 1.0 specification: /* Version 1.1 specification:
== ProxyPollRequest == == ProxyPollRequest ==
{ {
Sid: [generated session id of proxy] Sid: [generated session id of proxy],
Version: 1.0 Version: 1.1,
Type: [badge|webext|standalone]
} }
== ProxyPollResponse == == ProxyPollResponse ==
@ -41,11 +43,11 @@ HTTP 400 BadRequest
== ProxyAnswerRequest == == ProxyAnswerRequest ==
{ {
Sid: [generated session id of proxy] Sid: [generated session id of proxy],
Version: 1.0 Version: 1.1,
Answer: Answer:
{ {
type: answer type: answer,
sdp: [WebRTC SDP] sdp: [WebRTC SDP]
} }
} }
@ -73,34 +75,38 @@ HTTP 400 BadRequest
type ProxyPollRequest struct { type ProxyPollRequest struct {
Sid string Sid string
Version string Version string
Type string
} }
func EncodePollRequest(sid string) ([]byte, error) { func EncodePollRequest(sid string, ptype string) ([]byte, error) {
return json.Marshal(ProxyPollRequest{ return json.Marshal(ProxyPollRequest{
Sid: sid, Sid: sid,
Version: version, Version: version,
Type: ptype,
}) })
} }
// Decodes a poll message from a snowflake proxy and returns the // Decodes a poll message from a snowflake proxy and returns the
// sid of the proxy on success and an error if it failed // sid of the proxy on success and an error if it failed
func DecodePollRequest(data []byte) (string, error) { func DecodePollRequest(data []byte) (string, string, error) {
var message ProxyPollRequest var message ProxyPollRequest
err := json.Unmarshal(data, &message) err := json.Unmarshal(data, &message)
if err != nil { if err != nil {
return "", err return "", "", err
}
if message.Version != "1.0" {
return "", fmt.Errorf("using unknown version")
} }
// Version 1.0 requires an Sid majorVersion := strings.Split(message.Version, ".")[0]
if majorVersion != "1" {
return "", "", fmt.Errorf("using unknown version")
}
// Version 1.x requires an Sid
if message.Sid == "" { if message.Sid == "" {
return "", fmt.Errorf("no supplied session id") return "", "", fmt.Errorf("no supplied session id")
} }
return message.Sid, nil return message.Sid, message.Type, nil
} }
type ProxyPollResponse struct { type ProxyPollResponse struct {
@ -153,7 +159,7 @@ type ProxyAnswerRequest struct {
func EncodeAnswerRequest(answer string, sid string) ([]byte, error) { func EncodeAnswerRequest(answer string, sid string) ([]byte, error) {
return json.Marshal(ProxyAnswerRequest{ return json.Marshal(ProxyAnswerRequest{
Version: "1.0", Version: "1.1",
Sid: sid, Sid: sid,
Answer: answer, Answer: answer,
}) })
@ -167,7 +173,9 @@ func DecodeAnswerRequest(data []byte) (string, string, error) {
if err != nil { if err != nil {
return "", "", err return "", "", err
} }
if message.Version != "1.0" {
majorVersion := strings.Split(message.Version, ".")[0]
if majorVersion != "1" {
return "", "", fmt.Errorf("using unknown version") return "", "", fmt.Errorf("using unknown version")
} }

View file

@ -11,45 +11,60 @@ import (
func TestDecodeProxyPollRequest(t *testing.T) { func TestDecodeProxyPollRequest(t *testing.T) {
Convey("Context", t, func() { Convey("Context", t, func() {
for _, test := range []struct { for _, test := range []struct {
sid string sid string
data string ptype string
err error data string
err error
}{ }{
{ {
//Version 1.0 proxy message //Version 1.0 proxy message
"ymbcCMto7KHNGYlp", "ymbcCMto7KHNGYlp",
"",
`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0"}`, `{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0"}`,
nil, nil,
}, },
{
//Version 1.1 proxy message
"ymbcCMto7KHNGYlp",
"standalone",
`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.1","Type":"standalone"}`,
nil,
},
{ {
//Version 0.X proxy message: //Version 0.X proxy message:
"", "",
"",
"ymbcCMto7KHNGYlp", "ymbcCMto7KHNGYlp",
&json.SyntaxError{}, &json.SyntaxError{},
}, },
{ {
"",
"", "",
`{"Sid":"ymbcCMto7KHNGYlp"}`, `{"Sid":"ymbcCMto7KHNGYlp"}`,
fmt.Errorf(""), fmt.Errorf(""),
}, },
{ {
"",
"", "",
"{}", "{}",
fmt.Errorf(""), fmt.Errorf(""),
}, },
{ {
"",
"", "",
`{"Version":"1.0"}`, `{"Version":"1.0"}`,
fmt.Errorf(""), fmt.Errorf(""),
}, },
{ {
"",
"", "",
`{"Version":"2.0"}`, `{"Version":"2.0"}`,
fmt.Errorf(""), fmt.Errorf(""),
}, },
} { } {
sid, err := DecodePollRequest([]byte(test.data)) sid, ptype, err := DecodePollRequest([]byte(test.data))
So(sid, ShouldResemble, test.sid) So(sid, ShouldResemble, test.sid)
So(ptype, ShouldResemble, test.ptype)
So(err, ShouldHaveSameTypeAs, test.err) So(err, ShouldHaveSameTypeAs, test.err)
} }
@ -58,10 +73,11 @@ func TestDecodeProxyPollRequest(t *testing.T) {
func TestEncodeProxyPollRequests(t *testing.T) { func TestEncodeProxyPollRequests(t *testing.T) {
Convey("Context", t, func() { Convey("Context", t, func() {
b, err := EncodePollRequest("ymbcCMto7KHNGYlp") b, err := EncodePollRequest("ymbcCMto7KHNGYlp", "standalone")
So(err, ShouldEqual, nil) So(err, ShouldEqual, nil)
sid, err := DecodePollRequest(b) sid, ptype, err := DecodePollRequest(b)
So(sid, ShouldEqual, "ymbcCMto7KHNGYlp") So(sid, ShouldEqual, "ymbcCMto7KHNGYlp")
So(ptype, ShouldEqual, "standalone")
So(err, ShouldEqual, nil) So(err, ShouldEqual, nil)
}) })
} }

View file

@ -175,7 +175,7 @@ func (b *Broker) pollOffer(sid string) *webrtc.SessionDescription {
timeOfNextPoll = now timeOfNextPoll = now
} }
body, err := messages.EncodePollRequest(sid) body, err := messages.EncodePollRequest(sid, "standalone")
if err != nil { if err != nil {
log.Printf("Error encoding poll message: %s", err.Error()) log.Printf("Error encoding poll message: %s", err.Error())
return nil return nil