mirror of
https://gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake.git
synced 2025-10-14 05:11:19 -04:00
fix Peers.Count() using activePeers list, mark for delete on Close, and remove
maxedChan
This commit is contained in:
parent
c63f5cfc0a
commit
2caa47988d
4 changed files with 94 additions and 66 deletions
|
@ -75,7 +75,7 @@ func TestSnowflakeClient(t *testing.T) {
|
||||||
snowflakes.Tongue = FakeDialer{}
|
snowflakes.Tongue = FakeDialer{}
|
||||||
|
|
||||||
go ConnectLoop(snowflakes)
|
go ConnectLoop(snowflakes)
|
||||||
<-snowflakes.maxedChan
|
// <-snowflakes.maxedChan
|
||||||
|
|
||||||
So(snowflakes.Count(), ShouldEqual, 1)
|
So(snowflakes.Count(), ShouldEqual, 1)
|
||||||
r := <-snowflakes.snowflakeChan
|
r := <-snowflakes.snowflakeChan
|
||||||
|
@ -88,7 +88,7 @@ func TestSnowflakeClient(t *testing.T) {
|
||||||
snowflakes.Tongue = FakeDialer{}
|
snowflakes.Tongue = FakeDialer{}
|
||||||
|
|
||||||
go ConnectLoop(snowflakes)
|
go ConnectLoop(snowflakes)
|
||||||
<-snowflakes.maxedChan
|
// <-snowflakes.maxedChan
|
||||||
So(snowflakes.Count(), ShouldEqual, 3)
|
So(snowflakes.Count(), ShouldEqual, 3)
|
||||||
<-snowflakes.snowflakeChan
|
<-snowflakes.snowflakeChan
|
||||||
<-snowflakes.snowflakeChan
|
<-snowflakes.snowflakeChan
|
||||||
|
@ -101,13 +101,13 @@ func TestSnowflakeClient(t *testing.T) {
|
||||||
snowflakes.Tongue = FakeDialer{}
|
snowflakes.Tongue = FakeDialer{}
|
||||||
|
|
||||||
go ConnectLoop(snowflakes)
|
go ConnectLoop(snowflakes)
|
||||||
<-snowflakes.maxedChan
|
// <-snowflakes.maxedChan
|
||||||
So(snowflakes.Count(), ShouldEqual, 3)
|
So(snowflakes.Count(), ShouldEqual, 3)
|
||||||
|
|
||||||
r := <-snowflakes.snowflakeChan
|
r := <-snowflakes.snowflakeChan
|
||||||
So(snowflakes.Count(), ShouldEqual, 2)
|
So(snowflakes.Count(), ShouldEqual, 2)
|
||||||
r.Close()
|
r.Close()
|
||||||
<-snowflakes.maxedChan
|
// <-snowflakes.maxedChan
|
||||||
So(snowflakes.Count(), ShouldEqual, 3)
|
So(snowflakes.Count(), ShouldEqual, 3)
|
||||||
|
|
||||||
<-snowflakes.snowflakeChan
|
<-snowflakes.snowflakeChan
|
||||||
|
@ -121,7 +121,6 @@ func TestSnowflakeClient(t *testing.T) {
|
||||||
Convey("Can construct", func() {
|
Convey("Can construct", func() {
|
||||||
p := NewPeers(1)
|
p := NewPeers(1)
|
||||||
So(p.capacity, ShouldEqual, 1)
|
So(p.capacity, ShouldEqual, 1)
|
||||||
So(p.current, ShouldEqual, nil)
|
|
||||||
So(p.snowflakeChan, ShouldNotBeNil)
|
So(p.snowflakeChan, ShouldNotBeNil)
|
||||||
So(cap(p.snowflakeChan), ShouldEqual, 1)
|
So(cap(p.snowflakeChan), ShouldEqual, 1)
|
||||||
})
|
})
|
||||||
|
@ -136,36 +135,54 @@ func TestSnowflakeClient(t *testing.T) {
|
||||||
err = p.Collect()
|
err = p.Collect()
|
||||||
So(err, ShouldBeNil)
|
So(err, ShouldBeNil)
|
||||||
So(p.Count(), ShouldEqual, 1)
|
So(p.Count(), ShouldEqual, 1)
|
||||||
// S
|
// S
|
||||||
err = p.Collect()
|
err = p.Collect()
|
||||||
})
|
})
|
||||||
|
|
||||||
Convey("Collection continues until capacity.", func() {
|
Convey("Collection continues until capacity.", func() {
|
||||||
c := 5
|
c := 5
|
||||||
p := NewPeers(c)
|
p := NewPeers(c)
|
||||||
p.Tongue = FakeDialer{}
|
p.Tongue = FakeDialer{}
|
||||||
// Fill up to capacity.
|
// Fill up to capacity.
|
||||||
for i := 0 ; i < c ; i++ {
|
for i := 0; i < c; i++ {
|
||||||
fmt.Println("Adding snowflake ", i)
|
fmt.Println("Adding snowflake ", i)
|
||||||
err := p.Collect()
|
err := p.Collect()
|
||||||
So(err, ShouldBeNil)
|
So(err, ShouldBeNil)
|
||||||
So(p.Count(), ShouldEqual, i + 1)
|
So(p.Count(), ShouldEqual, i+1)
|
||||||
}
|
}
|
||||||
// But adding another gives an error.
|
// But adding another gives an error.
|
||||||
So(p.Count(), ShouldEqual, c)
|
So(p.Count(), ShouldEqual, c)
|
||||||
err := p.Collect()
|
err := p.Collect()
|
||||||
So(err, ShouldNotBeNil)
|
So(err, ShouldNotBeNil)
|
||||||
So(p.Count(), ShouldEqual, c)
|
So(p.Count(), ShouldEqual, c)
|
||||||
|
|
||||||
// But popping allows it to continue.
|
// But popping and closing allows it to continue.
|
||||||
s := p.Pop()
|
s := p.Pop()
|
||||||
So(s, ShouldNotBeNil)
|
s.Close()
|
||||||
So(p.Count(), ShouldEqual, c)
|
So(s, ShouldNotBeNil)
|
||||||
|
So(p.Count(), ShouldEqual, c-1)
|
||||||
|
|
||||||
|
err = p.Collect()
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(p.Count(), ShouldEqual, c)
|
||||||
|
})
|
||||||
|
|
||||||
|
Convey("Count correctly purges peers marked for deletion.", func() {
|
||||||
|
p := NewPeers(4)
|
||||||
|
p.Tongue = FakeDialer{}
|
||||||
|
p.Collect()
|
||||||
|
p.Collect()
|
||||||
|
p.Collect()
|
||||||
|
p.Collect()
|
||||||
|
So(p.Count(), ShouldEqual, 4)
|
||||||
|
s := p.Pop()
|
||||||
|
s.Close()
|
||||||
|
So(p.Count(), ShouldEqual, 3)
|
||||||
|
s = p.Pop()
|
||||||
|
s.Close()
|
||||||
|
So(p.Count(), ShouldEqual, 2)
|
||||||
|
})
|
||||||
|
|
||||||
// err = p.Collect()
|
|
||||||
// So(err, ShouldNotBeNil)
|
|
||||||
// So(p.Count(), ShouldEqual, c)
|
|
||||||
})
|
|
||||||
})
|
})
|
||||||
|
|
||||||
Convey("Snowflake", t, func() {
|
Convey("Snowflake", t, func() {
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"container/list"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
|
@ -22,70 +23,77 @@ type Peers struct {
|
||||||
BytesLogger
|
BytesLogger
|
||||||
|
|
||||||
snowflakeChan chan *webRTCConn
|
snowflakeChan chan *webRTCConn
|
||||||
current *webRTCConn
|
activePeers *list.List
|
||||||
capacity int
|
capacity int
|
||||||
// TODO: Probably not necessary.
|
|
||||||
maxedChan chan struct{}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Construct a fresh container of remote peers.
|
// Construct a fresh container of remote peers.
|
||||||
func NewPeers(max int) *Peers {
|
func NewPeers(max int) *Peers {
|
||||||
p := &Peers{capacity: max, current: nil}
|
p := &Peers{capacity: max}
|
||||||
// Use buffered go channel to pass new snowflakes onwards to the SOCKS handler.
|
// Use buffered go channel to pass new snowflakes onwards to the SOCKS handler.
|
||||||
p.snowflakeChan = make(chan *webRTCConn, max)
|
p.snowflakeChan = make(chan *webRTCConn, max)
|
||||||
p.maxedChan = make(chan struct{}, 1)
|
p.activePeers = list.New()
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Needs fixing.
|
|
||||||
func (p *Peers) Count() int {
|
|
||||||
count := 0
|
|
||||||
if p.current != nil {
|
|
||||||
count = 1
|
|
||||||
}
|
|
||||||
return count + len(p.snowflakeChan)
|
|
||||||
}
|
|
||||||
|
|
||||||
// As part of |SnowflakeCollector| interface.
|
// As part of |SnowflakeCollector| interface.
|
||||||
func (p *Peers) Collect() error {
|
func (p *Peers) Collect() error {
|
||||||
if p.Count() >= p.capacity {
|
cnt := p.Count()
|
||||||
s := fmt.Sprintf("At capacity [%d/%d]", p.Count(), p.capacity)
|
if cnt >= p.capacity {
|
||||||
p.maxedChan <- struct{}{}
|
s := fmt.Sprintf("At capacity [%d/%d]", cnt, p.capacity)
|
||||||
return errors.New(s)
|
return errors.New(s)
|
||||||
}
|
}
|
||||||
// Engage the Snowflake Catching interface, which must be available.
|
// Engage the Snowflake Catching interface, which must be available.
|
||||||
if nil == p.Tongue {
|
if nil == p.Tongue {
|
||||||
return errors.New("Missing Tongue to catch Snowflakes with.")
|
return errors.New("Missing Tongue to catch Snowflakes with.")
|
||||||
}
|
}
|
||||||
connection, err := p.Tongue.Catch()
|
connection, err := p.Tongue.Catch()
|
||||||
if nil == connection || nil != err {
|
if nil == connection || nil != err {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// Use the same rate-limited traffic logger to keep consistency.
|
// Track new valid Snowflake in internal collection and pass along.
|
||||||
connection.BytesLogger = p.BytesLogger
|
p.activePeers.PushBack(connection)
|
||||||
p.snowflakeChan <- connection
|
p.snowflakeChan <- connection
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// As part of |SnowflakeCollector| interface.
|
// As part of |SnowflakeCollector| interface.
|
||||||
func (p *Peers) Pop() *webRTCConn {
|
func (p *Peers) Pop() *webRTCConn {
|
||||||
// Blocks until an available snowflake appears.
|
// Blocks until an available snowflake appears.
|
||||||
snowflake, ok := <-p.snowflakeChan
|
snowflake, ok := <-p.snowflakeChan
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
p.current = snowflake
|
// Set to use the same rate-limited traffic logger to keep consistency.
|
||||||
snowflake.BytesLogger = p.BytesLogger
|
snowflake.BytesLogger = p.BytesLogger
|
||||||
return snowflake
|
return snowflake
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close all remote peers.
|
// Returns total available Snowflakes (including the active one)
|
||||||
func (p *Peers) End() {
|
// The count only reduces when connections themselves close, rather than when
|
||||||
log.Printf("WebRTC: interruped")
|
// they are popped.
|
||||||
if nil != p.current {
|
func (p *Peers) Count() int {
|
||||||
p.current.Close()
|
p.purgeClosedPeers()
|
||||||
}
|
return p.activePeers.Len()
|
||||||
for r := range p.snowflakeChan {
|
}
|
||||||
r.Close()
|
|
||||||
|
func (p *Peers) purgeClosedPeers() {
|
||||||
|
for e := p.activePeers.Front(); e != nil; {
|
||||||
|
next := e.Next()
|
||||||
|
conn := e.Value.(*webRTCConn)
|
||||||
|
// Purge those marked for deletion.
|
||||||
|
if conn.closed {
|
||||||
|
p.activePeers.Remove(e)
|
||||||
|
}
|
||||||
|
e = next
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close all Peers contained here.
|
||||||
|
func (p *Peers) End() {
|
||||||
|
log.Printf("WebRTC: Ending all peer connections.")
|
||||||
|
for e := p.activePeers.Front(); e != nil; e = e.Next() {
|
||||||
|
conn := e.Value.(*webRTCConn)
|
||||||
|
conn.Close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -61,6 +61,7 @@ func ConnectLoop(snowflakes SnowflakeCollector) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Successful collection gets rate limited to once per second.
|
// Successful collection gets rate limited to once per second.
|
||||||
|
log.Println("ConnectLoop success.")
|
||||||
<-time.After(time.Second)
|
<-time.After(time.Second)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -68,10 +69,10 @@ func ConnectLoop(snowflakes SnowflakeCollector) {
|
||||||
// Accept local SOCKS connections and pass them to the handler.
|
// Accept local SOCKS connections and pass them to the handler.
|
||||||
func acceptLoop(ln *pt.SocksListener, snowflakes SnowflakeCollector) error {
|
func acceptLoop(ln *pt.SocksListener, snowflakes SnowflakeCollector) error {
|
||||||
defer ln.Close()
|
defer ln.Close()
|
||||||
|
log.Println("Started SOCKS listener.")
|
||||||
for {
|
for {
|
||||||
log.Println("SOCKS listening...", ln)
|
|
||||||
conn, err := ln.AcceptSocks()
|
conn, err := ln.AcceptSocks()
|
||||||
log.Println("accepting", conn, err)
|
log.Println("SOCKS accepted ", conn.Req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if e, ok := err.(net.Error); ok && e.Temporary() {
|
if e, ok := err.(net.Error); ok && e.Temporary() {
|
||||||
continue
|
continue
|
||||||
|
@ -138,7 +139,8 @@ func readSignalingMessages(f *os.File) {
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
webrtc.SetLoggingVerbosity(1)
|
webrtc.SetLoggingVerbosity(1)
|
||||||
logFile, err := os.OpenFile("snowflake.log", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
|
logFile, err := os.OpenFile("snowflake.log",
|
||||||
|
os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,9 +18,10 @@ type WebRTCDialer struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWebRTCDialer(broker *BrokerChannel) *WebRTCDialer {
|
func NewWebRTCDialer(broker *BrokerChannel) *WebRTCDialer {
|
||||||
|
config := webrtc.NewConfiguration(iceServers...)
|
||||||
return &WebRTCDialer{
|
return &WebRTCDialer{
|
||||||
broker,
|
BrokerChannel: broker,
|
||||||
webrtc.NewConfiguration(iceServers...),
|
webrtcConfig: config,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue