client interfaces compose better, remove some globals, test ConnectLoop

This commit is contained in:
Serene Han 2016-05-19 18:06:34 -07:00
parent 00196bbd74
commit 6b8568cc6c
3 changed files with 194 additions and 83 deletions

View file

@ -2,6 +2,7 @@ package main
import ( import (
"bytes" "bytes"
"fmt"
"github.com/keroserene/go-webrtc" "github.com/keroserene/go-webrtc"
. "github.com/smartystreets/goconvey/convey" . "github.com/smartystreets/goconvey/convey"
"io/ioutil" "io/ioutil"
@ -48,9 +49,64 @@ func (m *MockTransport) RoundTrip(req *http.Request) (*http.Response, error) {
return r, nil return r, nil
} }
func TestConnect(t *testing.T) { type FakeDialer struct{}
func (w FakeDialer) Catch() (*webRTCConn, error) {
fmt.Println("Caught a dummy snowflake.")
return &webRTCConn{}, nil
}
func TestSnowflakeClient(t *testing.T) {
Convey("Snowflake", t, func() { Convey("Snowflake", t, func() {
webrtcRemotes = make(map[int]*webRTCConn)
Convey("Peers", func() {
Convey("WebRTC ConnectLoop continues until capacity of 1.\n", func() {
peers := NewPeers(1)
peers.Tongue = FakeDialer{}
go ConnectLoop(peers)
<-peers.maxedChan
So(peers.Count(), ShouldEqual, 1)
r := <-peers.snowflakeChan
So(r, ShouldNotBeNil)
So(peers.Count(), ShouldEqual, 0)
})
Convey("WebRTC ConnectLoop continues until capacity of 3.\n", func() {
peers := NewPeers(3)
peers.Tongue = FakeDialer{}
go ConnectLoop(peers)
<-peers.maxedChan
So(peers.Count(), ShouldEqual, 3)
<-peers.snowflakeChan
<-peers.snowflakeChan
<-peers.snowflakeChan
So(peers.Count(), ShouldEqual, 0)
})
Convey("WebRTC ConnectLoop continues filling when Snowflakes disconnect.\n", func() {
peers := NewPeers(3)
peers.Tongue = FakeDialer{}
go ConnectLoop(peers)
<-peers.maxedChan
So(peers.Count(), ShouldEqual, 3)
r := <-peers.snowflakeChan
So(peers.Count(), ShouldEqual, 2)
r.Close()
<-peers.maxedChan
So(peers.Count(), ShouldEqual, 3)
<-peers.snowflakeChan
<-peers.snowflakeChan
<-peers.snowflakeChan
So(peers.Count(), ShouldEqual, 0)
})
})
Convey("WebRTC Connection", func() { Convey("WebRTC Connection", func() {
c := new(webRTCConn) c := new(webRTCConn)
@ -60,17 +116,13 @@ func TestConnect(t *testing.T) {
} }
So(c.buffer.Bytes(), ShouldEqual, nil) So(c.buffer.Bytes(), ShouldEqual, nil)
Convey("Create and remove from WebRTCConn set", func() { Convey("Can construct a WebRTCConn", func() {
So(len(webrtcRemotes), ShouldEqual, 0)
So(remoteIndex, ShouldEqual, 0)
s := NewWebRTCConnection(nil, nil) s := NewWebRTCConnection(nil, nil)
So(s, ShouldNotBeNil) So(s, ShouldNotBeNil)
So(s.index, ShouldEqual, 0) So(s.index, ShouldEqual, 0)
So(len(webrtcRemotes), ShouldEqual, 1) So(s.offerChannel, ShouldNotBeNil)
So(remoteIndex, ShouldEqual, 1) So(s.answerChannel, ShouldNotBeNil)
s.Close() s.Close()
So(len(webrtcRemotes), ShouldEqual, 0)
So(remoteIndex, ShouldEqual, 1)
}) })
Convey("Write buffers when datachannel is nil", func() { Convey("Write buffers when datachannel is nil", func() {
@ -113,9 +165,6 @@ func TestConnect(t *testing.T) {
<-c.reset <-c.reset
}) })
Convey("Connect Loop", func() {
// TODO
})
}) })
}) })
@ -124,14 +173,14 @@ func TestConnect(t *testing.T) {
transport := &MockTransport{http.StatusOK} transport := &MockTransport{http.StatusOK}
fakeOffer := webrtc.DeserializeSessionDescription("test") fakeOffer := webrtc.DeserializeSessionDescription("test")
Convey("BrokerChannel with no front domain", func() { Convey("Construct BrokerChannel with no front domain", func() {
b := NewBrokerChannel("test.broker", "", transport) b := NewBrokerChannel("test.broker", "", transport)
So(b.url, ShouldNotBeNil) So(b.url, ShouldNotBeNil)
So(b.url.Path, ShouldResemble, "test.broker") So(b.url.Path, ShouldResemble, "test.broker")
So(b.transport, ShouldNotBeNil) So(b.transport, ShouldNotBeNil)
}) })
Convey("BrokerChannel with front domain", func() { Convey("Construct BrokerChannel *with* front domain", func() {
b := NewBrokerChannel("test.broker", "front", transport) b := NewBrokerChannel("test.broker", "front", transport)
So(b.url, ShouldNotBeNil) So(b.url, ShouldNotBeNil)
So(b.url.Path, ShouldResemble, "test.broker") So(b.url.Path, ShouldResemble, "test.broker")
@ -139,7 +188,7 @@ func TestConnect(t *testing.T) {
So(b.transport, ShouldNotBeNil) So(b.transport, ShouldNotBeNil)
}) })
Convey("BrokerChannel Negotiate responds with answer", func() { Convey("BrokerChannel.Negotiate responds with answer", func() {
b := NewBrokerChannel("test.broker", "", transport) b := NewBrokerChannel("test.broker", "", transport)
answer, err := b.Negotiate(fakeOffer) answer, err := b.Negotiate(fakeOffer)
So(err, ShouldBeNil) So(err, ShouldBeNil)
@ -147,7 +196,7 @@ func TestConnect(t *testing.T) {
So(answer.Sdp, ShouldResemble, "fake") So(answer.Sdp, ShouldResemble, "fake")
}) })
Convey("BrokerChannel Negotiate fails with 503", func() { Convey("BrokerChannel.Negotiate fails with 503", func() {
b := NewBrokerChannel("test.broker", "", b := NewBrokerChannel("test.broker", "",
&MockTransport{http.StatusServiceUnavailable}) &MockTransport{http.StatusServiceUnavailable})
answer, err := b.Negotiate(fakeOffer) answer, err := b.Negotiate(fakeOffer)
@ -156,7 +205,7 @@ func TestConnect(t *testing.T) {
So(err.Error(), ShouldResemble, BrokerError503) So(err.Error(), ShouldResemble, BrokerError503)
}) })
Convey("BrokerChannel Negotiate fails with 400", func() { Convey("BrokerChannel.Negotiate fails with 400", func() {
b := NewBrokerChannel("test.broker", "", b := NewBrokerChannel("test.broker", "",
&MockTransport{http.StatusBadRequest}) &MockTransport{http.StatusBadRequest})
answer, err := b.Negotiate(fakeOffer) answer, err := b.Negotiate(fakeOffer)
@ -165,7 +214,7 @@ func TestConnect(t *testing.T) {
So(err.Error(), ShouldResemble, BrokerError400) So(err.Error(), ShouldResemble, BrokerError400)
}) })
Convey("BrokerChannel Negotiate fails with unexpected", func() { Convey("BrokerChannel.Negotiate fails with unexpected error", func() {
b := NewBrokerChannel("test.broker", "", b := NewBrokerChannel("test.broker", "",
&MockTransport{123}) &MockTransport{123})
answer, err := b.Negotiate(fakeOffer) answer, err := b.Negotiate(fakeOffer)

View file

@ -5,6 +5,7 @@ import (
"bufio" "bufio"
"errors" "errors"
"flag" "flag"
"fmt"
"io" "io"
"log" "log"
"net" "net"
@ -22,14 +23,12 @@ var ptInfo pt.ClientInfo
const ( const (
ReconnectTimeout = 10 ReconnectTimeout = 10
SnowflakeCapacity = 3 SnowflakeCapacity = 1
) )
var brokerURL string var brokerURL string
var frontDomain string var frontDomain string
var iceServers IceServerList var iceServers IceServerList
var snowflakeChan = make(chan *webRTCConn, 1)
var broker *BrokerChannel
// When a connection handler starts, +1 is written to this channel; when it // When a connection handler starts, +1 is written to this channel; when it
// ends, -1 is written. // ends, -1 is written.
@ -50,62 +49,110 @@ func copyLoop(a, b net.Conn) {
log.Println("copy loop ended") log.Println("copy loop ended")
} }
// Interface that matches both webrtc.DataChannel and for testing. // Interface for catching Snowflakes.
type Tongue interface {
Catch() (*webRTCConn, error)
}
// Interface for the Snowflake transport. (usually a webrtc.DataChannel)
type SnowflakeChannel interface { type SnowflakeChannel interface {
Send([]byte) Send([]byte)
Close() error Close() error
} }
// Maintain |SnowflakeCapacity| number of available WebRTC connections, to // Collect and track available remote WebRTC Peers, to switch between if the
// transfer to the Tor SOCKS handler when needed. // current one disconnects.
func SnowflakeConnectLoop() { // Right now, it is only possible to use one remote in a circuit. This can be
transport := CreateBrokerTransport() // updated once multiplexed transport on a single circuit is available.
broker = NewBrokerChannel(brokerURL, frontDomain, transport) type Peers struct {
for { Tongue
numRemotes := len(webrtcRemotes)
if numRemotes >= SnowflakeCapacity { snowflakeChan chan *webRTCConn
log.Println("At Capacity: ", numRemotes, "snowflake. Re-checking in 10s") current *webRTCConn
<-time.After(time.Second * 10) capacity int
continue maxedChan chan struct{}
}
s, err := dialWebRTC()
if nil == s || nil != err {
log.Println("WebRTC Error: ", err, " retrying...")
<-time.After(time.Second * ReconnectTimeout)
continue
}
snowflakeChan <- s
}
} }
// Initialize a WebRTC Connection. func NewPeers(max int) *Peers {
func dialWebRTC() (*webRTCConn, error) { p := &Peers{capacity: max}
// TODO: [#3] Fetch ICE server information from Broker. p.snowflakeChan = make(chan *webRTCConn, max)
// TODO: [#18] Consider TURN servers here too. p.maxedChan = make(chan struct{}, 1)
config := webrtc.NewConfiguration(iceServers...) return p
if nil == broker {
return nil, errors.New("Failed to prepare BrokerChannel")
}
connection := NewWebRTCConnection(config, broker)
err := connection.Connect()
return connection, err
} }
func endWebRTC() { // Find, connect, and add a new peer to the internal collection.
func (p *Peers) FindSnowflake() (*webRTCConn, error) {
if p.Count() >= p.capacity {
s := fmt.Sprintf("At capacity [%d/%d]", p.Count(), p.capacity)
p.maxedChan <- struct{}{}
return nil, errors.New(s)
}
connection, err := p.Catch()
if err != nil {
return nil, err
}
return connection, nil
}
// TODO: Needs fixing.
func (p *Peers) Count() int {
return len(p.snowflakeChan)
}
// Close all remote peers.
func (p *Peers) End() {
log.Printf("WebRTC: interruped") log.Printf("WebRTC: interruped")
for _, r := range webrtcRemotes { if nil != p.current {
p.current.Close()
}
for r := range p.snowflakeChan {
r.Close() r.Close()
} }
} }
// Maintain |SnowflakeCapacity| number of available WebRTC connections, to
// transfer to the Tor SOCKS handler when needed.
func ConnectLoop(peers *Peers) {
for {
s, err := peers.FindSnowflake()
if nil == s || nil != err {
log.Println("WebRTC Error:", err,
" Retrying in", ReconnectTimeout, "seconds...")
<-time.After(time.Second * ReconnectTimeout)
continue
}
peers.snowflakeChan <- s
<-time.After(time.Second)
}
}
// Implements |Tongue|
type WebRTCDialer struct {
*BrokerChannel
}
// Initialize a WebRTC Connection by signaling through the broker.
func (w WebRTCDialer) Catch() (*webRTCConn, error) {
if nil == w.BrokerChannel {
return nil, errors.New("Cannot Dial WebRTC without a BrokerChannel.")
}
// TODO: [#3] Fetch ICE server information from Broker.
// TODO: [#18] Consider TURN servers here too.
config := webrtc.NewConfiguration(iceServers...)
connection := NewWebRTCConnection(config, w.BrokerChannel)
err := connection.Connect()
return connection, err
}
// Establish a WebRTC channel for SOCKS connections. // Establish a WebRTC channel for SOCKS connections.
func handler(conn *pt.SocksConn) error { func handler(conn *pt.SocksConn, peers *Peers) error {
handlerChan <- 1 handlerChan <- 1
defer func() { defer func() {
handlerChan <- -1 handlerChan <- -1
}() }()
// Wait for an available WebRTC remote... // Wait for an available WebRTC remote...
remote, ok := <-snowflakeChan remote, ok := <-peers.snowflakeChan
peers.current = remote
if remote == nil || !ok { if remote == nil || !ok {
conn.Reject() conn.Reject()
return errors.New("handler: Received invalid Snowflake") return errors.New("handler: Received invalid Snowflake")
@ -125,7 +172,7 @@ func handler(conn *pt.SocksConn) error {
return nil return nil
} }
func acceptLoop(ln *pt.SocksListener) error { func acceptLoop(ln *pt.SocksListener, peers *Peers) error {
defer ln.Close() defer ln.Close()
for { for {
log.Println("SOCKS listening...", ln) log.Println("SOCKS listening...", ln)
@ -138,7 +185,7 @@ func acceptLoop(ln *pt.SocksListener) error {
return err return err
} }
go func() { go func() {
err := handler(conn) err := handler(conn, peers)
if err != nil { if err != nil {
log.Printf("handler error: %s", err) log.Printf("handler error: %s", err)
} }
@ -146,6 +193,7 @@ func acceptLoop(ln *pt.SocksListener) error {
} }
} }
// TODO: Fix since multiplexing changes access to remotes.
func readSignalingMessages(f *os.File) { func readSignalingMessages(f *os.File) {
log.Printf("readSignalingMessages") log.Printf("readSignalingMessages")
s := bufio.NewScanner(f) s := bufio.NewScanner(f)
@ -157,10 +205,10 @@ func readSignalingMessages(f *os.File) {
log.Printf("ignoring invalid signal message %+q", msg) log.Printf("ignoring invalid signal message %+q", msg)
continue continue
} }
webrtcRemotes[0].answerChannel <- sdp // webrtcRemotes[0].answerChannel <- sdp
} }
log.Printf("close answerChannel") log.Printf("close answerChannel")
close(webrtcRemotes[0].answerChannel) // close(webrtcRemotes[0].answerChannel)
if err := s.Err(); err != nil { if err := s.Err(); err != nil {
log.Printf("signal FIFO: %s", err) log.Printf("signal FIFO: %s", err)
} }
@ -204,8 +252,13 @@ func main() {
go readSignalingMessages(signalFile) go readSignalingMessages(signalFile)
} }
webrtcRemotes = make(map[int]*webRTCConn) // Prepare WebRTC Peers and the Broker, then accumulate connections.
go SnowflakeConnectLoop() // TODO: Expose remote peer capacity as a flag?
remotes := NewPeers(SnowflakeCapacity)
broker := NewBrokerChannel(brokerURL, frontDomain, CreateBrokerTransport())
remotes.Tongue = WebRTCDialer{broker}
go ConnectLoop(remotes)
ptInfo, err = pt.ClientSetup(nil) ptInfo, err = pt.ClientSetup(nil)
if err != nil { if err != nil {
@ -221,12 +274,13 @@ func main() {
for _, methodName := range ptInfo.MethodNames { for _, methodName := range ptInfo.MethodNames {
switch methodName { switch methodName {
case "snowflake": case "snowflake":
// TODO: Be able to recover when SOCKS dies.
ln, err := pt.ListenSocks("tcp", "127.0.0.1:0") ln, err := pt.ListenSocks("tcp", "127.0.0.1:0")
if err != nil { if err != nil {
pt.CmethodError(methodName, err.Error()) pt.CmethodError(methodName, err.Error())
break break
} }
go acceptLoop(ln) go acceptLoop(ln, remotes)
pt.Cmethod(methodName, ln.Version(), ln.Addr()) pt.Cmethod(methodName, ln.Version(), ln.Addr())
listeners = append(listeners, ln) listeners = append(listeners, ln)
default: default:
@ -234,7 +288,6 @@ func main() {
} }
} }
pt.CmethodsDone() pt.CmethodsDone()
defer endWebRTC()
var numHandlers int = 0 var numHandlers int = 0
var sig os.Signal var sig os.Signal
@ -254,6 +307,8 @@ func main() {
ln.Close() ln.Close()
} }
remotes.End()
// wait for second signal or no more handlers // wait for second signal or no more handlers
sig = nil sig = nil
for sig == nil && numHandlers != 0 { for sig == nil && numHandlers != 0 {

View file

@ -11,26 +11,27 @@ import (
"time" "time"
) )
// Implements net.Conn interface // Remote WebRTC peer. Implements the |net.Conn| interface.
type webRTCConn struct { type webRTCConn struct {
config *webrtc.Configuration config *webrtc.Configuration
pc *webrtc.PeerConnection pc *webrtc.PeerConnection
snowflake SnowflakeChannel // Interface holding the WebRTC DataChannel. snowflake SnowflakeChannel // Holds the WebRTC DataChannel.
broker *BrokerChannel broker *BrokerChannel
offerChannel chan *webrtc.SessionDescription offerChannel chan *webrtc.SessionDescription
answerChannel chan *webrtc.SessionDescription answerChannel chan *webrtc.SessionDescription
errorChannel chan error errorChannel chan error
endChannel chan struct{}
recvPipe *io.PipeReader recvPipe *io.PipeReader
writePipe *io.PipeWriter writePipe *io.PipeWriter
buffer bytes.Buffer buffer bytes.Buffer
reset chan struct{} reset chan struct{}
index int index int
closed bool
*BytesInfo *BytesInfo
} }
var webrtcRemotes map[int]*webRTCConn
var remoteIndex int = 0
func (c *webRTCConn) Read(b []byte) (int, error) { func (c *webRTCConn) Read(b []byte) (int, error) {
return c.recvPipe.Read(b) return c.recvPipe.Read(b)
} }
@ -51,10 +52,17 @@ func (c *webRTCConn) Close() error {
var err error = nil var err error = nil
log.Printf("WebRTC: Closing") log.Printf("WebRTC: Closing")
c.cleanup() c.cleanup()
if nil != c.offerChannel {
close(c.offerChannel) close(c.offerChannel)
}
if nil != c.answerChannel {
close(c.answerChannel) close(c.answerChannel)
}
if nil != c.errorChannel {
close(c.errorChannel) close(c.errorChannel)
delete(webrtcRemotes, c.index) }
// Mark for deletion.
c.closed = true
return err return err
} }
@ -78,6 +86,7 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error {
return fmt.Errorf("SetWriteDeadline not implemented") return fmt.Errorf("SetWriteDeadline not implemented")
} }
// Construct a WebRTC PeerConnection.
func NewWebRTCConnection(config *webrtc.Configuration, func NewWebRTCConnection(config *webrtc.Configuration,
broker *BrokerChannel) *webRTCConn { broker *BrokerChannel) *webRTCConn {
connection := new(webRTCConn) connection := new(webRTCConn)
@ -90,6 +99,7 @@ func NewWebRTCConnection(config *webrtc.Configuration,
connection.errorChannel = make(chan error, 1) connection.errorChannel = make(chan error, 1)
connection.reset = make(chan struct{}, 1) connection.reset = make(chan struct{}, 1)
// TODO: Separate out.
// Log every few seconds. // Log every few seconds.
connection.BytesInfo = &BytesInfo{ connection.BytesInfo = &BytesInfo{
inboundChan: make(chan int, 5), outboundChan: make(chan int, 5), inboundChan: make(chan int, 5), outboundChan: make(chan int, 5),
@ -99,9 +109,6 @@ func NewWebRTCConnection(config *webrtc.Configuration,
// Pipes remain the same even when DataChannel gets switched. // Pipes remain the same even when DataChannel gets switched.
connection.recvPipe, connection.writePipe = io.Pipe() connection.recvPipe, connection.writePipe = io.Pipe()
connection.index = remoteIndex
webrtcRemotes[connection.index] = connection
remoteIndex++
return connection return connection
} }
@ -296,12 +303,12 @@ func (c *webRTCConn) Reset() {
func (c *webRTCConn) cleanup() { func (c *webRTCConn) cleanup() {
if nil != c.snowflake { if nil != c.snowflake {
s := c.snowflake
log.Printf("WebRTC: closing DataChannel") log.Printf("WebRTC: closing DataChannel")
dataChannel := c.snowflake
// Setting snowflake to nil *before* Close indicates to OnClose that it // Setting snowflake to nil *before* Close indicates to OnClose that it
// was locally triggered. // was locally triggered.
c.snowflake = nil c.snowflake = nil
s.Close() dataChannel.Close()
} }
if nil != c.pc { if nil != c.pc {
log.Printf("WebRTC: closing PeerConnection") log.Printf("WebRTC: closing PeerConnection")