David Fifield 2020-01-28 02:29:34 -07:00
parent 904af9cb8a
commit 222ab3d85a
7 changed files with 1050 additions and 0 deletions

View file

@ -0,0 +1,28 @@
package turbotunnel
import (
"crypto/rand"
"encoding/hex"
)
// ClientID is an abstract identifier that binds together all the communications
// belonging to a single client session, even though those communications may
// arrive from multiple IP addresses or over multiple lower-level connections.
// It plays the same role that an (IP address, port number) tuple plays in a
// net.UDPConn: it's the return address pertaining to a long-lived abstract
// client session. The client attaches its ClientID to each of its
// communications, enabling the server to disambiguate requests among its many
// clients. ClientID implements the net.Addr interface.
type ClientID [8]byte
func NewClientID() ClientID {
var id ClientID
_, err := rand.Read(id[:])
if err != nil {
panic(err)
}
return id
}
func (id ClientID) Network() string { return "clientid" }
func (id ClientID) String() string { return hex.EncodeToString(id[:]) }

View file

@ -0,0 +1,144 @@
package turbotunnel
import (
"container/heap"
"net"
"sync"
"time"
)
// clientRecord is a record of a recently seen client, with the time it was last
// seen and a send queue.
type clientRecord struct {
Addr net.Addr
LastSeen time.Time
SendQueue chan []byte
}
// ClientMap manages a mapping of live clients (keyed by address, which will be
// a ClientID) to their respective send queues. ClientMap's functions are safe
// to call from multiple goroutines.
type ClientMap struct {
// We use an inner structure to avoid exposing public heap.Interface
// functions to users of clientMap.
inner clientMapInner
// Synchronizes access to inner.
lock sync.Mutex
}
// NewClientMap creates a ClientMap that expires clients after a timeout.
//
// The timeout does not have to be kept in sync with QUIC's internal idle
// timeout. If a client is removed from the client map while the QUIC session is
// still live, the worst that can happen is a loss of whatever packets were in
// the send queue at the time. If QUIC later decides to send more packets to the
// same client, we'll instantiate a new send queue, and if the client ever
// connects again with the proper client ID, we'll deliver them.
func NewClientMap(timeout time.Duration) *ClientMap {
m := &ClientMap{
inner: clientMapInner{
byAge: make([]*clientRecord, 0),
byAddr: make(map[net.Addr]int),
},
}
go func() {
for {
time.Sleep(timeout / 2)
now := time.Now()
m.lock.Lock()
m.inner.removeExpired(now, timeout)
m.lock.Unlock()
}
}()
return m
}
// SendQueue returns the send queue corresponding to addr, creating it if
// necessary.
func (m *ClientMap) SendQueue(addr net.Addr) chan []byte {
m.lock.Lock()
defer m.lock.Unlock()
return m.inner.SendQueue(addr, time.Now())
}
// clientMapInner is the inner type of ClientMap, implementing heap.Interface.
// byAge is the backing store, a heap ordered by LastSeen time, to facilitate
// expiring old client records. byAddr is a map from addresses (i.e., ClientIDs)
// to heap indices, to allow looking up by address. Unlike ClientMap,
// clientMapInner requires external synchonization.
type clientMapInner struct {
byAge []*clientRecord
byAddr map[net.Addr]int
}
// removeExpired removes all client records whose LastSeen timestamp is more
// than timeout in the past.
func (inner *clientMapInner) removeExpired(now time.Time, timeout time.Duration) {
for len(inner.byAge) > 0 && now.Sub(inner.byAge[0].LastSeen) >= timeout {
heap.Pop(inner)
}
}
// SendQueue finds the existing client record corresponding to addr, or creates
// a new one if none exists yet. It updates the client record's LastSeen time
// and returns its SendQueue.
func (inner *clientMapInner) SendQueue(addr net.Addr, now time.Time) chan []byte {
var record *clientRecord
i, ok := inner.byAddr[addr]
if ok {
// Found one, update its LastSeen.
record = inner.byAge[i]
record.LastSeen = now
heap.Fix(inner, i)
} else {
// Not found, create a new one.
record = &clientRecord{
Addr: addr,
LastSeen: now,
SendQueue: make(chan []byte, queueSize),
}
heap.Push(inner, record)
}
return record.SendQueue
}
// heap.Interface for clientMapInner.
func (inner *clientMapInner) Len() int {
if len(inner.byAge) != len(inner.byAddr) {
panic("inconsistent clientMap")
}
return len(inner.byAge)
}
func (inner *clientMapInner) Less(i, j int) bool {
return inner.byAge[i].LastSeen.Before(inner.byAge[j].LastSeen)
}
func (inner *clientMapInner) Swap(i, j int) {
inner.byAge[i], inner.byAge[j] = inner.byAge[j], inner.byAge[i]
inner.byAddr[inner.byAge[i].Addr] = i
inner.byAddr[inner.byAge[j].Addr] = j
}
func (inner *clientMapInner) Push(x interface{}) {
record := x.(*clientRecord)
if _, ok := inner.byAddr[record.Addr]; ok {
panic("duplicate address in clientMap")
}
// Insert into byAddr map.
inner.byAddr[record.Addr] = len(inner.byAge)
// Insert into byAge slice.
inner.byAge = append(inner.byAge, record)
}
func (inner *clientMapInner) Pop() interface{} {
n := len(inner.byAddr)
// Remove from byAge slice.
record := inner.byAge[n-1]
inner.byAge[n-1] = nil
inner.byAge = inner.byAge[:n-1]
// Remove from byAddr map.
delete(inner.byAddr, record.Addr)
return record
}

View file

@ -0,0 +1,13 @@
// Package turbotunnel provides support for overlaying a virtual net.PacketConn
// on some other network carrier.
//
// https://github.com/net4people/bbs/issues/9
package turbotunnel
import "errors"
// The size of receive and send queues.
const queueSize = 32
var errClosedPacketConn = errors.New("operation on closed connection")
var errNotImplemented = errors.New("not implemented")

View file

@ -0,0 +1,137 @@
package turbotunnel
import (
"net"
"sync"
"sync/atomic"
"time"
)
// taggedPacket is a combination of a []byte and a net.Addr, encapsulating the
// return type of PacketConn.ReadFrom.
type taggedPacket struct {
P []byte
Addr net.Addr
}
// QueuePacketConn implements net.PacketConn by storing queues of packets. There
// is one incoming queue (where packets are additionally tagged by the source
// address of the client that sent them). There are many outgoing queues, one
// for each client address that has been recently seen. The QueueIncoming method
// inserts a packet into the incoming queue, to eventually be returned by
// ReadFrom. WriteTo inserts a packet into an address-specific outgoing queue,
// which can later by accessed through the OutgoingQueue method.
type QueuePacketConn struct {
clients *ClientMap
localAddr net.Addr
recvQueue chan taggedPacket
closeOnce sync.Once
closed chan struct{}
// What error to return when the QueuePacketConn is closed.
err atomic.Value
}
// NewQueuePacketConn makes a new QueuePacketConn, set to track recent clients
// for at least a duration of timeout.
func NewQueuePacketConn(localAddr net.Addr, timeout time.Duration) *QueuePacketConn {
return &QueuePacketConn{
clients: NewClientMap(timeout),
localAddr: localAddr,
recvQueue: make(chan taggedPacket, queueSize),
closed: make(chan struct{}),
}
}
// QueueIncoming queues and incoming packet and its source address, to be
// returned in a future call to ReadFrom.
func (c *QueuePacketConn) QueueIncoming(p []byte, addr net.Addr) {
select {
case <-c.closed:
// If we're closed, silently drop it.
return
default:
}
// Copy the slice so that the caller may reuse it.
buf := make([]byte, len(p))
copy(buf, p)
select {
case c.recvQueue <- taggedPacket{buf, addr}:
default:
// Drop the incoming packet if the receive queue is full.
}
}
// OutgoingQueue returns the queue of outgoing packets corresponding to addr,
// creating it if necessary. The contents of the queue will be packets that are
// written to the address in question using WriteTo.
func (c *QueuePacketConn) OutgoingQueue(addr net.Addr) <-chan []byte {
return c.clients.SendQueue(addr)
}
// ReadFrom returns a packet and address previously stored by QueueIncoming.
func (c *QueuePacketConn) ReadFrom(p []byte) (int, net.Addr, error) {
select {
case <-c.closed:
return 0, nil, &net.OpError{Op: "read", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
default:
}
select {
case <-c.closed:
return 0, nil, &net.OpError{Op: "read", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
case packet := <-c.recvQueue:
return copy(p, packet.P), packet.Addr, nil
}
}
// WriteTo queues an outgoing packet for the given address. The queue can later
// be retrieved using the OutgoingQueue method.
func (c *QueuePacketConn) WriteTo(p []byte, addr net.Addr) (int, error) {
select {
case <-c.closed:
return 0, &net.OpError{Op: "write", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
default:
}
// Copy the slice so that the caller may reuse it.
buf := make([]byte, len(p))
copy(buf, p)
select {
case c.clients.SendQueue(addr) <- buf:
return len(buf), nil
default:
// Drop the outgoing packet if the send queue is full.
return len(buf), nil
}
}
// closeWithError unblocks pending operations and makes future operations fail
// with the given error. If err is nil, it becomes errClosedPacketConn.
func (c *QueuePacketConn) closeWithError(err error) error {
var newlyClosed bool
c.closeOnce.Do(func() {
newlyClosed = true
// Store the error to be returned by future PacketConn
// operations.
if err == nil {
err = errClosedPacketConn
}
c.err.Store(err)
close(c.closed)
})
if !newlyClosed {
return &net.OpError{Op: "close", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
}
return nil
}
// Close unblocks pending operations and makes future operations fail with a
// "closed connection" error.
func (c *QueuePacketConn) Close() error {
return c.closeWithError(nil)
}
// LocalAddr returns the localAddr value that was passed to NewQueuePacketConn.
func (c *QueuePacketConn) LocalAddr() net.Addr { return c.localAddr }
func (c *QueuePacketConn) SetDeadline(t time.Time) error { return errNotImplemented }
func (c *QueuePacketConn) SetReadDeadline(t time.Time) error { return errNotImplemented }
func (c *QueuePacketConn) SetWriteDeadline(t time.Time) error { return errNotImplemented }

View file

@ -0,0 +1,204 @@
package turbotunnel
import (
"context"
"errors"
"net"
"sync"
"sync/atomic"
"time"
)
// RedialPacketConn implements a long-lived net.PacketConn atop a sequence of
// other, transient net.PacketConns. RedialPacketConn creates a new
// net.PacketConn by calling a provided dialContext function. Whenever the
// net.PacketConn experiences a ReadFrom or WriteTo error, RedialPacketConn
// calls the dialContext function again and starts sending and receiving packets
// on the new net.PacketConn. RedialPacketConn's own ReadFrom and WriteTo
// methods return an error only when the dialContext function returns an error.
//
// RedialPacketConn uses static local and remote addresses that are independent
// of those of any dialed net.PacketConn.
type RedialPacketConn struct {
localAddr net.Addr
remoteAddr net.Addr
dialContext func(context.Context) (net.PacketConn, error)
recvQueue chan []byte
sendQueue chan []byte
closed chan struct{}
closeOnce sync.Once
// The first dial error, which causes the clientPacketConn to be
// closed and is returned from future read/write operations. Compare to
// the rerr and werr in io.Pipe.
err atomic.Value
}
// NewQueuePacketConn makes a new RedialPacketConn, with the given static local
// and remote addresses, and dialContext function.
func NewRedialPacketConn(
localAddr, remoteAddr net.Addr,
dialContext func(context.Context) (net.PacketConn, error),
) *RedialPacketConn {
c := &RedialPacketConn{
localAddr: localAddr,
remoteAddr: remoteAddr,
dialContext: dialContext,
recvQueue: make(chan []byte, queueSize),
sendQueue: make(chan []byte, queueSize),
closed: make(chan struct{}),
err: atomic.Value{},
}
go c.dialLoop()
return c
}
// dialLoop repeatedly calls c.dialContext and passes the resulting
// net.PacketConn to c.exchange. It returns only when c is closed or dialContext
// returns an error.
func (c *RedialPacketConn) dialLoop() {
ctx, cancel := context.WithCancel(context.Background())
for {
select {
case <-c.closed:
cancel()
return
default:
}
conn, err := c.dialContext(ctx)
if err != nil {
c.closeWithError(err)
cancel()
return
}
c.exchange(conn)
conn.Close()
}
}
// exchange calls ReadFrom on the given net.PacketConn and places the resulting
// packets in the receive queue, and takes packets from the send queue and calls
// WriteTo on them, making the current net.PacketConn active.
func (c *RedialPacketConn) exchange(conn net.PacketConn) {
readErrCh := make(chan error)
writeErrCh := make(chan error)
go func() {
defer close(readErrCh)
for {
select {
case <-c.closed:
return
case <-writeErrCh:
return
default:
}
var buf [1500]byte
n, _, err := conn.ReadFrom(buf[:])
if err != nil {
readErrCh <- err
return
}
p := make([]byte, n)
copy(p, buf[:])
select {
case c.recvQueue <- p:
default: // OK to drop packets.
}
}
}()
go func() {
defer close(writeErrCh)
for {
select {
case <-c.closed:
return
case <-readErrCh:
return
case p := <-c.sendQueue:
_, err := conn.WriteTo(p, c.remoteAddr)
if err != nil {
writeErrCh <- err
return
}
}
}
}()
select {
case <-readErrCh:
case <-writeErrCh:
}
}
// ReadFrom reads a packet from the currently active net.PacketConn. The
// packet's original remote address is replaced with the RedialPacketConn's own
// remote address.
func (c *RedialPacketConn) ReadFrom(p []byte) (int, net.Addr, error) {
select {
case <-c.closed:
return 0, nil, &net.OpError{Op: "read", Net: c.LocalAddr().Network(), Source: c.LocalAddr(), Addr: c.remoteAddr, Err: c.err.Load().(error)}
default:
}
select {
case <-c.closed:
return 0, nil, &net.OpError{Op: "read", Net: c.LocalAddr().Network(), Source: c.LocalAddr(), Addr: c.remoteAddr, Err: c.err.Load().(error)}
case buf := <-c.recvQueue:
return copy(p, buf), c.remoteAddr, nil
}
}
// WriteTo writes a packet to the currently active net.PacketConn. The addr
// argument is ignored and instead replaced with the RedialPacketConn's own
// remote address.
func (c *RedialPacketConn) WriteTo(p []byte, addr net.Addr) (int, error) {
// addr is ignored.
select {
case <-c.closed:
return 0, &net.OpError{Op: "write", Net: c.LocalAddr().Network(), Source: c.LocalAddr(), Addr: c.remoteAddr, Err: c.err.Load().(error)}
default:
}
buf := make([]byte, len(p))
copy(buf, p)
select {
case c.sendQueue <- buf:
return len(buf), nil
default:
// Drop the outgoing packet if the send queue is full.
return len(buf), nil
}
}
// closeWithError unblocks pending operations and makes future operations fail
// with the given error. If err is nil, it becomes errClosedPacketConn.
func (c *RedialPacketConn) closeWithError(err error) error {
var once bool
c.closeOnce.Do(func() {
// Store the error to be returned by future read/write
// operations.
if err == nil {
err = errors.New("operation on closed connection")
}
c.err.Store(err)
close(c.closed)
once = true
})
if !once {
return &net.OpError{Op: "close", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
}
return nil
}
// Close unblocks pending operations and makes future operations fail with a
// "closed connection" error.
func (c *RedialPacketConn) Close() error {
return c.closeWithError(nil)
}
// LocalAddr returns the localAddr value that was passed to NewRedialPacketConn.
func (c *RedialPacketConn) LocalAddr() net.Addr { return c.localAddr }
func (c *RedialPacketConn) SetDeadline(t time.Time) error { return errNotImplemented }
func (c *RedialPacketConn) SetReadDeadline(t time.Time) error { return errNotImplemented }
func (c *RedialPacketConn) SetWriteDeadline(t time.Time) error { return errNotImplemented }