fix(proxy): race condition warning for isClosing

It appears that there is no need for the `isClosing` var at all:
we can just `close(c.sendMoreCh)` to ensure that it doesn't block
any more `Write()`s.

This is a follow-up to
https://gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/-/merge_requests/144.
Related:
https://gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/-/merge_requests/524.
This commit is contained in:
WofWca 2025-03-03 17:59:22 +04:00 committed by Cecylia Bocovich
parent 1aa5a61fe8
commit 01819eee32
No known key found for this signature in database
GPG key ID: 009DE379FD9B7B90
2 changed files with 5 additions and 9 deletions

View file

@ -485,6 +485,10 @@ func (sf *SnowflakeProxy) makePeerConnectionFromOffer(
} }
}) })
dc.OnClose(func() { dc.OnClose(func() {
// Make sure that the `Write()`s are not blocked any more.
dc.OnBufferedAmountLow(func() {})
close(conn.sendMoreCh)
conn.lock.Lock() conn.lock.Lock()
defer conn.lock.Unlock() defer conn.lock.Unlock()
log.Printf("Data Channel %s-%d close\n", dc.Label(), dc.ID()) log.Printf("Data Channel %s-%d close\n", dc.Label(), dc.ID())

View file

@ -33,8 +33,6 @@ type webRTCConn struct {
lock sync.Mutex // Synchronization for DataChannel destruction lock sync.Mutex // Synchronization for DataChannel destruction
once sync.Once // Synchronization for PeerConnection destruction once sync.Once // Synchronization for PeerConnection destruction
isClosing bool
inactivityTimeout time.Duration inactivityTimeout time.Duration
activity chan struct{} activity chan struct{}
sendMoreCh chan struct{} sendMoreCh chan struct{}
@ -45,7 +43,6 @@ type webRTCConn struct {
func newWebRTCConn(pc *webrtc.PeerConnection, dc *webrtc.DataChannel, pr *io.PipeReader, bytesLogger bytesLogger) *webRTCConn { func newWebRTCConn(pc *webrtc.PeerConnection, dc *webrtc.DataChannel, pr *io.PipeReader, bytesLogger bytesLogger) *webRTCConn {
conn := &webRTCConn{pc: pc, dc: dc, pr: pr, bytesLogger: bytesLogger} conn := &webRTCConn{pc: pc, dc: dc, pr: pr, bytesLogger: bytesLogger}
conn.isClosing = false
conn.activity = make(chan struct{}, 100) conn.activity = make(chan struct{}, 100)
conn.sendMoreCh = make(chan struct{}, 1) conn.sendMoreCh = make(chan struct{}, 1)
conn.inactivityTimeout = 30 * time.Second conn.inactivityTimeout = 30 * time.Second
@ -89,7 +86,7 @@ func (c *webRTCConn) Write(b []byte) (int, error) {
defer c.lock.Unlock() defer c.lock.Unlock()
if c.dc != nil { if c.dc != nil {
_ = c.dc.Send(b) _ = c.dc.Send(b)
if !c.isClosing && c.dc.BufferedAmount() >= maxBufferedAmount { if c.dc.BufferedAmount() >= maxBufferedAmount {
<-c.sendMoreCh <-c.sendMoreCh
} }
} }
@ -97,11 +94,6 @@ func (c *webRTCConn) Write(b []byte) (int, error) {
} }
func (c *webRTCConn) Close() (err error) { func (c *webRTCConn) Close() (err error) {
c.isClosing = true
select {
case c.sendMoreCh <- struct{}{}:
default:
}
c.once.Do(func() { c.once.Do(func() {
c.cancelTimeoutLoop() c.cancelTimeoutLoop()
err = errors.Join(c.pr.Close(), c.pc.Close()) err = errors.Join(c.pr.Close(), c.pc.Close())