Update Snowflake client library documentation

Follow best practices for documenting the exported pieces of the
Snowflake client library.
This commit is contained in:
Cecylia Bocovich 2021-09-09 16:01:38 -04:00
parent 99887cd05d
commit 638ec6c222
8 changed files with 83 additions and 50 deletions

View file

@ -1,23 +1,24 @@
package lib package lib
// Interface for catching Snowflakes. (aka the remote dialer) // Tongue is an interface for catching Snowflakes. (aka the remote dialer)
type Tongue interface { type Tongue interface {
// Catch makes a connection to a new snowflake.
Catch() (*WebRTCPeer, error) Catch() (*WebRTCPeer, error)
// Get the maximum number of snowflakes // GetMax returns the maximum number of snowflakes a client can have.
GetMax() int GetMax() int
} }
// Interface for collecting some number of Snowflakes, for passing along // SnowflakeCollector is an interface for managing a client's collection of snowflakes.
// ultimately to the SOCKS handler.
type SnowflakeCollector interface { type SnowflakeCollector interface {
// Add a Snowflake to the collection. // Collect adds a snowflake to the collection.
// Implementation should decide how to connect and maintain the webRTCConn. // The implementation of Collect should decide how to connect to and maintain
// the connection to the WebRTCPeer.
Collect() (*WebRTCPeer, error) Collect() (*WebRTCPeer, error)
// Remove and return the most available Snowflake from the collection. // Pop removes and returns the most available snowflake from the collection.
Pop() *WebRTCPeer Pop() *WebRTCPeer
// Signal when the collector has stopped collecting. // Melted returns a channel that will signal when the collector has stopped.
Melted() <-chan struct{} Melted() <-chan struct{}
} }

View file

@ -8,7 +8,7 @@ import (
"sync" "sync"
) )
// Container which keeps track of multiple WebRTC remote peers. // Peers is a container that keeps track of multiple WebRTC remote peers.
// Implements |SnowflakeCollector|. // Implements |SnowflakeCollector|.
// //
// Maintaining a set of pre-connected Peers with fresh but inactive datachannels // Maintaining a set of pre-connected Peers with fresh but inactive datachannels
@ -31,7 +31,7 @@ type Peers struct {
collectLock sync.Mutex collectLock sync.Mutex
} }
// Construct a fresh container of remote peers. // NewPeers constructs a fresh container of remote peers.
func NewPeers(tongue Tongue) (*Peers, error) { func NewPeers(tongue Tongue) (*Peers, error) {
p := &Peers{} p := &Peers{}
// Use buffered go channel to pass snowflakes onwards to the SOCKS handler. // Use buffered go channel to pass snowflakes onwards to the SOCKS handler.
@ -45,7 +45,7 @@ func NewPeers(tongue Tongue) (*Peers, error) {
return p, nil return p, nil
} }
// As part of |SnowflakeCollector| interface. // Collect connects to and adds a new remote peer as part of |SnowflakeCollector| interface.
func (p *Peers) Collect() (*WebRTCPeer, error) { func (p *Peers) Collect() (*WebRTCPeer, error) {
// Engage the Snowflake Catching interface, which must be available. // Engage the Snowflake Catching interface, which must be available.
p.collectLock.Lock() p.collectLock.Lock()
@ -76,8 +76,8 @@ func (p *Peers) Collect() (*WebRTCPeer, error) {
return connection, nil return connection, nil
} }
// Pop blocks until an available, valid snowflake appears. Returns nil after End // Pop blocks until an available, valid snowflake appears.
// has been called. // Pop will return nil after End has been called.
func (p *Peers) Pop() *WebRTCPeer { func (p *Peers) Pop() *WebRTCPeer {
for { for {
snowflake, ok := <-p.snowflakeChan snowflake, ok := <-p.snowflakeChan
@ -93,12 +93,13 @@ func (p *Peers) Pop() *WebRTCPeer {
} }
} }
// As part of |SnowflakeCollector| interface. // Melted returns a channel that will close when peers stop being collected.
// Melted is a necessary part of |SnowflakeCollector| interface.
func (p *Peers) Melted() <-chan struct{} { func (p *Peers) Melted() <-chan struct{} {
return p.melt return p.melt
} }
// Returns total available Snowflakes (including the active one) // Count returns the total available Snowflakes (including the active ones)
// The count only reduces when connections themselves close, rather than when // The count only reduces when connections themselves close, rather than when
// they are popped. // they are popped.
func (p *Peers) Count() int { func (p *Peers) Count() int {
@ -118,7 +119,8 @@ func (p *Peers) purgeClosedPeers() {
} }
} }
// Close all Peers contained here. // End closes all active connections to Peers contained here, and stops the
// collection of future Peers.
func (p *Peers) End() { func (p *Peers) End() {
close(p.melt) close(p.melt)
p.collectLock.Lock() p.collectLock.Lock()

View file

@ -1,10 +1,5 @@
// WebRTC rendezvous requires the exchange of SessionDescriptions between // WebRTC rendezvous requires the exchange of SessionDescriptions between
// peers in order to establish a PeerConnection. // peers in order to establish a PeerConnection.
//
// This file contains the one method currently available to Snowflake:
//
// - Domain-fronted HTTP signaling. The Broker automatically exchange offers
// and answers between this client and some remote WebRTC proxy.
package lib package lib
@ -22,7 +17,7 @@ import (
) )
const ( const (
BrokerErrorUnexpected string = "Unexpected error, no answer." brokerErrorUnexpected string = "Unexpected error, no answer."
readLimit = 100000 //Maximum number of bytes to be read from an HTTP response readLimit = 100000 //Maximum number of bytes to be read from an HTTP response
) )
@ -55,7 +50,7 @@ func createBrokerTransport() http.RoundTripper {
return transport return transport
} }
// Construct a new BrokerChannel, where: // NewBrokerChannel construct a new BrokerChannel, where:
// |broker| is the full URL of the facilitating program which assigns proxies // |broker| is the full URL of the facilitating program which assigns proxies
// to clients, and |front| is the option fronting domain. // to clients, and |front| is the option fronting domain.
func NewBrokerChannel(broker, ampCache, front string, keepLocalAddresses bool) (*BrokerChannel, error) { func NewBrokerChannel(broker, ampCache, front string, keepLocalAddresses bool) (*BrokerChannel, error) {
@ -85,10 +80,8 @@ func NewBrokerChannel(broker, ampCache, front string, keepLocalAddresses bool) (
}, nil }, nil
} }
// Roundtrip HTTP POST using WebRTC SessionDescriptions. // Negotiate uses a RendezvousMethod to send the client's WebRTC SDP offer
// // and receive a snowflake proxy WebRTC SDP answer in return.
// Send an SDP offer to the broker, which assigns a proxy and responds
// with an SDP answer from a designated remote WebRTC peer.
func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) ( func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) (
*webrtc.SessionDescription, error) { *webrtc.SessionDescription, error) {
// Ideally, we could specify an `RTCIceTransportPolicy` that would handle // Ideally, we could specify an `RTCIceTransportPolicy` that would handle
@ -135,6 +128,7 @@ func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) (
return util.DeserializeSessionDescription(resp.Answer) return util.DeserializeSessionDescription(resp.Answer)
} }
// SetNATType sets the NAT type of the client so we can send it to the WebRTC broker.
func (bc *BrokerChannel) SetNATType(NATType string) { func (bc *BrokerChannel) SetNATType(NATType string) {
bc.lock.Lock() bc.lock.Lock()
bc.natType = NATType bc.natType = NATType
@ -142,13 +136,14 @@ func (bc *BrokerChannel) SetNATType(NATType string) {
log.Printf("NAT Type: %s", NATType) log.Printf("NAT Type: %s", NATType)
} }
// Implements the |Tongue| interface to catch snowflakes, using BrokerChannel. // WebRTCDialer implements the |Tongue| interface to catch snowflakes, using BrokerChannel.
type WebRTCDialer struct { type WebRTCDialer struct {
*BrokerChannel *BrokerChannel
webrtcConfig *webrtc.Configuration webrtcConfig *webrtc.Configuration
max int max int
} }
// NewWebRTCDialer constructs a new WebRTCDialer.
func NewWebRTCDialer(broker *BrokerChannel, iceServers []webrtc.ICEServer, max int) *WebRTCDialer { func NewWebRTCDialer(broker *BrokerChannel, iceServers []webrtc.ICEServer, max int) *WebRTCDialer {
config := webrtc.Configuration{ config := webrtc.Configuration{
ICEServers: iceServers, ICEServers: iceServers,
@ -161,14 +156,14 @@ func NewWebRTCDialer(broker *BrokerChannel, iceServers []webrtc.ICEServer, max i
} }
} }
// Initialize a WebRTC Connection by signaling through the broker. // Catch initializes a WebRTC Connection by signaling through the BrokerChannel.
func (w WebRTCDialer) Catch() (*WebRTCPeer, error) { func (w WebRTCDialer) Catch() (*WebRTCPeer, error) {
// TODO: [#25591] Fetch ICE server information from Broker. // TODO: [#25591] Fetch ICE server information from Broker.
// TODO: [#25596] Consider TURN servers here too. // TODO: [#25596] Consider TURN servers here too.
return NewWebRTCPeer(w.webrtcConfig, w.BrokerChannel) return NewWebRTCPeer(w.webrtcConfig, w.BrokerChannel)
} }
// Returns the maximum number of snowflakes to collect // GetMax returns the maximum number of snowflakes to collect.
func (w WebRTCDialer) GetMax() int { func (w WebRTCDialer) GetMax() int {
return w.max return w.max
} }

View file

@ -93,7 +93,7 @@ func (r *ampCacheRendezvous) Exchange(encPollReq []byte) ([]byte, error) {
// * If the broker returns a 5xx status, the AMP cache // * If the broker returns a 5xx status, the AMP cache
// translates it to a 404. // translates it to a 404.
// https://amp.dev/documentation/guides-and-tutorials/learn/amp-caches-and-cors/amp-cache-urls/#redirect-%26-error-handling // https://amp.dev/documentation/guides-and-tutorials/learn/amp-caches-and-cors/amp-cache-urls/#redirect-%26-error-handling
return nil, errors.New(BrokerErrorUnexpected) return nil, errors.New(brokerErrorUnexpected)
} }
if _, err := resp.Location(); err == nil { if _, err := resp.Location(); err == nil {
// The Google AMP Cache may return a "silent redirect" with // The Google AMP Cache may return a "silent redirect" with
@ -103,7 +103,7 @@ func (r *ampCacheRendezvous) Exchange(encPollReq []byte) ([]byte, error) {
// follow redirects nor execute JavaScript, but in any case we // follow redirects nor execute JavaScript, but in any case we
// cannot extract information from this response and can only // cannot extract information from this response and can only
// treat it as an error. // treat it as an error.
return nil, errors.New(BrokerErrorUnexpected) return nil, errors.New(brokerErrorUnexpected)
} }
lr := io.LimitReader(resp.Body, readLimit+1) lr := io.LimitReader(resp.Body, readLimit+1)

View file

@ -60,7 +60,7 @@ func (r *httpRendezvous) Exchange(encPollReq []byte) ([]byte, error) {
log.Printf("HTTP rendezvous response: %s", resp.Status) log.Printf("HTTP rendezvous response: %s", resp.Status)
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
return nil, errors.New(BrokerErrorUnexpected) return nil, errors.New(brokerErrorUnexpected)
} }
return limitedRead(resp.Body, readLimit) return limitedRead(resp.Body, readLimit)

View file

@ -122,7 +122,7 @@ func TestHTTPRendezvous(t *testing.T) {
answer, err := rend.Exchange(fakeEncPollReq) answer, err := rend.Exchange(fakeEncPollReq)
So(err, ShouldNotBeNil) So(err, ShouldNotBeNil)
So(answer, ShouldBeNil) So(answer, ShouldBeNil)
So(err.Error(), ShouldResemble, BrokerErrorUnexpected) So(err.Error(), ShouldResemble, brokerErrorUnexpected)
}) })
Convey("httpRendezvous.Exchange fails with error", func() { Convey("httpRendezvous.Exchange fails with error", func() {
@ -243,7 +243,7 @@ func TestAMPCacheRendezvous(t *testing.T) {
answer, err := rend.Exchange(fakeEncPollReq) answer, err := rend.Exchange(fakeEncPollReq)
So(err, ShouldNotBeNil) So(err, ShouldNotBeNil)
So(answer, ShouldBeNil) So(answer, ShouldBeNil)
So(err.Error(), ShouldResemble, BrokerErrorUnexpected) So(err.Error(), ShouldResemble, brokerErrorUnexpected)
}) })
Convey("ampCacheRendezvous.Exchange fails with error", func() { Convey("ampCacheRendezvous.Exchange fails with error", func() {

View file

@ -17,12 +17,21 @@ import (
) )
const ( const (
// ReconnectTimeout is the time a Snowflake client will wait before collecting
// more snowflakes.
ReconnectTimeout = 10 * time.Second ReconnectTimeout = 10 * time.Second
// SnowflakeTimeout is the time a Snowflake client will wait before determining that
// a remote snowflake has been disconnected. If no new messages are sent or received
// in this time period, the client will terminate the connection with the remote
// peer and collect a new snowflake.
SnowflakeTimeout = 20 * time.Second SnowflakeTimeout = 20 * time.Second
// How long to wait for the OnOpen callback on a DataChannel. // DataChannelTimeout is how long the client will wait for the OnOpen callback
// on a newly created DataChannel.
DataChannelTimeout = 10 * time.Second DataChannelTimeout = 10 * time.Second
// WindowSize is the number of packets in the send and receive window of a KCP connection.
WindowSize = 65535 WindowSize = 65535
// StreamSize controls the maximum amount of in flight data between a client and server.
StreamSize = 1048576 //1MB StreamSize = 1048576 //1MB
) )
@ -37,16 +46,31 @@ type Transport struct {
dialer *WebRTCDialer dialer *WebRTCDialer
} }
// ClientConfig defines how the SnowflakeClient will connect to the broker and Snowflake proxies.
type ClientConfig struct { type ClientConfig struct {
// BrokerURL is the full URL of the Snowflake broker that the client will connect to.
BrokerURL string BrokerURL string
// AmpCacheURL is the full URL of a valid AMP cache. A nonzero value indicates
// that AMP cache will be used as the rendezvous method with the broker.
AmpCacheURL string AmpCacheURL string
// FrontDomain is a the full URL of an optional front domain that can be used with either
// the AMP cache or HTTP domain fronting rendezvous method.
FrontDomain string FrontDomain string
// ICEAddresses are a slice of ICE server URLs that will be used for NAT traversal and
// the creation of the client's WebRTC SDP offer.
ICEAddresses []string ICEAddresses []string
// KeepLocalAddresses is an optional setting that will prevent the removal of local or
// invalid addresses from the client's SDP offer. This is useful for local deployments
// and testing.
KeepLocalAddresses bool KeepLocalAddresses bool
// Max is the maximum number of snowflake proxy peers that the client should attempt to
// connect to.
Max int Max int
} }
// Create a new Snowflake transport client that can spawn multiple Snowflake connections. // NewSnowflakeClient creates a new Snowflake transport client that can spawn multiple
// Snowflake connections.
//
// brokerURL and frontDomain are the urls for the broker host and domain fronting host // brokerURL and frontDomain are the urls for the broker host and domain fronting host
// iceAddresses are the STUN/TURN urls needed for WebRTC negotiation // iceAddresses are the STUN/TURN urls needed for WebRTC negotiation
// keepLocalAddresses is a flag to enable sending local network addresses (for testing purposes) // keepLocalAddresses is a flag to enable sending local network addresses (for testing purposes)
@ -82,8 +106,10 @@ func NewSnowflakeClient(config ClientConfig) (*Transport, error) {
return transport, nil return transport, nil
} }
// Create a new Snowflake connection. Starts the collection of snowflakes and returns a // Dial creates a new Snowflake connection.
// smux Stream. // Dial starts the collection of snowflakes and returns a SnowflakeConn that is a
// wrapper around a smux.Stream that will reliably deliver data to a Snowflake
// server through one or more snowflake proxies.
func (t *Transport) Dial() (net.Conn, error) { func (t *Transport) Dial() (net.Conn, error) {
// Cleanup functions to run before returning, in case of an error. // Cleanup functions to run before returning, in case of an error.
var cleanup []func() var cleanup []func()
@ -132,10 +158,12 @@ func (t *Transport) Dial() (net.Conn, error) {
return &SnowflakeConn{Stream: stream, sess: sess, pconn: pconn, snowflakes: snowflakes}, nil return &SnowflakeConn{Stream: stream, sess: sess, pconn: pconn, snowflakes: snowflakes}, nil
} }
// SetRendezvousMethod sets the rendezvous method to the Snowflake broker.
func (t *Transport) SetRendezvousMethod(r RendezvousMethod) { func (t *Transport) SetRendezvousMethod(r RendezvousMethod) {
t.dialer.Rendezvous = r t.dialer.Rendezvous = r
} }
// SnowflakeConn is a reliable connection to a snowflake server that implements net.Conn.
type SnowflakeConn struct { type SnowflakeConn struct {
*smux.Stream *smux.Stream
sess *smux.Session sess *smux.Session
@ -143,6 +171,9 @@ type SnowflakeConn struct {
snowflakes *Peers snowflakes *Peers
} }
// Close closes the connection.
//
// The collection of snowflake proxies for this connection is stopped.
func (conn *SnowflakeConn) Close() error { func (conn *SnowflakeConn) Close() error {
log.Printf("---- SnowflakeConn: closed stream %v ---", conn.ID()) log.Printf("---- SnowflakeConn: closed stream %v ---", conn.ID())
conn.Stream.Close() conn.Stream.Close()

View file

@ -12,10 +12,9 @@ import (
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
) )
// Remote WebRTC peer. // WebRTCPeer represents a WebRTC connection to a remote snowflake proxy.
// //
// Handles preparation of go-webrtc PeerConnection. Only ever has // Each WebRTCPeer only ever has one DataChannel that is used as the peer's transport.
// one DataChannel.
type WebRTCPeer struct { type WebRTCPeer struct {
id string id string
pc *webrtc.PeerConnection pc *webrtc.PeerConnection
@ -35,7 +34,11 @@ type WebRTCPeer struct {
bytesLogger bytesLogger bytesLogger bytesLogger
} }
// Construct a WebRTC PeerConnection. // NewWebRTCPeer constructs a WebRTC PeerConnection to a snowflake proxy.
//
// The creation of the peer handles the signaling to the Snowflake broker, including
// the exchange of SDP information, the creation of a PeerConnection, and the establishment
// of a DataChannel to the Snowflake proxy.
func NewWebRTCPeer(config *webrtc.Configuration, func NewWebRTCPeer(config *webrtc.Configuration,
broker *BrokerChannel) (*WebRTCPeer, error) { broker *BrokerChannel) (*WebRTCPeer, error) {
connection := new(WebRTCPeer) connection := new(WebRTCPeer)
@ -79,7 +82,7 @@ func (c *WebRTCPeer) Write(b []byte) (int, error) {
return len(b), nil return len(b), nil
} }
//Returns a boolean indicated whether the peer is closed // Closed returns a boolean indicated whether the peer is closed.
func (c *WebRTCPeer) Closed() bool { func (c *WebRTCPeer) Closed() bool {
select { select {
case <-c.closed: case <-c.closed:
@ -89,6 +92,7 @@ func (c *WebRTCPeer) Closed() bool {
return false return false
} }
// Close closes the connection the snowflake proxy.
func (c *WebRTCPeer) Close() error { func (c *WebRTCPeer) Close() error {
c.once.Do(func() { c.once.Do(func() {
close(c.closed) close(c.closed)
@ -225,7 +229,7 @@ func (c *WebRTCPeer) preparePeerConnection(config *webrtc.Configuration) error {
return nil return nil
} }
// Close all channels and transports // cleanup closes all channels and transports
func (c *WebRTCPeer) cleanup() { func (c *WebRTCPeer) cleanup() {
// Close this side of the SOCKS pipe. // Close this side of the SOCKS pipe.
if c.writePipe != nil { // c.writePipe can be nil in tests. if c.writePipe != nil { // c.writePipe can be nil in tests.