replace webrtcRemote with webrtcRemotes map & indexing, client multiplexing remotes confirmed working (#31)

This commit is contained in:
Serene Han 2016-03-23 19:40:26 -07:00 committed by Arlo Breault
parent a8ea5e586e
commit 22ace32a71
2 changed files with 20 additions and 25 deletions

View file

@ -22,7 +22,7 @@ var ptInfo pt.ClientInfo
const ( const (
ReconnectTimeout = 5 ReconnectTimeout = 5
SnowflakeCapacity = 1 SnowflakeCapacity = 3
) )
var brokerURL string var brokerURL string
@ -55,12 +55,13 @@ type SnowflakeChannel interface {
Close() error Close() error
} }
// Maintain |WebRTCSlots| number of open connections to // Maintain |SnowflakeCapacity| number of available WebRTC connections, to
// transfer to SOCKS when needed. TODO: complete // transfer to the Tor SOCKS handler when needed.
func SnowflakeConnectLoop() { func SnowflakeConnectLoop() {
for { for {
if len(snowflakeChan) >= SnowflakeCapacity { numRemotes := len(snowflakeChan)
log.Println("At Capacity: ", len(snowflakeChan), "snowflake. Re-checking in 10s") if numRemotes >= SnowflakeCapacity {
log.Println("At Capacity: ", numRemotes, "snowflake. Re-checking in 10s")
<-time.After(time.Second * 10) <-time.After(time.Second * 10)
continue continue
} }
@ -70,9 +71,6 @@ func SnowflakeConnectLoop() {
<-time.After(time.Second * ReconnectTimeout) <-time.After(time.Second * ReconnectTimeout)
continue continue
} }
log.Println("Created a snowflake.")
// TODO: Better handling of multiplex snowflakes.
snowflakeChan <- s snowflakeChan <- s
} }
} }
@ -93,11 +91,9 @@ func dialWebRTC() (*webRTCConn, error) {
func endWebRTC() { func endWebRTC() {
log.Printf("WebRTC: interruped") log.Printf("WebRTC: interruped")
if nil == webrtcRemote { for _, r := range webrtcRemotes {
return r.Close()
} }
webrtcRemote.Close()
webrtcRemote = nil
} }
// Establish a WebRTC channel for SOCKS connections. // Establish a WebRTC channel for SOCKS connections.
@ -106,7 +102,7 @@ func handler(conn *pt.SocksConn) error {
defer func() { defer func() {
handlerChan <- -1 handlerChan <- -1
}() }()
// Wait for an available WebRTC remote...
remote, ok := <-snowflakeChan remote, ok := <-snowflakeChan
if remote == nil || !ok { if remote == nil || !ok {
conn.Reject() conn.Reject()
@ -114,8 +110,6 @@ func handler(conn *pt.SocksConn) error {
} }
defer remote.Close() defer remote.Close()
defer conn.Close() defer conn.Close()
// TODO: Fix this global
webrtcRemote = remote
log.Println("handler: Snowflake assigned.") log.Println("handler: Snowflake assigned.")
err := conn.Grant(&net.TCPAddr{IP: net.IPv4zero, Port: 0}) err := conn.Grant(&net.TCPAddr{IP: net.IPv4zero, Port: 0})
@ -123,8 +117,6 @@ func handler(conn *pt.SocksConn) error {
return err return err
} }
// TODO: Make SOCKS acceptance more independent from WebRTC so they can
// be more easily interchanged.
go copyLoop(conn, remote) go copyLoop(conn, remote)
// When WebRTC resets, close the SOCKS connection, which induces new handler. // When WebRTC resets, close the SOCKS connection, which induces new handler.
<-remote.reset <-remote.reset
@ -164,10 +156,10 @@ func readSignalingMessages(f *os.File) {
log.Printf("ignoring invalid signal message %+q", msg) log.Printf("ignoring invalid signal message %+q", msg)
continue continue
} }
webrtcRemote.answerChannel <- sdp webrtcRemotes[0].answerChannel <- sdp
} }
log.Printf("close answerChannel") log.Printf("close answerChannel")
close(webrtcRemote.answerChannel) close(webrtcRemotes[0].answerChannel)
if err := s.Err(); err != nil { if err := s.Err(); err != nil {
log.Printf("signal FIFO: %s", err) log.Printf("signal FIFO: %s", err)
} }
@ -211,6 +203,7 @@ func main() {
go readSignalingMessages(signalFile) go readSignalingMessages(signalFile)
} }
webrtcRemotes = make(map[int]*webRTCConn)
go SnowflakeConnectLoop() go SnowflakeConnectLoop()
ptInfo, err = pt.ClientSetup(nil) ptInfo, err = pt.ClientSetup(nil)

View file

@ -24,11 +24,12 @@ type webRTCConn struct {
writePipe *io.PipeWriter writePipe *io.PipeWriter
buffer bytes.Buffer buffer bytes.Buffer
reset chan struct{} reset chan struct{}
active bool index int
*BytesInfo *BytesInfo
} }
var webrtcRemote *webRTCConn var webrtcRemotes map[int]*webRTCConn
var remoteIndex int = 0
func (c *webRTCConn) Read(b []byte) (int, error) { func (c *webRTCConn) Read(b []byte) (int, error) {
return c.recvPipe.Read(b) return c.recvPipe.Read(b)
@ -53,6 +54,7 @@ func (c *webRTCConn) Close() error {
close(c.offerChannel) close(c.offerChannel)
close(c.answerChannel) close(c.answerChannel)
close(c.errorChannel) close(c.errorChannel)
delete(webrtcRemotes, c.index)
return err return err
} }
@ -87,7 +89,6 @@ func NewWebRTCConnection(config *webrtc.Configuration,
// creation & local description setting, which happens asynchronously. // creation & local description setting, which happens asynchronously.
connection.errorChannel = make(chan error, 1) connection.errorChannel = make(chan error, 1)
connection.reset = make(chan struct{}, 1) connection.reset = make(chan struct{}, 1)
connection.active = false
// Log every few seconds. // Log every few seconds.
connection.BytesInfo = &BytesInfo{ connection.BytesInfo = &BytesInfo{
@ -98,13 +99,14 @@ func NewWebRTCConnection(config *webrtc.Configuration,
// Pipes remain the same even when DataChannel gets switched. // Pipes remain the same even when DataChannel gets switched.
connection.recvPipe, connection.writePipe = io.Pipe() connection.recvPipe, connection.writePipe = io.Pipe()
connection.index = remoteIndex
webrtcRemotes[connection.index] = connection
remoteIndex++
return connection return connection
} }
// TODO: Multiplex.
func (c *webRTCConn) Connect() error { func (c *webRTCConn) Connect() error {
log.Println("Establishing WebRTC connection...") log.Printf("Establishing WebRTC connection #%d...", c.index)
// TODO: When go-webrtc is more stable, it's possible that a new // TODO: When go-webrtc is more stable, it's possible that a new
// PeerConnection won't need to be re-prepared each time. // PeerConnection won't need to be re-prepared each time.
err := c.preparePeerConnection() err := c.preparePeerConnection()