diff --git a/server/snowflake.go b/server/snowflake.go index d046998..88a07d8 100644 --- a/server/snowflake.go +++ b/server/snowflake.go @@ -81,6 +81,8 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error { } func datachannelHandler(conn *webRTCConn) { + defer conn.Close() + handlerChan <- 1 defer func() { handlerChan <- -1 @@ -91,58 +93,104 @@ func datachannelHandler(conn *webRTCConn) { log.Printf("Failed to connect to ORPort: " + err.Error()) return } - //defer or.Close() + defer or.Close() - pr, pw := io.Pipe() - conn.pr = pr - - dc := conn.dc - dc.OnOpen = func() { - log.Println("OnOpen channel") - } - dc.OnClose = func() { - log.Println("OnClose channel") - pw.Close() - } - dc.OnMessage = func(msg []byte) { - // log.Printf("OnMessage channel %d %+q", len(msg), msg) - log.Printf("OnMessage <--- %d bytes", len(msg)) - n, err := pw.Write(msg) - if err != nil { - pw.CloseWithError(err) - } - if n != len(msg) { - panic("short write") - } - } - - go copyLoop(conn, or) + copyLoop(conn, or) } -func makePeerConnection(config *webrtc.Configuration) (*webrtc.PeerConnection, error) { - pc, err := webrtc.NewPeerConnection(config) +// Create a PeerConnection from an SDP offer. Blocks until the gathering of ICE +// candidates is complete and and answer is available in LocalDescription. +// Installs an OnDataChannel callback that creates a webRTCConn and passes it to +// datachannelHandler. +func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config *webrtc.Configuration) (*webrtc.PeerConnection, error) { + errChan := make(chan error) + answerChan := make(chan *webrtc.SessionDescription) + pc, err := webrtc.NewPeerConnection(config) if err != nil { - log.Printf("NewPeerConnection: %s", err) - return nil, err + return nil, fmt.Errorf("accept: NewPeerConnection: %s", err) } pc.OnNegotiationNeeded = func() { panic("OnNegotiationNeeded") } + pc.OnIceComplete = func() { + answerChan <- pc.LocalDescription() + } pc.OnDataChannel = func(dc *data.Channel) { log.Println("OnDataChannel") - datachannelHandler(&webRTCConn{pc: pc, dc: dc}) + + pr, pw := io.Pipe() + + dc.OnOpen = func() { + log.Println("OnOpen channel") + } + dc.OnClose = func() { + log.Println("OnClose channel") + pw.Close() + } + dc.OnMessage = func(msg []byte) { + log.Printf("OnMessage <--- %d bytes", len(msg)) + n, err := pw.Write(msg) + if err != nil { + pw.CloseWithError(err) + } + if n != len(msg) { + panic("short write") + } + } + + conn := &webRTCConn{pc: pc, dc: dc, pr: pr} + go datachannelHandler(conn) } - pc.OnIceComplete = func() { - log.Printf("----------------") - fmt.Fprintln(logFile, pc.LocalDescription().Serialize()) - log.Printf("----------------") + + err = pc.SetRemoteDescription(sdp) + if err != nil { + pc.Close() + return nil, fmt.Errorf("accept: SetRemoteDescription: %s", err) } + log.Println("sdp offer successfully received.") + + go func() { + log.Println("Generating answer...") + answer, err := pc.CreateAnswer() // blocking + if err != nil { + errChan <- err + return + } + err = pc.SetLocalDescription(answer) + if err != nil { + errChan <- err + return + } + }() + + // Wait until answer is ready. + select { + case err = <-errChan: + pc.Close() + return nil, err + case <-answerChan: + } + return pc, nil } -func readSignalingMessages(signalChan chan *webrtc.SessionDescription, f *os.File) { - s := bufio.NewScanner(f) +// Create a signaling named pipe and feed offers from it into +// makePeerConnectionFromOffer. +func receiveSignalsFIFO(filename string, config *webrtc.Configuration) error { + err := syscall.Mkfifo(filename, 0600) + if err != nil { + if err.(syscall.Errno) != syscall.EEXIST { + return err + } + } + signalFile, err := os.OpenFile(filename, os.O_RDONLY, 0600) + if err != nil { + return err + } + defer signalFile.Close() + + s := bufio.NewScanner(signalFile) for s.Scan() { msg := s.Text() sdp := webrtc.DeserializeSessionDescription(msg) @@ -150,62 +198,18 @@ func readSignalingMessages(signalChan chan *webrtc.SessionDescription, f *os.Fil log.Printf("ignoring invalid signal message %+q", msg) continue } - signalChan <- sdp - continue - } - if err := s.Err(); err != nil { - log.Printf("signal FIFO: %s", err) - } -} -func generateAnswer(pc *webrtc.PeerConnection) { - fmt.Println("Generating answer...") - answer, err := pc.CreateAnswer() // blocking - if err != nil { - fmt.Println(err) - return - } - pc.SetLocalDescription(answer) -} - -func listenWebRTC(config *webrtc.Configuration, signal string) (*os.File, error) { - err := syscall.Mkfifo(signal, 0600) - if err != nil { - if err.(syscall.Errno) != syscall.EEXIST { - return nil, err + pc, err := makePeerConnectionFromOffer(sdp, config) + if err != nil { + log.Printf("makePeerConnectionFromOffer: %s", err) + continue } + // Write offer to log for manual signaling. + log.Printf("----------------") + fmt.Fprintln(logFile, pc.LocalDescription().Serialize()) + log.Printf("----------------") } - signalFile, err := os.OpenFile(signal, os.O_RDONLY, 0600) - if err != nil { - return nil, err - } - //defer signalFile.Close() - - var signalChan = make(chan *webrtc.SessionDescription) - - go func() { - for { - select { - case sdp := <-signalChan: - pc, err := makePeerConnection(config) - if err != nil { - log.Printf("makePeerConnection: %s", err) - break - } - err = pc.SetRemoteDescription(sdp) - if err != nil { - fmt.Println("ERROR", err) - break - } - fmt.Println("sdp offer successfully received.") - go generateAnswer(pc) - } - } - }() - - go readSignalingMessages(signalChan, signalFile) - log.Printf("waiting for offer") - return signalFile, nil + return s.Err() } func main() { @@ -228,18 +232,19 @@ func main() { webRTCConfig := webrtc.NewConfiguration(webrtc.OptionIceServer("stun:stun.l.google.com:19302")) - listeners := make([]*os.File, 0) + // Start FIFO-based signaling receiver. + go func() { + err := receiveSignalsFIFO("signal", webRTCConfig) + if err != nil { + log.Printf("receiveSignalsFIFO: %s", err) + } + }() + for _, bindaddr := range ptInfo.Bindaddrs { switch bindaddr.MethodName { case ptMethodName: - ln, err := listenWebRTC(webRTCConfig, "signal") // meh - if err != nil { - pt.SmethodError(bindaddr.MethodName, err.Error()) - break - } bindaddr.Addr.Port = 12345 // lies!!! pt.Smethod(bindaddr.MethodName, bindaddr.Addr) - listeners = append(listeners, ln) default: pt.SmethodError(bindaddr.MethodName, "no such method") } @@ -260,9 +265,6 @@ func main() { case sig = <-sigChan: } } - for _, ln := range listeners { - ln.Close() - } if sig == syscall.SIGTERM { return