initial client roundtrip estimate on broker

This commit is contained in:
Serene H 2016-09-20 06:26:21 -07:00
parent 6cecd31fd8
commit 47e1338290
4 changed files with 37 additions and 8 deletions

View file

@ -26,6 +26,7 @@ type BrokerContext struct {
// the second http POST.
idToSnowflake map[string]*Snowflake
proxyPolls chan *ProxyPoll
metrics *Metrics
}
func NewBrokerContext() *BrokerContext {
@ -35,6 +36,7 @@ func NewBrokerContext() *BrokerContext {
snowflakes: snowflakes,
idToSnowflake: make(map[string]*Snowflake),
proxyPolls: make(chan *ProxyPoll),
metrics: new(Metrics),
}
}
@ -67,7 +69,7 @@ func (ctx *BrokerContext) RequestOffer(id string) []byte {
request.id = id
request.offerChannel = make(chan []byte)
ctx.proxyPolls <- request
// Block until an offer is available...
// Block until an offer is available, or timeout which sends a nil offer.
offer := <-request.offerChannel
return offer
}
@ -96,6 +98,8 @@ func (ctx *BrokerContext) Broker() {
}
// Create and add a Snowflake to the heap.
// Required to keep track of proxies between providing them
// with an offer and awaiting their second POST with an answer.
func (ctx *BrokerContext) AddSnowflake(id string) *Snowflake {
snowflake := new(Snowflake)
snowflake.id = id
@ -141,29 +145,32 @@ snowflake proxy, which responds with the SDP answer to be sent in
the HTTP response back to the client.
*/
func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
offer, err := ioutil.ReadAll(r.Body)
if nil != err {
log.Println("Invalid data.")
w.WriteHeader(http.StatusBadRequest)
return
}
// Find the most available snowflake proxy, and pass the offer to it.
// TODO: Needs improvement - maybe shouldn't immediately fail?
// Immediately fail if there are no snowflakes available.
if ctx.snowflakes.Len() <= 0 {
log.Println("Client: No snowflake proxies available.")
w.WriteHeader(http.StatusServiceUnavailable)
return
}
// Otherwise, find the most available snowflake proxy, and pass the offer to it.
snowflake := heap.Pop(ctx.snowflakes).(*Snowflake)
defer delete(ctx.idToSnowflake, snowflake.id)
delete(ctx.idToSnowflake, snowflake.id)
snowflake.offerChannel <- offer
// Wait for the answer to be returned on the channel.
// Wait for the answer to be returned on the channel or timeout.
select {
case answer := <-snowflake.answerChannel:
log.Println("Client: Retrieving answer")
w.Write(answer)
// Initial tracking of elapsed time.
ctx.metrics.clientRoundtripEstimate = time.Since(startTime) /
time.Millisecond
case <-time.After(time.Second * ClientTimeout):
log.Println("Client: Timed out.")
w.WriteHeader(http.StatusGatewayTimeout)
@ -196,7 +203,11 @@ func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
}
func debugHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
s := fmt.Sprintf("current: %d", ctx.snowflakes.Len())
s := fmt.Sprintf("current snowflakes available: %d\n", ctx.snowflakes.Len())
for _, snowflake := range ctx.idToSnowflake {
s += fmt.Sprintf("\nsnowflake %d: %s", snowflake.index, snowflake.id)
}
s += fmt.Sprintf("\n\nroundtrip avg: %d", ctx.metrics.clientRoundtripEstimate)
w.Write([]byte(s))
}

17
broker/metrics.go Normal file
View file

@ -0,0 +1,17 @@
package snowflake_broker
import (
// "golang.org/x/net/internal/timeseries"
"time"
)
// Implements Observable
type Metrics struct {
// snowflakes timeseries.Float
clientRoundtripEstimate time.Duration
}
func NewMetrics() *Metrics {
m := new(Metrics)
return m
}