diff --git a/broker/broker.go b/broker/broker.go index 408488c..0a79b8f 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -149,10 +149,20 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { return } log.Println("Received snowflake: ", id) + + // Log geoip stats + remoteIP, _, err := net.SplitHostPort(r.RemoteAddr) + if err != nil { + log.Println("Error processing proxy IP: ", err.Error()) + } else { + ctx.metrics.UpdateCountryStats(remoteIP) + } + // Wait for a client to avail an offer to the snowflake, or timeout if nil. offer := ctx.RequestOffer(id) if nil == offer { log.Println("Proxy " + id + " did not receive a Client offer.") + ctx.metrics.proxyIdleCount++ w.WriteHeader(http.StatusGatewayTimeout) return } @@ -176,6 +186,7 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { // Immediately fail if there are no snowflakes available. if ctx.snowflakes.Len() <= 0 { log.Println("Client: No snowflake proxies available.") + ctx.metrics.clientDeniedCount++ w.WriteHeader(http.StatusServiceUnavailable) return } @@ -189,6 +200,7 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { select { case answer := <-snowflake.answerChannel: log.Println("Client: Retrieving answer") + ctx.metrics.clientProxyMatchCount++ w.Write(answer) // Initial tracking of elapsed time. ctx.metrics.clientRoundtripEstimate = time.Since(startTime) / @@ -221,15 +233,6 @@ func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { return } - // Get proxy country stats - remoteIP, _, err := net.SplitHostPort(r.RemoteAddr) - if err != nil { - log.Println("Error processing proxy IP: ", err.Error()) - } else { - - ctx.metrics.UpdateCountryStats(remoteIP) - } - log.Println("Received answer.") snowflake.answerChannel <- body } @@ -291,7 +294,7 @@ func main() { metricsFile = os.Stdout } - metricsLogger := log.New(metricsFile, "", log.LstdFlags|log.LUTC) + metricsLogger := log.New(metricsFile, "", 0) ctx := NewBrokerContext(metricsLogger) diff --git a/broker/metrics.go b/broker/metrics.go index f961d1f..ffa61e2 100644 --- a/broker/metrics.go +++ b/broker/metrics.go @@ -1,9 +1,51 @@ +/* +We export metrics in the following format: + + "snowflake-stats-end" YYYY-MM-DD HH:MM:SS (NSEC s) NL + [At most once.] + + YYYY-MM-DD HH:MM:SS defines the end of the included measurement + interval of length NSEC seconds (86400 seconds by default). + + "snowflake-ips" CC=NUM,CC=NUM,... NL + [At most once.] + + List of mappings from two-letter country codes to the number of + unique IP addresses of snowflake proxies that have polled. + + "snowflake-ips-total" NUM NL + [At most once.] + + A count of the total number of unique IP addresses of snowflake + proxies that have polled. + + "snowflake-idle-count" NUM NL + [At most once.] + + A count of the number of times a proxy has polled but received + no client offer, rounded up to the nearest multiple of 8. + + "client-denied-count" NUM NL + [At most once.] + + A count of the number of times a client has requested a proxy + from the broker but no proxies were available, rounded up to + the nearest multiple of 8. + + "client-snowflake-match-count" NUM NL + [At most once.] + + A count of the number of times a client successfully received a + proxy from the broker, rounded up to the nearest multiple of 8. +*/ + package main import ( // "golang.org/x/net/internal/timeseries" "fmt" "log" + "math" "net" "sync" "time" @@ -13,21 +55,38 @@ var ( once sync.Once ) +const metricsResolution = 60 * 60 * 24 * time.Second //86400 seconds + type CountryStats struct { + addrs map[string]bool counts map[string]int } // Implements Observable type Metrics struct { - tablev4 *GeoIPv4Table - tablev6 *GeoIPv6Table - countryStats CountryStats - // snowflakes timeseries.Float + logger *log.Logger + tablev4 *GeoIPv4Table + tablev6 *GeoIPv6Table + + countryStats CountryStats clientRoundtripEstimate time.Duration + proxyIdleCount uint + clientDeniedCount uint + clientProxyMatchCount uint } func (s CountryStats) Display() string { - return fmt.Sprint(s.counts) + output := "" + for cc, count := range s.counts { + output += fmt.Sprintf("%s=%d,", cc, count) + } + + // cut off trailing "," + if len(output) > 0 { + return output[:len(output)-1] + } + + return output } func (m *Metrics) UpdateCountryStats(addr string) { @@ -35,6 +94,10 @@ func (m *Metrics) UpdateCountryStats(addr string) { var country string var ok bool + if m.countryStats.addrs[addr] { + return + } + ip := net.ParseIP(addr) if ip.To4() != nil { //This is an IPv4 address @@ -54,8 +117,9 @@ func (m *Metrics) UpdateCountryStats(addr string) { log.Println("Unknown geoip") } - //update map of countries and counts + //update map of unique ips and counts m.countryStats.counts[country]++ + m.countryStats.addrs[addr] = true return } @@ -90,19 +154,45 @@ func NewMetrics(metricsLogger *log.Logger) (*Metrics, error) { m.countryStats = CountryStats{ counts: make(map[string]int), + addrs: make(map[string]bool), } + m.logger = metricsLogger + // Write to log file every hour with updated metrics - go once.Do(func() { - heartbeat := time.Tick(time.Hour) - for range heartbeat { - metricsLogger.Println("Country stats: ", m.countryStats.Display()) - - //restore all metrics to original values - m.countryStats.counts = make(map[string]int) - - } - }) + go once.Do(m.logMetrics) return m, nil } + +// Logs metrics in intervals specified by metricsResolution +func (m *Metrics) logMetrics() { + heartbeat := time.Tick(metricsResolution) + for range heartbeat { + m.printMetrics() + m.zeroMetrics() + } +} + +func (m *Metrics) printMetrics() { + m.logger.Println("snowflake-stats-end", time.Now().UTC().Format("2006-01-02 15:04:05"), fmt.Sprintf("(%d s)", int(metricsResolution.Seconds()))) + m.logger.Println("snowflake-ips", m.countryStats.Display()) + m.logger.Println("snowflake-ips-total", len(m.countryStats.addrs)) + m.logger.Println("snowflake-idle-count", binCount(m.proxyIdleCount)) + m.logger.Println("client-denied-count", binCount(m.clientDeniedCount)) + m.logger.Println("client-snowflake-match-count", binCount(m.clientProxyMatchCount)) +} + +// Restores all metrics to original values +func (m *Metrics) zeroMetrics() { + m.proxyIdleCount = 0 + m.clientDeniedCount = 0 + m.clientProxyMatchCount = 0 + m.countryStats.counts = make(map[string]int) + m.countryStats.addrs = make(map[string]bool) +} + +// Rounds up a count to the nearest multiple of 8. +func binCount(count uint) uint { + return uint((math.Ceil(float64(count) / 8)) * 8) +} diff --git a/broker/snowflake-broker_test.go b/broker/snowflake-broker_test.go index 03aa2b4..5afbe33 100644 --- a/broker/snowflake-broker_test.go +++ b/broker/snowflake-broker_test.go @@ -11,6 +11,7 @@ import ( "net/http/httptest" "os" "testing" + "time" ) func NullLogger() *log.Logger { @@ -390,3 +391,131 @@ func TestGeoip(t *testing.T) { }) } + +func TestMetrics(t *testing.T) { + + Convey("Test metrics...", t, func() { + done := make(chan bool) + buf := new(bytes.Buffer) + ctx := NewBrokerContext(log.New(buf, "", 0)) + + err := ctx.metrics.LoadGeoipDatabases("test_geoip", "test_geoip6") + So(err, ShouldEqual, nil) + + //Test addition of proxy polls + Convey("for proxy polls", func() { + w := httptest.NewRecorder() + data := bytes.NewReader([]byte("test")) + r, err := http.NewRequest("POST", "snowflake.broker/proxy", data) + r.Header.Set("X-Session-ID", "test") + r.RemoteAddr = "129.97.208.23:8888" //CA geoip + So(err, ShouldBeNil) + go func(ctx *BrokerContext) { + proxyPolls(ctx, w, r) + done <- true + }(ctx) + p := <-ctx.proxyPolls //manually unblock poll + p.offerChannel <- nil + <-done + + ctx.metrics.printMetrics() + So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips CA=1\nsnowflake-ips-total 1\nsnowflake-idle-count 8\nclient-denied-count 0\nclient-snowflake-match-count 0\n") + }) + + //Test addition of client failures + Convey("for no proxies available", func() { + w := httptest.NewRecorder() + data := bytes.NewReader([]byte("test")) + r, err := http.NewRequest("POST", "snowflake.broker/client", data) + So(err, ShouldBeNil) + + clientOffers(ctx, w, r) + + ctx.metrics.printMetrics() + So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips \nsnowflake-ips-total 0\nsnowflake-idle-count 0\nclient-denied-count 8\nclient-snowflake-match-count 0\n") + + // Test reset + buf.Reset() + ctx.metrics.zeroMetrics() + ctx.metrics.printMetrics() + So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips \nsnowflake-ips-total 0\nsnowflake-idle-count 0\nclient-denied-count 0\nclient-snowflake-match-count 0\n") + }) + //Test addition of client matches + Convey("for client-proxy match", func() { + w := httptest.NewRecorder() + data := bytes.NewReader([]byte("test")) + r, err := http.NewRequest("POST", "snowflake.broker/client", data) + So(err, ShouldBeNil) + + // Prepare a fake proxy to respond with. + snowflake := ctx.AddSnowflake("fake") + go func() { + clientOffers(ctx, w, r) + done <- true + }() + offer := <-snowflake.offerChannel + So(offer, ShouldResemble, []byte("test")) + snowflake.answerChannel <- []byte("fake answer") + <-done + + ctx.metrics.printMetrics() + So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips \nsnowflake-ips-total 0\nsnowflake-idle-count 0\nclient-denied-count 0\nclient-snowflake-match-count 8\n") + }) + //Test rounding boundary + Convey("binning boundary", func() { + w := httptest.NewRecorder() + data := bytes.NewReader([]byte("test")) + r, err := http.NewRequest("POST", "snowflake.broker/client", data) + So(err, ShouldBeNil) + + clientOffers(ctx, w, r) + clientOffers(ctx, w, r) + clientOffers(ctx, w, r) + clientOffers(ctx, w, r) + clientOffers(ctx, w, r) + clientOffers(ctx, w, r) + clientOffers(ctx, w, r) + clientOffers(ctx, w, r) + + ctx.metrics.printMetrics() + So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips \nsnowflake-ips-total 0\nsnowflake-idle-count 0\nclient-denied-count 8\nclient-snowflake-match-count 0\n") + + clientOffers(ctx, w, r) + buf.Reset() + ctx.metrics.printMetrics() + So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips \nsnowflake-ips-total 0\nsnowflake-idle-count 0\nclient-denied-count 16\nclient-snowflake-match-count 0\n") + }) + + //Test unique ip + Convey("proxy counts by unique ip", func() { + w := httptest.NewRecorder() + data := bytes.NewReader([]byte("test")) + r, err := http.NewRequest("POST", "snowflake.broker/proxy", data) + r.Header.Set("X-Session-ID", "test") + r.RemoteAddr = "129.97.208.23:8888" //CA geoip + So(err, ShouldBeNil) + go func(ctx *BrokerContext) { + proxyPolls(ctx, w, r) + done <- true + }(ctx) + p := <-ctx.proxyPolls //manually unblock poll + p.offerChannel <- nil + <-done + + data = bytes.NewReader([]byte("test")) + r, err = http.NewRequest("POST", "snowflake.broker/proxy", data) + r.Header.Set("X-Session-ID", "test") + r.RemoteAddr = "129.97.208.23:8888" //CA geoip + go func(ctx *BrokerContext) { + proxyPolls(ctx, w, r) + done <- true + }(ctx) + p = <-ctx.proxyPolls //manually unblock poll + p.offerChannel <- nil + <-done + + ctx.metrics.printMetrics() + So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips CA=1\nsnowflake-ips-total 1\nsnowflake-idle-count 8\nclient-denied-count 0\nclient-snowflake-match-count 0\n") + }) + }) +}