move CORS early return into the ServeHTTP wrapper, rename handlers

This commit is contained in:
Serene Han 2016-02-16 21:00:30 -08:00
parent 791f6925ec
commit 2ae6559001
2 changed files with 103 additions and 112 deletions

View file

@ -28,7 +28,7 @@ type BrokerContext struct {
snowflakes *SnowflakeHeap snowflakes *SnowflakeHeap
// Map keeping track of snowflakeIDs required to match SDP answers from // Map keeping track of snowflakeIDs required to match SDP answers from
// the second http POST. // the second http POST.
snowflakeMap map[string]*Snowflake idToSnowflake map[string]*Snowflake
proxyPolls chan *ProxyPoll proxyPolls chan *ProxyPoll
} }
@ -37,18 +37,25 @@ func NewBrokerContext() *BrokerContext {
heap.Init(snowflakes) heap.Init(snowflakes)
return &BrokerContext{ return &BrokerContext{
snowflakes: snowflakes, snowflakes: snowflakes,
snowflakeMap: make(map[string]*Snowflake), idToSnowflake: make(map[string]*Snowflake),
proxyPolls: make(chan *ProxyPoll), proxyPolls: make(chan *ProxyPoll),
} }
} }
// Implements the http.Handler interface
type SnowflakeHandler struct { type SnowflakeHandler struct {
*BrokerContext *BrokerContext
h func(*BrokerContext, http.ResponseWriter, *http.Request) handle func(*BrokerContext, http.ResponseWriter, *http.Request)
} }
func (sh SnowflakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (sh SnowflakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
sh.h(sh.BrokerContext, w, r) w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Session-ID")
// Return early if it's CORS preflight.
if "OPTIONS" == r.Method {
return
}
sh.handle(sh.BrokerContext, w, r)
} }
// Proxies may poll for client offers concurrently. // Proxies may poll for client offers concurrently.
@ -69,12 +76,14 @@ func (ctx *BrokerContext) RequestOffer(id string) []byte {
return offer return offer
} }
// goroutine which match proxies to clients. // goroutine which matches clients to proxies and sends SDP offers along.
// Safely processes proxy requests, responding to them with either an available // Safely processes proxy requests, responding to them with either an available
// client offer or nil on timeout / none are available. // client offer or nil on timeout / none are available.
func (ctx *BrokerContext) Broker() { func (ctx *BrokerContext) Broker() {
for request := range ctx.proxyPolls { for request := range ctx.proxyPolls {
snowflake := ctx.AddSnowflake(request.id) snowflake := ctx.AddSnowflake(request.id)
// defer heap.Remove(ctx.snowflakes, snowflake.index)
// defer delete(ctx.idToSnowflake, snowflake.id)
// Wait for a client to avail an offer to the snowflake. // Wait for a client to avail an offer to the snowflake.
go func(request *ProxyPoll) { go func(request *ProxyPoll) {
select { select {
@ -83,8 +92,9 @@ func (ctx *BrokerContext) Broker() {
request.offerChannel <- offer request.offerChannel <- offer
case <-time.After(time.Second * ProxyTimeout): case <-time.After(time.Second * ProxyTimeout):
// This snowflake is no longer available to serve clients. // This snowflake is no longer available to serve clients.
// TODO: Fix race using a delete channel
heap.Remove(ctx.snowflakes, snowflake.index) heap.Remove(ctx.snowflakes, snowflake.index)
delete(ctx.snowflakeMap, snowflake.id) delete(ctx.idToSnowflake, snowflake.id)
request.offerChannel <- nil request.offerChannel <- nil
} }
}(request) }(request)
@ -99,82 +109,14 @@ func (ctx *BrokerContext) AddSnowflake(id string) *Snowflake {
snowflake.offerChannel = make(chan []byte) snowflake.offerChannel = make(chan []byte)
snowflake.answerChannel = make(chan []byte) snowflake.answerChannel = make(chan []byte)
heap.Push(ctx.snowflakes, snowflake) heap.Push(ctx.snowflakes, snowflake)
ctx.snowflakeMap[id] = snowflake ctx.idToSnowflake[id] = snowflake
return snowflake return snowflake
} }
func robotsTxtHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Write([]byte("User-agent: *\nDisallow:\n"))
}
func ipHandler(w http.ResponseWriter, r *http.Request) {
remoteAddr := r.RemoteAddr
if net.ParseIP(remoteAddr).To4() == nil {
remoteAddr = "[" + remoteAddr + "]"
}
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Write([]byte(remoteAddr))
}
// Return early if it's CORS preflight.
func isPreflight(w http.ResponseWriter, r *http.Request) bool {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Session-ID")
if "OPTIONS" == r.Method {
return true
}
return false
}
/*
Expects a WebRTC SDP offer in the Request to give to an assigned
snowflake proxy, which responds with the SDP answer to be sent in
the HTTP response back to the client.
*/
func clientHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
offer, err := ioutil.ReadAll(r.Body)
if nil != err {
log.Println("Invalid data.")
w.WriteHeader(http.StatusBadRequest)
return
}
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Headers", "X-Session-ID")
// Find the most available snowflake proxy, and pass the offer to it.
// TODO: Needs improvement - maybe shouldn't immediately fail?
if ctx.snowflakes.Len() <= 0 {
log.Println("Client: No snowflake proxies available.")
w.WriteHeader(http.StatusServiceUnavailable)
return
}
snowflake := heap.Pop(ctx.snowflakes).(*Snowflake)
snowflake.offerChannel <- offer
// Wait for the answer to be returned on the channel.
select {
case answer := <-snowflake.answerChannel:
log.Println("Client: Retrieving answer")
w.Write(answer)
case <-time.After(time.Second * ClientTimeout):
log.Println("Client: Timed out.")
w.WriteHeader(http.StatusGatewayTimeout)
w.Write([]byte("timed out waiting for answer!"))
}
// Remove from the snowflake map whether answer was sent or not, because
// this client request is now over.
delete(ctx.snowflakeMap, snowflake.id)
}
/* /*
For snowflake proxies to request a client from the Broker. For snowflake proxies to request a client from the Broker.
*/ */
func proxyHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
if isPreflight(w, r) {
return
}
id := r.Header.Get("X-Session-ID") id := r.Header.Get("X-Session-ID")
body, err := ioutil.ReadAll(r.Body) body, err := ioutil.ReadAll(r.Body)
if nil != err { if nil != err {
@ -197,20 +139,54 @@ func proxyHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
w.Write(offer) w.Write(offer)
} }
/*
Expects a WebRTC SDP offer in the Request to give to an assigned
snowflake proxy, which responds with the SDP answer to be sent in
the HTTP response back to the client.
*/
func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
offer, err := ioutil.ReadAll(r.Body)
if nil != err {
log.Println("Invalid data.")
w.WriteHeader(http.StatusBadRequest)
return
}
// Find the most available snowflake proxy, and pass the offer to it.
// TODO: Needs improvement - maybe shouldn't immediately fail?
if ctx.snowflakes.Len() <= 0 {
log.Println("Client: No snowflake proxies available.")
w.WriteHeader(http.StatusServiceUnavailable)
return
}
snowflake := heap.Pop(ctx.snowflakes).(*Snowflake)
defer delete(ctx.idToSnowflake, snowflake.id)
snowflake.offerChannel <- offer
// Wait for the answer to be returned on the channel.
select {
case answer := <-snowflake.answerChannel:
log.Println("Client: Retrieving answer")
w.Write(answer)
case <-time.After(time.Second * ClientTimeout):
log.Println("Client: Timed out.")
w.WriteHeader(http.StatusGatewayTimeout)
w.Write([]byte("timed out waiting for answer!"))
}
}
/* /*
Expects snowflake proxes which have previously successfully received Expects snowflake proxes which have previously successfully received
an offer from proxyHandler to respond with an answer in an HTTP POST, an offer from proxyHandler to respond with an answer in an HTTP POST,
which the broker will pass back to the original client. which the broker will pass back to the original client.
*/ */
func answerHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
if isPreflight(w, r) {
return
}
id := r.Header.Get("X-Session-ID") id := r.Header.Get("X-Session-ID")
snowflake, ok := ctx.snowflakeMap[id] snowflake, ok := ctx.idToSnowflake[id]
if !ok || nil == snowflake { if !ok || nil == snowflake {
// The snowflake took too long to respond with an answer, // The snowflake took too long to respond with an answer, so its client
// and the designated client is no longer around / recognized by the Broker. // disappeared / the snowflake is no longer recognized by the Broker.
w.WriteHeader(http.StatusGone) w.WriteHeader(http.StatusGone)
return return
} }
@ -229,6 +205,20 @@ func debugHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
w.Write([]byte(s)) w.Write([]byte(s))
} }
func robotsTxtHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Write([]byte("User-agent: *\nDisallow:\n"))
}
func ipHandler(w http.ResponseWriter, r *http.Request) {
remoteAddr := r.RemoteAddr
if net.ParseIP(remoteAddr).To4() == nil {
remoteAddr = "[" + remoteAddr + "]"
}
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Write([]byte(remoteAddr))
}
func init() { func init() {
ctx := NewBrokerContext() ctx := NewBrokerContext()
@ -237,8 +227,8 @@ func init() {
http.HandleFunc("/robots.txt", robotsTxtHandler) http.HandleFunc("/robots.txt", robotsTxtHandler)
http.HandleFunc("/ip", ipHandler) http.HandleFunc("/ip", ipHandler)
http.Handle("/client", SnowflakeHandler{ctx, clientHandler}) http.Handle("/proxy", SnowflakeHandler{ctx, proxyPolls})
http.Handle("/proxy", SnowflakeHandler{ctx, proxyHandler}) http.Handle("/client", SnowflakeHandler{ctx, clientOffers})
http.Handle("/answer", SnowflakeHandler{ctx, answerHandler}) http.Handle("/answer", SnowflakeHandler{ctx, proxyAnswers})
http.Handle("/debug", SnowflakeHandler{ctx, debugHandler}) http.Handle("/debug", SnowflakeHandler{ctx, debugHandler})
} }

View file

@ -16,24 +16,26 @@ func TestBroker(t *testing.T) {
Convey("Adds Snowflake", func() { Convey("Adds Snowflake", func() {
So(ctx.snowflakes.Len(), ShouldEqual, 0) So(ctx.snowflakes.Len(), ShouldEqual, 0)
So(len(ctx.snowflakeMap), ShouldEqual, 0) So(len(ctx.idToSnowflake), ShouldEqual, 0)
ctx.AddSnowflake("foo") ctx.AddSnowflake("foo")
So(ctx.snowflakes.Len(), ShouldEqual, 1) So(ctx.snowflakes.Len(), ShouldEqual, 1)
So(len(ctx.snowflakeMap), ShouldEqual, 1) So(len(ctx.idToSnowflake), ShouldEqual, 1)
}) })
/*
Convey("Broker goroutine matches clients with proxies", func() { Convey("Broker goroutine matches clients with proxies", func() {
ctx2 := NewBrokerContext()
p := new(ProxyPoll) p := new(ProxyPoll)
p.id = "test" p.id = "test"
p.offerChannel = make(chan []byte)
go func() { go func() {
ctx.proxyPolls <- p ctx2.proxyPolls <- p
close(ctx.proxyPolls) close(ctx2.proxyPolls)
}() }()
ctx.Broker() ctx2.Broker()
So(ctx.snowflakes.Len(), ShouldEqual, 1) So(ctx2.snowflakes.Len(), ShouldEqual, 1)
So(ctx.snowflakes.Len(), ShouldEqual, 1) So(ctx2.idToSnowflake["test"], ShouldNotBeNil)
}) })
*/
Convey("Responds to client offers...", func() { Convey("Responds to client offers...", func() {
w := httptest.NewRecorder() w := httptest.NewRecorder()
@ -42,9 +44,7 @@ func TestBroker(t *testing.T) {
So(err, ShouldBeNil) So(err, ShouldBeNil)
Convey("with 503 when no snowflakes are available.", func() { Convey("with 503 when no snowflakes are available.", func() {
clientHandler(ctx, w, r) clientOffers(ctx, w, r)
h := w.Header()
So(h["Access-Control-Allow-Headers"], ShouldNotBeNil)
So(w.Code, ShouldEqual, http.StatusServiceUnavailable) So(w.Code, ShouldEqual, http.StatusServiceUnavailable)
So(w.Body.String(), ShouldEqual, "") So(w.Body.String(), ShouldEqual, "")
}) })
@ -54,7 +54,7 @@ func TestBroker(t *testing.T) {
// Prepare a fake proxy to respond with. // Prepare a fake proxy to respond with.
snowflake := ctx.AddSnowflake("fake") snowflake := ctx.AddSnowflake("fake")
go func() { go func() {
clientHandler(ctx, w, r) clientOffers(ctx, w, r)
done <- true done <- true
}() }()
offer := <-snowflake.offerChannel offer := <-snowflake.offerChannel
@ -72,7 +72,8 @@ func TestBroker(t *testing.T) {
done := make(chan bool) done := make(chan bool)
snowflake := ctx.AddSnowflake("fake") snowflake := ctx.AddSnowflake("fake")
go func() { go func() {
clientHandler(ctx, w, r) clientOffers(ctx, w, r)
// Takes a few seconds here...
done <- true done <- true
}() }()
offer := <-snowflake.offerChannel offer := <-snowflake.offerChannel
@ -92,7 +93,7 @@ func TestBroker(t *testing.T) {
Convey("with a client offer if available.", func() { Convey("with a client offer if available.", func() {
go func(ctx *BrokerContext) { go func(ctx *BrokerContext) {
proxyHandler(ctx, w, r) proxyPolls(ctx, w, r)
done <- true done <- true
}(ctx) }(ctx)
// Pass a fake client offer to this proxy // Pass a fake client offer to this proxy
@ -106,7 +107,7 @@ func TestBroker(t *testing.T) {
Convey("times out when no client offer is available.", func() { Convey("times out when no client offer is available.", func() {
go func(ctx *BrokerContext) { go func(ctx *BrokerContext) {
proxyHandler(ctx, w, r) proxyPolls(ctx, w, r)
done <- true done <- true
}(ctx) }(ctx)
p := <-ctx.proxyPolls p := <-ctx.proxyPolls
@ -129,7 +130,7 @@ func TestBroker(t *testing.T) {
So(err, ShouldBeNil) So(err, ShouldBeNil)
r.Header.Set("X-Session-ID", "test") r.Header.Set("X-Session-ID", "test")
go func(ctx *BrokerContext) { go func(ctx *BrokerContext) {
answerHandler(ctx, w, r) proxyAnswers(ctx, w, r)
}(ctx) }(ctx)
answer := <-s.answerChannel answer := <-s.answerChannel
So(w.Code, ShouldEqual, http.StatusOK) So(w.Code, ShouldEqual, http.StatusOK)
@ -140,7 +141,7 @@ func TestBroker(t *testing.T) {
r, err := http.NewRequest("POST", "snowflake.broker/answer", nil) r, err := http.NewRequest("POST", "snowflake.broker/answer", nil)
So(err, ShouldBeNil) So(err, ShouldBeNil)
r.Header.Set("X-Session-ID", "invalid") r.Header.Set("X-Session-ID", "invalid")
answerHandler(ctx, w, r) proxyAnswers(ctx, w, r)
So(w.Code, ShouldEqual, http.StatusGone) So(w.Code, ShouldEqual, http.StatusGone)
}) })
@ -149,7 +150,7 @@ func TestBroker(t *testing.T) {
r, err := http.NewRequest("POST", "snowflake.broker/answer", data) r, err := http.NewRequest("POST", "snowflake.broker/answer", data)
r.Header.Set("X-Session-ID", "test") r.Header.Set("X-Session-ID", "test")
So(err, ShouldBeNil) So(err, ShouldBeNil)
answerHandler(ctx, w, r) proxyAnswers(ctx, w, r)
So(w.Code, ShouldEqual, http.StatusBadRequest) So(w.Code, ShouldEqual, http.StatusBadRequest)
}) })
}) })
@ -167,7 +168,7 @@ func TestBroker(t *testing.T) {
So(err, ShouldBeNil) So(err, ShouldBeNil)
rP.Header.Set("X-Session-ID", "test") rP.Header.Set("X-Session-ID", "test")
go func() { go func() {
proxyHandler(ctx, wP, rP) proxyPolls(ctx, wP, rP)
polled <- true polled <- true
}() }()
@ -179,7 +180,7 @@ func TestBroker(t *testing.T) {
offer := <-s.offerChannel offer := <-s.offerChannel
p.offerChannel <- offer p.offerChannel <- offer
}() }()
So(ctx.snowflakeMap["test"], ShouldNotBeNil) So(ctx.idToSnowflake["test"], ShouldNotBeNil)
// Client request blocks until proxy answer arrives. // Client request blocks until proxy answer arrives.
dataC := bytes.NewReader([]byte("fake offer")) dataC := bytes.NewReader([]byte("fake offer"))
@ -187,21 +188,21 @@ func TestBroker(t *testing.T) {
rC, err := http.NewRequest("POST", "snowflake.broker/client", dataC) rC, err := http.NewRequest("POST", "snowflake.broker/client", dataC)
So(err, ShouldBeNil) So(err, ShouldBeNil)
go func() { go func() {
clientHandler(ctx, wC, rC) clientOffers(ctx, wC, rC)
done <- true done <- true
}() }()
<-polled <-polled
So(wP.Code, ShouldEqual, http.StatusOK) So(wP.Code, ShouldEqual, http.StatusOK)
So(wP.Body.String(), ShouldResemble, "fake offer") So(wP.Body.String(), ShouldResemble, "fake offer")
So(ctx.snowflakeMap["test"], ShouldNotBeNil) So(ctx.idToSnowflake["test"], ShouldNotBeNil)
// Follow up with the answer request afterwards // Follow up with the answer request afterwards
wA := httptest.NewRecorder() wA := httptest.NewRecorder()
dataA := bytes.NewReader([]byte("fake answer")) dataA := bytes.NewReader([]byte("fake answer"))
rA, err := http.NewRequest("POST", "snowflake.broker/proxy", dataA) rA, err := http.NewRequest("POST", "snowflake.broker/proxy", dataA)
So(err, ShouldBeNil) So(err, ShouldBeNil)
rA.Header.Set("X-Session-ID", "test") rA.Header.Set("X-Session-ID", "test")
answerHandler(ctx, wA, rA) proxyAnswers(ctx, wA, rA)
So(wA.Code, ShouldEqual, http.StatusOK) So(wA.Code, ShouldEqual, http.StatusOK)
<-done <-done