diff --git a/proxy/lib/snowflake.go b/proxy/lib/snowflake.go index aa9175f..0e2517e 100644 --- a/proxy/lib/snowflake.go +++ b/proxy/lib/snowflake.go @@ -485,6 +485,10 @@ func (sf *SnowflakeProxy) makePeerConnectionFromOffer( } }) dc.OnClose(func() { + // Make sure that the `Write()`s are not blocked any more. + dc.OnBufferedAmountLow(func() {}) + close(conn.sendMoreCh) + conn.lock.Lock() defer conn.lock.Unlock() log.Printf("Data Channel %s-%d close\n", dc.Label(), dc.ID()) diff --git a/proxy/lib/webrtcconn.go b/proxy/lib/webrtcconn.go index f92e87b..f849bfa 100644 --- a/proxy/lib/webrtcconn.go +++ b/proxy/lib/webrtcconn.go @@ -33,8 +33,6 @@ type webRTCConn struct { lock sync.Mutex // Synchronization for DataChannel destruction once sync.Once // Synchronization for PeerConnection destruction - isClosing bool - inactivityTimeout time.Duration activity chan struct{} sendMoreCh chan struct{} @@ -45,7 +43,6 @@ type webRTCConn struct { func newWebRTCConn(pc *webrtc.PeerConnection, dc *webrtc.DataChannel, pr *io.PipeReader, bytesLogger bytesLogger) *webRTCConn { conn := &webRTCConn{pc: pc, dc: dc, pr: pr, bytesLogger: bytesLogger} - conn.isClosing = false conn.activity = make(chan struct{}, 100) conn.sendMoreCh = make(chan struct{}, 1) conn.inactivityTimeout = 30 * time.Second @@ -89,7 +86,7 @@ func (c *webRTCConn) Write(b []byte) (int, error) { defer c.lock.Unlock() if c.dc != nil { _ = c.dc.Send(b) - if !c.isClosing && c.dc.BufferedAmount() >= maxBufferedAmount { + if c.dc.BufferedAmount() >= maxBufferedAmount { <-c.sendMoreCh } } @@ -97,11 +94,6 @@ func (c *webRTCConn) Write(b []byte) (int, error) { } func (c *webRTCConn) Close() (err error) { - c.isClosing = true - select { - case c.sendMoreCh <- struct{}{}: - default: - } c.once.Do(func() { c.cancelTimeoutLoop() err = errors.Join(c.pr.Close(), c.pc.Close())