Encode client-broker messages as json in HTTP body

Send the client poll request and response in a json-encoded format in
the HTTP request body rather than sending the data in HTTP headers. This
will pave the way for using domain-fronting alternatives for the
Snowflake rendezvous.
This commit is contained in:
Cecylia Bocovich 2021-05-05 15:31:39 -04:00
parent ae7cc478fd
commit 270eb21803
7 changed files with 472 additions and 63 deletions

View file

@ -6,6 +6,7 @@ SessionDescriptions in order to negotiate a WebRTC connection.
package main package main
import ( import (
"bytes"
"container/heap" "container/heap"
"crypto/tls" "crypto/tls"
"flag" "flag"
@ -39,6 +40,16 @@ const (
NATUnrestricted = "unrestricted" NATUnrestricted = "unrestricted"
) )
// We support two client message formats. The legacy format is for backwards
// combatability and relies heavily on HTTP headers and status codes to convey
// information.
type clientVersion int
const (
v0 clientVersion = iota //legacy version
v1
)
type BrokerContext struct { type BrokerContext struct {
snowflakes *SnowflakeHeap snowflakes *SnowflakeHeap
restrictedSnowflakes *SnowflakeHeap restrictedSnowflakes *SnowflakeHeap
@ -90,7 +101,7 @@ type MetricsHandler struct {
func (sh SnowflakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (sh SnowflakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Session-ID, Snowflake-NAT-Type") w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Session-ID")
// Return early if it's CORS preflight. // Return early if it's CORS preflight.
if "OPTIONS" == r.Method { if "OPTIONS" == r.Method {
return return
@ -170,7 +181,7 @@ func (ctx *BrokerContext) AddSnowflake(id string, proxyType string, natType stri
snowflake.proxyType = proxyType snowflake.proxyType = proxyType
snowflake.natType = natType snowflake.natType = natType
snowflake.offerChannel = make(chan *ClientOffer) snowflake.offerChannel = make(chan *ClientOffer)
snowflake.answerChannel = make(chan []byte) snowflake.answerChannel = make(chan string)
ctx.snowflakeLock.Lock() ctx.snowflakeLock.Lock()
if natType == NATUnrestricted { if natType == NATUnrestricted {
heap.Push(ctx.snowflakes, snowflake) heap.Push(ctx.snowflakes, snowflake)
@ -245,6 +256,20 @@ type ClientOffer struct {
sdp []byte sdp []byte
} }
// Sends an encoded response to the client and an
// HTTP server error if the response encoding fails
func sendClientResponse(resp *messages.ClientPollResponse, w http.ResponseWriter) {
data, err := resp.EncodePollResponse()
if err != nil {
log.Printf("error encoding answer")
w.WriteHeader(http.StatusInternalServerError)
} else {
if _, err := w.Write([]byte(data)); err != nil {
log.Printf("unable to write answer with error: %v", err)
}
}
}
/* /*
Expects a WebRTC SDP offer in the Request to give to an assigned Expects a WebRTC SDP offer in the Request to give to an assigned
snowflake proxy, which responds with the SDP answer to be sent in snowflake proxy, which responds with the SDP answer to be sent in
@ -252,19 +277,55 @@ the HTTP response back to the client.
*/ */
func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
var err error var err error
var version clientVersion
startTime := time.Now() startTime := time.Now()
offer := &ClientOffer{} body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit))
offer.sdp, err = ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit)) if err != nil {
if nil != err { log.Printf("Error reading client request: %s", err.Error())
log.Println("Invalid data.") w.WriteHeader(http.StatusInternalServerError)
w.WriteHeader(http.StatusBadRequest)
return return
} }
if len(body) > 0 && body[0] == '{' {
version = v0
} else {
parts := bytes.SplitN(body, []byte("\n"), 2)
if len(parts) < 2 {
// no version number found
err := fmt.Errorf("unsupported message version")
sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, w)
return
}
body = parts[1]
if string(parts[0]) == "1.0" {
version = v1
offer.natType = r.Header.Get("Snowflake-NAT-Type") } else {
if offer.natType == "" { err := fmt.Errorf("unsupported message version")
offer.natType = NATUnknown sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, w)
return
}
}
var offer *ClientOffer
switch version {
case v0:
offer = &ClientOffer{
natType: r.Header.Get("Snowflake-NAT-Type"),
sdp: body,
}
case v1:
req, err := messages.DecodeClientPollRequest(body)
if err != nil {
sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, w)
return
}
offer = &ClientOffer{
natType: req.NAT,
sdp: []byte(req.Offer),
}
default:
panic("unknown version")
} }
// Only hand out known restricted snowflakes to unrestricted clients // Only hand out known restricted snowflakes to unrestricted clients
@ -289,7 +350,15 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
ctx.metrics.clientRestrictedDeniedCount++ ctx.metrics.clientRestrictedDeniedCount++
} }
ctx.metrics.lock.Unlock() ctx.metrics.lock.Unlock()
w.WriteHeader(http.StatusServiceUnavailable) switch version {
case v0:
w.WriteHeader(http.StatusServiceUnavailable)
case v1:
resp := &messages.ClientPollResponse{Error: "no snowflake proxies currently available"}
sendClientResponse(resp, w)
default:
panic("unknown version")
}
return return
} }
// Otherwise, find the most available snowflake proxy, and pass the offer to it. // Otherwise, find the most available snowflake proxy, and pass the offer to it.
@ -306,17 +375,36 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
ctx.metrics.clientProxyMatchCount++ ctx.metrics.clientProxyMatchCount++
ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "matched"}).Inc() ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "matched"}).Inc()
ctx.metrics.lock.Unlock() ctx.metrics.lock.Unlock()
if _, err := w.Write(answer); err != nil { switch version {
log.Printf("unable to write answer with error: %v", err) case v0:
if _, err := w.Write([]byte(answer)); err != nil {
log.Printf("unable to write answer with error: %v", err)
}
case v1:
resp := &messages.ClientPollResponse{Answer: answer}
sendClientResponse(resp, w)
default:
panic("unknown version")
} }
// Initial tracking of elapsed time. // Initial tracking of elapsed time.
ctx.metrics.clientRoundtripEstimate = time.Since(startTime) / ctx.metrics.clientRoundtripEstimate = time.Since(startTime) /
time.Millisecond time.Millisecond
case <-time.After(time.Second * ClientTimeout): case <-time.After(time.Second * ClientTimeout):
log.Println("Client: Timed out.") log.Println("Client: Timed out.")
w.WriteHeader(http.StatusGatewayTimeout) switch version {
if _, err := w.Write([]byte("timed out waiting for answer!")); err != nil { case v0:
log.Printf("unable to write timeout error, failed with error: %v", err) w.WriteHeader(http.StatusGatewayTimeout)
if _, err := w.Write(
[]byte("timed out waiting for answer!")); err != nil {
log.Printf("unable to write timeout error, failed with error: %v",
err)
}
case v1:
resp := &messages.ClientPollResponse{
Error: "timed out waiting for answer!"}
sendClientResponse(resp, w)
default:
panic("unknown version")
} }
} }
@ -364,7 +452,7 @@ func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
w.Write(b) w.Write(b)
if success { if success {
snowflake.answerChannel <- []byte(answer) snowflake.answerChannel <- answer
} }
} }

View file

@ -70,10 +70,59 @@ func TestBroker(t *testing.T) {
Convey("Responds to client offers...", func() { Convey("Responds to client offers...", func() {
w := httptest.NewRecorder() w := httptest.NewRecorder()
data := bytes.NewReader([]byte("test")) data := bytes.NewReader(
[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"unknown\"}"))
r, err := http.NewRequest("POST", "snowflake.broker/client", data) r, err := http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil) So(err, ShouldBeNil)
Convey("with error when no snowflakes are available.", func() {
clientOffers(ctx, w, r)
So(w.Code, ShouldEqual, http.StatusOK)
So(w.Body.String(), ShouldEqual, `{"error":"no snowflake proxies currently available"}`)
})
Convey("with a proxy answer if available.", func() {
done := make(chan bool)
// Prepare a fake proxy to respond with.
snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted)
go func() {
clientOffers(ctx, w, r)
done <- true
}()
offer := <-snowflake.offerChannel
So(offer.sdp, ShouldResemble, []byte("fake"))
snowflake.answerChannel <- "fake answer"
<-done
So(w.Body.String(), ShouldEqual, `{"answer":"fake answer"}`)
So(w.Code, ShouldEqual, http.StatusOK)
})
Convey("Times out when no proxy responds.", func() {
if testing.Short() {
return
}
done := make(chan bool)
snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted)
go func() {
clientOffers(ctx, w, r)
// Takes a few seconds here...
done <- true
}()
offer := <-snowflake.offerChannel
So(offer.sdp, ShouldResemble, []byte("fake"))
<-done
So(w.Code, ShouldEqual, http.StatusOK)
So(w.Body.String(), ShouldEqual, `{"error":"timed out waiting for answer!"}`)
})
})
Convey("Responds to legacy client offers...", func() {
w := httptest.NewRecorder()
data := bytes.NewReader([]byte("{test}"))
r, err := http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)
r.Header.Set("Snowflake-NAT-TYPE", "restricted")
Convey("with 503 when no snowflakes are available.", func() { Convey("with 503 when no snowflakes are available.", func() {
clientOffers(ctx, w, r) clientOffers(ctx, w, r)
So(w.Code, ShouldEqual, http.StatusServiceUnavailable) So(w.Code, ShouldEqual, http.StatusServiceUnavailable)
@ -89,8 +138,8 @@ func TestBroker(t *testing.T) {
done <- true done <- true
}() }()
offer := <-snowflake.offerChannel offer := <-snowflake.offerChannel
So(offer.sdp, ShouldResemble, []byte("test")) So(offer.sdp, ShouldResemble, []byte("{test}"))
snowflake.answerChannel <- []byte("fake answer") snowflake.answerChannel <- "fake answer"
<-done <-done
So(w.Body.String(), ShouldEqual, "fake answer") So(w.Body.String(), ShouldEqual, "fake answer")
So(w.Code, ShouldEqual, http.StatusOK) So(w.Code, ShouldEqual, http.StatusOK)
@ -108,10 +157,11 @@ func TestBroker(t *testing.T) {
done <- true done <- true
}() }()
offer := <-snowflake.offerChannel offer := <-snowflake.offerChannel
So(offer.sdp, ShouldResemble, []byte("test")) So(offer.sdp, ShouldResemble, []byte("{test}"))
<-done <-done
So(w.Code, ShouldEqual, http.StatusGatewayTimeout) So(w.Code, ShouldEqual, http.StatusGatewayTimeout)
}) })
}) })
Convey("Responds to proxy polls...", func() { Convey("Responds to proxy polls...", func() {
@ -163,7 +213,7 @@ func TestBroker(t *testing.T) {
}(ctx) }(ctx)
answer := <-s.answerChannel answer := <-s.answerChannel
So(w.Code, ShouldEqual, http.StatusOK) So(w.Code, ShouldEqual, http.StatusOK)
So(answer, ShouldResemble, []byte("test")) So(answer, ShouldResemble, "test")
}) })
Convey("with client gone status if the proxy is not recognized", func() { Convey("with client gone status if the proxy is not recognized", func() {
@ -272,7 +322,8 @@ func TestBroker(t *testing.T) {
So(ctx.idToSnowflake["ymbcCMto7KHNGYlp"], ShouldNotBeNil) So(ctx.idToSnowflake["ymbcCMto7KHNGYlp"], ShouldNotBeNil)
// Client request blocks until proxy answer arrives. // Client request blocks until proxy answer arrives.
dataC := bytes.NewReader([]byte("fake offer")) dataC := bytes.NewReader(
[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"unknown\"}"))
wC := httptest.NewRecorder() wC := httptest.NewRecorder()
rC, err := http.NewRequest("POST", "snowflake.broker/client", dataC) rC, err := http.NewRequest("POST", "snowflake.broker/client", dataC)
So(err, ShouldBeNil) So(err, ShouldBeNil)
@ -283,7 +334,7 @@ func TestBroker(t *testing.T) {
<-polled <-polled
So(wP.Code, ShouldEqual, http.StatusOK) So(wP.Code, ShouldEqual, http.StatusOK)
So(wP.Body.String(), ShouldResemble, `{"Status":"client match","Offer":"fake offer","NAT":"unknown"}`) So(wP.Body.String(), ShouldResemble, `{"Status":"client match","Offer":"fake","NAT":"unknown"}`)
So(ctx.idToSnowflake["ymbcCMto7KHNGYlp"], ShouldNotBeNil) So(ctx.idToSnowflake["ymbcCMto7KHNGYlp"], ShouldNotBeNil)
// Follow up with the answer request afterwards // Follow up with the answer request afterwards
wA := httptest.NewRecorder() wA := httptest.NewRecorder()
@ -295,7 +346,7 @@ func TestBroker(t *testing.T) {
<-done <-done
So(wC.Code, ShouldEqual, http.StatusOK) So(wC.Code, ShouldEqual, http.StatusOK)
So(wC.Body.String(), ShouldEqual, "test") So(wC.Body.String(), ShouldEqual, `{"answer":"test"}`)
}) })
}) })
} }
@ -517,7 +568,8 @@ func TestMetrics(t *testing.T) {
//Test addition of client failures //Test addition of client failures
Convey("for no proxies available", func() { Convey("for no proxies available", func() {
w := httptest.NewRecorder() w := httptest.NewRecorder()
data := bytes.NewReader([]byte("test")) data := bytes.NewReader(
[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"unknown\"}"))
r, err := http.NewRequest("POST", "snowflake.broker/client", data) r, err := http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil) So(err, ShouldBeNil)
@ -535,7 +587,8 @@ func TestMetrics(t *testing.T) {
//Test addition of client matches //Test addition of client matches
Convey("for client-proxy match", func() { Convey("for client-proxy match", func() {
w := httptest.NewRecorder() w := httptest.NewRecorder()
data := bytes.NewReader([]byte("test")) data := bytes.NewReader(
[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"unknown\"}"))
r, err := http.NewRequest("POST", "snowflake.broker/client", data) r, err := http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil) So(err, ShouldBeNil)
@ -546,8 +599,8 @@ func TestMetrics(t *testing.T) {
done <- true done <- true
}() }()
offer := <-snowflake.offerChannel offer := <-snowflake.offerChannel
So(offer.sdp, ShouldResemble, []byte("test")) So(offer.sdp, ShouldResemble, []byte("fake"))
snowflake.answerChannel <- []byte("fake answer") snowflake.answerChannel <- "fake answer"
<-done <-done
ctx.metrics.printMetrics() ctx.metrics.printMetrics()
@ -556,22 +609,63 @@ func TestMetrics(t *testing.T) {
//Test rounding boundary //Test rounding boundary
Convey("binning boundary", func() { Convey("binning boundary", func() {
w := httptest.NewRecorder() w := httptest.NewRecorder()
data := bytes.NewReader([]byte("test")) data := bytes.NewReader(
[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}"))
r, err := http.NewRequest("POST", "snowflake.broker/client", data) r, err := http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil) So(err, ShouldBeNil)
clientOffers(ctx, w, r) clientOffers(ctx, w, r)
w = httptest.NewRecorder()
data = bytes.NewReader(
[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}"))
r, err = http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)
clientOffers(ctx, w, r) clientOffers(ctx, w, r)
w = httptest.NewRecorder()
data = bytes.NewReader(
[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}"))
r, err = http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)
clientOffers(ctx, w, r) clientOffers(ctx, w, r)
w = httptest.NewRecorder()
data = bytes.NewReader(
[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}"))
r, err = http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)
clientOffers(ctx, w, r) clientOffers(ctx, w, r)
w = httptest.NewRecorder()
data = bytes.NewReader(
[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}"))
r, err = http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)
clientOffers(ctx, w, r) clientOffers(ctx, w, r)
w = httptest.NewRecorder()
data = bytes.NewReader(
[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}"))
r, err = http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)
clientOffers(ctx, w, r) clientOffers(ctx, w, r)
w = httptest.NewRecorder()
data = bytes.NewReader(
[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}"))
r, err = http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)
clientOffers(ctx, w, r) clientOffers(ctx, w, r)
w = httptest.NewRecorder()
data = bytes.NewReader(
[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}"))
r, err = http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)
clientOffers(ctx, w, r) clientOffers(ctx, w, r)
ctx.metrics.printMetrics() ctx.metrics.printMetrics()
So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 8\nclient-unrestricted-denied-count 0\n") So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 8\nclient-unrestricted-denied-count 0\n")
w = httptest.NewRecorder()
data = bytes.NewReader(
[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}"))
r, err = http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)
clientOffers(ctx, w, r) clientOffers(ctx, w, r)
buf.Reset() buf.Reset()
ctx.metrics.printMetrics() ctx.metrics.printMetrics()
@ -648,9 +742,9 @@ func TestMetrics(t *testing.T) {
//Test client failures by NAT type //Test client failures by NAT type
Convey("client failures by NAT type", func() { Convey("client failures by NAT type", func() {
w := httptest.NewRecorder() w := httptest.NewRecorder()
data := bytes.NewReader([]byte("test")) data := bytes.NewReader(
[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}"))
r, err := http.NewRequest("POST", "snowflake.broker/client", data) r, err := http.NewRequest("POST", "snowflake.broker/client", data)
r.Header.Set("Snowflake-NAT-TYPE", "restricted")
So(err, ShouldBeNil) So(err, ShouldBeNil)
clientOffers(ctx, w, r) clientOffers(ctx, w, r)
@ -661,8 +755,9 @@ func TestMetrics(t *testing.T) {
buf.Reset() buf.Reset()
ctx.metrics.zeroMetrics() ctx.metrics.zeroMetrics()
data = bytes.NewReader(
[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"unrestricted\"}"))
r, err = http.NewRequest("POST", "snowflake.broker/client", data) r, err = http.NewRequest("POST", "snowflake.broker/client", data)
r.Header.Set("Snowflake-NAT-TYPE", "unrestricted")
So(err, ShouldBeNil) So(err, ShouldBeNil)
clientOffers(ctx, w, r) clientOffers(ctx, w, r)
@ -673,8 +768,9 @@ func TestMetrics(t *testing.T) {
buf.Reset() buf.Reset()
ctx.metrics.zeroMetrics() ctx.metrics.zeroMetrics()
data = bytes.NewReader(
[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"unknown\"}"))
r, err = http.NewRequest("POST", "snowflake.broker/client", data) r, err = http.NewRequest("POST", "snowflake.broker/client", data)
r.Header.Set("Snowflake-NAT-TYPE", "unknown")
So(err, ShouldBeNil) So(err, ShouldBeNil)
clientOffers(ctx, w, r) clientOffers(ctx, w, r)

View file

@ -13,7 +13,7 @@ type Snowflake struct {
proxyType string proxyType string
natType string natType string
offerChannel chan *ClientOffer offerChannel chan *ClientOffer
answerChannel chan []byte answerChannel chan string
clients int clients int
index int index int
} }

View file

@ -176,7 +176,7 @@ func TestSnowflakeClient(t *testing.T) {
Convey("Rendezvous", t, func() { Convey("Rendezvous", t, func() {
transport := &MockTransport{ transport := &MockTransport{
http.StatusOK, http.StatusOK,
[]byte(`{"type":"answer","sdp":"fake"}`), []byte(`{"answer": "{\"type\":\"answer\",\"sdp\":\"fake\"}" }`),
} }
fakeOffer, err := util.DeserializeSessionDescription(`{"type":"offer","sdp":"test"}`) fakeOffer, err := util.DeserializeSessionDescription(`{"type":"offer","sdp":"test"}`)
if err != nil { if err != nil {
@ -209,26 +209,25 @@ func TestSnowflakeClient(t *testing.T) {
So(answer.SDP, ShouldResemble, "fake") So(answer.SDP, ShouldResemble, "fake")
}) })
Convey("BrokerChannel.Negotiate fails with 503", func() { Convey("BrokerChannel.Negotiate fails", func() {
b, err := NewBrokerChannel("test.broker", "", b, err := NewBrokerChannel("test.broker", "",
&MockTransport{http.StatusServiceUnavailable, []byte("\n")}, &MockTransport{http.StatusOK, []byte(`{"error": "no snowflake proxies currently available"}`)},
false) false)
So(err, ShouldBeNil) So(err, ShouldBeNil)
answer, err := b.Negotiate(fakeOffer) answer, err := b.Negotiate(fakeOffer)
So(err, ShouldNotBeNil) So(err, ShouldNotBeNil)
So(answer, ShouldBeNil) So(answer, ShouldBeNil)
So(err.Error(), ShouldResemble, BrokerError503)
}) })
Convey("BrokerChannel.Negotiate fails with 400", func() { Convey("BrokerChannel.Negotiate fails with unexpected error", func() {
b, err := NewBrokerChannel("test.broker", "", b, err := NewBrokerChannel("test.broker", "",
&MockTransport{http.StatusBadRequest, []byte("\n")}, &MockTransport{http.StatusInternalServerError, []byte("\n")},
false) false)
So(err, ShouldBeNil) So(err, ShouldBeNil)
answer, err := b.Negotiate(fakeOffer) answer, err := b.Negotiate(fakeOffer)
So(err, ShouldNotBeNil) So(err, ShouldNotBeNil)
So(answer, ShouldBeNil) So(answer, ShouldBeNil)
So(err.Error(), ShouldResemble, BrokerError400) So(err.Error(), ShouldResemble, BrokerErrorUnexpected)
}) })
Convey("BrokerChannel.Negotiate fails with large read", func() { Convey("BrokerChannel.Negotiate fails with large read", func() {
@ -242,15 +241,6 @@ func TestSnowflakeClient(t *testing.T) {
So(err.Error(), ShouldResemble, "unexpected EOF") So(err.Error(), ShouldResemble, "unexpected EOF")
}) })
Convey("BrokerChannel.Negotiate fails with unexpected error", func() {
b, err := NewBrokerChannel("test.broker", "",
&MockTransport{123, []byte("")}, false)
So(err, ShouldBeNil)
answer, err := b.Negotiate(fakeOffer)
So(err, ShouldNotBeNil)
So(answer, ShouldBeNil)
So(err.Error(), ShouldResemble, BrokerErrorUnexpected)
})
}) })
} }

View file

@ -19,14 +19,13 @@ import (
"sync" "sync"
"time" "time"
"git.torproject.org/pluggable-transports/snowflake.git/common/messages"
"git.torproject.org/pluggable-transports/snowflake.git/common/nat" "git.torproject.org/pluggable-transports/snowflake.git/common/nat"
"git.torproject.org/pluggable-transports/snowflake.git/common/util" "git.torproject.org/pluggable-transports/snowflake.git/common/util"
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
) )
const ( const (
BrokerError503 string = "No snowflake proxies currently available."
BrokerError400 string = "You sent an invalid offer in the request."
BrokerErrorUnexpected string = "Unexpected error, no answer." BrokerErrorUnexpected string = "Unexpected error, no answer."
readLimit = 100000 //Maximum number of bytes to be read from an HTTP response readLimit = 100000 //Maximum number of bytes to be read from an HTTP response
) )
@ -107,7 +106,20 @@ func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) (
if err != nil { if err != nil {
return nil, err return nil, err
} }
data := bytes.NewReader([]byte(offerSDP))
// Encode client poll request
bc.lock.Lock()
req := &messages.ClientPollRequest{
Offer: offerSDP,
NAT: bc.NATType,
}
body, err := req.EncodePollRequest()
bc.lock.Unlock()
if err != nil {
return nil, err
}
data := bytes.NewReader([]byte(body))
// Suffix with broker's client registration handler. // Suffix with broker's client registration handler.
clientURL := bc.url.ResolveReference(&url.URL{Path: "client"}) clientURL := bc.url.ResolveReference(&url.URL{Path: "client"})
request, err := http.NewRequest("POST", clientURL.String(), data) request, err := http.NewRequest("POST", clientURL.String(), data)
@ -117,10 +129,6 @@ func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) (
if "" != bc.Host { // Set true host if necessary. if "" != bc.Host { // Set true host if necessary.
request.Host = bc.Host request.Host = bc.Host
} }
// include NAT-TYPE
bc.lock.Lock()
request.Header.Set("Snowflake-NAT-TYPE", bc.NATType)
bc.lock.Unlock()
resp, err := bc.transport.RoundTrip(request) resp, err := bc.transport.RoundTrip(request)
if nil != err { if nil != err {
return nil, err return nil, err
@ -135,11 +143,15 @@ func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) (
return nil, err return nil, err
} }
log.Printf("Received answer: %s", string(body)) log.Printf("Received answer: %s", string(body))
return util.DeserializeSessionDescription(string(body))
case http.StatusServiceUnavailable: resp, err := messages.DecodeClientPollResponse(body)
return nil, errors.New(BrokerError503) if err != nil {
case http.StatusBadRequest: return nil, err
return nil, errors.New(BrokerError400) }
if resp.Error != "" {
return nil, errors.New(resp.Error)
}
return util.DeserializeSessionDescription(resp.Answer)
default: default:
return nil, errors.New(BrokerErrorUnexpected) return nil, errors.New(BrokerErrorUnexpected)
} }

107
common/messages/client.go Normal file
View file

@ -0,0 +1,107 @@
//Package for communication with the snowflake broker
//import "git.torproject.org/pluggable-transports/snowflake.git/common/messages"
package messages
import (
"encoding/json"
"fmt"
)
const ClientVersion = "1.0"
/* Client--Broker protocol v1.x specification:
All messages contain the version number
followed by a new line and then the message body
<message> := <version>\n<body>
<version> := <digit>.<digit>
<body> := <poll request>|<poll response>
There are two different types of body messages,
each encoded in JSON format
== ClientPollRequest ==
<poll request> :=
{
offer: <sdp offer>
[nat: (unknown|restricted|unrestricted)]
}
The NAT field is optional, and if it is missing a
value of "unknown" will be assumed.
== ClientPollResponse ==
<poll response> :=
{
[answer: <sdp answer>]
[error: <error string>]
}
If the broker succeeded in matching the client with a proxy,
the answer field MUST contain a valid SDP answer, and the
error field MUST be empty. If the answer field is empty, the
error field MUST contain a string explaining with a reason
for the error.
*/
type ClientPollRequest struct {
Offer string `json:"offer"`
NAT string `json:"nat"`
}
// Encodes a poll message from a snowflake client
func (req *ClientPollRequest) EncodePollRequest() ([]byte, error) {
body, err := json.Marshal(req)
if err != nil {
return nil, err
}
return append([]byte(ClientVersion+"\n"), body...), nil
}
// Decodes a poll message from a snowflake client
func DecodeClientPollRequest(data []byte) (*ClientPollRequest, error) {
var message ClientPollRequest
err := json.Unmarshal(data, &message)
if err != nil {
return nil, err
}
if message.Offer == "" {
return nil, fmt.Errorf("no supplied offer")
}
if message.NAT == "" {
message.NAT = "unknown"
}
return &message, nil
}
type ClientPollResponse struct {
Answer string `json:"answer,omitempty"`
Error string `json:"error,omitempty"`
}
// Encodes a poll response for a snowflake client
func (resp *ClientPollResponse) EncodePollResponse() ([]byte, error) {
return json.Marshal(resp)
}
// Decodes a poll response for a snowflake client
// If the Error field is empty, the Answer should be non-empty
func DecodeClientPollResponse(data []byte) (*ClientPollResponse, error) {
var message ClientPollResponse
err := json.Unmarshal(data, &message)
if err != nil {
return nil, err
}
if message.Error == "" && message.Answer == "" {
return nil, fmt.Errorf("received empty broker response")
}
return &message, nil
}

View file

@ -1,6 +1,7 @@
package messages package messages
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"testing" "testing"
@ -252,3 +253,118 @@ func TestEncodeProxyAnswerResponse(t *testing.T) {
So(err, ShouldEqual, nil) So(err, ShouldEqual, nil)
}) })
} }
func TestDecodeClientPollRequest(t *testing.T) {
Convey("Context", t, func() {
for _, test := range []struct {
natType string
offer string
data string
err error
}{
{
//version 1.0 client message
"unknown",
"fake",
`{"nat":"unknown","offer":"fake"}`,
nil,
},
{
//version 1.0 client message
"unknown",
"fake",
`{"offer":"fake"}`,
nil,
},
{
//unknown version
"",
"",
`{"version":"2.0"}`,
fmt.Errorf(""),
},
{
//no offer
"",
"",
`{"nat":"unknown"}`,
fmt.Errorf(""),
},
} {
req, err := DecodeClientPollRequest([]byte(test.data))
if test.err == nil {
So(req.NAT, ShouldResemble, test.natType)
So(req.Offer, ShouldResemble, test.offer)
}
So(err, ShouldHaveSameTypeAs, test.err)
}
})
}
func TestEncodeClientPollRequests(t *testing.T) {
Convey("Context", t, func() {
req1 := &ClientPollRequest{
NAT: "unknown",
Offer: "fake",
}
b, err := req1.EncodePollRequest()
So(err, ShouldEqual, nil)
fmt.Println(string(b))
parts := bytes.SplitN(b, []byte("\n"), 2)
So(string(parts[0]), ShouldEqual, "1.0")
b = parts[1]
req2, err := DecodeClientPollRequest(b)
So(err, ShouldEqual, nil)
So(req2, ShouldResemble, req1)
})
}
func TestDecodeClientPollResponse(t *testing.T) {
Convey("Context", t, func() {
for _, test := range []struct {
answer string
msg string
data string
}{
{
"fake answer",
"",
`{"answer":"fake answer"}`,
},
{
"",
"no snowflakes",
`{"error":"no snowflakes"}`,
},
} {
resp, err := DecodeClientPollResponse([]byte(test.data))
So(err, ShouldBeNil)
So(resp.Answer, ShouldResemble, test.answer)
So(resp.Error, ShouldResemble, test.msg)
}
})
}
func TestEncodeClientPollResponse(t *testing.T) {
Convey("Context", t, func() {
resp1 := &ClientPollResponse{
Answer: "fake answer",
}
b, err := resp1.EncodePollResponse()
So(err, ShouldEqual, nil)
resp2, err := DecodeClientPollResponse(b)
So(err, ShouldEqual, nil)
So(resp1, ShouldResemble, resp2)
resp1 = &ClientPollResponse{
Error: "failed",
}
b, err = resp1.EncodePollResponse()
So(err, ShouldEqual, nil)
resp2, err = DecodeClientPollResponse(b)
So(err, ShouldEqual, nil)
So(resp1, ShouldResemble, resp2)
})
}