[WIP] Split broker into components

Exploring #26092
This commit is contained in:
Arlo Breault 2021-05-18 19:23:13 -04:00 committed by Arlo Breault
parent 0ef2250280
commit e5d57647f0
5 changed files with 837 additions and 781 deletions

1
.gitignore vendored
View file

@ -5,6 +5,7 @@
.DS_Store .DS_Store
datadir/ datadir/
broker/broker broker/broker
broker/http-frontend/http-frontend
client/client client/client
server/server server/server
proxy/proxy proxy/proxy

View file

@ -7,22 +7,19 @@ package main
import ( import (
"container/heap" "container/heap"
"crypto/tls"
"flag" "flag"
"io" "io"
"log" "log"
"net/http" "net"
"net/rpc"
"os" "os"
"os/signal" "os/signal"
"strings"
"sync" "sync"
"syscall" "syscall"
"time" "time"
"git.torproject.org/pluggable-transports/snowflake.git/common/safelog" "git.torproject.org/pluggable-transports/snowflake.git/common/safelog"
"github.com/prometheus/client_golang/prometheus" // "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/crypto/acme/autocert"
) )
type BrokerContext struct { type BrokerContext struct {
@ -105,7 +102,7 @@ func (ctx *BrokerContext) Broker() {
} else { } else {
heap.Remove(ctx.restrictedSnowflakes, snowflake.index) heap.Remove(ctx.restrictedSnowflakes, snowflake.index)
} }
ctx.metrics.promMetrics.AvailableProxies.With(prometheus.Labels{"nat": request.natType, "type": request.proxyType}).Dec() // ctx.metrics.promMetrics.AvailableProxies.With(prometheus.Labels{"nat": request.natType, "type": request.proxyType}).Dec()
delete(ctx.idToSnowflake, snowflake.id) delete(ctx.idToSnowflake, snowflake.id)
close(request.offerChannel) close(request.offerChannel)
} }
@ -131,7 +128,7 @@ func (ctx *BrokerContext) AddSnowflake(id string, proxyType string, natType stri
} else { } else {
heap.Push(ctx.restrictedSnowflakes, snowflake) heap.Push(ctx.restrictedSnowflakes, snowflake)
} }
ctx.metrics.promMetrics.AvailableProxies.With(prometheus.Labels{"nat": natType, "type": proxyType}).Inc() // ctx.metrics.promMetrics.AvailableProxies.With(prometheus.Labels{"nat": natType, "type": proxyType}).Inc()
ctx.snowflakeLock.Unlock() ctx.snowflakeLock.Unlock()
ctx.idToSnowflake[id] = snowflake ctx.idToSnowflake[id] = snowflake
return snowflake return snowflake
@ -144,34 +141,29 @@ type ClientOffer struct {
} }
func main() { func main() {
var acmeEmail string
var acmeHostnamesCommas string
var acmeCertCacheDir string
var addr string
var geoipDatabase string var geoipDatabase string
var geoip6Database string var geoip6Database string
var disableTLS bool
var certFilename, keyFilename string
var disableGeoip bool var disableGeoip bool
var metricsFilename string var metricsFilename string
var unsafeLogging bool var unsafeLogging bool
flag.StringVar(&acmeEmail, "acme-email", "", "optional contact email for Let's Encrypt notifications") var socket string
flag.StringVar(&acmeHostnamesCommas, "acme-hostnames", "", "comma-separated hostnames for TLS certificate")
flag.StringVar(&certFilename, "cert", "", "TLS certificate file")
flag.StringVar(&keyFilename, "key", "", "TLS private key file")
flag.StringVar(&acmeCertCacheDir, "acme-cert-cache", "acme-cert-cache", "directory in which certificates should be cached")
flag.StringVar(&addr, "addr", ":443", "address to listen on")
flag.StringVar(&geoipDatabase, "geoipdb", "/usr/share/tor/geoip", "path to correctly formatted geoip database mapping IPv4 address ranges to country codes") flag.StringVar(&geoipDatabase, "geoipdb", "/usr/share/tor/geoip", "path to correctly formatted geoip database mapping IPv4 address ranges to country codes")
flag.StringVar(&geoip6Database, "geoip6db", "/usr/share/tor/geoip6", "path to correctly formatted geoip database mapping IPv6 address ranges to country codes") flag.StringVar(&geoip6Database, "geoip6db", "/usr/share/tor/geoip6", "path to correctly formatted geoip database mapping IPv6 address ranges to country codes")
flag.BoolVar(&disableTLS, "disable-tls", false, "don't use HTTPS")
flag.BoolVar(&disableGeoip, "disable-geoip", false, "don't use geoip for stats collection") flag.BoolVar(&disableGeoip, "disable-geoip", false, "don't use geoip for stats collection")
flag.StringVar(&metricsFilename, "metrics-log", "", "path to metrics logging output") flag.StringVar(&metricsFilename, "metrics-log", "", "path to metrics logging output")
flag.BoolVar(&unsafeLogging, "unsafe-logging", false, "prevent logs from being scrubbed") flag.BoolVar(&unsafeLogging, "unsafe-logging", false, "prevent logs from being scrubbed")
flag.StringVar(&socket, "socket", "/tmp/broker.sock", "path to ipc socket")
flag.Parse() flag.Parse()
var err error var err error
var metricsFile io.Writer var metricsFile io.Writer
var logOutput io.Writer = os.Stderr var logOutput io.Writer = os.Stderr
if unsafeLogging { if unsafeLogging {
log.SetOutput(logOutput) log.SetOutput(logOutput)
@ -179,7 +171,6 @@ func main() {
// We want to send the log output through our scrubber first // We want to send the log output through our scrubber first
log.SetOutput(&safelog.LogScrubber{Output: logOutput}) log.SetOutput(&safelog.LogScrubber{Output: logOutput})
} }
log.SetFlags(log.LstdFlags | log.LUTC) log.SetFlags(log.LstdFlags | log.LUTC)
if metricsFilename != "" { if metricsFilename != "" {
@ -205,21 +196,6 @@ func main() {
go ctx.Broker() go ctx.Broker()
i := &IPC{ctx}
http.HandleFunc("/robots.txt", robotsTxtHandler)
http.Handle("/proxy", SnowflakeHandler{i, proxyPolls})
http.Handle("/client", SnowflakeHandler{i, clientOffers})
http.Handle("/answer", SnowflakeHandler{i, proxyAnswers})
http.Handle("/debug", SnowflakeHandler{i, debugHandler})
http.Handle("/metrics", MetricsHandler{metricsFilename, metricsHandler})
http.Handle("/prometheus", promhttp.HandlerFor(ctx.metrics.promMetrics.registry, promhttp.HandlerOpts{}))
server := http.Server{
Addr: addr,
}
sigChan := make(chan os.Signal, 1) sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGHUP) signal.Notify(sigChan, syscall.SIGHUP)
@ -236,49 +212,18 @@ func main() {
} }
}() }()
// Handle the various ways of setting up TLS. The legal configurations // if err := os.RemoveAll(socket); err != nil {
// are: // log.Fatal(err)
// --acme-hostnames (with optional --acme-email and/or --acme-cert-cache) // }
// --cert and --key together
// --disable-tls
// The outputs of this block of code are the disableTLS,
// needHTTP01Listener, certManager, and getCertificate variables.
if acmeHostnamesCommas != "" {
acmeHostnames := strings.Split(acmeHostnamesCommas, ",")
log.Printf("ACME hostnames: %q", acmeHostnames)
var cache autocert.Cache ipc := &IPC{ctx}
if err = os.MkdirAll(acmeCertCacheDir, 0700); err != nil { rpc.Register(ipc)
log.Printf("Warning: Couldn't create cache directory %q (reason: %s) so we're *not* using our certificate cache.", acmeCertCacheDir, err)
} else {
cache = autocert.DirCache(acmeCertCacheDir)
}
certManager := autocert.Manager{
Cache: cache,
Prompt: autocert.AcceptTOS,
HostPolicy: autocert.HostWhitelist(acmeHostnames...),
Email: acmeEmail,
}
go func() {
log.Printf("Starting HTTP-01 listener")
log.Fatal(http.ListenAndServe(":80", certManager.HTTPHandler(nil)))
}()
server.TLSConfig = &tls.Config{GetCertificate: certManager.GetCertificate}
err = server.ListenAndServeTLS("", "")
} else if certFilename != "" && keyFilename != "" {
if acmeEmail != "" || acmeHostnamesCommas != "" {
log.Fatalf("The --cert and --key options are not allowed with --acme-email or --acme-hostnames.")
}
err = server.ListenAndServeTLS(certFilename, keyFilename)
} else if disableTLS {
err = server.ListenAndServe()
} else {
log.Fatal("the --acme-hostnames, --cert and --key, or --disable-tls option is required")
}
l, err := net.Listen("unix", socket)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
defer l.Close()
rpc.Accept(l)
} }

View file

@ -1,14 +1,22 @@
package main package main
import ( import (
"crypto/tls"
"errors" "errors"
"flag"
"io" "io"
"io/ioutil" "io/ioutil"
"log" "log"
"net/http" "net/http"
"net/rpc"
"os" "os"
"strings"
// "github.com/prometheus/client_golang/prometheus"
// "github.com/prometheus/client_golang/prometheus/promhttp"
"git.torproject.org/pluggable-transports/snowflake.git/common/messages" "git.torproject.org/pluggable-transports/snowflake.git/common/messages"
"git.torproject.org/pluggable-transports/snowflake.git/common/safelog"
"golang.org/x/crypto/acme/autocert"
) )
const ( const (
@ -17,8 +25,8 @@ const (
// Implements the http.Handler interface // Implements the http.Handler interface
type SnowflakeHandler struct { type SnowflakeHandler struct {
*IPC c *rpc.Client
handle func(*IPC, http.ResponseWriter, *http.Request) handle func(*rpc.Client, http.ResponseWriter, *http.Request)
} }
func (sh SnowflakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (sh SnowflakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -28,7 +36,7 @@ func (sh SnowflakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if "OPTIONS" == r.Method { if "OPTIONS" == r.Method {
return return
} }
sh.handle(sh.IPC, w, r) sh.handle(sh.c, w, r)
} }
// Implements the http.Handler interface // Implements the http.Handler interface
@ -73,10 +81,10 @@ func metricsHandler(metricsFilename string, w http.ResponseWriter, r *http.Reque
} }
} }
func debugHandler(i *IPC, w http.ResponseWriter, r *http.Request) { func debugHandler(c *rpc.Client, w http.ResponseWriter, r *http.Request) {
var response string var response string
err := i.Debug(new(interface{}), &response) err := c.Call("IPC.Debug", new(interface{}), &response)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
@ -91,7 +99,7 @@ func debugHandler(i *IPC, w http.ResponseWriter, r *http.Request) {
/* /*
For snowflake proxies to request a client from the Broker. For snowflake proxies to request a client from the Broker.
*/ */
func proxyPolls(i *IPC, w http.ResponseWriter, r *http.Request) { func proxyPolls(c *rpc.Client, w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit)) body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit))
if err != nil { if err != nil {
log.Println("Invalid data.") log.Println("Invalid data.")
@ -105,7 +113,7 @@ func proxyPolls(i *IPC, w http.ResponseWriter, r *http.Request) {
} }
var response []byte var response []byte
err = i.ProxyPolls(arg, &response) err = c.Call("IPC.ProxyPolls", arg, &response)
switch { switch {
case err == nil: case err == nil:
case errors.Is(err, messages.ErrBadRequest): case errors.Is(err, messages.ErrBadRequest):
@ -129,7 +137,7 @@ Expects a WebRTC SDP offer in the Request to give to an assigned
snowflake proxy, which responds with the SDP answer to be sent in snowflake proxy, which responds with the SDP answer to be sent in
the HTTP response back to the client. the HTTP response back to the client.
*/ */
func clientOffers(i *IPC, w http.ResponseWriter, r *http.Request) { func clientOffers(c *rpc.Client, w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit)) body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit))
if err != nil { if err != nil {
log.Printf("Error reading client request: %s", err.Error()) log.Printf("Error reading client request: %s", err.Error())
@ -159,7 +167,7 @@ func clientOffers(i *IPC, w http.ResponseWriter, r *http.Request) {
} }
var response []byte var response []byte
err = i.ClientOffers(arg, &response) err = c.Call("IPC.ClientOffers", arg, &response)
if err != nil { if err != nil {
// Assert err == messages.ErrInternal // Assert err == messages.ErrInternal
log.Println(err) log.Println(err)
@ -194,11 +202,11 @@ func clientOffers(i *IPC, w http.ResponseWriter, r *http.Request) {
} }
/* /*
Expects snowflake proxes which have previously successfully received Expects snowflake proxies which have previously successfully received
an offer from proxyHandler to respond with an answer in an HTTP POST, an offer from proxyHandler to respond with an answer in an HTTP POST,
which the broker will pass back to the original client. which the broker will pass back to the original client.
*/ */
func proxyAnswers(i *IPC, w http.ResponseWriter, r *http.Request) { func proxyAnswers(c *rpc.Client, w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit)) body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit))
if err != nil { if err != nil {
log.Println("Invalid data.") log.Println("Invalid data.")
@ -212,7 +220,7 @@ func proxyAnswers(i *IPC, w http.ResponseWriter, r *http.Request) {
} }
var response []byte var response []byte
err = i.ProxyAnswers(arg, &response) err = c.Call("IPC.ProxyAnswers", arg, &response)
switch { switch {
case err == nil: case err == nil:
case errors.Is(err, messages.ErrBadRequest): case errors.Is(err, messages.ErrBadRequest):
@ -230,3 +238,107 @@ func proxyAnswers(i *IPC, w http.ResponseWriter, r *http.Request) {
log.Printf("proxyAnswers unable to write answer response with error: %v", err) log.Printf("proxyAnswers unable to write answer response with error: %v", err)
} }
} }
func main() {
var acmeEmail string
var acmeHostnamesCommas string
var acmeCertCacheDir string
var addr string
var disableTLS bool
var certFilename, keyFilename string
var metricsFilename string
var unsafeLogging bool
var socket string
flag.StringVar(&acmeEmail, "acme-email", "", "optional contact email for Let's Encrypt notifications")
flag.StringVar(&acmeHostnamesCommas, "acme-hostnames", "", "comma-separated hostnames for TLS certificate")
flag.StringVar(&certFilename, "cert", "", "TLS certificate file")
flag.StringVar(&keyFilename, "key", "", "TLS private key file")
flag.StringVar(&acmeCertCacheDir, "acme-cert-cache", "acme-cert-cache", "directory in which certificates should be cached")
flag.StringVar(&addr, "addr", ":443", "address to listen on")
flag.BoolVar(&disableTLS, "disable-tls", false, "don't use HTTPS")
flag.StringVar(&metricsFilename, "metrics-log", "", "path to metrics logging output")
flag.BoolVar(&unsafeLogging, "unsafe-logging", false, "prevent logs from being scrubbed")
flag.StringVar(&socket, "socket", "/tmp/broker.sock", "path to ipc socket")
flag.Parse()
var logOutput io.Writer = os.Stderr
if unsafeLogging {
log.SetOutput(logOutput)
} else {
// We want to send the log output through our scrubber first
log.SetOutput(&safelog.LogScrubber{Output: logOutput})
}
log.SetFlags(log.LstdFlags | log.LUTC)
var c, err = rpc.Dial("unix", socket)
if err != nil {
log.Fatal(err)
}
defer c.Close()
http.HandleFunc("/robots.txt", robotsTxtHandler)
http.Handle("/proxy", SnowflakeHandler{c, proxyPolls})
http.Handle("/client", SnowflakeHandler{c, clientOffers})
http.Handle("/answer", SnowflakeHandler{c, proxyAnswers})
http.Handle("/debug", SnowflakeHandler{c, debugHandler})
http.Handle("/metrics", MetricsHandler{metricsFilename, metricsHandler})
// http.Handle("/prometheus", promhttp.HandlerFor(ctx.metrics.promMetrics.registry, promhttp.HandlerOpts{}))
server := http.Server{
Addr: addr,
}
// Handle the various ways of setting up TLS. The legal configurations
// are:
// --acme-hostnames (with optional --acme-email and/or --acme-cert-cache)
// --cert and --key together
// --disable-tls
// The outputs of this block of code are the disableTLS,
// needHTTP01Listener, certManager, and getCertificate variables.
if acmeHostnamesCommas != "" {
acmeHostnames := strings.Split(acmeHostnamesCommas, ",")
log.Printf("ACME hostnames: %q", acmeHostnames)
var cache autocert.Cache
if err = os.MkdirAll(acmeCertCacheDir, 0700); err != nil {
log.Printf("Warning: Couldn't create cache directory %q (reason: %s) so we're *not* using our certificate cache.", acmeCertCacheDir, err)
} else {
cache = autocert.DirCache(acmeCertCacheDir)
}
certManager := autocert.Manager{
Cache: cache,
Prompt: autocert.AcceptTOS,
HostPolicy: autocert.HostWhitelist(acmeHostnames...),
Email: acmeEmail,
}
go func() {
log.Printf("Starting HTTP-01 listener")
log.Fatal(http.ListenAndServe(":80", certManager.HTTPHandler(nil)))
}()
server.TLSConfig = &tls.Config{GetCertificate: certManager.GetCertificate}
err = server.ListenAndServeTLS("", "")
} else if certFilename != "" && keyFilename != "" {
if acmeEmail != "" || acmeHostnamesCommas != "" {
log.Fatalf("The --cert and --key options are not allowed with --acme-email or --acme-hostnames.")
}
err = server.ListenAndServeTLS(certFilename, keyFilename)
} else if disableTLS {
err = server.ListenAndServe()
} else {
log.Fatal("the --acme-hostnames, --cert and --key, or --disable-tls option is required")
}
if err != nil {
log.Fatal(err)
}
}

View file

@ -9,7 +9,7 @@ import (
"time" "time"
"git.torproject.org/pluggable-transports/snowflake.git/common/messages" "git.torproject.org/pluggable-transports/snowflake.git/common/messages"
"github.com/prometheus/client_golang/prometheus" // "github.com/prometheus/client_golang/prometheus"
) )
const ( const (
@ -98,7 +98,7 @@ func (i *IPC) ProxyPolls(arg messages.Arg, response *[]byte) error {
if offer == nil { if offer == nil {
i.ctx.metrics.lock.Lock() i.ctx.metrics.lock.Lock()
i.ctx.metrics.proxyIdleCount++ i.ctx.metrics.proxyIdleCount++
i.ctx.metrics.promMetrics.ProxyPollTotal.With(prometheus.Labels{"nat": natType, "status": "idle"}).Inc() // i.ctx.metrics.promMetrics.ProxyPollTotal.With(prometheus.Labels{"nat": natType, "status": "idle"}).Inc()
i.ctx.metrics.lock.Unlock() i.ctx.metrics.lock.Unlock()
b, err = messages.EncodePollResponse("", false, "") b, err = messages.EncodePollResponse("", false, "")
@ -110,7 +110,7 @@ func (i *IPC) ProxyPolls(arg messages.Arg, response *[]byte) error {
return nil return nil
} }
i.ctx.metrics.promMetrics.ProxyPollTotal.With(prometheus.Labels{"nat": natType, "status": "matched"}).Inc() // i.ctx.metrics.promMetrics.ProxyPollTotal.With(prometheus.Labels{"nat": natType, "status": "matched"}).Inc()
b, err = messages.EncodePollResponse(string(offer.sdp), true, offer.natType) b, err = messages.EncodePollResponse(string(offer.sdp), true, offer.natType)
if err != nil { if err != nil {
return messages.ErrInternal return messages.ErrInternal
@ -181,7 +181,7 @@ func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte) error {
if numSnowflakes <= 0 { if numSnowflakes <= 0 {
i.ctx.metrics.lock.Lock() i.ctx.metrics.lock.Lock()
i.ctx.metrics.clientDeniedCount++ i.ctx.metrics.clientDeniedCount++
i.ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "denied"}).Inc() // i.ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "denied"}).Inc()
if offer.natType == NATUnrestricted { if offer.natType == NATUnrestricted {
i.ctx.metrics.clientUnrestrictedDeniedCount++ i.ctx.metrics.clientUnrestrictedDeniedCount++
} else { } else {
@ -211,7 +211,7 @@ func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte) error {
case answer := <-snowflake.answerChannel: case answer := <-snowflake.answerChannel:
i.ctx.metrics.lock.Lock() i.ctx.metrics.lock.Lock()
i.ctx.metrics.clientProxyMatchCount++ i.ctx.metrics.clientProxyMatchCount++
i.ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "matched"}).Inc() // i.ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "matched"}).Inc()
i.ctx.metrics.lock.Unlock() i.ctx.metrics.lock.Unlock()
switch version { switch version {
case v1: case v1:
@ -235,7 +235,7 @@ func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte) error {
} }
i.ctx.snowflakeLock.Lock() i.ctx.snowflakeLock.Lock()
i.ctx.metrics.promMetrics.AvailableProxies.With(prometheus.Labels{"nat": snowflake.natType, "type": snowflake.proxyType}).Dec() // i.ctx.metrics.promMetrics.AvailableProxies.With(prometheus.Labels{"nat": snowflake.natType, "type": snowflake.proxyType}).Dec()
delete(i.ctx.idToSnowflake, snowflake.id) delete(i.ctx.idToSnowflake, snowflake.id)
i.ctx.snowflakeLock.Unlock() i.ctx.snowflakeLock.Unlock()

File diff suppressed because it is too large Load diff