Ensure turbotunnel read and write loops terminate

Introduce a WaitGroup and a done channel to ensure that both the read
and write goroutines for turbotunnel connections terminate when the
connection is closed.
Cecylia Bocovich 2021-05-12 09:32:07 -04:00
parent 11f0846264
commit 7c9005bed3
2 changed files with 27 additions and 14 deletions
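
For orientation, below is a minimal, self-contained sketch of the shutdown pattern this commit adopts. It is illustrative only and not taken from the snowflake code: the relay function, the outgoing channel, and the handle callback are made-up stand-ins for turbotunnelMode, pconn.OutgoingQueue(clientID), and pconn.QueueIncoming. A sync.WaitGroup tracks both goroutines; the read loop closes a done channel when it exits so the write loop stops, and the write loop closes the connection when it exits so a blocked read returns.

// Illustrative sketch only, not snowflake code.
package relay

import (
	"net"
	"sync"
)

// relay returns only after both the read and the write goroutine have exited.
func relay(conn net.Conn, outgoing <-chan []byte, handle func([]byte)) {
	var wg sync.WaitGroup
	wg.Add(2)
	done := make(chan struct{})

	// Read loop: on any read error, close done so the write loop stops.
	go func() {
		defer wg.Done()
		defer close(done)
		buf := make([]byte, 2048)
		for {
			n, err := conn.Read(buf)
			if err != nil {
				return
			}
			p := make([]byte, n)
			copy(p, buf[:n])
			handle(p)
		}
	}()

	// Write loop: stop on done, on a closed outgoing channel, or on a
	// write error; closing the connection unblocks the read loop.
	go func() {
		defer wg.Done()
		defer conn.Close()
		for {
			select {
			case <-done:
				return
			case p, ok := <-outgoing:
				if !ok {
					return
				}
				if _, err := conn.Write(p); err != nil {
					return
				}
			}
		}
	}()

	wg.Wait()
}

Closing the connection, rather than signalling over yet another channel, is what unblocks a read loop that is parked inside conn.Read (or, in the real code, encapsulation.ReadData).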

@@ -140,5 +140,6 @@ func (inner *clientMapInner) Pop() interface{} {
 	inner.byAge = inner.byAge[:n-1]
 	// Remove from byAddr map.
 	delete(inner.byAddr, record.Addr)
+	close(record.SendQueue)
 	return record
 }
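
A side note on the close(record.SendQueue) line above (standalone illustration, not snowflake code): a receive from a closed channel first drains any buffered values and then returns immediately with ok == false, which is what lets the per-client write loop in the second file exit once clientMap.Pop has closed the client's queue.

package main

import "fmt"

func main() {
	queue := make(chan []byte, 1)
	queue <- []byte("last packet")
	close(queue) // analogous to close(record.SendQueue) in Pop

	for {
		p, ok := <-queue
		if !ok {
			fmt.Println("queue closed; write loop returns")
			return
		}
		fmt.Printf("drained %q\n", p)
	}
}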

@@ -8,6 +8,7 @@ import (
 	"log"
 	"net"
 	"net/http"
+	"sync"
 	"time"
 
 	"git.torproject.org/pluggable-transports/snowflake.git/common/encapsulation"
@@ -139,18 +140,21 @@ func turbotunnelMode(conn net.Conn, addr net.Addr, pconn *turbotunnel.QueuePacketConn) error {
 	// credited for the entire KCP session.
 	clientIDAddrMap.Set(clientID, addr.String())
 
-	errCh := make(chan error)
+	var wg sync.WaitGroup
+	wg.Add(2)
+	done := make(chan struct{})
 
 	// The remainder of the WebSocket stream consists of encapsulated
 	// packets. We read them one by one and feed them into the
 	// QueuePacketConn on which kcp.ServeConn was set up, which eventually
 	// leads to KCP-level sessions in the acceptSessions function.
 	go func() {
+		defer wg.Done()
+		defer close(done) // Signal the write loop to finish
 		for {
 			p, err := encapsulation.ReadData(conn)
 			if err != nil {
-				errCh <- err
-				break
+				return
 			}
 			pconn.QueueIncoming(p, clientID)
 		}
@@ -159,24 +163,32 @@ func turbotunnelMode(conn net.Conn, addr net.Addr, pconn *turbotunnel.QueuePacketConn) error {
 	// At the same time, grab packets addressed to this ClientID and
 	// encapsulate them into the downstream.
 	go func() {
+		defer wg.Done()
+		defer conn.Close() // Signal the read loop to finish
 		// Buffer encapsulation.WriteData operations to keep length
 		// prefixes in the same send as the data that follows.
 		bw := bufio.NewWriter(conn)
-		for p := range pconn.OutgoingQueue(clientID) {
-			_, err := encapsulation.WriteData(bw, p)
-			if err == nil {
-				err = bw.Flush()
-			}
-			if err != nil {
-				errCh <- err
-				break
+		for {
+			select {
+			case <-done:
+				return
+			case p, ok := <-pconn.OutgoingQueue(clientID):
+				if !ok {
+					return
+				}
+				_, err := encapsulation.WriteData(bw, p)
+				if err == nil {
+					err = bw.Flush()
+				}
+				if err != nil {
+					return
+				}
 			}
 		}
 	}()
 
-	// Wait until one of the above loops terminates. The closing of the
-	// WebSocket connection will terminate the other one.
-	<-errCh
+	wg.Wait()
 
 	return nil
 }