Add synchronization to prevent race in broker

There's a race condition in the broker where both the proxy and the
client processes try to pop/remove the same snowflake from the heap.
This patch adds synchronization to prevent simultaneous accesses to
snowflakes.
This commit is contained in:
Cecylia Bocovich 2019-11-22 17:15:06 -05:00
parent 07f2cd8073
commit dccc15a6e9

View file

@ -18,6 +18,7 @@ import (
"os" "os"
"os/signal" "os/signal"
"strings" "strings"
"sync"
"syscall" "syscall"
"time" "time"
@ -37,6 +38,8 @@ type BrokerContext struct {
// Map keeping track of snowflakeIDs required to match SDP answers from // Map keeping track of snowflakeIDs required to match SDP answers from
// the second http POST. // the second http POST.
idToSnowflake map[string]*Snowflake idToSnowflake map[string]*Snowflake
// Synchronization for the
snowflakeLock sync.Mutex
proxyPolls chan *ProxyPoll proxyPolls chan *ProxyPoll
metrics *Metrics metrics *Metrics
} }
@ -127,10 +130,13 @@ func (ctx *BrokerContext) Broker() {
request.offerChannel <- offer request.offerChannel <- offer
case <-time.After(time.Second * ProxyTimeout): case <-time.After(time.Second * ProxyTimeout):
// This snowflake is no longer available to serve clients. // This snowflake is no longer available to serve clients.
// TODO: Fix race using a delete channel ctx.snowflakeLock.Lock()
defer ctx.snowflakeLock.Unlock()
if snowflake.index != -1 {
heap.Remove(ctx.snowflakes, snowflake.index) heap.Remove(ctx.snowflakes, snowflake.index)
delete(ctx.idToSnowflake, snowflake.id) delete(ctx.idToSnowflake, snowflake.id)
request.offerChannel <- nil close(request.offerChannel)
}
} }
}(request) }(request)
} }
@ -146,7 +152,9 @@ func (ctx *BrokerContext) AddSnowflake(id string, proxyType string) *Snowflake {
snowflake.proxyType = proxyType snowflake.proxyType = proxyType
snowflake.offerChannel = make(chan []byte) snowflake.offerChannel = make(chan []byte)
snowflake.answerChannel = make(chan []byte) snowflake.answerChannel = make(chan []byte)
ctx.snowflakeLock.Lock()
heap.Push(ctx.snowflakes, snowflake) heap.Push(ctx.snowflakes, snowflake)
ctx.snowflakeLock.Unlock()
ctx.idToSnowflake[id] = snowflake ctx.idToSnowflake[id] = snowflake
return snowflake return snowflake
} }
@ -215,15 +223,19 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
return return
} }
// Immediately fail if there are no snowflakes available. // Immediately fail if there are no snowflakes available.
if ctx.snowflakes.Len() <= 0 { ctx.snowflakeLock.Lock()
numSnowflakes := ctx.snowflakes.Len()
ctx.snowflakeLock.Unlock()
if numSnowflakes <= 0 {
ctx.metrics.clientDeniedCount++ ctx.metrics.clientDeniedCount++
w.WriteHeader(http.StatusServiceUnavailable) w.WriteHeader(http.StatusServiceUnavailable)
return return
} }
// Otherwise, find the most available snowflake proxy, and pass the offer to it. // Otherwise, find the most available snowflake proxy, and pass the offer to it.
// Delete must be deferred in order to correctly process answer request later. // Delete must be deferred in order to correctly process answer request later.
ctx.snowflakeLock.Lock()
snowflake := heap.Pop(ctx.snowflakes).(*Snowflake) snowflake := heap.Pop(ctx.snowflakes).(*Snowflake)
defer delete(ctx.idToSnowflake, snowflake.id) ctx.snowflakeLock.Unlock()
snowflake.offerChannel <- offer snowflake.offerChannel <- offer
// Wait for the answer to be returned on the channel or timeout. // Wait for the answer to be returned on the channel or timeout.
@ -243,6 +255,10 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
log.Printf("unable to write timeout error, failed with error: %v", err) log.Printf("unable to write timeout error, failed with error: %v", err)
} }
} }
ctx.snowflakeLock.Lock()
delete(ctx.idToSnowflake, snowflake.id)
ctx.snowflakeLock.Unlock()
} }
/* /*
@ -266,7 +282,9 @@ func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
} }
var success = true var success = true
ctx.snowflakeLock.Lock()
snowflake, ok := ctx.idToSnowflake[id] snowflake, ok := ctx.idToSnowflake[id]
ctx.snowflakeLock.Unlock()
if !ok || nil == snowflake { if !ok || nil == snowflake {
// The snowflake took too long to respond with an answer, so its client // The snowflake took too long to respond with an answer, so its client
// disappeared / the snowflake is no longer recognized by the Broker. // disappeared / the snowflake is no longer recognized by the Broker.
@ -287,9 +305,10 @@ func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
} }
func debugHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { func debugHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
s := fmt.Sprintf("current snowflakes available: %d\n", ctx.snowflakes.Len())
var webexts, browsers, standalones, unknowns int var webexts, browsers, standalones, unknowns int
ctx.snowflakeLock.Lock()
s := fmt.Sprintf("current snowflakes available: %d\n", len(ctx.idToSnowflake))
for _, snowflake := range ctx.idToSnowflake { for _, snowflake := range ctx.idToSnowflake {
if snowflake.proxyType == "badge" { if snowflake.proxyType == "badge" {
browsers++ browsers++
@ -302,6 +321,7 @@ func debugHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
} }
} }
ctx.snowflakeLock.Unlock()
s += fmt.Sprintf("\tstandalone proxies: %d", standalones) s += fmt.Sprintf("\tstandalone proxies: %d", standalones)
s += fmt.Sprintf("\n\tbrowser proxies: %d", browsers) s += fmt.Sprintf("\n\tbrowser proxies: %d", browsers)
s += fmt.Sprintf("\n\twebext proxies: %d", webexts) s += fmt.Sprintf("\n\twebext proxies: %d", webexts)