log traffic bytes only once every few seconds, along with OnMessage & datachannel.Send counts, to prevent flooded logs

This commit is contained in:
Serene Han 2016-02-18 14:15:22 -08:00
parent a1b7e01c54
commit c4215b5614
3 changed files with 83 additions and 5 deletions

View file

@ -24,6 +24,10 @@ func TestConnect(t *testing.T) {
Convey("WebRTC Connection", func() { Convey("WebRTC Connection", func() {
c := new(webRTCConn) c := new(webRTCConn)
c.BytesInfo = &BytesInfo{
inboundChan: make(chan int), outboundChan: make(chan int),
inbound: 0, outbound: 0, inEvents: 0, outEvents: 0,
}
So(c.buffer.Bytes(), ShouldEqual, nil) So(c.buffer.Bytes(), ShouldEqual, nil)
Convey("SendData buffers when datachannel is nil", func() { Convey("SendData buffers when datachannel is nil", func() {

View file

@ -67,6 +67,7 @@ type webRTCConn struct {
writePipe *io.PipeWriter writePipe *io.PipeWriter
buffer bytes.Buffer buffer bytes.Buffer
reset chan struct{} reset chan struct{}
*BytesInfo
} }
var webrtcRemote *webRTCConn var webrtcRemote *webRTCConn
@ -164,12 +165,12 @@ func (c *webRTCConn) EstablishDataChannel() error {
dc.OnOpen = func() { dc.OnOpen = func() {
log.Println("WebRTC: DataChannel.OnOpen") log.Println("WebRTC: DataChannel.OnOpen")
// if nil != c.snowflake { // if nil != c.snowflake {
// panic("PeerConnection snowflake already exists.") // panic("PeerConnection snowflake already exists.")
// } // }
// Flush the buffer, then enable datachannel. // Flush the buffer, then enable datachannel.
// TODO: Make this more safe // TODO: Make this more safe
dc.Send(c.buffer.Bytes()) dc.Send(c.buffer.Bytes())
log.Println("Flushed ", c.buffer.Len(), " bytes") log.Println("Flushed", c.buffer.Len(), "bytes")
c.buffer.Reset() c.buffer.Reset()
c.snowflake = dc c.snowflake = dc
} }
@ -184,7 +185,7 @@ func (c *webRTCConn) EstablishDataChannel() error {
} }
} }
dc.OnMessage = func(msg []byte) { dc.OnMessage = func(msg []byte) {
log.Printf("OnMessage <--- %d bytes", len(msg)) c.BytesInfo.AddInbound(len(msg))
n, err := c.writePipe.Write(msg) n, err := c.writePipe.Write(msg)
if err != nil { if err != nil {
// TODO: Maybe shouldn't actually close. // TODO: Maybe shouldn't actually close.
@ -247,6 +248,7 @@ func (c *webRTCConn) ReceiveAnswer() {
} }
func (c *webRTCConn) sendData(data []byte) { func (c *webRTCConn) sendData(data []byte) {
c.BytesInfo.AddOutbound(len(data))
// Buffer the data in case datachannel isn't available yet. // Buffer the data in case datachannel isn't available yet.
if nil == c.snowflake { if nil == c.snowflake {
log.Printf("Buffered %d bytes --> WebRTC", len(data)) log.Printf("Buffered %d bytes --> WebRTC", len(data))
@ -256,10 +258,9 @@ func (c *webRTCConn) sendData(data []byte) {
// Otherwise, flush buffer if necessary. // Otherwise, flush buffer if necessary.
for c.buffer.Len() > 0 { for c.buffer.Len() > 0 {
c.snowflake.Send(c.buffer.Bytes()) c.snowflake.Send(c.buffer.Bytes())
log.Println("Flushed ", c.buffer.Len(), " bytes") log.Println("Flushed", c.buffer.Len(), "bytes")
c.buffer.Reset() c.buffer.Reset()
} }
log.Printf("Write %d bytes --> WebRTC", len(data))
c.snowflake.Send(data) c.snowflake.Send(data)
} }
@ -297,6 +298,12 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
connection.answerChannel = make(chan *webrtc.SessionDescription) connection.answerChannel = make(chan *webrtc.SessionDescription)
connection.errorChannel = make(chan error) connection.errorChannel = make(chan error)
connection.reset = make(chan struct{}) connection.reset = make(chan struct{})
connection.BytesInfo = &BytesInfo{
inboundChan: make(chan int), outboundChan: make(chan int),
inbound: 0, outbound: 0, inEvents: 0, outEvents: 0,
}
go connection.BytesInfo.Log()
// Pipes remain the same even when DataChannel gets switched. // Pipes remain the same even when DataChannel gets switched.
connection.recvPipe, connection.writePipe = io.Pipe() connection.recvPipe, connection.writePipe = io.Pipe()

67
client/util.go Normal file
View file

@ -0,0 +1,67 @@
package main
import (
"log"
"time"
)
type BytesInfo struct {
outboundChan chan int
inboundChan chan int
outbound int
inbound int
outEvents int
inEvents int
isLogging bool
}
func (b *BytesInfo) Log() {
b.isLogging = true
var amount int
output := func() {
log.Printf("Traffic Bytes (in|out): %d | %d -- (%d OnMessages, %d Sends)",
b.inbound, b.outbound, b.inEvents, b.outEvents)
b.outbound = 0
b.outEvents = 0
b.inbound = 0
b.inEvents = 0
}
last := time.Now()
for {
select {
case amount = <-b.outboundChan:
b.outbound += amount
b.outEvents++
last := time.Now()
if time.Since(last) > time.Second*5 {
last = time.Now()
output()
}
case amount = <-b.inboundChan:
b.inbound += amount
b.inEvents++
if time.Since(last) > time.Second*5 {
last = time.Now()
output()
}
case <-time.After(time.Second * 5):
if b.inEvents > 0 || b.outEvents > 0 {
output()
}
}
}
}
func (b *BytesInfo) AddOutbound(amount int) {
if !b.isLogging {
return
}
b.outboundChan <- amount
}
func (b *BytesInfo) AddInbound(amount int) {
if !b.isLogging {
return
}
b.inboundChan <- amount
}