diff --git a/client/lib/interfaces.go b/client/lib/interfaces.go index 71426d6..5378f4a 100644 --- a/client/lib/interfaces.go +++ b/client/lib/interfaces.go @@ -7,6 +7,9 @@ import ( // Interface for catching Snowflakes. (aka the remote dialer) type Tongue interface { Catch() (*WebRTCPeer, error) + + // Get the maximum number of snowflakes + GetMax() int } // Interface for collecting some number of Snowflakes, for passing along diff --git a/client/lib/lib_test.go b/client/lib/lib_test.go index ebcf284..5537a52 100644 --- a/client/lib/lib_test.go +++ b/client/lib/lib_test.go @@ -27,13 +27,19 @@ func (m *MockTransport) RoundTrip(req *http.Request) (*http.Response, error) { return r, nil } -type FakeDialer struct{} +type FakeDialer struct { + max int +} func (w FakeDialer) Catch() (*WebRTCPeer, error) { fmt.Println("Caught a dummy snowflake.") return &WebRTCPeer{}, nil } +func (w FakeDialer) GetMax() int { + return w.max +} + type FakeSocksConn struct { net.Conn rejected bool @@ -55,19 +61,19 @@ func TestSnowflakeClient(t *testing.T) { Convey("Peers", t, func() { Convey("Can construct", func() { - p := NewPeers(1) - So(p.capacity, ShouldEqual, 1) + d := &FakeDialer{max: 1} + p, _ := NewPeers(d) + So(p.Tongue.GetMax(), ShouldEqual, 1) So(p.snowflakeChan, ShouldNotBeNil) So(cap(p.snowflakeChan), ShouldEqual, 1) }) Convey("Collecting a Snowflake requires a Tongue.", func() { - p := NewPeers(1) - _, err := p.Collect() + p, err := NewPeers(nil) So(err, ShouldNotBeNil) - So(p.Count(), ShouldEqual, 0) // Set the dialer so that collection is possible. - p.Tongue = FakeDialer{} + d := &FakeDialer{max: 1} + p, err = NewPeers(d) _, err = p.Collect() So(err, ShouldBeNil) So(p.Count(), ShouldEqual, 1) @@ -77,8 +83,7 @@ func TestSnowflakeClient(t *testing.T) { Convey("Collection continues until capacity.", func() { c := 5 - p := NewPeers(c) - p.Tongue = FakeDialer{} + p, _ := NewPeers(FakeDialer{max: c}) // Fill up to capacity. for i := 0; i < c; i++ { fmt.Println("Adding snowflake ", i) @@ -104,8 +109,7 @@ func TestSnowflakeClient(t *testing.T) { }) Convey("Count correctly purges peers marked for deletion.", func() { - p := NewPeers(4) - p.Tongue = FakeDialer{} + p, _ := NewPeers(FakeDialer{max: 5}) p.Collect() p.Collect() p.Collect() @@ -121,7 +125,7 @@ func TestSnowflakeClient(t *testing.T) { Convey("End Closes all peers.", func() { cnt := 5 - p := NewPeers(cnt) + p, _ := NewPeers(FakeDialer{max: cnt}) for i := 0; i < cnt; i++ { p.activePeers.PushBack(&WebRTCPeer{}) } @@ -132,8 +136,7 @@ func TestSnowflakeClient(t *testing.T) { }) Convey("Pop skips over closed peers.", func() { - p := NewPeers(4) - p.Tongue = FakeDialer{} + p, _ := NewPeers(FakeDialer{max: 4}) wc1, _ := p.Collect() wc2, _ := p.Collect() wc3, _ := p.Collect() @@ -157,11 +160,11 @@ func TestSnowflakeClient(t *testing.T) { SkipConvey("Handler Grants correctly", func() { socks := &FakeSocksConn{} - snowflakes := &FakePeers{} + broker := &BrokerChannel{Host: "test"} + d := NewWebRTCDialer(broker, nil, 1) So(socks.rejected, ShouldEqual, false) - snowflakes.toRelease = nil - Handler(socks, snowflakes) + Handler(socks, d) So(socks.rejected, ShouldEqual, true) }) }) @@ -169,14 +172,14 @@ func TestSnowflakeClient(t *testing.T) { Convey("Dialers", t, func() { Convey("Can construct WebRTCDialer.", func() { broker := &BrokerChannel{Host: "test"} - d := NewWebRTCDialer(broker, nil) + d := NewWebRTCDialer(broker, nil, 1) So(d, ShouldNotBeNil) So(d.BrokerChannel, ShouldNotBeNil) So(d.BrokerChannel.Host, ShouldEqual, "test") }) SkipConvey("WebRTCDialer can Catch a snowflake.", func() { broker := &BrokerChannel{Host: "test"} - d := NewWebRTCDialer(broker, nil) + d := NewWebRTCDialer(broker, nil, 1) conn, err := d.Catch() So(conn, ShouldBeNil) So(err, ShouldNotBeNil) diff --git a/client/lib/peers.go b/client/lib/peers.go index f766a66..d864fc8 100644 --- a/client/lib/peers.go +++ b/client/lib/peers.go @@ -24,33 +24,37 @@ type Peers struct { snowflakeChan chan *WebRTCPeer activePeers *list.List - capacity int melt chan struct{} } // Construct a fresh container of remote peers. -func NewPeers(max int) *Peers { - p := &Peers{capacity: max} +func NewPeers(tongue Tongue) (*Peers, error) { + p := &Peers{} // Use buffered go channel to pass snowflakes onwards to the SOCKS handler. - p.snowflakeChan = make(chan *WebRTCPeer, max) + if tongue == nil { + return nil, errors.New("missing Tongue to catch Snowflakes with") + } + p.snowflakeChan = make(chan *WebRTCPeer, tongue.GetMax()) p.activePeers = list.New() p.melt = make(chan struct{}) - return p + p.Tongue = tongue + return p, nil } // As part of |SnowflakeCollector| interface. func (p *Peers) Collect() (*WebRTCPeer, error) { - cnt := p.Count() - s := fmt.Sprintf("Currently at [%d/%d]", cnt, p.capacity) - if cnt >= p.capacity { - return nil, fmt.Errorf("At capacity [%d/%d]", cnt, p.capacity) - } - log.Println("WebRTC: Collecting a new Snowflake.", s) // Engage the Snowflake Catching interface, which must be available. if nil == p.Tongue { return nil, errors.New("missing Tongue to catch Snowflakes with") } + cnt := p.Count() + capacity := p.Tongue.GetMax() + s := fmt.Sprintf("Currently at [%d/%d]", cnt, capacity) + if cnt >= capacity { + return nil, fmt.Errorf("At capacity [%d/%d]", cnt, capacity) + } + log.Println("WebRTC: Collecting a new Snowflake.", s) // BUG: some broker conflict here. connection, err := p.Tongue.Catch() if nil != err { diff --git a/client/lib/rendezvous.go b/client/lib/rendezvous.go index 37ade35..10853a5 100644 --- a/client/lib/rendezvous.go +++ b/client/lib/rendezvous.go @@ -155,15 +155,18 @@ func (bc *BrokerChannel) SetNATType(NATType string) { type WebRTCDialer struct { *BrokerChannel webrtcConfig *webrtc.Configuration + max int } -func NewWebRTCDialer(broker *BrokerChannel, iceServers []webrtc.ICEServer) *WebRTCDialer { +func NewWebRTCDialer(broker *BrokerChannel, iceServers []webrtc.ICEServer, max int) *WebRTCDialer { config := webrtc.Configuration{ ICEServers: iceServers, } + return &WebRTCDialer{ BrokerChannel: broker, webrtcConfig: &config, + max: max, } } @@ -173,3 +176,8 @@ func (w WebRTCDialer) Catch() (*WebRTCPeer, error) { // TODO: [#25596] Consider TURN servers here too. return NewWebRTCPeer(w.webrtcConfig, w.BrokerChannel) } + +// Returns the maximum number of snowflakes to collect +func (w WebRTCDialer) GetMax() int { + return w.max +} diff --git a/client/lib/snowflake.go b/client/lib/snowflake.go index b355c3e..0ba5667 100644 --- a/client/lib/snowflake.go +++ b/client/lib/snowflake.go @@ -142,7 +142,19 @@ var sessionManager = sessionManager_{} // Given an accepted SOCKS connection, establish a WebRTC connection to the // remote peer and exchange traffic. -func Handler(socks net.Conn, snowflakes SnowflakeCollector) error { +func Handler(socks net.Conn, tongue Tongue) error { + // Prepare to collect remote WebRTC peers. + snowflakes, err := NewPeers(tongue) + if err != nil { + return err + } + + // Use a real logger to periodically output how much traffic is happening. + snowflakes.BytesLogger = NewBytesSyncLogger() + + log.Printf("---- Handler: begin collecting snowflakes ---") + go connectLoop(snowflakes) + // Return the global smux.Session. sess, err := sessionManager.Get(snowflakes) if err != nil { @@ -160,9 +172,31 @@ func Handler(socks net.Conn, snowflakes SnowflakeCollector) error { log.Printf("---- Handler: begin stream %v ---", stream.ID()) copyLoop(socks, stream) log.Printf("---- Handler: closed stream %v ---", stream.ID()) + snowflakes.End() + log.Printf("---- Handler: end collecting snowflakes ---") return nil } +// Maintain |SnowflakeCapacity| number of available WebRTC connections, to +// transfer to the Tor SOCKS handler when needed. +func connectLoop(snowflakes SnowflakeCollector) { + for { + // Check if ending is necessary. + _, err := snowflakes.Collect() + if err != nil { + log.Printf("WebRTC: %v Retrying in %v...", + err, ReconnectTimeout) + } + select { + case <-time.After(ReconnectTimeout): + continue + case <-snowflakes.Melted(): + log.Println("ConnectLoop: stopped.") + return + } + } +} + // Exchanges bytes between two ReadWriters. // (In this case, between a SOCKS connection and smux stream.) func copyLoop(socks, stream io.ReadWriter) { diff --git a/client/snowflake.go b/client/snowflake.go index 55bc48e..a1b97fa 100644 --- a/client/snowflake.go +++ b/client/snowflake.go @@ -26,28 +26,8 @@ const ( DefaultSnowflakeCapacity = 1 ) -// Maintain |SnowflakeCapacity| number of available WebRTC connections, to -// transfer to the Tor SOCKS handler when needed. -func ConnectLoop(snowflakes sf.SnowflakeCollector) { - for { - // Check if ending is necessary. - _, err := snowflakes.Collect() - if err != nil { - log.Printf("WebRTC: %v Retrying in %v...", - err, sf.ReconnectTimeout) - } - select { - case <-time.After(sf.ReconnectTimeout): - continue - case <-snowflakes.Melted(): - log.Println("ConnectLoop: stopped.") - return - } - } -} - // Accept local SOCKS connections and pass them to the handler. -func socksAcceptLoop(ln *pt.SocksListener, snowflakes sf.SnowflakeCollector) { +func socksAcceptLoop(ln *pt.SocksListener, tongue sf.Tongue) { defer ln.Close() for { conn, err := ln.AcceptSocks() @@ -68,7 +48,7 @@ func socksAcceptLoop(ln *pt.SocksListener, snowflakes sf.SnowflakeCollector) { return } - err = sf.Handler(conn, snowflakes) + err = sf.Handler(conn, tongue) if err != nil { log.Printf("handler error: %s", err) return @@ -158,9 +138,6 @@ func main() { log.Printf("url: %v", strings.Join(server.URLs, " ")) } - // Prepare to collect remote WebRTC peers. - snowflakes := sf.NewPeers(*max) - // Use potentially domain-fronting broker to rendezvous. broker, err := sf.NewBrokerChannel( *brokerURL, *frontDomain, sf.CreateBrokerTransport(), @@ -170,12 +147,8 @@ func main() { } go updateNATType(iceServers, broker) - snowflakes.Tongue = sf.NewWebRTCDialer(broker, iceServers) - - // Use a real logger to periodically output how much traffic is happening. - snowflakes.BytesLogger = sf.NewBytesSyncLogger() - - go ConnectLoop(snowflakes) + // Create a new WebRTCDialer to use as the |Tongue| to catch snowflakes + dialer := sf.NewWebRTCDialer(broker, iceServers, *max) // Begin goptlib client process. ptInfo, err := pt.ClientSetup(nil) @@ -197,7 +170,7 @@ func main() { break } log.Printf("Started SOCKS listener at %v.", ln.Addr()) - go socksAcceptLoop(ln, snowflakes) + go socksAcceptLoop(ln, dialer) pt.Cmethod(methodName, ln.Version(), ln.Addr()) listeners = append(listeners, ln) default: @@ -228,7 +201,6 @@ func main() { for _, ln := range listeners { ln.Close() } - snowflakes.End() log.Println("snowflake is done.") }