diff --git a/broker/broker.go b/broker/broker.go index 2a253b0..4343de8 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -21,6 +21,7 @@ import ( "syscall" "time" + "git.torproject.org/pluggable-transports/snowflake.git/common/messages" "git.torproject.org/pluggable-transports/snowflake.git/common/safelog" "golang.org/x/crypto/acme/autocert" ) @@ -151,15 +152,16 @@ func (ctx *BrokerContext) AddSnowflake(id string) *Snowflake { For snowflake proxies to request a client from the Broker. */ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { - id := r.Header.Get("X-Session-ID") body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit)) - if nil != err { + if err != nil { log.Println("Invalid data.") w.WriteHeader(http.StatusBadRequest) return } - if string(body) != id { - log.Println("Mismatched IDs!") + + sid, err := messages.DecodePollRequest(body) + if err != nil { + log.Println("Invalid data.") w.WriteHeader(http.StatusBadRequest) return } @@ -173,14 +175,26 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { } // Wait for a client to avail an offer to the snowflake, or timeout if nil. - offer := ctx.RequestOffer(id) + offer := ctx.RequestOffer(sid) + var b []byte if nil == offer { ctx.metrics.proxyIdleCount++ - w.WriteHeader(http.StatusGatewayTimeout) + + b, err = messages.EncodePollResponse("", false) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.Write(b) return } - log.Println("Passing client offer to snowflake.") - if _, err := w.Write(offer); err != nil { + b, err = messages.EncodePollResponse(string(offer), true) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + if _, err := w.Write(b); err != nil { log.Printf("proxyPolls unable to write offer with error: %v", err) } } @@ -235,14 +249,7 @@ an offer from proxyHandler to respond with an answer in an HTTP POST, which the broker will pass back to the original client. */ func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { - id := r.Header.Get("X-Session-ID") - snowflake, ok := ctx.idToSnowflake[id] - if !ok || nil == snowflake { - // The snowflake took too long to respond with an answer, so its client - // disappeared / the snowflake is no longer recognized by the Broker. - w.WriteHeader(http.StatusGone) - return - } + body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit)) if nil != err || nil == body || len(body) <= 0 { log.Println("Invalid data.") @@ -250,7 +257,32 @@ func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { return } - snowflake.answerChannel <- body + answer, id, err := messages.DecodeAnswerRequest(body) + if err != nil || answer == "" { + log.Println("Invalid data.") + w.WriteHeader(http.StatusBadRequest) + return + } + + var success = true + snowflake, ok := ctx.idToSnowflake[id] + if !ok || nil == snowflake { + // The snowflake took too long to respond with an answer, so its client + // disappeared / the snowflake is no longer recognized by the Broker. + success = false + } + b, err := messages.EncodeAnswerResponse(success) + if err != nil { + log.Printf("Error encoding answer: %s", err.Error()) + w.WriteHeader(http.StatusInternalServerError) + return + } + w.Write(b) + + if success { + snowflake.answerChannel <- []byte(answer) + } + } func debugHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { diff --git a/broker/snowflake-broker_test.go b/broker/snowflake-broker_test.go index 4c78ecd..c35c1d6 100644 --- a/broker/snowflake-broker_test.go +++ b/broker/snowflake-broker_test.go @@ -113,9 +113,8 @@ func TestBroker(t *testing.T) { Convey("Responds to proxy polls...", func() { done := make(chan bool) w := httptest.NewRecorder() - data := bytes.NewReader([]byte("test")) + data := bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0"}`)) r, err := http.NewRequest("POST", "snowflake.broker/proxy", data) - r.Header.Set("X-Session-ID", "test") So(err, ShouldBeNil) Convey("with a client offer if available.", func() { @@ -125,57 +124,59 @@ func TestBroker(t *testing.T) { }(ctx) // Pass a fake client offer to this proxy p := <-ctx.proxyPolls - So(p.id, ShouldEqual, "test") + So(p.id, ShouldEqual, "ymbcCMto7KHNGYlp") p.offerChannel <- []byte("fake offer") <-done So(w.Code, ShouldEqual, http.StatusOK) - So(w.Body.String(), ShouldEqual, "fake offer") + So(w.Body.String(), ShouldEqual, `{"Status":"client match","Offer":"fake offer"}`) }) - Convey("times out when no client offer is available.", func() { + Convey("return empty 200 OK when no client offer is available.", func() { go func(ctx *BrokerContext) { proxyPolls(ctx, w, r) done <- true }(ctx) p := <-ctx.proxyPolls - So(p.id, ShouldEqual, "test") + So(p.id, ShouldEqual, "ymbcCMto7KHNGYlp") // nil means timeout p.offerChannel <- nil <-done - So(w.Body.String(), ShouldEqual, "") - So(w.Code, ShouldEqual, http.StatusGatewayTimeout) + So(w.Body.String(), ShouldEqual, `{"Status":"no match","Offer":""}`) + So(w.Code, ShouldEqual, http.StatusOK) }) }) Convey("Responds to proxy answers...", func() { s := ctx.AddSnowflake("test") w := httptest.NewRecorder() - data := bytes.NewReader([]byte("fake answer")) + data := bytes.NewReader([]byte(`{"Version":"1.0","Sid":"test","Answer":"test"}`)) Convey("by passing to the client if valid.", func() { r, err := http.NewRequest("POST", "snowflake.broker/answer", data) So(err, ShouldBeNil) - r.Header.Set("X-Session-ID", "test") go func(ctx *BrokerContext) { proxyAnswers(ctx, w, r) }(ctx) answer := <-s.answerChannel So(w.Code, ShouldEqual, http.StatusOK) - So(answer, ShouldResemble, []byte("fake answer")) + So(answer, ShouldResemble, []byte("test")) }) - Convey("with error if the proxy is not recognized", func() { - r, err := http.NewRequest("POST", "snowflake.broker/answer", nil) + Convey("with client gone status if the proxy is not recognized", func() { + data = bytes.NewReader([]byte(`{"Version":"1.0","Sid":"invalid","Answer":"test"}`)) + r, err := http.NewRequest("POST", "snowflake.broker/answer", data) So(err, ShouldBeNil) - r.Header.Set("X-Session-ID", "invalid") proxyAnswers(ctx, w, r) - So(w.Code, ShouldEqual, http.StatusGone) + So(w.Code, ShouldEqual, http.StatusOK) + b, err := ioutil.ReadAll(w.Body) + So(err, ShouldBeNil) + So(b, ShouldResemble, []byte(`{"Status":"client gone"}`)) + }) Convey("with error if the proxy gives invalid answer", func() { data := bytes.NewReader(nil) r, err := http.NewRequest("POST", "snowflake.broker/answer", data) - r.Header.Set("X-Session-ID", "test") So(err, ShouldBeNil) proxyAnswers(ctx, w, r) So(w.Code, ShouldEqual, http.StatusBadRequest) @@ -184,7 +185,6 @@ func TestBroker(t *testing.T) { Convey("with error if the proxy writes too much data", func() { data := bytes.NewReader(make([]byte, 100001)) r, err := http.NewRequest("POST", "snowflake.broker/answer", data) - r.Header.Set("X-Session-ID", "test") So(err, ShouldBeNil) proxyAnswers(ctx, w, r) So(w.Code, ShouldEqual, http.StatusBadRequest) @@ -199,11 +199,10 @@ func TestBroker(t *testing.T) { ctx := NewBrokerContext(NullLogger()) // Proxy polls with its ID first... - dataP := bytes.NewReader([]byte("test")) + dataP := bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0"}`)) wP := httptest.NewRecorder() rP, err := http.NewRequest("POST", "snowflake.broker/proxy", dataP) So(err, ShouldBeNil) - rP.Header.Set("X-Session-ID", "test") go func() { proxyPolls(ctx, wP, rP) polled <- true @@ -211,13 +210,13 @@ func TestBroker(t *testing.T) { // Manually do the Broker goroutine action here for full control. p := <-ctx.proxyPolls - So(p.id, ShouldEqual, "test") + So(p.id, ShouldEqual, "ymbcCMto7KHNGYlp") s := ctx.AddSnowflake(p.id) go func() { offer := <-s.offerChannel p.offerChannel <- offer }() - So(ctx.idToSnowflake["test"], ShouldNotBeNil) + So(ctx.idToSnowflake["ymbcCMto7KHNGYlp"], ShouldNotBeNil) // Client request blocks until proxy answer arrives. dataC := bytes.NewReader([]byte("fake offer")) @@ -231,20 +230,19 @@ func TestBroker(t *testing.T) { <-polled So(wP.Code, ShouldEqual, http.StatusOK) - So(wP.Body.String(), ShouldResemble, "fake offer") - So(ctx.idToSnowflake["test"], ShouldNotBeNil) + So(wP.Body.String(), ShouldResemble, `{"Status":"client match","Offer":"fake offer"}`) + So(ctx.idToSnowflake["ymbcCMto7KHNGYlp"], ShouldNotBeNil) // Follow up with the answer request afterwards wA := httptest.NewRecorder() - dataA := bytes.NewReader([]byte("fake answer")) - rA, err := http.NewRequest("POST", "snowflake.broker/proxy", dataA) + dataA := bytes.NewReader([]byte(`{"Version":"1.0","Sid":"ymbcCMto7KHNGYlp","Answer":"test"}`)) + rA, err := http.NewRequest("POST", "snowflake.broker/answer", dataA) So(err, ShouldBeNil) - rA.Header.Set("X-Session-ID", "test") proxyAnswers(ctx, wA, rA) So(wA.Code, ShouldEqual, http.StatusOK) <-done So(wC.Code, ShouldEqual, http.StatusOK) - So(wC.Body.String(), ShouldEqual, "fake answer") + So(wC.Body.String(), ShouldEqual, "test") }) } @@ -408,7 +406,7 @@ func TestMetrics(t *testing.T) { //Test addition of proxy polls Convey("for proxy polls", func() { w := httptest.NewRecorder() - data := bytes.NewReader([]byte("test")) + data := bytes.NewReader([]byte("{\"Sid\":\"ymbcCMto7KHNGYlp\",\"Version\":\"1.0\"}")) r, err := http.NewRequest("POST", "snowflake.broker/proxy", data) r.Header.Set("X-Session-ID", "test") r.RemoteAddr = "129.97.208.23:8888" //CA geoip @@ -492,7 +490,7 @@ func TestMetrics(t *testing.T) { //Test unique ip Convey("proxy counts by unique ip", func() { w := httptest.NewRecorder() - data := bytes.NewReader([]byte("test")) + data := bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0"}`)) r, err := http.NewRequest("POST", "snowflake.broker/proxy", data) r.Header.Set("X-Session-ID", "test") r.RemoteAddr = "129.97.208.23:8888" //CA geoip @@ -505,7 +503,7 @@ func TestMetrics(t *testing.T) { p.offerChannel <- nil <-done - data = bytes.NewReader([]byte("test")) + data = bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0"}`)) r, err = http.NewRequest("POST", "snowflake.broker/proxy", data) if err != nil { log.Printf("unable to get NewRequest with error: %v", err) diff --git a/common/messages/proxy.go b/common/messages/proxy.go new file mode 100644 index 0000000..141fbda --- /dev/null +++ b/common/messages/proxy.go @@ -0,0 +1,214 @@ +//Package for communication with the snowflake broker + +//import "git.torproject.org/pluggable-transports/snowflake.git/common/messages" +package messages + +import ( + "encoding/json" + "fmt" +) + +const version = "1.0" + +/* Version 1.0 specification: + +== ProxyPollRequest == +{ + Sid: [generated session id of proxy] + Version: 1.0 +} + +== ProxyPollResponse == +1) If a client is matched: +HTTP 200 OK +{ + Status: "client match", + { + type: offer, + sdp: [WebRTC SDP] + } +} + +2) If a client is not matched: +HTTP 200 OK + +{ + Status: "no proxies" +} + +3) If the request is malformed: +HTTP 400 BadRequest + +== ProxyAnswerRequest == +{ + Sid: [generated session id of proxy] + Version: 1.0 + Answer: + { + type: answer + sdp: [WebRTC SDP] + } +} + +== ProxyAnswerResponse == +1) If the client retrieved the answer: +HTTP 200 OK + +{ + Status: "success" +} + +2) If the client left: +HTTP 200 OK + +{ + Status: "client gone" +} + +3) If the request is malformed: +HTTP 400 BadRequest + +*/ + +type ProxyPollRequest struct { + Sid string + Version string +} + +func EncodePollRequest(sid string) ([]byte, error) { + return json.Marshal(ProxyPollRequest{ + Sid: sid, + Version: version, + }) +} + +// Decodes a poll message from a snowflake proxy and returns the +// sid of the proxy on success and an error if it failed +func DecodePollRequest(data []byte) (string, error) { + var message ProxyPollRequest + + err := json.Unmarshal(data, &message) + if err != nil { + return "", err + } + if message.Version != "1.0" { + return "", fmt.Errorf("using unknown version") + } + + // Version 1.0 requires an Sid + if message.Sid == "" { + return "", fmt.Errorf("no supplied session id") + } + + return message.Sid, nil +} + +type ProxyPollResponse struct { + Status string + Offer string +} + +func EncodePollResponse(offer string, success bool) ([]byte, error) { + if success { + return json.Marshal(ProxyPollResponse{ + Status: "client match", + Offer: offer, + }) + + } + return json.Marshal(ProxyPollResponse{ + Status: "no match", + }) +} + +// Decodes a poll response from the broker and returns an offer +// If there is a client match, the returned offer string will be non-empty +func DecodePollResponse(data []byte) (string, error) { + var message ProxyPollResponse + + err := json.Unmarshal(data, &message) + if err != nil { + return "", err + } + if message.Status == "" { + return "", fmt.Errorf("received invalid data") + } + + if message.Status == "client match" { + if message.Offer == "" { + return "", fmt.Errorf("no supplied offer") + } + } else { + message.Offer = "" + } + + return message.Offer, nil +} + +type ProxyAnswerRequest struct { + Version string + Sid string + Answer string +} + +func EncodeAnswerRequest(answer string, sid string) ([]byte, error) { + return json.Marshal(ProxyAnswerRequest{ + Version: "1.0", + Sid: sid, + Answer: answer, + }) +} + +// Returns the sdp answer and proxy sid +func DecodeAnswerRequest(data []byte) (string, string, error) { + var message ProxyAnswerRequest + + err := json.Unmarshal(data, &message) + if err != nil { + return "", "", err + } + if message.Version != "1.0" { + return "", "", fmt.Errorf("using unknown version") + } + + if message.Sid == "" || message.Answer == "" { + return "", "", fmt.Errorf("no supplied sid or answer") + } + + return message.Answer, message.Sid, nil +} + +type ProxyAnswerResponse struct { + Status string +} + +func EncodeAnswerResponse(success bool) ([]byte, error) { + if success { + return json.Marshal(ProxyAnswerResponse{ + Status: "success", + }) + + } + return json.Marshal(ProxyAnswerResponse{ + Status: "client gone", + }) +} + +func DecodeAnswerResponse(data []byte) (bool, error) { + var message ProxyAnswerResponse + var success bool + + err := json.Unmarshal(data, &message) + if err != nil { + return success, err + } + if message.Status == "" { + return success, fmt.Errorf("received invalid data") + } + + if message.Status == "success" { + success = true + } + + return success, nil +} diff --git a/common/messages/proxy_test.go b/common/messages/proxy_test.go new file mode 100644 index 0000000..f2f006e --- /dev/null +++ b/common/messages/proxy_test.go @@ -0,0 +1,161 @@ +package messages + +import ( + "encoding/json" + "fmt" + "testing" + + . "github.com/smartystreets/goconvey/convey" +) + +func TestDecodeProxyPollRequest(t *testing.T) { + Convey("Context", t, func() { + for _, test := range []struct { + sid string + data string + err error + }{ + { + //Version 1.0 proxy message + "ymbcCMto7KHNGYlp", + `{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0"}`, + nil, + }, + { + //Version 0.X proxy message: + "", + "ymbcCMto7KHNGYlp", + &json.SyntaxError{}, + }, + { + "", + `{"Sid":"ymbcCMto7KHNGYlp"}`, + fmt.Errorf(""), + }, + { + "", + "{}", + fmt.Errorf(""), + }, + { + "", + `{"Version":"1.0"}`, + fmt.Errorf(""), + }, + { + "", + `{"Version":"2.0"}`, + fmt.Errorf(""), + }, + } { + sid, err := DecodePollRequest([]byte(test.data)) + So(sid, ShouldResemble, test.sid) + So(err, ShouldHaveSameTypeAs, test.err) + } + + }) +} + +func TestEncodeProxyPollRequests(t *testing.T) { + Convey("Context", t, func() { + b, err := EncodePollRequest("ymbcCMto7KHNGYlp") + So(err, ShouldEqual, nil) + sid, err := DecodePollRequest(b) + So(sid, ShouldEqual, "ymbcCMto7KHNGYlp") + So(err, ShouldEqual, nil) + }) +} + +func TestDecodeProxyAnswerRequest(t *testing.T) { + Convey("Context", t, func() { + for _, test := range []struct { + answer string + sid string + data string + err error + }{ + { + "test", + "test", + `{"Version":"1.0","Sid":"test","Answer":"test"}`, + nil, + }, + { + "", + "", + `{"type":"offer","sdp":"v=0\r\no=- 4358805017720277108 2 IN IP4 [scrubbed]\r\ns=-\r\nt=0 0\r\na=group:BUNDLE data\r\na=msid-semantic: WMS\r\nm=application 56688 DTLS/SCTP 5000\r\nc=IN IP4 [scrubbed]\r\na=candidate:3769337065 1 udp 2122260223 [scrubbed] 56688 typ host generation 0 network-id 1 network-cost 50\r\na=candidate:2921887769 1 tcp 1518280447 [scrubbed] 35441 typ host tcptype passive generation 0 network-id 1 network-cost 50\r\na=ice-ufrag:aMAZ\r\na=ice-pwd:jcHb08Jjgrazp2dzjdrvPPvV\r\na=ice-options:trickle\r\na=fingerprint:sha-256 C8:88:EE:B9:E7:02:2E:21:37:ED:7A:D1:EB:2B:A3:15:A2:3B:5B:1C:3D:D4:D5:1F:06:CF:52:40:03:F8:DD:66\r\na=setup:actpass\r\na=mid:data\r\na=sctpmap:5000 webrtc-datachannel 1024\r\n"}`, + fmt.Errorf(""), + }, + { + "", + "", + `{"Version":"1.0","Answer":"test"}`, + fmt.Errorf(""), + }, + { + "", + "", + `{"Version":"1.0","Sid":"test"}`, + fmt.Errorf(""), + }, + } { + answer, sid, err := DecodeAnswerRequest([]byte(test.data)) + So(answer, ShouldResemble, test.answer) + So(sid, ShouldResemble, test.sid) + So(err, ShouldHaveSameTypeAs, test.err) + } + + }) +} + +func TestEncodeProxyAnswerRequest(t *testing.T) { + Convey("Context", t, func() { + b, err := EncodeAnswerRequest("test answer", "test sid") + So(err, ShouldEqual, nil) + answer, sid, err := DecodeAnswerRequest(b) + So(answer, ShouldEqual, "test answer") + So(sid, ShouldEqual, "test sid") + So(err, ShouldEqual, nil) + }) +} + +func TestDecodeProxyAnswerResponse(t *testing.T) { + Convey("Context", t, func() { + for _, test := range []struct { + success bool + data string + err error + }{ + { + true, + `{"Status":"success"}`, + nil, + }, + { + false, + `{"Status":"client gone"}`, + nil, + }, + { + false, + `{"Test":"test"}`, + fmt.Errorf(""), + }, + } { + success, err := DecodeAnswerResponse([]byte(test.data)) + So(success, ShouldResemble, test.success) + So(err, ShouldHaveSameTypeAs, test.err) + } + + }) +} + +func TestEncodeProxyAnswerResponse(t *testing.T) { + Convey("Context", t, func() { + b, err := EncodeAnswerResponse(true) + So(err, ShouldEqual, nil) + success, err := DecodeAnswerResponse(b) + So(success, ShouldEqual, true) + So(err, ShouldEqual, nil) + }) +} diff --git a/proxy-go/snowflake.go b/proxy-go/snowflake.go index 9b7dad2..ea2a986 100644 --- a/proxy-go/snowflake.go +++ b/proxy-go/snowflake.go @@ -19,6 +19,7 @@ import ( "sync" "time" + "git.torproject.org/pluggable-transports/snowflake.git/common/messages" "git.torproject.org/pluggable-transports/snowflake.git/common/safelog" "github.com/pion/webrtc" "golang.org/x/net/websocket" @@ -168,7 +169,12 @@ func pollOffer(sid string) *webrtc.SessionDescription { timeOfNextPoll = now } - req, _ := http.NewRequest("POST", broker.String(), bytes.NewBuffer([]byte(sid))) + b, err := messages.EncodePollRequest(sid) + if err != nil { + log.Printf("Error encoding poll message: %s", err.Error()) + return nil + } + req, _ := http.NewRequest("POST", broker.String(), bytes.NewBuffer(b)) req.Header.Set("X-Session-ID", sid) resp, err := client.Do(req) if err != nil { @@ -182,7 +188,16 @@ func pollOffer(sid string) *webrtc.SessionDescription { if err != nil { log.Printf("error reading broker response: %s", err) } else { - return deserializeSessionDescription(string(body)) + + offer, err := messages.DecodePollResponse(body) + if err != nil { + log.Printf("error reading broker response: %s", err.Error()) + log.Printf("body: %s", body) + return nil + } + if offer != "" { + return deserializeSessionDescription(offer) + } } } } @@ -191,9 +206,12 @@ func pollOffer(sid string) *webrtc.SessionDescription { func sendAnswer(sid string, pc *webrtc.PeerConnection) error { broker := brokerURL.ResolveReference(&url.URL{Path: "answer"}) - body := bytes.NewBuffer([]byte(serializeSessionDescription(pc.LocalDescription()))) - req, _ := http.NewRequest("POST", broker.String(), body) - req.Header.Set("X-Session-ID", sid) + answer := string([]byte(serializeSessionDescription(pc.LocalDescription()))) + b, err := messages.EncodeAnswerRequest(answer, sid) + if err != nil { + return err + } + req, _ := http.NewRequest("POST", broker.String(), bytes.NewBuffer(b)) resp, err := client.Do(req) if err != nil { return err @@ -201,6 +219,19 @@ func sendAnswer(sid string, pc *webrtc.PeerConnection) error { if resp.StatusCode != http.StatusOK { return fmt.Errorf("broker returned %d", resp.StatusCode) } + + body, err := limitedRead(resp.Body, readLimit) + if err != nil { + return fmt.Errorf("error reading broker response: %s", err) + } + success, err := messages.DecodeAnswerResponse(body) + if err != nil { + return err + } + if !success { + return fmt.Errorf("broker returned client timeout") + } + return nil } diff --git a/proxy/translation b/proxy/translation index 120578e..bbf11bb 160000 --- a/proxy/translation +++ b/proxy/translation @@ -1 +1 @@ -Subproject commit 120578ec9dbf0975fc9ac573130282f628b9747a +Subproject commit bbf11bb0c9f1aca4f6b18c6505645f85e2fa1986