Simplify broker metrics and remove mutexes

This is a large change to how the snowflake broker metrics are
implemented. This change removes all uses of mutexes from the metrics
implementation in favor of atomic operations on counters stored in
sync.Map.

There is a small change to the actual metrics output. We used to count
the same proxy ip multiple times in our snowflake-ips-total and
snowflake-ips country stats if the same proxy ip address polled more
than once with different proxy types. This was an overcounting of the
number of unique proxy IP addresses that is now fixed.

If a unique proxy ip polls with more than one proxy type or nat type,
these polls will still be counted once for each proxy type or nat type
in our proxy type and nat type specific stats (e.g.,
snowflake-ips-nat-restricted and snowflake-ips-nat-unrestricted).
This commit is contained in:
Cecylia Bocovich 2025-05-29 17:36:36 -04:00
parent 64c7a26475
commit 78cf8e68b2
No known key found for this signature in database
GPG key ID: 009DE379FD9B7B90
3 changed files with 164 additions and 243 deletions

View file

@ -5,7 +5,6 @@ import (
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"log" "log"
"time"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/bridgefingerprint" "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/bridgefingerprint"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/constants" "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/constants"
@ -74,22 +73,16 @@ func (i *IPC) ProxyPolls(arg messages.Arg, response *[]byte) error {
} }
if !relayPatternSupported { if !relayPatternSupported {
i.ctx.metrics.lock.Lock() i.ctx.metrics.IncrementCounter("proxy-poll-without-relay-url")
i.ctx.metrics.proxyPollWithoutRelayURLExtension++
i.ctx.metrics.promMetrics.ProxyPollWithoutRelayURLExtensionTotal.With(prometheus.Labels{"nat": natType, "type": proxyType}).Inc() i.ctx.metrics.promMetrics.ProxyPollWithoutRelayURLExtensionTotal.With(prometheus.Labels{"nat": natType, "type": proxyType}).Inc()
i.ctx.metrics.lock.Unlock()
} else { } else {
i.ctx.metrics.lock.Lock() i.ctx.metrics.IncrementCounter("proxy-poll-with-relay-url")
i.ctx.metrics.proxyPollWithRelayURLExtension++
i.ctx.metrics.promMetrics.ProxyPollWithRelayURLExtensionTotal.With(prometheus.Labels{"nat": natType, "type": proxyType}).Inc() i.ctx.metrics.promMetrics.ProxyPollWithRelayURLExtensionTotal.With(prometheus.Labels{"nat": natType, "type": proxyType}).Inc()
i.ctx.metrics.lock.Unlock()
} }
if !i.ctx.CheckProxyRelayPattern(relayPattern, !relayPatternSupported) { if !i.ctx.CheckProxyRelayPattern(relayPattern, !relayPatternSupported) {
i.ctx.metrics.lock.Lock() i.ctx.metrics.IncrementCounter("proxy-poll-rejected-relay-url")
i.ctx.metrics.proxyPollRejectedWithRelayURLExtension++
i.ctx.metrics.promMetrics.ProxyPollRejectedForRelayURLExtensionTotal.With(prometheus.Labels{"nat": natType, "type": proxyType}).Inc() i.ctx.metrics.promMetrics.ProxyPollRejectedForRelayURLExtensionTotal.With(prometheus.Labels{"nat": natType, "type": proxyType}).Inc()
i.ctx.metrics.lock.Unlock()
b, err := messages.EncodePollResponseWithRelayURL("", false, "", "", "incorrect relay pattern") b, err := messages.EncodePollResponseWithRelayURL("", false, "", "", "incorrect relay pattern")
*response = b *response = b
@ -104,9 +97,7 @@ func (i *IPC) ProxyPolls(arg messages.Arg, response *[]byte) error {
if err != nil { if err != nil {
log.Println("Warning: cannot process proxy IP: ", err.Error()) log.Println("Warning: cannot process proxy IP: ", err.Error())
} else { } else {
i.ctx.metrics.lock.Lock()
i.ctx.metrics.UpdateCountryStats(remoteIP, proxyType, natType) i.ctx.metrics.UpdateCountryStats(remoteIP, proxyType, natType)
i.ctx.metrics.lock.Unlock()
} }
var b []byte var b []byte
@ -115,10 +106,8 @@ func (i *IPC) ProxyPolls(arg messages.Arg, response *[]byte) error {
offer := i.ctx.RequestOffer(sid, proxyType, natType, clients) offer := i.ctx.RequestOffer(sid, proxyType, natType, clients)
if offer == nil { if offer == nil {
i.ctx.metrics.lock.Lock() i.ctx.metrics.IncrementCounter("proxy-idle")
i.ctx.metrics.proxyIdleCount++
i.ctx.metrics.promMetrics.ProxyPollTotal.With(prometheus.Labels{"nat": natType, "status": "idle"}).Inc() i.ctx.metrics.promMetrics.ProxyPollTotal.With(prometheus.Labels{"nat": natType, "status": "idle"}).Inc()
i.ctx.metrics.lock.Unlock()
b, err = messages.EncodePollResponse("", false, "") b, err = messages.EncodePollResponse("", false, "")
if err != nil { if err != nil {
@ -162,8 +151,6 @@ func sendClientResponse(resp *messages.ClientPollResponse, response *[]byte) err
func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte) error { func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte) error {
startTime := time.Now()
req, err := messages.DecodeClientPollRequest(arg.Body) req, err := messages.DecodeClientPollRequest(arg.Body)
if err != nil { if err != nil {
return sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, response) return sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, response)
@ -197,9 +184,7 @@ func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte) error {
if snowflake != nil { if snowflake != nil {
snowflake.offerChannel <- offer snowflake.offerChannel <- offer
} else { } else {
i.ctx.metrics.lock.Lock()
i.ctx.metrics.UpdateRendezvousStats(arg.RemoteAddr, arg.RendezvousMethod, offer.natType, "denied") i.ctx.metrics.UpdateRendezvousStats(arg.RemoteAddr, arg.RendezvousMethod, offer.natType, "denied")
i.ctx.metrics.lock.Unlock()
resp := &messages.ClientPollResponse{Error: messages.StrNoProxies} resp := &messages.ClientPollResponse{Error: messages.StrNoProxies}
return sendClientResponse(resp, response) return sendClientResponse(resp, response)
} }
@ -207,19 +192,11 @@ func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte) error {
// Wait for the answer to be returned on the channel or timeout. // Wait for the answer to be returned on the channel or timeout.
select { select {
case answer := <-snowflake.answerChannel: case answer := <-snowflake.answerChannel:
i.ctx.metrics.lock.Lock()
i.ctx.metrics.UpdateRendezvousStats(arg.RemoteAddr, arg.RendezvousMethod, offer.natType, "matched") i.ctx.metrics.UpdateRendezvousStats(arg.RemoteAddr, arg.RendezvousMethod, offer.natType, "matched")
i.ctx.metrics.lock.Unlock()
resp := &messages.ClientPollResponse{Answer: answer} resp := &messages.ClientPollResponse{Answer: answer}
err = sendClientResponse(resp, response) err = sendClientResponse(resp, response)
// Initial tracking of elapsed time.
i.ctx.metrics.lock.Lock()
i.ctx.metrics.clientRoundtripEstimate = time.Since(startTime) / time.Millisecond
i.ctx.metrics.lock.Unlock()
case <-arg.Context.Done(): case <-arg.Context.Done():
i.ctx.metrics.lock.Lock()
i.ctx.metrics.UpdateRendezvousStats(arg.RemoteAddr, arg.RendezvousMethod, offer.natType, "timeout") i.ctx.metrics.UpdateRendezvousStats(arg.RemoteAddr, arg.RendezvousMethod, offer.natType, "timeout")
i.ctx.metrics.lock.Unlock()
resp := &messages.ClientPollResponse{Error: messages.StrTimedOut} resp := &messages.ClientPollResponse{Error: messages.StrTimedOut}
err = sendClientResponse(resp, response) err = sendClientResponse(resp, response)
} }

View file

@ -12,6 +12,7 @@ import (
"net" "net"
"sort" "sort"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@ -25,53 +26,9 @@ const (
metricsResolution = 60 * 60 * 24 * time.Second //86400 seconds metricsResolution = 60 * 60 * 24 * time.Second //86400 seconds
) )
var rendezvoudMethodList = [...]messages.RendezvousMethod{
messages.RendezvousHttp,
messages.RendezvousAmpCache,
messages.RendezvousSqs,
}
type CountryStats struct {
// map[proxyType][address]bool
proxies map[string]map[string]bool
unknown map[string]bool
natRestricted map[string]bool
natUnrestricted map[string]bool
natUnknown map[string]bool
counts map[string]int
}
// Implements Observable
type Metrics struct {
logger *log.Logger
geoipdb *geoip.Geoip
countryStats CountryStats
clientRoundtripEstimate time.Duration
proxyIdleCount uint
clientDeniedCount map[messages.RendezvousMethod]uint
clientRestrictedDeniedCount map[messages.RendezvousMethod]uint
clientUnrestrictedDeniedCount map[messages.RendezvousMethod]uint
clientProxyMatchCount map[messages.RendezvousMethod]uint
clientProxyTimeoutCount map[messages.RendezvousMethod]uint
rendezvousCountryStats map[messages.RendezvousMethod]map[string]int
proxyPollWithRelayURLExtension uint
proxyPollWithoutRelayURLExtension uint
proxyPollRejectedWithRelayURLExtension uint
// synchronization for access to snowflake metrics
lock sync.Mutex
promMetrics *PromMetrics
}
type record struct { type record struct {
cc string cc string
count int count uint64
} }
type records []record type records []record
@ -84,68 +41,103 @@ func (r records) Less(i, j int) bool {
return r[i].count < r[j].count return r[i].count < r[j].count
} }
func (s CountryStats) Display() string { type Metrics struct {
output := "" logger *log.Logger
geoipdb *geoip.Geoip
// Use the records struct to sort our counts map by value. ips *sync.Map // proxy IP addresses we've seen before
rs := records{} counters *sync.Map // counters for ip-based metrics
for cc, count := range s.counts {
rs = append(rs, record{cc: cc, count: count})
}
sort.Sort(sort.Reverse(rs))
for _, r := range rs {
output += fmt.Sprintf("%s=%d,", r.cc, r.count)
}
// cut off trailing "," // counters for country-based metrics
if len(output) > 0 { proxies *sync.Map // ip-based counts of proxy country codes
return output[:len(output)-1] clientHTTPPolls *sync.Map // poll-based counts of client HTTP rendezvous
} clientAMPPolls *sync.Map // poll-based counts of client AMP cache rendezvous
clientSQSPolls *sync.Map // poll-based counts of client SQS rendezvous
return output promMetrics *PromMetrics
}
func NewMetrics(metricsLogger *log.Logger) (*Metrics, error) {
m := new(Metrics)
m.logger = metricsLogger
m.promMetrics = initPrometheus()
m.ips = new(sync.Map)
m.counters = new(sync.Map)
m.proxies = new(sync.Map)
m.clientHTTPPolls = new(sync.Map)
m.clientAMPPolls = new(sync.Map)
m.clientSQSPolls = new(sync.Map)
// Write to log file every day with updated metrics
go m.logMetrics()
return m, nil
}
func incrementMapCounter(counters *sync.Map, key string) {
start := uint64(1)
val, loaded := counters.LoadOrStore(key, &start)
if loaded {
ptr := val.(*uint64)
atomic.AddUint64(ptr, 1)
}
}
func (m *Metrics) IncrementCounter(key string) {
incrementMapCounter(m.counters, key)
} }
func (m *Metrics) UpdateCountryStats(addr string, proxyType string, natType string) { func (m *Metrics) UpdateCountryStats(addr string, proxyType string, natType string) {
var country string
var ok bool
addresses, ok := m.countryStats.proxies[proxyType]
if !ok {
if m.countryStats.unknown[addr] {
return
}
m.countryStats.unknown[addr] = true
} else {
if addresses[addr] {
return
}
addresses[addr] = true
}
// perform geolocation of IP address
ip := net.ParseIP(addr) ip := net.ParseIP(addr)
if m.geoipdb == nil { if m.geoipdb == nil {
return return
} }
country, ok = m.geoipdb.GetCountryByAddr(ip) country, ok := m.geoipdb.GetCountryByAddr(ip)
if !ok { if !ok {
country = "??" country = "??"
} }
m.countryStats.counts[country]++
// check whether we've seen this proxy ip before
if _, loaded := m.ips.LoadOrStore(addr, true); !loaded {
m.IncrementCounter("proxy-total")
incrementMapCounter(m.proxies, country)
}
// update unique IP proxy NAT metrics
key := fmt.Sprintf("%s-%s", addr, natType)
if _, loaded := m.ips.LoadOrStore(key, true); !loaded {
switch natType {
case NATRestricted:
m.IncrementCounter("proxy-nat-restricted")
case NATUnrestricted:
m.IncrementCounter("proxy-nat-unrestricted")
default:
m.IncrementCounter("proxy-nat-unknown")
}
}
// update unique IP proxy type metrics
key = fmt.Sprintf("%s-%s", addr, proxyType)
if _, loaded := m.ips.LoadOrStore(key, true); !loaded {
switch proxyType {
case "standalone":
m.IncrementCounter("proxy-standalone")
case "badge":
m.IncrementCounter("proxy-badge")
case "iptproxy":
m.IncrementCounter("proxy-iptproxy")
case "webext":
m.IncrementCounter("proxy-webext")
}
}
m.promMetrics.ProxyTotal.With(prometheus.Labels{ m.promMetrics.ProxyTotal.With(prometheus.Labels{
"nat": natType, "nat": natType,
"type": proxyType, "type": proxyType,
"cc": country, "cc": country,
}).Inc() }).Inc()
switch natType {
case NATRestricted:
m.countryStats.natRestricted[addr] = true
case NATUnrestricted:
m.countryStats.natUnrestricted[addr] = true
default:
m.countryStats.natUnknown[addr] = true
}
} }
func (m *Metrics) UpdateRendezvousStats(addr string, rendezvousMethod messages.RendezvousMethod, natType, status string) { func (m *Metrics) UpdateRendezvousStats(addr string, rendezvousMethod messages.RendezvousMethod, natType, status string) {
@ -160,20 +152,31 @@ func (m *Metrics) UpdateRendezvousStats(addr string, rendezvousMethod messages.R
switch status { switch status {
case "denied": case "denied":
m.clientDeniedCount[rendezvousMethod]++ m.IncrementCounter("client-denied")
if natType == NATUnrestricted { if natType == NATUnrestricted {
m.clientUnrestrictedDeniedCount[rendezvousMethod]++ m.IncrementCounter("client-unrestricted-denied")
} else { } else {
m.clientRestrictedDeniedCount[rendezvousMethod]++ m.IncrementCounter("client-restricted-denied")
} }
case "matched": case "matched":
m.clientProxyMatchCount[rendezvousMethod]++ m.IncrementCounter("client-match")
case "timeout": case "timeout":
m.clientProxyTimeoutCount[rendezvousMethod]++ m.IncrementCounter("client-timeout")
default: default:
log.Printf("Unknown rendezvous status: %s", status) log.Printf("Unknown rendezvous status: %s", status)
} }
m.rendezvousCountryStats[rendezvousMethod][country]++
switch rendezvousMethod {
case messages.RendezvousHttp:
m.IncrementCounter("client-http")
incrementMapCounter(m.clientHTTPPolls, country)
case messages.RendezvousAmpCache:
m.IncrementCounter("client-amp")
incrementMapCounter(m.clientAMPPolls, country)
case messages.RendezvousSqs:
m.IncrementCounter("client-sqs")
incrementMapCounter(m.clientSQSPolls, country)
}
m.promMetrics.ClientPollTotal.With(prometheus.Labels{ m.promMetrics.ClientPollTotal.With(prometheus.Labels{
"nat": natType, "nat": natType,
"status": status, "status": status,
@ -182,17 +185,27 @@ func (m *Metrics) UpdateRendezvousStats(addr string, rendezvousMethod messages.R
}).Inc() }).Inc()
} }
func (m *Metrics) DisplayRendezvousStatsByCountry(rendezvoudMethod messages.RendezvousMethod) string { func displayCountryStats(m *sync.Map, binned bool) string {
output := "" output := ""
// Use the records struct to sort our counts map by value. // Use the records struct to sort our counts map by value.
rs := records{} rs := records{}
for cc, count := range m.rendezvousCountryStats[rendezvoudMethod] {
rs = append(rs, record{cc: cc, count: count}) m.Range(func(cc any, _ any) bool {
count, loaded := m.LoadAndDelete(cc)
ptr := count.(*uint64)
if loaded {
rs = append(rs, record{cc: cc.(string), count: *ptr})
} }
return true
})
sort.Sort(sort.Reverse(rs)) sort.Sort(sort.Reverse(rs))
for _, r := range rs { for _, r := range rs {
output += fmt.Sprintf("%s=%d,", r.cc, binCount(uint(r.count))) count := uint64(r.count)
if binned {
count = binCount(count)
}
output += fmt.Sprintf("%s=%d,", r.cc, count)
} }
// cut off trailing "," // cut off trailing ","
@ -212,126 +225,61 @@ func (m *Metrics) LoadGeoipDatabases(geoipDB string, geoip6DB string) error {
return err return err
} }
func NewMetrics(metricsLogger *log.Logger) (*Metrics, error) {
m := new(Metrics)
m.clientDeniedCount = make(map[messages.RendezvousMethod]uint)
m.clientRestrictedDeniedCount = make(map[messages.RendezvousMethod]uint)
m.clientUnrestrictedDeniedCount = make(map[messages.RendezvousMethod]uint)
m.clientProxyMatchCount = make(map[messages.RendezvousMethod]uint)
m.clientProxyTimeoutCount = make(map[messages.RendezvousMethod]uint)
m.rendezvousCountryStats = make(map[messages.RendezvousMethod]map[string]int)
for _, rendezvousMethod := range rendezvoudMethodList {
m.rendezvousCountryStats[rendezvousMethod] = make(map[string]int)
}
m.countryStats = CountryStats{
counts: make(map[string]int),
proxies: make(map[string]map[string]bool),
unknown: make(map[string]bool),
natRestricted: make(map[string]bool),
natUnrestricted: make(map[string]bool),
natUnknown: make(map[string]bool),
}
for pType := range messages.KnownProxyTypes {
m.countryStats.proxies[pType] = make(map[string]bool)
}
m.logger = metricsLogger
m.promMetrics = initPrometheus()
// Write to log file every day with updated metrics
go m.logMetrics()
return m, nil
}
// Logs metrics in intervals specified by metricsResolution // Logs metrics in intervals specified by metricsResolution
func (m *Metrics) logMetrics() { func (m *Metrics) logMetrics() {
heartbeat := time.Tick(metricsResolution) heartbeat := time.Tick(metricsResolution)
for range heartbeat { for range heartbeat {
m.printMetrics() m.printMetrics()
m.zeroMetrics()
} }
} }
func (m *Metrics) loadAndZero(key string) uint64 {
count, loaded := m.counters.LoadAndDelete(key)
if !loaded {
count = new(uint64)
}
ptr := count.(*uint64)
return *ptr
}
func (m *Metrics) printMetrics() { func (m *Metrics) printMetrics() {
m.lock.Lock()
m.logger.Println( m.logger.Println(
"snowflake-stats-end", "snowflake-stats-end",
time.Now().UTC().Format("2006-01-02 15:04:05"), time.Now().UTC().Format("2006-01-02 15:04:05"),
fmt.Sprintf("(%d s)", int(metricsResolution.Seconds())), fmt.Sprintf("(%d s)", int(metricsResolution.Seconds())),
) )
m.logger.Println("snowflake-ips", m.countryStats.Display()) m.logger.Println("snowflake-ips", displayCountryStats(m.proxies, false))
total := len(m.countryStats.unknown) m.logger.Printf("snowflake-ips-iptproxy %d\n", m.loadAndZero("proxy-iptproxy"))
for pType, addresses := range m.countryStats.proxies { m.logger.Printf("snowflake-ips-standalone %d\n", m.loadAndZero("proxy-standalone"))
m.logger.Printf("snowflake-ips-%s %d\n", pType, len(addresses)) m.logger.Printf("snowflake-ips-webext %d\n", m.loadAndZero("proxy-webext"))
total += len(addresses) m.logger.Printf("snowflake-ips-badge %d\n", m.loadAndZero("proxy-badge"))
} m.logger.Println("snowflake-ips-total", m.loadAndZero("proxy-total"))
m.logger.Println("snowflake-ips-total", total) m.logger.Println("snowflake-idle-count", binCount(m.loadAndZero("proxy-idle")))
m.logger.Println("snowflake-idle-count", binCount(m.proxyIdleCount)) m.logger.Println("snowflake-proxy-poll-with-relay-url-count", binCount(m.loadAndZero("proxy-poll-with-relay-url")))
m.logger.Println("snowflake-proxy-poll-with-relay-url-count", binCount(m.proxyPollWithRelayURLExtension)) m.logger.Println("snowflake-proxy-poll-without-relay-url-count", binCount(m.loadAndZero("proxy-poll-without-relay-url")))
m.logger.Println("snowflake-proxy-poll-without-relay-url-count", binCount(m.proxyPollWithoutRelayURLExtension)) m.logger.Println("snowflake-proxy-rejected-for-relay-url-count", binCount(m.loadAndZero("proxy-poll-rejected-relay-url")))
m.logger.Println("snowflake-proxy-rejected-for-relay-url-count", binCount(m.proxyPollRejectedWithRelayURLExtension))
m.logger.Println("client-denied-count", binCount(sumMapValues(&m.clientDeniedCount))) m.logger.Println("client-denied-count", binCount(m.loadAndZero("client-denied")))
m.logger.Println("client-restricted-denied-count", binCount(sumMapValues(&m.clientRestrictedDeniedCount))) m.logger.Println("client-restricted-denied-count", binCount(m.loadAndZero("client-restricted-denied")))
m.logger.Println("client-unrestricted-denied-count", binCount(sumMapValues(&m.clientUnrestrictedDeniedCount))) m.logger.Println("client-unrestricted-denied-count", binCount(m.loadAndZero("client-unrestricted-denied")))
m.logger.Println("client-snowflake-match-count", binCount(sumMapValues(&m.clientProxyMatchCount))) m.logger.Println("client-snowflake-match-count", binCount(m.loadAndZero("client-match")))
m.logger.Println("client-snowflake-timeout-count", binCount(sumMapValues(&m.clientProxyTimeoutCount))) m.logger.Println("client-snowflake-timeout-count", binCount(m.loadAndZero("client-timeout")))
for _, rendezvousMethod := range rendezvoudMethodList { m.logger.Printf("client-http-count %d\n", binCount(m.loadAndZero("client-http")))
m.logger.Printf("client-%s-count %d\n", rendezvousMethod, binCount( m.logger.Printf("client-http-ips %s\n", displayCountryStats(m.clientHTTPPolls, true))
m.clientDeniedCount[rendezvousMethod]+m.clientProxyMatchCount[rendezvousMethod], m.logger.Printf("client-ampcache-count %d\n", binCount(m.loadAndZero("client-amp")))
)) m.logger.Printf("client-ampcache-ips %s\n", displayCountryStats(m.clientAMPPolls, true))
m.logger.Printf("client-%s-ips %s\n", rendezvousMethod, m.DisplayRendezvousStatsByCountry(rendezvousMethod)) m.logger.Printf("client-sqs-count %d\n", binCount(m.loadAndZero("client-sqs")))
} m.logger.Printf("client-sqs-ips %s\n", displayCountryStats(m.clientSQSPolls, true))
m.logger.Println("snowflake-ips-nat-restricted", len(m.countryStats.natRestricted)) m.logger.Println("snowflake-ips-nat-restricted", m.loadAndZero("proxy-nat-restricted"))
m.logger.Println("snowflake-ips-nat-unrestricted", len(m.countryStats.natUnrestricted)) m.logger.Println("snowflake-ips-nat-unrestricted", m.loadAndZero("proxy-nat-unrestricted"))
m.logger.Println("snowflake-ips-nat-unknown", len(m.countryStats.natUnknown)) m.logger.Println("snowflake-ips-nat-unknown", m.loadAndZero("proxy-nat-unknown"))
m.lock.Unlock()
}
// Restores all metrics to original values
func (m *Metrics) zeroMetrics() {
m.proxyIdleCount = 0
m.clientDeniedCount = make(map[messages.RendezvousMethod]uint)
m.clientRestrictedDeniedCount = make(map[messages.RendezvousMethod]uint)
m.clientUnrestrictedDeniedCount = make(map[messages.RendezvousMethod]uint)
m.proxyPollRejectedWithRelayURLExtension = 0
m.proxyPollWithRelayURLExtension = 0
m.proxyPollWithoutRelayURLExtension = 0
m.clientProxyMatchCount = make(map[messages.RendezvousMethod]uint)
m.clientProxyTimeoutCount = make(map[messages.RendezvousMethod]uint)
m.rendezvousCountryStats = make(map[messages.RendezvousMethod]map[string]int)
for _, rendezvousMethod := range rendezvoudMethodList {
m.rendezvousCountryStats[rendezvousMethod] = make(map[string]int)
}
m.countryStats.counts = make(map[string]int)
for pType := range m.countryStats.proxies {
m.countryStats.proxies[pType] = make(map[string]bool)
}
m.countryStats.unknown = make(map[string]bool)
m.countryStats.natRestricted = make(map[string]bool)
m.countryStats.natUnrestricted = make(map[string]bool)
m.countryStats.natUnknown = make(map[string]bool)
} }
// Rounds up a count to the nearest multiple of 8. // Rounds up a count to the nearest multiple of 8.
func binCount(count uint) uint { func binCount(count uint64) uint64 {
return uint((math.Ceil(float64(count) / 8)) * 8) return uint64((math.Ceil(float64(count) / 8)) * 8)
}
func sumMapValues(m *map[messages.RendezvousMethod]uint) uint {
var s uint = 0
for _, v := range *m {
s += v
}
return s
} }
type PromMetrics struct { type PromMetrics struct {

View file

@ -11,6 +11,7 @@ import (
"net/http/httptest" "net/http/httptest"
"os" "os"
"sync" "sync"
"sync/atomic"
"testing" "testing"
"time" "time"
@ -671,7 +672,7 @@ func TestMetrics(t *testing.T) {
w := httptest.NewRecorder() w := httptest.NewRecorder()
data := bytes.NewReader([]byte("{\"Sid\":\"ymbcCMto7KHNGYlp\",\"Version\":\"1.0\",\"AcceptedRelayPattern\":\"snowflake.torproject.net\"}")) data := bytes.NewReader([]byte("{\"Sid\":\"ymbcCMto7KHNGYlp\",\"Version\":\"1.0\",\"AcceptedRelayPattern\":\"snowflake.torproject.net\"}"))
r, err := http.NewRequest("POST", "snowflake.broker/proxy", data) r, err := http.NewRequest("POST", "snowflake.broker/proxy", data)
r.RemoteAddr = "129.97.208.23:8888" //CA geoip r.RemoteAddr = "129.97.208.23" //CA geoip
So(err, ShouldBeNil) So(err, ShouldBeNil)
go func(i *IPC) { go func(i *IPC) {
proxyPolls(i, w, r) proxyPolls(i, w, r)
@ -684,7 +685,7 @@ func TestMetrics(t *testing.T) {
w = httptest.NewRecorder() w = httptest.NewRecorder()
data = bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0","Type":"standalone","AcceptedRelayPattern":"snowflake.torproject.net"}`)) data = bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0","Type":"standalone","AcceptedRelayPattern":"snowflake.torproject.net"}`))
r, err = http.NewRequest("POST", "snowflake.broker/proxy", data) r, err = http.NewRequest("POST", "snowflake.broker/proxy", data)
r.RemoteAddr = "129.97.208.23:8888" //CA geoip r.RemoteAddr = "129.97.208.24" //CA geoip
So(err, ShouldBeNil) So(err, ShouldBeNil)
go func(i *IPC) { go func(i *IPC) {
proxyPolls(i, w, r) proxyPolls(i, w, r)
@ -697,7 +698,7 @@ func TestMetrics(t *testing.T) {
w = httptest.NewRecorder() w = httptest.NewRecorder()
data = bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0","Type":"badge","AcceptedRelayPattern":"snowflake.torproject.net"}`)) data = bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0","Type":"badge","AcceptedRelayPattern":"snowflake.torproject.net"}`))
r, err = http.NewRequest("POST", "snowflake.broker/proxy", data) r, err = http.NewRequest("POST", "snowflake.broker/proxy", data)
r.RemoteAddr = "129.97.208.23:8888" //CA geoip r.RemoteAddr = "129.97.208.25" //CA geoip
So(err, ShouldBeNil) So(err, ShouldBeNil)
go func(i *IPC) { go func(i *IPC) {
proxyPolls(i, w, r) proxyPolls(i, w, r)
@ -710,7 +711,7 @@ func TestMetrics(t *testing.T) {
w = httptest.NewRecorder() w = httptest.NewRecorder()
data = bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0","Type":"webext","AcceptedRelayPattern":"snowflake.torproject.net"}`)) data = bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0","Type":"webext","AcceptedRelayPattern":"snowflake.torproject.net"}`))
r, err = http.NewRequest("POST", "snowflake.broker/proxy", data) r, err = http.NewRequest("POST", "snowflake.broker/proxy", data)
r.RemoteAddr = "129.97.208.23:8888" //CA geoip r.RemoteAddr = "129.97.208.26" //CA geoip
So(err, ShouldBeNil) So(err, ShouldBeNil)
go func(i *IPC) { go func(i *IPC) {
proxyPolls(i, w, r) proxyPolls(i, w, r)
@ -744,7 +745,7 @@ client-sqs-count 0
client-sqs-ips client-sqs-ips
snowflake-ips-nat-restricted 0 snowflake-ips-nat-restricted 0
snowflake-ips-nat-unrestricted 0 snowflake-ips-nat-unrestricted 0
snowflake-ips-nat-unknown 1 snowflake-ips-nat-unknown 4
`) `)
}) })
@ -774,7 +775,6 @@ client-sqs-ips `)
// Test reset // Test reset
buf.Reset() buf.Reset()
ctx.metrics.zeroMetrics()
ctx.metrics.printMetrics() ctx.metrics.printMetrics()
So(buf.String(), ShouldContainSubstring, "\nsnowflake-ips \n") So(buf.String(), ShouldContainSubstring, "\nsnowflake-ips \n")
So(buf.String(), ShouldContainSubstring, "\nsnowflake-ips-standalone 0\n") So(buf.String(), ShouldContainSubstring, "\nsnowflake-ips-standalone 0\n")
@ -881,9 +881,6 @@ snowflake-ips-nat-unknown 0
So(err, ShouldBeNil) So(err, ShouldBeNil)
clientOffers(i, w, r) clientOffers(i, w, r)
ctx.metrics.printMetrics()
So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 8\nclient-unrestricted-denied-count 0\n")
w = httptest.NewRecorder() w = httptest.NewRecorder()
data, err = createClientOffer(sdp, NATRestricted, "") data, err = createClientOffer(sdp, NATRestricted, "")
So(err, ShouldBeNil) So(err, ShouldBeNil)
@ -945,9 +942,6 @@ snowflake-ips-nat-unknown 0
p.offerChannel <- nil p.offerChannel <- nil
<-done <-done
ctx.metrics.printMetrics()
So(buf.String(), ShouldContainSubstring, "snowflake-ips-nat-restricted 1\nsnowflake-ips-nat-unrestricted 0\nsnowflake-ips-nat-unknown 0")
data = bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.2","Type":"unknown","NAT":"unrestricted","AcceptedRelayPattern":"snowflake.torproject.net"}`)) data = bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.2","Type":"unknown","NAT":"unrestricted","AcceptedRelayPattern":"snowflake.torproject.net"}`))
r, err = http.NewRequest("POST", "snowflake.broker/proxy", data) r, err = http.NewRequest("POST", "snowflake.broker/proxy", data)
if err != nil { if err != nil {
@ -979,7 +973,6 @@ snowflake-ips-nat-unknown 0
So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 8\nclient-unrestricted-denied-count 0\nclient-snowflake-match-count 0") So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 8\nclient-unrestricted-denied-count 0\nclient-snowflake-match-count 0")
buf.Reset() buf.Reset()
ctx.metrics.zeroMetrics()
data, err = createClientOffer(sdp, NATUnrestricted, "") data, err = createClientOffer(sdp, NATUnrestricted, "")
So(err, ShouldBeNil) So(err, ShouldBeNil)
@ -992,7 +985,6 @@ snowflake-ips-nat-unknown 0
So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 0\nclient-unrestricted-denied-count 8\nclient-snowflake-match-count 0") So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 0\nclient-unrestricted-denied-count 8\nclient-snowflake-match-count 0")
buf.Reset() buf.Reset()
ctx.metrics.zeroMetrics()
data, err = createClientOffer(sdp, NATUnknown, "") data, err = createClientOffer(sdp, NATUnknown, "")
So(err, ShouldBeNil) So(err, ShouldBeNil)
@ -1005,8 +997,8 @@ snowflake-ips-nat-unknown 0
So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 8\nclient-unrestricted-denied-count 0\nclient-snowflake-match-count 0") So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 8\nclient-unrestricted-denied-count 0\nclient-snowflake-match-count 0")
}) })
Convey("for country stats order", func() { Convey("for country stats order", func() {
stats := new(sync.Map)
stats := map[string]int{ for cc, count := range map[string]uint64{
"IT": 50, "IT": 50,
"FR": 200, "FR": 200,
"TZ": 100, "TZ": 100,
@ -1015,9 +1007,13 @@ snowflake-ips-nat-unknown 0
"CA": 1, "CA": 1,
"BE": 1, "BE": 1,
"PH": 1, "PH": 1,
} {
stats.LoadOrStore(cc, new(uint64))
val, _ := stats.Load(cc)
ptr := val.(*uint64)
atomic.AddUint64(ptr, count)
} }
ctx.metrics.countryStats.counts = stats So(displayCountryStats(stats, false), ShouldEqual, "CN=250,FR=200,RU=150,TZ=100,IT=50,BE=1,CA=1,PH=1")
So(ctx.metrics.countryStats.Display(), ShouldEqual, "CN=250,FR=200,RU=150,TZ=100,IT=50,BE=1,CA=1,PH=1")
}) })
}) })
} }