From 3de5b30287ae4ad9e0e663156cb5442616b40492 Mon Sep 17 00:00:00 2001 From: Serene Han Date: Sun, 10 Jan 2016 13:01:59 -0800 Subject: [PATCH] ProxyPair with initial websocket stuff, nearly there --- proxy/snowflake.coffee | 199 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 189 insertions(+), 10 deletions(-) diff --git a/proxy/snowflake.coffee b/proxy/snowflake.coffee index 94bc5e0..2723773 100644 --- a/proxy/snowflake.coffee +++ b/proxy/snowflake.coffee @@ -6,6 +6,8 @@ Uses WebRTC from the client, and websocket to the server. Assume that the webrtc client plugin is always the offerer, in which case this must always act as the answerer. + +TODO(keroserene): Complete the websocket + webrtc ProxyPair ### Query = @@ -38,6 +40,14 @@ Query = result[name] = value if !(name in result) result + # params is a list of (key, value) 2-tuples. + buildString: (params) -> + parts = [] + for param in params + parts.push encodeURIComponent(param[0]) + '=' + + encodeURIComponent(param[1]) + parts.join '&' + Params = getBool: (query, param, defaultValue) -> val = query[param] @@ -46,6 +56,21 @@ Params = return false if "false" == val || "0" == val return null +# repr = (x) -> + # return 'null' if null == x + # return 'undefined' if 'undefined' == typeof x + # if 'object' == typeof x + # elems = [] + # for k in x + # elems.push(maybe_quote(k) + ': ' + repr(x[k])); + # return "{ " + elems.join(", ") + " }"; + # } else if (typeof x === "string") { + # return quote(x); + # } else { + # return x.toString(); +# safe_repr = (s) -> SAFE_LOGGING ? "[scrubbed]" : repr(s) +safe_repr = (s) -> SAFE_LOGGING ? "[scrubbed]" : JSON.stringify(s) + # HEADLESS is true if we are running not in a browser with a DOM. query = Query.parse(window.location.search.substr(1)) HEADLESS = "undefined" == typeof(document) @@ -86,10 +111,11 @@ class Snowflake # PeerConnection pc: null rateLimit: 0 - proxyPairs: null + proxyPairs: [] badge: null $badge: null MAX_NUM_CLIENTS = 1 + CONNECTIONS_PER_CLIENT = 1 state: MODE.INIT constructor: -> @@ -134,6 +160,7 @@ class Snowflake log "Data channel opened!" @state = MODE.WEBRTC_READY # TODO: Prepare ProxyPair onw. + @beginProxy() channel.onclose = => log "Data channel closed." @state = MODE.INIT; @@ -159,7 +186,7 @@ class Snowflake log "Invalid SDP message." return false log("SDP " + sdp.type + " successfully received.") - @sendAnswer() if "offer" == sdp.type + @sendAnswer() if 'offer' == sdp.type true sendAnswer: => @@ -171,7 +198,7 @@ class Snowflake # Poll facilitator when this snowflake can support more clients. proxyMain: -> - if @proxyPairs.length >= @MAX_NUM_CLIENTS * CONNECTIONS_PER_CLIENT + if @proxyPairs.length >= @MAX_NUM_CLIENTS * @CONNECTIONS_PER_CLIENT setTimeout(@proxyMain, @facilitator_poll_interval * 1000) return @@ -180,8 +207,8 @@ class Snowflake params.push ["transport", "webrtc"] beginProxy: (client, relay) -> - for i in [0..CONNECTIONS_PER_CLIENT] - makeProxyPair(client, relay) + for i in [1..CONNECTIONS_PER_CLIENT] + @makeProxyPair client, relay makeProxyPair: (client, relay) -> pair = new ProxyPair(client, relay, @rate_limit); @@ -191,16 +218,14 @@ class Snowflake @proxyPairs.splice(@proxy_pairs.indexOf(pair), 1) @badge.endProxy() if @badge try - proxy_pair.connect() + pair.connectRelay() catch err - log "ProxyPair: exception while connecting: " + safe_repr(err.message) + "." + log 'ERROR: ProxyPair exception while connecting.' + log err return @badge.beginProxy if @badge cease: -> - # @start = null - # @proxyMain = null - # @make_proxy_pair = function(client_addr, relay_addr) { }; while @proxyPairs.length > 0 @proxyPairs.pop().close() @@ -215,10 +240,164 @@ class Snowflake @badge.die() if @badge +# Build an escaped URL string from unescaped components. Only scheme and host +# are required. See RFC 3986, section 3. +buildUrl = (scheme, host, port, path, params) -> + parts = [] + parts.push(encodeURIComponent scheme) + parts.push '://' + + # If it contains a colon but no square brackets, treat it as IPv6. + if host.match(/:/) && !host.match(/[[\]]/) + parts.push '[' + parts.push host + parts.push ']' + else + parts.push(encodeURIComponent host) + + if undefined != port && DEFAULT_PORTS[scheme] != port + parts.push ':' + parts.push(encodeURIComponent port.toString()) + + if undefined != path && '' != path + if !path.match(/^\//) + path = '/' + path + ### + Slash is significant so we must protect it from encodeURIComponent, while + still encoding question mark and number sign. RFC 3986, section 3.3: "The + path is terminated by the first question mark ('?') or number sign ('#') + character, or by the end of the URI. ... A path consists of a sequence of + path segments separated by a slash ('/') character." + ### + path = path.replace /[^\/]+/, (m) -> + encodeURIComponent m + parts.push path + + if undefined != params + parts.push '?' + parts.push Query.buildString params + + parts.join '' + +makeWebsocket = (addr) -> + url = buildUrl 'ws', addr.host, addr.port, '/' + if have_websocket_binary_frames() + ws = new WebSocket url + else + ws = new WebSocket url 'base64' + ### + "User agents can use this as a hint for how to handle incoming binary data: if + the attribute is set to 'blob', it is safe to spool it to disk, and if it is + set to 'arraybuffer', it is likely more efficient to keep the data in memory." + ### + ws.binaryType = 'arraybuffer' + ws + + + # TODO: Implement class ProxyPair + # TODO: Hardcoded for now, but should fetch from facilitator later. + relayAddr: null + constructor: (@clientAddr, @relayAddr, @rateLimit) -> + + # Assumes WebRTC part is already connected. + # TODO: Put the webrtc stuff in ProxyPair, so that multiple webrtc connections + # can be established. + connectRelay: -> + log "Snowflake: connecting to relay" + + @relay = makeWebsocket(@relayAddr); + @relay.label = 'Relay' + @relay.onopen = => + log "Snowflake: " + ws.label + "connected" + @relay.onclose = @onClose + @relay.onerror = @onError + @relay.onmessage = @onRelayToClientMessage + + onClientToRelayMessage: (event) -> + @c2r_schedule.push event.data + @flush() + + onRelayToClientMessage: (event) -> + @r2c_schedule.push event.data + @flush() + + onClose: (event) -> + ws = event.target + log(ws.label + ': closed.') + @flush() + @maybeCleanup() + + onError: (event) -> + ws = event.target + log ws.label + ': error.' + this.close(); + # we can't rely on onclose_callback to cleanup, since one common error + # case is when the client fails to connect and the relay never starts. + # in that case close() is a NOP and onclose_callback is never called. + @maybeCleanup() + + isOpen: (ws) -> undefined != ws && WebSocket.OPEN == ws.readyState + isClosed: (ws) -> undefined == ws || WebSocket.CLOSED == ws.readyState + + maybe_cleanup: -> + if @running && @isClosed(client) && @isClosed @relay + @running = false + @cleanup_callback() + true + false + + # Send as much data as the rate limit currently allows. + ### + flush: -> + clearTimeout @flush_timeout_id if @flush_timeout_id + @flush_timeout_id = null + busy = true + checkChunks = -> + busy = false + # if @isOpen @clientthis.client_s) && + # this.client_s.bufferedAmount < MAX_BUFFER && + # this.r2c_schedule.length > 0) { + # chunk = this.r2c_schedule.shift(); + # this.rate_limit.update(chunk.length); + # this.client_s.send(chunk); + # busy = true; + # + if @isOpen @relay && + @relay.bufferedAmount < MAX_BUFFER && + @c2r_schedule.length > 0 + chunk = @c2r_schedule.shift() + @rate_limit.update chunk.length + @relay.send chunk + busy = true + checkChunks() while busy && !@rate_limit.is_limited() + + if @isClosed @relay && + # !isClosed(this.client_s) && + # @client_s.bufferedAmount === 0 && + @r2c_schedule.length == 0 + # log("Client: closing."); + # this.client_s.close(); + # if (is_closed(this.client_s) && + # !is_closed(this.relay_s) && + # this.relay_s.bufferedAmount === 0 && + # this.c2r_schedule.length === 0) { + # log("Relay: closing."); + # this.relay_s.close(); + # } + + + while busy && !@rate_limit.is_limited() + + if this.r2c_schedule.length > 0 || + (@isOpen(@client) && @client.bufferedAmount > 0) || + @c2r_schedule.length > 0 || + (@isOpen(@relay) && @relay.bufferedAmount > 0) + @flush_timeout_id = setTimeout @flush, @rate_limit.when() * 1000 + ### # ## -- DOM & Input Functionality -- ## #