Refactor proxy to reuse signaling code

Simplify proxy interactions with the broker signaling server and prepare
for the introduction of an additional signaling server.
This commit is contained in:
Cecylia Bocovich 2020-10-13 11:06:14 -04:00
parent 912bcae24e
commit 7a0428e3b1
2 changed files with 61 additions and 58 deletions

View file

@ -337,7 +337,7 @@ func TestBrokerInteractions(t *testing.T) {
const sampleAnswer = `{"type":"answer","sdp":` + sampleSDP + `}`
Convey("Proxy connections to broker", t, func() {
broker := new(Broker)
broker := new(SignalingServer)
broker.url, _ = url.Parse("localhost")
//Mock peerConnection
@ -417,7 +417,8 @@ func TestBrokerInteractions(t *testing.T) {
}
err = broker.sendAnswer("test", pc)
So(err, ShouldNotEqual, nil)
So(err.Error(), ShouldResemble, "broker returned 410")
So(err.Error(), ShouldResemble,
"error sending answer to broker: remote returned status code 410")
//Error if we can't parse broker message
broker.transport = &MockTransport{

View file

@ -44,7 +44,7 @@ const dataChannelTimeout = 20 * time.Second
const readLimit = 100000 //Maximum number of bytes to be read from an HTTP request
var broker *Broker
var broker *SignalingServer
var relayURL string
var currentNATType = NATUnknown
@ -110,12 +110,6 @@ func remoteIPFromSDP(str string) net.IP {
return nil
}
type Broker struct {
url *url.URL
transport http.RoundTripper
keepLocalAddresses bool
}
type webRTCConn struct {
dc *webrtc.DataChannel
pc *webrtc.PeerConnection
@ -200,8 +194,33 @@ func limitedRead(r io.Reader, limit int64) ([]byte, error) {
return p, err
}
func (b *Broker) pollOffer(sid string) *webrtc.SessionDescription {
brokerPath := b.url.ResolveReference(&url.URL{Path: "proxy"})
type SignalingServer struct {
url *url.URL
transport http.RoundTripper
keepLocalAddresses bool
}
func (s *SignalingServer) Post(path string, payload io.Reader) ([]byte, error) {
req, err := http.NewRequest("POST", path, payload)
if err != nil {
return nil, err
}
resp, err := s.transport.RoundTrip(req)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("remote returned status code %d", resp.StatusCode)
}
defer resp.Body.Close()
return limitedRead(resp.Body, readLimit)
}
func (s *SignalingServer) pollOffer(sid string) *webrtc.SessionDescription {
brokerPath := s.url.ResolveReference(&url.URL{Path: "proxy"})
timeOfNextPoll := time.Now()
for {
// Sleep until we're scheduled to poll again.
@ -220,24 +239,15 @@ func (b *Broker) pollOffer(sid string) *webrtc.SessionDescription {
log.Printf("Error encoding poll message: %s", err.Error())
return nil
}
req, _ := http.NewRequest("POST", brokerPath.String(), bytes.NewBuffer(body))
resp, err := b.transport.RoundTrip(req)
resp, err := s.Post(brokerPath.String(), bytes.NewBuffer(body))
if err != nil {
log.Printf("error polling broker: %s", err)
} else {
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
log.Printf("broker returns: %d", resp.StatusCode)
} else {
body, err := limitedRead(resp.Body, readLimit)
if err != nil {
log.Printf("error reading broker response: %s", err)
} else {
log.Printf("error polling broker: %s", err.Error())
}
offer, _, err := messages.DecodePollResponse(body)
offer, _, err := messages.DecodePollResponse(resp)
if err != nil {
log.Printf("error reading broker response: %s", err.Error())
log.Printf("body: %s", body)
log.Printf("Error reading broker response: %s", err.Error())
log.Printf("body: %s", resp)
return nil
}
if offer != "" {
@ -251,14 +261,11 @@ func (b *Broker) pollOffer(sid string) *webrtc.SessionDescription {
}
}
}
}
}
}
func (b *Broker) sendAnswer(sid string, pc *webrtc.PeerConnection) error {
brokerPath := b.url.ResolveReference(&url.URL{Path: "answer"})
func (s *SignalingServer) sendAnswer(sid string, pc *webrtc.PeerConnection) error {
brokerPath := s.url.ResolveReference(&url.URL{Path: "answer"})
ld := pc.LocalDescription()
if !b.keepLocalAddresses {
if !s.keepLocalAddresses {
ld = &webrtc.SessionDescription{
Type: ld.Type,
SDP: util.StripLocalAddresses(ld.SDP),
@ -272,20 +279,12 @@ func (b *Broker) sendAnswer(sid string, pc *webrtc.PeerConnection) error {
if err != nil {
return err
}
req, _ := http.NewRequest("POST", brokerPath.String(), bytes.NewBuffer(body))
resp, err := b.transport.RoundTrip(req)
resp, err := s.Post(brokerPath.String(), bytes.NewBuffer(body))
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("broker returned %d", resp.StatusCode)
return fmt.Errorf("error sending answer to broker: %s", err.Error())
}
body, err = limitedRead(resp.Body, readLimit)
if err != nil {
return fmt.Errorf("error reading broker response: %s", err)
}
success, err := messages.DecodeAnswerResponse(body)
success, err := messages.DecodeAnswerResponse(resp)
if err != nil {
return err
}
@ -327,7 +326,6 @@ func datachannelHandler(conn *webRTCConn, remoteAddr net.Addr) {
log.Fatalf("invalid relay url: %s", err)
}
// Retrieve client IP address
if remoteAddr != nil {
// Encode client IP address in relay URL
q := u.Query()
@ -354,7 +352,11 @@ func datachannelHandler(conn *webRTCConn, remoteAddr net.Addr) {
// candidates is complete and the answer is available in LocalDescription.
// Installs an OnDataChannel callback that creates a webRTCConn and passes it to
// datachannelHandler.
func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config webrtc.Configuration, dataChan chan struct{}) (*webrtc.PeerConnection, error) {
func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription,
config webrtc.Configuration,
dataChan chan struct{},
handler func(conn *webRTCConn, remoteAddr net.Addr)) (*webrtc.PeerConnection, error) {
pc, err := webrtc.NewPeerConnection(config)
if err != nil {
return nil, fmt.Errorf("accept: NewPeerConnection: %s", err)
@ -390,7 +392,7 @@ func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config webrtc.C
}
})
go datachannelHandler(conn, conn.RemoteAddr())
go handler(conn, conn.RemoteAddr())
})
err = pc.SetRemoteDescription(*sdp)
@ -433,7 +435,7 @@ func runSession(sid string) {
return
}
dataChan := make(chan struct{})
pc, err := makePeerConnectionFromOffer(offer, config, dataChan)
pc, err := makePeerConnectionFromOffer(offer, config, dataChan, datachannelHandler)
if err != nil {
log.Printf("error making WebRTC connection: %s", err)
retToken()
@ -500,7 +502,7 @@ func main() {
log.Println("starting")
var err error
broker = new(Broker)
broker = new(SignalingServer)
broker.keepLocalAddresses = keepLocalAddresses
broker.url, err = url.Parse(rawBrokerURL)
if err != nil {