From 5eb77569fe422825f7e226572fe5e6db5de886e6 Mon Sep 17 00:00:00 2001 From: Arlo Breault Date: Fri, 8 Jan 2016 15:12:08 -0800 Subject: [PATCH] Try to use named pipe on the server to start as well --- .gitignore | 2 +- server/snowflake.go | 255 ++++++++++++++++++-------------------------- server/torrc | 3 +- 3 files changed, 104 insertions(+), 156 deletions(-) diff --git a/.gitignore b/.gitignore index 6906b4e..d84176b 100644 --- a/.gitignore +++ b/.gitignore @@ -5,5 +5,5 @@ .DS_Store datadir/ client/snowflake -server/snowflake +server/server snowflake.log diff --git a/server/snowflake.go b/server/snowflake.go index d1eb15e..b6bfcee 100644 --- a/server/snowflake.go +++ b/server/snowflake.go @@ -12,25 +12,22 @@ import ( "syscall" "time" + "git.torproject.org/pluggable-transports/goptlib.git" "github.com/keroserene/go-webrtc" "github.com/keroserene/go-webrtc/data" - - "git.torproject.org/pluggable-transports/goptlib.git" ) +var ptMethodName = "snowflake" var ptInfo pt.ServerInfo var logFile *os.File -// When a connection handler starts, +1 is written to this channel; when it -// ends, -1 is written. +// When a datachannel handler starts, +1 is written to this channel; +// when it ends, -1 is written. var handlerChan = make(chan int) -var signalChan = make(chan *webrtc.SessionDescription) - func copyLoop(a, b net.Conn) { var wg sync.WaitGroup wg.Add(2) - go func() { io.Copy(b, a) wg.Done() @@ -39,28 +36,25 @@ func copyLoop(a, b net.Conn) { io.Copy(a, b) wg.Done() }() - wg.Wait() } type webRTCConn struct { - pc *webrtc.PeerConnection - dc *data.Channel - recvPipe *io.PipeReader + dc *data.Channel + pc *webrtc.PeerConnection + pr *io.PipeReader } func (c *webRTCConn) Read(b []byte) (int, error) { - return c.recvPipe.Read(b) + return c.pr.Read(b) } func (c *webRTCConn) Write(b []byte) (int, error) { - log.Printf("webrtc Write %d %q", len(b), string(b)) - dc.Send(b) + c.dc.Send(b) return len(b), nil } func (c *webRTCConn) Close() error { - // Data channel closed implicitly? return c.pc.Close() } @@ -84,149 +78,57 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error { return fmt.Errorf("SetWriteDeadline not implemented") } -type webRTCListener struct { - peerConnectionChan chan *webrtc.PeerConnection - stopChan chan struct{} -} - -func (ln *webRTCListener) Accept() (net.Conn, error) { - offer, ok := <-signalChan - if !ok { - return nil, fmt.Errorf("signal channel closed") - } - - pc, ok := <-ln.peerConnectionChan - if !ok { - return nil, fmt.Errorf("PeerConnection channel closed") - } - - err := pc.SetRemoteDescription(offer) - if err != nil { - return nil, err - } - - go func() { - answer, err := pc.CreateAnswer() - if err != nil { - // signal error upwards - fmt.Println(err) - return - } - err = pc.SetLocalDescription(answer) - if err != nil { - // signal error upwards - fmt.Println(err) - return - } - }() - - select { - case conn := <-ln.connChan: - return conn, nil - case err := <-ln.errChan: - return nil, err - } - - return &webRTCConn{pc: pc, dc: dc, recvPipe: nil}, nil -} - -func (ln *webRTCListener) Close() error { - // Stop the PeerConnection factory goroutine. - close(ln.stopChan) - return nil -} - -func (ln *webRTCListener) Addr() net.Addr { - return &net.TCPAddr{IP: net.IPv4zero, Port: 1} -} - -func makePeerConnection(config *webrtc.Configuration) (*webrtc.PeerConnection, error) { - pc, err := webrtc.NewPeerConnection(config) - if err != nil { - log.Printf("NewPeerConnection: %s", err) - return nil, err - } - - pc.OnNegotiationNeeded = func() { - log.Println("OnNegotiationNeeded") - panic("OnNegotiationNeeded") - } - pc.OnIceCandidate = func(candidate webrtc.IceCandidate) { - log.Printf("OnIceCandidate %s", candidate.Serialize()) - // Allow candidates to accumulate until OnIceComplete. - } - pc.OnIceComplete = func() { - log.Printf("OnIceComplete") - } - pc.OnDataChannel = func(channel *data.Channel) { - log.Println("OnDataChannel") - panic("OnDataChannel") - } - - return pc, nil -} - -func listenWebRTC(config *webrtc.Configuration) (*webRTCListener, error) { - ln := new(webRTCListener) - ln.peerConnectionChan = make(chan *webrtc.PeerConnection) - ln.stopChan = make(chan struct{}) - - // This goroutine builds new PeerConnections that await incoming offers. - go func() { - loop: - for { - select { - case <-ln.stopChan: - break loop - default: - pc, err := makePeerConnection(config) - if err != nil { - log.Printf("makePeerConnection: %s", err) - break - } - ln.peerConnectionChan <- pc - } - } - close(ln.peerConnectionChan) - }() - - return ln, nil -} - -func handler(conn net.Conn) error { - defer conn.Close() - +func datachannelHandler(conn *webRTCConn) { handlerChan <- 1 defer func() { handlerChan <- -1 }() - or, err := pt.DialOr(&ptInfo, conn.RemoteAddr().String(), "webrtc") + or, err := pt.DialOr(&ptInfo, "", ptMethodName) // TODO: Extended OR if err != nil { - return err + log.Printf("Failed to connect to ORPort: " + err.Error()) + return } defer or.Close() - copyLoop(conn, or) + pr, pw := io.Pipe() + conn.pr = pr - return nil -} - -func acceptLoop(ln net.Listener) error { - defer ln.Close() - for { - conn, err := ln.Accept() - if err != nil { - if e, ok := err.(net.Error); ok && e.Temporary() { - continue - } - return err - } - go handler(conn) + dc := conn.dc + dc.OnClose = func() { + pw.Close() } + dc.OnMessage = func(msg []byte) { + n, err := pw.Write(msg) + if err != nil { + pw.CloseWithError(err) + } + if n != len(msg) { + panic("short write") + } + } + + copyLoop(conn, or) } -func readSignalingMessages(f *os.File) { +func makePeerConnection(config *webrtc.Configuration) (*webrtc.PeerConnection, error) { + pc, err := webrtc.NewPeerConnection(config) + + if err != nil { + log.Printf("NewPeerConnection: %s", err) + return nil, err + } + pc.OnNegotiationNeeded = func() { + panic("OnNegotiationNeeded") + } + pc.OnDataChannel = func(dc *data.Channel) { + log.Println("OnDataChannel") + datachannelHandler(&webRTCConn{pc: pc, dc: dc}) + } + return pc, nil +} + +func readSignalingMessages(signalChan chan *webrtc.SessionDescription, f *os.File) { s := bufio.NewScanner(f) for s.Scan() { msg := s.Text() @@ -237,16 +139,64 @@ func readSignalingMessages(f *os.File) { } signalChan <- sdp } - close(signalChan) if err := s.Err(); err != nil { log.Printf("signal FIFO: %s", err) } } +func generateAnswer(pc *webrtc.PeerConnection) { + fmt.Println("Generating answer...") + answer, err := pc.CreateAnswer() // blocking + if err != nil { + fmt.Println(err) + return + } + pc.SetLocalDescription(answer) +} + +func listenWebRTC(config *webrtc.Configuration, signal string) (*os.File, error) { + err := syscall.Mkfifo(signal, 0600) + if err != nil { + if err.(syscall.Errno) != syscall.EEXIST { + return nil, err + } + } + signalFile, err := os.OpenFile(signal, os.O_RDONLY, 0600) + if err != nil { + return nil, err + } + defer signalFile.Close() + + var signalChan = make(chan *webrtc.SessionDescription) + + go func() { + for { + select { + case sdp := <- signalChan: + pc, err := makePeerConnection(config) + if err != nil { + log.Printf("makePeerConnection: %s", err) + break + } + err = pc.SetRemoteDescription(sdp) + if err != nil { + fmt.Println("ERROR", err) + break + } + fmt.Println("sdp offer successfully received.") + go generateAnswer(pc) + } + } + }() + + go readSignalingMessages(signalChan, signalFile) + return signalFile, nil +} + func main() { var err error - logFile, err = os.OpenFile("webrtc-server.log", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600) + logFile, err = os.OpenFile("snowflake.log", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600) if err != nil { log.Fatal(err) } @@ -254,7 +204,6 @@ func main() { log.SetOutput(logFile) log.Println("starting") - webrtc.SetLoggingVerbosity(1) ptInfo, err = pt.ServerSetup(nil) @@ -264,18 +213,16 @@ func main() { webRTCConfig := webrtc.NewConfiguration(webrtc.OptionIceServer("stun:stun.l.google.com:19302")) - listeners := make([]net.Listener, 0) + listeners := make([]*os.File, 0) for _, bindaddr := range ptInfo.Bindaddrs { switch bindaddr.MethodName { - case "webrtc": - // Ignore bindaddr.Addr. - ln, err := listenWebRTC(webRTCConfig) + case ptMethodName: + ln, err := listenWebRTC(webRTCConfig, "signal") // meh if err != nil { pt.SmethodError(bindaddr.MethodName, err.Error()) break } - go acceptLoop(ln) - pt.Smethod(bindaddr.MethodName, ln.Addr()) + pt.Smethod(bindaddr.MethodName, nil) listeners = append(listeners, ln) default: pt.SmethodError(bindaddr.MethodName, "no such method") diff --git a/server/torrc b/server/torrc index 4699453..44b5964 100644 --- a/server/torrc +++ b/server/torrc @@ -3,5 +3,6 @@ ORPort 9001 ExtORPort auto SocksPort 0 ExitPolicy reject *:* +DataDirectory datadir -ServerTransportPlugin snowflake exec ./snowflake +ServerTransportPlugin snowflake exec ./server