broker tracking a heap of snowflakes

This commit is contained in:
Serene Han 2016-01-21 10:44:14 -08:00
parent 28e557fb43
commit 0cd6852ad0
5 changed files with 183 additions and 20 deletions

View file

@ -1,13 +1,13 @@
package snowflake_broker package snowflake_broker
import ( import (
// "io" "container/heap"
"fmt"
"io/ioutil" "io/ioutil"
"log" "log"
"net" "net"
"net/http" "net/http"
"path" "time"
// "appengine" // "appengine"
// "appengine/urlfetch" // "appengine/urlfetch"
) )
@ -15,7 +15,64 @@ import (
// This is an intermediate step - a basic hardcoded appengine rendezvous // This is an intermediate step - a basic hardcoded appengine rendezvous
// to a single browser snowflake. // to a single browser snowflake.
var snowflakeProxy = "" // This is minimum viable client-proxy registration.
// TODO: better, more secure registration corresponding to what's in
// the python flashproxy facilitator.
// Slice of available snowflake proxies.
// var snowflakes []chan []byte
type Snowflake struct {
id string
sigChannel chan []byte
clients int
index int
}
// Implements heap.Interface, and holds Snowflakes.
type SnowflakeHeap []*Snowflake
func (sh SnowflakeHeap) Len() int { return len(sh) }
func (sh SnowflakeHeap) Less(i, j int) bool {
// Snowflakes serving less clients should sort earlier.
return sh[i].clients < sh[j].clients
}
func (sh SnowflakeHeap) Swap(i, j int) {
sh[i], sh[j] = sh[j], sh[i]
sh[i].index = i
sh[j].index = j
}
func (sh *SnowflakeHeap) Push(s interface{}) {
n := len(*sh)
snowflake := s.(*Snowflake)
snowflake.index = n
*sh = append(*sh, snowflake)
}
// Only valid when Len() > 0.
func (sh *SnowflakeHeap) Pop() interface{} {
flakes := *sh
n := len(flakes)
snowflake := flakes[n-1]
snowflake.index = -1
*sh = flakes[0 : n-1]
return snowflake
}
var snowflakes *SnowflakeHeap
// Create and add a Snowflake to the heap.
func AddSnowflake(id string) *Snowflake {
snowflake := new(Snowflake)
snowflake.id = id
snowflake.clients = 0
snowflake.sigChannel = make(chan []byte)
heap.Push(snowflakes, snowflake)
return snowflake
}
func robotsTxtHandler(w http.ResponseWriter, r *http.Request) { func robotsTxtHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain; charset=utf-8") w.Header().Set("Content-Type", "text/plain; charset=utf-8")
@ -36,29 +93,76 @@ Expects a WebRTC SDP offer in the Request to give to an assigned
snowflake proxy, which responds with the SDP answer to be sent in snowflake proxy, which responds with the SDP answer to be sent in
the HTTP response back to the client. the HTTP response back to the client.
*/ */
func regHandler(w http.ResponseWriter, r *http.Request) { func clientHandler(w http.ResponseWriter, r *http.Request) {
// TODO: Maybe don't pass anything on path, since it will always be bidirectional offer, err := ioutil.ReadAll(r.Body)
dir, _ := path.Split(path.Clean(r.URL.Path))
if dir != "/reg/" {
http.NotFound(w, r)
return
}
body, err := ioutil.ReadAll(r.Body)
if nil != err { if nil != err {
return
log.Println("Invalid data.") log.Println("Invalid data.")
return
} }
w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Origin", "*")
// Pop the most available snowflake proxy, and pass the offer to it.
// TODO: Make this much better.
snowflake := heap.Pop(snowflakes).(*Snowflake)
if nil == snowflake {
w.Write([]byte("no snowflake proxies available"))
// w.WriteHeader(http.StatusServiceUnavailable)
return
}
// snowflakes = snowflakes[1:]
snowflake.sigChannel <- offer
w.Write([]byte("sent offer to proxy!"))
// TODO: Get browser snowflake to talkto this appengine instance // TODO: Get browser snowflake to talkto this appengine instance
// so it can reply with an answer, and not just the offer again :) // so it can reply with an answer, and not just the offer again :)
// TODO: Real broker which matches clients and snowflake proxies. // TODO: Real broker which matches clients and snowflake proxies.
w.Write(offer)
}
/*
A snowflake browser proxy requests a client from the Broker.
*/
func proxyHandler(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if nil != err {
log.Println("Invalid data.")
return
}
w.Header().Set("Access-Control-Allow-Origin", "*")
snowflakeSession := body
log.Println("Received snowflake: ", snowflakeSession)
snowflake := AddSnowflake(string(snowflakeSession))
select {
case offer := <-snowflake.sigChannel:
log.Println("Passing client offer to snowflake.")
w.Write(offer)
case <-time.After(time.Second * 10):
s := fmt.Sprintf("%d snowflakes left.", snowflakes.Len())
w.Write([]byte("timed out. " + s))
heap.Remove(snowflakes, snowflake.index)
// w.WriteHeader(http.StatusRequestTimeout)
}
}
func reflectHandler(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if nil != err {
log.Println("Invalid data.")
return
}
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Write(body) w.Write(body)
} }
func init() { func init() {
// snowflakes = make([]chan []byte, 0)
snowflakes = new(SnowflakeHeap)
heap.Init(snowflakes)
http.HandleFunc("/robots.txt", robotsTxtHandler) http.HandleFunc("/robots.txt", robotsTxtHandler)
http.HandleFunc("/ip", ipHandler) http.HandleFunc("/ip", ipHandler)
http.HandleFunc("/reg/", regHandler)
http.HandleFunc("/client", clientHandler)
http.HandleFunc("/proxy", proxyHandler)
http.HandleFunc("/reflect", reflectHandler)
// if SNOWFLAKE_BROKER == "" { // if SNOWFLAKE_BROKER == "" {
// panic("SNOWFLAKE_BROKER empty; did you forget to edit config.go?") // panic("SNOWFLAKE_BROKER empty; did you forget to edit config.go?")
// } // }

View file

@ -0,0 +1,59 @@
package snowflake_broker
import (
"container/heap"
"testing"
)
func TestSnowflakeHeap(t *testing.T) {
h := new(SnowflakeHeap)
heap.Init(h)
if 0 != h.Len() {
t.Error("Unexpected length.")
}
s1 := new(Snowflake)
s2 := new(Snowflake)
s3 := new(Snowflake)
s4 := new(Snowflake)
s1.clients = 4
s2.clients = 5
s3.clients = 3
s4.clients = 1
heap.Push(h, s1)
heap.Push(h, s2)
heap.Push(h, s3)
heap.Push(h, s4)
if 4 != h.Len() {
t.Error("Unexpected length.")
}
heap.Remove(h, 0)
if 3 != h.Len() {
t.Error("Unexpected length.")
}
r := heap.Pop(h).(*Snowflake)
if r.clients != 3 {
t.Error("Unexpected clients: ", r.clients)
}
if r.index != -1 {
t.Error("Unexpected index: ", r.index)
}
r = heap.Pop(h).(*Snowflake)
if r.clients != 4 {
t.Error("Unexpected clients: ", r.clients)
}
r = heap.Pop(h).(*Snowflake)
if r.clients != 5 {
t.Error("Unexpected clients: ", r.clients)
}
if 0 != h.Len() {
t.Error("Unexpected length.")
}
}

View file

@ -37,7 +37,7 @@ func NewMeekChannel(broker string, front string) *MeekChannel {
mc.Method = "POST" mc.Method = "POST"
mc.trueURL = targetUrl mc.trueURL = targetUrl
mc.externalUrl = front + "/reg/test" // TODO: Have a better suffix. mc.externalUrl = front + "/client"
// We make a copy of DefaultTransport because we want the default Dial // We make a copy of DefaultTransport because we want the default Dial
// and TLSHandshakeTimeout settings. But we want to disable the default // and TLSHandshakeTimeout settings. But we want to disable the default
@ -70,6 +70,7 @@ func (mc *MeekChannel) Negotiate(offer *webrtc.SessionDescription) (
if nil != err { if nil != err {
return nil, err return nil, err
} }
log.Println("Body: ", string(body))
answer := webrtc.DeserializeSessionDescription(string(body)) answer := webrtc.DeserializeSessionDescription(string(body))
return answer, nil return answer, nil
} }

View file

@ -21,7 +21,6 @@ class Broker
xhr = new XMLHttpRequest() xhr = new XMLHttpRequest()
try try
xhr.open 'POST', @url xhr.open 'POST', @url
xhr
catch err catch err
### ###
An exception happens here when, for example, NoScript allows the domain on An exception happens here when, for example, NoScript allows the domain on
@ -35,14 +34,14 @@ class Broker
# xhr.responseType = 'text' # xhr.responseType = 'text'
xhr.onreadystatechange = -> xhr.onreadystatechange = ->
if xhr.DONE == xhr.readyState if xhr.DONE == xhr.readyState
log 'Broker: ' + xhr.status
if 200 == xhr.status if 200 == xhr.status
log 'Broker: success'
log 'Response: ' + xhr.responseText log 'Response: ' + xhr.responseText
# @fac_complete xhr.responseText log xhr
else else
log 'Broker error ' + xhr.status + ' - ' + xhr.statusText log 'Broker error ' + xhr.status + ' - ' + xhr.statusText
xhr.send 'snowflake-testing' xhr.send 'snowflake-testing'
log "Broker: sent a registration message, waiting for reply..."
sendAnswer: (answer) -> sendAnswer: (answer) ->
log 'Sending answer to broker.' log 'Sending answer to broker.'

View file

@ -8,7 +8,7 @@ Assume that the webrtc client plugin is always the offerer, in which case
this must always act as the answerer. this must always act as the answerer.
### ###
DEFAULT_WEBSOCKET = '192.81.135.242:9901' DEFAULT_WEBSOCKET = '192.81.135.242:9901'
DEFAULT_BROKER = 'https://snowflake-reg.appspot.com/reg/test' DEFAULT_BROKER = 'https://snowflake-reg.appspot.com/proxy'
DEFAULT_PORTS = DEFAULT_PORTS =
http: 80 http: 80
https: 443 https: 443