Start refactoring out a client and library

This commit is contained in:
Arlo Breault 2018-11-20 22:17:24 -05:00
parent 7662ccb00c
commit cce7ee64a7
8 changed files with 120 additions and 105 deletions

56
client/lib/interfaces.go Normal file
View file

@ -0,0 +1,56 @@
package lib
import (
"io"
"net"
)
type Connector interface {
Connect() error
}
type Resetter interface {
Reset()
WaitForReset()
}
// Interface for a single remote WebRTC peer.
// In the Client context, "Snowflake" refers to the remote browser proxy.
type Snowflake interface {
io.ReadWriteCloser
Resetter
Connector
}
// Interface for catching Snowflakes. (aka the remote dialer)
type Tongue interface {
Catch() (Snowflake, error)
}
// Interface for collecting some number of Snowflakes, for passing along
// ultimately to the SOCKS handler.
type SnowflakeCollector interface {
// Add a Snowflake to the collection.
// Implementation should decide how to connect and maintain the webRTCConn.
Collect() (Snowflake, error)
// Remove and return the most available Snowflake from the collection.
Pop() Snowflake
// Signal when the collector has stopped collecting.
Melted() <-chan struct{}
}
// Interface to adapt to goptlib's SocksConn struct.
type SocksConnector interface {
Grant(*net.TCPAddr) error
Reject() error
net.Conn
}
// Interface for the Snowflake's transport. (Typically just webrtc.DataChannel)
type SnowflakeDataChannel interface {
io.Closer
Send([]byte)
}

319
client/lib/lib_test.go Normal file
View file

@ -0,0 +1,319 @@
package lib
import (
"bytes"
"fmt"
"io/ioutil"
"net"
"net/http"
"strings"
"testing"
"github.com/keroserene/go-webrtc"
. "github.com/smartystreets/goconvey/convey"
)
type MockDataChannel struct {
destination bytes.Buffer
done chan bool
}
func (m *MockDataChannel) Send(data []byte) {
m.destination.Write(data)
m.done <- true
}
func (*MockDataChannel) Close() error { return nil }
type MockResponse struct{}
func (m *MockResponse) Read(p []byte) (int, error) {
p = []byte(`{"type":"answer","sdp":"fake"}`)
return 0, nil
}
func (m *MockResponse) Close() error { return nil }
type MockTransport struct{ statusOverride int }
// Just returns a response with fake SDP answer.
func (m *MockTransport) RoundTrip(req *http.Request) (*http.Response, error) {
s := ioutil.NopCloser(strings.NewReader(`{"type":"answer","sdp":"fake"}`))
r := &http.Response{
StatusCode: m.statusOverride,
Body: s,
}
return r, nil
}
type FakeDialer struct{}
func (w FakeDialer) Catch() (Snowflake, error) {
fmt.Println("Caught a dummy snowflake.")
return &WebRTCPeer{}, nil
}
type FakeSocksConn struct {
net.Conn
rejected bool
}
func (f FakeSocksConn) Reject() error {
f.rejected = true
return nil
}
func (f FakeSocksConn) Grant(addr *net.TCPAddr) error { return nil }
type FakePeers struct{ toRelease *WebRTCPeer }
func (f FakePeers) Collect() (Snowflake, error) { return &WebRTCPeer{}, nil }
func (f FakePeers) Pop() Snowflake { return nil }
func (f FakePeers) Melted() <-chan struct{} { return nil }
func TestSnowflakeClient(t *testing.T) {
Convey("Peers", t, func() {
Convey("Can construct", func() {
p := NewPeers(1)
So(p.capacity, ShouldEqual, 1)
So(p.snowflakeChan, ShouldNotBeNil)
So(cap(p.snowflakeChan), ShouldEqual, 1)
})
Convey("Collecting a Snowflake requires a Tongue.", func() {
p := NewPeers(1)
_, err := p.Collect()
So(err, ShouldNotBeNil)
So(p.Count(), ShouldEqual, 0)
// Set the dialer so that collection is possible.
p.Tongue = FakeDialer{}
_, err = p.Collect()
So(err, ShouldBeNil)
So(p.Count(), ShouldEqual, 1)
// S
_, err = p.Collect()
})
Convey("Collection continues until capacity.", func() {
c := 5
p := NewPeers(c)
p.Tongue = FakeDialer{}
// Fill up to capacity.
for i := 0; i < c; i++ {
fmt.Println("Adding snowflake ", i)
_, err := p.Collect()
So(err, ShouldBeNil)
So(p.Count(), ShouldEqual, i+1)
}
// But adding another gives an error.
So(p.Count(), ShouldEqual, c)
_, err := p.Collect()
So(err, ShouldNotBeNil)
So(p.Count(), ShouldEqual, c)
// But popping and closing allows it to continue.
s := p.Pop()
s.Close()
So(s, ShouldNotBeNil)
So(p.Count(), ShouldEqual, c-1)
_, err = p.Collect()
So(err, ShouldBeNil)
So(p.Count(), ShouldEqual, c)
})
Convey("Count correctly purges peers marked for deletion.", func() {
p := NewPeers(4)
p.Tongue = FakeDialer{}
p.Collect()
p.Collect()
p.Collect()
p.Collect()
So(p.Count(), ShouldEqual, 4)
s := p.Pop()
s.Close()
So(p.Count(), ShouldEqual, 3)
s = p.Pop()
s.Close()
So(p.Count(), ShouldEqual, 2)
})
Convey("End Closes all peers.", func() {
cnt := 5
p := NewPeers(cnt)
for i := 0; i < cnt; i++ {
p.activePeers.PushBack(&WebRTCPeer{})
}
So(p.Count(), ShouldEqual, cnt)
p.End()
<-p.Melted()
So(p.Count(), ShouldEqual, 0)
})
Convey("Pop skips over closed peers.", func() {
p := NewPeers(4)
p.Tongue = FakeDialer{}
wc1, _ := p.Collect()
wc2, _ := p.Collect()
wc3, _ := p.Collect()
So(wc1, ShouldNotBeNil)
So(wc2, ShouldNotBeNil)
So(wc3, ShouldNotBeNil)
wc1.Close()
r := p.Pop()
So(p.Count(), ShouldEqual, 2)
So(r, ShouldEqual, wc2)
wc4, _ := p.Collect()
wc2.Close()
wc3.Close()
r = p.Pop()
So(r, ShouldEqual, wc4)
})
})
Convey("Snowflake", t, func() {
SkipConvey("Handler Grants correctly", func() {
socks := &FakeSocksConn{}
snowflakes := &FakePeers{}
So(socks.rejected, ShouldEqual, false)
snowflakes.toRelease = nil
Handler(socks, snowflakes)
So(socks.rejected, ShouldEqual, true)
})
Convey("WebRTC Connection", func() {
c := NewWebRTCPeer(nil, nil)
So(c.buffer.Bytes(), ShouldEqual, nil)
Convey("Can construct a WebRTCConn", func() {
s := NewWebRTCPeer(nil, nil)
So(s, ShouldNotBeNil)
So(s.offerChannel, ShouldNotBeNil)
So(s.answerChannel, ShouldNotBeNil)
s.Close()
})
Convey("Write buffers when datachannel is nil", func() {
c.Write([]byte("test"))
c.transport = nil
So(c.buffer.Bytes(), ShouldResemble, []byte("test"))
})
Convey("Write sends to datachannel when not nil", func() {
mock := new(MockDataChannel)
c.transport = mock
mock.done = make(chan bool, 1)
c.Write([]byte("test"))
<-mock.done
So(c.buffer.Bytes(), ShouldEqual, nil)
So(mock.destination.Bytes(), ShouldResemble, []byte("test"))
})
Convey("Exchange SDP sets remote description", func() {
c.offerChannel = make(chan *webrtc.SessionDescription, 1)
c.answerChannel = make(chan *webrtc.SessionDescription, 1)
c.config = webrtc.NewConfiguration()
c.preparePeerConnection()
c.offerChannel <- nil
answer := webrtc.DeserializeSessionDescription(
`{"type":"answer","sdp":""}`)
c.answerChannel <- answer
c.exchangeSDP()
})
SkipConvey("Exchange SDP fails on nil answer", func() {
c.reset = make(chan struct{})
c.offerChannel = make(chan *webrtc.SessionDescription, 1)
c.answerChannel = make(chan *webrtc.SessionDescription, 1)
c.offerChannel <- nil
c.answerChannel <- nil
c.exchangeSDP()
<-c.reset
})
})
})
Convey("Dialers", t, func() {
Convey("Can construct WebRTCDialer.", func() {
broker := &BrokerChannel{Host: "test"}
d := NewWebRTCDialer(broker, nil)
So(d, ShouldNotBeNil)
So(d.BrokerChannel, ShouldNotBeNil)
So(d.BrokerChannel.Host, ShouldEqual, "test")
})
Convey("WebRTCDialer cannot Catch a snowflake with nil broker.", func() {
d := NewWebRTCDialer(nil, nil)
conn, err := d.Catch()
So(conn, ShouldBeNil)
So(err, ShouldNotBeNil)
})
SkipConvey("WebRTCDialer can Catch a snowflake.", func() {
broker := &BrokerChannel{Host: "test"}
d := NewWebRTCDialer(broker, nil)
conn, err := d.Catch()
So(conn, ShouldBeNil)
So(err, ShouldNotBeNil)
})
})
Convey("Rendezvous", t, func() {
webrtc.SetLoggingVerbosity(0)
transport := &MockTransport{http.StatusOK}
fakeOffer := webrtc.DeserializeSessionDescription("test")
Convey("Construct BrokerChannel with no front domain", func() {
b := NewBrokerChannel("test.broker", "", transport)
So(b.url, ShouldNotBeNil)
So(b.url.Path, ShouldResemble, "test.broker")
So(b.transport, ShouldNotBeNil)
})
Convey("Construct BrokerChannel *with* front domain", func() {
b := NewBrokerChannel("test.broker", "front", transport)
So(b.url, ShouldNotBeNil)
So(b.url.Path, ShouldResemble, "test.broker")
So(b.url.Host, ShouldResemble, "front")
So(b.transport, ShouldNotBeNil)
})
Convey("BrokerChannel.Negotiate responds with answer", func() {
b := NewBrokerChannel("test.broker", "", transport)
answer, err := b.Negotiate(fakeOffer)
So(err, ShouldBeNil)
So(answer, ShouldNotBeNil)
So(answer.Sdp, ShouldResemble, "fake")
})
Convey("BrokerChannel.Negotiate fails with 503", func() {
b := NewBrokerChannel("test.broker", "",
&MockTransport{http.StatusServiceUnavailable})
answer, err := b.Negotiate(fakeOffer)
So(err, ShouldNotBeNil)
So(answer, ShouldBeNil)
So(err.Error(), ShouldResemble, BrokerError503)
})
Convey("BrokerChannel.Negotiate fails with 400", func() {
b := NewBrokerChannel("test.broker", "",
&MockTransport{http.StatusBadRequest})
answer, err := b.Negotiate(fakeOffer)
So(err, ShouldNotBeNil)
So(answer, ShouldBeNil)
So(err.Error(), ShouldResemble, BrokerError400)
})
Convey("BrokerChannel.Negotiate fails with unexpected error", func() {
b := NewBrokerChannel("test.broker", "",
&MockTransport{123})
answer, err := b.Negotiate(fakeOffer)
So(err, ShouldNotBeNil)
So(answer, ShouldBeNil)
So(err.Error(), ShouldResemble, BrokerErrorUnexpected)
})
})
}

124
client/lib/peers.go Normal file
View file

@ -0,0 +1,124 @@
package lib
import (
"container/list"
"errors"
"fmt"
"log"
)
// Container which keeps track of multiple WebRTC remote peers.
// Implements |SnowflakeCollector|.
//
// Maintaining a set of pre-connected Peers with fresh but inactive datachannels
// allows allows rapid recovery when the current WebRTC Peer disconnects.
//
// Note: For now, only one remote can be active at any given moment.
// This is a property of Tor circuits & its current multiplexing constraints,
// but could be updated if that changes.
// (Also, this constraint does not necessarily apply to the more generic PT
// version of Snowflake)
type Peers struct {
Tongue
BytesLogger
snowflakeChan chan Snowflake
activePeers *list.List
capacity int
melt chan struct{}
}
// Construct a fresh container of remote peers.
func NewPeers(max int) *Peers {
p := &Peers{capacity: max}
// Use buffered go channel to pass snowflakes onwards to the SOCKS handler.
p.snowflakeChan = make(chan Snowflake, max)
p.activePeers = list.New()
p.melt = make(chan struct{}, 1)
return p
}
// As part of |SnowflakeCollector| interface.
func (p *Peers) Collect() (Snowflake, error) {
cnt := p.Count()
s := fmt.Sprintf("Currently at [%d/%d]", cnt, p.capacity)
if cnt >= p.capacity {
s := fmt.Sprintf("At capacity [%d/%d]", cnt, p.capacity)
return nil, errors.New(s)
}
log.Println("WebRTC: Collecting a new Snowflake.", s)
// Engage the Snowflake Catching interface, which must be available.
if nil == p.Tongue {
return nil, errors.New("Missing Tongue to catch Snowflakes with.")
}
// BUG: some broker conflict here.
connection, err := p.Tongue.Catch()
if nil != err {
return nil, err
}
// Track new valid Snowflake in internal collection and pass along.
p.activePeers.PushBack(connection)
p.snowflakeChan <- connection
return connection, nil
}
// As part of |SnowflakeCollector| interface.
func (p *Peers) Pop() Snowflake {
// Blocks until an available, valid snowflake appears.
var snowflake Snowflake
var ok bool
for nil == snowflake {
snowflake, ok = <-p.snowflakeChan
conn := snowflake.(*WebRTCPeer)
if !ok {
return nil
}
if conn.closed {
snowflake = nil
}
}
// Set to use the same rate-limited traffic logger to keep consistency.
snowflake.(*WebRTCPeer).BytesLogger = p.BytesLogger
return snowflake
}
// As part of |SnowflakeCollector| interface.
func (p *Peers) Melted() <-chan struct{} {
return p.melt
}
// Returns total available Snowflakes (including the active one)
// The count only reduces when connections themselves close, rather than when
// they are popped.
func (p *Peers) Count() int {
p.purgeClosedPeers()
return p.activePeers.Len()
}
func (p *Peers) purgeClosedPeers() {
for e := p.activePeers.Front(); e != nil; {
next := e.Next()
conn := e.Value.(*WebRTCPeer)
// Purge those marked for deletion.
if conn.closed {
p.activePeers.Remove(e)
}
e = next
}
}
// Close all Peers contained here.
func (p *Peers) End() {
close(p.snowflakeChan)
p.melt <- struct{}{}
cnt := p.Count()
for e := p.activePeers.Front(); e != nil; {
next := e.Next()
conn := e.Value.(*WebRTCPeer)
conn.Close()
p.activePeers.Remove(e)
e = next
}
log.Println("WebRTC: melted all", cnt, "snowflakes.")
}

213
client/lib/rendezvous.go Normal file
View file

@ -0,0 +1,213 @@
// WebRTC rendezvous requires the exchange of SessionDescriptions between
// peers in order to establish a PeerConnection.
//
// This file contains the two methods currently available to Snowflake:
//
// - Domain-fronted HTTP signaling. The Broker automatically exchange offers
// and answers between this client and some remote WebRTC proxy.
// (This is the recommended default, enabled via the flags in "torrc".)
//
// - Manual copy-paste signaling. User must create a signaling pipe.
// (The flags in torrc-manual allow this)
package lib
import (
"bufio"
"bytes"
"errors"
"io/ioutil"
"log"
"net/http"
"net/url"
"os"
"syscall"
"github.com/keroserene/go-webrtc"
)
const (
BrokerError503 string = "No snowflake proxies currently available."
BrokerError400 string = "You sent an invalid offer in the request."
BrokerErrorUnexpected string = "Unexpected error, no answer."
)
// Signalling Channel to the Broker.
type BrokerChannel struct {
// The Host header to put in the HTTP request (optional and may be
// different from the host name in URL).
Host string
url *url.URL
transport http.RoundTripper // Used to make all requests.
}
// We make a copy of DefaultTransport because we want the default Dial
// and TLSHandshakeTimeout settings. But we want to disable the default
// ProxyFromEnvironment setting.
func CreateBrokerTransport() http.RoundTripper {
transport := http.DefaultTransport.(*http.Transport)
transport.Proxy = nil
return transport
}
// Construct a new BrokerChannel, where:
// |broker| is the full URL of the facilitating program which assigns proxies
// to clients, and |front| is the option fronting domain.
func NewBrokerChannel(broker string, front string, transport http.RoundTripper) *BrokerChannel {
targetURL, err := url.Parse(broker)
if nil != err {
return nil
}
log.Println("Rendezvous using Broker at:", broker)
bc := new(BrokerChannel)
bc.url = targetURL
if "" != front { // Optional front domain.
log.Println("Domain fronting using:", front)
bc.Host = bc.url.Host
bc.url.Host = front
}
bc.transport = transport
return bc
}
// Roundtrip HTTP POST using WebRTC SessionDescriptions.
//
// Send an SDP offer to the broker, which assigns a proxy and responds
// with an SDP answer from a designated remote WebRTC peer.
func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) (
*webrtc.SessionDescription, error) {
log.Println("Negotiating via BrokerChannel...\nTarget URL: ",
bc.Host, "\nFront URL: ", bc.url.Host)
data := bytes.NewReader([]byte(offer.Serialize()))
// Suffix with broker's client registration handler.
clientURL := bc.url.ResolveReference(&url.URL{Path: "client"})
request, err := http.NewRequest("POST", clientURL.String(), data)
if nil != err {
return nil, err
}
if "" != bc.Host { // Set true host if necessary.
request.Host = bc.Host
}
resp, err := bc.transport.RoundTrip(request)
if nil != err {
return nil, err
}
defer resp.Body.Close()
log.Printf("BrokerChannel Response:\n%s\n\n", resp.Status)
switch resp.StatusCode {
case http.StatusOK:
body, err := ioutil.ReadAll(resp.Body)
if nil != err {
return nil, err
}
answer := webrtc.DeserializeSessionDescription(string(body))
return answer, nil
case http.StatusServiceUnavailable:
return nil, errors.New(BrokerError503)
case http.StatusBadRequest:
return nil, errors.New(BrokerError400)
default:
return nil, errors.New(BrokerErrorUnexpected)
}
}
// Implements the |Tongue| interface to catch snowflakes, using BrokerChannel.
type WebRTCDialer struct {
*BrokerChannel
webrtcConfig *webrtc.Configuration
}
func NewWebRTCDialer(
broker *BrokerChannel, iceServers IceServerList) *WebRTCDialer {
config := webrtc.NewConfiguration(iceServers...)
if nil == config {
log.Println("Unable to prepare WebRTC configuration.")
return nil
}
return &WebRTCDialer{
BrokerChannel: broker,
webrtcConfig: config,
}
}
// Initialize a WebRTC Connection by signaling through the broker.
func (w WebRTCDialer) Catch() (Snowflake, error) {
if nil == w.BrokerChannel {
return nil, errors.New("Cannot Dial WebRTC without a BrokerChannel.")
}
// TODO: [#3] Fetch ICE server information from Broker.
// TODO: [#18] Consider TURN servers here too.
connection := NewWebRTCPeer(w.webrtcConfig, w.BrokerChannel)
err := connection.Connect()
return connection, err
}
// CopyPasteDialer handles the interaction required to copy-paste the
// offers and answers.
// Implements |Tongue| interface to catch snowflakes manually.
// Supports recovery of connections.
type CopyPasteDialer struct {
webrtcConfig *webrtc.Configuration
signal *os.File
current *WebRTCPeer
}
func NewCopyPasteDialer(iceServers IceServerList) *CopyPasteDialer {
log.Println("No HTTP signaling detected. Using manual copy-paste signaling.")
log.Println("Waiting for a \"signal\" pipe...")
// This FIFO receives signaling messages.
err := syscall.Mkfifo("signal", 0600)
if err != nil {
if syscall.EEXIST != err.(syscall.Errno) {
log.Fatal(err)
}
}
signalFile, err := os.OpenFile("signal", os.O_RDONLY, 0600)
if nil != err {
log.Fatal(err)
return nil
}
config := webrtc.NewConfiguration(iceServers...)
dialer := &CopyPasteDialer{
webrtcConfig: config,
signal: signalFile,
}
go dialer.readSignals()
return dialer
}
// Initialize a WebRTC Peer via manual copy-paste.
func (d *CopyPasteDialer) Catch() (Snowflake, error) {
if nil == d.signal {
return nil, errors.New("Cannot copy-paste dial without signal pipe.")
}
connection := NewWebRTCPeer(d.webrtcConfig, nil)
// Must keep track of pending new connection until copy-paste completes.
d.current = connection
// Outputs SDP offer to log, expecting user to copy-paste to the remote Peer.
// Blocks until user pastes back the answer.
err := connection.Connect()
d.current = nil
return connection, err
}
// Manual copy-paste signalling.
func (d *CopyPasteDialer) readSignals() {
defer d.signal.Close()
log.Printf("CopyPasteDialer: reading messages from signal pipe.")
s := bufio.NewScanner(d.signal)
for s.Scan() {
msg := s.Text()
sdp := webrtc.DeserializeSessionDescription(msg)
if sdp == nil {
log.Printf("CopyPasteDialer: ignoring invalid signal message %+q", msg)
continue
}
d.current.answerChannel <- sdp
}
if err := s.Err(); err != nil {
log.Printf("signal FIFO: %s", err)
}
}

69
client/lib/snowflake.go Normal file
View file

@ -0,0 +1,69 @@
package lib
import (
"errors"
"io"
"log"
"net"
"sync"
)
const (
ReconnectTimeout = 10
SnowflakeTimeout = 30
)
// When a connection handler starts, +1 is written to this channel; when it
// ends, -1 is written.
var HandlerChan = make(chan int)
// Given an accepted SOCKS connection, establish a WebRTC connection to the
// remote peer and exchange traffic.
func Handler(socks SocksConnector, snowflakes SnowflakeCollector) error {
HandlerChan <- 1
defer func() {
HandlerChan <- -1
}()
// Obtain an available WebRTC remote. May block.
snowflake := snowflakes.Pop()
if nil == snowflake {
socks.Reject()
return errors.New("handler: Received invalid Snowflake")
}
defer socks.Close()
defer snowflake.Close()
log.Println("---- Handler: snowflake assigned ----")
err := socks.Grant(&net.TCPAddr{IP: net.IPv4zero, Port: 0})
if err != nil {
return err
}
go func() {
// When WebRTC resets, close the SOCKS connection too.
snowflake.WaitForReset()
socks.Close()
}()
// Begin exchanging data. Either WebRTC or localhost SOCKS will close first.
// In eithercase, this closes the handler and induces a new handler.
copyLoop(socks, snowflake)
log.Println("---- Handler: closed ---")
return nil
}
// Exchanges bytes between two ReadWriters.
// (In this case, between a SOCKS and WebRTC connection.)
func copyLoop(a, b io.ReadWriter) {
var wg sync.WaitGroup
wg.Add(2)
go func() {
io.Copy(b, a)
wg.Done()
}()
go func() {
io.Copy(a, b)
wg.Done()
}()
wg.Wait()
log.Println("copy loop ended")
}

95
client/lib/util.go Normal file
View file

@ -0,0 +1,95 @@
package lib
import (
"fmt"
"log"
"time"
"github.com/keroserene/go-webrtc"
)
const (
LogTimeInterval = 5
)
type IceServerList []webrtc.ConfigurationOption
func (i *IceServerList) String() string {
return fmt.Sprint(*i)
}
type BytesLogger interface {
Log()
AddOutbound(int)
AddInbound(int)
}
// Default BytesLogger does nothing.
type BytesNullLogger struct{}
func (b BytesNullLogger) Log() {}
func (b BytesNullLogger) AddOutbound(amount int) {}
func (b BytesNullLogger) AddInbound(amount int) {}
// BytesSyncLogger uses channels to safely log from multiple sources with output
// occuring at reasonable intervals.
type BytesSyncLogger struct {
OutboundChan chan int
InboundChan chan int
Outbound int
Inbound int
OutEvents int
InEvents int
IsLogging bool
}
func (b *BytesSyncLogger) Log() {
b.IsLogging = true
var amount int
output := func() {
log.Printf("Traffic Bytes (in|out): %d | %d -- (%d OnMessages, %d Sends)",
b.Inbound, b.Outbound, b.InEvents, b.OutEvents)
b.Outbound = 0
b.OutEvents = 0
b.Inbound = 0
b.InEvents = 0
}
last := time.Now()
for {
select {
case amount = <-b.OutboundChan:
b.Outbound += amount
b.OutEvents++
last := time.Now()
if time.Since(last) > time.Second*LogTimeInterval {
last = time.Now()
output()
}
case amount = <-b.InboundChan:
b.Inbound += amount
b.InEvents++
if time.Since(last) > time.Second*LogTimeInterval {
last = time.Now()
output()
}
case <-time.After(time.Second * LogTimeInterval):
if b.InEvents > 0 || b.OutEvents > 0 {
output()
}
}
}
}
func (b *BytesSyncLogger) AddOutbound(amount int) {
if !b.IsLogging {
return
}
b.OutboundChan <- amount
}
func (b *BytesSyncLogger) AddInbound(amount int) {
if !b.IsLogging {
return
}
b.InboundChan <- amount
}

363
client/lib/webrtc.go Normal file
View file

@ -0,0 +1,363 @@
package lib
import (
"bytes"
"errors"
"io"
"log"
"sync"
"time"
"github.com/dchest/uniuri"
"github.com/keroserene/go-webrtc"
)
// Remote WebRTC peer.
// Implements the |Snowflake| interface, which includes
// |io.ReadWriter|, |Resetter|, and |Connector|.
//
// Handles preparation of go-webrtc PeerConnection. Only ever has
// one DataChannel.
type WebRTCPeer struct {
id string
config *webrtc.Configuration
pc *webrtc.PeerConnection
transport SnowflakeDataChannel // Holds the WebRTC DataChannel.
broker *BrokerChannel
offerChannel chan *webrtc.SessionDescription
answerChannel chan *webrtc.SessionDescription
errorChannel chan error
recvPipe *io.PipeReader
writePipe *io.PipeWriter
lastReceive time.Time
buffer bytes.Buffer
reset chan struct{}
closed bool
lock sync.Mutex // Synchronization for DataChannel destruction
once sync.Once // Synchronization for PeerConnection destruction
BytesLogger
}
// Construct a WebRTC PeerConnection.
func NewWebRTCPeer(config *webrtc.Configuration,
broker *BrokerChannel) *WebRTCPeer {
connection := new(WebRTCPeer)
connection.id = "snowflake-" + uniuri.New()
connection.config = config
connection.broker = broker
connection.offerChannel = make(chan *webrtc.SessionDescription, 1)
connection.answerChannel = make(chan *webrtc.SessionDescription, 1)
// Error channel is mostly for reporting during the initial SDP offer
// creation & local description setting, which happens asynchronously.
connection.errorChannel = make(chan error, 1)
connection.reset = make(chan struct{}, 1)
// Override with something that's not NullLogger to have real logging.
connection.BytesLogger = &BytesNullLogger{}
// Pipes remain the same even when DataChannel gets switched.
connection.recvPipe, connection.writePipe = io.Pipe()
return connection
}
// Read bytes from local SOCKS.
// As part of |io.ReadWriter|
func (c *WebRTCPeer) Read(b []byte) (int, error) {
return c.recvPipe.Read(b)
}
// Writes bytes out to remote WebRTC.
// As part of |io.ReadWriter|
func (c *WebRTCPeer) Write(b []byte) (int, error) {
c.lock.Lock()
defer c.lock.Unlock()
c.BytesLogger.AddOutbound(len(b))
// TODO: Buffering could be improved / separated out of WebRTCPeer.
if nil == c.transport {
log.Printf("Buffered %d bytes --> WebRTC", len(b))
c.buffer.Write(b)
} else {
c.transport.Send(b)
}
return len(b), nil
}
// As part of |Snowflake|
func (c *WebRTCPeer) Close() error {
c.once.Do(func() {
c.closed = true
c.cleanup()
c.Reset()
log.Printf("WebRTC: Closing")
})
return nil
}
// As part of |Resetter|
func (c *WebRTCPeer) Reset() {
if nil == c.reset {
return
}
c.reset <- struct{}{}
}
// As part of |Resetter|
func (c *WebRTCPeer) WaitForReset() { <-c.reset }
// Prevent long-lived broken remotes.
// Should also update the DataChannel in underlying go-webrtc's to make Closes
// more immediate / responsive.
func (c *WebRTCPeer) checkForStaleness() {
c.lastReceive = time.Now()
for {
if c.closed {
return
}
if time.Since(c.lastReceive).Seconds() > SnowflakeTimeout {
log.Println("WebRTC: No messages received for", SnowflakeTimeout,
"seconds -- closing stale connection.")
c.Close()
return
}
<-time.After(time.Second)
}
}
// As part of |Connector| interface.
func (c *WebRTCPeer) Connect() error {
log.Println(c.id, " connecting...")
// TODO: When go-webrtc is more stable, it's possible that a new
// PeerConnection won't need to be re-prepared each time.
err := c.preparePeerConnection()
if err != nil {
return err
}
err = c.establishDataChannel()
if err != nil {
return errors.New("WebRTC: Could not establish DataChannel.")
}
err = c.exchangeSDP()
if err != nil {
return err
}
go c.checkForStaleness()
return nil
}
// Create and prepare callbacks on a new WebRTC PeerConnection.
func (c *WebRTCPeer) preparePeerConnection() error {
if nil != c.pc {
c.pc.Destroy()
c.pc = nil
}
pc, err := webrtc.NewPeerConnection(c.config)
if err != nil {
log.Printf("NewPeerConnection ERROR: %s", err)
return err
}
// Prepare PeerConnection callbacks.
pc.OnNegotiationNeeded = func() {
log.Println("WebRTC: OnNegotiationNeeded")
go func() {
offer, err := pc.CreateOffer()
// TODO: Potentially timeout and retry if ICE isn't working.
if err != nil {
c.errorChannel <- err
return
}
err = pc.SetLocalDescription(offer)
if err != nil {
c.errorChannel <- err
return
}
}()
}
// Allow candidates to accumulate until OnIceComplete.
pc.OnIceCandidate = func(candidate webrtc.IceCandidate) {
log.Printf(candidate.Candidate)
}
// TODO: This may soon be deprecated, consider OnIceGatheringStateChange.
pc.OnIceComplete = func() {
log.Printf("WebRTC: OnIceComplete")
c.offerChannel <- pc.LocalDescription()
}
// This callback is not expected, as the Client initiates the creation
// of the data channel, not the remote peer.
pc.OnDataChannel = func(channel *webrtc.DataChannel) {
log.Println("OnDataChannel")
panic("Unexpected OnDataChannel!")
}
c.pc = pc
log.Println("WebRTC: PeerConnection created.")
return nil
}
// Create a WebRTC DataChannel locally.
func (c *WebRTCPeer) establishDataChannel() error {
c.lock.Lock()
defer c.lock.Unlock()
if c.transport != nil {
panic("Unexpected datachannel already exists!")
}
dc, err := c.pc.CreateDataChannel(c.id)
// Triggers "OnNegotiationNeeded" on the PeerConnection, which will prepare
// an SDP offer while other goroutines operating on this struct handle the
// signaling. Eventually fires "OnOpen".
if err != nil {
log.Printf("CreateDataChannel ERROR: %s", err)
return err
}
dc.OnOpen = func() {
c.lock.Lock()
defer c.lock.Unlock()
log.Println("WebRTC: DataChannel.OnOpen")
if nil != c.transport {
panic("WebRTC: transport already exists.")
}
// Flush buffered outgoing SOCKS data if necessary.
if c.buffer.Len() > 0 {
dc.Send(c.buffer.Bytes())
log.Println("Flushed", c.buffer.Len(), "bytes.")
c.buffer.Reset()
}
// Then enable the datachannel.
c.transport = dc
}
dc.OnClose = func() {
c.lock.Lock()
// Future writes will go to the buffer until a new DataChannel is available.
if nil == c.transport {
// Closed locally, as part of a reset.
log.Println("WebRTC: DataChannel.OnClose [locally]")
c.lock.Unlock()
return
}
// Closed remotely, need to reset everything.
// Disable the DataChannel as a write destination.
log.Println("WebRTC: DataChannel.OnClose [remotely]")
c.transport = nil
c.pc.DeleteDataChannel(dc)
// Unlock before Close'ing, since it calls cleanup and asks for the
// lock to check if the transport needs to be be deleted.
c.lock.Unlock()
c.Close()
}
dc.OnMessage = func(msg []byte) {
if len(msg) <= 0 {
log.Println("0 length message---")
}
c.BytesLogger.AddInbound(len(msg))
n, err := c.writePipe.Write(msg)
if err != nil {
// TODO: Maybe shouldn't actually close.
log.Println("Error writing to SOCKS pipe")
c.writePipe.CloseWithError(err)
}
if n != len(msg) {
log.Println("Error: short write")
panic("short write")
}
c.lastReceive = time.Now()
}
log.Println("WebRTC: DataChannel created.")
return nil
}
func (c *WebRTCPeer) sendOfferToBroker() {
if nil == c.broker {
return
}
offer := c.pc.LocalDescription()
answer, err := c.broker.Negotiate(offer)
if nil != err || nil == answer {
log.Printf("BrokerChannel Error: %s", err)
answer = nil
}
c.answerChannel <- answer
}
// Block until an SDP offer is available, send it to either
// the Broker or signal pipe, then await for the SDP answer.
func (c *WebRTCPeer) exchangeSDP() error {
select {
case offer := <-c.offerChannel:
// Display for copy-paste when no broker available.
if nil == c.broker {
log.Printf("Please Copy & Paste the following to the peer:")
log.Printf("----------------")
log.Printf("\n\n" + offer.Serialize() + "\n\n")
log.Printf("----------------")
}
case err := <-c.errorChannel:
log.Println("Failed to prepare offer", err)
c.Close()
return err
}
// Keep trying the same offer until a valid answer arrives.
var ok bool
var answer *webrtc.SessionDescription = nil
for nil == answer {
go c.sendOfferToBroker()
answer, ok = <-c.answerChannel // Blocks...
if !ok || nil == answer {
log.Printf("Failed to retrieve answer. Retrying in %d seconds", ReconnectTimeout)
<-time.After(time.Second * ReconnectTimeout)
answer = nil
}
}
log.Printf("Received Answer:\n\n%s\n", answer.Sdp)
err := c.pc.SetRemoteDescription(answer)
if nil != err {
log.Println("WebRTC: Unable to SetRemoteDescription:", err)
return err
}
return nil
}
// Close all channels and transports
func (c *WebRTCPeer) cleanup() {
if nil != c.offerChannel {
close(c.offerChannel)
}
if nil != c.answerChannel {
close(c.answerChannel)
}
if nil != c.errorChannel {
close(c.errorChannel)
}
// Close this side of the SOCKS pipe.
if nil != c.writePipe {
c.writePipe.Close()
c.writePipe = nil
}
c.lock.Lock()
if nil != c.transport {
log.Printf("WebRTC: closing DataChannel")
dataChannel := c.transport
// Setting transport to nil *before* dc Close indicates to OnClose that
// this was locally triggered.
c.transport = nil
// Release the lock before calling DeleteDataChannel (which in turn
// calls Close on the dataChannel), but after nil'ing out the transport,
// since otherwise we'll end up in the onClose handler in a deadlock.
c.lock.Unlock()
if c.pc == nil {
panic("DataChannel w/o PeerConnection, not good.")
}
c.pc.DeleteDataChannel(dataChannel.(*webrtc.DataChannel))
} else {
c.lock.Unlock()
}
if nil != c.pc {
log.Printf("WebRTC: closing PeerConnection")
err := c.pc.Destroy()
if nil != err {
log.Printf("Error closing peerconnection...")
}
c.pc = nil
}
}