mirror of
https://gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake.git
synced 2025-10-14 05:11:19 -04:00
Rewrite websocketconn with synchronous pipes.
Makes the following changes: * permits concurrent Read/Write/Close * converts certain CloseErrors into io.EOF https://bugs.torproject.org/33144
This commit is contained in:
parent
5708a1d57b
commit
01e28aa460
1 changed files with 82 additions and 40 deletions
|
@ -10,61 +10,103 @@ import (
|
||||||
// An abstraction that makes an underlying WebSocket connection look like an
|
// An abstraction that makes an underlying WebSocket connection look like an
|
||||||
// io.ReadWriteCloser.
|
// io.ReadWriteCloser.
|
||||||
type Conn struct {
|
type Conn struct {
|
||||||
Ws *websocket.Conn
|
ws *websocket.Conn
|
||||||
r io.Reader
|
Reader io.Reader
|
||||||
|
Writer io.Writer
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements io.Reader.
|
// Implements io.Reader.
|
||||||
func (conn *Conn) Read(b []byte) (n int, err error) {
|
func (conn *Conn) Read(b []byte) (n int, err error) {
|
||||||
var opCode int
|
return conn.Reader.Read(b)
|
||||||
if conn.r == nil {
|
|
||||||
// New message
|
|
||||||
var r io.Reader
|
|
||||||
for {
|
|
||||||
if opCode, r, err = conn.Ws.NextReader(); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if opCode != websocket.BinaryMessage && opCode != websocket.TextMessage {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
conn.r = r
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
n, err = conn.r.Read(b)
|
|
||||||
if err == io.EOF {
|
|
||||||
// Message finished
|
|
||||||
conn.r = nil
|
|
||||||
err = nil
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements io.Writer.
|
// Implements io.Writer.
|
||||||
func (conn *Conn) Write(b []byte) (n int, err error) {
|
func (conn *Conn) Write(b []byte) (n int, err error) {
|
||||||
var w io.WriteCloser
|
return conn.Writer.Write(b)
|
||||||
if w, err = conn.Ws.NextWriter(websocket.BinaryMessage); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if n, err = w.Write(b); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
err = w.Close()
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements io.Closer.
|
// Implements io.Closer.
|
||||||
func (conn *Conn) Close() error {
|
func (conn *Conn) Close() error {
|
||||||
// Ignore any error in trying to write a Close frame.
|
// Ignore any error in trying to write a Close frame.
|
||||||
_ = conn.Ws.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(time.Second))
|
_ = conn.ws.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(time.Second))
|
||||||
return conn.Ws.Close()
|
return conn.ws.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func readLoop(w io.Writer, ws *websocket.Conn) error {
|
||||||
|
for {
|
||||||
|
messageType, r, err := ws.NextReader()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if messageType != websocket.BinaryMessage && messageType != websocket.TextMessage {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
_, err = io.Copy(w, r)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeLoop(ws *websocket.Conn, r io.Reader) error {
|
||||||
|
for {
|
||||||
|
var buf [2048]byte
|
||||||
|
n, err := r.Read(buf[:])
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
data := buf[:n]
|
||||||
|
w, err := ws.NextWriter(websocket.BinaryMessage)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
n, err = w.Write(data)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = w.Close()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// websocket.Conn methods start returning websocket.CloseError after the
|
||||||
|
// connection has been closed. We want to instead interpret that as io.EOF, just
|
||||||
|
// as you would find with a normal net.Conn. This only converts
|
||||||
|
// websocket.CloseErrors with known codes; other codes like CloseProtocolError
|
||||||
|
// and CloseAbnormalClosure will still be reported as anomalous.
|
||||||
|
func closeErrorToEOF(err error) error {
|
||||||
|
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
|
||||||
|
err = io.EOF
|
||||||
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a new Conn.
|
// Create a new Conn.
|
||||||
func New(ws *websocket.Conn) *Conn {
|
func New(ws *websocket.Conn) *Conn {
|
||||||
var conn Conn
|
// Set up synchronous pipes to serialize reads and writes to the
|
||||||
conn.Ws = ws
|
// underlying websocket.Conn.
|
||||||
return &conn
|
//
|
||||||
|
// https://godoc.org/github.com/gorilla/websocket#hdr-Concurrency
|
||||||
|
// "Connections support one concurrent reader and one concurrent writer.
|
||||||
|
// Applications are responsible for ensuring that no more than one
|
||||||
|
// goroutine calls the write methods (NextWriter, etc.) concurrently and
|
||||||
|
// that no more than one goroutine calls the read methods (NextReader,
|
||||||
|
// etc.) concurrently. The Close and WriteControl methods can be called
|
||||||
|
// concurrently with all other methods."
|
||||||
|
pr1, pw1 := io.Pipe()
|
||||||
|
go func() {
|
||||||
|
pw1.CloseWithError(closeErrorToEOF(readLoop(pw1, ws)))
|
||||||
|
}()
|
||||||
|
pr2, pw2 := io.Pipe()
|
||||||
|
go func() {
|
||||||
|
pr2.CloseWithError(closeErrorToEOF(writeLoop(ws, pr2)))
|
||||||
|
}()
|
||||||
|
return &Conn{
|
||||||
|
ws: ws,
|
||||||
|
Reader: pr1,
|
||||||
|
Writer: pw2,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue