Add context to HTTP handlers, attempt to support localhost Broker.

Seems unlikely to work due to dev_appserver single inflight request limitation
This commit is contained in:
Serene Han 2016-02-12 15:38:28 -08:00
parent 4acff9983c
commit b04d1f67fb
4 changed files with 110 additions and 35 deletions

View file

@ -11,6 +11,7 @@ package snowflake_broker
import ( import (
"container/heap" "container/heap"
"fmt"
"io/ioutil" "io/ioutil"
"log" "log"
"net" "net"
@ -23,27 +24,68 @@ const (
ProxyTimeout = 10 ProxyTimeout = 10
) )
// This is minimum viable client-proxy registration. type SnowflakeContext struct {
// TODO(#13): better, more secure registration corresponding to what's in snowflakes *SnowflakeHeap
// the python flashproxy facilitator. // Map keeping track of snowflakeIDs required to match SDP answers from
var snowflakes *SnowflakeHeap // the second http POST.
snowflakeMap map[string]*Snowflake
}
// Map keeping track of snowflakeIDs required to match SDP answers from type SnowflakeHandler struct {
// the second http POST. *SnowflakeContext
var snowflakeMap map[string]*Snowflake h func(*SnowflakeContext, http.ResponseWriter, *http.Request)
}
func (sh SnowflakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
sh.h(sh.SnowflakeContext, w, r)
}
type ProxyRequest struct {
id string
offerChan chan []byte
}
var createChan = make(chan *ProxyRequest)
// Create and add a Snowflake to the heap. // Create and add a Snowflake to the heap.
func AddSnowflake(id string) *Snowflake { func (sc *SnowflakeContext) AddSnowflake(id string) *Snowflake {
log.Println(sc.snowflakes)
snowflake := new(Snowflake) snowflake := new(Snowflake)
snowflake.id = id snowflake.id = id
snowflake.clients = 0 snowflake.clients = 0
snowflake.offerChannel = make(chan []byte) snowflake.offerChannel = make(chan []byte)
snowflake.answerChannel = make(chan []byte) snowflake.answerChannel = make(chan []byte)
heap.Push(snowflakes, snowflake) heap.Push(sc.snowflakes, snowflake)
snowflakeMap[id] = snowflake sc.snowflakeMap[id] = snowflake
log.Println("Total snowflakes available: ", sc.snowflakes.Len())
log.Println(sc.snowflakes)
log.Println(sc.snowflakeMap)
return snowflake return snowflake
} }
func (sc *SnowflakeContext) Broker(proxies <-chan *ProxyRequest) {
for p := range proxies {
log.Println("adding ", p.id)
snowflake := sc.AddSnowflake(p.id)
// Wait for a client to avail an offer to the snowflake, or timeout
// and ask the snowflake to poll later.
go func(p *ProxyRequest) {
select {
case offer := <-snowflake.offerChannel:
log.Println("Passing client offer to snowflake.")
p.offerChan <- offer
case <-time.After(time.Second * ProxyTimeout):
// This snowflake is no longer available to serve clients.
heap.Remove(sc.snowflakes, snowflake.index)
delete(sc.snowflakeMap, snowflake.id)
p.offerChan <- nil
}
}(p)
}
}
func robotsTxtHandler(w http.ResponseWriter, r *http.Request) { func robotsTxtHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain; charset=utf-8") w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Write([]byte("User-agent: *\nDisallow:\n")) w.Write([]byte("User-agent: *\nDisallow:\n"))
@ -73,7 +115,7 @@ Expects a WebRTC SDP offer in the Request to give to an assigned
snowflake proxy, which responds with the SDP answer to be sent in snowflake proxy, which responds with the SDP answer to be sent in
the HTTP response back to the client. the HTTP response back to the client.
*/ */
func clientHandler(w http.ResponseWriter, r *http.Request) { func clientHandler(ctx *SnowflakeContext, w http.ResponseWriter, r *http.Request) {
offer, err := ioutil.ReadAll(r.Body) offer, err := ioutil.ReadAll(r.Body)
if nil != err { if nil != err {
log.Println("Invalid data.") log.Println("Invalid data.")
@ -85,23 +127,24 @@ func clientHandler(w http.ResponseWriter, r *http.Request) {
// Find the most available snowflake proxy, and pass the offer to it. // Find the most available snowflake proxy, and pass the offer to it.
// TODO: Needs improvement - maybe shouldn' // TODO: Needs improvement - maybe shouldn'
snowflake := heap.Pop(snowflakes).(*Snowflake) if ctx.snowflakes.Len() <= 0 {
if nil == snowflake { log.Println("Client: No snowflake proxies available.")
w.WriteHeader(http.StatusServiceUnavailable) w.WriteHeader(http.StatusServiceUnavailable)
// w.Write([]byte("no snowflake proxies available"))
return return
} }
snowflake := heap.Pop(ctx.snowflakes).(*Snowflake)
snowflake.offerChannel <- offer snowflake.offerChannel <- offer
// Wait for the answer to be returned on the channel. // Wait for the answer to be returned on the channel.
select { select {
case answer := <-snowflake.answerChannel: case answer := <-snowflake.answerChannel:
log.Println("Retrieving answer") log.Println("Client: Retrieving answer")
w.Write(answer) w.Write(answer)
// Only remove from the snowflake map once the answer is set. // Only remove from the snowflake map once the answer is set.
delete(snowflakeMap, snowflake.id) delete(ctx.snowflakeMap, snowflake.id)
case <-time.After(time.Second * ClientTimeout): case <-time.After(time.Second * ClientTimeout):
log.Println("Client: Timed out.")
w.WriteHeader(http.StatusGatewayTimeout) w.WriteHeader(http.StatusGatewayTimeout)
w.Write([]byte("timed out waiting for answer!")) w.Write([]byte("timed out waiting for answer!"))
} }
@ -110,7 +153,7 @@ func clientHandler(w http.ResponseWriter, r *http.Request) {
/* /*
For snowflake proxies to request a client from the Broker. For snowflake proxies to request a client from the Broker.
*/ */
func proxyHandler(w http.ResponseWriter, r *http.Request) { func proxyHandler(ctx *SnowflakeContext, w http.ResponseWriter, r *http.Request) {
if isPreflight(w, r) { if isPreflight(w, r) {
return return
} }
@ -126,21 +169,22 @@ func proxyHandler(w http.ResponseWriter, r *http.Request) {
} }
// Maybe confirm that X-Session-ID is the same. // Maybe confirm that X-Session-ID is the same.
log.Println("Received snowflake: ", id) log.Println("Received snowflake: ", id)
snowflake := AddSnowflake(id)
p := new(ProxyRequest)
p.id = id
p.offerChan = make(chan []byte)
createChan <- p
// Wait for a client to avail an offer to the snowflake, or timeout // Wait for a client to avail an offer to the snowflake, or timeout
// and ask the snowflake to poll later. // and ask the snowflake to poll later.
select { offer := <-p.offerChan
case offer := <-snowflake.offerChannel: if nil == offer {
log.Println("Passing client offer to snowflake.") log.Println("Proxy " + id + " did not receive a Client offer.")
w.Write(offer)
case <-time.After(time.Second * ProxyTimeout):
// This snowflake is no longer available to serve clients.
heap.Remove(snowflakes, snowflake.index)
delete(snowflakeMap, snowflake.id)
w.WriteHeader(http.StatusGatewayTimeout) w.WriteHeader(http.StatusGatewayTimeout)
return
} }
log.Println("Passing client offer to snowflake.")
w.Write(offer)
} }
/* /*
@ -148,12 +192,12 @@ Expects snowflake proxes which have previously successfully received
an offer from proxyHandler to respond with an answer in an HTTP POST, an offer from proxyHandler to respond with an answer in an HTTP POST,
which the broker will pass back to the original client. which the broker will pass back to the original client.
*/ */
func answerHandler(w http.ResponseWriter, r *http.Request) { func answerHandler(ctx *SnowflakeContext, w http.ResponseWriter, r *http.Request) {
if isPreflight(w, r) { if isPreflight(w, r) {
return return
} }
id := r.Header.Get("X-Session-ID") id := r.Header.Get("X-Session-ID")
snowflake, ok := snowflakeMap[id] snowflake, ok := ctx.snowflakeMap[id]
if !ok || nil == snowflake { if !ok || nil == snowflake {
// The snowflake took too long to respond with an answer, // The snowflake took too long to respond with an answer,
// and the designated client is no longer around / recognized by the Broker. // and the designated client is no longer around / recognized by the Broker.
@ -170,15 +214,30 @@ func answerHandler(w http.ResponseWriter, r *http.Request) {
snowflake.answerChannel <- body snowflake.answerChannel <- body
} }
func debugHandler(ctx *SnowflakeContext, w http.ResponseWriter, r *http.Request) {
s := fmt.Sprintf("current: %d", ctx.snowflakes.Len())
w.Write([]byte(s))
}
func init() { func init() {
snowflakes = new(SnowflakeHeap) // snowflakeMap = make(map[string]*Snowflake)
snowflakeMap = make(map[string]*Snowflake) snowflakes := new(SnowflakeHeap)
heap.Init(snowflakes) heap.Init(snowflakes)
ctx := &SnowflakeContext{
snowflakes: snowflakes,
snowflakeMap: make(map[string]*Snowflake),
}
go ctx.Broker(createChan)
http.HandleFunc("/robots.txt", robotsTxtHandler) http.HandleFunc("/robots.txt", robotsTxtHandler)
http.HandleFunc("/ip", ipHandler) http.HandleFunc("/ip", ipHandler)
http.HandleFunc("/client", clientHandler) // http.HandleFunc("/client", clientHandler)
http.HandleFunc("/proxy", proxyHandler) // http.HandleFunc("/proxy", proxyHandler)
http.HandleFunc("/answer", answerHandler) // http.HandleFunc("/answer", answerHandler)
http.Handle("/client", SnowflakeHandler{ctx, clientHandler})
http.Handle("/proxy", SnowflakeHandler{ctx, proxyHandler})
http.Handle("/answer", SnowflakeHandler{ctx, answerHandler})
http.Handle("/debug", SnowflakeHandler{ctx, debugHandler})
} }

View file

@ -22,6 +22,8 @@ func TestSnowflakeHeap(t *testing.T) {
s4.clients = 1 s4.clients = 1
heap.Push(h, s1) heap.Push(h, s1)
if 1 != h.Len() {
}
heap.Push(h, s2) heap.Push(h, s2)
heap.Push(h, s3) heap.Push(h, s3)
heap.Push(h, s4) heap.Push(h, s4)
@ -36,6 +38,9 @@ func TestSnowflakeHeap(t *testing.T) {
} }
r := heap.Pop(h).(*Snowflake) r := heap.Pop(h).(*Snowflake)
if 2 != h.Len() {
t.Error("Unexpected length.")
}
if r.clients != 3 { if r.clients != 3 {
t.Error("Unexpected clients: ", r.clients) t.Error("Unexpected clients: ", r.clients)
} }
@ -44,6 +49,9 @@ func TestSnowflakeHeap(t *testing.T) {
} }
r = heap.Pop(h).(*Snowflake) r = heap.Pop(h).(*Snowflake)
if 1 != h.Len() {
t.Error("Unexpected length.")
}
if r.clients != 4 { if r.clients != 4 {
t.Error("Unexpected clients: ", r.clients) t.Error("Unexpected clients: ", r.clients)
} }

7
client/torrc-localhost Normal file
View file

@ -0,0 +1,7 @@
UseBridges 1
DataDirectory datadir
ClientTransportPlugin snowflake exec ./client \
-url http://localhost:8080/ \
Bridge snowflake 0.0.3.0:1

View file

@ -28,7 +28,8 @@ class Broker
@clients = 0 @clients = 0
@id = genSnowflakeID() @id = genSnowflakeID()
# Ensure url has the right protocol + trailing slash. # Ensure url has the right protocol + trailing slash.
@url = 'https://' + @url if 0 != @url.indexOf('https://', 0) @url = 'http://' + @url if 0 == @url.indexOf('localhost', 0)
@url = 'https://' + @url if 0 != @url.indexOf('http', 0)
@url += '/' if '/' != @url.substr -1 @url += '/' if '/' != @url.substr -1
# Promises some client SDP Offer. # Promises some client SDP Offer.