Move RendezvousMethod field to messages.Arg

This commit is contained in:
Anthony Chang 2024-01-29 18:24:25 -05:00 committed by Cecylia Bocovich
parent 26ceb6e20d
commit dbecefa7d2
No known key found for this signature in database
GPG key ID: 009DE379FD9B7B90
6 changed files with 45 additions and 40 deletions

View file

@ -34,10 +34,11 @@ func ampClientOffers(i *IPC, w http.ResponseWriter, r *http.Request) {
encPollReq, err = amp.DecodePath(path)
if err == nil {
arg := messages.Arg{
Body: encPollReq,
RemoteAddr: "",
Body: encPollReq,
RemoteAddr: "",
RendezvousMethod: messages.RendezvousAmpCache,
}
err = i.ClientOffers(arg, &response, RendezvousAmpCache)
err = i.ClientOffers(arg, &response)
} else {
response, err = (&messages.ClientPollResponse{
Error: "cannot decode URL path",

View file

@ -167,12 +167,13 @@ func clientOffers(i *IPC, w http.ResponseWriter, r *http.Request) {
}
arg := messages.Arg{
Body: body,
RemoteAddr: "",
Body: body,
RemoteAddr: "",
RendezvousMethod: messages.RendezvousHttp,
}
var response []byte
err = i.ClientOffers(arg, &response, RendezvousHttp)
err = i.ClientOffers(arg, &response)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)

View file

@ -161,7 +161,8 @@ func sendClientResponse(resp *messages.ClientPollResponse, response *[]byte) err
}
}
func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte, rendezvousMethod RendezvousMethod) error {
func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte) error {
startTime := time.Now()
req, err := messages.DecodeClientPollRequest(arg.Body)
@ -195,12 +196,12 @@ func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte, rendezvousMethod
snowflake.offerChannel <- offer
} else {
i.ctx.metrics.lock.Lock()
i.ctx.metrics.clientDeniedCount[rendezvousMethod]++
i.ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "denied", "rendezvous_method": string(rendezvousMethod)}).Inc()
i.ctx.metrics.clientDeniedCount[arg.RendezvousMethod]++
i.ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "denied", "rendezvous_method": string(arg.RendezvousMethod)}).Inc()
if offer.natType == NATUnrestricted {
i.ctx.metrics.clientUnrestrictedDeniedCount[rendezvousMethod]++
i.ctx.metrics.clientUnrestrictedDeniedCount[arg.RendezvousMethod]++
} else {
i.ctx.metrics.clientRestrictedDeniedCount[rendezvousMethod]++
i.ctx.metrics.clientRestrictedDeniedCount[arg.RendezvousMethod]++
}
i.ctx.metrics.lock.Unlock()
resp := &messages.ClientPollResponse{Error: messages.StrNoProxies}
@ -211,8 +212,8 @@ func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte, rendezvousMethod
select {
case answer := <-snowflake.answerChannel:
i.ctx.metrics.lock.Lock()
i.ctx.metrics.clientProxyMatchCount[rendezvousMethod]++
i.ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "matched", "rendezvous_method": string(rendezvousMethod)}).Inc()
i.ctx.metrics.clientProxyMatchCount[arg.RendezvousMethod]++
i.ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "matched", "rendezvous_method": string(arg.RendezvousMethod)}).Inc()
i.ctx.metrics.lock.Unlock()
resp := &messages.ClientPollResponse{Answer: answer}
err = sendClientResponse(resp, response)

View file

@ -36,14 +36,6 @@ type CountryStats struct {
counts map[string]int
}
type RendezvousMethod string
const (
RendezvousHttp RendezvousMethod = "http"
RendezvousAmpCache RendezvousMethod = "ampcache"
RendezvousSqs RendezvousMethod = "sqs"
)
// Implements Observable
type Metrics struct {
logger *log.Logger
@ -52,10 +44,10 @@ type Metrics struct {
countryStats CountryStats
clientRoundtripEstimate time.Duration
proxyIdleCount uint
clientDeniedCount map[RendezvousMethod]uint
clientRestrictedDeniedCount map[RendezvousMethod]uint
clientUnrestrictedDeniedCount map[RendezvousMethod]uint
clientProxyMatchCount map[RendezvousMethod]uint
clientDeniedCount map[messages.RendezvousMethod]uint
clientRestrictedDeniedCount map[messages.RendezvousMethod]uint
clientUnrestrictedDeniedCount map[messages.RendezvousMethod]uint
clientProxyMatchCount map[messages.RendezvousMethod]uint
proxyPollWithRelayURLExtension uint
proxyPollWithoutRelayURLExtension uint
@ -160,10 +152,10 @@ func (m *Metrics) LoadGeoipDatabases(geoipDB string, geoip6DB string) error {
func NewMetrics(metricsLogger *log.Logger) (*Metrics, error) {
m := new(Metrics)
m.clientDeniedCount = make(map[RendezvousMethod]uint)
m.clientRestrictedDeniedCount = make(map[RendezvousMethod]uint)
m.clientUnrestrictedDeniedCount = make(map[RendezvousMethod]uint)
m.clientProxyMatchCount = make(map[RendezvousMethod]uint)
m.clientDeniedCount = make(map[messages.RendezvousMethod]uint)
m.clientRestrictedDeniedCount = make(map[messages.RendezvousMethod]uint)
m.clientUnrestrictedDeniedCount = make(map[messages.RendezvousMethod]uint)
m.clientProxyMatchCount = make(map[messages.RendezvousMethod]uint)
m.countryStats = CountryStats{
counts: make(map[string]int),
@ -215,7 +207,7 @@ func (m *Metrics) printMetrics() {
m.logger.Println("client-unrestricted-denied-count", binCount(sumMapValues(&m.clientUnrestrictedDeniedCount)))
m.logger.Println("client-snowflake-match-count", binCount(sumMapValues(&m.clientProxyMatchCount)))
for _, rendezvousMethod := range [3]RendezvousMethod{RendezvousHttp, RendezvousAmpCache, RendezvousSqs} {
for _, rendezvousMethod := range [3]messages.RendezvousMethod{messages.RendezvousHttp, messages.RendezvousAmpCache, messages.RendezvousSqs} {
m.logger.Printf("client-%s-denied-count %d\n", rendezvousMethod, binCount(m.clientDeniedCount[rendezvousMethod]))
m.logger.Printf("client-%s-restricted-denied-count %d\n", rendezvousMethod, binCount(m.clientRestrictedDeniedCount[rendezvousMethod]))
m.logger.Printf("client-%s-unrestricted-denied-count %d\n", rendezvousMethod, binCount(m.clientUnrestrictedDeniedCount[rendezvousMethod]))
@ -231,13 +223,13 @@ func (m *Metrics) printMetrics() {
// Restores all metrics to original values
func (m *Metrics) zeroMetrics() {
m.proxyIdleCount = 0
m.clientDeniedCount = make(map[RendezvousMethod]uint)
m.clientRestrictedDeniedCount = make(map[RendezvousMethod]uint)
m.clientUnrestrictedDeniedCount = make(map[RendezvousMethod]uint)
m.clientDeniedCount = make(map[messages.RendezvousMethod]uint)
m.clientRestrictedDeniedCount = make(map[messages.RendezvousMethod]uint)
m.clientUnrestrictedDeniedCount = make(map[messages.RendezvousMethod]uint)
m.proxyPollRejectedWithRelayURLExtension = 0
m.proxyPollWithRelayURLExtension = 0
m.proxyPollWithoutRelayURLExtension = 0
m.clientProxyMatchCount = make(map[RendezvousMethod]uint)
m.clientProxyMatchCount = make(map[messages.RendezvousMethod]uint)
m.countryStats.counts = make(map[string]int)
for pType := range m.countryStats.proxies {
m.countryStats.proxies[pType] = make(map[string]bool)
@ -253,7 +245,7 @@ func binCount(count uint) uint {
return uint((math.Ceil(float64(count) / 8)) * 8)
}
func sumMapValues(m *map[RendezvousMethod]uint) uint {
func sumMapValues(m *map[messages.RendezvousMethod]uint) uint {
var s uint = 0
for _, v := range *m {
s += v

View file

@ -145,10 +145,11 @@ func (r *sqsHandler) handleMessage(context context.Context, message *types.Messa
encPollReq = []byte(*message.Body)
arg := messages.Arg{
Body: encPollReq,
RemoteAddr: "",
Body: encPollReq,
RemoteAddr: "",
RendezvousMethod: messages.RendezvousSqs,
}
err = r.IPC.ClientOffers(arg, &response, RendezvousSqs)
err = r.IPC.ClientOffers(arg, &response)
if err != nil {
log.Printf("SQSHandler: error encountered when handling message: %v\n", err)

View file

@ -4,9 +4,18 @@ import (
"errors"
)
type RendezvousMethod string
const (
RendezvousHttp RendezvousMethod = "http"
RendezvousAmpCache RendezvousMethod = "ampcache"
RendezvousSqs RendezvousMethod = "sqs"
)
type Arg struct {
Body []byte
RemoteAddr string
Body []byte
RemoteAddr string
RendezvousMethod RendezvousMethod
}
var (