diff --git a/.gitignore b/.gitignore index 002f95e..3cf98e8 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ .DS_Store datadir/ broker/broker +broker/http-frontend/http-frontend client/client server/server proxy/proxy diff --git a/broker/broker.go b/broker/broker.go index 906c210..277598b 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -6,48 +6,20 @@ SessionDescriptions in order to negotiate a WebRTC connection. package main import ( - "bytes" "container/heap" - "crypto/tls" "flag" - "fmt" "io" - "io/ioutil" "log" "net" - "net/http" + "net/rpc" "os" "os/signal" - "strings" "sync" "syscall" "time" - "git.torproject.org/pluggable-transports/snowflake.git/common/messages" "git.torproject.org/pluggable-transports/snowflake.git/common/safelog" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" - "golang.org/x/crypto/acme/autocert" -) - -const ( - ClientTimeout = 10 - ProxyTimeout = 10 - readLimit = 100000 //Maximum number of bytes to be read from an HTTP request - - NATUnknown = "unknown" - NATRestricted = "restricted" - NATUnrestricted = "unrestricted" -) - -// We support two client message formats. The legacy format is for backwards -// combatability and relies heavily on HTTP headers and status codes to convey -// information. -type clientVersion int - -const ( - v0 clientVersion = iota //legacy version - v1 + // "github.com/prometheus/client_golang/prometheus" ) type BrokerContext struct { @@ -87,38 +59,6 @@ func NewBrokerContext(metricsLogger *log.Logger) *BrokerContext { } } -// Implements the http.Handler interface -type SnowflakeHandler struct { - *BrokerContext - handle func(*BrokerContext, http.ResponseWriter, *http.Request) -} - -// Implements the http.Handler interface -type MetricsHandler struct { - logFilename string - handle func(string, http.ResponseWriter, *http.Request) -} - -func (sh SnowflakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Session-ID") - // Return early if it's CORS preflight. - if "OPTIONS" == r.Method { - return - } - sh.handle(sh.BrokerContext, w, r) -} - -func (mh MetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Session-ID") - // Return early if it's CORS preflight. - if "OPTIONS" == r.Method { - return - } - mh.handle(mh.logFilename, w, r) -} - // Proxies may poll for client offers concurrently. type ProxyPoll struct { id string @@ -162,7 +102,7 @@ func (ctx *BrokerContext) Broker() { } else { heap.Remove(ctx.restrictedSnowflakes, snowflake.index) } - ctx.metrics.promMetrics.AvailableProxies.With(prometheus.Labels{"nat": request.natType, "type": request.proxyType}).Dec() + // ctx.metrics.promMetrics.AvailableProxies.With(prometheus.Labels{"nat": request.natType, "type": request.proxyType}).Dec() delete(ctx.idToSnowflake, snowflake.id) close(request.offerChannel) } @@ -188,372 +128,42 @@ func (ctx *BrokerContext) AddSnowflake(id string, proxyType string, natType stri } else { heap.Push(ctx.restrictedSnowflakes, snowflake) } - ctx.metrics.promMetrics.AvailableProxies.With(prometheus.Labels{"nat": natType, "type": proxyType}).Inc() + // ctx.metrics.promMetrics.AvailableProxies.With(prometheus.Labels{"nat": natType, "type": proxyType}).Inc() ctx.snowflakeLock.Unlock() ctx.idToSnowflake[id] = snowflake return snowflake } -/* -For snowflake proxies to request a client from the Broker. -*/ -func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { - body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit)) - if err != nil { - log.Println("Invalid data.") - w.WriteHeader(http.StatusBadRequest) - return - } - - sid, proxyType, natType, err := messages.DecodePollRequest(body) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - - // Log geoip stats - remoteIP, _, err := net.SplitHostPort(r.RemoteAddr) - if err != nil { - log.Println("Error processing proxy IP: ", err.Error()) - } else { - ctx.metrics.lock.Lock() - ctx.metrics.UpdateCountryStats(remoteIP, proxyType, natType) - ctx.metrics.lock.Unlock() - } - - // Wait for a client to avail an offer to the snowflake, or timeout if nil. - offer := ctx.RequestOffer(sid, proxyType, natType) - var b []byte - if nil == offer { - ctx.metrics.lock.Lock() - ctx.metrics.proxyIdleCount++ - ctx.metrics.promMetrics.ProxyPollTotal.With(prometheus.Labels{"nat": natType, "status": "idle"}).Inc() - ctx.metrics.lock.Unlock() - - b, err = messages.EncodePollResponse("", false, "") - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - - w.Write(b) - return - } - ctx.metrics.promMetrics.ProxyPollTotal.With(prometheus.Labels{"nat": natType, "status": "matched"}).Inc() - b, err = messages.EncodePollResponse(string(offer.sdp), true, offer.natType) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - if _, err := w.Write(b); err != nil { - log.Printf("proxyPolls unable to write offer with error: %v", err) - } -} - // Client offer contains an SDP and the NAT type of the client type ClientOffer struct { natType string sdp []byte } -// Sends an encoded response to the client and an -// HTTP server error if the response encoding fails -func sendClientResponse(resp *messages.ClientPollResponse, w http.ResponseWriter) { - data, err := resp.EncodePollResponse() - if err != nil { - log.Printf("error encoding answer") - w.WriteHeader(http.StatusInternalServerError) - } else { - if _, err := w.Write([]byte(data)); err != nil { - log.Printf("unable to write answer with error: %v", err) - } - } -} - -/* -Expects a WebRTC SDP offer in the Request to give to an assigned -snowflake proxy, which responds with the SDP answer to be sent in -the HTTP response back to the client. -*/ -func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { - var err error - var version clientVersion - - startTime := time.Now() - body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit)) - if err != nil { - log.Printf("Error reading client request: %s", err.Error()) - w.WriteHeader(http.StatusInternalServerError) - return - } - if len(body) > 0 && body[0] == '{' { - version = v0 - } else { - parts := bytes.SplitN(body, []byte("\n"), 2) - if len(parts) < 2 { - // no version number found - err := fmt.Errorf("unsupported message version") - sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, w) - return - } - body = parts[1] - if string(parts[0]) == "1.0" { - version = v1 - - } else { - err := fmt.Errorf("unsupported message version") - sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, w) - return - } - } - - var offer *ClientOffer - switch version { - case v0: - offer = &ClientOffer{ - natType: r.Header.Get("Snowflake-NAT-Type"), - sdp: body, - } - case v1: - req, err := messages.DecodeClientPollRequest(body) - if err != nil { - sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, w) - return - } - offer = &ClientOffer{ - natType: req.NAT, - sdp: []byte(req.Offer), - } - default: - panic("unknown version") - } - - // Only hand out known restricted snowflakes to unrestricted clients - var snowflakeHeap *SnowflakeHeap - if offer.natType == NATUnrestricted { - snowflakeHeap = ctx.restrictedSnowflakes - } else { - snowflakeHeap = ctx.snowflakes - } - - // Immediately fail if there are no snowflakes available. - ctx.snowflakeLock.Lock() - numSnowflakes := snowflakeHeap.Len() - ctx.snowflakeLock.Unlock() - if numSnowflakes <= 0 { - ctx.metrics.lock.Lock() - ctx.metrics.clientDeniedCount++ - ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "denied"}).Inc() - if offer.natType == NATUnrestricted { - ctx.metrics.clientUnrestrictedDeniedCount++ - } else { - ctx.metrics.clientRestrictedDeniedCount++ - } - ctx.metrics.lock.Unlock() - switch version { - case v0: - w.WriteHeader(http.StatusServiceUnavailable) - case v1: - resp := &messages.ClientPollResponse{Error: "no snowflake proxies currently available"} - sendClientResponse(resp, w) - default: - panic("unknown version") - } - return - } - // Otherwise, find the most available snowflake proxy, and pass the offer to it. - // Delete must be deferred in order to correctly process answer request later. - ctx.snowflakeLock.Lock() - snowflake := heap.Pop(snowflakeHeap).(*Snowflake) - ctx.snowflakeLock.Unlock() - snowflake.offerChannel <- offer - - // Wait for the answer to be returned on the channel or timeout. - select { - case answer := <-snowflake.answerChannel: - ctx.metrics.lock.Lock() - ctx.metrics.clientProxyMatchCount++ - ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "matched"}).Inc() - ctx.metrics.lock.Unlock() - switch version { - case v0: - if _, err := w.Write([]byte(answer)); err != nil { - log.Printf("unable to write answer with error: %v", err) - } - case v1: - resp := &messages.ClientPollResponse{Answer: answer} - sendClientResponse(resp, w) - default: - panic("unknown version") - } - // Initial tracking of elapsed time. - ctx.metrics.clientRoundtripEstimate = time.Since(startTime) / - time.Millisecond - case <-time.After(time.Second * ClientTimeout): - log.Println("Client: Timed out.") - switch version { - case v0: - w.WriteHeader(http.StatusGatewayTimeout) - if _, err := w.Write( - []byte("timed out waiting for answer!")); err != nil { - log.Printf("unable to write timeout error, failed with error: %v", - err) - } - case v1: - resp := &messages.ClientPollResponse{ - Error: "timed out waiting for answer!"} - sendClientResponse(resp, w) - default: - panic("unknown version") - } - } - - ctx.snowflakeLock.Lock() - ctx.metrics.promMetrics.AvailableProxies.With(prometheus.Labels{"nat": snowflake.natType, "type": snowflake.proxyType}).Dec() - delete(ctx.idToSnowflake, snowflake.id) - ctx.snowflakeLock.Unlock() -} - -/* -Expects snowflake proxes which have previously successfully received -an offer from proxyHandler to respond with an answer in an HTTP POST, -which the broker will pass back to the original client. -*/ -func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { - - body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit)) - if nil != err || nil == body || len(body) <= 0 { - log.Println("Invalid data.") - w.WriteHeader(http.StatusBadRequest) - return - } - - answer, id, err := messages.DecodeAnswerRequest(body) - if err != nil || answer == "" { - w.WriteHeader(http.StatusBadRequest) - return - } - - var success = true - ctx.snowflakeLock.Lock() - snowflake, ok := ctx.idToSnowflake[id] - ctx.snowflakeLock.Unlock() - if !ok || nil == snowflake { - // The snowflake took too long to respond with an answer, so its client - // disappeared / the snowflake is no longer recognized by the Broker. - success = false - } - b, err := messages.EncodeAnswerResponse(success) - if err != nil { - log.Printf("Error encoding answer: %s", err.Error()) - w.WriteHeader(http.StatusInternalServerError) - return - } - w.Write(b) - - if success { - snowflake.answerChannel <- answer - } - -} - -func debugHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { - - var webexts, browsers, standalones, unknowns int - var natRestricted, natUnrestricted, natUnknown int - ctx.snowflakeLock.Lock() - s := fmt.Sprintf("current snowflakes available: %d\n", len(ctx.idToSnowflake)) - for _, snowflake := range ctx.idToSnowflake { - if snowflake.proxyType == "badge" { - browsers++ - } else if snowflake.proxyType == "webext" { - webexts++ - } else if snowflake.proxyType == "standalone" { - standalones++ - } else { - unknowns++ - } - - switch snowflake.natType { - case NATRestricted: - natRestricted++ - case NATUnrestricted: - natUnrestricted++ - default: - natUnknown++ - } - - } - ctx.snowflakeLock.Unlock() - s += fmt.Sprintf("\tstandalone proxies: %d", standalones) - s += fmt.Sprintf("\n\tbrowser proxies: %d", browsers) - s += fmt.Sprintf("\n\twebext proxies: %d", webexts) - s += fmt.Sprintf("\n\tunknown proxies: %d", unknowns) - - s += fmt.Sprintf("\nNAT Types available:") - s += fmt.Sprintf("\n\trestricted: %d", natRestricted) - s += fmt.Sprintf("\n\tunrestricted: %d", natUnrestricted) - s += fmt.Sprintf("\n\tunknown: %d", natUnknown) - if _, err := w.Write([]byte(s)); err != nil { - log.Printf("writing proxy information returned error: %v ", err) - } -} - -func robotsTxtHandler(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "text/plain; charset=utf-8") - if _, err := w.Write([]byte("User-agent: *\nDisallow: /\n")); err != nil { - log.Printf("robotsTxtHandler unable to write, with this error: %v", err) - } -} - -func metricsHandler(metricsFilename string, w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "text/plain; charset=utf-8") - - if metricsFilename == "" { - http.NotFound(w, r) - return - } - metricsFile, err := os.OpenFile(metricsFilename, os.O_RDONLY, 0644) - if err != nil { - log.Println("Error opening metrics file for reading") - http.NotFound(w, r) - return - } - - if _, err := io.Copy(w, metricsFile); err != nil { - log.Printf("copying metricsFile returned error: %v", err) - } -} - func main() { - var acmeEmail string - var acmeHostnamesCommas string - var acmeCertCacheDir string - var addr string var geoipDatabase string var geoip6Database string - var disableTLS bool - var certFilename, keyFilename string var disableGeoip bool + var metricsFilename string var unsafeLogging bool - flag.StringVar(&acmeEmail, "acme-email", "", "optional contact email for Let's Encrypt notifications") - flag.StringVar(&acmeHostnamesCommas, "acme-hostnames", "", "comma-separated hostnames for TLS certificate") - flag.StringVar(&certFilename, "cert", "", "TLS certificate file") - flag.StringVar(&keyFilename, "key", "", "TLS private key file") - flag.StringVar(&acmeCertCacheDir, "acme-cert-cache", "acme-cert-cache", "directory in which certificates should be cached") - flag.StringVar(&addr, "addr", ":443", "address to listen on") + var socket string + flag.StringVar(&geoipDatabase, "geoipdb", "/usr/share/tor/geoip", "path to correctly formatted geoip database mapping IPv4 address ranges to country codes") flag.StringVar(&geoip6Database, "geoip6db", "/usr/share/tor/geoip6", "path to correctly formatted geoip database mapping IPv6 address ranges to country codes") - flag.BoolVar(&disableTLS, "disable-tls", false, "don't use HTTPS") flag.BoolVar(&disableGeoip, "disable-geoip", false, "don't use geoip for stats collection") + flag.StringVar(&metricsFilename, "metrics-log", "", "path to metrics logging output") flag.BoolVar(&unsafeLogging, "unsafe-logging", false, "prevent logs from being scrubbed") + + flag.StringVar(&socket, "socket", "/tmp/broker.sock", "path to ipc socket") + flag.Parse() var err error var metricsFile io.Writer + var logOutput io.Writer = os.Stderr if unsafeLogging { log.SetOutput(logOutput) @@ -561,7 +171,6 @@ func main() { // We want to send the log output through our scrubber first log.SetOutput(&safelog.LogScrubber{Output: logOutput}) } - log.SetFlags(log.LstdFlags | log.LUTC) if metricsFilename != "" { @@ -587,19 +196,6 @@ func main() { go ctx.Broker() - http.HandleFunc("/robots.txt", robotsTxtHandler) - - http.Handle("/proxy", SnowflakeHandler{ctx, proxyPolls}) - http.Handle("/client", SnowflakeHandler{ctx, clientOffers}) - http.Handle("/answer", SnowflakeHandler{ctx, proxyAnswers}) - http.Handle("/debug", SnowflakeHandler{ctx, debugHandler}) - http.Handle("/metrics", MetricsHandler{metricsFilename, metricsHandler}) - http.Handle("/prometheus", promhttp.HandlerFor(ctx.metrics.promMetrics.registry, promhttp.HandlerOpts{})) - - server := http.Server{ - Addr: addr, - } - sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGHUP) @@ -616,49 +212,18 @@ func main() { } }() - // Handle the various ways of setting up TLS. The legal configurations - // are: - // --acme-hostnames (with optional --acme-email and/or --acme-cert-cache) - // --cert and --key together - // --disable-tls - // The outputs of this block of code are the disableTLS, - // needHTTP01Listener, certManager, and getCertificate variables. - if acmeHostnamesCommas != "" { - acmeHostnames := strings.Split(acmeHostnamesCommas, ",") - log.Printf("ACME hostnames: %q", acmeHostnames) + // if err := os.RemoveAll(socket); err != nil { + // log.Fatal(err) + // } - var cache autocert.Cache - if err = os.MkdirAll(acmeCertCacheDir, 0700); err != nil { - log.Printf("Warning: Couldn't create cache directory %q (reason: %s) so we're *not* using our certificate cache.", acmeCertCacheDir, err) - } else { - cache = autocert.DirCache(acmeCertCacheDir) - } - - certManager := autocert.Manager{ - Cache: cache, - Prompt: autocert.AcceptTOS, - HostPolicy: autocert.HostWhitelist(acmeHostnames...), - Email: acmeEmail, - } - go func() { - log.Printf("Starting HTTP-01 listener") - log.Fatal(http.ListenAndServe(":80", certManager.HTTPHandler(nil))) - }() - - server.TLSConfig = &tls.Config{GetCertificate: certManager.GetCertificate} - err = server.ListenAndServeTLS("", "") - } else if certFilename != "" && keyFilename != "" { - if acmeEmail != "" || acmeHostnamesCommas != "" { - log.Fatalf("The --cert and --key options are not allowed with --acme-email or --acme-hostnames.") - } - err = server.ListenAndServeTLS(certFilename, keyFilename) - } else if disableTLS { - err = server.ListenAndServe() - } else { - log.Fatal("the --acme-hostnames, --cert and --key, or --disable-tls option is required") - } + ipc := &IPC{ctx} + rpc.Register(ipc) + l, err := net.Listen("unix", socket) if err != nil { log.Fatal(err) } + defer l.Close() + + rpc.Accept(l) } diff --git a/broker/http-frontend/http.go b/broker/http-frontend/http.go new file mode 100644 index 0000000..5def552 --- /dev/null +++ b/broker/http-frontend/http.go @@ -0,0 +1,344 @@ +package main + +import ( + "crypto/tls" + "errors" + "flag" + "io" + "io/ioutil" + "log" + "net/http" + "net/rpc" + "os" + "strings" + + // "github.com/prometheus/client_golang/prometheus" + // "github.com/prometheus/client_golang/prometheus/promhttp" + "git.torproject.org/pluggable-transports/snowflake.git/common/messages" + "git.torproject.org/pluggable-transports/snowflake.git/common/safelog" + "golang.org/x/crypto/acme/autocert" +) + +const ( + readLimit = 100000 // Maximum number of bytes to be read from an HTTP request +) + +// Implements the http.Handler interface +type SnowflakeHandler struct { + c *rpc.Client + handle func(*rpc.Client, http.ResponseWriter, *http.Request) +} + +func (sh SnowflakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Session-ID") + // Return early if it's CORS preflight. + if "OPTIONS" == r.Method { + return + } + sh.handle(sh.c, w, r) +} + +// Implements the http.Handler interface +type MetricsHandler struct { + logFilename string + handle func(string, http.ResponseWriter, *http.Request) +} + +func (mh MetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Session-ID") + // Return early if it's CORS preflight. + if "OPTIONS" == r.Method { + return + } + mh.handle(mh.logFilename, w, r) +} + +func robotsTxtHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + if _, err := w.Write([]byte("User-agent: *\nDisallow: /\n")); err != nil { + log.Printf("robotsTxtHandler unable to write, with this error: %v", err) + } +} + +func metricsHandler(metricsFilename string, w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + + if metricsFilename == "" { + http.NotFound(w, r) + return + } + metricsFile, err := os.OpenFile(metricsFilename, os.O_RDONLY, 0644) + if err != nil { + log.Println("Error opening metrics file for reading") + http.NotFound(w, r) + return + } + + if _, err := io.Copy(w, metricsFile); err != nil { + log.Printf("copying metricsFile returned error: %v", err) + } +} + +func debugHandler(c *rpc.Client, w http.ResponseWriter, r *http.Request) { + var response string + + err := c.Call("IPC.Debug", new(interface{}), &response) + if err != nil { + log.Println(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + if _, err := w.Write([]byte(response)); err != nil { + log.Printf("writing proxy information returned error: %v ", err) + } +} + +/* +For snowflake proxies to request a client from the Broker. +*/ +func proxyPolls(c *rpc.Client, w http.ResponseWriter, r *http.Request) { + body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit)) + if err != nil { + log.Println("Invalid data.") + w.WriteHeader(http.StatusBadRequest) + return + } + + arg := messages.Arg{ + Body: body, + RemoteAddr: r.RemoteAddr, + } + + var response []byte + err = c.Call("IPC.ProxyPolls", arg, &response) + switch { + case err == nil: + case errors.Is(err, messages.ErrBadRequest): + w.WriteHeader(http.StatusBadRequest) + return + case errors.Is(err, messages.ErrInternal): + fallthrough + default: + log.Println(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + if _, err := w.Write(response); err != nil { + log.Printf("proxyPolls unable to write offer with error: %v", err) + } +} + +/* +Expects a WebRTC SDP offer in the Request to give to an assigned +snowflake proxy, which responds with the SDP answer to be sent in +the HTTP response back to the client. +*/ +func clientOffers(c *rpc.Client, w http.ResponseWriter, r *http.Request) { + body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit)) + if err != nil { + log.Printf("Error reading client request: %s", err.Error()) + w.WriteHeader(http.StatusBadRequest) + return + } + + // Handle the legacy version + isLegacy := false + if len(body) > 0 && body[0] == '{' { + isLegacy = true + req := messages.ClientPollRequest{ + Offer: string(body), + NAT: r.Header.Get("Snowflake-NAT-Type"), + } + body, err = req.EncodePollRequest() + if err != nil { + log.Printf("Error shimming the legacy request: %s", err.Error()) + w.WriteHeader(http.StatusInternalServerError) + return + } + } + + arg := messages.Arg{ + Body: body, + RemoteAddr: "", + } + + var response []byte + err = c.Call("IPC.ClientOffers", arg, &response) + if err != nil { + // Assert err == messages.ErrInternal + log.Println(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + if isLegacy { + resp, err := messages.DecodeClientPollResponse(response) + if err != nil { + log.Println(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + switch resp.Error { + case "": + response = []byte(resp.Answer) + case "no snowflake proxies currently available": + w.WriteHeader(http.StatusServiceUnavailable) + return + case "timed out waiting for answer!": + w.WriteHeader(http.StatusGatewayTimeout) + return + default: + panic("unknown error") + } + } + + if _, err := w.Write(response); err != nil { + log.Printf("clientOffers unable to write answer with error: %v", err) + } +} + +/* +Expects snowflake proxies which have previously successfully received +an offer from proxyHandler to respond with an answer in an HTTP POST, +which the broker will pass back to the original client. +*/ +func proxyAnswers(c *rpc.Client, w http.ResponseWriter, r *http.Request) { + body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit)) + if err != nil { + log.Println("Invalid data.") + w.WriteHeader(http.StatusBadRequest) + return + } + + arg := messages.Arg{ + Body: body, + RemoteAddr: "", + } + + var response []byte + err = c.Call("IPC.ProxyAnswers", arg, &response) + switch { + case err == nil: + case errors.Is(err, messages.ErrBadRequest): + w.WriteHeader(http.StatusBadRequest) + return + case errors.Is(err, messages.ErrInternal): + fallthrough + default: + log.Println(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + if _, err := w.Write(response); err != nil { + log.Printf("proxyAnswers unable to write answer response with error: %v", err) + } +} + +func main() { + var acmeEmail string + var acmeHostnamesCommas string + var acmeCertCacheDir string + var addr string + var disableTLS bool + var certFilename, keyFilename string + + var metricsFilename string + var unsafeLogging bool + + var socket string + + flag.StringVar(&acmeEmail, "acme-email", "", "optional contact email for Let's Encrypt notifications") + flag.StringVar(&acmeHostnamesCommas, "acme-hostnames", "", "comma-separated hostnames for TLS certificate") + flag.StringVar(&certFilename, "cert", "", "TLS certificate file") + flag.StringVar(&keyFilename, "key", "", "TLS private key file") + flag.StringVar(&acmeCertCacheDir, "acme-cert-cache", "acme-cert-cache", "directory in which certificates should be cached") + flag.StringVar(&addr, "addr", ":443", "address to listen on") + flag.BoolVar(&disableTLS, "disable-tls", false, "don't use HTTPS") + + flag.StringVar(&metricsFilename, "metrics-log", "", "path to metrics logging output") + flag.BoolVar(&unsafeLogging, "unsafe-logging", false, "prevent logs from being scrubbed") + + flag.StringVar(&socket, "socket", "/tmp/broker.sock", "path to ipc socket") + + flag.Parse() + + var logOutput io.Writer = os.Stderr + if unsafeLogging { + log.SetOutput(logOutput) + } else { + // We want to send the log output through our scrubber first + log.SetOutput(&safelog.LogScrubber{Output: logOutput}) + } + log.SetFlags(log.LstdFlags | log.LUTC) + + var c, err = rpc.Dial("unix", socket) + if err != nil { + log.Fatal(err) + } + defer c.Close() + + http.HandleFunc("/robots.txt", robotsTxtHandler) + + http.Handle("/proxy", SnowflakeHandler{c, proxyPolls}) + http.Handle("/client", SnowflakeHandler{c, clientOffers}) + http.Handle("/answer", SnowflakeHandler{c, proxyAnswers}) + http.Handle("/debug", SnowflakeHandler{c, debugHandler}) + + http.Handle("/metrics", MetricsHandler{metricsFilename, metricsHandler}) + // http.Handle("/prometheus", promhttp.HandlerFor(ctx.metrics.promMetrics.registry, promhttp.HandlerOpts{})) + + server := http.Server{ + Addr: addr, + } + + // Handle the various ways of setting up TLS. The legal configurations + // are: + // --acme-hostnames (with optional --acme-email and/or --acme-cert-cache) + // --cert and --key together + // --disable-tls + // The outputs of this block of code are the disableTLS, + // needHTTP01Listener, certManager, and getCertificate variables. + if acmeHostnamesCommas != "" { + acmeHostnames := strings.Split(acmeHostnamesCommas, ",") + log.Printf("ACME hostnames: %q", acmeHostnames) + + var cache autocert.Cache + if err = os.MkdirAll(acmeCertCacheDir, 0700); err != nil { + log.Printf("Warning: Couldn't create cache directory %q (reason: %s) so we're *not* using our certificate cache.", acmeCertCacheDir, err) + } else { + cache = autocert.DirCache(acmeCertCacheDir) + } + + certManager := autocert.Manager{ + Cache: cache, + Prompt: autocert.AcceptTOS, + HostPolicy: autocert.HostWhitelist(acmeHostnames...), + Email: acmeEmail, + } + go func() { + log.Printf("Starting HTTP-01 listener") + log.Fatal(http.ListenAndServe(":80", certManager.HTTPHandler(nil))) + }() + + server.TLSConfig = &tls.Config{GetCertificate: certManager.GetCertificate} + err = server.ListenAndServeTLS("", "") + } else if certFilename != "" && keyFilename != "" { + if acmeEmail != "" || acmeHostnamesCommas != "" { + log.Fatalf("The --cert and --key options are not allowed with --acme-email or --acme-hostnames.") + } + err = server.ListenAndServeTLS(certFilename, keyFilename) + } else if disableTLS { + err = server.ListenAndServe() + } else { + log.Fatal("the --acme-hostnames, --cert and --key, or --disable-tls option is required") + } + + if err != nil { + log.Fatal(err) + } +} diff --git a/broker/ipc.go b/broker/ipc.go new file mode 100644 index 0000000..ea65c04 --- /dev/null +++ b/broker/ipc.go @@ -0,0 +1,273 @@ +package main + +import ( + "bytes" + "container/heap" + "fmt" + "log" + "net" + "time" + + "git.torproject.org/pluggable-transports/snowflake.git/common/messages" + // "github.com/prometheus/client_golang/prometheus" +) + +const ( + ClientTimeout = 10 + ProxyTimeout = 10 + + NATUnknown = "unknown" + NATRestricted = "restricted" + NATUnrestricted = "unrestricted" +) + +type clientVersion int + +const ( + v1 clientVersion = iota +) + +type IPC struct { + ctx *BrokerContext +} + +func (i *IPC) Debug(_ interface{}, response *string) error { + var webexts, browsers, standalones, unknowns int + var natRestricted, natUnrestricted, natUnknown int + + i.ctx.snowflakeLock.Lock() + s := fmt.Sprintf("current snowflakes available: %d\n", len(i.ctx.idToSnowflake)) + for _, snowflake := range i.ctx.idToSnowflake { + if snowflake.proxyType == "badge" { + browsers++ + } else if snowflake.proxyType == "webext" { + webexts++ + } else if snowflake.proxyType == "standalone" { + standalones++ + } else { + unknowns++ + } + + switch snowflake.natType { + case NATRestricted: + natRestricted++ + case NATUnrestricted: + natUnrestricted++ + default: + natUnknown++ + } + + } + i.ctx.snowflakeLock.Unlock() + + s += fmt.Sprintf("\tstandalone proxies: %d", standalones) + s += fmt.Sprintf("\n\tbrowser proxies: %d", browsers) + s += fmt.Sprintf("\n\twebext proxies: %d", webexts) + s += fmt.Sprintf("\n\tunknown proxies: %d", unknowns) + + s += fmt.Sprintf("\nNAT Types available:") + s += fmt.Sprintf("\n\trestricted: %d", natRestricted) + s += fmt.Sprintf("\n\tunrestricted: %d", natUnrestricted) + s += fmt.Sprintf("\n\tunknown: %d", natUnknown) + + *response = s + return nil +} + +func (i *IPC) ProxyPolls(arg messages.Arg, response *[]byte) error { + sid, proxyType, natType, err := messages.DecodePollRequest(arg.Body) + if err != nil { + return messages.ErrBadRequest + } + + // Log geoip stats + remoteIP, _, err := net.SplitHostPort(arg.RemoteAddr) + if err != nil { + log.Println("Error processing proxy IP: ", err.Error()) + } else { + i.ctx.metrics.lock.Lock() + i.ctx.metrics.UpdateCountryStats(remoteIP, proxyType, natType) + i.ctx.metrics.lock.Unlock() + } + + var b []byte + + // Wait for a client to avail an offer to the snowflake, or timeout if nil. + offer := i.ctx.RequestOffer(sid, proxyType, natType) + + if offer == nil { + i.ctx.metrics.lock.Lock() + i.ctx.metrics.proxyIdleCount++ + // i.ctx.metrics.promMetrics.ProxyPollTotal.With(prometheus.Labels{"nat": natType, "status": "idle"}).Inc() + i.ctx.metrics.lock.Unlock() + + b, err = messages.EncodePollResponse("", false, "") + if err != nil { + return messages.ErrInternal + } + + *response = b + return nil + } + + // i.ctx.metrics.promMetrics.ProxyPollTotal.With(prometheus.Labels{"nat": natType, "status": "matched"}).Inc() + b, err = messages.EncodePollResponse(string(offer.sdp), true, offer.natType) + if err != nil { + return messages.ErrInternal + } + *response = b + + return nil +} + +func sendClientResponse(resp *messages.ClientPollResponse, response *[]byte) error { + data, err := resp.EncodePollResponse() + if err != nil { + log.Printf("error encoding answer") + return messages.ErrInternal + } else { + *response = []byte(data) + return nil + } +} + +func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte) error { + var version clientVersion + + startTime := time.Now() + body := arg.Body + + parts := bytes.SplitN(body, []byte("\n"), 2) + if len(parts) < 2 { + // no version number found + err := fmt.Errorf("unsupported message version") + return sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, response) + } + body = parts[1] + if string(parts[0]) == "1.0" { + version = v1 + } else { + err := fmt.Errorf("unsupported message version") + return sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, response) + } + + var offer *ClientOffer + switch version { + case v1: + req, err := messages.DecodeClientPollRequest(body) + if err != nil { + return sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, response) + } + offer = &ClientOffer{ + natType: req.NAT, + sdp: []byte(req.Offer), + } + default: + panic("unknown version") + } + + // Only hand out known restricted snowflakes to unrestricted clients + var snowflakeHeap *SnowflakeHeap + if offer.natType == NATUnrestricted { + snowflakeHeap = i.ctx.restrictedSnowflakes + } else { + snowflakeHeap = i.ctx.snowflakes + } + + // Immediately fail if there are no snowflakes available. + i.ctx.snowflakeLock.Lock() + numSnowflakes := snowflakeHeap.Len() + i.ctx.snowflakeLock.Unlock() + if numSnowflakes <= 0 { + i.ctx.metrics.lock.Lock() + i.ctx.metrics.clientDeniedCount++ + // i.ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "denied"}).Inc() + if offer.natType == NATUnrestricted { + i.ctx.metrics.clientUnrestrictedDeniedCount++ + } else { + i.ctx.metrics.clientRestrictedDeniedCount++ + } + i.ctx.metrics.lock.Unlock() + switch version { + case v1: + resp := &messages.ClientPollResponse{Error: "no snowflake proxies currently available"} + return sendClientResponse(resp, response) + default: + panic("unknown version") + } + } + + // Otherwise, find the most available snowflake proxy, and pass the offer to it. + // Delete must be deferred in order to correctly process answer request later. + i.ctx.snowflakeLock.Lock() + snowflake := heap.Pop(snowflakeHeap).(*Snowflake) + i.ctx.snowflakeLock.Unlock() + snowflake.offerChannel <- offer + + var err error + + // Wait for the answer to be returned on the channel or timeout. + select { + case answer := <-snowflake.answerChannel: + i.ctx.metrics.lock.Lock() + i.ctx.metrics.clientProxyMatchCount++ + // i.ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "matched"}).Inc() + i.ctx.metrics.lock.Unlock() + switch version { + case v1: + resp := &messages.ClientPollResponse{Answer: answer} + err = sendClientResponse(resp, response) + default: + panic("unknown version") + } + // Initial tracking of elapsed time. + i.ctx.metrics.clientRoundtripEstimate = time.Since(startTime) / time.Millisecond + case <-time.After(time.Second * ClientTimeout): + log.Println("Client: Timed out.") + switch version { + case v1: + resp := &messages.ClientPollResponse{ + Error: "timed out waiting for answer!"} + err = sendClientResponse(resp, response) + default: + panic("unknown version") + } + } + + i.ctx.snowflakeLock.Lock() + // i.ctx.metrics.promMetrics.AvailableProxies.With(prometheus.Labels{"nat": snowflake.natType, "type": snowflake.proxyType}).Dec() + delete(i.ctx.idToSnowflake, snowflake.id) + i.ctx.snowflakeLock.Unlock() + + return err +} + +func (i *IPC) ProxyAnswers(arg messages.Arg, response *[]byte) error { + answer, id, err := messages.DecodeAnswerRequest(arg.Body) + if err != nil || answer == "" { + return messages.ErrBadRequest + } + + var success = true + i.ctx.snowflakeLock.Lock() + snowflake, ok := i.ctx.idToSnowflake[id] + i.ctx.snowflakeLock.Unlock() + if !ok || snowflake == nil { + // The snowflake took too long to respond with an answer, so its client + // disappeared / the snowflake is no longer recognized by the Broker. + success = false + } + + b, err := messages.EncodeAnswerResponse(success) + if err != nil { + log.Printf("Error encoding answer: %s", err.Error()) + return messages.ErrInternal + } + *response = b + + if success { + snowflake.answerChannel <- answer + } + + return nil +} diff --git a/broker/snowflake-broker_test.go b/broker/snowflake-broker_test.go index 646fb02..a81de12 100644 --- a/broker/snowflake-broker_test.go +++ b/broker/snowflake-broker_test.go @@ -1,17 +1,17 @@ package main import ( - "bytes" + // "bytes" "container/heap" "io/ioutil" "log" - "net" - "net/http" - "net/http/httptest" + // "net" + // "net/http" + // "net/http/httptest" "os" "sync" "testing" - "time" + // "time" . "github.com/smartystreets/goconvey/convey" ) @@ -28,6 +28,7 @@ func TestBroker(t *testing.T) { Convey("Context", t, func() { ctx := NewBrokerContext(NullLogger()) + // i := &IPC{ctx} Convey("Adds Snowflake", func() { So(ctx.snowflakes.Len(), ShouldEqual, 0) @@ -68,287 +69,288 @@ func TestBroker(t *testing.T) { So(offer.sdp, ShouldResemble, []byte("test offer")) }) - Convey("Responds to client offers...", func() { - w := httptest.NewRecorder() - data := bytes.NewReader( - []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"unknown\"}")) - r, err := http.NewRequest("POST", "snowflake.broker/client", data) - So(err, ShouldBeNil) + // Convey("Responds to client offers...", func() { + // w := httptest.NewRecorder() + // data := bytes.NewReader( + // []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"unknown\"}")) + // r, err := http.NewRequest("POST", "snowflake.broker/client", data) + // So(err, ShouldBeNil) + // + // Convey("with error when no snowflakes are available.", func() { + // clientOffers(i, w, r) + // So(w.Code, ShouldEqual, http.StatusOK) + // So(w.Body.String(), ShouldEqual, `{"error":"no snowflake proxies currently available"}`) + // }) + // + // Convey("with a proxy answer if available.", func() { + // done := make(chan bool) + // // Prepare a fake proxy to respond with. + // snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted) + // go func() { + // clientOffers(i, w, r) + // done <- true + // }() + // offer := <-snowflake.offerChannel + // So(offer.sdp, ShouldResemble, []byte("fake")) + // snowflake.answerChannel <- "fake answer" + // <-done + // So(w.Body.String(), ShouldEqual, `{"answer":"fake answer"}`) + // So(w.Code, ShouldEqual, http.StatusOK) + // }) + // + // Convey("Times out when no proxy responds.", func() { + // if testing.Short() { + // return + // } + // done := make(chan bool) + // snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted) + // go func() { + // clientOffers(i, w, r) + // // Takes a few seconds here... + // done <- true + // }() + // offer := <-snowflake.offerChannel + // So(offer.sdp, ShouldResemble, []byte("fake")) + // <-done + // So(w.Code, ShouldEqual, http.StatusOK) + // So(w.Body.String(), ShouldEqual, `{"error":"timed out waiting for answer!"}`) + // }) + // }) + // + // Convey("Responds to legacy client offers...", func() { + // w := httptest.NewRecorder() + // data := bytes.NewReader([]byte("{test}")) + // r, err := http.NewRequest("POST", "snowflake.broker/client", data) + // So(err, ShouldBeNil) + // r.Header.Set("Snowflake-NAT-TYPE", "restricted") + // + // Convey("with 503 when no snowflakes are available.", func() { + // clientOffers(i, w, r) + // So(w.Code, ShouldEqual, http.StatusServiceUnavailable) + // So(w.Body.String(), ShouldEqual, "") + // }) + // + // Convey("with a proxy answer if available.", func() { + // done := make(chan bool) + // // Prepare a fake proxy to respond with. + // snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted) + // go func() { + // clientOffers(i, w, r) + // done <- true + // }() + // offer := <-snowflake.offerChannel + // So(offer.sdp, ShouldResemble, []byte("{test}")) + // snowflake.answerChannel <- "fake answer" + // <-done + // So(w.Body.String(), ShouldEqual, "fake answer") + // So(w.Code, ShouldEqual, http.StatusOK) + // }) + // + // Convey("Times out when no proxy responds.", func() { + // if testing.Short() { + // return + // } + // done := make(chan bool) + // snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted) + // go func() { + // clientOffers(i, w, r) + // // Takes a few seconds here... + // done <- true + // }() + // offer := <-snowflake.offerChannel + // So(offer.sdp, ShouldResemble, []byte("{test}")) + // <-done + // So(w.Code, ShouldEqual, http.StatusGatewayTimeout) + // }) + // + // }) + // + // Convey("Responds to proxy polls...", func() { + // done := make(chan bool) + // w := httptest.NewRecorder() + // data := bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0"}`)) + // r, err := http.NewRequest("POST", "snowflake.broker/proxy", data) + // So(err, ShouldBeNil) + // + // Convey("with a client offer if available.", func() { + // go func(ctx *BrokerContext) { + // proxyPolls(i, w, r) + // done <- true + // }(ctx) + // // Pass a fake client offer to this proxy + // p := <-ctx.proxyPolls + // So(p.id, ShouldEqual, "ymbcCMto7KHNGYlp") + // p.offerChannel <- &ClientOffer{sdp: []byte("fake offer")} + // <-done + // So(w.Code, ShouldEqual, http.StatusOK) + // So(w.Body.String(), ShouldEqual, `{"Status":"client match","Offer":"fake offer","NAT":""}`) + // }) + // + // Convey("return empty 200 OK when no client offer is available.", func() { + // go func(ctx *BrokerContext) { + // proxyPolls(i, w, r) + // done <- true + // }(ctx) + // p := <-ctx.proxyPolls + // So(p.id, ShouldEqual, "ymbcCMto7KHNGYlp") + // // nil means timeout + // p.offerChannel <- nil + // <-done + // So(w.Body.String(), ShouldEqual, `{"Status":"no match","Offer":"","NAT":""}`) + // So(w.Code, ShouldEqual, http.StatusOK) + // }) + // }) - Convey("with error when no snowflakes are available.", func() { - clientOffers(ctx, w, r) - So(w.Code, ShouldEqual, http.StatusOK) - So(w.Body.String(), ShouldEqual, `{"error":"no snowflake proxies currently available"}`) - }) - - Convey("with a proxy answer if available.", func() { - done := make(chan bool) - // Prepare a fake proxy to respond with. - snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted) - go func() { - clientOffers(ctx, w, r) - done <- true - }() - offer := <-snowflake.offerChannel - So(offer.sdp, ShouldResemble, []byte("fake")) - snowflake.answerChannel <- "fake answer" - <-done - So(w.Body.String(), ShouldEqual, `{"answer":"fake answer"}`) - So(w.Code, ShouldEqual, http.StatusOK) - }) - - Convey("Times out when no proxy responds.", func() { - if testing.Short() { - return - } - done := make(chan bool) - snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted) - go func() { - clientOffers(ctx, w, r) - // Takes a few seconds here... - done <- true - }() - offer := <-snowflake.offerChannel - So(offer.sdp, ShouldResemble, []byte("fake")) - <-done - So(w.Code, ShouldEqual, http.StatusOK) - So(w.Body.String(), ShouldEqual, `{"error":"timed out waiting for answer!"}`) - }) - }) - - Convey("Responds to legacy client offers...", func() { - w := httptest.NewRecorder() - data := bytes.NewReader([]byte("{test}")) - r, err := http.NewRequest("POST", "snowflake.broker/client", data) - So(err, ShouldBeNil) - r.Header.Set("Snowflake-NAT-TYPE", "restricted") - - Convey("with 503 when no snowflakes are available.", func() { - clientOffers(ctx, w, r) - So(w.Code, ShouldEqual, http.StatusServiceUnavailable) - So(w.Body.String(), ShouldEqual, "") - }) - - Convey("with a proxy answer if available.", func() { - done := make(chan bool) - // Prepare a fake proxy to respond with. - snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted) - go func() { - clientOffers(ctx, w, r) - done <- true - }() - offer := <-snowflake.offerChannel - So(offer.sdp, ShouldResemble, []byte("{test}")) - snowflake.answerChannel <- "fake answer" - <-done - So(w.Body.String(), ShouldEqual, "fake answer") - So(w.Code, ShouldEqual, http.StatusOK) - }) - - Convey("Times out when no proxy responds.", func() { - if testing.Short() { - return - } - done := make(chan bool) - snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted) - go func() { - clientOffers(ctx, w, r) - // Takes a few seconds here... - done <- true - }() - offer := <-snowflake.offerChannel - So(offer.sdp, ShouldResemble, []byte("{test}")) - <-done - So(w.Code, ShouldEqual, http.StatusGatewayTimeout) - }) - - }) - - Convey("Responds to proxy polls...", func() { - done := make(chan bool) - w := httptest.NewRecorder() - data := bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0"}`)) - r, err := http.NewRequest("POST", "snowflake.broker/proxy", data) - So(err, ShouldBeNil) - - Convey("with a client offer if available.", func() { - go func(ctx *BrokerContext) { - proxyPolls(ctx, w, r) - done <- true - }(ctx) - // Pass a fake client offer to this proxy - p := <-ctx.proxyPolls - So(p.id, ShouldEqual, "ymbcCMto7KHNGYlp") - p.offerChannel <- &ClientOffer{sdp: []byte("fake offer")} - <-done - So(w.Code, ShouldEqual, http.StatusOK) - So(w.Body.String(), ShouldEqual, `{"Status":"client match","Offer":"fake offer","NAT":""}`) - }) - - Convey("return empty 200 OK when no client offer is available.", func() { - go func(ctx *BrokerContext) { - proxyPolls(ctx, w, r) - done <- true - }(ctx) - p := <-ctx.proxyPolls - So(p.id, ShouldEqual, "ymbcCMto7KHNGYlp") - // nil means timeout - p.offerChannel <- nil - <-done - So(w.Body.String(), ShouldEqual, `{"Status":"no match","Offer":"","NAT":""}`) - So(w.Code, ShouldEqual, http.StatusOK) - }) - }) - - Convey("Responds to proxy answers...", func() { - s := ctx.AddSnowflake("test", "", NATUnrestricted) - w := httptest.NewRecorder() - data := bytes.NewReader([]byte(`{"Version":"1.0","Sid":"test","Answer":"test"}`)) - - Convey("by passing to the client if valid.", func() { - r, err := http.NewRequest("POST", "snowflake.broker/answer", data) - So(err, ShouldBeNil) - go func(ctx *BrokerContext) { - proxyAnswers(ctx, w, r) - }(ctx) - answer := <-s.answerChannel - So(w.Code, ShouldEqual, http.StatusOK) - So(answer, ShouldResemble, "test") - }) - - Convey("with client gone status if the proxy is not recognized", func() { - data = bytes.NewReader([]byte(`{"Version":"1.0","Sid":"invalid","Answer":"test"}`)) - r, err := http.NewRequest("POST", "snowflake.broker/answer", data) - So(err, ShouldBeNil) - proxyAnswers(ctx, w, r) - So(w.Code, ShouldEqual, http.StatusOK) - b, err := ioutil.ReadAll(w.Body) - So(err, ShouldBeNil) - So(b, ShouldResemble, []byte(`{"Status":"client gone"}`)) - - }) - - Convey("with error if the proxy gives invalid answer", func() { - data := bytes.NewReader(nil) - r, err := http.NewRequest("POST", "snowflake.broker/answer", data) - So(err, ShouldBeNil) - proxyAnswers(ctx, w, r) - So(w.Code, ShouldEqual, http.StatusBadRequest) - }) - - Convey("with error if the proxy writes too much data", func() { - data := bytes.NewReader(make([]byte, 100001)) - r, err := http.NewRequest("POST", "snowflake.broker/answer", data) - So(err, ShouldBeNil) - proxyAnswers(ctx, w, r) - So(w.Code, ShouldEqual, http.StatusBadRequest) - }) - - }) + // Convey("Responds to proxy answers...", func() { + // s := ctx.AddSnowflake("test", "", NATUnrestricted) + // w := httptest.NewRecorder() + // data := bytes.NewReader([]byte(`{"Version":"1.0","Sid":"test","Answer":"test"}`)) + // + // Convey("by passing to the client if valid.", func() { + // r, err := http.NewRequest("POST", "snowflake.broker/answer", data) + // So(err, ShouldBeNil) + // go func(ctx *BrokerContext) { + // proxyAnswers(i, w, r) + // }(ctx) + // answer := <-s.answerChannel + // So(w.Code, ShouldEqual, http.StatusOK) + // So(answer, ShouldResemble, "test") + // }) + // + // Convey("with client gone status if the proxy is not recognized", func() { + // data = bytes.NewReader([]byte(`{"Version":"1.0","Sid":"invalid","Answer":"test"}`)) + // r, err := http.NewRequest("POST", "snowflake.broker/answer", data) + // So(err, ShouldBeNil) + // proxyAnswers(i, w, r) + // So(w.Code, ShouldEqual, http.StatusOK) + // b, err := ioutil.ReadAll(w.Body) + // So(err, ShouldBeNil) + // So(b, ShouldResemble, []byte(`{"Status":"client gone"}`)) + // + // }) + // + // Convey("with error if the proxy gives invalid answer", func() { + // data := bytes.NewReader(nil) + // r, err := http.NewRequest("POST", "snowflake.broker/answer", data) + // So(err, ShouldBeNil) + // proxyAnswers(i, w, r) + // So(w.Code, ShouldEqual, http.StatusBadRequest) + // }) + // + // Convey("with error if the proxy writes too much data", func() { + // data := bytes.NewReader(make([]byte, 100001)) + // r, err := http.NewRequest("POST", "snowflake.broker/answer", data) + // So(err, ShouldBeNil) + // proxyAnswers(i, w, r) + // So(w.Code, ShouldEqual, http.StatusBadRequest) + // }) + // + // }) }) - Convey("End-To-End", t, func() { - ctx := NewBrokerContext(NullLogger()) - - Convey("Check for client/proxy data race", func() { - proxy_done := make(chan bool) - client_done := make(chan bool) - - go ctx.Broker() - - // Make proxy poll - wp := httptest.NewRecorder() - datap := bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0"}`)) - rp, err := http.NewRequest("POST", "snowflake.broker/proxy", datap) - So(err, ShouldBeNil) - - go func(ctx *BrokerContext) { - proxyPolls(ctx, wp, rp) - proxy_done <- true - }(ctx) - - // Client offer - wc := httptest.NewRecorder() - datac := bytes.NewReader([]byte("test")) - rc, err := http.NewRequest("POST", "snowflake.broker/client", datac) - So(err, ShouldBeNil) - - go func() { - clientOffers(ctx, wc, rc) - client_done <- true - }() - - <-proxy_done - So(wp.Code, ShouldEqual, http.StatusOK) - - // Proxy answers - wp = httptest.NewRecorder() - datap = bytes.NewReader([]byte(`{"Version":"1.0","Sid":"ymbcCMto7KHNGYlp","Answer":"test"}`)) - rp, err = http.NewRequest("POST", "snowflake.broker/answer", datap) - So(err, ShouldBeNil) - go func(ctx *BrokerContext) { - proxyAnswers(ctx, wp, rp) - proxy_done <- true - }(ctx) - - <-proxy_done - <-client_done - - }) - - Convey("Ensure correct snowflake brokering", func() { - done := make(chan bool) - polled := make(chan bool) - - // Proxy polls with its ID first... - dataP := bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0"}`)) - wP := httptest.NewRecorder() - rP, err := http.NewRequest("POST", "snowflake.broker/proxy", dataP) - So(err, ShouldBeNil) - go func() { - proxyPolls(ctx, wP, rP) - polled <- true - }() - - // Manually do the Broker goroutine action here for full control. - p := <-ctx.proxyPolls - So(p.id, ShouldEqual, "ymbcCMto7KHNGYlp") - s := ctx.AddSnowflake(p.id, "", NATUnrestricted) - go func() { - offer := <-s.offerChannel - p.offerChannel <- offer - }() - So(ctx.idToSnowflake["ymbcCMto7KHNGYlp"], ShouldNotBeNil) - - // Client request blocks until proxy answer arrives. - dataC := bytes.NewReader( - []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"unknown\"}")) - wC := httptest.NewRecorder() - rC, err := http.NewRequest("POST", "snowflake.broker/client", dataC) - So(err, ShouldBeNil) - go func() { - clientOffers(ctx, wC, rC) - done <- true - }() - - <-polled - So(wP.Code, ShouldEqual, http.StatusOK) - So(wP.Body.String(), ShouldResemble, `{"Status":"client match","Offer":"fake","NAT":"unknown"}`) - So(ctx.idToSnowflake["ymbcCMto7KHNGYlp"], ShouldNotBeNil) - // Follow up with the answer request afterwards - wA := httptest.NewRecorder() - dataA := bytes.NewReader([]byte(`{"Version":"1.0","Sid":"ymbcCMto7KHNGYlp","Answer":"test"}`)) - rA, err := http.NewRequest("POST", "snowflake.broker/answer", dataA) - So(err, ShouldBeNil) - proxyAnswers(ctx, wA, rA) - So(wA.Code, ShouldEqual, http.StatusOK) - - <-done - So(wC.Code, ShouldEqual, http.StatusOK) - So(wC.Body.String(), ShouldEqual, `{"answer":"test"}`) - }) - }) + // Convey("End-To-End", t, func() { + // ctx := NewBrokerContext(NullLogger()) + // i := &IPC{ctx} + // + // Convey("Check for client/proxy data race", func() { + // proxy_done := make(chan bool) + // client_done := make(chan bool) + // + // go ctx.Broker() + // + // // Make proxy poll + // wp := httptest.NewRecorder() + // datap := bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0"}`)) + // rp, err := http.NewRequest("POST", "snowflake.broker/proxy", datap) + // So(err, ShouldBeNil) + // + // go func(ctx *BrokerContext) { + // proxyPolls(i, wp, rp) + // proxy_done <- true + // }(ctx) + // + // // Client offer + // wc := httptest.NewRecorder() + // datac := bytes.NewReader([]byte("test")) + // rc, err := http.NewRequest("POST", "snowflake.broker/client", datac) + // So(err, ShouldBeNil) + // + // go func() { + // clientOffers(i, wc, rc) + // client_done <- true + // }() + // + // <-proxy_done + // So(wp.Code, ShouldEqual, http.StatusOK) + // + // // Proxy answers + // wp = httptest.NewRecorder() + // datap = bytes.NewReader([]byte(`{"Version":"1.0","Sid":"ymbcCMto7KHNGYlp","Answer":"test"}`)) + // rp, err = http.NewRequest("POST", "snowflake.broker/answer", datap) + // So(err, ShouldBeNil) + // go func(ctx *BrokerContext) { + // proxyAnswers(i, wp, rp) + // proxy_done <- true + // }(ctx) + // + // <-proxy_done + // <-client_done + // + // }) + // + // Convey("Ensure correct snowflake brokering", func() { + // done := make(chan bool) + // polled := make(chan bool) + // + // // Proxy polls with its ID first... + // dataP := bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0"}`)) + // wP := httptest.NewRecorder() + // rP, err := http.NewRequest("POST", "snowflake.broker/proxy", dataP) + // So(err, ShouldBeNil) + // go func() { + // proxyPolls(i, wP, rP) + // polled <- true + // }() + // + // // Manually do the Broker goroutine action here for full control. + // p := <-ctx.proxyPolls + // So(p.id, ShouldEqual, "ymbcCMto7KHNGYlp") + // s := ctx.AddSnowflake(p.id, "", NATUnrestricted) + // go func() { + // offer := <-s.offerChannel + // p.offerChannel <- offer + // }() + // So(ctx.idToSnowflake["ymbcCMto7KHNGYlp"], ShouldNotBeNil) + // + // // Client request blocks until proxy answer arrives. + // dataC := bytes.NewReader( + // []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"unknown\"}")) + // wC := httptest.NewRecorder() + // rC, err := http.NewRequest("POST", "snowflake.broker/client", dataC) + // So(err, ShouldBeNil) + // go func() { + // clientOffers(i, wC, rC) + // done <- true + // }() + // + // <-polled + // So(wP.Code, ShouldEqual, http.StatusOK) + // So(wP.Body.String(), ShouldResemble, `{"Status":"client match","Offer":"fake","NAT":"unknown"}`) + // So(ctx.idToSnowflake["ymbcCMto7KHNGYlp"], ShouldNotBeNil) + // // Follow up with the answer request afterwards + // wA := httptest.NewRecorder() + // dataA := bytes.NewReader([]byte(`{"Version":"1.0","Sid":"ymbcCMto7KHNGYlp","Answer":"test"}`)) + // rA, err := http.NewRequest("POST", "snowflake.broker/answer", dataA) + // So(err, ShouldBeNil) + // proxyAnswers(i, wA, rA) + // So(wA.Code, ShouldEqual, http.StatusOK) + // + // <-done + // So(wC.Code, ShouldEqual, http.StatusOK) + // So(wC.Body.String(), ShouldEqual, `{"answer":"test"}`) + // }) + // }) } func TestSnowflakeHeap(t *testing.T) { @@ -392,406 +394,405 @@ func TestSnowflakeHeap(t *testing.T) { So(r.clients, ShouldEqual, 5) So(r.index, ShouldEqual, -1) }) + + // Convey("End-To-End", t, func() { + // ctx := NewBrokerContext(NullLogger()) + // i := &IPC{ctx} + // + // Convey("Check for client/proxy data race", func() { + // proxy_done := make(chan bool) + // client_done := make(chan bool) + // + // go ctx.Broker() + // + // // Make proxy poll + // wp := httptest.NewRecorder() + // datap := bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0"}`)) + // rp, err := http.NewRequest("POST", "snowflake.broker/proxy", datap) + // So(err, ShouldBeNil) + // + // go func(ctx *BrokerContext) { + // proxyPolls(i, wp, rp) + // proxy_done <- true + // }(ctx) + // + // // Client offer + // wc := httptest.NewRecorder() + // datac := bytes.NewReader([]byte("test")) + // rc, err := http.NewRequest("POST", "snowflake.broker/client", datac) + // So(err, ShouldBeNil) + // + // go func() { + // clientOffers(i, wc, rc) + // client_done <- true + // }() + // + // <-proxy_done + // So(wp.Code, ShouldEqual, http.StatusOK) + // + // // Proxy answers + // wp = httptest.NewRecorder() + // datap = bytes.NewReader([]byte(`{"Version":"1.0","Sid":"ymbcCMto7KHNGYlp","Answer":"test"}`)) + // rp, err = http.NewRequest("POST", "snowflake.broker/answer", datap) + // So(err, ShouldBeNil) + // go func(ctx *BrokerContext) { + // proxyAnswers(i, wp, rp) + // proxy_done <- true + // }(ctx) + // + // <-proxy_done + // <-client_done + // + // }) + // + // Convey("Ensure correct snowflake brokering", func() { + // done := make(chan bool) + // polled := make(chan bool) + // + // // Proxy polls with its ID first... + // dataP := bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0"}`)) + // wP := httptest.NewRecorder() + // rP, err := http.NewRequest("POST", "snowflake.broker/proxy", dataP) + // So(err, ShouldBeNil) + // go func() { + // proxyPolls(i, wP, rP) + // polled <- true + // }() + // + // // Manually do the Broker goroutine action here for full control. + // p := <-ctx.proxyPolls + // So(p.id, ShouldEqual, "ymbcCMto7KHNGYlp") + // s := ctx.AddSnowflake(p.id, "", NATUnrestricted) + // go func() { + // offer := <-s.offerChannel + // p.offerChannel <- offer + // }() + // So(ctx.idToSnowflake["ymbcCMto7KHNGYlp"], ShouldNotBeNil) + // + // // Client request blocks until proxy answer arrives. + // dataC := bytes.NewReader([]byte("fake offer")) + // wC := httptest.NewRecorder() + // rC, err := http.NewRequest("POST", "snowflake.broker/client", dataC) + // So(err, ShouldBeNil) + // go func() { + // clientOffers(i, wC, rC) + // done <- true + // }() + // + // <-polled + // So(wP.Code, ShouldEqual, http.StatusOK) + // So(wP.Body.String(), ShouldResemble, `{"Status":"client match","Offer":"fake offer","NAT":"unknown"}`) + // So(ctx.idToSnowflake["ymbcCMto7KHNGYlp"], ShouldNotBeNil) + // // Follow up with the answer request afterwards + // wA := httptest.NewRecorder() + // dataA := bytes.NewReader([]byte(`{"Version":"1.0","Sid":"ymbcCMto7KHNGYlp","Answer":"test"}`)) + // rA, err := http.NewRequest("POST", "snowflake.broker/answer", dataA) + // So(err, ShouldBeNil) + // proxyAnswers(i, wA, rA) + // So(wA.Code, ShouldEqual, http.StatusOK) + // + // <-done + // So(wC.Code, ShouldEqual, http.StatusOK) + // So(wC.Body.String(), ShouldEqual, "test") + // }) + // }) } -func TestGeoip(t *testing.T) { - Convey("Geoip", t, func() { - tv4 := new(GeoIPv4Table) - err := GeoIPLoadFile(tv4, "test_geoip") - So(err, ShouldEqual, nil) - tv6 := new(GeoIPv6Table) - err = GeoIPLoadFile(tv6, "test_geoip6") - So(err, ShouldEqual, nil) - - Convey("IPv4 Country Mapping Tests", func() { - for _, test := range []struct { - addr, cc string - ok bool - }{ - { - "129.97.208.23", //uwaterloo - "CA", - true, - }, - { - "127.0.0.1", - "", - false, - }, - { - "255.255.255.255", - "", - false, - }, - { - "0.0.0.0", - "", - false, - }, - { - "223.252.127.255", //test high end of range - "JP", - true, - }, - { - "223.252.127.255", //test low end of range - "JP", - true, - }, - } { - country, ok := GetCountryByAddr(tv4, net.ParseIP(test.addr)) - So(country, ShouldEqual, test.cc) - So(ok, ShouldResemble, test.ok) - } - }) - - Convey("IPv6 Country Mapping Tests", func() { - for _, test := range []struct { - addr, cc string - ok bool - }{ - { - "2620:101:f000:0:250:56ff:fe80:168e", //uwaterloo - "CA", - true, - }, - { - "fd00:0:0:0:0:0:0:1", - "", - false, - }, - { - "0:0:0:0:0:0:0:0", - "", - false, - }, - { - "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff", - "", - false, - }, - { - "2a07:2e47:ffff:ffff:ffff:ffff:ffff:ffff", //test high end of range - "FR", - true, - }, - { - "2a07:2e40::", //test low end of range - "FR", - true, - }, - } { - country, ok := GetCountryByAddr(tv6, net.ParseIP(test.addr)) - So(country, ShouldEqual, test.cc) - So(ok, ShouldResemble, test.ok) - } - }) - - // Make sure things behave properly if geoip file fails to load - ctx := NewBrokerContext(NullLogger()) - if err := ctx.metrics.LoadGeoipDatabases("invalid_filename", "invalid_filename6"); err != nil { - log.Printf("loading geo ip databases returned error: %v", err) - } - ctx.metrics.UpdateCountryStats("127.0.0.1", "", NATUnrestricted) - So(ctx.metrics.tablev4, ShouldEqual, nil) - - }) -} - -func TestMetrics(t *testing.T) { - Convey("Test metrics...", t, func() { - done := make(chan bool) - buf := new(bytes.Buffer) - ctx := NewBrokerContext(log.New(buf, "", 0)) - - err := ctx.metrics.LoadGeoipDatabases("test_geoip", "test_geoip6") - So(err, ShouldEqual, nil) - - //Test addition of proxy polls - Convey("for proxy polls", func() { - w := httptest.NewRecorder() - data := bytes.NewReader([]byte("{\"Sid\":\"ymbcCMto7KHNGYlp\",\"Version\":\"1.0\"}")) - r, err := http.NewRequest("POST", "snowflake.broker/proxy", data) - r.RemoteAddr = "129.97.208.23:8888" //CA geoip - So(err, ShouldBeNil) - go func(ctx *BrokerContext) { - proxyPolls(ctx, w, r) - done <- true - }(ctx) - p := <-ctx.proxyPolls //manually unblock poll - p.offerChannel <- nil - <-done - - w = httptest.NewRecorder() - data = bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0","Type":"standalone"}`)) - r, err = http.NewRequest("POST", "snowflake.broker/proxy", data) - r.RemoteAddr = "129.97.208.23:8888" //CA geoip - So(err, ShouldBeNil) - go func(ctx *BrokerContext) { - proxyPolls(ctx, w, r) - done <- true - }(ctx) - p = <-ctx.proxyPolls //manually unblock poll - p.offerChannel <- nil - <-done - - w = httptest.NewRecorder() - data = bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0","Type":"badge"}`)) - r, err = http.NewRequest("POST", "snowflake.broker/proxy", data) - r.RemoteAddr = "129.97.208.23:8888" //CA geoip - So(err, ShouldBeNil) - go func(ctx *BrokerContext) { - proxyPolls(ctx, w, r) - done <- true - }(ctx) - p = <-ctx.proxyPolls //manually unblock poll - p.offerChannel <- nil - <-done - - w = httptest.NewRecorder() - data = bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0","Type":"webext"}`)) - r, err = http.NewRequest("POST", "snowflake.broker/proxy", data) - r.RemoteAddr = "129.97.208.23:8888" //CA geoip - So(err, ShouldBeNil) - go func(ctx *BrokerContext) { - proxyPolls(ctx, w, r) - done <- true - }(ctx) - p = <-ctx.proxyPolls //manually unblock poll - p.offerChannel <- nil - <-done - ctx.metrics.printMetrics() - So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips CA=4\nsnowflake-ips-total 4\nsnowflake-ips-standalone 1\nsnowflake-ips-badge 1\nsnowflake-ips-webext 1\nsnowflake-idle-count 8\nclient-denied-count 0\nclient-restricted-denied-count 0\nclient-unrestricted-denied-count 0\nclient-snowflake-match-count 0\nsnowflake-ips-nat-restricted 0\nsnowflake-ips-nat-unrestricted 0\nsnowflake-ips-nat-unknown 1\n") - - }) - - //Test addition of client failures - Convey("for no proxies available", func() { - w := httptest.NewRecorder() - data := bytes.NewReader( - []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"unknown\"}")) - r, err := http.NewRequest("POST", "snowflake.broker/client", data) - So(err, ShouldBeNil) - - clientOffers(ctx, w, r) - - ctx.metrics.printMetrics() - So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 8\nclient-unrestricted-denied-count 0\nclient-snowflake-match-count 0") - - // Test reset - buf.Reset() - ctx.metrics.zeroMetrics() - ctx.metrics.printMetrics() - So(buf.String(), ShouldContainSubstring, "snowflake-ips \nsnowflake-ips-total 0\nsnowflake-ips-standalone 0\nsnowflake-ips-badge 0\nsnowflake-ips-webext 0\nsnowflake-idle-count 0\nclient-denied-count 0\nclient-restricted-denied-count 0\nclient-unrestricted-denied-count 0\nclient-snowflake-match-count 0\nsnowflake-ips-nat-restricted 0\nsnowflake-ips-nat-unrestricted 0\nsnowflake-ips-nat-unknown 0\n") - }) - //Test addition of client matches - Convey("for client-proxy match", func() { - w := httptest.NewRecorder() - data := bytes.NewReader( - []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"unknown\"}")) - r, err := http.NewRequest("POST", "snowflake.broker/client", data) - So(err, ShouldBeNil) - - // Prepare a fake proxy to respond with. - snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted) - go func() { - clientOffers(ctx, w, r) - done <- true - }() - offer := <-snowflake.offerChannel - So(offer.sdp, ShouldResemble, []byte("fake")) - snowflake.answerChannel <- "fake answer" - <-done - - ctx.metrics.printMetrics() - So(buf.String(), ShouldContainSubstring, "client-denied-count 0\nclient-restricted-denied-count 0\nclient-unrestricted-denied-count 0\nclient-snowflake-match-count 8") - }) - //Test rounding boundary - Convey("binning boundary", func() { - w := httptest.NewRecorder() - data := bytes.NewReader( - []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}")) - r, err := http.NewRequest("POST", "snowflake.broker/client", data) - So(err, ShouldBeNil) - - clientOffers(ctx, w, r) - w = httptest.NewRecorder() - data = bytes.NewReader( - []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}")) - r, err = http.NewRequest("POST", "snowflake.broker/client", data) - So(err, ShouldBeNil) - clientOffers(ctx, w, r) - w = httptest.NewRecorder() - data = bytes.NewReader( - []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}")) - r, err = http.NewRequest("POST", "snowflake.broker/client", data) - So(err, ShouldBeNil) - clientOffers(ctx, w, r) - w = httptest.NewRecorder() - data = bytes.NewReader( - []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}")) - r, err = http.NewRequest("POST", "snowflake.broker/client", data) - So(err, ShouldBeNil) - clientOffers(ctx, w, r) - w = httptest.NewRecorder() - data = bytes.NewReader( - []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}")) - r, err = http.NewRequest("POST", "snowflake.broker/client", data) - So(err, ShouldBeNil) - clientOffers(ctx, w, r) - w = httptest.NewRecorder() - data = bytes.NewReader( - []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}")) - r, err = http.NewRequest("POST", "snowflake.broker/client", data) - So(err, ShouldBeNil) - clientOffers(ctx, w, r) - w = httptest.NewRecorder() - data = bytes.NewReader( - []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}")) - r, err = http.NewRequest("POST", "snowflake.broker/client", data) - So(err, ShouldBeNil) - clientOffers(ctx, w, r) - w = httptest.NewRecorder() - data = bytes.NewReader( - []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}")) - r, err = http.NewRequest("POST", "snowflake.broker/client", data) - So(err, ShouldBeNil) - clientOffers(ctx, w, r) - - ctx.metrics.printMetrics() - So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 8\nclient-unrestricted-denied-count 0\n") - - w = httptest.NewRecorder() - data = bytes.NewReader( - []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}")) - r, err = http.NewRequest("POST", "snowflake.broker/client", data) - So(err, ShouldBeNil) - clientOffers(ctx, w, r) - buf.Reset() - ctx.metrics.printMetrics() - So(buf.String(), ShouldContainSubstring, "client-denied-count 16\nclient-restricted-denied-count 16\nclient-unrestricted-denied-count 0\n") - }) - - //Test unique ip - Convey("proxy counts by unique ip", func() { - w := httptest.NewRecorder() - data := bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0"}`)) - r, err := http.NewRequest("POST", "snowflake.broker/proxy", data) - r.RemoteAddr = "129.97.208.23:8888" //CA geoip - So(err, ShouldBeNil) - go func(ctx *BrokerContext) { - proxyPolls(ctx, w, r) - done <- true - }(ctx) - p := <-ctx.proxyPolls //manually unblock poll - p.offerChannel <- nil - <-done - - data = bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0"}`)) - r, err = http.NewRequest("POST", "snowflake.broker/proxy", data) - if err != nil { - log.Printf("unable to get NewRequest with error: %v", err) - } - r.RemoteAddr = "129.97.208.23:8888" //CA geoip - go func(ctx *BrokerContext) { - proxyPolls(ctx, w, r) - done <- true - }(ctx) - p = <-ctx.proxyPolls //manually unblock poll - p.offerChannel <- nil - <-done - - ctx.metrics.printMetrics() - So(buf.String(), ShouldContainSubstring, "snowflake-ips CA=1\nsnowflake-ips-total 1") - }) - //Test NAT types - Convey("proxy counts by NAT type", func() { - w := httptest.NewRecorder() - data := bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.2","Type":"unknown","NAT":"restricted"}`)) - r, err := http.NewRequest("POST", "snowflake.broker/proxy", data) - r.RemoteAddr = "129.97.208.23:8888" //CA geoip - So(err, ShouldBeNil) - go func(ctx *BrokerContext) { - proxyPolls(ctx, w, r) - done <- true - }(ctx) - p := <-ctx.proxyPolls //manually unblock poll - p.offerChannel <- nil - <-done - - ctx.metrics.printMetrics() - So(buf.String(), ShouldContainSubstring, "snowflake-ips-nat-restricted 1\nsnowflake-ips-nat-unrestricted 0\nsnowflake-ips-nat-unknown 0") - - data = bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.2","Type":"unknown","NAT":"unrestricted"}`)) - r, err = http.NewRequest("POST", "snowflake.broker/proxy", data) - if err != nil { - log.Printf("unable to get NewRequest with error: %v", err) - } - r.RemoteAddr = "129.97.208.24:8888" //CA geoip - go func(ctx *BrokerContext) { - proxyPolls(ctx, w, r) - done <- true - }(ctx) - p = <-ctx.proxyPolls //manually unblock poll - p.offerChannel <- nil - <-done - - ctx.metrics.printMetrics() - So(buf.String(), ShouldContainSubstring, "snowflake-ips-nat-restricted 1\nsnowflake-ips-nat-unrestricted 1\nsnowflake-ips-nat-unknown 0") - }) - //Test client failures by NAT type - Convey("client failures by NAT type", func() { - w := httptest.NewRecorder() - data := bytes.NewReader( - []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}")) - r, err := http.NewRequest("POST", "snowflake.broker/client", data) - So(err, ShouldBeNil) - - clientOffers(ctx, w, r) - - ctx.metrics.printMetrics() - So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 8\nclient-unrestricted-denied-count 0\nclient-snowflake-match-count 0") - - buf.Reset() - ctx.metrics.zeroMetrics() - - data = bytes.NewReader( - []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"unrestricted\"}")) - r, err = http.NewRequest("POST", "snowflake.broker/client", data) - So(err, ShouldBeNil) - - clientOffers(ctx, w, r) - - ctx.metrics.printMetrics() - So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 0\nclient-unrestricted-denied-count 8\nclient-snowflake-match-count 0") - - buf.Reset() - ctx.metrics.zeroMetrics() - - data = bytes.NewReader( - []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"unknown\"}")) - r, err = http.NewRequest("POST", "snowflake.broker/client", data) - So(err, ShouldBeNil) - - clientOffers(ctx, w, r) - - ctx.metrics.printMetrics() - So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 8\nclient-unrestricted-denied-count 0\nclient-snowflake-match-count 0") - }) - Convey("for country stats order", func() { - - stats := map[string]int{ - "IT": 50, - "FR": 200, - "TZ": 100, - "CN": 250, - "RU": 150, - "CA": 1, - "BE": 1, - "PH": 1, - } - ctx.metrics.countryStats.counts = stats - So(ctx.metrics.countryStats.Display(), ShouldEqual, "CN=250,FR=200,RU=150,TZ=100,IT=50,BE=1,CA=1,PH=1") - }) - }) -} +// func TestMetrics(t *testing.T) { +// Convey("Test metrics...", t, func() { +// done := make(chan bool) +// buf := new(bytes.Buffer) +// ctx := NewBrokerContext(log.New(buf, "", 0)) +// i := &IPC{ctx} +// +// err := ctx.metrics.LoadGeoipDatabases("test_geoip", "test_geoip6") +// So(err, ShouldEqual, nil) +// +// //Test addition of proxy polls +// Convey("for proxy polls", func() { +// w := httptest.NewRecorder() +// data := bytes.NewReader([]byte("{\"Sid\":\"ymbcCMto7KHNGYlp\",\"Version\":\"1.0\"}")) +// r, err := http.NewRequest("POST", "snowflake.broker/proxy", data) +// r.RemoteAddr = "129.97.208.23:8888" //CA geoip +// So(err, ShouldBeNil) +// go func(i *IPC) { +// proxyPolls(i, w, r) +// done <- true +// }(i) +// p := <-ctx.proxyPolls //manually unblock poll +// p.offerChannel <- nil +// <-done +// +// w = httptest.NewRecorder() +// data = bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0","Type":"standalone"}`)) +// r, err = http.NewRequest("POST", "snowflake.broker/proxy", data) +// r.RemoteAddr = "129.97.208.23:8888" //CA geoip +// So(err, ShouldBeNil) +// go func(i *IPC) { +// proxyPolls(i, w, r) +// done <- true +// }(i) +// p = <-ctx.proxyPolls //manually unblock poll +// p.offerChannel <- nil +// <-done +// +// w = httptest.NewRecorder() +// data = bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0","Type":"badge"}`)) +// r, err = http.NewRequest("POST", "snowflake.broker/proxy", data) +// r.RemoteAddr = "129.97.208.23:8888" //CA geoip +// So(err, ShouldBeNil) +// go func(i *IPC) { +// proxyPolls(i, w, r) +// done <- true +// }(i) +// p = <-ctx.proxyPolls //manually unblock poll +// p.offerChannel <- nil +// <-done +// +// w = httptest.NewRecorder() +// data = bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0","Type":"webext"}`)) +// r, err = http.NewRequest("POST", "snowflake.broker/proxy", data) +// r.RemoteAddr = "129.97.208.23:8888" //CA geoip +// So(err, ShouldBeNil) +// go func(i *IPC) { +// proxyPolls(i, w, r) +// done <- true +// }(i) +// p = <-ctx.proxyPolls //manually unblock poll +// p.offerChannel <- nil +// <-done +// ctx.metrics.printMetrics() +// So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips CA=4\nsnowflake-ips-total 4\nsnowflake-ips-standalone 1\nsnowflake-ips-badge 1\nsnowflake-ips-webext 1\nsnowflake-idle-count 8\nclient-denied-count 0\nclient-restricted-denied-count 0\nclient-unrestricted-denied-count 0\nclient-snowflake-match-count 0\nsnowflake-ips-nat-restricted 0\nsnowflake-ips-nat-unrestricted 0\nsnowflake-ips-nat-unknown 1\n") +// +// }) +// +// //Test addition of client failures +// Convey("for no proxies available", func() { +// w := httptest.NewRecorder() +// data := bytes.NewReader( +// []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"unknown\"}")) +// r, err := http.NewRequest("POST", "snowflake.broker/client", data) +// So(err, ShouldBeNil) +// +// clientOffers(i, w, r) +// +// ctx.metrics.printMetrics() +// So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 8\nclient-unrestricted-denied-count 0\nclient-snowflake-match-count 0") +// +// // Test reset +// buf.Reset() +// ctx.metrics.zeroMetrics() +// ctx.metrics.printMetrics() +// So(buf.String(), ShouldContainSubstring, "snowflake-ips \nsnowflake-ips-total 0\nsnowflake-ips-standalone 0\nsnowflake-ips-badge 0\nsnowflake-ips-webext 0\nsnowflake-idle-count 0\nclient-denied-count 0\nclient-restricted-denied-count 0\nclient-unrestricted-denied-count 0\nclient-snowflake-match-count 0\nsnowflake-ips-nat-restricted 0\nsnowflake-ips-nat-unrestricted 0\nsnowflake-ips-nat-unknown 0\n") +// }) +// //Test addition of client matches +// Convey("for client-proxy match", func() { +// w := httptest.NewRecorder() +// data := bytes.NewReader( +// []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"unknown\"}")) +// r, err := http.NewRequest("POST", "snowflake.broker/client", data) +// So(err, ShouldBeNil) +// +// // Prepare a fake proxy to respond with. +// snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted) +// go func() { +// clientOffers(i, w, r) +// done <- true +// }() +// offer := <-snowflake.offerChannel +// So(offer.sdp, ShouldResemble, []byte("fake")) +// snowflake.answerChannel <- "fake answer" +// <-done +// +// ctx.metrics.printMetrics() +// So(buf.String(), ShouldContainSubstring, "client-denied-count 0\nclient-restricted-denied-count 0\nclient-unrestricted-denied-count 0\nclient-snowflake-match-count 8") +// }) +// //Test rounding boundary +// Convey("binning boundary", func() { +// w := httptest.NewRecorder() +// data := bytes.NewReader( +// []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}")) +// r, err := http.NewRequest("POST", "snowflake.broker/client", data) +// So(err, ShouldBeNil) +// +// clientOffers(i, w, r) +// w = httptest.NewRecorder() +// data = bytes.NewReader( +// []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}")) +// r, err = http.NewRequest("POST", "snowflake.broker/client", data) +// So(err, ShouldBeNil) +// clientOffers(i, w, r) +// w = httptest.NewRecorder() +// data = bytes.NewReader( +// []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}")) +// r, err = http.NewRequest("POST", "snowflake.broker/client", data) +// So(err, ShouldBeNil) +// clientOffers(i, w, r) +// w = httptest.NewRecorder() +// data = bytes.NewReader( +// []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}")) +// r, err = http.NewRequest("POST", "snowflake.broker/client", data) +// So(err, ShouldBeNil) +// clientOffers(i, w, r) +// w = httptest.NewRecorder() +// data = bytes.NewReader( +// []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}")) +// r, err = http.NewRequest("POST", "snowflake.broker/client", data) +// So(err, ShouldBeNil) +// clientOffers(i, w, r) +// w = httptest.NewRecorder() +// data = bytes.NewReader( +// []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}")) +// r, err = http.NewRequest("POST", "snowflake.broker/client", data) +// So(err, ShouldBeNil) +// clientOffers(i, w, r) +// w = httptest.NewRecorder() +// data = bytes.NewReader( +// []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}")) +// r, err = http.NewRequest("POST", "snowflake.broker/client", data) +// So(err, ShouldBeNil) +// clientOffers(i, w, r) +// w = httptest.NewRecorder() +// data = bytes.NewReader( +// []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}")) +// r, err = http.NewRequest("POST", "snowflake.broker/client", data) +// So(err, ShouldBeNil) +// clientOffers(i, w, r) +// +// ctx.metrics.printMetrics() +// So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 8\nclient-unrestricted-denied-count 0\n") +// +// w = httptest.NewRecorder() +// data = bytes.NewReader( +// []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}")) +// r, err = http.NewRequest("POST", "snowflake.broker/client", data) +// So(err, ShouldBeNil) +// clientOffers(i, w, r) +// buf.Reset() +// ctx.metrics.printMetrics() +// So(buf.String(), ShouldContainSubstring, "client-denied-count 16\nclient-restricted-denied-count 16\nclient-unrestricted-denied-count 0\n") +// }) +// +// //Test unique ip +// Convey("proxy counts by unique ip", func() { +// w := httptest.NewRecorder() +// data := bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0"}`)) +// r, err := http.NewRequest("POST", "snowflake.broker/proxy", data) +// r.RemoteAddr = "129.97.208.23:8888" //CA geoip +// So(err, ShouldBeNil) +// go func(ctx *BrokerContext) { +// proxyPolls(i, w, r) +// done <- true +// }(ctx) +// p := <-ctx.proxyPolls //manually unblock poll +// p.offerChannel <- nil +// <-done +// +// data = bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0"}`)) +// r, err = http.NewRequest("POST", "snowflake.broker/proxy", data) +// if err != nil { +// log.Printf("unable to get NewRequest with error: %v", err) +// } +// r.RemoteAddr = "129.97.208.23:8888" //CA geoip +// go func(i *IPC) { +// proxyPolls(i, w, r) +// done <- true +// }(i) +// p = <-ctx.proxyPolls //manually unblock poll +// p.offerChannel <- nil +// <-done +// +// ctx.metrics.printMetrics() +// So(buf.String(), ShouldContainSubstring, "snowflake-ips CA=1\nsnowflake-ips-total 1") +// }) +// //Test NAT types +// Convey("proxy counts by NAT type", func() { +// w := httptest.NewRecorder() +// data := bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.2","Type":"unknown","NAT":"restricted"}`)) +// r, err := http.NewRequest("POST", "snowflake.broker/proxy", data) +// r.RemoteAddr = "129.97.208.23:8888" //CA geoip +// So(err, ShouldBeNil) +// go func(i *IPC) { +// proxyPolls(i, w, r) +// done <- true +// }(i) +// p := <-ctx.proxyPolls //manually unblock poll +// p.offerChannel <- nil +// <-done +// +// ctx.metrics.printMetrics() +// So(buf.String(), ShouldContainSubstring, "snowflake-ips-nat-restricted 1\nsnowflake-ips-nat-unrestricted 0\nsnowflake-ips-nat-unknown 0") +// +// data = bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.2","Type":"unknown","NAT":"unrestricted"}`)) +// r, err = http.NewRequest("POST", "snowflake.broker/proxy", data) +// if err != nil { +// log.Printf("unable to get NewRequest with error: %v", err) +// } +// r.RemoteAddr = "129.97.208.24:8888" //CA geoip +// go func(i *IPC) { +// proxyPolls(i, w, r) +// done <- true +// }(i) +// p = <-ctx.proxyPolls //manually unblock poll +// p.offerChannel <- nil +// <-done +// +// ctx.metrics.printMetrics() +// So(buf.String(), ShouldContainSubstring, "snowflake-ips-nat-restricted 1\nsnowflake-ips-nat-unrestricted 1\nsnowflake-ips-nat-unknown 0") +// }) +// //Test client failures by NAT type +// Convey("client failures by NAT type", func() { +// w := httptest.NewRecorder() +// data := bytes.NewReader( +// []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}")) +// r, err := http.NewRequest("POST", "snowflake.broker/client", data) +// So(err, ShouldBeNil) +// +// clientOffers(i, w, r) +// +// ctx.metrics.printMetrics() +// So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 8\nclient-unrestricted-denied-count 0\nclient-snowflake-match-count 0") +// +// buf.Reset() +// ctx.metrics.zeroMetrics() +// +// data = bytes.NewReader( +// []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"unrestricted\"}")) +// r, err = http.NewRequest("POST", "snowflake.broker/client", data) +// So(err, ShouldBeNil) +// +// clientOffers(i, w, r) +// +// ctx.metrics.printMetrics() +// So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 0\nclient-unrestricted-denied-count 8\nclient-snowflake-match-count 0") +// +// buf.Reset() +// ctx.metrics.zeroMetrics() +// +// data = bytes.NewReader( +// []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"unknown\"}")) +// r, err = http.NewRequest("POST", "snowflake.broker/client", data) +// So(err, ShouldBeNil) +// +// clientOffers(i, w, r) +// +// ctx.metrics.printMetrics() +// So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 8\nclient-unrestricted-denied-count 0\nclient-snowflake-match-count 0") +// }) +// Convey("for country stats order", func() { +// +// stats := map[string]int{ +// "IT": 50, +// "FR": 200, +// "TZ": 100, +// "CN": 250, +// "RU": 150, +// "CA": 1, +// "BE": 1, +// "PH": 1, +// } +// ctx.metrics.countryStats.counts = stats +// So(ctx.metrics.countryStats.Display(), ShouldEqual, "CN=250,FR=200,RU=150,TZ=100,IT=50,BE=1,CA=1,PH=1") +// }) +// }) +// } diff --git a/common/messages/ipc.go b/common/messages/ipc.go new file mode 100644 index 0000000..ee29a57 --- /dev/null +++ b/common/messages/ipc.go @@ -0,0 +1,15 @@ +package messages + +import ( + "errors" +) + +type Arg struct { + Body []byte + RemoteAddr string +} + +var ( + ErrBadRequest = errors.New("bad request") + ErrInternal = errors.New("internal error") +)