mirror of
https://gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake.git
synced 2025-10-14 05:11:19 -04:00
Fix datarace for WebRTCPeer.closed
The race condition occurs because concurrent goroutines are intermixing reads and writes of `WebRTCPeer.closed`. Spotted when integrating Snowflake inside OONI in https://github.com/ooni/probe-cli/pull/373.
This commit is contained in:
parent
ed2d5df87d
commit
ddcdfc4f09
3 changed files with 23 additions and 11 deletions
|
@ -33,7 +33,7 @@ type FakeDialer struct {
|
||||||
|
|
||||||
func (w FakeDialer) Catch() (*WebRTCPeer, error) {
|
func (w FakeDialer) Catch() (*WebRTCPeer, error) {
|
||||||
fmt.Println("Caught a dummy snowflake.")
|
fmt.Println("Caught a dummy snowflake.")
|
||||||
return &WebRTCPeer{}, nil
|
return &WebRTCPeer{closed: make(chan struct{})}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w FakeDialer) GetMax() int {
|
func (w FakeDialer) GetMax() int {
|
||||||
|
@ -97,7 +97,7 @@ func TestSnowflakeClient(t *testing.T) {
|
||||||
So(err, ShouldNotBeNil)
|
So(err, ShouldNotBeNil)
|
||||||
So(p.Count(), ShouldEqual, c)
|
So(p.Count(), ShouldEqual, c)
|
||||||
|
|
||||||
// But popping and closing allows it to continue.
|
// But popping allows it to continue.
|
||||||
s := p.Pop()
|
s := p.Pop()
|
||||||
s.Close()
|
s.Close()
|
||||||
So(s, ShouldNotBeNil)
|
So(s, ShouldNotBeNil)
|
||||||
|
@ -127,7 +127,7 @@ func TestSnowflakeClient(t *testing.T) {
|
||||||
cnt := 5
|
cnt := 5
|
||||||
p, _ := NewPeers(FakeDialer{max: cnt})
|
p, _ := NewPeers(FakeDialer{max: cnt})
|
||||||
for i := 0; i < cnt; i++ {
|
for i := 0; i < cnt; i++ {
|
||||||
p.activePeers.PushBack(&WebRTCPeer{})
|
p.activePeers.PushBack(&WebRTCPeer{closed: make(chan struct{})})
|
||||||
}
|
}
|
||||||
So(p.Count(), ShouldEqual, cnt)
|
So(p.Count(), ShouldEqual, cnt)
|
||||||
p.End()
|
p.End()
|
||||||
|
|
|
@ -83,7 +83,7 @@ func (p *Peers) Pop() *WebRTCPeer {
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if snowflake.closed {
|
if snowflake.Closed() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Set to use the same rate-limited traffic logger to keep consistency.
|
// Set to use the same rate-limited traffic logger to keep consistency.
|
||||||
|
@ -110,7 +110,7 @@ func (p *Peers) purgeClosedPeers() {
|
||||||
next := e.Next()
|
next := e.Next()
|
||||||
conn := e.Value.(*WebRTCPeer)
|
conn := e.Value.(*WebRTCPeer)
|
||||||
// Purge those marked for deletion.
|
// Purge those marked for deletion.
|
||||||
if conn.closed {
|
if conn.Closed() {
|
||||||
p.activePeers.Remove(e)
|
p.activePeers.Remove(e)
|
||||||
}
|
}
|
||||||
e = next
|
e = next
|
||||||
|
|
|
@ -28,7 +28,7 @@ type WebRTCPeer struct {
|
||||||
lastReceive time.Time
|
lastReceive time.Time
|
||||||
|
|
||||||
open chan struct{} // Channel to notify when datachannel opens
|
open chan struct{} // Channel to notify when datachannel opens
|
||||||
closed bool
|
closed chan struct{}
|
||||||
|
|
||||||
once sync.Once // Synchronization for PeerConnection destruction
|
once sync.Once // Synchronization for PeerConnection destruction
|
||||||
|
|
||||||
|
@ -46,6 +46,7 @@ func NewWebRTCPeer(config *webrtc.Configuration,
|
||||||
}
|
}
|
||||||
connection.id = "snowflake-" + hex.EncodeToString(buf[:])
|
connection.id = "snowflake-" + hex.EncodeToString(buf[:])
|
||||||
}
|
}
|
||||||
|
connection.closed = make(chan struct{})
|
||||||
|
|
||||||
// Override with something that's not NullLogger to have real logging.
|
// Override with something that's not NullLogger to have real logging.
|
||||||
connection.BytesLogger = &BytesNullLogger{}
|
connection.BytesLogger = &BytesNullLogger{}
|
||||||
|
@ -78,9 +79,19 @@ func (c *WebRTCPeer) Write(b []byte) (int, error) {
|
||||||
return len(b), nil
|
return len(b), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//Returns a boolean indicated whether the peer is closed
|
||||||
|
func (c *WebRTCPeer) Closed() bool {
|
||||||
|
select {
|
||||||
|
case <-c.closed:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func (c *WebRTCPeer) Close() error {
|
func (c *WebRTCPeer) Close() error {
|
||||||
c.once.Do(func() {
|
c.once.Do(func() {
|
||||||
c.closed = true
|
close(c.closed)
|
||||||
c.cleanup()
|
c.cleanup()
|
||||||
log.Printf("WebRTC: Closing")
|
log.Printf("WebRTC: Closing")
|
||||||
})
|
})
|
||||||
|
@ -95,9 +106,6 @@ func (c *WebRTCPeer) checkForStaleness() {
|
||||||
c.lastReceive = time.Now()
|
c.lastReceive = time.Now()
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
for {
|
for {
|
||||||
if c.closed {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
lastReceive := c.lastReceive
|
lastReceive := c.lastReceive
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
|
@ -107,7 +115,11 @@ func (c *WebRTCPeer) checkForStaleness() {
|
||||||
c.Close()
|
c.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
<-time.After(time.Second)
|
select {
|
||||||
|
case <-c.closed:
|
||||||
|
return
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue