USERADDR support for turbotunnel sessions.

The difficulty here is that the whole point of turbotunnel sessions is
that they are not necessarily tied to a single WebSocket connection, nor
even a single client IP address. We use a heuristic: whenever a
WebSocket connection starts that has a new ClientID, we store a mapping
from that ClientID to the IP address attached to the WebSocket
connection in a lookup table. Later, when enough packets have arrived to
establish a turbotunnel session, we recover the ClientID associated with
the session (which kcp-go has stored in the RemoteAddr field), and look
it up in the table to get an IP address. We introduce a new data type,
clientIDMap, to store the clientID-to-IP mapping during the short time
between when a WebSocket connection starts and handleSession receives a
fully fledged KCP session.
This commit is contained in:
David Fifield 2020-02-04 22:27:58 -07:00
parent 70126177fb
commit 0790954020
3 changed files with 244 additions and 7 deletions

View file

@ -43,6 +43,11 @@ const requestTimeout = 10 * time.Second
// indefinitely.
const clientMapTimeout = 1 * time.Minute
// clientIDAddrMapCapacity is how big to make the map of ClientIDs to IP
// addresses. The map is used in turbotunnelMode to store a reasonable IP
// address for a client session that may outlive any single WebSocket
// connection. The map never holds more than this many entries: every new entry
// takes the place of the oldest one. acceptStreams logs a message that
// includes this value when a lookup in the map fails.
const clientIDAddrMapCapacity = 1024
// How long to wait for ListenAndServe or ListenAndServeTLS to return an error
// before deciding that it's not going to return.
const listenAndServeErrorTimeout = 100 * time.Millisecond
@ -114,6 +119,15 @@ var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
// clientIDAddrMap stores short-term mappings from ClientIDs to IP addresses.
// When we call pt.DialOr, tor wants us to provide a USERADDR string that
// represents the remote IP address of the client (for metrics purposes, etc.).
// This data structure bridges the gap between ServeHTTP, which knows about IP
// addresses, and handleStream, which is what calls pt.DialOr. The common piece
// of information linking both ends of the chain is the ClientID, which is
// attached to the WebSocket connection and every session.
//
// turbotunnelMode writes an entry with Set once it has read the ClientID from
// a WebSocket connection. acceptStreams reads the entry back with Get, keyed
// by the ClientID it takes from the KCP session's RemoteAddr, and passes the
// resulting address to handleStream.
var clientIDAddrMap = newClientIDMap(clientIDAddrMapCapacity)
// overrideReadConn is a net.Conn with an overridden Read method. Compare to
// recordingConn at
// https://dave.cheney.net/2015/05/22/struct-composition-with-go.
@ -203,8 +217,16 @@ func turbotunnelMode(conn net.Conn, addr string, pconn *turbotunnel.QueuePacketC
return fmt.Errorf("reading ClientID: %v", err)
}
// TODO: ClientID-to-client_ip address mapping
// Peek at the first read packet to get the KCP conv ID.
// Store a short-term mapping from the ClientID to the client IP
// address attached to this WebSocket connection. tor will want us to
// provide a client IP address when we call pt.DialOr. But a KCP session
// does not necessarily correspond to any single IP address--it's
// composed of packets that are carried in possibly multiple WebSocket
// streams. We apply the heuristic that the IP address of the most
// recent WebSocket connection that has had to do with a session, at the
// time the session is established, is the IP address that should be
// credited for the entire KCP session.
clientIDAddrMap.Set(clientID, addr)
errCh := make(chan error)
@ -249,10 +271,9 @@ func turbotunnelMode(conn net.Conn, addr string, pconn *turbotunnel.QueuePacketC
}
// handleStream bidirectionally connects a client stream with the ORPort.
func handleStream(stream net.Conn) error {
// TODO: This is where we need to provide the client IP address.
statsChannel <- false
or, err := pt.DialOr(&ptInfo, "", ptMethodName)
func handleStream(stream net.Conn, addr string) error {
statsChannel <- addr != ""
or, err := pt.DialOr(&ptInfo, addr, ptMethodName)
if err != nil {
return fmt.Errorf("connecting to ORPort: %v", err)
}
@ -266,6 +287,17 @@ func handleStream(stream net.Conn) error {
// acceptStreams layers an smux.Session on the KCP connection and awaits streams
// on it. Passes each stream to handleStream.
func acceptStreams(conn *kcp.UDPSession) error {
// Look up the IP address associated with this KCP session, via the
// ClientID that is returned by the session's RemoteAddr method.
addr, ok := clientIDAddrMap.Get(conn.RemoteAddr().(turbotunnel.ClientID))
if !ok {
// This means that the map is tending to run over capacity, not
// just that there was no client_ip on the incoming connection.
// We store "" in the map in the absence of client_ip. This log
// message means you should increase clientIDAddrMapCapacity.
log.Printf("no address in clientID-to-IP map (capacity %d)", clientIDAddrMapCapacity)
}
smuxConfig := smux.DefaultConfig()
smuxConfig.Version = 2
smuxConfig.KeepAliveTimeout = 10 * time.Minute
@ -273,6 +305,7 @@ func acceptStreams(conn *kcp.UDPSession) error {
if err != nil {
return err
}
for {
stream, err := sess.AcceptStream()
if err != nil {
@ -283,7 +316,7 @@ func acceptStreams(conn *kcp.UDPSession) error {
}
go func() {
defer stream.Close()
err := handleStream(stream)
err := handleStream(stream, addr)
if err != nil {
log.Printf("handleStream: %v", err)
}

85
server/turbotunnel.go Normal file
View file

@ -0,0 +1,85 @@
package main
import (
"sync"
"git.torproject.org/pluggable-transports/snowflake.git/common/turbotunnel"
)
// clientIDMap is a fixed-capacity mapping from ClientIDs to address strings.
// Adding a new entry using the Set method causes the oldest existing entry to
// be forgotten.
//
// This data type is meant to be used to remember the IP address associated with
// a ClientID, during the short period of time between when a WebSocket
// connection with that ClientID began, and when a KCP session is established.
//
// The design requirements of this type are that it needs to remember a mapping
// for only a short time, and old entries should expire so as not to consume
// unbounded memory. It is not a critical error if an entry is forgotten before
// it is needed; better to forget entries than to use too much memory.
//
// Both Set and Get take the lock, so a clientIDMap may be shared between
// goroutines. Create one with newClientIDMap.
type clientIDMap struct {
// lock is held by Set and Get for the whole of each call; it guards
// every field below.
lock sync.Mutex
// entries is a circular buffer of (ClientID, addr) pairs. Its length
// is the capacity passed to newClientIDMap and never changes. Slots
// that have not been written yet hold the zero ClientID and "".
entries []struct {
clientID turbotunnel.ClientID
addr string
}
// oldest is the index of the oldest member of the entries buffer, the
// one that will be overwritten at the next call to Set. Set advances
// it by one, wrapping around at the end of the buffer.
oldest int
// current points to the index of the most recent entry corresponding to
// each ClientID. A slot in entries that has been superseded by a later
// Set for the same ClientID is not referenced from this map, which is
// why len(current) can be smaller than len(entries).
current map[turbotunnel.ClientID]int
}
// newClientIDMap allocates an empty clientIDMap able to remember up to
// capacity entries. The circular buffer is allocated at its full, fixed length
// here; the lookup index starts out empty and the first Set will write to
// slot 0.
func newClientIDMap(capacity int) *clientIDMap {
	m := new(clientIDMap)
	// The element type has to be spelled out exactly as in the struct
	// definition, because the field uses an anonymous struct type.
	m.entries = make([]struct {
		clientID turbotunnel.ClientID
		addr     string
	}, capacity)
	m.current = map[turbotunnel.ClientID]int{}
	// m.oldest is already 0, the zero value.
	return m
}
// Set records addr as the address for clientID, superseding whatever was
// stored for clientID before. The new pair is written over the oldest slot of
// the circular buffer, so at most one other ClientID may be forgotten as a side
// effect. On a zero-capacity map, Set does nothing.
func (m *clientIDMap) Set(clientID turbotunnel.ClientID, addr string) {
	m.lock.Lock()
	defer m.lock.Unlock()

	size := len(m.entries)
	if size == 0 {
		// Nothing can be stored; there is no valid slot index, so
		// m.oldest must not be used.
		return
	}

	slot := m.oldest
	victim := &m.entries[slot]

	// The slot being reused may still be the live entry for some ClientID.
	// If so, that ClientID is now forgotten. If the index map points
	// elsewhere (the slot was already shadowed by a newer Set) or has no
	// such key (the slot was never written), leave the index map alone.
	if idx, present := m.current[victim.clientID]; present && idx == slot {
		delete(m.current, victim.clientID)
	}

	// Reuse the slot for the new pair and make it the live entry for
	// clientID.
	victim.clientID, victim.addr = clientID, addr
	m.current[clientID] = slot

	// The following slot, wrapping around, is now the oldest.
	m.oldest = (slot + 1) % size
}
// Get looks up the address most recently stored for clientID. The boolean
// result reports whether clientID is currently remembered; when it is false
// the returned address is "". Note that a true result may still carry an empty
// address, if that is what was stored with Set.
func (m *clientIDMap) Get(clientID turbotunnel.ClientID) (string, bool) {
	m.lock.Lock()
	defer m.lock.Unlock()

	idx, found := m.current[clientID]
	if !found {
		return "", false
	}
	return m.entries[idx].addr, true
}

119
server/turbotunnel_test.go Normal file
View file

@ -0,0 +1,119 @@
package main
import (
"encoding/binary"
"testing"
"git.torproject.org/pluggable-transports/snowflake.git/common/turbotunnel"
)
// TestClientIDMap exercises clientIDMap's Set and Get at capacities 0, 1, and
// 5. After each group of Set calls it checks both what Get returns and how many
// ClientIDs the internal current index holds, covering eviction of the oldest
// entry and shadowing of an older entry by a newer one for the same ClientID.
func TestClientIDMap(t *testing.T) {
// Convert a uint64 into a ClientID.
id := func(n uint64) turbotunnel.ClientID {
var clientID turbotunnel.ClientID
binary.PutUvarint(clientID[:], n)
return clientID
}
// Does m.Get(key) and checks that the output matches what is expected.
expectGet := func(m *clientIDMap, clientID turbotunnel.ClientID, expectedAddr string, expectedOK bool) {
t.Helper()
addr, ok := m.Get(clientID)
if addr != expectedAddr || ok != expectedOK {
t.Errorf("expected (%+q, %v), got (%+q, %v)", expectedAddr, expectedOK, addr, ok)
}
}
// Checks that the len of m.current is as expected.
expectSize := func(m *clientIDMap, expectedLen int) {
t.Helper()
if len(m.current) != expectedLen {
t.Errorf("expected map len %d, got %d %+v", expectedLen, len(m.current), m.current)
}
}
// Zero-capacity map can't remember anything.
{
m := newClientIDMap(0)
expectSize(m, 0)
expectGet(m, id(0), "", false)
expectGet(m, id(1234), "", false)
m.Set(id(0), "A")
expectSize(m, 0)
expectGet(m, id(0), "", false)
expectGet(m, id(1234), "", false)
m.Set(id(1234), "A")
expectSize(m, 0)
expectGet(m, id(0), "", false)
expectGet(m, id(1234), "", false)
}
// Capacity-1 map remembers only the most recent Set.
{
m := newClientIDMap(1)
expectSize(m, 0)
expectGet(m, id(0), "", false)
expectGet(m, id(1), "", false)
m.Set(id(0), "A")
expectSize(m, 1)
expectGet(m, id(0), "A", true)
expectGet(m, id(1), "", false)
m.Set(id(1), "B") // forgets the (0, "A") entry
expectSize(m, 1)
expectGet(m, id(0), "", false)
expectGet(m, id(1), "B", true)
m.Set(id(1), "C") // forgets the (1, "B") entry
expectSize(m, 1)
expectGet(m, id(0), "", false)
expectGet(m, id(1), "C", true)
}
// Capacity-5 map: shadowed entries still occupy a slot in the buffer, so
// the number of remembered ClientIDs can be less than the capacity, and
// overwriting a shadowed slot must not forget the newer entry for the
// same ClientID.
{
m := newClientIDMap(5)
m.Set(id(0), "A")
m.Set(id(1), "B")
m.Set(id(2), "C")
m.Set(id(0), "D") // shadows the (0, "A") entry
m.Set(id(3), "E")
expectSize(m, 4)
expectGet(m, id(0), "D", true)
expectGet(m, id(1), "B", true)
expectGet(m, id(2), "C", true)
expectGet(m, id(3), "E", true)
expectGet(m, id(4), "", false)
m.Set(id(4), "F") // forgets the (0, "A") entry but should preserve (0, "D")
expectSize(m, 5)
expectGet(m, id(0), "D", true)
expectGet(m, id(1), "B", true)
expectGet(m, id(2), "C", true)
expectGet(m, id(3), "E", true)
expectGet(m, id(4), "F", true)
m.Set(id(5), "G") // forgets the (1, "B") entry
m.Set(id(0), "H") // forgets the (2, "C") entry and shadows (0, "D")
expectSize(m, 4)
expectGet(m, id(0), "H", true)
expectGet(m, id(1), "", false)
expectGet(m, id(2), "", false)
expectGet(m, id(3), "E", true)
expectGet(m, id(4), "F", true)
expectGet(m, id(5), "G", true)
m.Set(id(0), "I") // forgets the (0, "D") entry and shadows (0, "H")
m.Set(id(0), "J") // forgets the (3, "E") entry and shadows (0, "I")
m.Set(id(0), "K") // forgets the (4, "F") entry and shadows (0, "J")
m.Set(id(0), "L") // forgets the (5, "G") entry and shadows (0, "K")
expectSize(m, 1)
expectGet(m, id(0), "L", true)
expectGet(m, id(1), "", false)
expectGet(m, id(2), "", false)
expectGet(m, id(3), "", false)
expectGet(m, id(4), "", false)
expectGet(m, id(5), "", false)
}
}