mirror of
https://gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake.git
synced 2025-10-13 20:11:19 -04:00
This design is easier to misuse, because it allows the caller to modify the contents of the slice after queueing it, but it avoids an extra allocation + memmove per incoming packet. Before: $ go test -bench='Benchmark(QueueIncoming|WriteTo)' -benchtime=2s -benchmem BenchmarkQueueIncoming-4 7001494 342.4 ns/op 1024 B/op 2 allocs/op BenchmarkWriteTo-4 3777459 627 ns/op 1024 B/op 2 allocs/op After: $ go test -bench=BenchmarkWriteTo -benchtime 2s -benchmem BenchmarkQueueIncoming-4 13361600 170.1 ns/op 512 B/op 1 allocs/op BenchmarkWriteTo-4 6702324 373 ns/op 512 B/op 1 allocs/op Despite the benchmark results, the change in QueueIncoming turns out not to have an effect in practice. It appears that the compiler had already been optimizing out the allocation and copy in QueueIncoming. https://gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/-/issues/40187 The WriteTo change, on the other hand, in practice reduces the frequency of garbage collection. https://gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/-/issues/40199
133 lines
4.4 KiB
Go
133 lines
4.4 KiB
Go
package turbotunnel
|
|
|
|
import (
|
|
"net"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// taggedPacket bundles a packet payload with a net.Addr, the same pair of
// values that PacketConn.ReadFrom hands back to its caller.
type taggedPacket struct {
	// P is the packet payload.
	P []byte
	// Addr is the address that accompanies the payload.
	Addr net.Addr
}
|
|
|
|
// QueuePacketConn implements net.PacketConn by storing queues of packets. There
|
|
// is one incoming queue (where packets are additionally tagged by the source
|
|
// address of the client that sent them). There are many outgoing queues, one
|
|
// for each client address that has been recently seen. The QueueIncoming method
|
|
// inserts a packet into the incoming queue, to eventually be returned by
|
|
// ReadFrom. WriteTo inserts a packet into an address-specific outgoing queue,
|
|
// which can later by accessed through the OutgoingQueue method.
|
|
type QueuePacketConn struct {
|
|
clients *ClientMap
|
|
localAddr net.Addr
|
|
recvQueue chan taggedPacket
|
|
closeOnce sync.Once
|
|
closed chan struct{}
|
|
// What error to return when the QueuePacketConn is closed.
|
|
err atomic.Value
|
|
}
|
|
|
|
// NewQueuePacketConn makes a new QueuePacketConn, set to track recent clients
|
|
// for at least a duration of timeout.
|
|
func NewQueuePacketConn(localAddr net.Addr, timeout time.Duration) *QueuePacketConn {
|
|
return &QueuePacketConn{
|
|
clients: NewClientMap(timeout),
|
|
localAddr: localAddr,
|
|
recvQueue: make(chan taggedPacket, queueSize),
|
|
closed: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// QueueIncoming queues an incoming packet and its source address, to be
|
|
// returned in a future call to ReadFrom. This function takes ownership of p and
|
|
// the caller must not reuse it.
|
|
func (c *QueuePacketConn) QueueIncoming(p []byte, addr net.Addr) {
|
|
select {
|
|
case <-c.closed:
|
|
// If we're closed, silently drop it.
|
|
return
|
|
default:
|
|
}
|
|
select {
|
|
case c.recvQueue <- taggedPacket{p, addr}:
|
|
default:
|
|
// Drop the incoming packet if the receive queue is full.
|
|
}
|
|
}
|
|
|
|
// OutgoingQueue returns the queue of outgoing packets corresponding to addr,
|
|
// creating it if necessary. The contents of the queue will be packets that are
|
|
// written to the address in question using WriteTo.
|
|
func (c *QueuePacketConn) OutgoingQueue(addr net.Addr) <-chan []byte {
|
|
return c.clients.SendQueue(addr)
|
|
}
|
|
|
|
// ReadFrom returns a packet and address previously stored by QueueIncoming.
|
|
func (c *QueuePacketConn) ReadFrom(p []byte) (int, net.Addr, error) {
|
|
select {
|
|
case <-c.closed:
|
|
return 0, nil, &net.OpError{Op: "read", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
|
|
default:
|
|
}
|
|
select {
|
|
case <-c.closed:
|
|
return 0, nil, &net.OpError{Op: "read", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
|
|
case packet := <-c.recvQueue:
|
|
return copy(p, packet.P), packet.Addr, nil
|
|
}
|
|
}
|
|
|
|
// WriteTo queues an outgoing packet for the given address. The queue can later
|
|
// be retrieved using the OutgoingQueue method. This function takes ownership of
|
|
// p and the caller must not reuse it.
|
|
func (c *QueuePacketConn) WriteTo(p []byte, addr net.Addr) (int, error) {
|
|
select {
|
|
case <-c.closed:
|
|
return 0, &net.OpError{Op: "write", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
|
|
default:
|
|
}
|
|
select {
|
|
case c.clients.SendQueue(addr) <- p:
|
|
return len(p), nil
|
|
default:
|
|
// Drop the outgoing packet if the send queue is full.
|
|
return len(p), nil
|
|
}
|
|
}
|
|
|
|
// closeWithError unblocks pending operations and makes future operations fail
|
|
// with the given error. If err is nil, it becomes errClosedPacketConn.
|
|
func (c *QueuePacketConn) closeWithError(err error) error {
|
|
var newlyClosed bool
|
|
c.closeOnce.Do(func() {
|
|
newlyClosed = true
|
|
// Store the error to be returned by future PacketConn
|
|
// operations.
|
|
if err == nil {
|
|
err = errClosedPacketConn
|
|
}
|
|
c.err.Store(err)
|
|
close(c.closed)
|
|
})
|
|
if !newlyClosed {
|
|
return &net.OpError{Op: "close", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Close unblocks pending operations and makes future operations fail with a
|
|
// "closed connection" error.
|
|
func (c *QueuePacketConn) Close() error {
|
|
return c.closeWithError(nil)
|
|
}
|
|
|
|
// LocalAddr returns the localAddr value that was passed to NewQueuePacketConn.
|
|
func (c *QueuePacketConn) LocalAddr() net.Addr { return c.localAddr }
|
|
|
|
// SetDeadline is not supported; it ignores t and always returns
// errNotImplemented.
func (c *QueuePacketConn) SetDeadline(t time.Time) error {
	return errNotImplemented
}
|
|
// SetReadDeadline is not supported; it ignores t and always returns
// errNotImplemented.
func (c *QueuePacketConn) SetReadDeadline(t time.Time) error {
	return errNotImplemented
}
|
|
// SetWriteDeadline is not supported; it ignores t and always returns
// errNotImplemented.
func (c *QueuePacketConn) SetWriteDeadline(t time.Time) error {
	return errNotImplemented
}
|