mirror of
https://gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake.git
synced 2025-10-13 20:11:19 -04:00
prepare snowflake client for buffered datachannel writes, separate out dialWebRTC (#12)
This commit is contained in:
parent
661286894a
commit
760dee8a0f
1 changed files with 114 additions and 80 deletions
|
@ -28,7 +28,7 @@ var frontDomain string
|
|||
// ends, -1 is written.
|
||||
var handlerChan = make(chan int)
|
||||
|
||||
var signalChan = make(chan *webrtc.SessionDescription)
|
||||
var answerChannel = make(chan *webrtc.SessionDescription)
|
||||
|
||||
func copyLoop(a, b net.Conn) {
|
||||
var wg sync.WaitGroup
|
||||
|
@ -46,10 +46,16 @@ func copyLoop(a, b net.Conn) {
|
|||
wg.Wait()
|
||||
}
|
||||
|
||||
// Implements net.Conn interface
|
||||
type webRTCConn struct {
|
||||
pc *webrtc.PeerConnection
|
||||
dc *webrtc.DataChannel
|
||||
broker *BrokerChannel
|
||||
recvPipe *io.PipeReader
|
||||
writePipe *io.PipeWriter
|
||||
offerChannel chan *webrtc.SessionDescription
|
||||
errorChannel chan error
|
||||
openChannel chan struct{}
|
||||
}
|
||||
|
||||
var webrtcRemote *webRTCConn
|
||||
|
@ -61,6 +67,7 @@ func (c *webRTCConn) Read(b []byte) (int, error) {
|
|||
func (c *webRTCConn) Write(b []byte) (int, error) {
|
||||
// log.Printf("webrtc Write %d %+q", len(b), string(b))
|
||||
log.Printf("Write %d bytes --> WebRTC", len(b))
|
||||
// Buffer in case datachannel isn't available.
|
||||
c.dc.Send(b)
|
||||
return len(b), nil
|
||||
}
|
||||
|
@ -90,18 +97,87 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error {
|
|||
return fmt.Errorf("SetWriteDeadline not implemented")
|
||||
}
|
||||
|
||||
// Create a WebRTC DataChannel locally.
|
||||
// This triggers "OnNegotiationNeeded" which should prepare an SDP offer.
|
||||
func (c *webRTCConn) EstablishDataChannel() error {
|
||||
dc, err := c.pc.CreateDataChannel("snowflake", webrtc.Init{})
|
||||
if err != nil {
|
||||
log.Printf("CreateDataChannel: %s", err)
|
||||
return err
|
||||
}
|
||||
dc.OnOpen = func() {
|
||||
log.Println("OnOpen channel")
|
||||
c.openChannel <- struct{}{}
|
||||
}
|
||||
dc.OnClose = func() {
|
||||
log.Println("OnClose channel")
|
||||
// writePipe.Close()
|
||||
close(c.openChannel)
|
||||
// TODO: (Issue #12) Should attempt to renegotiate at this point.
|
||||
}
|
||||
dc.OnMessage = func(msg []byte) {
|
||||
log.Printf("OnMessage <--- %d bytes", len(msg))
|
||||
n, err := c.writePipe.Write(msg)
|
||||
if err != nil {
|
||||
// TODO: Maybe shouldn't actually close.
|
||||
c.writePipe.CloseWithError(err)
|
||||
}
|
||||
if n != len(msg) {
|
||||
panic("short write")
|
||||
}
|
||||
}
|
||||
c.dc = dc
|
||||
return nil
|
||||
}
|
||||
|
||||
// Block until an offer is available, then send it to either
|
||||
// the Broker or signal pipe.
|
||||
func (c *webRTCConn) sendOffer() error {
|
||||
select {
|
||||
case offer := <-c.offerChannel:
|
||||
if "" == brokerURL {
|
||||
log.Printf("Please Copy & Paste the following to the peer:")
|
||||
log.Printf("----------------")
|
||||
fmt.Fprintln(logFile, "\n"+offer.Serialize()+"\n")
|
||||
log.Printf("----------------")
|
||||
return nil
|
||||
}
|
||||
// Use Broker...
|
||||
go func() {
|
||||
log.Println("Sending offer via BrokerChannel...\nTarget URL: ", brokerURL,
|
||||
"\nFront URL: ", frontDomain)
|
||||
answer, err := c.broker.Negotiate(c.pc.LocalDescription())
|
||||
if nil != err {
|
||||
log.Printf("BrokerChannel signaling error: %s", err)
|
||||
return
|
||||
}
|
||||
if nil == answer {
|
||||
log.Printf("BrokerChannel: No answer received.")
|
||||
return
|
||||
// return errors.New("No answer received.")
|
||||
}
|
||||
answerChannel <- answer
|
||||
}()
|
||||
case err := <-c.errorChannel:
|
||||
c.pc.Close()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
|
||||
*webRTCConn, error) {
|
||||
|
||||
offerChan := make(chan *webrtc.SessionDescription)
|
||||
errChan := make(chan error)
|
||||
openChan := make(chan struct{})
|
||||
|
||||
pc, err := webrtc.NewPeerConnection(config)
|
||||
if err != nil {
|
||||
log.Printf("NewPeerConnection: %s", err)
|
||||
return nil, err
|
||||
}
|
||||
connection := new(webRTCConn)
|
||||
connection.broker = broker
|
||||
connection.pc = pc
|
||||
connection.offerChannel = make(chan *webrtc.SessionDescription)
|
||||
connection.errorChannel = make(chan error)
|
||||
connection.openChannel = make(chan struct{})
|
||||
|
||||
// Triggered by CreateDataChannel.
|
||||
pc.OnNegotiationNeeded = func() {
|
||||
|
@ -109,12 +185,12 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
|
|||
go func() {
|
||||
offer, err := pc.CreateOffer()
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
connection.errorChannel <- err
|
||||
return
|
||||
}
|
||||
err = pc.SetLocalDescription(offer)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
connection.errorChannel <- err
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
@ -126,7 +202,7 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
|
|||
// TODO: This may soon be deprecated, consider OnIceGatheringStateChange.
|
||||
pc.OnIceComplete = func() {
|
||||
log.Printf("OnIceComplete")
|
||||
offerChan <- pc.LocalDescription()
|
||||
connection.offerChannel <- pc.LocalDescription()
|
||||
}
|
||||
// This callback is not expected, as the Client initiates the creation
|
||||
// of the data channel, not the remote peer.
|
||||
|
@ -135,62 +211,14 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
|
|||
panic("OnDataChannel")
|
||||
}
|
||||
|
||||
pr, pw := io.Pipe()
|
||||
// Pipes remain the same even when DataChannel gets switched.
|
||||
connection.recvPipe, connection.writePipe = io.Pipe()
|
||||
|
||||
dc, err := pc.CreateDataChannel("test", webrtc.Init{})
|
||||
if err != nil {
|
||||
log.Printf("CreateDataChannel: %s", err)
|
||||
return nil, err
|
||||
}
|
||||
dc.OnOpen = func() {
|
||||
log.Println("OnOpen channel")
|
||||
openChan <- struct{}{}
|
||||
}
|
||||
dc.OnClose = func() {
|
||||
log.Println("OnClose channel")
|
||||
pw.Close()
|
||||
close(openChan)
|
||||
// TODO: (Issue #12) Should attempt to renegotiate at this point.
|
||||
}
|
||||
dc.OnMessage = func(msg []byte) {
|
||||
log.Printf("OnMessage <--- %d bytes", len(msg))
|
||||
n, err := pw.Write(msg)
|
||||
if err != nil {
|
||||
pw.CloseWithError(err)
|
||||
}
|
||||
if n != len(msg) {
|
||||
panic("short write")
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case err := <-errChan:
|
||||
pc.Close()
|
||||
return nil, err
|
||||
case offer := <-offerChan:
|
||||
log.Printf("----------------")
|
||||
fmt.Fprintln(logFile, "\n"+offer.Serialize()+"\n")
|
||||
log.Printf("----------------")
|
||||
go func() {
|
||||
if "" != brokerURL {
|
||||
log.Println("Sending offer via BrokerChannel...\nTarget URL: ", brokerURL,
|
||||
"\nFront URL: ", frontDomain)
|
||||
answer, err := broker.Negotiate(pc.LocalDescription())
|
||||
if nil != err {
|
||||
log.Printf("BrokerChannel signaling error: %s", err)
|
||||
}
|
||||
if nil == answer {
|
||||
log.Printf("BrokerChannel: No answer received.")
|
||||
} else {
|
||||
signalChan <- answer
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
log.Printf("waiting for answer")
|
||||
answer, ok := <-signalChan
|
||||
connection.EstablishDataChannel()
|
||||
connection.sendOffer()
|
||||
|
||||
log.Printf("waiting for answer...")
|
||||
answer, ok := <-answerChannel
|
||||
if !ok {
|
||||
pc.Close()
|
||||
return nil, fmt.Errorf("no answer received")
|
||||
|
@ -205,13 +233,13 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
|
|||
// Wait until data channel is open; otherwise for example sends may get
|
||||
// lost.
|
||||
// TODO: Buffering *should* work though.
|
||||
_, ok = <-openChan
|
||||
_, ok = <-connection.openChannel
|
||||
if !ok {
|
||||
pc.Close()
|
||||
return nil, fmt.Errorf("failed to open data channel")
|
||||
}
|
||||
|
||||
return &webRTCConn{pc: pc, dc: dc, recvPipe: pr}, nil
|
||||
return connection, nil
|
||||
}
|
||||
|
||||
func endWebRTC() {
|
||||
|
@ -229,6 +257,7 @@ func endWebRTC() {
|
|||
}
|
||||
}
|
||||
|
||||
// Establish a WebRTC channel for SOCKS connections.
|
||||
func handler(conn *pt.SocksConn) error {
|
||||
handlerChan <- 1
|
||||
defer func() {
|
||||
|
@ -259,7 +288,6 @@ func handler(conn *pt.SocksConn) error {
|
|||
}
|
||||
|
||||
copyLoop(conn, remote)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -293,10 +321,10 @@ func readSignalingMessages(f *os.File) {
|
|||
log.Printf("ignoring invalid signal message %+q", msg)
|
||||
continue
|
||||
}
|
||||
signalChan <- sdp
|
||||
answerChannel <- sdp
|
||||
}
|
||||
log.Printf("close signalChan")
|
||||
close(signalChan)
|
||||
log.Printf("close answerChannel")
|
||||
close(answerChannel)
|
||||
if err := s.Err(); err != nil {
|
||||
log.Printf("signal FIFO: %s", err)
|
||||
}
|
||||
|
@ -308,19 +336,25 @@ func main() {
|
|||
flag.StringVar(&brokerURL, "url", "", "URL of signaling broker")
|
||||
flag.StringVar(&frontDomain, "front", "", "front domain")
|
||||
flag.Parse()
|
||||
|
||||
logFile, err = os.OpenFile("snowflake.log", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer logFile.Close()
|
||||
log.SetOutput(logFile)
|
||||
log.Println("starting")
|
||||
log.Println("\nStarting Snowflake Client...")
|
||||
|
||||
if "" == brokerURL {
|
||||
// Expect user to copy-paste if
|
||||
// TODO: Maybe just get rid of copy-paste entirely.
|
||||
if "" != brokerURL {
|
||||
log.Println("Rendezvous using Broker at: ", brokerURL)
|
||||
if "" != frontDomain {
|
||||
log.Println("Domain fronting using:", frontDomain)
|
||||
}
|
||||
} else {
|
||||
log.Println("No HTTP signaling detected. Waiting for a \"signal\" pipe...")
|
||||
// This FIFO receives signaling messages.
|
||||
err = syscall.Mkfifo("signal", 0600)
|
||||
err := syscall.Mkfifo("signal", 0600)
|
||||
if err != nil {
|
||||
if err.(syscall.Errno) != syscall.EEXIST {
|
||||
log.Fatal(err)
|
||||
|
@ -363,6 +397,7 @@ func main() {
|
|||
}
|
||||
}
|
||||
pt.CmethodsDone()
|
||||
defer endWebRTC()
|
||||
|
||||
var numHandlers int = 0
|
||||
var sig os.Signal
|
||||
|
@ -382,10 +417,9 @@ func main() {
|
|||
ln.Close()
|
||||
}
|
||||
|
||||
if syscall.SIGTERM == sig || syscall.SIGINT == sig {
|
||||
endWebRTC()
|
||||
return
|
||||
}
|
||||
// if syscall.SIGTERM == sig || syscall.SIGINT == sig {
|
||||
// return
|
||||
// }
|
||||
|
||||
// wait for second signal or no more handlers
|
||||
sig = nil
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue