mirror of
https://gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake.git
synced 2025-10-14 05:11:19 -04:00
Handle generated errors in broker
This commit is contained in:
parent
ed3d42e1ec
commit
3cfceb3755
4 changed files with 39 additions and 22 deletions
|
@ -179,7 +179,10 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
|
|||
w.WriteHeader(http.StatusGatewayTimeout)
|
||||
return
|
||||
}
|
||||
w.Write(offer)
|
||||
log.Println("Passing client offer to snowflake.")
|
||||
if _, err := w.Write(offer); err != nil {
|
||||
log.Printf("proxyPolls unable to write offer with error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -211,14 +214,18 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
|
|||
select {
|
||||
case answer := <-snowflake.answerChannel:
|
||||
ctx.metrics.clientProxyMatchCount++
|
||||
w.Write(answer)
|
||||
if _, err := w.Write(answer); err != nil {
|
||||
log.Printf("unable to write answer with error: %v", err)
|
||||
}
|
||||
// Initial tracking of elapsed time.
|
||||
ctx.metrics.clientRoundtripEstimate = time.Since(startTime) /
|
||||
time.Millisecond
|
||||
case <-time.After(time.Second * ClientTimeout):
|
||||
log.Println("Client: Timed out.")
|
||||
w.WriteHeader(http.StatusGatewayTimeout)
|
||||
w.Write([]byte("timed out waiting for answer!"))
|
||||
if _, err := w.Write([]byte("timed out waiting for answer!")); err != nil {
|
||||
log.Printf("unable to write timeout error, failed with error: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -259,12 +266,16 @@ func debugHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
s += fmt.Sprintf("\tstandalone proxies: %d", standalones)
|
||||
s += fmt.Sprintf("\n\tbrowser proxies: %d", browsers)
|
||||
w.Write([]byte(s))
|
||||
if _, err := w.Write([]byte(s)); err != nil {
|
||||
log.Printf("writing proxy information returned error: %v ", err)
|
||||
}
|
||||
}
|
||||
|
||||
func robotsTxtHandler(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
||||
w.Write([]byte("User-agent: *\nDisallow: /\n"))
|
||||
if _, err := w.Write([]byte("User-agent: *\nDisallow: /\n")); err != nil {
|
||||
log.Printf("robotsTxtHandler unable to write, with this error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func metricsHandler(metricsFilename string, w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -281,7 +292,9 @@ func metricsHandler(metricsFilename string, w http.ResponseWriter, r *http.Reque
|
|||
return
|
||||
}
|
||||
|
||||
io.Copy(w, metricsFile)
|
||||
if _, err := io.Copy(w, metricsFile); err != nil {
|
||||
log.Printf("copying metricsFile returned error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
|
@ -310,7 +323,7 @@ func main() {
|
|||
flag.Parse()
|
||||
|
||||
var err error
|
||||
var metricsFile io.Writer = os.Stdout
|
||||
var metricsFile io.Writer
|
||||
var logOutput io.Writer = os.Stderr
|
||||
//We want to send the log output through our scrubber first
|
||||
log.SetOutput(&safelog.LogScrubber{Output: logOutput})
|
||||
|
@ -332,7 +345,7 @@ func main() {
|
|||
ctx := NewBrokerContext(metricsLogger)
|
||||
|
||||
if !disableGeoip {
|
||||
err := ctx.metrics.LoadGeoipDatabases(geoipDatabase, geoip6Database)
|
||||
err = ctx.metrics.LoadGeoipDatabases(geoipDatabase, geoip6Database)
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
}
|
||||
|
@ -361,8 +374,10 @@ func main() {
|
|||
go func() {
|
||||
for {
|
||||
signal := <-sigChan
|
||||
log.Println("Received signal:", signal, ". Reloading geoip databases.")
|
||||
ctx.metrics.LoadGeoipDatabases(geoipDatabase, geoip6Database)
|
||||
log.Printf("Received signal: %s. Reloading geoip databases.", signal)
|
||||
if err = ctx.metrics.LoadGeoipDatabases(geoipDatabase, geoip6Database); err != nil {
|
||||
log.Fatalf("reload of Geo IP databases on signal %s returned error: %v", signal, err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
|
|
|
@ -93,7 +93,7 @@ func (table *GeoIPv6Table) Unlock() { (*table).lock.Unlock() }
|
|||
func geoipStringToIP(ipStr string) (net.IP, error) {
|
||||
ip, err := strconv.ParseUint(ipStr, 10, 32)
|
||||
if err != nil {
|
||||
return net.IPv4(0, 0, 0, 0), fmt.Errorf("Error parsing IP %s", ipStr)
|
||||
return net.IPv4(0, 0, 0, 0), fmt.Errorf("error parsing IP %s", ipStr)
|
||||
}
|
||||
var bytes [4]byte
|
||||
bytes[0] = byte(ip & 0xFF)
|
||||
|
@ -115,7 +115,7 @@ func (table *GeoIPv4Table) parseEntry(candidate string) (*GeoIPEntry, error) {
|
|||
parsedCandidate := strings.Split(candidate, ",")
|
||||
|
||||
if len(parsedCandidate) != 3 {
|
||||
return nil, fmt.Errorf("Provided geoip file is incorrectly formatted. Could not parse line:\n%s", parsedCandidate)
|
||||
return nil, fmt.Errorf("provided geoip file is incorrectly formatted. Could not parse line:\n%s", parsedCandidate)
|
||||
}
|
||||
|
||||
low, err := geoipStringToIP(parsedCandidate[0])
|
||||
|
@ -190,7 +190,7 @@ func GeoIPLoadFile(table GeoIPTable, pathname string) error {
|
|||
for scanner.Scan() {
|
||||
entry, err := table.parseEntry(scanner.Text())
|
||||
if err != nil {
|
||||
return fmt.Errorf("Provided geoip file is incorrectly formatted. Line is: %+q", scanner.Text())
|
||||
return fmt.Errorf("provided geoip file is incorrectly formatted. Line is: %+q", scanner.Text())
|
||||
}
|
||||
|
||||
if entry != nil {
|
||||
|
|
|
@ -120,7 +120,6 @@ func (m *Metrics) UpdateCountryStats(addr string) {
|
|||
m.countryStats.counts[country]++
|
||||
m.countryStats.addrs[addr] = true
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (m *Metrics) LoadGeoipDatabases(geoipDB string, geoip6DB string) error {
|
||||
|
@ -132,19 +131,16 @@ func (m *Metrics) LoadGeoipDatabases(geoipDB string, geoip6DB string) error {
|
|||
if err != nil {
|
||||
m.tablev4 = nil
|
||||
return err
|
||||
} else {
|
||||
m.tablev4 = tablev4
|
||||
}
|
||||
m.tablev4 = tablev4
|
||||
|
||||
tablev6 := new(GeoIPv6Table)
|
||||
err = GeoIPLoadFile(tablev6, geoip6DB)
|
||||
if err != nil {
|
||||
m.tablev6 = nil
|
||||
return err
|
||||
} else {
|
||||
m.tablev6 = tablev6
|
||||
}
|
||||
|
||||
m.tablev6 = tablev6
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -3,7 +3,6 @@ package main
|
|||
import (
|
||||
"bytes"
|
||||
"container/heap"
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
|
@ -12,6 +11,8 @@ import (
|
|||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
)
|
||||
|
||||
func NullLogger() *log.Logger {
|
||||
|
@ -181,7 +182,7 @@ func TestBroker(t *testing.T) {
|
|||
})
|
||||
|
||||
Convey("with error if the proxy writes too much data", func() {
|
||||
data := bytes.NewReader(make([]byte, 100001, 100001))
|
||||
data := bytes.NewReader(make([]byte, 100001))
|
||||
r, err := http.NewRequest("POST", "snowflake.broker/answer", data)
|
||||
r.Header.Set("X-Session-ID", "test")
|
||||
So(err, ShouldBeNil)
|
||||
|
@ -385,7 +386,9 @@ func TestGeoip(t *testing.T) {
|
|||
|
||||
// Make sure things behave properly if geoip file fails to load
|
||||
ctx := NewBrokerContext(NullLogger())
|
||||
ctx.metrics.LoadGeoipDatabases("invalid_filename", "invalid_filename6")
|
||||
if err := ctx.metrics.LoadGeoipDatabases("invalid_filename", "invalid_filename6"); err != nil {
|
||||
log.Printf("loading geo ip databases returned error: %v", err)
|
||||
}
|
||||
ctx.metrics.UpdateCountryStats("127.0.0.1")
|
||||
So(ctx.metrics.tablev4, ShouldEqual, nil)
|
||||
|
||||
|
@ -504,6 +507,9 @@ func TestMetrics(t *testing.T) {
|
|||
|
||||
data = bytes.NewReader([]byte("test"))
|
||||
r, err = http.NewRequest("POST", "snowflake.broker/proxy", data)
|
||||
if err != nil {
|
||||
log.Printf("unable to get NewRequest with error: %v", err)
|
||||
}
|
||||
r.Header.Set("X-Session-ID", "test")
|
||||
r.RemoteAddr = "129.97.208.23:8888" //CA geoip
|
||||
go func(ctx *BrokerContext) {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue