mirror of
https://gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake.git
synced 2025-10-13 11:11:30 -04:00
Bug #21314: maintains a separate snowflake connect loop per SOCKS connection. This way, if Tor decides to stop using Snowflake, Snowflake will stop using the client's network.
216 lines
6.1 KiB
Go
216 lines
6.1 KiB
Go
package lib
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"io"
|
|
"log"
|
|
"net"
|
|
"sync"
|
|
"time"
|
|
|
|
"git.torproject.org/pluggable-transports/snowflake.git/common/turbotunnel"
|
|
"github.com/xtaci/kcp-go/v5"
|
|
"github.com/xtaci/smux"
|
|
)
|
|
|
|
const (
|
|
ReconnectTimeout = 10 * time.Second
|
|
SnowflakeTimeout = 20 * time.Second
|
|
// How long to wait for the OnOpen callback on a DataChannel.
|
|
DataChannelTimeout = 10 * time.Second
|
|
)
|
|
|
|
type dummyAddr struct{}
|
|
|
|
func (addr dummyAddr) Network() string { return "dummy" }
|
|
func (addr dummyAddr) String() string { return "dummy" }
|
|
|
|
// newSession returns a new smux.Session and the net.PacketConn it is running
|
|
// over. The net.PacketConn successively connects through Snowflake proxies
|
|
// pulled from snowflakes.
|
|
func newSession(snowflakes SnowflakeCollector) (net.PacketConn, *smux.Session, error) {
|
|
clientID := turbotunnel.NewClientID()
|
|
|
|
// We build a persistent KCP session on a sequence of ephemeral WebRTC
|
|
// connections. This dialContext tells RedialPacketConn how to get a new
|
|
// WebRTC connection when the previous one dies. Inside each WebRTC
|
|
// connection, we use EncapsulationPacketConn to encode packets into a
|
|
// stream.
|
|
dialContext := func(ctx context.Context) (net.PacketConn, error) {
|
|
log.Printf("redialing on same connection")
|
|
// Obtain an available WebRTC remote. May block.
|
|
conn := snowflakes.Pop()
|
|
if conn == nil {
|
|
return nil, errors.New("handler: Received invalid Snowflake")
|
|
}
|
|
log.Println("---- Handler: snowflake assigned ----")
|
|
// Send the magic Turbo Tunnel token.
|
|
_, err := conn.Write(turbotunnel.Token[:])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// Send ClientID prefix.
|
|
_, err = conn.Write(clientID[:])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return NewEncapsulationPacketConn(dummyAddr{}, dummyAddr{}, conn), nil
|
|
}
|
|
pconn := turbotunnel.NewRedialPacketConn(dummyAddr{}, dummyAddr{}, dialContext)
|
|
|
|
// conn is built on the underlying RedialPacketConn—when one WebRTC
|
|
// connection dies, another one will be found to take its place. The
|
|
// sequence of packets across multiple WebRTC connections drives the KCP
|
|
// engine.
|
|
conn, err := kcp.NewConn2(dummyAddr{}, nil, 0, 0, pconn)
|
|
if err != nil {
|
|
pconn.Close()
|
|
return nil, nil, err
|
|
}
|
|
// Permit coalescing the payloads of consecutive sends.
|
|
conn.SetStreamMode(true)
|
|
// Disable the dynamic congestion window (limit only by the
|
|
// maximum of local and remote static windows).
|
|
conn.SetNoDelay(
|
|
0, // default nodelay
|
|
0, // default interval
|
|
0, // default resend
|
|
1, // nc=1 => congestion window off
|
|
)
|
|
// On the KCP connection we overlay an smux session and stream.
|
|
smuxConfig := smux.DefaultConfig()
|
|
smuxConfig.Version = 2
|
|
smuxConfig.KeepAliveTimeout = 10 * time.Minute
|
|
sess, err := smux.Client(conn, smuxConfig)
|
|
if err != nil {
|
|
conn.Close()
|
|
pconn.Close()
|
|
return nil, nil, err
|
|
}
|
|
|
|
return pconn, sess, err
|
|
}
|
|
|
|
// sessionManager_ maintains a single global smux.Session that is shared among
|
|
// incoming SOCKS connections.
|
|
type sessionManager_ struct {
|
|
mutex sync.Mutex
|
|
sess *smux.Session
|
|
}
|
|
|
|
// Get creates and returns a new global smux.Session if none exists yet. If one
|
|
// already exists, it returns the existing one. It monitors the returned session
|
|
// and if it ever fails, sets things up so the next call to Get will create a
|
|
// new session.
|
|
func (manager *sessionManager_) Get(snowflakes SnowflakeCollector) (*smux.Session, error) {
|
|
manager.mutex.Lock()
|
|
defer manager.mutex.Unlock()
|
|
|
|
if manager.sess == nil {
|
|
log.Printf("starting a new session")
|
|
pconn, sess, err := newSession(snowflakes)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
manager.sess = sess
|
|
go func() {
|
|
// If the session dies, set it to be recreated.
|
|
for {
|
|
<-time.After(5 * time.Second)
|
|
if sess.IsClosed() {
|
|
break
|
|
}
|
|
}
|
|
log.Printf("discarding finished session")
|
|
// Close the underlying to force any ongoing WebRTC
|
|
// connection to close as well, and relinquish the
|
|
// SnowflakeCollector.
|
|
pconn.Close()
|
|
manager.mutex.Lock()
|
|
manager.sess = nil
|
|
manager.mutex.Unlock()
|
|
}()
|
|
} else {
|
|
log.Printf("reusing the existing session")
|
|
}
|
|
|
|
return manager.sess, nil
|
|
}
|
|
|
|
var sessionManager = sessionManager_{}
|
|
|
|
// Given an accepted SOCKS connection, establish a WebRTC connection to the
|
|
// remote peer and exchange traffic.
|
|
func Handler(socks net.Conn, tongue Tongue) error {
|
|
// Prepare to collect remote WebRTC peers.
|
|
snowflakes := NewPeers(1)
|
|
snowflakes.Tongue = tongue
|
|
|
|
// Use a real logger to periodically output how much traffic is happening.
|
|
snowflakes.BytesLogger = NewBytesSyncLogger()
|
|
|
|
log.Printf("---- Handler: begin collecting snowflakes ---")
|
|
go connectLoop(snowflakes)
|
|
|
|
// Return the global smux.Session.
|
|
sess, err := sessionManager.Get(snowflakes)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// On the smux session we overlay a stream.
|
|
stream, err := sess.OpenStream()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer stream.Close()
|
|
|
|
// Begin exchanging data.
|
|
log.Printf("---- Handler: begin stream %v ---", stream.ID())
|
|
copyLoop(socks, stream)
|
|
log.Printf("---- Handler: closed stream %v ---", stream.ID())
|
|
snowflakes.End()
|
|
log.Printf("---- Handler: end collecting snowflakes ---")
|
|
return nil
|
|
}
|
|
|
|
// Maintain |SnowflakeCapacity| number of available WebRTC connections, to
|
|
// transfer to the Tor SOCKS handler when needed.
|
|
func connectLoop(snowflakes SnowflakeCollector) {
|
|
for {
|
|
// Check if ending is necessary.
|
|
_, err := snowflakes.Collect()
|
|
if err != nil {
|
|
log.Printf("WebRTC: %v Retrying in %v...",
|
|
err, ReconnectTimeout)
|
|
}
|
|
select {
|
|
case <-time.After(ReconnectTimeout):
|
|
continue
|
|
case <-snowflakes.Melted():
|
|
log.Println("ConnectLoop: stopped.")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Exchanges bytes between two ReadWriters.
|
|
// (In this case, between a SOCKS connection and smux stream.)
|
|
func copyLoop(socks, stream io.ReadWriter) {
|
|
done := make(chan struct{}, 2)
|
|
go func() {
|
|
if _, err := io.Copy(socks, stream); err != nil {
|
|
log.Printf("copying WebRTC to SOCKS resulted in error: %v", err)
|
|
}
|
|
done <- struct{}{}
|
|
}()
|
|
go func() {
|
|
if _, err := io.Copy(stream, socks); err != nil {
|
|
log.Printf("copying SOCKS to stream resulted in error: %v", err)
|
|
}
|
|
done <- struct{}{}
|
|
}()
|
|
<-done
|
|
log.Println("copy loop ended")
|
|
}
|