Use a channel to safely synchronize datachannel writes, (#12)

clean up ice candidate log message.
still need to debug the copy loop break.
This commit is contained in:
Serene Han 2016-02-19 16:17:17 -08:00
parent c4215b5614
commit c3ada1b545
2 changed files with 47 additions and 18 deletions

View file

@ -9,10 +9,12 @@ import (
type MockDataChannel struct { type MockDataChannel struct {
destination bytes.Buffer destination bytes.Buffer
done chan bool
} }
func (m *MockDataChannel) Send(data []byte) { func (m *MockDataChannel) Send(data []byte) {
m.destination.Write(data) m.destination.Write(data)
m.done <- true
} }
func (*MockDataChannel) Close() error { func (*MockDataChannel) Close() error {
@ -24,6 +26,7 @@ func TestConnect(t *testing.T) {
Convey("WebRTC Connection", func() { Convey("WebRTC Connection", func() {
c := new(webRTCConn) c := new(webRTCConn)
c.BytesInfo = &BytesInfo{ c.BytesInfo = &BytesInfo{
inboundChan: make(chan int), outboundChan: make(chan int), inboundChan: make(chan int), outboundChan: make(chan int),
inbound: 0, outbound: 0, inEvents: 0, outEvents: 0, inbound: 0, outbound: 0, inEvents: 0, outEvents: 0,
@ -31,15 +34,19 @@ func TestConnect(t *testing.T) {
So(c.buffer.Bytes(), ShouldEqual, nil) So(c.buffer.Bytes(), ShouldEqual, nil)
Convey("SendData buffers when datachannel is nil", func() { Convey("SendData buffers when datachannel is nil", func() {
c.sendData([]byte("test")) c.SendData([]byte("test"))
c.snowflake = nil c.snowflake = nil
So(c.buffer.Bytes(), ShouldResemble, []byte("test")) So(c.buffer.Bytes(), ShouldResemble, []byte("test"))
}) })
Convey("SendData sends to datachannel when not nil", func() { Convey("SendData sends to datachannel when not nil", func() {
mock := new(MockDataChannel) mock := new(MockDataChannel)
mock.done = make(chan bool)
go c.SendLoop()
c.writeChannel = make(chan []byte)
c.snowflake = mock c.snowflake = mock
c.sendData([]byte("test")) c.SendData([]byte("test"))
<-mock.done
So(c.buffer.Bytes(), ShouldEqual, nil) So(c.buffer.Bytes(), ShouldEqual, nil)
So(mock.destination.Bytes(), ShouldResemble, []byte("test")) So(mock.destination.Bytes(), ShouldResemble, []byte("test"))
}) })

View file

@ -36,12 +36,15 @@ const (
func copyLoop(a, b net.Conn) { func copyLoop(a, b net.Conn) {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(2) wg.Add(2)
// TODO fix the copy loop.
go func() { go func() {
io.Copy(b, a) io.Copy(b, a)
log.Println("copy loop b-a break")
wg.Done() wg.Done()
}() }()
go func() { go func() {
io.Copy(a, b) io.Copy(a, b)
log.Println("copy loop a-b break")
wg.Done() wg.Done()
}() }()
wg.Wait() wg.Wait()
@ -63,6 +66,7 @@ type webRTCConn struct {
offerChannel chan *webrtc.SessionDescription offerChannel chan *webrtc.SessionDescription
answerChannel chan *webrtc.SessionDescription answerChannel chan *webrtc.SessionDescription
errorChannel chan error errorChannel chan error
writeChannel chan []byte
recvPipe *io.PipeReader recvPipe *io.PipeReader
writePipe *io.PipeWriter writePipe *io.PipeWriter
buffer bytes.Buffer buffer bytes.Buffer
@ -77,7 +81,7 @@ func (c *webRTCConn) Read(b []byte) (int, error) {
} }
func (c *webRTCConn) Write(b []byte) (int, error) { func (c *webRTCConn) Write(b []byte) (int, error) {
c.sendData(b) c.SendData(b)
return len(b), nil return len(b), nil
} }
@ -133,9 +137,9 @@ func (c *webRTCConn) PreparePeerConnection() {
} }
}() }()
} }
pc.OnIceCandidate = func(candidate webrtc.IceCandidate) {
log.Printf("WebRTC: OnIceCandidate %s", candidate.Serialize())
// Allow candidates to accumulate until OnIceComplete. // Allow candidates to accumulate until OnIceComplete.
pc.OnIceCandidate = func(candidate webrtc.IceCandidate) {
log.Printf(candidate.Candidate)
} }
// TODO: This may soon be deprecated, consider OnIceGatheringStateChange. // TODO: This may soon be deprecated, consider OnIceGatheringStateChange.
pc.OnIceComplete = func() { pc.OnIceComplete = func() {
@ -169,10 +173,11 @@ func (c *webRTCConn) EstablishDataChannel() error {
// } // }
// Flush the buffer, then enable datachannel. // Flush the buffer, then enable datachannel.
// TODO: Make this more safe // TODO: Make this more safe
dc.Send(c.buffer.Bytes()) // dc.Send(c.buffer.Bytes())
log.Println("Flushed", c.buffer.Len(), "bytes") // log.Println("Flushed", c.buffer.Len(), "bytes")
c.buffer.Reset() // c.buffer.Reset()
c.snowflake = dc c.snowflake = dc
c.SendData(nil)
} }
dc.OnClose = func() { dc.OnClose = func() {
// Disable the DataChannel as a write destination. // Disable the DataChannel as a write destination.
@ -180,7 +185,7 @@ func (c *webRTCConn) EstablishDataChannel() error {
log.Println("WebRTC: DataChannel.OnClose") log.Println("WebRTC: DataChannel.OnClose")
if nil != c.snowflake { if nil != c.snowflake {
c.snowflake = nil c.snowflake = nil
// Only reset if this OnClose triggered // Only reset if this OnClose was triggered remotely.
c.Reset() c.Reset()
} }
} }
@ -247,21 +252,32 @@ func (c *webRTCConn) ReceiveAnswer() {
}() }()
} }
func (c *webRTCConn) sendData(data []byte) { func (c *webRTCConn) SendData(data []byte) {
c.BytesInfo.AddOutbound(len(data))
// Buffer the data in case datachannel isn't available yet. // Buffer the data in case datachannel isn't available yet.
if nil == c.snowflake { if nil == c.snowflake {
log.Printf("Buffered %d bytes --> WebRTC", len(data)) log.Printf("Buffered %d bytes --> WebRTC", len(data))
c.buffer.Write(data) c.buffer.Write(data)
return return
} }
// Otherwise, flush buffer if necessary. go func() {
c.writeChannel <- data
}()
}
// Expected in own goroutine.
func (c *webRTCConn) SendLoop() {
log.Println("send loop")
for data := range c.writeChannel {
// Flush buffer if necessary.
for c.buffer.Len() > 0 { for c.buffer.Len() > 0 {
c.snowflake.Send(c.buffer.Bytes()) c.snowflake.Send(c.buffer.Bytes())
log.Println("Flushed", c.buffer.Len(), "bytes") log.Println("Flushed", c.buffer.Len(), "bytes")
c.buffer.Reset() c.buffer.Reset()
} }
c.BytesInfo.AddOutbound(len(data))
c.snowflake.Send(data) c.snowflake.Send(data)
}
} }
// WebRTC re-establishment loop. Expected in own goroutine. // WebRTC re-establishment loop. Expected in own goroutine.
@ -296,6 +312,7 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
connection.broker = broker connection.broker = broker
connection.offerChannel = make(chan *webrtc.SessionDescription) connection.offerChannel = make(chan *webrtc.SessionDescription)
connection.answerChannel = make(chan *webrtc.SessionDescription) connection.answerChannel = make(chan *webrtc.SessionDescription)
connection.writeChannel = make(chan []byte)
connection.errorChannel = make(chan error) connection.errorChannel = make(chan error)
connection.reset = make(chan struct{}) connection.reset = make(chan struct{})
connection.BytesInfo = &BytesInfo{ connection.BytesInfo = &BytesInfo{
@ -308,6 +325,7 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
connection.recvPipe, connection.writePipe = io.Pipe() connection.recvPipe, connection.writePipe = io.Pipe()
go connection.ConnectLoop() go connection.ConnectLoop()
go connection.SendLoop()
return connection, nil return connection, nil
} }
@ -317,8 +335,10 @@ func endWebRTC() {
return return
} }
if nil != webrtcRemote.snowflake { if nil != webrtcRemote.snowflake {
s := webrtcRemote.snowflake
webrtcRemote.snowflake = nil
log.Printf("WebRTC: closing DataChannel") log.Printf("WebRTC: closing DataChannel")
webrtcRemote.snowflake.Close() s.Close()
} }
if nil != webrtcRemote.pc { if nil != webrtcRemote.pc {
log.Printf("WebRTC: closing PeerConnection") log.Printf("WebRTC: closing PeerConnection")
@ -333,6 +353,7 @@ func handler(conn *pt.SocksConn) error {
handlerChan <- -1 handlerChan <- -1
}() }()
defer conn.Close() defer conn.Close()
log.Println("handler", conn)
// TODO: [#3] Fetch ICE server information from Broker. // TODO: [#3] Fetch ICE server information from Broker.
// TODO: [#18] Consider TURN servers here too. // TODO: [#18] Consider TURN servers here too.
@ -357,6 +378,7 @@ func handler(conn *pt.SocksConn) error {
} }
copyLoop(conn, remote) copyLoop(conn, remote)
log.Println("----END---")
return nil return nil
} }