Snowflake client now using a reconnect loop (#12)

This commit is contained in:
Serene Han 2016-02-17 19:19:11 -08:00
parent eb7eb04ac0
commit f205a0be59
2 changed files with 93 additions and 62 deletions

View file

@ -38,6 +38,10 @@ func TestConnect(t *testing.T) {
So(c.buffer.Bytes(), ShouldEqual, nil) So(c.buffer.Bytes(), ShouldEqual, nil)
So(mock.destination.Bytes(), ShouldResemble, []byte("test")) So(mock.destination.Bytes(), ShouldResemble, []byte("test"))
}) })
Convey("Connect Loop", func() {
// TODO
})
}) })
}) })

View file

@ -28,9 +28,12 @@ var frontDomain string
// When a connection handler starts, +1 is written to this channel; when it // When a connection handler starts, +1 is written to this channel; when it
// ends, -1 is written. // ends, -1 is written.
var handlerChan = make(chan int) var handlerChan = make(chan int)
var answerChannel = make(chan *webrtc.SessionDescription) var answerChannel = make(chan *webrtc.SessionDescription)
const (
ReconnectTimeout = 5
)
func copyLoop(a, b net.Conn) { func copyLoop(a, b net.Conn) {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(2) wg.Add(2)
@ -55,14 +58,16 @@ type SnowflakeChannel interface {
// Implements net.Conn interface // Implements net.Conn interface
type webRTCConn struct { type webRTCConn struct {
config *webrtc.Configuration
pc *webrtc.PeerConnection pc *webrtc.PeerConnection
snowflake SnowflakeChannel // Interface holding the WebRTC DataChannel. snowflake SnowflakeChannel // Interface holding the WebRTC DataChannel.
broker *BrokerChannel broker *BrokerChannel
offerChannel chan *webrtc.SessionDescription offerChannel chan *webrtc.SessionDescription
errorChannel chan error errorChannel chan error
recvPipe *io.PipeReader recvPipe *io.PipeReader
writePipe *io.PipeWriter writePipe *io.PipeWriter
buffer bytes.Buffer buffer bytes.Buffer
reset chan struct{}
} }
var webrtcRemote *webRTCConn var webrtcRemote *webRTCConn
@ -72,7 +77,6 @@ func (c *webRTCConn) Read(b []byte) (int, error) {
} }
func (c *webRTCConn) Write(b []byte) (int, error) { func (c *webRTCConn) Write(b []byte) (int, error) {
// log.Printf("webrtc Write %d %+q", len(b), string(b))
c.sendData(b) c.sendData(b)
return len(b), nil return len(b), nil
} }
@ -102,10 +106,56 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error {
return fmt.Errorf("SetWriteDeadline not implemented") return fmt.Errorf("SetWriteDeadline not implemented")
} }
func (c *webRTCConn) PreparePeerConnection() {
if nil != c.pc {
log.Printf("PeerConnection already exists.")
c.pc.Close()
c.pc = nil
}
pc, err := webrtc.NewPeerConnection(c.config)
if err != nil {
log.Printf("NewPeerConnection: %s", err)
c.errorChannel <- err
}
// Prepare PeerConnection callbacks.
pc.OnNegotiationNeeded = func() {
log.Println("WebRTC: OnNegotiationNeeded")
go func() {
offer, err := pc.CreateOffer()
// TODO: Potentially timeout and retry if ICE isn't working.
if err != nil {
c.errorChannel <- err
return
}
err = pc.SetLocalDescription(offer)
if err != nil {
c.errorChannel <- err
return
}
}()
}
pc.OnIceCandidate = func(candidate webrtc.IceCandidate) {
log.Printf("OnIceCandidate %s", candidate.Serialize())
// Allow candidates to accumulate until OnIceComplete.
}
// TODO: This may soon be deprecated, consider OnIceGatheringStateChange.
pc.OnIceComplete = func() {
log.Printf("OnIceComplete")
c.offerChannel <- pc.LocalDescription()
}
// This callback is not expected, as the Client initiates the creation
// of the data channel, not the remote peer.
pc.OnDataChannel = func(channel *webrtc.DataChannel) {
log.Println("OnDataChannel")
panic("Unexpected OnDataChannel!")
}
c.pc = pc
}
// Create a WebRTC DataChannel locally. // Create a WebRTC DataChannel locally.
func (c *webRTCConn) EstablishDataChannel() error { func (c *webRTCConn) EstablishDataChannel() error {
dc, err := c.pc.CreateDataChannel("snowflake", webrtc.Init{}) dc, err := c.pc.CreateDataChannel("snowflake", webrtc.Init{})
// Triggers "OnNegotiationNeeded" on the PeerConnection, which will prepare // Triggers "OnNegotiationNeeded" on the PeerConnection, which will prepare
// an SDP offer while other goroutines operating on this struct handle the // an SDP offer while other goroutines operating on this struct handle the
// signaling. Eventually fires "OnOpen". // signaling. Eventually fires "OnOpen".
if err != nil { if err != nil {
@ -126,7 +176,7 @@ func (c *webRTCConn) EstablishDataChannel() error {
// Future writes will go to the buffer until a new DataChannel is available. // Future writes will go to the buffer until a new DataChannel is available.
log.Println("WebRTC: DataChannel.OnClose") log.Println("WebRTC: DataChannel.OnClose")
c.snowflake = nil c.snowflake = nil
// TODO: (Issue #12) Should attempt to renegotiate at this point. c.reset <- struct{}{} // Attempt to negotiate a new datachannel..
} }
dc.OnMessage = func(msg []byte) { dc.OnMessage = func(msg []byte) {
log.Printf("OnMessage <--- %d bytes", len(msg)) log.Printf("OnMessage <--- %d bytes", len(msg))
@ -144,7 +194,7 @@ func (c *webRTCConn) EstablishDataChannel() error {
// Block until an offer is available, then send it to either // Block until an offer is available, then send it to either
// the Broker or signal pipe. // the Broker or signal pipe.
func (c *webRTCConn) sendOffer() error { func (c *webRTCConn) SendOffer() error {
select { select {
case offer := <-c.offerChannel: case offer := <-c.offerChannel:
if "" == brokerURL { if "" == brokerURL {
@ -166,6 +216,7 @@ func (c *webRTCConn) sendOffer() error {
if nil == answer { if nil == answer {
log.Printf("BrokerChannel: No answer received.") log.Printf("BrokerChannel: No answer received.")
// TODO: Should try again here. // TODO: Should try again here.
c.reset <- struct{}{}
return return
} }
answerChannel <- answer answerChannel <- answer
@ -177,6 +228,19 @@ func (c *webRTCConn) sendOffer() error {
return nil return nil
} }
func (c *webRTCConn) ReceiveAnswer() error {
log.Printf("waiting for answer...")
answer, ok := <-answerChannel
if !ok {
// TODO: Don't just fail, try again!
c.pc.Close()
// connection.errorChannel <- errors.New("Bad answer")
return errors.New("Bad answer")
}
log.Printf("Received Answer:\n\n%s\n", answer.Sdp)
return c.pc.SetRemoteDescription(answer)
}
func (c *webRTCConn) sendData(data []byte) { func (c *webRTCConn) sendData(data []byte) {
// Buffer the data in case datachannel isn't available yet. // Buffer the data in case datachannel isn't available yet.
if nil == c.snowflake { if nil == c.snowflake {
@ -188,72 +252,35 @@ func (c *webRTCConn) sendData(data []byte) {
c.snowflake.Send(data) c.snowflake.Send(data)
} }
// WebRTC re-establishment loop. Expected in own goroutine.
func (c *webRTCConn) ConnectLoop() {
for {
log.Println("Establishing WebRTC connection...")
// TODO: When go-webrtc is more stable, it's possible that a new
// PeerConnection won't need to be recreated each time.
// called once.
c.PreparePeerConnection()
c.EstablishDataChannel()
c.SendOffer()
c.ReceiveAnswer()
<-c.reset
log.Println(" --- snowflake connection reset ---")
}
}
// Initialize a WebRTC Connection. // Initialize a WebRTC Connection.
func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) ( func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
*webRTCConn, error) { *webRTCConn, error) {
pc, err := webrtc.NewPeerConnection(config)
if err != nil {
log.Printf("NewPeerConnection: %s", err)
return nil, err
}
connection := new(webRTCConn) connection := new(webRTCConn)
connection.config = config
connection.broker = broker connection.broker = broker
connection.pc = pc
connection.offerChannel = make(chan *webrtc.SessionDescription) connection.offerChannel = make(chan *webrtc.SessionDescription)
connection.errorChannel = make(chan error) connection.errorChannel = make(chan error)
connection.reset = make(chan struct{})
// Pipes remain the same even when DataChannel gets switched. // Pipes remain the same even when DataChannel gets switched.
connection.recvPipe, connection.writePipe = io.Pipe() connection.recvPipe, connection.writePipe = io.Pipe()
pc.OnNegotiationNeeded = func() { go connection.ConnectLoop()
log.Println("OnNegotiationNeeded")
go func() {
offer, err := pc.CreateOffer()
// TODO: Potentially timeout and retry if ICE isn't working.
if err != nil {
connection.errorChannel <- err
return
}
err = pc.SetLocalDescription(offer)
if err != nil {
connection.errorChannel <- err
return
}
}()
}
pc.OnIceCandidate = func(candidate webrtc.IceCandidate) {
log.Printf("OnIceCandidate %s", candidate.Serialize())
// Allow candidates to accumulate until OnIceComplete.
}
// TODO: This may soon be deprecated, consider OnIceGatheringStateChange.
pc.OnIceComplete = func() {
log.Printf("OnIceComplete")
connection.offerChannel <- pc.LocalDescription()
}
// This callback is not expected, as the Client initiates the creation
// of the data channel, not the remote peer.
pc.OnDataChannel = func(channel *webrtc.DataChannel) {
log.Println("OnDataChannel")
panic("Unexpected OnDataChannel!")
}
connection.EstablishDataChannel()
// TODO: Make this part of a re-establishment loop.
connection.sendOffer()
log.Printf("waiting for answer...")
answer, ok := <-answerChannel
if !ok {
// TODO: Don't just fail, try again!
pc.Close()
return nil, fmt.Errorf("no answer received")
}
log.Printf("Received Answer:\n\n%s\n", answer.Sdp)
err = pc.SetRemoteDescription(answer)
if err != nil {
pc.Close()
return nil, err
}
return connection, nil return connection, nil
} }