remove webRTCConn SendLoop and simplify Write without additional channel, as net.Conn is already safe

This commit is contained in:
Serene Han 2016-02-23 17:34:51 -08:00
parent 3a7e0ea620
commit d4efe774d1
3 changed files with 24 additions and 43 deletions

View file

@ -33,19 +33,17 @@ func TestConnect(t *testing.T) {
} }
So(c.buffer.Bytes(), ShouldEqual, nil) So(c.buffer.Bytes(), ShouldEqual, nil)
Convey("sendData buffers when datachannel is nil", func() { Convey("Write buffers when datachannel is nil", func() {
c.sendData([]byte("test")) c.Write([]byte("test"))
c.snowflake = nil c.snowflake = nil
So(c.buffer.Bytes(), ShouldResemble, []byte("test")) So(c.buffer.Bytes(), ShouldResemble, []byte("test"))
}) })
Convey("sendData sends to datachannel when not nil", func() { Convey("Write sends to datachannel when not nil", func() {
mock := new(MockDataChannel) mock := new(MockDataChannel)
mock.done = make(chan bool)
go c.SendLoop()
c.writeChannel = make(chan []byte)
c.snowflake = mock c.snowflake = mock
c.sendData([]byte("test")) mock.done = make(chan bool, 1)
c.Write([]byte("test"))
<-mock.done <-mock.done
So(c.buffer.Bytes(), ShouldEqual, nil) So(c.buffer.Bytes(), ShouldEqual, nil)
So(mock.destination.Bytes(), ShouldResemble, []byte("test")) So(mock.destination.Bytes(), ShouldResemble, []byte("test"))
@ -54,7 +52,7 @@ func TestConnect(t *testing.T) {
Convey("Receive answer sets remote description", func() { Convey("Receive answer sets remote description", func() {
c.answerChannel = make(chan *webrtc.SessionDescription) c.answerChannel = make(chan *webrtc.SessionDescription)
c.config = webrtc.NewConfiguration() c.config = webrtc.NewConfiguration()
c.PreparePeerConnection() c.preparePeerConnection()
c.receiveAnswer() c.receiveAnswer()
sdp := webrtc.DeserializeSessionDescription("test") sdp := webrtc.DeserializeSessionDescription("test")
c.answerChannel <- sdp c.answerChannel <- sdp

View file

@ -70,7 +70,6 @@ func dialWebRTC() (*webRTCConn, error) {
connection := NewWebRTCConnection(config, broker) connection := NewWebRTCConnection(config, broker)
go connection.ConnectLoop() go connection.ConnectLoop()
go connection.SendLoop()
return connection, nil return connection, nil
} }

View file

@ -19,7 +19,6 @@ type webRTCConn struct {
offerChannel chan *webrtc.SessionDescription offerChannel chan *webrtc.SessionDescription
answerChannel chan *webrtc.SessionDescription answerChannel chan *webrtc.SessionDescription
errorChannel chan error errorChannel chan error
writeChannel chan []byte
recvPipe *io.PipeReader recvPipe *io.PipeReader
writePipe *io.PipeWriter writePipe *io.PipeWriter
buffer bytes.Buffer buffer bytes.Buffer
@ -33,8 +32,15 @@ func (c *webRTCConn) Read(b []byte) (int, error) {
return c.recvPipe.Read(b) return c.recvPipe.Read(b)
} }
// Writes bytes out to the snowflake proxy.
func (c *webRTCConn) Write(b []byte) (int, error) { func (c *webRTCConn) Write(b []byte) (int, error) {
c.sendData(b) c.BytesInfo.AddOutbound(len(b))
if nil == c.snowflake {
log.Printf("Buffered %d bytes --> WebRTC", len(b))
c.buffer.Write(b)
} else {
c.snowflake.Send(b)
}
return len(b), nil return len(b), nil
} }
@ -64,19 +70,18 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error {
} }
func NewWebRTCConnection(config *webrtc.Configuration, func NewWebRTCConnection(config *webrtc.Configuration,
broker *BrokerChannel) *webRTCConn { broker *BrokerChannel) *webRTCConn {
connection := new(webRTCConn) connection := new(webRTCConn)
connection.config = config connection.config = config
connection.broker = broker connection.broker = broker
connection.offerChannel = make(chan *webrtc.SessionDescription) connection.offerChannel = make(chan *webrtc.SessionDescription, 1)
connection.answerChannel = make(chan *webrtc.SessionDescription) connection.answerChannel = make(chan *webrtc.SessionDescription, 1)
connection.writeChannel = make(chan []byte) connection.errorChannel = make(chan error, 1)
connection.errorChannel = make(chan error) connection.reset = make(chan struct{}, 1)
connection.reset = make(chan struct{})
// Log every few seconds. // Log every few seconds.
connection.BytesInfo = &BytesInfo{ connection.BytesInfo = &BytesInfo{
inboundChan: make(chan int), outboundChan: make(chan int), inboundChan: make(chan int, 5), outboundChan: make(chan int, 5),
inbound: 0, outbound: 0, inEvents: 0, outEvents: 0, inbound: 0, outbound: 0, inEvents: 0, outEvents: 0,
} }
go connection.BytesInfo.Log() go connection.BytesInfo.Log()
@ -106,20 +111,6 @@ func (c *webRTCConn) ConnectLoop() {
} }
} }
// Expected in own goroutine.
func (c *webRTCConn) SendLoop() {
log.Println("send loop")
for data := range c.writeChannel {
// Flush buffer if necessary.
for c.buffer.Len() > 0 {
c.snowflake.Send(c.buffer.Bytes())
log.Println("Flushed", c.buffer.Len(), "bytes")
c.buffer.Reset()
}
c.snowflake.Send(data)
}
}
func (c *webRTCConn) preparePeerConnection() { func (c *webRTCConn) preparePeerConnection() {
if nil != c.pc { if nil != c.pc {
c.pc.Close() c.pc.Close()
@ -185,6 +176,7 @@ func (c *webRTCConn) establishDataChannel() error {
dc.Send(c.buffer.Bytes()) dc.Send(c.buffer.Bytes())
log.Println("Flushed", c.buffer.Len(), "bytes") log.Println("Flushed", c.buffer.Len(), "bytes")
c.buffer.Reset() c.buffer.Reset()
c.snowflake = dc c.snowflake = dc
} }
dc.OnClose = func() { dc.OnClose = func() {
@ -198,6 +190,9 @@ func (c *webRTCConn) establishDataChannel() error {
} }
} }
dc.OnMessage = func(msg []byte) { dc.OnMessage = func(msg []byte) {
if len(msg) <= 0 {
log.Println("0 length---")
}
c.BytesInfo.AddInbound(len(msg)) c.BytesInfo.AddInbound(len(msg))
n, err := c.writePipe.Write(msg) n, err := c.writePipe.Write(msg)
if err != nil { if err != nil {
@ -261,17 +256,6 @@ func (c *webRTCConn) receiveAnswer() {
}() }()
} }
func (c *webRTCConn) sendData(data []byte) {
c.BytesInfo.AddOutbound(len(data))
// Buffer the data in case datachannel isn't available yet.
if nil == c.snowflake {
log.Printf("Buffered %d bytes --> WebRTC", len(data))
c.buffer.Write(data)
return
}
c.writeChannel <- data
}
func (c *webRTCConn) Reset() { func (c *webRTCConn) Reset() {
go func() { go func() {
c.reset <- struct{}{} // Attempt to negotiate a new datachannel.. c.reset <- struct{}{} // Attempt to negotiate a new datachannel..