Remove proxy churn measurements from broker.

We've done the analysis we planned to do on these measurements.

A program to analyze the proxy churn and extract hour-by-hour
intersections is available at:
https://github.com/turfed/snowflake-paper/tree/main/figures/proxy-churn

Closes #40280.
This commit is contained in:
David Fifield 2023-09-05 05:42:15 +00:00 committed by Shelikhoo
parent a615e8b1ab
commit 6393af6bab
No known key found for this signature in database
GPG key ID: C4D5E79D22B25316
12 changed files with 0 additions and 362 deletions

View file

@ -11,8 +11,6 @@ import (
"crypto/tls" "crypto/tls"
"flag" "flag"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/bridgefingerprint" "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/bridgefingerprint"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/ipsetsink"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/ipsetsink/sinkcluster"
"io" "io"
"log" "log"
"net/http" "net/http"
@ -196,8 +194,6 @@ func main() {
var certFilename, keyFilename string var certFilename, keyFilename string
var disableGeoip bool var disableGeoip bool
var metricsFilename string var metricsFilename string
var ipCountFilename, ipCountMaskingKey string
var ipCountInterval time.Duration
var unsafeLogging bool var unsafeLogging bool
flag.StringVar(&acmeEmail, "acme-email", "", "optional contact email for Let's Encrypt notifications") flag.StringVar(&acmeEmail, "acme-email", "", "optional contact email for Let's Encrypt notifications")
@ -214,9 +210,6 @@ func main() {
flag.BoolVar(&disableTLS, "disable-tls", false, "don't use HTTPS") flag.BoolVar(&disableTLS, "disable-tls", false, "don't use HTTPS")
flag.BoolVar(&disableGeoip, "disable-geoip", false, "don't use geoip for stats collection") flag.BoolVar(&disableGeoip, "disable-geoip", false, "don't use geoip for stats collection")
flag.StringVar(&metricsFilename, "metrics-log", "", "path to metrics logging output") flag.StringVar(&metricsFilename, "metrics-log", "", "path to metrics logging output")
flag.StringVar(&ipCountFilename, "ip-count-log", "", "path to ip count logging output")
flag.StringVar(&ipCountMaskingKey, "ip-count-mask", "", "masking key for ip count logging")
flag.DurationVar(&ipCountInterval, "ip-count-interval", time.Hour, "time interval between each chunk")
flag.BoolVar(&unsafeLogging, "unsafe-logging", false, "prevent logs from being scrubbed") flag.BoolVar(&unsafeLogging, "unsafe-logging", false, "prevent logs from being scrubbed")
flag.Parse() flag.Parse()
@ -264,16 +257,6 @@ func main() {
} }
} }
if ipCountFilename != "" {
ipCountFile, err := os.OpenFile(ipCountFilename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.Fatal(err.Error())
}
ipSetSink := ipsetsink.NewIPSetSink(ipCountMaskingKey)
ctx.metrics.distinctIPWriter = sinkcluster.NewClusterWriter(ipCountFile, ipCountInterval, ipSetSink)
}
go ctx.Broker() go ctx.Broker()
i := &IPC{ctx} i := &IPC{ctx}

View file

@ -107,7 +107,6 @@ func (i *IPC) ProxyPolls(arg messages.Arg, response *[]byte) error {
} else { } else {
i.ctx.metrics.lock.Lock() i.ctx.metrics.lock.Lock()
i.ctx.metrics.UpdateCountryStats(remoteIP, proxyType, natType) i.ctx.metrics.UpdateCountryStats(remoteIP, proxyType, natType)
i.ctx.metrics.RecordIPAddress(remoteIP)
i.ctx.metrics.lock.Unlock() i.ctx.metrics.lock.Unlock()
} }

View file

@ -16,7 +16,6 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"gitlab.torproject.org/tpo/anti-censorship/geoip" "gitlab.torproject.org/tpo/anti-censorship/geoip"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/ipsetsink/sinkcluster"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/messages" "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/messages"
) )
@ -42,8 +41,6 @@ type Metrics struct {
logger *log.Logger logger *log.Logger
geoipdb *geoip.Geoip geoipdb *geoip.Geoip
distinctIPWriter *sinkcluster.ClusterWriter
countryStats CountryStats countryStats CountryStats
clientRoundtripEstimate time.Duration clientRoundtripEstimate time.Duration
proxyIdleCount uint proxyIdleCount uint
@ -327,13 +324,3 @@ func initPrometheus() *PromMetrics {
return promMetrics return promMetrics
} }
// RecordIPAddress feeds a proxy's IP address into the distinct-IP
// counter, if one was configured via ip-count-log; otherwise it is a
// no-op. The caller holds m.lock at the call site in ProxyPolls.
func (m *Metrics) RecordIPAddress(ip string) {
	if m.distinctIPWriter != nil {
		m.distinctIPWriter.AddIPToSet(ip)
	}
}
// SetIPAddressRecorder installs the ClusterWriter that subsequent calls
// to RecordIPAddress will feed.
func (m *Metrics) SetIPAddressRecorder(recorder *sinkcluster.ClusterWriter) {
	m.distinctIPWriter = recorder
}

View file

@ -1,55 +0,0 @@
package ipsetsink
import (
"bytes"
"crypto/hmac"
"encoding/binary"
"hash"
"github.com/clarkduvall/hyperloglog"
"golang.org/x/crypto/sha3"
)
// NewIPSetSink constructs an IPSetSink that masks addresses with
// maskingKey and estimates distinct values with a HyperLogLog++ sketch.
func NewIPSetSink(maskingKey string) *IPSetSink {
	// Precision 18 is within hyperloglog's accepted range [4, 18], so
	// NewPlus cannot fail here; the error is deliberately ignored.
	sketch, _ := hyperloglog.NewPlus(18)
	sink := &IPSetSink{
		ipMaskingKey:  maskingKey,
		countDistinct: sketch,
	}
	return sink
}
// IPSetSink accumulates an approximate count of distinct IP addresses.
// Addresses are HMAC-masked before being counted, so raw addresses are
// never stored in the sketch.
type IPSetSink struct {
	ipMaskingKey  string                       // HMAC key used by maskIPAddress
	countDistinct *hyperloglog.HyperLogLogPlus // approximate distinct counter
}
// maskIPAddress returns HMAC-SHA3-256(ipMaskingKey, ipAddress): equal
// inputs map to equal outputs, but the raw address is not recoverable
// without the key.
func (s *IPSetSink) maskIPAddress(ipAddress string) []byte {
	newHash := func() hash.Hash {
		return sha3.New256()
	}
	masker := hmac.New(newHash, []byte(s.ipMaskingKey))
	masker.Write([]byte(ipAddress))
	return masker.Sum(nil)
}
// AddIPToSet records one observation of ipAddress in the distinct
// counter, after masking it.
func (s *IPSetSink) AddIPToSet(ipAddress string) {
	masked := hashValue(s.maskIPAddress(ipAddress))
	s.countDistinct.Add(truncatedHash64FromBytes{masked})
}
// Dump serializes the current sketch with gob encoding, suitable for
// later restoration via (*hyperloglog.HyperLogLogPlus).GobDecode.
func (s *IPSetSink) Dump() ([]byte, error) {
	return s.countDistinct.GobEncode()
}
// Reset clears the sketch so a new counting interval can begin.
func (s *IPSetSink) Reset() {
	s.countDistinct.Clear()
}
// hashValue is a raw hash digest.
type hashValue []byte

// truncatedHash64FromBytes adapts a hash digest to the Sum64 interface
// hyperloglog expects, exposing the digest's leading 8 bytes as a
// big-endian uint64.
type truncatedHash64FromBytes struct {
	hashValue
}

// Sum64 returns the first 8 bytes of the digest as a big-endian uint64.
// A digest shorter than 8 bytes yields 0 — the previous binary.Read
// implementation silently ignored the read error and left the same
// zero value; this makes that fallback explicit and avoids the
// reader/reflection overhead of binary.Read.
func (c truncatedHash64FromBytes) Sum64() uint64 {
	if len(c.hashValue) < 8 {
		// Cannot happen with the 32-byte SHA3-256 HMACs produced by
		// maskIPAddress, but fail closed rather than panic.
		return 0
	}
	return binary.BigEndian.Uint64(c.hashValue)
}

View file

@ -1,47 +0,0 @@
package ipsetsink
import (
"fmt"
"github.com/clarkduvall/hyperloglog"
"testing"
)
import . "github.com/smartystreets/goconvey/convey"
// TestSinkInit checks that a dumped sketch round-trips through gob and
// yields a plausible distinct count for two inserted items.
func TestSinkInit(t *testing.T) {
	Convey("Context", t, func() {
		s := NewIPSetSink("demo")
		for _, item := range []string{"test1", "test2"} {
			s.AddIPToSet(item)
		}
		dumped, err := s.Dump()
		So(err, ShouldBeNil)
		decoded, err := hyperloglog.NewPlus(18)
		So(err, ShouldBeNil)
		err = decoded.GobDecode(dumped)
		So(err, ShouldBeNil)
		So(decoded.Count(), ShouldBeBetweenOrEqual, 1, 3)
	})
}
// TestSinkCounting checks that the estimate stays within 1% of the true
// cardinality across a range of set sizes, even when every item is
// inserted twice.
func TestSinkCounting(t *testing.T) {
	Convey("Context", t, func() {
		for itemCount := 300; itemCount <= 10000; itemCount += 200 {
			s := NewIPSetSink("demo")
			// Insert each item twice; duplicates must not inflate the count.
			for pass := 0; pass < 2; pass++ {
				for i := 0; i <= itemCount; i++ {
					s.AddIPToSet(fmt.Sprintf("demo%v", i))
				}
			}
			dumped, err := s.Dump()
			So(err, ShouldBeNil)
			decoded, err := hyperloglog.NewPlus(18)
			So(err, ShouldBeNil)
			err = decoded.GobDecode(dumped)
			So(err, ShouldBeNil)
			ratio := float64(decoded.Count()) / float64(itemCount)
			So(ratio-1.0, ShouldAlmostEqual, 0, 0.01)
		}
	})
}

View file

@ -1,24 +0,0 @@
package sinkcluster
/* ClusterWriter and (ClusterCounter).Count write and read a streamed IP-set journal file that records distinct IP addresses.
Its format is as follows:
The file is in newline-delimited JSON format (https://jsonlines.org/).
Each line is a JSON object of the form:
{"recordingStart":"2022-05-30T14:38:44.678610091Z","recordingEnd":"2022-05-30T14:39:48.157630926Z","recorded":""}
recordingStart (datetime) is the time at which this chunk of recording started.
recordingEnd (datetime) is the time at which this chunk of recording ended.
recorded is the checkpoint data generated by hyperloglog.
*/
import "time"
// SinkEntry is one journal line: a gob-encoded hyperloglog checkpoint
// covering the interval [RecordingStart, RecordingEnd].
type SinkEntry struct {
	RecordingStart time.Time `json:"recordingStart"`
	RecordingEnd   time.Time `json:"recordingEnd"`
	Recorded       []byte    `json:"recorded"`
}

View file

@ -1,64 +0,0 @@
package sinkcluster
import (
"bufio"
"encoding/json"
"github.com/clarkduvall/hyperloglog"
"io"
"time"
)
// NewClusterCounter returns a ClusterCounter that aggregates journal
// chunks recorded within the interval [from, to].
func NewClusterCounter(from time.Time, to time.Time) *ClusterCounter {
	counter := &ClusterCounter{
		from: from,
		to:   to,
	}
	return counter
}
// ClusterCounter merges journal chunks whose recording interval falls
// entirely within [from, to] into a single distinct-IP estimate.
type ClusterCounter struct {
	from time.Time
	to   time.Time
}

// ClusterCountResult holds the merged distinct estimate (Sum) and the
// number of journal chunks that contributed to it (ChunkIncluded).
type ClusterCountResult struct {
	Sum           uint64
	ChunkIncluded int64
}
// Count reads a newline-delimited JSON journal from reader, merges
// every hyperloglog chunk recorded within the counter's [from, to]
// interval, and returns the combined distinct estimate along with the
// number of chunks included. It returns an error on malformed JSON,
// undecodable chunk data, or a read failure.
func (c ClusterCounter) Count(reader io.Reader) (*ClusterCountResult, error) {
	result := ClusterCountResult{}
	counter, err := hyperloglog.NewPlus(18)
	if err != nil {
		return nil, err
	}
	inputScanner := bufio.NewScanner(reader)
	// A serialized precision-18 sketch can exceed bufio.Scanner's default
	// 64 KiB token limit; allow lines up to 16 MiB so large chunks do not
	// abort the scan with bufio.ErrTooLong.
	inputScanner.Buffer(nil, 16*1024*1024)
	for inputScanner.Scan() {
		sinkInfo := SinkEntry{}
		if err := json.Unmarshal(inputScanner.Bytes(), &sinkInfo); err != nil {
			return nil, err
		}
		// Skip chunks not fully contained in [c.from, c.to]. Note that
		// RecordingStart.Before(c.from) already excludes equality, so the
		// previous !RecordingStart.Equal(c.from) term was redundant.
		if sinkInfo.RecordingStart.Before(c.from) || sinkInfo.RecordingEnd.After(c.to) {
			continue
		}
		restoredCounter, err := hyperloglog.NewPlus(18)
		if err != nil {
			return nil, err
		}
		if err := restoredCounter.GobDecode(sinkInfo.Recorded); err != nil {
			return nil, err
		}
		result.ChunkIncluded++
		if err := counter.Merge(restoredCounter); err != nil {
			return nil, err
		}
	}
	if err := inputScanner.Err(); err != nil {
		return nil, err
	}
	result.Sum = counter.Count()
	return &result, nil
}

View file

@ -1,68 +0,0 @@
package sinkcluster
import (
"bytes"
"encoding/json"
"io"
"log"
"time"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/ipsetsink"
)
// NewClusterWriter returns a ClusterWriter that accumulates IP
// addresses in sink and appends a journal entry to writer roughly once
// per writeInterval. The first interval starts now.
func NewClusterWriter(writer WriteSyncer, writeInterval time.Duration, sink *ipsetsink.IPSetSink) *ClusterWriter {
	return &ClusterWriter{
		writer:        writer,
		lastWriteTime: time.Now(),
		writeInterval: writeInterval,
		current:       sink,
	}
}
// ClusterWriter periodically flushes a distinct-IP sketch to a journal
// as newline-delimited JSON SinkEntry lines.
type ClusterWriter struct {
	writer        WriteSyncer
	lastWriteTime time.Time     // start of the current recording chunk
	writeInterval time.Duration // how often AddIPToSet triggers a flush
	current       *ipsetsink.IPSetSink
}

// WriteSyncer is an io.Writer that can flush its contents to stable
// storage, as *os.File does.
type WriteSyncer interface {
	Sync() error
	io.Writer
}
// WriteIPSetToDisk serializes the current sketch as one JSON journal
// line covering [lastWriteTime, now], appends it to the underlying
// writer, syncs, and resets the sketch for the next interval.
// Errors are logged rather than returned; on a dump/marshal/write error
// the current sketch is left intact so no observations are lost.
func (c *ClusterWriter) WriteIPSetToDisk() {
	currentTime := time.Now()
	data, err := c.current.Dump()
	if err != nil {
		// Fixed log-message typo: was "unable able to write".
		log.Println("unable to write ipset to file:", err)
		return
	}
	entry := &SinkEntry{
		RecordingStart: c.lastWriteTime,
		RecordingEnd:   currentTime,
		Recorded:       data,
	}
	jsonData, err := json.Marshal(entry)
	if err != nil {
		log.Println("unable to write ipset to file:", err)
		return
	}
	jsonData = append(jsonData, '\n')
	// Write directly instead of io.Copy over a throwaway reader.
	if _, err := c.writer.Write(jsonData); err != nil {
		log.Println("unable to write ipset to file:", err)
		return
	}
	// Previously the Sync error was silently discarded.
	if err := c.writer.Sync(); err != nil {
		log.Println("unable to sync ipset file:", err)
	}
	c.lastWriteTime = currentTime
	c.current.Reset()
}
// AddIPToSet records ipAddress in the current sketch, first flushing
// the sketch to disk if the write interval has elapsed.
func (c *ClusterWriter) AddIPToSet(ipAddress string) {
	if time.Since(c.lastWriteTime) > c.writeInterval {
		c.WriteIPSetToDisk()
	}
	c.current.AddIPToSet(ipAddress)
}

View file

@ -1,33 +0,0 @@
package sinkcluster
import (
"bytes"
"io"
"testing"
"time"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/ipsetsink"
. "github.com/smartystreets/goconvey/convey"
)
// writerStub wraps an io.Writer with a no-op Sync so an in-memory
// buffer can stand in for a WriteSyncer in tests.
type writerStub struct {
	io.Writer
}

// Sync is a no-op: there is no underlying file to flush.
func (w writerStub) Sync() error {
	return nil
}
// TestSinkWriter checks that recording one IP through a ClusterWriter
// and forcing a flush produces non-empty journal output.
func TestSinkWriter(t *testing.T) {
	Convey("Context", t, func() {
		buffer := bytes.NewBuffer(nil)
		writerStubInst := &writerStub{buffer}
		sink := ipsetsink.NewIPSetSink("demo")
		// A one-minute interval guarantees no time-based flush happens
		// during the test; WriteIPSetToDisk is called explicitly instead.
		clusterWriter := NewClusterWriter(writerStubInst, time.Minute, sink)
		clusterWriter.AddIPToSet("1")
		clusterWriter.WriteIPSetToDisk()
		So(buffer.Bytes(), ShouldNotBeNil)
	})
}

View file

@ -1,37 +0,0 @@
package main
import (
"flag"
"fmt"
"log"
"os"
"time"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/ipsetsink/sinkcluster"
)
// main counts distinct IPs recorded in an IP-set journal file between
// two RFC 3339 timestamps and prints the estimated sum and the number
// of journal chunks included.
func main() {
	// The usage strings were previously empty, making -help useless.
	inputFile := flag.String("in", "", "path to the IP-set journal file")
	start := flag.String("from", "", "start time, RFC 3339 (e.g. 2022-05-30T14:38:44Z)")
	end := flag.String("to", "", "end time, RFC 3339")
	flag.Parse()
	startTime, err := time.Parse(time.RFC3339, *start)
	if err != nil {
		log.Fatal("unable to parse start time:", err)
	}
	endTime, err := time.Parse(time.RFC3339, *end)
	if err != nil {
		log.Fatal("unable to parse end time:", err)
	}
	fd, err := os.Open(*inputFile)
	if err != nil {
		log.Fatal("unable to open input file:", err)
	}
	// Close the input file explicitly; previously it was left to process
	// exit to release the descriptor.
	defer fd.Close()
	counter := sinkcluster.NewClusterCounter(startTime, endTime)
	result, err := counter.Count(fd)
	if err != nil {
		log.Fatal("unable to count:", err)
	}
	fmt.Printf("sum = %v\n", result.Sum)
	fmt.Printf("chunkIncluded = %v\n", result.ChunkIncluded)
}

1
go.mod
View file

@ -3,7 +3,6 @@ module gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/
go 1.21 go 1.21
require ( require (
github.com/clarkduvall/hyperloglog v0.0.0-20171127014514-a0107a5d8004
github.com/gorilla/websocket v1.5.0 github.com/gorilla/websocket v1.5.0
github.com/pion/ice/v2 v2.3.11 github.com/pion/ice/v2 v2.3.11
github.com/pion/sdp/v3 v3.0.6 github.com/pion/sdp/v3 v3.0.6

2
go.sum
View file

@ -7,8 +7,6 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/clarkduvall/hyperloglog v0.0.0-20171127014514-a0107a5d8004 h1:mK6JroY6bLiPS3s6QCYOSjRyErFc2iHNkhhmRfF0nHo=
github.com/clarkduvall/hyperloglog v0.0.0-20171127014514-a0107a5d8004/go.mod h1:drodPoQNro6QBO6TJ/MpMZbz8Bn2eSDtRN6jpG4VGw8=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudflare/circl v1.3.3 h1:fE/Qz0QdIGqeWfnwq0RE0R7MI51s0M2E4Ga9kq5AEMs= github.com/cloudflare/circl v1.3.3 h1:fE/Qz0QdIGqeWfnwq0RE0R7MI51s0M2E4Ga9kq5AEMs=
github.com/cloudflare/circl v1.3.3/go.mod h1:5XYMA4rFBvNIrhs50XuiBJ15vF2pZn4nnUKZrLbUZFA= github.com/cloudflare/circl v1.3.3/go.mod h1:5XYMA4rFBvNIrhs50XuiBJ15vF2pZn4nnUKZrLbUZFA=