Proxy stats log only what occurred that time interval

Modify the periodic stats output by standalone snowflake proxies to only
include the data transferred during the time interval being logged. This
is an improvement of previous behaviour that logged the total data
transferred by all proxy connections that were closed within the time
interval being logged..

Closes #40302:
https://gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/-/issues/40302
This commit is contained in:
Cecylia Bocovich 2023-10-28 16:45:22 -04:00
parent 354cb65432
commit 018bbd6d65
No known key found for this signature in database
GPG key ID: 009DE379FD9B7B90
3 changed files with 16 additions and 28 deletions

View file

@ -6,9 +6,8 @@ import (
"log" "log"
"time" "time"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/task"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/event" "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/event"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/task"
) )
func NewProxyEventLogger(output io.Writer) event.SnowflakeEventReceiver { func NewProxyEventLogger(output io.Writer) event.SnowflakeEventReceiver {
@ -25,16 +24,15 @@ func (p *proxyEventLogger) OnNewSnowflakeEvent(e event.SnowflakeEvent) {
} }
type periodicProxyStats struct { type periodicProxyStats struct {
inboundSum int64 bytesLogger bytesLogger
outboundSum int64
connectionCount int connectionCount int
logPeriod time.Duration logPeriod time.Duration
task *task.Periodic task *task.Periodic
dispatcher event.SnowflakeEventDispatcher dispatcher event.SnowflakeEventDispatcher
} }
func newPeriodicProxyStats(logPeriod time.Duration, dispatcher event.SnowflakeEventDispatcher) *periodicProxyStats { func newPeriodicProxyStats(logPeriod time.Duration, dispatcher event.SnowflakeEventDispatcher, bytesLogger bytesLogger) *periodicProxyStats {
el := &periodicProxyStats{logPeriod: logPeriod, dispatcher: dispatcher} el := &periodicProxyStats{logPeriod: logPeriod, dispatcher: dispatcher, bytesLogger: bytesLogger}
el.task = &task.Periodic{Interval: logPeriod, Execute: el.logTick} el.task = &task.Periodic{Interval: logPeriod, Execute: el.logTick}
el.task.WaitThenStart() el.task.WaitThenStart()
return el return el
@ -43,21 +41,17 @@ func newPeriodicProxyStats(logPeriod time.Duration, dispatcher event.SnowflakeEv
func (p *periodicProxyStats) OnNewSnowflakeEvent(e event.SnowflakeEvent) { func (p *periodicProxyStats) OnNewSnowflakeEvent(e event.SnowflakeEvent) {
switch e.(type) { switch e.(type) {
case event.EventOnProxyConnectionOver: case event.EventOnProxyConnectionOver:
e := e.(event.EventOnProxyConnectionOver)
p.inboundSum += e.InboundTraffic
p.outboundSum += e.OutboundTraffic
p.connectionCount += 1 p.connectionCount += 1
} }
} }
func (p *periodicProxyStats) logTick() error { func (p *periodicProxyStats) logTick() error {
inbound, inboundUnit := formatTraffic(p.inboundSum) inboundSum, outboundSum := p.bytesLogger.GetStat()
outbound, outboundUnit := formatTraffic(p.outboundSum) inbound, inboundUnit := formatTraffic(inboundSum)
statString := fmt.Sprintf("In the last %v, there were %v connections. Traffic Relayed ↓ %v %v, ↑ %v %v.\n", outbound, outboundUnit := formatTraffic(outboundSum)
statString := fmt.Sprintf("In the last %v, there were %v completed connections. Traffic Relayed ↓ %v %v, ↑ %v %v.",
p.logPeriod.String(), p.connectionCount, inbound, inboundUnit, outbound, outboundUnit) p.logPeriod.String(), p.connectionCount, inbound, inboundUnit, outbound, outboundUnit)
p.dispatcher.OnNewSnowflakeEvent(&event.EventOnProxyStats{StatString: statString}) p.dispatcher.OnNewSnowflakeEvent(&event.EventOnProxyStats{StatString: statString})
p.outboundSum = 0
p.inboundSum = 0
p.connectionCount = 0 p.connectionCount = 0
return nil return nil
} }

View file

@ -416,7 +416,7 @@ func (sf *SnowflakeProxy) makePeerConnectionFromOffer(sdp *webrtc.SessionDescrip
close(dataChan) close(dataChan)
pr, pw := io.Pipe() pr, pw := io.Pipe()
conn := newWebRTCConn(pc, dc, pr, sf.EventDispatcher) conn := newWebRTCConn(pc, dc, pr, sf.bytesLogger)
dc.SetBufferedAmountLowThreshold(bufferedAmountLowThreshold) dc.SetBufferedAmountLowThreshold(bufferedAmountLowThreshold)
@ -446,11 +446,7 @@ func (sf *SnowflakeProxy) makePeerConnectionFromOffer(sdp *webrtc.SessionDescrip
conn.lock.Lock() conn.lock.Lock()
defer conn.lock.Unlock() defer conn.lock.Unlock()
log.Printf("Data Channel %s-%d close\n", dc.Label(), dc.ID()) log.Printf("Data Channel %s-%d close\n", dc.Label(), dc.ID())
in, out := conn.bytesLogger.GetStat() sf.EventDispatcher.OnNewSnowflakeEvent(event.EventOnProxyConnectionOver{})
conn.eventLogger.OnNewSnowflakeEvent(event.EventOnProxyConnectionOver{
InboundTraffic: in,
OutboundTraffic: out,
})
conn.dc = nil conn.dc = nil
dc.Close() dc.Close()
pw.Close() pw.Close()
@ -663,7 +659,8 @@ func (sf *SnowflakeProxy) Start() error {
} }
if !sf.DisableStatsLogger { if !sf.DisableStatsLogger {
sf.periodicProxyStats = newPeriodicProxyStats(sf.SummaryInterval, sf.EventDispatcher) sf.bytesLogger = newBytesSyncLogger()
sf.periodicProxyStats = newPeriodicProxyStats(sf.SummaryInterval, sf.EventDispatcher, sf.bytesLogger)
} }
broker, err = newSignalingServer(sf.BrokerURL, sf.KeepLocalAddresses) broker, err = newSignalingServer(sf.BrokerURL, sf.KeepLocalAddresses)

View file

@ -14,7 +14,6 @@ import (
"github.com/pion/ice/v2" "github.com/pion/ice/v2"
"github.com/pion/sdp/v3" "github.com/pion/sdp/v3"
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/event"
) )
const maxBufferedAmount uint64 = 512 * 1024 // 512 KB const maxBufferedAmount uint64 = 512 * 1024 // 512 KB
@ -36,19 +35,17 @@ type webRTCConn struct {
isClosing bool isClosing bool
bytesLogger bytesLogger
eventLogger event.SnowflakeEventReceiver
inactivityTimeout time.Duration inactivityTimeout time.Duration
activity chan struct{} activity chan struct{}
sendMoreCh chan struct{} sendMoreCh chan struct{}
cancelTimeoutLoop context.CancelFunc cancelTimeoutLoop context.CancelFunc
bytesLogger bytesLogger
} }
func newWebRTCConn(pc *webrtc.PeerConnection, dc *webrtc.DataChannel, pr *io.PipeReader, eventLogger event.SnowflakeEventReceiver) *webRTCConn { func newWebRTCConn(pc *webrtc.PeerConnection, dc *webrtc.DataChannel, pr *io.PipeReader, bytesLogger bytesLogger) *webRTCConn {
conn := &webRTCConn{pc: pc, dc: dc, pr: pr, eventLogger: eventLogger} conn := &webRTCConn{pc: pc, dc: dc, pr: pr, bytesLogger: bytesLogger}
conn.isClosing = false conn.isClosing = false
conn.bytesLogger = newBytesSyncLogger()
conn.activity = make(chan struct{}, 100) conn.activity = make(chan struct{}, 100)
conn.sendMoreCh = make(chan struct{}, 1) conn.sendMoreCh = make(chan struct{}, 1)
conn.inactivityTimeout = 30 * time.Second conn.inactivityTimeout = 30 * time.Second