222 lines
3.8 KiB
Go
222 lines
3.8 KiB
Go
|
package fasthttputil
|
||
|
|
||
|
import (
|
||
|
"errors"
|
||
|
"io"
|
||
|
"net"
|
||
|
"sync"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
// NewPipeConns returns new bi-directonal connection pipe.
|
||
|
func NewPipeConns() *PipeConns {
|
||
|
ch1 := make(chan *byteBuffer, 4)
|
||
|
ch2 := make(chan *byteBuffer, 4)
|
||
|
|
||
|
pc := &PipeConns{
|
||
|
stopCh: make(chan struct{}),
|
||
|
}
|
||
|
pc.c1.rCh = ch1
|
||
|
pc.c1.wCh = ch2
|
||
|
pc.c2.rCh = ch2
|
||
|
pc.c2.wCh = ch1
|
||
|
pc.c1.pc = pc
|
||
|
pc.c2.pc = pc
|
||
|
return pc
|
||
|
}
|
||
|
|
||
|
// PipeConns provides a bi-directional connection pipe
// which uses in-process memory as a transport.
//
// PipeConns must be created by calling NewPipeConns.
//
// PipeConns has the following additional features compared to connections
// returned from net.Pipe():
//
//   * It is faster.
//   * It buffers Write calls, so there is no need to have a concurrent
//     goroutine calling Read in order to unblock each Write call.
type PipeConns struct {
	// c1 and c2 are the two ends of the pipe, exposed via Conn1 and Conn2.
	c1 pipeConn
	c2 pipeConn
	// stopCh is closed by Close; reads and writes on either end select on it
	// to notice that the pipe has been shut down.
	stopCh chan struct{}
	// stopChLock is held while Close checks and closes stopCh, so that the
	// channel is closed at most once.
	stopChLock sync.Mutex
}
|
||
|
|
||
|
// Conn1 returns the first end of the bi-directional pipe.
//
// Data written to Conn1 may be read from Conn2.
// Data written to Conn2 may be read from Conn1.
//
// Every call returns a pointer to the same underlying connection.
func (pc *PipeConns) Conn1() net.Conn {
	return &pc.c1
}
|
||
|
|
||
|
// Conn2 returns the second end of the bi-directional pipe.
//
// Data written to Conn2 may be read from Conn1.
// Data written to Conn1 may be read from Conn2.
//
// Every call returns a pointer to the same underlying connection.
func (pc *PipeConns) Conn2() net.Conn {
	return &pc.c2
}
|
||
|
|
||
|
// Close closes pipe connections.
|
||
|
func (pc *PipeConns) Close() error {
|
||
|
pc.stopChLock.Lock()
|
||
|
select {
|
||
|
case <-pc.stopCh:
|
||
|
default:
|
||
|
close(pc.stopCh)
|
||
|
}
|
||
|
pc.stopChLock.Unlock()
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// pipeConn is one end of a PipeConns pair. It provides the net.Conn methods
// on top of two byteBuffer channels shared with the opposite end.
type pipeConn struct {
	// b is the buffer most recently received from rCh; it is handed back to
	// the pool when the next buffer is fetched.
	b *byteBuffer
	// bb is the not-yet-consumed remainder of b.b.
	bb []byte

	// rCh delivers buffers to be read by this end.
	rCh chan *byteBuffer
	// wCh receives the buffers written by this end.
	wCh chan *byteBuffer
	// pc points back to the owning PipeConns, whose stopCh signals closure.
	pc *PipeConns
}
|
||
|
|
||
|
func (c *pipeConn) Write(p []byte) (int, error) {
|
||
|
b := acquireByteBuffer()
|
||
|
b.b = append(b.b[:0], p...)
|
||
|
|
||
|
select {
|
||
|
case <-c.pc.stopCh:
|
||
|
releaseByteBuffer(b)
|
||
|
return 0, errConnectionClosed
|
||
|
default:
|
||
|
}
|
||
|
|
||
|
select {
|
||
|
case c.wCh <- b:
|
||
|
default:
|
||
|
select {
|
||
|
case c.wCh <- b:
|
||
|
case <-c.pc.stopCh:
|
||
|
releaseByteBuffer(b)
|
||
|
return 0, errConnectionClosed
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return len(p), nil
|
||
|
}
|
||
|
|
||
|
func (c *pipeConn) Read(p []byte) (int, error) {
|
||
|
mayBlock := true
|
||
|
nn := 0
|
||
|
for len(p) > 0 {
|
||
|
n, err := c.read(p, mayBlock)
|
||
|
nn += n
|
||
|
if err != nil {
|
||
|
if !mayBlock && err == errWouldBlock {
|
||
|
err = nil
|
||
|
}
|
||
|
return nn, err
|
||
|
}
|
||
|
p = p[n:]
|
||
|
mayBlock = false
|
||
|
}
|
||
|
|
||
|
return nn, nil
|
||
|
}
|
||
|
|
||
|
func (c *pipeConn) read(p []byte, mayBlock bool) (int, error) {
|
||
|
if len(c.bb) == 0 {
|
||
|
if err := c.readNextByteBuffer(mayBlock); err != nil {
|
||
|
return 0, err
|
||
|
}
|
||
|
}
|
||
|
n := copy(p, c.bb)
|
||
|
c.bb = c.bb[n:]
|
||
|
|
||
|
return n, nil
|
||
|
}
|
||
|
|
||
|
func (c *pipeConn) readNextByteBuffer(mayBlock bool) error {
|
||
|
releaseByteBuffer(c.b)
|
||
|
c.b = nil
|
||
|
|
||
|
select {
|
||
|
case c.b = <-c.rCh:
|
||
|
default:
|
||
|
if !mayBlock {
|
||
|
return errWouldBlock
|
||
|
}
|
||
|
select {
|
||
|
case c.b = <-c.rCh:
|
||
|
case <-c.pc.stopCh:
|
||
|
return io.EOF
|
||
|
}
|
||
|
}
|
||
|
|
||
|
c.bb = c.b.b
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Sentinel errors used by the pipe connection implementation.
var (
	// errNoDeadlines reports that deadlines are not supported.
	errNoDeadlines = errors.New("deadline not supported")

	// errConnectionClosed reports an operation on a closed pipe.
	errConnectionClosed = errors.New("connection closed")

	// errWouldBlock reports that no data was immediately available for a
	// non-blocking read attempt.
	errWouldBlock = errors.New("would block")
)
|
||
|
|
||
|
// Close closes the whole pipe by delegating to the owning PipeConns, so
// closing one end shuts down both ends.
func (c *pipeConn) Close() error {
	return c.pc.Close()
}
|
||
|
|
||
|
// LocalAddr returns the local address of the connection, which is always
// the zero pipeAddr value.
func (c *pipeConn) LocalAddr() net.Addr {
	return pipeAddr(0)
}
|
||
|
|
||
|
// RemoteAddr returns the remote address of the connection, which is always
// the zero pipeAddr value.
func (c *pipeConn) RemoteAddr() net.Addr {
	return pipeAddr(0)
}
|
||
|
|
||
|
// SetDeadline always fails: t is ignored and errNoDeadlines is returned,
// because pipe connections do not support deadlines.
func (c *pipeConn) SetDeadline(t time.Time) error {
	return errNoDeadlines
}
|
||
|
|
||
|
// SetReadDeadline forwards to SetDeadline and returns its result.
func (c *pipeConn) SetReadDeadline(t time.Time) error {
	return c.SetDeadline(t)
}
|
||
|
|
||
|
// SetWriteDeadline forwards to SetDeadline and returns its result.
func (c *pipeConn) SetWriteDeadline(t time.Time) error {
	return c.SetDeadline(t)
}
|
||
|
|
||
|
// pipeAddr is the address type reported by pipe connections. It carries no
// information beyond the fixed "pipe" name.
type pipeAddr int

// Network returns the network name, which is always "pipe".
func (pipeAddr) Network() string {
	return "pipe"
}

// String returns the textual form of the address, which is the same as the
// network name.
func (a pipeAddr) String() string {
	return a.Network()
}
|
||
|
|
||
|
// byteBuffer is a pooled container for one chunk of written data.
type byteBuffer struct {
	b []byte
}

// acquireByteBuffer takes a buffer from byteBufferPool. The contents of the
// returned buffer are unspecified; callers are expected to overwrite b.b.
func acquireByteBuffer() *byteBuffer {
	v := byteBufferPool.Get()
	return v.(*byteBuffer)
}

// releaseByteBuffer puts b back into byteBufferPool. A nil b is ignored.
func releaseByteBuffer(b *byteBuffer) {
	if b == nil {
		return
	}
	byteBufferPool.Put(b)
}

// byteBufferPool recycles byteBuffer values between writes and reads.
var byteBufferPool = &sync.Pool{
	New: newByteBuffer,
}

// newByteBuffer allocates a fresh byteBuffer with a 1024-byte slice for the
// pool to hand out when it has nothing to reuse.
func newByteBuffer() interface{} {
	buf := new(byteBuffer)
	buf.b = make([]byte, 1024)
	return buf
}
|