Add a new heap at the broker for restricted flakes

Now when proxies poll, they provide their NAT type to the broker. This
introduces a new snowflake heap of just restricted snowflakes that the
broker can pull from if the client has a known, unrestricted NAT. All
other clients will pull from a heap of snowflakes with unrestricted or
unknown NAT topologies.
This commit is contained in:
Cecylia Bocovich 2020-06-16 17:49:39 -04:00
parent f6cf9a453b
commit 0052c0e10c
2 changed files with 57 additions and 24 deletions

View file

@ -31,12 +31,18 @@ const (
ClientTimeout = 10 ClientTimeout = 10
ProxyTimeout = 10 ProxyTimeout = 10
readLimit = 100000 //Maximum number of bytes to be read from an HTTP request readLimit = 100000 //Maximum number of bytes to be read from an HTTP request
NATUnknown = "unknown"
NATRestricted = "restricted"
NATUnrestricted = "unrestricted"
) )
type BrokerContext struct { type BrokerContext struct {
snowflakes *SnowflakeHeap snowflakes *SnowflakeHeap
// Map keeping track of snowflakeIDs required to match SDP answers from restrictedSnowflakes *SnowflakeHeap
// the second http POST. // Maps keeping track of snowflakeIDs required to match SDP answers from
// the second http POST. Restricted snowflakes can only be matched up with
// clients behind an unrestricted NAT.
idToSnowflake map[string]*Snowflake idToSnowflake map[string]*Snowflake
// Synchronization for the snowflake map and heap // Synchronization for the snowflake map and heap
snowflakeLock sync.Mutex snowflakeLock sync.Mutex
@ -47,6 +53,8 @@ type BrokerContext struct {
func NewBrokerContext(metricsLogger *log.Logger) *BrokerContext { func NewBrokerContext(metricsLogger *log.Logger) *BrokerContext {
snowflakes := new(SnowflakeHeap) snowflakes := new(SnowflakeHeap)
heap.Init(snowflakes) heap.Init(snowflakes)
rSnowflakes := new(SnowflakeHeap)
heap.Init(rSnowflakes)
metrics, err := NewMetrics(metricsLogger) metrics, err := NewMetrics(metricsLogger)
if err != nil { if err != nil {
@ -59,6 +67,7 @@ func NewBrokerContext(metricsLogger *log.Logger) *BrokerContext {
return &BrokerContext{ return &BrokerContext{
snowflakes: snowflakes, snowflakes: snowflakes,
restrictedSnowflakes: rSnowflakes,
idToSnowflake: make(map[string]*Snowflake), idToSnowflake: make(map[string]*Snowflake),
proxyPolls: make(chan *ProxyPoll), proxyPolls: make(chan *ProxyPoll),
metrics: metrics, metrics: metrics,
@ -79,7 +88,7 @@ type MetricsHandler struct {
func (sh SnowflakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (sh SnowflakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Session-ID") w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Session-ID, Snowflake-NAT-Type")
// Return early if it's CORS preflight. // Return early if it's CORS preflight.
if "OPTIONS" == r.Method { if "OPTIONS" == r.Method {
return return
@ -101,15 +110,17 @@ func (mh MetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
type ProxyPoll struct { type ProxyPoll struct {
id string id string
proxyType string proxyType string
natType string
offerChannel chan []byte offerChannel chan []byte
} }
// Registers a Snowflake and waits for some Client to send an offer, // Registers a Snowflake and waits for some Client to send an offer,
// as part of the polling logic of the proxy handler. // as part of the polling logic of the proxy handler.
func (ctx *BrokerContext) RequestOffer(id string, proxyType string) []byte { func (ctx *BrokerContext) RequestOffer(id string, proxyType string, natType string) []byte {
request := new(ProxyPoll) request := new(ProxyPoll)
request.id = id request.id = id
request.proxyType = proxyType request.proxyType = proxyType
request.natType = natType
request.offerChannel = make(chan []byte) request.offerChannel = make(chan []byte)
ctx.proxyPolls <- request ctx.proxyPolls <- request
// Block until an offer is available, or timeout which sends a nil offer. // Block until an offer is available, or timeout which sends a nil offer.
@ -122,7 +133,7 @@ func (ctx *BrokerContext) RequestOffer(id string, proxyType string) []byte {
// client offer or nil on timeout / none are available. // client offer or nil on timeout / none are available.
func (ctx *BrokerContext) Broker() { func (ctx *BrokerContext) Broker() {
for request := range ctx.proxyPolls { for request := range ctx.proxyPolls {
snowflake := ctx.AddSnowflake(request.id, request.proxyType) snowflake := ctx.AddSnowflake(request.id, request.proxyType, request.natType)
// Wait for a client to avail an offer to the snowflake. // Wait for a client to avail an offer to the snowflake.
go func(request *ProxyPoll) { go func(request *ProxyPoll) {
select { select {
@ -133,7 +144,11 @@ func (ctx *BrokerContext) Broker() {
ctx.snowflakeLock.Lock() ctx.snowflakeLock.Lock()
defer ctx.snowflakeLock.Unlock() defer ctx.snowflakeLock.Unlock()
if snowflake.index != -1 { if snowflake.index != -1 {
if request.natType == NATRestricted {
heap.Remove(ctx.restrictedSnowflakes, snowflake.index)
} else {
heap.Remove(ctx.snowflakes, snowflake.index) heap.Remove(ctx.snowflakes, snowflake.index)
}
delete(ctx.idToSnowflake, snowflake.id) delete(ctx.idToSnowflake, snowflake.id)
close(request.offerChannel) close(request.offerChannel)
} }
@ -145,7 +160,7 @@ func (ctx *BrokerContext) Broker() {
// Create and add a Snowflake to the heap. // Create and add a Snowflake to the heap.
// Required to keep track of proxies between providing them // Required to keep track of proxies between providing them
// with an offer and awaiting their second POST with an answer. // with an offer and awaiting their second POST with an answer.
func (ctx *BrokerContext) AddSnowflake(id string, proxyType string) *Snowflake { func (ctx *BrokerContext) AddSnowflake(id string, proxyType string, natType string) *Snowflake {
snowflake := new(Snowflake) snowflake := new(Snowflake)
snowflake.id = id snowflake.id = id
snowflake.clients = 0 snowflake.clients = 0
@ -153,7 +168,11 @@ func (ctx *BrokerContext) AddSnowflake(id string, proxyType string) *Snowflake {
snowflake.offerChannel = make(chan []byte) snowflake.offerChannel = make(chan []byte)
snowflake.answerChannel = make(chan []byte) snowflake.answerChannel = make(chan []byte)
ctx.snowflakeLock.Lock() ctx.snowflakeLock.Lock()
if natType == NATRestricted {
heap.Push(ctx.restrictedSnowflakes, snowflake)
} else {
heap.Push(ctx.snowflakes, snowflake) heap.Push(ctx.snowflakes, snowflake)
}
ctx.snowflakeLock.Unlock() ctx.snowflakeLock.Unlock()
ctx.idToSnowflake[id] = snowflake ctx.idToSnowflake[id] = snowflake
return snowflake return snowflake
@ -170,7 +189,7 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
return return
} }
sid, proxyType, _, err := messages.DecodePollRequest(body) sid, proxyType, natType, err := messages.DecodePollRequest(body)
if err != nil { if err != nil {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
return return
@ -187,7 +206,7 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
} }
// Wait for a client to avail an offer to the snowflake, or timeout if nil. // Wait for a client to avail an offer to the snowflake, or timeout if nil.
offer := ctx.RequestOffer(sid, proxyType) offer := ctx.RequestOffer(sid, proxyType, natType)
var b []byte var b []byte
if nil == offer { if nil == offer {
ctx.metrics.lock.Lock() ctx.metrics.lock.Lock()
@ -226,9 +245,23 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
return return
} }
natType := r.Header.Get("Snowflake-NAT-Type")
if natType == "" {
natType = NATUnknown
}
// Only hand out known restricted snowflakes to unrestricted clients
var snowflakeHeap *SnowflakeHeap
if natType == NATUnrestricted {
snowflakeHeap = ctx.restrictedSnowflakes
} else {
snowflakeHeap = ctx.snowflakes
}
// Immediately fail if there are no snowflakes available. // Immediately fail if there are no snowflakes available.
ctx.snowflakeLock.Lock() ctx.snowflakeLock.Lock()
numSnowflakes := ctx.snowflakes.Len() numSnowflakes := snowflakeHeap.Len()
ctx.snowflakeLock.Unlock() ctx.snowflakeLock.Unlock()
if numSnowflakes <= 0 { if numSnowflakes <= 0 {
ctx.metrics.lock.Lock() ctx.metrics.lock.Lock()
@ -240,7 +273,7 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
// Otherwise, find the most available snowflake proxy, and pass the offer to it. // Otherwise, find the most available snowflake proxy, and pass the offer to it.
// Delete must be deferred in order to correctly process answer request later. // Delete must be deferred in order to correctly process answer request later.
ctx.snowflakeLock.Lock() ctx.snowflakeLock.Lock()
snowflake := heap.Pop(ctx.snowflakes).(*Snowflake) snowflake := heap.Pop(snowflakeHeap).(*Snowflake)
ctx.snowflakeLock.Unlock() ctx.snowflakeLock.Unlock()
snowflake.offerChannel <- offer snowflake.offerChannel <- offer

View file

@ -29,7 +29,7 @@ func TestBroker(t *testing.T) {
Convey("Adds Snowflake", func() { Convey("Adds Snowflake", func() {
So(ctx.snowflakes.Len(), ShouldEqual, 0) So(ctx.snowflakes.Len(), ShouldEqual, 0)
So(len(ctx.idToSnowflake), ShouldEqual, 0) So(len(ctx.idToSnowflake), ShouldEqual, 0)
ctx.AddSnowflake("foo", "") ctx.AddSnowflake("foo", "", NATUnknown)
So(ctx.snowflakes.Len(), ShouldEqual, 1) So(ctx.snowflakes.Len(), ShouldEqual, 1)
So(len(ctx.idToSnowflake), ShouldEqual, 1) So(len(ctx.idToSnowflake), ShouldEqual, 1)
}) })
@ -55,7 +55,7 @@ func TestBroker(t *testing.T) {
Convey("Request an offer from the Snowflake Heap", func() { Convey("Request an offer from the Snowflake Heap", func() {
done := make(chan []byte) done := make(chan []byte)
go func() { go func() {
offer := ctx.RequestOffer("test", "") offer := ctx.RequestOffer("test", "", NATUnknown)
done <- offer done <- offer
}() }()
request := <-ctx.proxyPolls request := <-ctx.proxyPolls
@ -79,7 +79,7 @@ func TestBroker(t *testing.T) {
Convey("with a proxy answer if available.", func() { Convey("with a proxy answer if available.", func() {
done := make(chan bool) done := make(chan bool)
// Prepare a fake proxy to respond with. // Prepare a fake proxy to respond with.
snowflake := ctx.AddSnowflake("fake", "") snowflake := ctx.AddSnowflake("fake", "", NATUnknown)
go func() { go func() {
clientOffers(ctx, w, r) clientOffers(ctx, w, r)
done <- true done <- true
@ -97,7 +97,7 @@ func TestBroker(t *testing.T) {
return return
} }
done := make(chan bool) done := make(chan bool)
snowflake := ctx.AddSnowflake("fake", "") snowflake := ctx.AddSnowflake("fake", "", NATUnknown)
go func() { go func() {
clientOffers(ctx, w, r) clientOffers(ctx, w, r)
// Takes a few seconds here... // Takes a few seconds here...
@ -147,7 +147,7 @@ func TestBroker(t *testing.T) {
}) })
Convey("Responds to proxy answers...", func() { Convey("Responds to proxy answers...", func() {
s := ctx.AddSnowflake("test", "") s := ctx.AddSnowflake("test", "", NATUnknown)
w := httptest.NewRecorder() w := httptest.NewRecorder()
data := bytes.NewReader([]byte(`{"Version":"1.0","Sid":"test","Answer":"test"}`)) data := bytes.NewReader([]byte(`{"Version":"1.0","Sid":"test","Answer":"test"}`))
@ -260,7 +260,7 @@ func TestBroker(t *testing.T) {
// Manually do the Broker goroutine action here for full control. // Manually do the Broker goroutine action here for full control.
p := <-ctx.proxyPolls p := <-ctx.proxyPolls
So(p.id, ShouldEqual, "ymbcCMto7KHNGYlp") So(p.id, ShouldEqual, "ymbcCMto7KHNGYlp")
s := ctx.AddSnowflake(p.id, "") s := ctx.AddSnowflake(p.id, "", NATUnknown)
go func() { go func() {
offer := <-s.offerChannel offer := <-s.offerChannel
p.offerChannel <- offer p.offerChannel <- offer
@ -537,7 +537,7 @@ func TestMetrics(t *testing.T) {
So(err, ShouldBeNil) So(err, ShouldBeNil)
// Prepare a fake proxy to respond with. // Prepare a fake proxy to respond with.
snowflake := ctx.AddSnowflake("fake", "") snowflake := ctx.AddSnowflake("fake", "", NATUnknown)
go func() { go func() {
clientOffers(ctx, w, r) clientOffers(ctx, w, r)
done <- true done <- true