Intermediary refactor teasing apart http / ipc

Introduces an IPC struct and moves the logic out of the http handlers
and into methods on that.
This commit is contained in:
Arlo Breault 2021-05-20 07:49:27 -04:00
parent ced539f234
commit 015958fbe6
4 changed files with 449 additions and 319 deletions

View file

@ -6,15 +6,13 @@ SessionDescriptions in order to negotiate a WebRTC connection.
package main
import (
"bytes"
"container/heap"
"crypto/tls"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"os"
"os/signal"
@ -31,23 +29,7 @@ import (
)
const (
ClientTimeout = 10
ProxyTimeout = 10
readLimit = 100000 //Maximum number of bytes to be read from an HTTP request
NATUnknown = "unknown"
NATRestricted = "restricted"
NATUnrestricted = "unrestricted"
)
// We support two client message formats. The legacy format is for backwards
// combatability and relies heavily on HTTP headers and status codes to convey
// information.
type clientVersion int
const (
v0 clientVersion = iota //legacy version
v1
readLimit = 100000 // Maximum number of bytes to be read from an HTTP request
)
type BrokerContext struct {
@ -89,8 +71,8 @@ func NewBrokerContext(metricsLogger *log.Logger) *BrokerContext {
// Implements the http.Handler interface
type SnowflakeHandler struct {
*BrokerContext
handle func(*BrokerContext, http.ResponseWriter, *http.Request)
*IPC
handle func(*IPC, http.ResponseWriter, *http.Request)
}
// Implements the http.Handler interface
@ -106,7 +88,7 @@ func (sh SnowflakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if "OPTIONS" == r.Method {
return
}
sh.handle(sh.BrokerContext, w, r)
sh.handle(sh.IPC, w, r)
}
func (mh MetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -199,7 +181,7 @@ func (ctx *BrokerContext) AddSnowflake(id string, proxyType string, natType stri
/*
For snowflake proxies to request a client from the Broker.
*/
func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
func proxyPolls(i *IPC, w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit))
if err != nil {
log.Println("Invalid data.")
@ -207,47 +189,28 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
return
}
sid, proxyType, natType, clients, err := messages.DecodePollRequest(body)
if err != nil {
arg := messages.Arg{
Body: body,
RemoteAddr: r.RemoteAddr,
NatType: "",
}
var response []byte
err = i.ProxyPolls(arg, &response)
switch {
case err == nil:
case errors.Is(err, messages.ErrBadRequest):
w.WriteHeader(http.StatusBadRequest)
return
}
// Log geoip stats
remoteIP, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
log.Println("Error processing proxy IP: ", err.Error())
} else {
ctx.metrics.lock.Lock()
ctx.metrics.UpdateCountryStats(remoteIP, proxyType, natType)
ctx.metrics.lock.Unlock()
}
// Wait for a client to avail an offer to the snowflake, or timeout if nil.
offer := ctx.RequestOffer(sid, proxyType, natType, clients)
var b []byte
if nil == offer {
ctx.metrics.lock.Lock()
ctx.metrics.proxyIdleCount++
ctx.metrics.promMetrics.ProxyPollTotal.With(prometheus.Labels{"nat": natType, "status": "idle"}).Inc()
ctx.metrics.lock.Unlock()
b, err = messages.EncodePollResponse("", false, "")
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Write(b)
return
}
ctx.metrics.promMetrics.ProxyPollTotal.With(prometheus.Labels{"nat": natType, "status": "matched"}).Inc()
b, err = messages.EncodePollResponse(string(offer.sdp), true, offer.natType)
if err != nil {
case errors.Is(err, messages.ErrInternal):
fallthrough
default:
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
if _, err := w.Write(b); err != nil {
if _, err := w.Write(response); err != nil {
log.Printf("proxyPolls unable to write offer with error: %v", err)
}
}
@ -258,162 +221,44 @@ type ClientOffer struct {
sdp []byte
}
// Sends an encoded response to the client and an
// HTTP server error if the response encoding fails
func sendClientResponse(resp *messages.ClientPollResponse, w http.ResponseWriter) {
data, err := resp.EncodePollResponse()
if err != nil {
log.Printf("error encoding answer")
w.WriteHeader(http.StatusInternalServerError)
} else {
if _, err := w.Write([]byte(data)); err != nil {
log.Printf("unable to write answer with error: %v", err)
}
}
}
/*
Expects a WebRTC SDP offer in the Request to give to an assigned
snowflake proxy, which responds with the SDP answer to be sent in
the HTTP response back to the client.
*/
func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
var err error
var version clientVersion
startTime := time.Now()
func clientOffers(i *IPC, w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit))
if err != nil {
log.Printf("Error reading client request: %s", err.Error())
w.WriteHeader(http.StatusBadRequest)
return
}
arg := messages.Arg{
Body: body,
RemoteAddr: "",
NatType: r.Header.Get("Snowflake-NAT-Type"),
}
var response []byte
err = i.ClientOffers(arg, &response)
switch {
case err == nil:
case errors.Is(err, messages.ErrUnavailable):
w.WriteHeader(http.StatusServiceUnavailable)
return
case errors.Is(err, messages.ErrTimeout):
w.WriteHeader(http.StatusGatewayTimeout)
return
default:
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
if len(body) > 0 && body[0] == '{' {
version = v0
} else {
parts := bytes.SplitN(body, []byte("\n"), 2)
if len(parts) < 2 {
// no version number found
err := fmt.Errorf("unsupported message version")
sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, w)
return
}
body = parts[1]
if string(parts[0]) == "1.0" {
version = v1
} else {
err := fmt.Errorf("unsupported message version")
sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, w)
return
}
if _, err := w.Write(response); err != nil {
log.Printf("clientOffers unable to write answer with error: %v", err)
}
var offer *ClientOffer
switch version {
case v0:
offer = &ClientOffer{
natType: r.Header.Get("Snowflake-NAT-Type"),
sdp: body,
}
case v1:
req, err := messages.DecodeClientPollRequest(body)
if err != nil {
sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, w)
return
}
offer = &ClientOffer{
natType: req.NAT,
sdp: []byte(req.Offer),
}
default:
panic("unknown version")
}
// Only hand out known restricted snowflakes to unrestricted clients
var snowflakeHeap *SnowflakeHeap
if offer.natType == NATUnrestricted {
snowflakeHeap = ctx.restrictedSnowflakes
} else {
snowflakeHeap = ctx.snowflakes
}
// Immediately fail if there are no snowflakes available.
ctx.snowflakeLock.Lock()
numSnowflakes := snowflakeHeap.Len()
ctx.snowflakeLock.Unlock()
if numSnowflakes <= 0 {
ctx.metrics.lock.Lock()
ctx.metrics.clientDeniedCount++
ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "denied"}).Inc()
if offer.natType == NATUnrestricted {
ctx.metrics.clientUnrestrictedDeniedCount++
} else {
ctx.metrics.clientRestrictedDeniedCount++
}
ctx.metrics.lock.Unlock()
switch version {
case v0:
w.WriteHeader(http.StatusServiceUnavailable)
case v1:
resp := &messages.ClientPollResponse{Error: "no snowflake proxies currently available"}
sendClientResponse(resp, w)
default:
panic("unknown version")
}
return
}
// Otherwise, find the most available snowflake proxy, and pass the offer to it.
// Delete must be deferred in order to correctly process answer request later.
ctx.snowflakeLock.Lock()
snowflake := heap.Pop(snowflakeHeap).(*Snowflake)
ctx.snowflakeLock.Unlock()
snowflake.offerChannel <- offer
// Wait for the answer to be returned on the channel or timeout.
select {
case answer := <-snowflake.answerChannel:
ctx.metrics.lock.Lock()
ctx.metrics.clientProxyMatchCount++
ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "matched"}).Inc()
ctx.metrics.lock.Unlock()
switch version {
case v0:
if _, err := w.Write([]byte(answer)); err != nil {
log.Printf("unable to write answer with error: %v", err)
}
case v1:
resp := &messages.ClientPollResponse{Answer: answer}
sendClientResponse(resp, w)
default:
panic("unknown version")
}
// Initial tracking of elapsed time.
ctx.metrics.clientRoundtripEstimate = time.Since(startTime) /
time.Millisecond
case <-time.After(time.Second * ClientTimeout):
log.Println("Client: Timed out.")
switch version {
case v0:
w.WriteHeader(http.StatusGatewayTimeout)
if _, err := w.Write(
[]byte("timed out waiting for answer!")); err != nil {
log.Printf("unable to write timeout error, failed with error: %v",
err)
}
case v1:
resp := &messages.ClientPollResponse{
Error: "timed out waiting for answer!"}
sendClientResponse(resp, w)
default:
panic("unknown version")
}
}
ctx.snowflakeLock.Lock()
ctx.metrics.promMetrics.AvailableProxies.With(prometheus.Labels{"nat": snowflake.natType, "type": snowflake.proxyType}).Dec()
delete(ctx.idToSnowflake, snowflake.id)
ctx.snowflakeLock.Unlock()
}
/*
@ -421,82 +266,51 @@ Expects snowflake proxes which have previously successfully received
an offer from proxyHandler to respond with an answer in an HTTP POST,
which the broker will pass back to the original client.
*/
func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
func proxyAnswers(i *IPC, w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit))
if nil != err || nil == body || len(body) <= 0 {
if err != nil {
log.Println("Invalid data.")
w.WriteHeader(http.StatusBadRequest)
return
}
answer, id, err := messages.DecodeAnswerRequest(body)
if err != nil || answer == "" {
w.WriteHeader(http.StatusBadRequest)
return
arg := messages.Arg{
Body: body,
RemoteAddr: "",
NatType: "",
}
var success = true
ctx.snowflakeLock.Lock()
snowflake, ok := ctx.idToSnowflake[id]
ctx.snowflakeLock.Unlock()
if !ok || nil == snowflake {
// The snowflake took too long to respond with an answer, so its client
// disappeared / the snowflake is no longer recognized by the Broker.
success = false
}
b, err := messages.EncodeAnswerResponse(success)
if err != nil {
log.Printf("Error encoding answer: %s", err.Error())
var response []byte
err = i.ProxyAnswers(arg, &response)
switch {
case err == nil:
case errors.Is(err, messages.ErrBadRequest):
w.WriteHeader(http.StatusBadRequest)
return
case errors.Is(err, messages.ErrInternal):
fallthrough
default:
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Write(b)
if success {
snowflake.answerChannel <- answer
if _, err := w.Write(response); err != nil {
log.Printf("proxyAnswers unable to write answer response with error: %v", err)
}
}
func debugHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
var webexts, browsers, standalones, unknowns int
var natRestricted, natUnrestricted, natUnknown int
ctx.snowflakeLock.Lock()
s := fmt.Sprintf("current snowflakes available: %d\n", len(ctx.idToSnowflake))
for _, snowflake := range ctx.idToSnowflake {
if snowflake.proxyType == "badge" {
browsers++
} else if snowflake.proxyType == "webext" {
webexts++
} else if snowflake.proxyType == "standalone" {
standalones++
} else {
unknowns++
}
switch snowflake.natType {
case NATRestricted:
natRestricted++
case NATUnrestricted:
natUnrestricted++
default:
natUnknown++
}
func debugHandler(i *IPC, w http.ResponseWriter, r *http.Request) {
var response string
err := i.Debug(new(interface{}), &response)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
ctx.snowflakeLock.Unlock()
s += fmt.Sprintf("\tstandalone proxies: %d", standalones)
s += fmt.Sprintf("\n\tbrowser proxies: %d", browsers)
s += fmt.Sprintf("\n\twebext proxies: %d", webexts)
s += fmt.Sprintf("\n\tunknown proxies: %d", unknowns)
s += fmt.Sprintf("\nNAT Types available:")
s += fmt.Sprintf("\n\trestricted: %d", natRestricted)
s += fmt.Sprintf("\n\tunrestricted: %d", natUnrestricted)
s += fmt.Sprintf("\n\tunknown: %d", natUnknown)
if _, err := w.Write([]byte(s)); err != nil {
if _, err := w.Write([]byte(response)); err != nil {
log.Printf("writing proxy information returned error: %v ", err)
}
}
@ -589,12 +403,14 @@ func main() {
go ctx.Broker()
i := &IPC{ctx}
http.HandleFunc("/robots.txt", robotsTxtHandler)
http.Handle("/proxy", SnowflakeHandler{ctx, proxyPolls})
http.Handle("/client", SnowflakeHandler{ctx, clientOffers})
http.Handle("/answer", SnowflakeHandler{ctx, proxyAnswers})
http.Handle("/debug", SnowflakeHandler{ctx, debugHandler})
http.Handle("/proxy", SnowflakeHandler{i, proxyPolls})
http.Handle("/client", SnowflakeHandler{i, clientOffers})
http.Handle("/answer", SnowflakeHandler{i, proxyAnswers})
http.Handle("/debug", SnowflakeHandler{i, debugHandler})
http.Handle("/metrics", MetricsHandler{metricsFilename, metricsHandler})
http.Handle("/prometheus", promhttp.HandlerFor(ctx.metrics.promMetrics.registry, promhttp.HandlerOpts{}))

293
broker/ipc.go Normal file
View file

@ -0,0 +1,293 @@
package main
import (
"bytes"
"container/heap"
"fmt"
"log"
"net"
"time"
"git.torproject.org/pluggable-transports/snowflake.git/common/messages"
"github.com/prometheus/client_golang/prometheus"
)
const (
ClientTimeout = 10
ProxyTimeout = 10
NATUnknown = "unknown"
NATRestricted = "restricted"
NATUnrestricted = "unrestricted"
)
// We support two client message formats. The legacy format is for backwards
// combatability and relies heavily on HTTP headers and status codes to convey
// information.
type clientVersion int
const (
v0 clientVersion = iota //legacy version
v1
)
type IPC struct {
ctx *BrokerContext
}
func (i *IPC) Debug(_ interface{}, response *string) error {
var webexts, browsers, standalones, unknowns int
var natRestricted, natUnrestricted, natUnknown int
i.ctx.snowflakeLock.Lock()
s := fmt.Sprintf("current snowflakes available: %d\n", len(i.ctx.idToSnowflake))
for _, snowflake := range i.ctx.idToSnowflake {
if snowflake.proxyType == "badge" {
browsers++
} else if snowflake.proxyType == "webext" {
webexts++
} else if snowflake.proxyType == "standalone" {
standalones++
} else {
unknowns++
}
switch snowflake.natType {
case NATRestricted:
natRestricted++
case NATUnrestricted:
natUnrestricted++
default:
natUnknown++
}
}
i.ctx.snowflakeLock.Unlock()
s += fmt.Sprintf("\tstandalone proxies: %d", standalones)
s += fmt.Sprintf("\n\tbrowser proxies: %d", browsers)
s += fmt.Sprintf("\n\twebext proxies: %d", webexts)
s += fmt.Sprintf("\n\tunknown proxies: %d", unknowns)
s += fmt.Sprintf("\nNAT Types available:")
s += fmt.Sprintf("\n\trestricted: %d", natRestricted)
s += fmt.Sprintf("\n\tunrestricted: %d", natUnrestricted)
s += fmt.Sprintf("\n\tunknown: %d", natUnknown)
*response = s
return nil
}
func (i *IPC) ProxyPolls(arg messages.Arg, response *[]byte) error {
sid, proxyType, natType, clients, err := messages.DecodePollRequest(arg.Body)
if err != nil {
return messages.ErrBadRequest
}
// Log geoip stats
remoteIP, _, err := net.SplitHostPort(arg.RemoteAddr)
if err != nil {
log.Println("Error processing proxy IP: ", err.Error())
} else {
i.ctx.metrics.lock.Lock()
i.ctx.metrics.UpdateCountryStats(remoteIP, proxyType, natType)
i.ctx.metrics.lock.Unlock()
}
var b []byte
// Wait for a client to avail an offer to the snowflake, or timeout if nil.
offer := i.ctx.RequestOffer(sid, proxyType, natType, clients)
if offer == nil {
i.ctx.metrics.lock.Lock()
i.ctx.metrics.proxyIdleCount++
i.ctx.metrics.promMetrics.ProxyPollTotal.With(prometheus.Labels{"nat": natType, "status": "idle"}).Inc()
i.ctx.metrics.lock.Unlock()
b, err = messages.EncodePollResponse("", false, "")
if err != nil {
return messages.ErrInternal
}
*response = b
return nil
}
i.ctx.metrics.promMetrics.ProxyPollTotal.With(prometheus.Labels{"nat": natType, "status": "matched"}).Inc()
b, err = messages.EncodePollResponse(string(offer.sdp), true, offer.natType)
if err != nil {
return messages.ErrInternal
}
*response = b
return nil
}
func sendClientResponse(resp *messages.ClientPollResponse, response *[]byte) error {
data, err := resp.EncodePollResponse()
if err != nil {
log.Printf("error encoding answer")
return messages.ErrInternal
} else {
*response = []byte(data)
return nil
}
}
func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte) error {
var version clientVersion
startTime := time.Now()
body := arg.Body
if len(body) > 0 && body[0] == '{' {
version = v0
} else {
parts := bytes.SplitN(body, []byte("\n"), 2)
if len(parts) < 2 {
// no version number found
err := fmt.Errorf("unsupported message version")
return sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, response)
}
body = parts[1]
if string(parts[0]) == "1.0" {
version = v1
} else {
err := fmt.Errorf("unsupported message version")
return sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, response)
}
}
var offer *ClientOffer
switch version {
case v0:
offer = &ClientOffer{
natType: arg.NatType,
sdp: body,
}
case v1:
req, err := messages.DecodeClientPollRequest(body)
if err != nil {
return sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, response)
}
offer = &ClientOffer{
natType: req.NAT,
sdp: []byte(req.Offer),
}
default:
panic("unknown version")
}
// Only hand out known restricted snowflakes to unrestricted clients
var snowflakeHeap *SnowflakeHeap
if offer.natType == NATUnrestricted {
snowflakeHeap = i.ctx.restrictedSnowflakes
} else {
snowflakeHeap = i.ctx.snowflakes
}
// Immediately fail if there are no snowflakes available.
i.ctx.snowflakeLock.Lock()
numSnowflakes := snowflakeHeap.Len()
i.ctx.snowflakeLock.Unlock()
if numSnowflakes <= 0 {
i.ctx.metrics.lock.Lock()
i.ctx.metrics.clientDeniedCount++
i.ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "denied"}).Inc()
if offer.natType == NATUnrestricted {
i.ctx.metrics.clientUnrestrictedDeniedCount++
} else {
i.ctx.metrics.clientRestrictedDeniedCount++
}
i.ctx.metrics.lock.Unlock()
switch version {
case v0:
return messages.ErrUnavailable
case v1:
resp := &messages.ClientPollResponse{Error: "no snowflake proxies currently available"}
return sendClientResponse(resp, response)
default:
panic("unknown version")
}
}
// Otherwise, find the most available snowflake proxy, and pass the offer to it.
// Delete must be deferred in order to correctly process answer request later.
i.ctx.snowflakeLock.Lock()
snowflake := heap.Pop(snowflakeHeap).(*Snowflake)
i.ctx.snowflakeLock.Unlock()
snowflake.offerChannel <- offer
var err error
// Wait for the answer to be returned on the channel or timeout.
select {
case answer := <-snowflake.answerChannel:
i.ctx.metrics.lock.Lock()
i.ctx.metrics.clientProxyMatchCount++
i.ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "matched"}).Inc()
i.ctx.metrics.lock.Unlock()
switch version {
case v0:
*response = []byte(answer)
case v1:
resp := &messages.ClientPollResponse{Answer: answer}
err = sendClientResponse(resp, response)
default:
panic("unknown version")
}
// Initial tracking of elapsed time.
i.ctx.metrics.clientRoundtripEstimate = time.Since(startTime) / time.Millisecond
case <-time.After(time.Second * ClientTimeout):
log.Println("Client: Timed out.")
switch version {
case v0:
err = messages.ErrTimeout
case v1:
resp := &messages.ClientPollResponse{
Error: "timed out waiting for answer!"}
err = sendClientResponse(resp, response)
default:
panic("unknown version")
}
}
i.ctx.snowflakeLock.Lock()
i.ctx.metrics.promMetrics.AvailableProxies.With(prometheus.Labels{"nat": snowflake.natType, "type": snowflake.proxyType}).Dec()
delete(i.ctx.idToSnowflake, snowflake.id)
i.ctx.snowflakeLock.Unlock()
return err
}
func (i *IPC) ProxyAnswers(arg messages.Arg, response *[]byte) error {
answer, id, err := messages.DecodeAnswerRequest(arg.Body)
if err != nil || answer == "" {
return messages.ErrBadRequest
}
var success = true
i.ctx.snowflakeLock.Lock()
snowflake, ok := i.ctx.idToSnowflake[id]
i.ctx.snowflakeLock.Unlock()
if !ok || snowflake == nil {
// The snowflake took too long to respond with an answer, so its client
// disappeared / the snowflake is no longer recognized by the Broker.
success = false
}
b, err := messages.EncodeAnswerResponse(success)
if err != nil {
log.Printf("Error encoding answer: %s", err.Error())
return messages.ErrInternal
}
*response = b
if success {
snowflake.answerChannel <- answer
}
return nil
}

View file

@ -28,6 +28,7 @@ func TestBroker(t *testing.T) {
Convey("Context", t, func() {
ctx := NewBrokerContext(NullLogger())
i := &IPC{ctx}
Convey("Adds Snowflake", func() {
So(ctx.snowflakes.Len(), ShouldEqual, 0)
@ -76,7 +77,7 @@ func TestBroker(t *testing.T) {
So(err, ShouldBeNil)
Convey("with error when no snowflakes are available.", func() {
clientOffers(ctx, w, r)
clientOffers(i, w, r)
So(w.Code, ShouldEqual, http.StatusOK)
So(w.Body.String(), ShouldEqual, `{"error":"no snowflake proxies currently available"}`)
})
@ -86,7 +87,7 @@ func TestBroker(t *testing.T) {
// Prepare a fake proxy to respond with.
snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted, 0)
go func() {
clientOffers(ctx, w, r)
clientOffers(i, w, r)
done <- true
}()
offer := <-snowflake.offerChannel
@ -104,7 +105,7 @@ func TestBroker(t *testing.T) {
done := make(chan bool)
snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted, 0)
go func() {
clientOffers(ctx, w, r)
clientOffers(i, w, r)
// Takes a few seconds here...
done <- true
}()
@ -124,7 +125,7 @@ func TestBroker(t *testing.T) {
r.Header.Set("Snowflake-NAT-TYPE", "restricted")
Convey("with 503 when no snowflakes are available.", func() {
clientOffers(ctx, w, r)
clientOffers(i, w, r)
So(w.Code, ShouldEqual, http.StatusServiceUnavailable)
So(w.Body.String(), ShouldEqual, "")
})
@ -134,7 +135,7 @@ func TestBroker(t *testing.T) {
// Prepare a fake proxy to respond with.
snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted, 0)
go func() {
clientOffers(ctx, w, r)
clientOffers(i, w, r)
done <- true
}()
offer := <-snowflake.offerChannel
@ -152,7 +153,7 @@ func TestBroker(t *testing.T) {
done := make(chan bool)
snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted, 0)
go func() {
clientOffers(ctx, w, r)
clientOffers(i, w, r)
// Takes a few seconds here...
done <- true
}()
@ -173,7 +174,7 @@ func TestBroker(t *testing.T) {
Convey("with a client offer if available.", func() {
go func(ctx *BrokerContext) {
proxyPolls(ctx, w, r)
proxyPolls(i, w, r)
done <- true
}(ctx)
// Pass a fake client offer to this proxy
@ -187,7 +188,7 @@ func TestBroker(t *testing.T) {
Convey("return empty 200 OK when no client offer is available.", func() {
go func(ctx *BrokerContext) {
proxyPolls(ctx, w, r)
proxyPolls(i, w, r)
done <- true
}(ctx)
p := <-ctx.proxyPolls
@ -209,7 +210,7 @@ func TestBroker(t *testing.T) {
r, err := http.NewRequest("POST", "snowflake.broker/answer", data)
So(err, ShouldBeNil)
go func(ctx *BrokerContext) {
proxyAnswers(ctx, w, r)
proxyAnswers(i, w, r)
}(ctx)
answer := <-s.answerChannel
So(w.Code, ShouldEqual, http.StatusOK)
@ -220,7 +221,7 @@ func TestBroker(t *testing.T) {
data = bytes.NewReader([]byte(`{"Version":"1.0","Sid":"invalid","Answer":"test"}`))
r, err := http.NewRequest("POST", "snowflake.broker/answer", data)
So(err, ShouldBeNil)
proxyAnswers(ctx, w, r)
proxyAnswers(i, w, r)
So(w.Code, ShouldEqual, http.StatusOK)
b, err := ioutil.ReadAll(w.Body)
So(err, ShouldBeNil)
@ -232,7 +233,7 @@ func TestBroker(t *testing.T) {
data := bytes.NewReader(nil)
r, err := http.NewRequest("POST", "snowflake.broker/answer", data)
So(err, ShouldBeNil)
proxyAnswers(ctx, w, r)
proxyAnswers(i, w, r)
So(w.Code, ShouldEqual, http.StatusBadRequest)
})
@ -240,7 +241,7 @@ func TestBroker(t *testing.T) {
data := bytes.NewReader(make([]byte, 100001))
r, err := http.NewRequest("POST", "snowflake.broker/answer", data)
So(err, ShouldBeNil)
proxyAnswers(ctx, w, r)
proxyAnswers(i, w, r)
So(w.Code, ShouldEqual, http.StatusBadRequest)
})
@ -250,6 +251,7 @@ func TestBroker(t *testing.T) {
Convey("End-To-End", t, func() {
ctx := NewBrokerContext(NullLogger())
i := &IPC{ctx}
Convey("Check for client/proxy data race", func() {
proxy_done := make(chan bool)
@ -264,7 +266,7 @@ func TestBroker(t *testing.T) {
So(err, ShouldBeNil)
go func(ctx *BrokerContext) {
proxyPolls(ctx, wp, rp)
proxyPolls(i, wp, rp)
proxy_done <- true
}(ctx)
@ -275,7 +277,7 @@ func TestBroker(t *testing.T) {
So(err, ShouldBeNil)
go func() {
clientOffers(ctx, wc, rc)
clientOffers(i, wc, rc)
client_done <- true
}()
@ -288,7 +290,7 @@ func TestBroker(t *testing.T) {
rp, err = http.NewRequest("POST", "snowflake.broker/answer", datap)
So(err, ShouldBeNil)
go func(ctx *BrokerContext) {
proxyAnswers(ctx, wp, rp)
proxyAnswers(i, wp, rp)
proxy_done <- true
}(ctx)
@ -307,7 +309,7 @@ func TestBroker(t *testing.T) {
rP, err := http.NewRequest("POST", "snowflake.broker/proxy", dataP)
So(err, ShouldBeNil)
go func() {
proxyPolls(ctx, wP, rP)
proxyPolls(i, wP, rP)
polled <- true
}()
@ -328,7 +330,7 @@ func TestBroker(t *testing.T) {
rC, err := http.NewRequest("POST", "snowflake.broker/client", dataC)
So(err, ShouldBeNil)
go func() {
clientOffers(ctx, wC, rC)
clientOffers(i, wC, rC)
done <- true
}()
@ -341,7 +343,7 @@ func TestBroker(t *testing.T) {
dataA := bytes.NewReader([]byte(`{"Version":"1.0","Sid":"ymbcCMto7KHNGYlp","Answer":"test"}`))
rA, err := http.NewRequest("POST", "snowflake.broker/answer", dataA)
So(err, ShouldBeNil)
proxyAnswers(ctx, wA, rA)
proxyAnswers(i, wA, rA)
So(wA.Code, ShouldEqual, http.StatusOK)
<-done
@ -503,6 +505,7 @@ func TestMetrics(t *testing.T) {
done := make(chan bool)
buf := new(bytes.Buffer)
ctx := NewBrokerContext(log.New(buf, "", 0))
i := &IPC{ctx}
err := ctx.metrics.LoadGeoipDatabases("test_geoip", "test_geoip6")
So(err, ShouldEqual, nil)
@ -514,10 +517,10 @@ func TestMetrics(t *testing.T) {
r, err := http.NewRequest("POST", "snowflake.broker/proxy", data)
r.RemoteAddr = "129.97.208.23:8888" //CA geoip
So(err, ShouldBeNil)
go func(ctx *BrokerContext) {
proxyPolls(ctx, w, r)
go func(i *IPC) {
proxyPolls(i, w, r)
done <- true
}(ctx)
}(i)
p := <-ctx.proxyPolls //manually unblock poll
p.offerChannel <- nil
<-done
@ -527,10 +530,10 @@ func TestMetrics(t *testing.T) {
r, err = http.NewRequest("POST", "snowflake.broker/proxy", data)
r.RemoteAddr = "129.97.208.23:8888" //CA geoip
So(err, ShouldBeNil)
go func(ctx *BrokerContext) {
proxyPolls(ctx, w, r)
go func(i *IPC) {
proxyPolls(i, w, r)
done <- true
}(ctx)
}(i)
p = <-ctx.proxyPolls //manually unblock poll
p.offerChannel <- nil
<-done
@ -540,10 +543,10 @@ func TestMetrics(t *testing.T) {
r, err = http.NewRequest("POST", "snowflake.broker/proxy", data)
r.RemoteAddr = "129.97.208.23:8888" //CA geoip
So(err, ShouldBeNil)
go func(ctx *BrokerContext) {
proxyPolls(ctx, w, r)
go func(i *IPC) {
proxyPolls(i, w, r)
done <- true
}(ctx)
}(i)
p = <-ctx.proxyPolls //manually unblock poll
p.offerChannel <- nil
<-done
@ -553,10 +556,10 @@ func TestMetrics(t *testing.T) {
r, err = http.NewRequest("POST", "snowflake.broker/proxy", data)
r.RemoteAddr = "129.97.208.23:8888" //CA geoip
So(err, ShouldBeNil)
go func(ctx *BrokerContext) {
proxyPolls(ctx, w, r)
go func(i *IPC) {
proxyPolls(i, w, r)
done <- true
}(ctx)
}(i)
p = <-ctx.proxyPolls //manually unblock poll
p.offerChannel <- nil
<-done
@ -573,7 +576,7 @@ func TestMetrics(t *testing.T) {
r, err := http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)
clientOffers(ctx, w, r)
clientOffers(i, w, r)
ctx.metrics.printMetrics()
So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 8\nclient-unrestricted-denied-count 0\nclient-snowflake-match-count 0")
@ -595,7 +598,7 @@ func TestMetrics(t *testing.T) {
// Prepare a fake proxy to respond with.
snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted, 0)
go func() {
clientOffers(ctx, w, r)
clientOffers(i, w, r)
done <- true
}()
offer := <-snowflake.offerChannel
@ -614,49 +617,49 @@ func TestMetrics(t *testing.T) {
r, err := http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)
clientOffers(ctx, w, r)
clientOffers(i, w, r)
w = httptest.NewRecorder()
data = bytes.NewReader(
[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}"))
r, err = http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)
clientOffers(ctx, w, r)
clientOffers(i, w, r)
w = httptest.NewRecorder()
data = bytes.NewReader(
[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}"))
r, err = http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)
clientOffers(ctx, w, r)
clientOffers(i, w, r)
w = httptest.NewRecorder()
data = bytes.NewReader(
[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}"))
r, err = http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)
clientOffers(ctx, w, r)
clientOffers(i, w, r)
w = httptest.NewRecorder()
data = bytes.NewReader(
[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}"))
r, err = http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)
clientOffers(ctx, w, r)
clientOffers(i, w, r)
w = httptest.NewRecorder()
data = bytes.NewReader(
[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}"))
r, err = http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)
clientOffers(ctx, w, r)
clientOffers(i, w, r)
w = httptest.NewRecorder()
data = bytes.NewReader(
[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}"))
r, err = http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)
clientOffers(ctx, w, r)
clientOffers(i, w, r)
w = httptest.NewRecorder()
data = bytes.NewReader(
[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}"))
r, err = http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)
clientOffers(ctx, w, r)
clientOffers(i, w, r)
ctx.metrics.printMetrics()
So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 8\nclient-unrestricted-denied-count 0\n")
@ -666,7 +669,7 @@ func TestMetrics(t *testing.T) {
[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}"))
r, err = http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)
clientOffers(ctx, w, r)
clientOffers(i, w, r)
buf.Reset()
ctx.metrics.printMetrics()
So(buf.String(), ShouldContainSubstring, "client-denied-count 16\nclient-restricted-denied-count 16\nclient-unrestricted-denied-count 0\n")
@ -680,7 +683,7 @@ func TestMetrics(t *testing.T) {
r.RemoteAddr = "129.97.208.23:8888" //CA geoip
So(err, ShouldBeNil)
go func(ctx *BrokerContext) {
proxyPolls(ctx, w, r)
proxyPolls(i, w, r)
done <- true
}(ctx)
p := <-ctx.proxyPolls //manually unblock poll
@ -693,10 +696,10 @@ func TestMetrics(t *testing.T) {
log.Printf("unable to get NewRequest with error: %v", err)
}
r.RemoteAddr = "129.97.208.23:8888" //CA geoip
go func(ctx *BrokerContext) {
proxyPolls(ctx, w, r)
go func(i *IPC) {
proxyPolls(i, w, r)
done <- true
}(ctx)
}(i)
p = <-ctx.proxyPolls //manually unblock poll
p.offerChannel <- nil
<-done
@ -711,10 +714,10 @@ func TestMetrics(t *testing.T) {
r, err := http.NewRequest("POST", "snowflake.broker/proxy", data)
r.RemoteAddr = "129.97.208.23:8888" //CA geoip
So(err, ShouldBeNil)
go func(ctx *BrokerContext) {
proxyPolls(ctx, w, r)
go func(i *IPC) {
proxyPolls(i, w, r)
done <- true
}(ctx)
}(i)
p := <-ctx.proxyPolls //manually unblock poll
p.offerChannel <- nil
<-done
@ -728,10 +731,10 @@ func TestMetrics(t *testing.T) {
log.Printf("unable to get NewRequest with error: %v", err)
}
r.RemoteAddr = "129.97.208.24:8888" //CA geoip
go func(ctx *BrokerContext) {
proxyPolls(ctx, w, r)
go func(i *IPC) {
proxyPolls(i, w, r)
done <- true
}(ctx)
}(i)
p = <-ctx.proxyPolls //manually unblock poll
p.offerChannel <- nil
<-done
@ -747,7 +750,7 @@ func TestMetrics(t *testing.T) {
r, err := http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)
clientOffers(ctx, w, r)
clientOffers(i, w, r)
ctx.metrics.printMetrics()
So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 8\nclient-unrestricted-denied-count 0\nclient-snowflake-match-count 0")
@ -760,7 +763,7 @@ func TestMetrics(t *testing.T) {
r, err = http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)
clientOffers(ctx, w, r)
clientOffers(i, w, r)
ctx.metrics.printMetrics()
So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 0\nclient-unrestricted-denied-count 8\nclient-snowflake-match-count 0")
@ -773,7 +776,7 @@ func TestMetrics(t *testing.T) {
r, err = http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)
clientOffers(ctx, w, r)
clientOffers(i, w, r)
ctx.metrics.printMetrics()
So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 8\nclient-unrestricted-denied-count 0\nclient-snowflake-match-count 0")

18
common/messages/ipc.go Normal file
View file

@ -0,0 +1,18 @@
package messages
import (
"errors"
)
type Arg struct {
Body []byte
RemoteAddr string
NatType string
}
var (
ErrBadRequest = errors.New("bad request")
ErrInternal = errors.New("internal error")
ErrUnavailable = errors.New("service unavailable")
ErrTimeout = errors.New("timeout")
)