Async test for Broker's proxy handler

This commit is contained in:
Serene Han 2016-02-14 16:19:20 -08:00
parent e13b35faf3
commit 032ab6bcb8
2 changed files with 50 additions and 11 deletions

View file

@ -29,6 +29,7 @@ type BrokerContext struct {
// Map keeping track of snowflakeIDs required to match SDP answers from
// the second http POST.
snowflakeMap map[string]*Snowflake
createChan chan *ProxyRequest
}
func NewBrokerContext() *BrokerContext {
@ -37,6 +38,7 @@ func NewBrokerContext() *BrokerContext {
return &BrokerContext{
snowflakes: snowflakes,
snowflakeMap: make(map[string]*Snowflake),
createChan: make(chan *ProxyRequest),
}
}
@ -54,8 +56,6 @@ type ProxyRequest struct {
offerChan chan []byte
}
var createChan = make(chan *ProxyRequest)
// Create and add a Snowflake to the heap.
func (sc *BrokerContext) AddSnowflake(id string) *Snowflake {
snowflake := new(Snowflake)
@ -69,9 +69,11 @@ func (sc *BrokerContext) AddSnowflake(id string) *Snowflake {
}
// Match proxies to clients.
func (sc *BrokerContext) Broker(proxies <-chan *ProxyRequest) {
for p := range proxies {
snowflake := sc.AddSnowflake(p.id)
// func (ctx *BrokerContext) Broker(proxies <-chan *ProxyRequest) {
func (ctx *BrokerContext) Broker() {
// for p := range proxies {
for p := range ctx.createChan {
snowflake := ctx.AddSnowflake(p.id)
// Wait for a client to avail an offer to the snowflake, or timeout
// and ask the snowflake to poll later.
go func(p *ProxyRequest) {
@ -81,8 +83,8 @@ func (sc *BrokerContext) Broker(proxies <-chan *ProxyRequest) {
p.offerChan <- offer
case <-time.After(time.Second * ProxyTimeout):
// This snowflake is no longer available to serve clients.
heap.Remove(sc.snowflakes, snowflake.index)
delete(sc.snowflakeMap, snowflake.id)
heap.Remove(ctx.snowflakes, snowflake.index)
delete(ctx.snowflakeMap, snowflake.id)
p.offerChan <- nil
}
}(p)
@ -176,7 +178,7 @@ func proxyHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
p := new(ProxyRequest)
p.id = id
p.offerChan = make(chan []byte)
createChan <- p
ctx.createChan <- p
// Wait for a client to avail an offer to the snowflake, or timeout
// and ask the snowflake to poll later.
@ -225,7 +227,7 @@ func debugHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
func init() {
ctx := NewBrokerContext()
go ctx.Broker(createChan)
go ctx.Broker()
http.HandleFunc("/robots.txt", robotsTxtHandler)
http.HandleFunc("/ip", ipHandler)