snowflake/client/lib/rendezvous_test.go
2024-09-08 14:50:08 +04:00

439 lines
16 KiB
Go

package snowflake_client
import (
"bytes"
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/golang/mock/gomock"
"github.com/pion/webrtc/v3"
. "github.com/smartystreets/goconvey/convey"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/amp"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/messages"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/nat"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/sqsclient"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/util"
)
// mockTransport's RoundTrip method returns a response with a fake status and
// body.
type mockTransport struct {
statusCode int
body []byte
}
func (t *mockTransport) RoundTrip(req *http.Request) (*http.Response, error) {
return &http.Response{
Status: fmt.Sprintf("%d %s", t.statusCode, http.StatusText(t.statusCode)),
StatusCode: t.statusCode,
Body: io.NopCloser(bytes.NewReader(t.body)),
}, nil
}
// errorTransport's RoundTrip method returns an error.
type errorTransport struct {
err error
}
func (t errorTransport) RoundTrip(req *http.Request) (*http.Response, error) {
return nil, t.err
}
// makeEncPollReq returns an encoded client poll request containing a given
// offer.
func makeEncPollReq(offer string) []byte {
encPollReq, err := (&messages.ClientPollRequest{
Offer: offer,
NAT: nat.NATUnknown,
}).EncodeClientPollRequest()
if err != nil {
panic(err)
}
return encPollReq
}
// makeEncPollResp returns an encoded client poll response with given answer and
// error strings.
func makeEncPollResp(answer, errorStr string) []byte {
encPollResp, err := (&messages.ClientPollResponse{
Answer: answer,
Error: errorStr,
}).EncodePollResponse()
if err != nil {
panic(err)
}
return encPollResp
}
var fakeEncPollReq = makeEncPollReq(`{"type":"offer","sdp":"test"}`)
func TestHTTPRendezvous(t *testing.T) {
Convey("HTTP rendezvous", t, func() {
Convey("Construct httpRendezvous with no front domain", func() {
transport := &mockTransport{http.StatusOK, []byte{}}
rend, err := newHTTPRendezvous("http://test.broker", []string{}, transport)
So(err, ShouldBeNil)
So(rend.brokerURL, ShouldNotBeNil)
So(rend.brokerURL.Host, ShouldResemble, "test.broker")
So(rend.fronts, ShouldEqual, []string{})
So(rend.transport, ShouldEqual, transport)
})
Convey("Construct httpRendezvous *with* front domain", func() {
transport := &mockTransport{http.StatusOK, []byte{}}
rend, err := newHTTPRendezvous("http://test.broker", []string{"front"}, transport)
So(err, ShouldBeNil)
So(rend.brokerURL, ShouldNotBeNil)
So(rend.brokerURL.Host, ShouldResemble, "test.broker")
So(rend.fronts, ShouldContain, "front")
So(rend.transport, ShouldEqual, transport)
})
Convey("httpRendezvous.Exchange responds with answer", func() {
fakeEncPollResp := makeEncPollResp(
`{"answer": "{\"type\":\"answer\",\"sdp\":\"fake\"}" }`,
"",
)
rend, err := newHTTPRendezvous("http://test.broker", []string{},
&mockTransport{http.StatusOK, fakeEncPollResp})
So(err, ShouldBeNil)
answer, err := rend.Exchange(fakeEncPollReq)
So(err, ShouldBeNil)
So(answer, ShouldResemble, fakeEncPollResp)
})
Convey("httpRendezvous.Exchange responds with no answer", func() {
fakeEncPollResp := makeEncPollResp(
"",
`{"error": "no snowflake proxies currently available"}`,
)
rend, err := newHTTPRendezvous("http://test.broker", []string{},
&mockTransport{http.StatusOK, fakeEncPollResp})
So(err, ShouldBeNil)
answer, err := rend.Exchange(fakeEncPollReq)
So(err, ShouldBeNil)
So(answer, ShouldResemble, fakeEncPollResp)
})
Convey("httpRendezvous.Exchange fails with unexpected HTTP status code", func() {
rend, err := newHTTPRendezvous("http://test.broker", []string{},
&mockTransport{http.StatusInternalServerError, []byte{}})
So(err, ShouldBeNil)
answer, err := rend.Exchange(fakeEncPollReq)
So(err, ShouldNotBeNil)
So(answer, ShouldBeNil)
So(err.Error(), ShouldResemble, brokerErrorUnexpected)
})
Convey("httpRendezvous.Exchange fails with error", func() {
transportErr := errors.New("error")
rend, err := newHTTPRendezvous("http://test.broker", []string{},
&errorTransport{err: transportErr})
So(err, ShouldBeNil)
answer, err := rend.Exchange(fakeEncPollReq)
So(err, ShouldEqual, transportErr)
So(answer, ShouldBeNil)
})
Convey("httpRendezvous.Exchange fails with large read", func() {
rend, err := newHTTPRendezvous("http://test.broker", []string{},
&mockTransport{http.StatusOK, make([]byte, readLimit+1)})
So(err, ShouldBeNil)
_, err = rend.Exchange(fakeEncPollReq)
So(err, ShouldEqual, io.ErrUnexpectedEOF)
})
})
}
func ampArmorEncode(p []byte) []byte {
var buf bytes.Buffer
enc, err := amp.NewArmorEncoder(&buf)
if err != nil {
panic(err)
}
_, err = enc.Write(p)
if err != nil {
panic(err)
}
err = enc.Close()
if err != nil {
panic(err)
}
return buf.Bytes()
}
func TestAMPCacheRendezvous(t *testing.T) {
Convey("AMP cache rendezvous", t, func() {
Convey("Construct ampCacheRendezvous with no cache and no front domain", func() {
transport := &mockTransport{http.StatusOK, []byte{}}
rend, err := newAMPCacheRendezvous("http://test.broker", "", []string{}, transport)
So(err, ShouldBeNil)
So(rend.brokerURL, ShouldNotBeNil)
So(rend.brokerURL.String(), ShouldResemble, "http://test.broker")
So(rend.cacheURL, ShouldBeNil)
So(rend.fronts, ShouldResemble, []string{})
So(rend.transport, ShouldEqual, transport)
})
Convey("Construct ampCacheRendezvous with cache and no front domain", func() {
transport := &mockTransport{http.StatusOK, []byte{}}
rend, err := newAMPCacheRendezvous("http://test.broker", "https://amp.cache/", []string{}, transport)
So(err, ShouldBeNil)
So(rend.brokerURL, ShouldNotBeNil)
So(rend.brokerURL.String(), ShouldResemble, "http://test.broker")
So(rend.cacheURL, ShouldNotBeNil)
So(rend.cacheURL.String(), ShouldResemble, "https://amp.cache/")
So(rend.fronts, ShouldResemble, []string{})
So(rend.transport, ShouldEqual, transport)
})
Convey("Construct ampCacheRendezvous with no cache and front domain", func() {
transport := &mockTransport{http.StatusOK, []byte{}}
rend, err := newAMPCacheRendezvous("http://test.broker", "", []string{"front"}, transport)
So(err, ShouldBeNil)
So(rend.brokerURL, ShouldNotBeNil)
So(rend.brokerURL.String(), ShouldResemble, "http://test.broker")
So(rend.cacheURL, ShouldBeNil)
So(rend.fronts, ShouldContain, "front")
So(rend.transport, ShouldEqual, transport)
})
Convey("Construct ampCacheRendezvous with cache and front domain", func() {
transport := &mockTransport{http.StatusOK, []byte{}}
rend, err := newAMPCacheRendezvous("http://test.broker", "https://amp.cache/", []string{"front"}, transport)
So(err, ShouldBeNil)
So(rend.brokerURL, ShouldNotBeNil)
So(rend.brokerURL.String(), ShouldResemble, "http://test.broker")
So(rend.cacheURL, ShouldNotBeNil)
So(rend.cacheURL.String(), ShouldResemble, "https://amp.cache/")
So(rend.fronts, ShouldContain, "front")
So(rend.transport, ShouldEqual, transport)
})
Convey("ampCacheRendezvous.Exchange responds with answer", func() {
fakeEncPollResp := makeEncPollResp(
`{"answer": "{\"type\":\"answer\",\"sdp\":\"fake\"}" }`,
"",
)
rend, err := newAMPCacheRendezvous("http://test.broker", "", []string{},
&mockTransport{http.StatusOK, ampArmorEncode(fakeEncPollResp)})
So(err, ShouldBeNil)
answer, err := rend.Exchange(fakeEncPollReq)
So(err, ShouldBeNil)
So(answer, ShouldResemble, fakeEncPollResp)
})
Convey("ampCacheRendezvous.Exchange responds with no answer", func() {
fakeEncPollResp := makeEncPollResp(
"",
`{"error": "no snowflake proxies currently available"}`,
)
rend, err := newAMPCacheRendezvous("http://test.broker", "", []string{},
&mockTransport{http.StatusOK, ampArmorEncode(fakeEncPollResp)})
So(err, ShouldBeNil)
answer, err := rend.Exchange(fakeEncPollReq)
So(err, ShouldBeNil)
So(answer, ShouldResemble, fakeEncPollResp)
})
Convey("ampCacheRendezvous.Exchange fails with unexpected HTTP status code", func() {
rend, err := newAMPCacheRendezvous("http://test.broker", "", []string{},
&mockTransport{http.StatusInternalServerError, []byte{}})
So(err, ShouldBeNil)
answer, err := rend.Exchange(fakeEncPollReq)
So(err, ShouldNotBeNil)
So(answer, ShouldBeNil)
So(err.Error(), ShouldResemble, brokerErrorUnexpected)
})
Convey("ampCacheRendezvous.Exchange fails with error", func() {
transportErr := errors.New("error")
rend, err := newAMPCacheRendezvous("http://test.broker", "", []string{},
&errorTransport{err: transportErr})
So(err, ShouldBeNil)
answer, err := rend.Exchange(fakeEncPollReq)
So(err, ShouldEqual, transportErr)
So(answer, ShouldBeNil)
})
Convey("ampCacheRendezvous.Exchange fails with large read", func() {
// readLimit should apply to the raw HTTP body, not the
// encoded bytes. Encode readLimit bytes—the encoded
// size will be larger—and try to read the body. It
// should fail.
rend, err := newAMPCacheRendezvous("http://test.broker", "", []string{},
&mockTransport{http.StatusOK, ampArmorEncode(make([]byte, readLimit))})
So(err, ShouldBeNil)
_, err = rend.Exchange(fakeEncPollReq)
// We may get io.ErrUnexpectedEOF here, or something
// like "missing </pre> tag".
So(err, ShouldNotBeNil)
})
})
}
func TestSQSRendezvous(t *testing.T) {
Convey("SQS Rendezvous", t, func() {
var sendMessageInput *sqs.SendMessageInput
var getQueueUrlInput *sqs.GetQueueUrlInput
Convey("Construct SQS queue rendezvous", func() {
transport := &mockTransport{http.StatusOK, []byte{}}
rend, err := newSQSRendezvous("https://sqs.us-east-1.amazonaws.com", "eyJhd3MtYWNjZXNzLWtleS1pZCI6InRlc3QtYWNjZXNzLWtleSIsImF3cy1zZWNyZXQta2V5IjoidGVzdC1zZWNyZXQta2V5In0=", transport)
So(err, ShouldBeNil)
So(rend.sqsClient, ShouldNotBeNil)
So(rend.sqsURL, ShouldNotBeNil)
So(rend.sqsURL.String(), ShouldResemble, "https://sqs.us-east-1.amazonaws.com")
})
ctrl := gomock.NewController(t)
mockSqsClient := sqsclient.NewMockSQSClient(ctrl)
responseQueueURL := "https://sqs.us-east-1.amazonaws.com/testing"
sqsUrl, _ := url.Parse("https://sqs.us-east-1.amazonaws.com/broker")
fakeEncPollResp := makeEncPollResp(
`{"answer": "{\"type\":\"answer\",\"sdp\":\"fake\"}" }`,
"",
)
sqsRendezvous := sqsRendezvous{
transport: &mockTransport{http.StatusOK, []byte{}},
sqsClient: mockSqsClient,
sqsURL: sqsUrl,
timeout: 0,
numRetries: 5,
}
Convey("sqsRendezvous.Exchange responds with answer", func() {
sqsClientId := ""
mockSqsClient.EXPECT().SendMessage(gomock.Any(), gomock.AssignableToTypeOf(sendMessageInput)).Do(func(ctx interface{}, input *sqs.SendMessageInput, optFns ...interface{}) {
So(*input.MessageBody, ShouldEqual, string(fakeEncPollResp))
So(*input.QueueUrl, ShouldEqual, sqsUrl.String())
sqsClientId = *input.MessageAttributes["ClientID"].StringValue
})
mockSqsClient.EXPECT().GetQueueUrl(gomock.Any(), gomock.AssignableToTypeOf(getQueueUrlInput)).DoAndReturn(func(ctx interface{}, input *sqs.GetQueueUrlInput, optFns ...interface{}) (*sqs.GetQueueUrlOutput, error) {
So(*input.QueueName, ShouldEqual, "snowflake-client-"+sqsClientId)
return &sqs.GetQueueUrlOutput{
QueueUrl: aws.String(responseQueueURL),
}, nil
})
mockSqsClient.EXPECT().ReceiveMessage(gomock.Any(), gomock.Eq(&sqs.ReceiveMessageInput{
QueueUrl: &responseQueueURL,
MaxNumberOfMessages: 1,
WaitTimeSeconds: 20,
})).Return(&sqs.ReceiveMessageOutput{
Messages: []types.Message{{Body: aws.String("answer")}},
}, nil)
answer, err := sqsRendezvous.Exchange(fakeEncPollResp)
So(answer, ShouldEqual, []byte("answer"))
So(err, ShouldBeNil)
})
Convey("sqsRendezvous.Exchange cannot get queue url", func() {
sqsClientId := ""
mockSqsClient.EXPECT().SendMessage(gomock.Any(), gomock.AssignableToTypeOf(sendMessageInput)).Do(func(ctx interface{}, input *sqs.SendMessageInput, optFns ...interface{}) {
So(*input.MessageBody, ShouldEqual, string(fakeEncPollResp))
So(*input.QueueUrl, ShouldEqual, sqsUrl.String())
sqsClientId = *input.MessageAttributes["ClientID"].StringValue
})
for i := 0; i < sqsRendezvous.numRetries; i++ {
mockSqsClient.EXPECT().GetQueueUrl(gomock.Any(), gomock.AssignableToTypeOf(getQueueUrlInput)).DoAndReturn(func(ctx interface{}, input *sqs.GetQueueUrlInput, optFns ...interface{}) (*sqs.GetQueueUrlOutput, error) {
So(*input.QueueName, ShouldEqual, "snowflake-client-"+sqsClientId)
return nil, errors.New("test error")
})
}
answer, err := sqsRendezvous.Exchange(fakeEncPollResp)
So(answer, ShouldBeNil)
So(err, ShouldNotBeNil)
So(err, ShouldEqual, errors.New("test error"))
})
Convey("sqsRendezvous.Exchange does not receive answer", func() {
sqsClientId := ""
mockSqsClient.EXPECT().SendMessage(gomock.Any(), gomock.AssignableToTypeOf(sendMessageInput)).Do(func(ctx interface{}, input *sqs.SendMessageInput, optFns ...interface{}) {
So(*input.MessageBody, ShouldEqual, string(fakeEncPollResp))
So(*input.QueueUrl, ShouldEqual, sqsUrl.String())
sqsClientId = *input.MessageAttributes["ClientID"].StringValue
})
mockSqsClient.EXPECT().GetQueueUrl(gomock.Any(), gomock.AssignableToTypeOf(getQueueUrlInput)).DoAndReturn(func(ctx interface{}, input *sqs.GetQueueUrlInput, optFns ...interface{}) (*sqs.GetQueueUrlOutput, error) {
So(*input.QueueName, ShouldEqual, "snowflake-client-"+sqsClientId)
return &sqs.GetQueueUrlOutput{
QueueUrl: aws.String(responseQueueURL),
}, nil
})
for i := 0; i < sqsRendezvous.numRetries; i++ {
mockSqsClient.EXPECT().ReceiveMessage(gomock.Any(), gomock.Eq(&sqs.ReceiveMessageInput{
QueueUrl: &responseQueueURL,
MaxNumberOfMessages: 1,
WaitTimeSeconds: 20,
})).Return(&sqs.ReceiveMessageOutput{
Messages: []types.Message{},
}, nil)
}
answer, err := sqsRendezvous.Exchange(fakeEncPollResp)
So(answer, ShouldEqual, []byte{})
So(err, ShouldBeNil)
})
})
}
func TestBrokerChannel(t *testing.T) {
Convey("Requests a proxy and handles response", t, func() {
answerSdp := &webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer,
SDP: "test",
}
answerSdpStr, _ := util.SerializeSessionDescription(answerSdp)
serverResponse, _ := (&messages.ClientPollResponse{
Answer: answerSdpStr,
}).EncodePollResponse()
offerSdp := &webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
SDP: "test",
}
requestBodyChan := make(chan []byte)
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, _ := io.ReadAll(r.Body)
go func() {
requestBodyChan <- body
}()
w.Write(serverResponse)
}))
defer mockServer.Close()
brokerChannel, err := newBrokerChannelFromConfig(ClientConfig{
BrokerURL: mockServer.URL,
BridgeFingerprint: "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
})
So(err, ShouldBeNil)
brokerChannel.SetNATType(nat.NATRestricted)
answerSdpReturned, err := brokerChannel.Negotiate(offerSdp)
So(err, ShouldBeNil)
So(answerSdpReturned, ShouldEqual, answerSdp)
body := <-requestBodyChan
pollReq, err := messages.DecodeClientPollRequest(body)
So(err, ShouldBeNil)
So(pollReq.Fingerprint, ShouldEqual, "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")
So(pollReq.NAT, ShouldEqual, nat.NATRestricted)
requestSdp, err := util.DeserializeSessionDescription(pollReq.Offer)
So(err, ShouldBeNil)
So(requestSdp, ShouldEqual, offerSdp)
})
}