diff --git a/proxy/lib/pt_event_logger.go b/proxy/lib/pt_event_logger.go index 5af9f28..83ee62e 100644 --- a/proxy/lib/pt_event_logger.go +++ b/proxy/lib/pt_event_logger.go @@ -6,9 +6,8 @@ import ( "log" "time" - "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/task" - "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/event" + "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/task" ) func NewProxyEventLogger(output io.Writer) event.SnowflakeEventReceiver { @@ -25,16 +24,15 @@ func (p *proxyEventLogger) OnNewSnowflakeEvent(e event.SnowflakeEvent) { } type periodicProxyStats struct { - inboundSum int64 - outboundSum int64 + bytesLogger bytesLogger connectionCount int logPeriod time.Duration task *task.Periodic dispatcher event.SnowflakeEventDispatcher } -func newPeriodicProxyStats(logPeriod time.Duration, dispatcher event.SnowflakeEventDispatcher) *periodicProxyStats { - el := &periodicProxyStats{logPeriod: logPeriod, dispatcher: dispatcher} +func newPeriodicProxyStats(logPeriod time.Duration, dispatcher event.SnowflakeEventDispatcher, bytesLogger bytesLogger) *periodicProxyStats { + el := &periodicProxyStats{logPeriod: logPeriod, dispatcher: dispatcher, bytesLogger: bytesLogger} el.task = &task.Periodic{Interval: logPeriod, Execute: el.logTick} el.task.WaitThenStart() return el @@ -43,21 +41,17 @@ func newPeriodicProxyStats(logPeriod time.Duration, dispatcher event.SnowflakeEv func (p *periodicProxyStats) OnNewSnowflakeEvent(e event.SnowflakeEvent) { switch e.(type) { case event.EventOnProxyConnectionOver: - e := e.(event.EventOnProxyConnectionOver) - p.inboundSum += e.InboundTraffic - p.outboundSum += e.OutboundTraffic p.connectionCount += 1 } } func (p *periodicProxyStats) logTick() error { - inbound, inboundUnit := formatTraffic(p.inboundSum) - outbound, outboundUnit := formatTraffic(p.outboundSum) - statString := fmt.Sprintf("In the last %v, there were %v connections. Traffic Relayed ↓ %v %v, ↑ %v %v.\n", + inboundSum, outboundSum := p.bytesLogger.GetStat() + inbound, inboundUnit := formatTraffic(inboundSum) + outbound, outboundUnit := formatTraffic(outboundSum) + statString := fmt.Sprintf("In the last %v, there were %v completed connections. Traffic Relayed ↓ %v %v, ↑ %v %v.", p.logPeriod.String(), p.connectionCount, inbound, inboundUnit, outbound, outboundUnit) p.dispatcher.OnNewSnowflakeEvent(&event.EventOnProxyStats{StatString: statString}) - p.outboundSum = 0 - p.inboundSum = 0 p.connectionCount = 0 return nil } diff --git a/proxy/lib/snowflake.go b/proxy/lib/snowflake.go index a0d630a..a0c0ce0 100644 --- a/proxy/lib/snowflake.go +++ b/proxy/lib/snowflake.go @@ -416,7 +416,7 @@ func (sf *SnowflakeProxy) makePeerConnectionFromOffer(sdp *webrtc.SessionDescrip close(dataChan) pr, pw := io.Pipe() - conn := newWebRTCConn(pc, dc, pr, sf.EventDispatcher) + conn := newWebRTCConn(pc, dc, pr, sf.bytesLogger) dc.SetBufferedAmountLowThreshold(bufferedAmountLowThreshold) @@ -446,11 +446,7 @@ func (sf *SnowflakeProxy) makePeerConnectionFromOffer(sdp *webrtc.SessionDescrip conn.lock.Lock() defer conn.lock.Unlock() log.Printf("Data Channel %s-%d close\n", dc.Label(), dc.ID()) - in, out := conn.bytesLogger.GetStat() - conn.eventLogger.OnNewSnowflakeEvent(event.EventOnProxyConnectionOver{ - InboundTraffic: in, - OutboundTraffic: out, - }) + sf.EventDispatcher.OnNewSnowflakeEvent(event.EventOnProxyConnectionOver{}) conn.dc = nil dc.Close() pw.Close() @@ -663,7 +659,8 @@ func (sf *SnowflakeProxy) Start() error { } if !sf.DisableStatsLogger { - sf.periodicProxyStats = newPeriodicProxyStats(sf.SummaryInterval, sf.EventDispatcher) + sf.bytesLogger = newBytesSyncLogger() + sf.periodicProxyStats = newPeriodicProxyStats(sf.SummaryInterval, sf.EventDispatcher, sf.bytesLogger) } broker, err = newSignalingServer(sf.BrokerURL, sf.KeepLocalAddresses) diff --git a/proxy/lib/webrtcconn.go b/proxy/lib/webrtcconn.go index 024945a..9ad88b5 100644 --- a/proxy/lib/webrtcconn.go +++ b/proxy/lib/webrtcconn.go @@ -14,7 +14,6 @@ import ( "github.com/pion/ice/v2" "github.com/pion/sdp/v3" "github.com/pion/webrtc/v3" - "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/event" ) const maxBufferedAmount uint64 = 512 * 1024 // 512 KB @@ -36,19 +35,17 @@ type webRTCConn struct { isClosing bool - bytesLogger bytesLogger - eventLogger event.SnowflakeEventReceiver - inactivityTimeout time.Duration activity chan struct{} sendMoreCh chan struct{} cancelTimeoutLoop context.CancelFunc + + bytesLogger bytesLogger } -func newWebRTCConn(pc *webrtc.PeerConnection, dc *webrtc.DataChannel, pr *io.PipeReader, eventLogger event.SnowflakeEventReceiver) *webRTCConn { - conn := &webRTCConn{pc: pc, dc: dc, pr: pr, eventLogger: eventLogger} +func newWebRTCConn(pc *webrtc.PeerConnection, dc *webrtc.DataChannel, pr *io.PipeReader, bytesLogger bytesLogger) *webRTCConn { + conn := &webRTCConn{pc: pc, dc: dc, pr: pr, bytesLogger: bytesLogger} conn.isClosing = false - conn.bytesLogger = newBytesSyncLogger() conn.activity = make(chan struct{}, 100) conn.sendMoreCh = make(chan struct{}, 1) conn.inactivityTimeout = 30 * time.Second