answer successfully roundtripped back from snowflake proxy through broker to client (#1)

This commit is contained in:
Serene Han 2016-01-21 13:02:46 -08:00
parent 7081e6328c
commit c9013b2f80
5 changed files with 127 additions and 70 deletions

View file

@ -1,16 +0,0 @@
/*
This is the server-side code that runs on Google App Engine for the
"appspot" registration method.
See doc/appspot-howto.txt for more details about setting up an
application, and advice on running one.
To upload a new version:
$ torify ~/go_appengine/appcfg.py --no_cookies -A $YOUR_APP_ID update .
*/
package snowflake_broker
// host:port/basepath of the broker you want to register with
// for example, fp-broker.org or example.com:12345/broker
// https:// and /reg/ will be prepended and appended respectively.
const SNOWFLAKE_BROKER = ""

View file

@ -1,3 +1,12 @@
/*
Broker acts as the HTTP signaling channel.
It matches clients and snowflake proxies by passing corresponding
SessionDescriptions in order to negotiate a WebRTC connection.
TODO(serene): This code is currently the absolute minimum required to
cause a successful negotiation.
It's otherwise very unsafe and problematic, and needs quite some work...
*/
package snowflake_broker package snowflake_broker
import ( import (
@ -8,8 +17,6 @@ import (
"net" "net"
"net/http" "net/http"
"time" "time"
// "appengine"
// "appengine/urlfetch"
) )
// This is an intermediate step - a basic hardcoded appengine rendezvous // This is an intermediate step - a basic hardcoded appengine rendezvous
@ -24,7 +31,8 @@ import (
type Snowflake struct { type Snowflake struct {
id string id string
sigChannel chan []byte offerChannel chan []byte
answerChannel chan []byte
clients int clients int
index int index int
} }
@ -63,14 +71,17 @@ func (sh *SnowflakeHeap) Pop() interface{} {
} }
var snowflakes *SnowflakeHeap var snowflakes *SnowflakeHeap
var snowflakeMap map[string]*Snowflake
// Create and add a Snowflake to the heap. // Create and add a Snowflake to the heap.
func AddSnowflake(id string) *Snowflake { func AddSnowflake(id string) *Snowflake {
snowflake := new(Snowflake) snowflake := new(Snowflake)
snowflake.id = id snowflake.id = id
snowflake.clients = 0 snowflake.clients = 0
snowflake.sigChannel = make(chan []byte) snowflake.offerChannel = make(chan []byte)
snowflake.answerChannel = make(chan []byte)
heap.Push(snowflakes, snowflake) heap.Push(snowflakes, snowflake)
snowflakeMap[id] = snowflake
return snowflake return snowflake
} }
@ -97,65 +108,108 @@ func clientHandler(w http.ResponseWriter, r *http.Request) {
offer, err := ioutil.ReadAll(r.Body) offer, err := ioutil.ReadAll(r.Body)
if nil != err { if nil != err {
log.Println("Invalid data.") log.Println("Invalid data.")
w.WriteHeader(http.StatusBadRequest)
return return
} }
w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Origin", "*")
// Pop the most available snowflake proxy, and pass the offer to it. w.Header().Set("Access-Control-Allow-Headers", "X-Session-ID")
// TODO: Make this much better.
// Find the most available snowflake proxy, and pass the offer to it.
// TODO: Needs improvement.
snowflake := heap.Pop(snowflakes).(*Snowflake) snowflake := heap.Pop(snowflakes).(*Snowflake)
if nil == snowflake { if nil == snowflake {
// w.Header().Set("Status", http.StatusServiceUnavailable) w.Header().Set("Status", http.StatusServiceUnavailable)
w.Write([]byte("no snowflake proxies available")) // w.Write([]byte("no snowflake proxies available"))
return return
} }
// snowflakes = snowflakes[1:] snowflake.offerChannel <- offer
snowflake.sigChannel <- offer
w.Write([]byte("sent offer to proxy!")) // Wait for the answer to be returned on the channel.
// TODO: Get browser snowflake to talkto this appengine instance select {
// so it can reply with an answer, and not just the offer again :) case answer := <-snowflake.answerChannel:
// TODO: Real broker which matches clients and snowflake proxies. log.Println("Retrieving answer")
w.Write(offer) w.Write(answer)
// Only remove from the snowflake map once the answer is set.
delete(snowflakeMap, snowflake.id)
case <-time.After(time.Second * 10):
w.WriteHeader(http.StatusGatewayTimeout)
w.Write([]byte("timed out waiting for answer!"))
}
} }
/* /*
A snowflake browser proxy requests a client from the Broker. For snowflake proxies to request a client from the Broker.
*/ */
func proxyHandler(w http.ResponseWriter, r *http.Request) { func proxyHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Session-ID")
// For CORS preflight.
if "OPTIONS" == r.Method {
return
}
id := r.Header.Get("X-Session-ID")
body, err := ioutil.ReadAll(r.Body) body, err := ioutil.ReadAll(r.Body)
if nil != err { if nil != err {
log.Println("Invalid data.") log.Println("Invalid data.")
w.WriteHeader(http.StatusBadRequest)
return return
} }
w.Header().Set("Access-Control-Allow-Origin", "*") if string(body) != id { // Mismatched IDs!
snowflakeSession := body w.WriteHeader(http.StatusBadRequest)
log.Println("Received snowflake: ", snowflakeSession) }
snowflake := AddSnowflake(string(snowflakeSession)) // Maybe confirm that X-Session-ID is the same.
log.Println("Received snowflake: ", id)
snowflake := AddSnowflake(id)
// Wait for a client to avail an offer to the snowflake, or timeout
// and ask the snowflake to poll later.
select { select {
case offer := <-snowflake.sigChannel: case offer := <-snowflake.offerChannel:
log.Println("Passing client offer to snowflake.") log.Println("Passing client offer to snowflake.")
w.Write(offer) w.Write(offer)
case <-time.After(time.Second * 10): case <-time.After(time.Second * 10):
// s := fmt.Sprintf("%d snowflakes left.", snowflakes.Len())
// w.Write([]byte("timed out. " + s))
// w.Header().Set("Status", http.StatusRequestTimeout)
w.WriteHeader(http.StatusGatewayTimeout)
heap.Remove(snowflakes, snowflake.index) heap.Remove(snowflakes, snowflake.index)
w.WriteHeader(http.StatusGatewayTimeout)
} }
} }
func reflectHandler(w http.ResponseWriter, r *http.Request) { /*
Expects snowflake proxes which have previously successfully received
an offer from proxyHandler to respond with an answer in an HTTP POST,
which the broker will pass back to the original client.
*/
func answerHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Headers", "X-Session-ID")
// For CORS preflight.
if "OPTIONS" == r.Method {
return
}
id := r.Header.Get("X-Session-ID")
snowflake, ok := snowflakeMap[id]
if !ok || nil == snowflake {
// The snowflake took too long to respond with an answer,
// and the designated client is no longer around / recognized by the Broker.
w.WriteHeader(http.StatusGone)
return
}
body, err := ioutil.ReadAll(r.Body) body, err := ioutil.ReadAll(r.Body)
if nil != err { if nil != err {
log.Println("Invalid data.") log.Println("Invalid data.")
w.WriteHeader(http.StatusBadRequest)
return return
} }
w.Header().Set("Access-Control-Allow-Origin", "*") log.Println("Received answer: ", body)
w.Write(body) snowflake.answerChannel <- body
} }
func init() { func init() {
// snowflakes = make([]chan []byte, 0)
snowflakes = new(SnowflakeHeap) snowflakes = new(SnowflakeHeap)
snowflakeMap = make(map[string]*Snowflake)
heap.Init(snowflakes) heap.Init(snowflakes)
http.HandleFunc("/robots.txt", robotsTxtHandler) http.HandleFunc("/robots.txt", robotsTxtHandler)
@ -163,8 +217,5 @@ func init() {
http.HandleFunc("/client", clientHandler) http.HandleFunc("/client", clientHandler)
http.HandleFunc("/proxy", proxyHandler) http.HandleFunc("/proxy", proxyHandler)
http.HandleFunc("/reflect", reflectHandler) http.HandleFunc("/answer", answerHandler)
// if SNOWFLAKE_BROKER == "" {
// panic("SNOWFLAKE_BROKER empty; did you forget to edit config.go?")
// }
} }

View file

@ -6,19 +6,25 @@ to get assigned to clients.
### ###
STATUS_OK = 200 STATUS_OK = 200
STATUS_GONE = 410
STATUS_GATEWAY_TIMEOUT = 504 STATUS_GATEWAY_TIMEOUT = 504
genSnowflakeID = ->
Math.random().toString(36).substring(2)
# Represents a broker running remotely. # Represents a broker running remotely.
class Broker class Broker
clients: 0 clients: 0
id: null
# When interacting with the Broker, snowflake must generate a unique session # When interacting with the Broker, snowflake must generate a unique session
# ID so the Broker can keep track of which signalling channel it's speaking # ID so the Broker can keep track of which signalling channel it's speaking
# to. # to.
constructor: (@url) -> constructor: (@url) ->
log 'Using Broker at ' + @url @clients = 0
clients = 0 @id = genSnowflakeID()
log 'Contacting Broker at ' + @url + '\nSnowflake ID: ' + @id
# Snowflake registers with the broker using an HTTP POST request, and expects # Snowflake registers with the broker using an HTTP POST request, and expects
# a response from the broker containing some client offer. # a response from the broker containing some client offer.
@ -27,7 +33,8 @@ class Broker
new Promise (fulfill, reject) => new Promise (fulfill, reject) =>
xhr = new XMLHttpRequest() xhr = new XMLHttpRequest()
try try
xhr.open 'POST', @url xhr.open 'POST', @url + 'proxy'
xhr.setRequestHeader('X-Session-ID', @id)
catch err catch err
### ###
An exception happens here when, for example, NoScript allows the domain An exception happens here when, for example, NoScript allows the domain
@ -47,10 +54,29 @@ class Broker
else else
log 'Broker ERROR: Unexpected ' + xhr.status + log 'Broker ERROR: Unexpected ' + xhr.status +
' - ' + xhr.statusText ' - ' + xhr.statusText
xhr.send @id
xhr.send 'snowflake-testing' log @id + " - polling for client offer..."
log "Broker: polling for client offer..."
sendAnswer: (answer) -> sendAnswer: (answer) ->
log 'Sending answer to broker.' log @id + ' - Sending answer back to broker...\n'
log answer log answer.sdp
xhr = new XMLHttpRequest()
try
xhr.open 'POST', @url + 'answer'
xhr.setRequestHeader('X-Session-ID', @id)
catch err
log 'Broker: exception while connecting: ' + err.message
return
xhr.onreadystatechange = ->
return if xhr.DONE != xhr.readyState
log xhr
switch xhr.status
when STATUS_OK
log 'Broker: Successfully replied with answer.'
log xhr.responseText
when STATUS_GONE
log 'Broker: No longer valid to reply with answer.'
else
log 'Broker ERROR: Unexpected ' + xhr.status +
' - ' + xhr.statusText
xhr.send JSON.stringify(answer)

View file

@ -32,7 +32,10 @@ class ProxyPair
# TODO: Use a promise.all to tell Snowflake about all offers at once, # TODO: Use a promise.all to tell Snowflake about all offers at once,
# once multiple proxypairs are supported. # once multiple proxypairs are supported.
log 'Finished gathering ICE candidates.' log 'Finished gathering ICE candidates.'
if COPY_PASTE_ENABLED
Signalling.send @pc.localDescription Signalling.send @pc.localDescription
else
snowflake.broker.sendAnswer @pc.localDescription
# OnDataChannel triggered remotely from the client when connection succeeds. # OnDataChannel triggered remotely from the client when connection succeeds.
@pc.ondatachannel = (dc) => @pc.ondatachannel = (dc) =>
console.log dc console.log dc

View file

@ -8,7 +8,7 @@ Assume that the webrtc client plugin is always the offerer, in which case
this must always act as the answerer. this must always act as the answerer.
### ###
DEFAULT_WEBSOCKET = '192.81.135.242:9901' DEFAULT_WEBSOCKET = '192.81.135.242:9901'
DEFAULT_BROKER = 'https://snowflake-reg.appspot.com/proxy' DEFAULT_BROKER = 'https://snowflake-reg.appspot.com/'
COPY_PASTE_ENABLED = false COPY_PASTE_ENABLED = false
DEFAULT_PORTS = DEFAULT_PORTS =
http: 80 http: 80
@ -104,8 +104,8 @@ class Snowflake
poll = => poll = =>
recv = broker.getClientOffer() recv = broker.getClientOffer()
recv.then((desc) => recv.then((desc) =>
log 'Received:\n\n' + desc + '\n'
offer = JSON.parse desc offer = JSON.parse desc
log 'Received:\n\n' + offer.sdp + '\n'
@receiveOffer offer @receiveOffer offer
, (err) -> , (err) ->
log err log err
@ -113,13 +113,6 @@ class Snowflake
) )
poll() poll()
# if @proxyPairs.length >= MAX_NUM_CLIENTS * CONNECTIONS_PER_CLIENT
# setTimeout(@proxyMain, @broker_poll_interval * 1000)
# return
# params = [['r', '1']]
# params.push ['transport', 'websocket']
# params.push ['transport', 'webrtc']
# Receive an SDP offer from client plugin. # Receive an SDP offer from client plugin.
receiveOffer: (desc) => receiveOffer: (desc) =>
sdp = new RTCSessionDescription desc sdp = new RTCSessionDescription desc