Buffer writes to DataChannel, remove blocking on openChannel (#12)

This commit is contained in:
Serene Han 2016-02-17 18:38:40 -08:00
parent 760dee8a0f
commit eb7eb04ac0
2 changed files with 93 additions and 40 deletions

View file

@ -3,6 +3,7 @@ package main
import (
"bufio"
"bytes"
"errors"
"flag"
"fmt"
@ -46,16 +47,22 @@ func copyLoop(a, b net.Conn) {
wg.Wait()
}
// Interface that matches both webrc.DataChannel and for testing.
type SnowflakeChannel interface {
Send([]byte)
Close() error
}
// Implements net.Conn interface
type webRTCConn struct {
pc *webrtc.PeerConnection
dc *webrtc.DataChannel
snowflake SnowflakeChannel // Interface holding the WebRTC DataChannel.
broker *BrokerChannel
recvPipe *io.PipeReader
writePipe *io.PipeWriter
offerChannel chan *webrtc.SessionDescription
errorChannel chan error
openChannel chan struct{}
recvPipe *io.PipeReader
writePipe *io.PipeWriter
buffer bytes.Buffer
}
var webrtcRemote *webRTCConn
@ -66,9 +73,7 @@ func (c *webRTCConn) Read(b []byte) (int, error) {
func (c *webRTCConn) Write(b []byte) (int, error) {
// log.Printf("webrtc Write %d %+q", len(b), string(b))
log.Printf("Write %d bytes --> WebRTC", len(b))
// Buffer in case datachannel isn't available.
c.dc.Send(b)
c.sendData(b)
return len(b), nil
}
@ -98,21 +103,29 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error {
}
// Create a WebRTC DataChannel locally.
// This triggers "OnNegotiationNeeded" which should prepare an SDP offer.
func (c *webRTCConn) EstablishDataChannel() error {
dc, err := c.pc.CreateDataChannel("snowflake", webrtc.Init{})
// Triggers "OnNegotiationNeeded" on the PeerConnection, which will prepare
// an SDP offer while other goroutines operating on this struct handle the
// signaling. Eventually fires "OnOpen".
if err != nil {
log.Printf("CreateDataChannel: %s", err)
return err
}
dc.OnOpen = func() {
log.Println("OnOpen channel")
c.openChannel <- struct{}{}
log.Println("WebRTC: DataChannel.OnOpen")
// Flush the buffer, then enable datachannel.
// TODO: Make this more safe
dc.Send(c.buffer.Bytes())
log.Println("Flushed ", c.buffer.Len(), " bytes")
c.buffer.Reset()
c.snowflake = dc
}
dc.OnClose = func() {
log.Println("OnClose channel")
// writePipe.Close()
close(c.openChannel)
// Disable the DataChannel as a write destination.
// Future writes will go to the buffer until a new DataChannel is available.
log.Println("WebRTC: DataChannel.OnClose")
c.snowflake = nil
// TODO: (Issue #12) Should attempt to renegotiate at this point.
}
dc.OnMessage = func(msg []byte) {
@ -126,7 +139,6 @@ func (c *webRTCConn) EstablishDataChannel() error {
panic("short write")
}
}
c.dc = dc
return nil
}
@ -153,8 +165,8 @@ func (c *webRTCConn) sendOffer() error {
}
if nil == answer {
log.Printf("BrokerChannel: No answer received.")
// TODO: Should try again here.
return
// return errors.New("No answer received.")
}
answerChannel <- answer
}()
@ -165,6 +177,18 @@ func (c *webRTCConn) sendOffer() error {
return nil
}
func (c *webRTCConn) sendData(data []byte) {
// Buffer the data in case datachannel isn't available yet.
if nil == c.snowflake {
log.Printf("Buffered %d bytes --> WebRTC", len(data))
c.buffer.Write(data)
return
}
log.Printf("Write %d bytes --> WebRTC", len(data))
c.snowflake.Send(data)
}
// Initialize a WebRTC Connection.
func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
*webRTCConn, error) {
pc, err := webrtc.NewPeerConnection(config)
@ -177,13 +201,14 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
connection.pc = pc
connection.offerChannel = make(chan *webrtc.SessionDescription)
connection.errorChannel = make(chan error)
connection.openChannel = make(chan struct{})
// Pipes remain the same even when DataChannel gets switched.
connection.recvPipe, connection.writePipe = io.Pipe()
// Triggered by CreateDataChannel.
pc.OnNegotiationNeeded = func() {
log.Println("OnNegotiationNeeded")
go func() {
offer, err := pc.CreateOffer()
// TODO: Potentially timeout and retry if ICE isn't working.
if err != nil {
connection.errorChannel <- err
return
@ -208,18 +233,17 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
// of the data channel, not the remote peer.
pc.OnDataChannel = func(channel *webrtc.DataChannel) {
log.Println("OnDataChannel")
panic("OnDataChannel")
panic("Unexpected OnDataChannel!")
}
// Pipes remain the same even when DataChannel gets switched.
connection.recvPipe, connection.writePipe = io.Pipe()
connection.EstablishDataChannel()
connection.sendOffer()
// TODO: Make this part of a re-establishment loop.
connection.sendOffer()
log.Printf("waiting for answer...")
answer, ok := <-answerChannel
if !ok {
// TODO: Don't just fail, try again!
pc.Close()
return nil, fmt.Errorf("no answer received")
}
@ -230,15 +254,6 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
return nil, err
}
// Wait until data channel is open; otherwise for example sends may get
// lost.
// TODO: Buffering *should* work though.
_, ok = <-connection.openChannel
if !ok {
pc.Close()
return nil, fmt.Errorf("failed to open data channel")
}
return connection, nil
}
@ -247,9 +262,9 @@ func endWebRTC() {
if nil == webrtcRemote {
return
}
if nil != webrtcRemote.dc {
if nil != webrtcRemote.snowflake {
log.Printf("WebRTC: closing DataChannel")
webrtcRemote.dc.Close()
webrtcRemote.snowflake.Close()
}
if nil != webrtcRemote.pc {
log.Printf("WebRTC: closing PeerConnection")
@ -332,7 +347,7 @@ func readSignalingMessages(f *os.File) {
func main() {
var err error
webrtc.SetLoggingVerbosity(1)
flag.StringVar(&brokerURL, "url", "", "URL of signaling broker")
flag.StringVar(&frontDomain, "front", "", "front domain")
flag.Parse()
@ -368,8 +383,6 @@ func main() {
go readSignalingMessages(signalFile)
}
webrtc.SetLoggingVerbosity(1)
ptInfo, err = pt.ClientSetup(nil)
if err != nil {
log.Fatal(err)
@ -417,10 +430,6 @@ func main() {
ln.Close()
}
// if syscall.SIGTERM == sig || syscall.SIGINT == sig {
// return
// }
// wait for second signal or no more handlers
sig = nil
for sig == nil && numHandlers != 0 {