Move creation of periodic stats task inside proxy library

This adds a new type of SnowflakeEvent. EventOnProxyStats is triggered
by the periodic task run at SummaryInterval and produces an event with a
proxy stats output string.
This commit is contained in:
Cecylia Bocovich 2023-10-28 16:04:09 -04:00
parent 83a7422fe6
commit 354cb65432
No known key found for this signature in database
GPG key ID: 009DE379FD9B7B90
4 changed files with 54 additions and 19 deletions

View file

@ -76,6 +76,15 @@ func (e EventOnProxyConnectionOver) String() string {
return fmt.Sprintf("Proxy connection closed (↑ %d, ↓ %d)", e.InboundTraffic, e.OutboundTraffic) return fmt.Sprintf("Proxy connection closed (↑ %d, ↓ %d)", e.InboundTraffic, e.OutboundTraffic)
} }
type EventOnProxyStats struct {
SnowflakeEvent
StatString string
}
func (e EventOnProxyStats) String() string {
return e.StatString
}
type EventOnCurrentNATTypeDetermined struct { type EventOnCurrentNATTypeDetermined struct {
SnowflakeEvent SnowflakeEvent
CurNATType string CurNATType string

View file

@ -1,54 +1,67 @@
package snowflake_proxy package snowflake_proxy
import ( import (
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/task" "fmt"
"io" "io"
"log" "log"
"time" "time"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/task"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/event" "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/event"
) )
func NewProxyEventLogger(logPeriod time.Duration, output io.Writer) event.SnowflakeEventReceiver { func NewProxyEventLogger(output io.Writer) event.SnowflakeEventReceiver {
logger := log.New(output, "", log.LstdFlags|log.LUTC) logger := log.New(output, "", log.LstdFlags|log.LUTC)
el := &logEventLogger{logPeriod: logPeriod, logger: logger} return &proxyEventLogger{logger: logger}
el.task = &task.Periodic{Interval: logPeriod, Execute: el.logTick}
el.task.WaitThenStart()
return el
} }
type logEventLogger struct { type proxyEventLogger struct {
logger *log.Logger
}
func (p *proxyEventLogger) OnNewSnowflakeEvent(e event.SnowflakeEvent) {
p.logger.Println(e.String())
}
type periodicProxyStats struct {
inboundSum int64 inboundSum int64
outboundSum int64 outboundSum int64
connectionCount int connectionCount int
logPeriod time.Duration logPeriod time.Duration
task *task.Periodic task *task.Periodic
logger *log.Logger dispatcher event.SnowflakeEventDispatcher
} }
func (p *logEventLogger) OnNewSnowflakeEvent(e event.SnowflakeEvent) { func newPeriodicProxyStats(logPeriod time.Duration, dispatcher event.SnowflakeEventDispatcher) *periodicProxyStats {
el := &periodicProxyStats{logPeriod: logPeriod, dispatcher: dispatcher}
el.task = &task.Periodic{Interval: logPeriod, Execute: el.logTick}
el.task.WaitThenStart()
return el
}
func (p *periodicProxyStats) OnNewSnowflakeEvent(e event.SnowflakeEvent) {
switch e.(type) { switch e.(type) {
case event.EventOnProxyConnectionOver: case event.EventOnProxyConnectionOver:
e := e.(event.EventOnProxyConnectionOver) e := e.(event.EventOnProxyConnectionOver)
p.inboundSum += e.InboundTraffic p.inboundSum += e.InboundTraffic
p.outboundSum += e.OutboundTraffic p.outboundSum += e.OutboundTraffic
p.connectionCount += 1 p.connectionCount += 1
default:
p.logger.Println(e.String())
} }
} }
func (p *logEventLogger) logTick() error { func (p *periodicProxyStats) logTick() error {
inbound, inboundUnit := formatTraffic(p.inboundSum) inbound, inboundUnit := formatTraffic(p.inboundSum)
outbound, outboundUnit := formatTraffic(p.outboundSum) outbound, outboundUnit := formatTraffic(p.outboundSum)
p.logger.Printf("In the last %v, there were %v connections. Traffic Relayed ↓ %v %v, ↑ %v %v.\n", statString := fmt.Sprintf("In the last %v, there were %v connections. Traffic Relayed ↓ %v %v, ↑ %v %v.\n",
p.logPeriod.String(), p.connectionCount, inbound, inboundUnit, outbound, outboundUnit) p.logPeriod.String(), p.connectionCount, inbound, inboundUnit, outbound, outboundUnit)
p.dispatcher.OnNewSnowflakeEvent(&event.EventOnProxyStats{StatString: statString})
p.outboundSum = 0 p.outboundSum = 0
p.inboundSum = 0 p.inboundSum = 0
p.connectionCount = 0 p.connectionCount = 0
return nil return nil
} }
func (p *logEventLogger) Close() error { func (p *periodicProxyStats) Close() error {
return p.task.Close() return p.task.Close()
} }

View file

@ -139,6 +139,14 @@ type SnowflakeProxy struct {
ProxyType string ProxyType string
EventDispatcher event.SnowflakeEventDispatcher EventDispatcher event.SnowflakeEventDispatcher
shutdown chan struct{} shutdown chan struct{}
// DisableStatsLogger indicates whether proxy stats will be logged periodically
DisableStatsLogger bool
// SummaryInterval is the time interval at which proxy stats will be logged
SummaryInterval time.Duration
periodicProxyStats *periodicProxyStats
bytesLogger bytesLogger
} }
// Checks whether an IP address is a remote address for the client // Checks whether an IP address is a remote address for the client
@ -654,6 +662,10 @@ func (sf *SnowflakeProxy) Start() error {
sf.EventDispatcher = event.NewSnowflakeEventDispatcher() sf.EventDispatcher = event.NewSnowflakeEventDispatcher()
} }
if !sf.DisableStatsLogger {
sf.periodicProxyStats = newPeriodicProxyStats(sf.SummaryInterval, sf.EventDispatcher)
}
broker, err = newSignalingServer(sf.BrokerURL, sf.KeepLocalAddresses) broker, err = newSignalingServer(sf.BrokerURL, sf.KeepLocalAddresses)
if err != nil { if err != nil {
return fmt.Errorf("error configuring broker: %s", err) return fmt.Errorf("error configuring broker: %s", err)

View file

@ -31,7 +31,7 @@ func main() {
allowNonTLSRelay := flag.Bool("allow-non-tls-relay", false, "allow relay without tls encryption") allowNonTLSRelay := flag.Bool("allow-non-tls-relay", false, "allow relay without tls encryption")
NATTypeMeasurementInterval := flag.Duration("nat-retest-interval", time.Hour*24, NATTypeMeasurementInterval := flag.Duration("nat-retest-interval", time.Hour*24,
"the time interval in second before NAT type is retested, 0s disables retest. Valid time units are \"s\", \"m\", \"h\". ") "the time interval in second before NAT type is retested, 0s disables retest. Valid time units are \"s\", \"m\", \"h\". ")
SummaryInterval := flag.Duration("summary-interval", time.Hour, summaryInterval := flag.Duration("summary-interval", time.Hour,
"the time interval to output summary, 0s disables summaries. Valid time units are \"s\", \"m\", \"h\". ") "the time interval to output summary, 0s disables summaries. Valid time units are \"s\", \"m\", \"h\". ")
disableStatsLogger := flag.Bool("disable-stats-logger", false, "disable the exposing mechanism for stats using logs") disableStatsLogger := flag.Bool("disable-stats-logger", false, "disable the exposing mechanism for stats using logs")
enableMetrics := flag.Bool("metrics", false, "enable the exposing mechanism for stats using metrics") enableMetrics := flag.Bool("metrics", false, "enable the exposing mechanism for stats using metrics")
@ -96,6 +96,9 @@ func main() {
RelayDomainNamePattern: *allowedRelayHostNamePattern, RelayDomainNamePattern: *allowedRelayHostNamePattern,
AllowNonTLSRelay: *allowNonTLSRelay, AllowNonTLSRelay: *allowNonTLSRelay,
DisableStatsLogger: *disableStatsLogger,
SummaryInterval: *summaryInterval,
} }
var logOutput = ioutil.Discard var logOutput = ioutil.Discard
@ -124,10 +127,8 @@ func main() {
log.SetOutput(&safelog.LogScrubber{Output: logOutput}) log.SetOutput(&safelog.LogScrubber{Output: logOutput})
} }
if !*disableStatsLogger { proxyEventLogger := sf.NewProxyEventLogger(eventlogOutput)
periodicEventLogger := sf.NewProxyEventLogger(*SummaryInterval, eventlogOutput) eventLogger.AddSnowflakeEventListener(proxyEventLogger)
eventLogger.AddSnowflakeEventListener(periodicEventLogger)
}
if *enableMetrics { if *enableMetrics {
metrics := sf.NewMetrics() metrics := sf.NewMetrics()