From ac9d49b8727b953c12a76e3645fe71a9ec3aab75 Mon Sep 17 00:00:00 2001 From: Serene H Date: Mon, 1 Aug 2016 12:17:28 -0700 Subject: [PATCH] ensure closing stale remotes from the client side --- client/snowflake.go | 3 +- client/webrtc.go | 113 ++++++++++++++++++++++++++------------------ 2 files changed, 70 insertions(+), 46 deletions(-) diff --git a/client/snowflake.go b/client/snowflake.go index 8dfa390..75e999f 100644 --- a/client/snowflake.go +++ b/client/snowflake.go @@ -20,6 +20,7 @@ import ( const ( ReconnectTimeout = 10 DefaultSnowflakeCapacity = 1 + SnowflakeTimeout = 30 ) // When a connection handler starts, +1 is written to this channel; when it @@ -81,7 +82,7 @@ func handler(socks SocksConnector, snowflakes SnowflakeCollector) error { return errors.New("handler: Received invalid Snowflake") } defer socks.Close() - defer snowflake.Reset() + defer snowflake.Close() log.Println("---- Handler: snowflake assigned ----") err := socks.Grant(&net.TCPAddr{IP: net.IPv4zero, Port: 0}) if err != nil { diff --git a/client/webrtc.go b/client/webrtc.go index 1f7ac00..0492466 100644 --- a/client/webrtc.go +++ b/client/webrtc.go @@ -29,6 +29,7 @@ type WebRTCPeer struct { errorChannel chan error recvPipe *io.PipeReader writePipe *io.PipeWriter + lastReceive time.Time buffer bytes.Buffer reset chan struct{} @@ -37,49 +38,6 @@ type WebRTCPeer struct { BytesLogger } -// Read bytes from local SOCKS. -// As part of |io.ReadWriter| -func (c *WebRTCPeer) Read(b []byte) (int, error) { - return c.recvPipe.Read(b) -} - -// Writes bytes out to remote WebRTC. -// As part of |io.ReadWriter| -func (c *WebRTCPeer) Write(b []byte) (int, error) { - c.BytesLogger.AddOutbound(len(b)) - if nil == c.transport { - log.Printf("Buffered %d bytes --> WebRTC", len(b)) - c.buffer.Write(b) - } else { - c.transport.Send(b) - } - return len(b), nil -} - -// As part of |Snowflake| -func (c *WebRTCPeer) Close() error { - if c.closed { // Skip if already closed. - return nil - } - log.Printf("WebRTC: Closing") - c.cleanup() - // Mark for deletion. - c.closed = true - return nil -} - -// As part of |Resetter| -func (c *WebRTCPeer) Reset() { - c.Close() - go func() { - c.reset <- struct{}{} - log.Println("WebRTC resetting...") - }() -} - -// As part of |Resetter| -func (c *WebRTCPeer) WaitForReset() { <-c.reset } - // Construct a WebRTC PeerConnection. func NewWebRTCPeer(config *webrtc.Configuration, broker *BrokerChannel) *WebRTCPeer { @@ -102,6 +60,69 @@ func NewWebRTCPeer(config *webrtc.Configuration, return connection } +// Read bytes from local SOCKS. +// As part of |io.ReadWriter| +func (c *WebRTCPeer) Read(b []byte) (int, error) { + return c.recvPipe.Read(b) +} + +// Writes bytes out to remote WebRTC. +// As part of |io.ReadWriter| +func (c *WebRTCPeer) Write(b []byte) (int, error) { + c.BytesLogger.AddOutbound(len(b)) + // TODO: Buffering could be improved / separated out of WebRTCPeer. + if nil == c.transport { + log.Printf("Buffered %d bytes --> WebRTC", len(b)) + c.buffer.Write(b) + } else { + c.transport.Send(b) + } + return len(b), nil +} + +// As part of |Snowflake| +func (c *WebRTCPeer) Close() error { + if c.closed { // Skip if already closed. + return nil + } + // Mark for deletion. + c.closed = true + c.cleanup() + c.Reset() + log.Printf("WebRTC: Closing") + return nil +} + +// As part of |Resetter| +func (c *WebRTCPeer) Reset() { + if nil == c.reset { + return + } + c.reset <- struct{}{} +} + +// As part of |Resetter| +func (c *WebRTCPeer) WaitForReset() { <-c.reset } + +// Prevent long-lived broken remotes. +// Should also update the DataChannel in underlying go-webrtc's to make Closes +// more immediate / responsive. +func (c *WebRTCPeer) checkForStaleness() { + c.lastReceive = time.Now() + for { + if c.closed { + return + } + if time.Since(c.lastReceive).Seconds() > SnowflakeTimeout { + log.Println("WebRTC: No messages received for", SnowflakeTimeout, + "seconds -- closing stale connection.") + c.Close() + return + } + <-time.After(time.Second) + } +} + // As part of |Connector| interface. func (c *WebRTCPeer) Connect() error { log.Println(c.id, " connecting...") @@ -119,6 +140,7 @@ func (c *WebRTCPeer) Connect() error { if err != nil { return err } + go c.checkForStaleness() return nil } @@ -208,7 +230,7 @@ func (c *WebRTCPeer) establishDataChannel() error { // Disable the DataChannel as a write destination. log.Println("WebRTC: DataChannel.OnClose [remotely]") c.transport = nil - c.Reset() + c.Close() } dc.OnMessage = func(msg []byte) { if len(msg) <= 0 { @@ -225,6 +247,7 @@ func (c *WebRTCPeer) establishDataChannel() error { log.Println("Error: short write") panic("short write") } + c.lastReceive = time.Now() } log.Println("WebRTC: DataChannel created.") return nil @@ -257,7 +280,7 @@ func (c *WebRTCPeer) exchangeSDP() error { } case err := <-c.errorChannel: log.Println("Failed to prepare offer", err) - c.Reset() + c.Close() return err } // Keep trying the same offer until a valid answer arrives.