feat(proxy): add failed connection count stats

For the summary log and for Prometheus metrics.

Log output example:

> In the last 1h0m0s, there were 7 completed successful connections. 2 connections failed. Traffic Relayed ↓ 321 KB (0.10 KB/s), ↑ 123 KB (0.05 KB/s).
This commit is contained in:
WofWca 2025-02-21 02:00:47 +04:00 committed by Shelikhoo
parent 5ef4761968
commit 583178f4f2
No known key found for this signature in database
GPG key ID: 4C9764E9FE80A3DC
5 changed files with 53 additions and 13 deletions

View file

@ -75,6 +75,8 @@ func (e EventOnProxyClientConnected) String() string {
return fmt.Sprintf("client connected")
}
// The connection with the client has now been closed,
// after getting successfully established.
type EventOnProxyConnectionOver struct {
SnowflakeEvent
Country string
@ -84,17 +86,30 @@ func (e EventOnProxyConnectionOver) String() string {
return fmt.Sprintf("Proxy connection closed")
}
// Rendezvous with a client succeeded,
// but a data channel has not been created.
type EventOnProxyConnectionFailed struct {
SnowflakeEvent
}
func (e EventOnProxyConnectionFailed) String() string {
return "Failed to connect to the client"
}
type EventOnProxyStats struct {
SnowflakeEvent
ConnectionCount int
// Completed successful connections.
ConnectionCount int
// Connections that failed to establish.
FailedConnectionCount uint
InboundBytes, OutboundBytes int64
InboundUnit, OutboundUnit string
SummaryInterval time.Duration
}
func (e EventOnProxyStats) String() string {
statString := fmt.Sprintf("In the last %v, there were %v completed connections. Traffic Relayed ↓ %v %v (%.2f %v%s), ↑ %v %v (%.2f %v%s).",
e.SummaryInterval.String(), e.ConnectionCount,
statString := fmt.Sprintf("In the last %v, there were %v completed successful connections. %v connections failed to establish. Traffic Relayed ↓ %v %v (%.2f %v%s), ↑ %v %v (%.2f %v%s).",
e.SummaryInterval.String(), e.ConnectionCount, e.FailedConnectionCount,
e.InboundBytes, e.InboundUnit, float64(e.InboundBytes)/e.SummaryInterval.Seconds(), e.InboundUnit, "/s",
e.OutboundBytes, e.OutboundUnit, float64(e.OutboundBytes)/e.SummaryInterval.Seconds(), e.OutboundUnit, "/s")
return statString

View file

@ -13,9 +13,10 @@ const (
)
type Metrics struct {
totalInBoundTraffic prometheus.Counter
totalOutBoundTraffic prometheus.Counter
totalConnections *prometheus.CounterVec
totalInBoundTraffic prometheus.Counter
totalOutBoundTraffic prometheus.Counter
totalConnections *prometheus.CounterVec
totalFailedConnections prometheus.Counter
}
func NewMetrics() *Metrics {
@ -23,10 +24,15 @@ func NewMetrics() *Metrics {
totalConnections: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: metricNamespace,
Name: "connections_total",
Help: "The total number of connections handled by the snowflake proxy",
Help: "The total number of successful connections handled by the snowflake proxy",
},
[]string{"country"},
),
totalFailedConnections: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: metricNamespace,
Name: "failed_connections_total",
Help: "The total number of client connection attempts that failed after successful rendezvous",
}),
totalInBoundTraffic: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: metricNamespace,
Name: "traffic_inbound_bytes_total",
@ -54,6 +60,7 @@ func (m *Metrics) Start(addr string) error {
func (m *Metrics) Collect(ch chan<- prometheus.Metric) {
m.totalConnections.Collect(ch)
m.totalFailedConnections.Collect(ch)
m.totalInBoundTraffic.Collect(ch)
m.totalOutBoundTraffic.Collect(ch)
}
@ -78,3 +85,8 @@ func (m *Metrics) TrackNewConnection(country string) {
With(prometheus.Labels{"country": country}).
Inc()
}
// TrackFailedConnection counts failed connection attempts
func (m *Metrics) TrackFailedConnection() {
m.totalFailedConnections.Inc()
}

View file

@ -42,11 +42,14 @@ func (p *proxyEventLogger) OnNewSnowflakeEvent(e event.SnowflakeEvent) {
}
type periodicProxyStats struct {
bytesLogger bytesLogger
bytesLogger bytesLogger
// Completed successful connections.
connectionCount int
logPeriod time.Duration
task *task.Periodic
dispatcher event.SnowflakeEventDispatcher
// Connections that failed to establish.
failedConnectionCount uint
logPeriod time.Duration
task *task.Periodic
dispatcher event.SnowflakeEventDispatcher
}
func newPeriodicProxyStats(logPeriod time.Duration, dispatcher event.SnowflakeEventDispatcher, bytesLogger bytesLogger) *periodicProxyStats {
@ -60,19 +63,23 @@ func (p *periodicProxyStats) OnNewSnowflakeEvent(e event.SnowflakeEvent) {
switch e.(type) {
case event.EventOnProxyConnectionOver:
p.connectionCount += 1
case event.EventOnProxyConnectionFailed:
p.failedConnectionCount += 1
}
}
func (p *periodicProxyStats) logTick() error {
inboundSum, outboundSum := p.bytesLogger.GetStat()
e := event.EventOnProxyStats{
SummaryInterval: p.logPeriod,
ConnectionCount: p.connectionCount,
SummaryInterval: p.logPeriod,
ConnectionCount: p.connectionCount,
FailedConnectionCount: p.failedConnectionCount,
}
e.InboundBytes, e.InboundUnit = formatTraffic(inboundSum)
e.OutboundBytes, e.OutboundUnit = formatTraffic(outboundSum)
p.dispatcher.OnNewSnowflakeEvent(e)
p.connectionCount = 0
p.failedConnectionCount = 0
return nil
}

View file

@ -8,6 +8,7 @@ type EventCollector interface {
TrackInBoundTraffic(value int64)
TrackOutBoundTraffic(value int64)
TrackNewConnection(country string)
TrackFailedConnection()
}
type EventMetrics struct {
@ -27,5 +28,7 @@ func (em *EventMetrics) OnNewSnowflakeEvent(e event.SnowflakeEvent) {
case event.EventOnProxyConnectionOver:
e := e.(event.EventOnProxyConnectionOver)
em.collector.TrackNewConnection(e.Country)
case event.EventOnProxyConnectionFailed:
em.collector.TrackFailedConnection()
}
}

View file

@ -682,6 +682,9 @@ func (sf *SnowflakeProxy) runSession(sid string) {
connectedToClient = true
case <-time.After(dataChannelTimeout):
log.Println("Timed out waiting for client to open data channel.")
sf.EventDispatcher.OnNewSnowflakeEvent(
event.EventOnProxyConnectionFailed{},
)
if err := pc.Close(); err != nil {
log.Printf("error calling pc.Close: %v", err)
}