Convert Broker SnowflakeHeap test to goconvey, and async test for client handler

This commit is contained in:
Serene Han 2016-02-13 11:47:57 -08:00
parent b04d1f67fb
commit 0e1c5a1756
2 changed files with 120 additions and 78 deletions

View file

@ -24,20 +24,29 @@ const (
ProxyTimeout = 10 ProxyTimeout = 10
) )
type SnowflakeContext struct { type BrokerContext struct {
snowflakes *SnowflakeHeap snowflakes *SnowflakeHeap
// Map keeping track of snowflakeIDs required to match SDP answers from // Map keeping track of snowflakeIDs required to match SDP answers from
// the second http POST. // the second http POST.
snowflakeMap map[string]*Snowflake snowflakeMap map[string]*Snowflake
} }
func NewBrokerContext() *BrokerContext {
snowflakes := new(SnowflakeHeap)
heap.Init(snowflakes)
return &BrokerContext{
snowflakes: snowflakes,
snowflakeMap: make(map[string]*Snowflake),
}
}
type SnowflakeHandler struct { type SnowflakeHandler struct {
*SnowflakeContext *BrokerContext
h func(*SnowflakeContext, http.ResponseWriter, *http.Request) h func(*BrokerContext, http.ResponseWriter, *http.Request)
} }
func (sh SnowflakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (sh SnowflakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
sh.h(sh.SnowflakeContext, w, r) sh.h(sh.BrokerContext, w, r)
} }
type ProxyRequest struct { type ProxyRequest struct {
@ -48,9 +57,7 @@ type ProxyRequest struct {
var createChan = make(chan *ProxyRequest) var createChan = make(chan *ProxyRequest)
// Create and add a Snowflake to the heap. // Create and add a Snowflake to the heap.
func (sc *SnowflakeContext) AddSnowflake(id string) *Snowflake { func (sc *BrokerContext) AddSnowflake(id string) *Snowflake {
log.Println(sc.snowflakes)
snowflake := new(Snowflake) snowflake := new(Snowflake)
snowflake.id = id snowflake.id = id
snowflake.clients = 0 snowflake.clients = 0
@ -58,16 +65,12 @@ func (sc *SnowflakeContext) AddSnowflake(id string) *Snowflake {
snowflake.answerChannel = make(chan []byte) snowflake.answerChannel = make(chan []byte)
heap.Push(sc.snowflakes, snowflake) heap.Push(sc.snowflakes, snowflake)
sc.snowflakeMap[id] = snowflake sc.snowflakeMap[id] = snowflake
log.Println("Total snowflakes available: ", sc.snowflakes.Len())
log.Println(sc.snowflakes)
log.Println(sc.snowflakeMap)
return snowflake return snowflake
} }
func (sc *SnowflakeContext) Broker(proxies <-chan *ProxyRequest) { // Match proxies to clients.
func (sc *BrokerContext) Broker(proxies <-chan *ProxyRequest) {
for p := range proxies { for p := range proxies {
log.Println("adding ", p.id)
snowflake := sc.AddSnowflake(p.id) snowflake := sc.AddSnowflake(p.id)
// Wait for a client to avail an offer to the snowflake, or timeout // Wait for a client to avail an offer to the snowflake, or timeout
// and ask the snowflake to poll later. // and ask the snowflake to poll later.
@ -115,7 +118,7 @@ Expects a WebRTC SDP offer in the Request to give to an assigned
snowflake proxy, which responds with the SDP answer to be sent in snowflake proxy, which responds with the SDP answer to be sent in
the HTTP response back to the client. the HTTP response back to the client.
*/ */
func clientHandler(ctx *SnowflakeContext, w http.ResponseWriter, r *http.Request) { func clientHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
offer, err := ioutil.ReadAll(r.Body) offer, err := ioutil.ReadAll(r.Body)
if nil != err { if nil != err {
log.Println("Invalid data.") log.Println("Invalid data.")
@ -153,7 +156,7 @@ func clientHandler(ctx *SnowflakeContext, w http.ResponseWriter, r *http.Request
/* /*
For snowflake proxies to request a client from the Broker. For snowflake proxies to request a client from the Broker.
*/ */
func proxyHandler(ctx *SnowflakeContext, w http.ResponseWriter, r *http.Request) { func proxyHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
if isPreflight(w, r) { if isPreflight(w, r) {
return return
} }
@ -192,7 +195,7 @@ Expects snowflake proxes which have previously successfully received
an offer from proxyHandler to respond with an answer in an HTTP POST, an offer from proxyHandler to respond with an answer in an HTTP POST,
which the broker will pass back to the original client. which the broker will pass back to the original client.
*/ */
func answerHandler(ctx *SnowflakeContext, w http.ResponseWriter, r *http.Request) { func answerHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
if isPreflight(w, r) { if isPreflight(w, r) {
return return
} }
@ -214,28 +217,19 @@ func answerHandler(ctx *SnowflakeContext, w http.ResponseWriter, r *http.Request
snowflake.answerChannel <- body snowflake.answerChannel <- body
} }
func debugHandler(ctx *SnowflakeContext, w http.ResponseWriter, r *http.Request) { func debugHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
s := fmt.Sprintf("current: %d", ctx.snowflakes.Len()) s := fmt.Sprintf("current: %d", ctx.snowflakes.Len())
w.Write([]byte(s)) w.Write([]byte(s))
} }
func init() { func init() {
// snowflakeMap = make(map[string]*Snowflake) ctx := NewBrokerContext()
snowflakes := new(SnowflakeHeap)
heap.Init(snowflakes)
ctx := &SnowflakeContext{
snowflakes: snowflakes,
snowflakeMap: make(map[string]*Snowflake),
}
go ctx.Broker(createChan) go ctx.Broker(createChan)
http.HandleFunc("/robots.txt", robotsTxtHandler) http.HandleFunc("/robots.txt", robotsTxtHandler)
http.HandleFunc("/ip", ipHandler) http.HandleFunc("/ip", ipHandler)
// http.HandleFunc("/client", clientHandler)
// http.HandleFunc("/proxy", proxyHandler)
// http.HandleFunc("/answer", answerHandler)
http.Handle("/client", SnowflakeHandler{ctx, clientHandler}) http.Handle("/client", SnowflakeHandler{ctx, clientHandler})
http.Handle("/proxy", SnowflakeHandler{ctx, proxyHandler}) http.Handle("/proxy", SnowflakeHandler{ctx, proxyHandler})
http.Handle("/answer", SnowflakeHandler{ctx, answerHandler}) http.Handle("/answer", SnowflakeHandler{ctx, answerHandler})

View file

@ -1,67 +1,115 @@
package snowflake_broker package snowflake_broker
import ( import (
"bytes"
"container/heap" "container/heap"
. "github.com/smartystreets/goconvey/convey"
"net/http"
"net/http/httptest"
"testing" "testing"
) )
func TestSnowflakeHeap(t *testing.T) { func TestBroker(t *testing.T) {
h := new(SnowflakeHeap)
heap.Init(h)
if 0 != h.Len() {
t.Error("Unexpected length.")
}
s1 := new(Snowflake)
s2 := new(Snowflake)
s3 := new(Snowflake)
s4 := new(Snowflake)
s1.clients = 4 Convey("Context", t, func() {
s2.clients = 5 ctx := NewBrokerContext()
s3.clients = 3
s4.clients = 1
heap.Push(h, s1) Convey("Adds Snowflake", func() {
if 1 != h.Len() { ctx := NewBrokerContext()
} So(ctx.snowflakes.Len(), ShouldEqual, 0)
heap.Push(h, s2) So(len(ctx.snowflakeMap), ShouldEqual, 0)
heap.Push(h, s3) ctx.AddSnowflake("foo")
heap.Push(h, s4) So(ctx.snowflakes.Len(), ShouldEqual, 1)
So(len(ctx.snowflakeMap), ShouldEqual, 1)
})
if 4 != h.Len() { Convey("Responds to client offers...", func() {
t.Error("Unexpected length.")
}
heap.Remove(h, 0) w := httptest.NewRecorder()
if 3 != h.Len() { data := bytes.NewReader([]byte("test"))
t.Error("Unexpected length.") r, err := http.NewRequest("POST", "broker.com/client", data)
} So(err, ShouldBeNil)
r := heap.Pop(h).(*Snowflake) Convey("with 503 when no snowflakes are available.", func() {
if 2 != h.Len() { clientHandler(ctx, w, r)
t.Error("Unexpected length.") h := w.Header()
} So(h["Access-Control-Allow-Headers"], ShouldNotBeNil)
if r.clients != 3 { So(w.Code, ShouldEqual, http.StatusServiceUnavailable)
t.Error("Unexpected clients: ", r.clients) So(w.Body.String(), ShouldEqual, "")
} })
if r.index != -1 {
t.Error("Unexpected index: ", r.index)
}
r = heap.Pop(h).(*Snowflake) Convey("with a proxy answer if available.", func() {
if 1 != h.Len() { done := make(chan bool)
t.Error("Unexpected length.") // Prepare a fake proxy to respond with.
} snowflake := ctx.AddSnowflake("fake")
if r.clients != 4 { go func() {
t.Error("Unexpected clients: ", r.clients) clientHandler(ctx, w, r)
} done <- true
}()
offer := <-snowflake.offerChannel
So(offer, ShouldResemble, []byte("test"))
snowflake.answerChannel <- []byte("fake answer")
<-done
So(w.Body.String(), ShouldEqual, "fake answer")
So(w.Code, ShouldEqual, http.StatusOK)
})
r = heap.Pop(h).(*Snowflake) Convey("Times out when no proxy responds.", func() {
if r.clients != 5 { done := make(chan bool)
t.Error("Unexpected clients: ", r.clients) snowflake := ctx.AddSnowflake("fake")
} go func() {
clientHandler(ctx, w, r)
done <- true
}()
offer := <-snowflake.offerChannel
So(offer, ShouldResemble, []byte("test"))
<-done
So(w.Code, ShouldEqual, http.StatusGatewayTimeout)
})
if 0 != h.Len() { })
t.Error("Unexpected length.") })
} }
func TestSnowflakeHeap(t *testing.T) {
Convey("SnowflakeHeap", t, func() {
h := new(SnowflakeHeap)
heap.Init(h)
So(h.Len(), ShouldEqual, 0)
s1 := new(Snowflake)
s2 := new(Snowflake)
s3 := new(Snowflake)
s4 := new(Snowflake)
s1.clients = 4
s2.clients = 5
s3.clients = 3
s4.clients = 1
heap.Push(h, s1)
So(h.Len(), ShouldEqual, 1)
heap.Push(h, s2)
So(h.Len(), ShouldEqual, 2)
heap.Push(h, s3)
So(h.Len(), ShouldEqual, 3)
heap.Push(h, s4)
So(h.Len(), ShouldEqual, 4)
heap.Remove(h, 0)
So(h.Len(), ShouldEqual, 3)
r := heap.Pop(h).(*Snowflake)
So(h.Len(), ShouldEqual, 2)
So(r.clients, ShouldEqual, 3)
So(r.index, ShouldEqual, -1)
r = heap.Pop(h).(*Snowflake)
So(h.Len(), ShouldEqual, 1)
So(r.clients, ShouldEqual, 4)
So(r.index, ShouldEqual, -1)
r = heap.Pop(h).(*Snowflake)
So(h.Len(), ShouldEqual, 0)
So(r.clients, ShouldEqual, 5)
So(r.index, ShouldEqual, -1)
})
} }