Revert "Vendor update"

This reverts commit e5f062ee91.
This commit is contained in:
Morgan Bazalgette
2017-07-25 15:09:02 +02:00
parent e5f062ee91
commit 2535a03c5f
23 changed files with 226 additions and 380 deletions

View File

@@ -2,77 +2,56 @@ package pool
import (
"net"
"sync/atomic"
"time"
"gopkg.in/redis.v5/internal/proto"
)
const defaultBufSize = 4096
var noDeadline = time.Time{}
type Conn struct {
netConn net.Conn
Rd *proto.Reader
Wb *proto.WriteBuffer
NetConn net.Conn
Rd *proto.Reader
Wb *proto.WriteBuffer
Inited bool
usedAt atomic.Value
UsedAt time.Time
}
func NewConn(netConn net.Conn) *Conn {
cn := &Conn{
netConn: netConn,
NetConn: netConn,
Wb: proto.NewWriteBuffer(),
UsedAt: time.Now(),
}
cn.Rd = proto.NewReader(cn.netConn)
cn.SetUsedAt(time.Now())
cn.Rd = proto.NewReader(cn.NetConn)
return cn
}
func (cn *Conn) UsedAt() time.Time {
return cn.usedAt.Load().(time.Time)
}
func (cn *Conn) SetUsedAt(tm time.Time) {
cn.usedAt.Store(tm)
}
func (cn *Conn) SetNetConn(netConn net.Conn) {
cn.netConn = netConn
cn.Rd.Reset(netConn)
}
func (cn *Conn) IsStale(timeout time.Duration) bool {
return timeout > 0 && time.Since(cn.UsedAt()) > timeout
return timeout > 0 && time.Since(cn.UsedAt) > timeout
}
func (cn *Conn) SetReadTimeout(timeout time.Duration) error {
now := time.Now()
cn.SetUsedAt(now)
cn.UsedAt = time.Now()
if timeout > 0 {
return cn.netConn.SetReadDeadline(now.Add(timeout))
return cn.NetConn.SetReadDeadline(cn.UsedAt.Add(timeout))
}
return cn.netConn.SetReadDeadline(noDeadline)
return cn.NetConn.SetReadDeadline(noDeadline)
}
func (cn *Conn) SetWriteTimeout(timeout time.Duration) error {
now := time.Now()
cn.SetUsedAt(now)
cn.UsedAt = time.Now()
if timeout > 0 {
return cn.netConn.SetWriteDeadline(now.Add(timeout))
return cn.NetConn.SetWriteDeadline(cn.UsedAt.Add(timeout))
}
return cn.netConn.SetWriteDeadline(noDeadline)
}
func (cn *Conn) Write(b []byte) (int, error) {
return cn.netConn.Write(b)
}
func (cn *Conn) RemoteAddr() net.Addr {
return cn.netConn.RemoteAddr()
return cn.NetConn.SetWriteDeadline(noDeadline)
}
func (cn *Conn) Close() error {
return cn.netConn.Close()
return cn.NetConn.Close()
}

View File

@@ -19,9 +19,7 @@ var (
var timers = sync.Pool{
New: func() interface{} {
t := time.NewTimer(time.Hour)
t.Stop()
return t
return time.NewTimer(0)
},
}
@@ -43,6 +41,7 @@ type Pooler interface {
FreeLen() int
Stats() *Stats
Close() error
Closed() bool
}
type dialer func() (net.Conn, error)
@@ -97,13 +96,12 @@ func (p *ConnPool) NewConn() (*Conn, error) {
func (p *ConnPool) PopFree() *Conn {
timer := timers.Get().(*time.Timer)
timer.Reset(p.poolTimeout)
if !timer.Reset(p.poolTimeout) {
<-timer.C
}
select {
case p.queue <- struct{}{}:
if !timer.Stop() {
<-timer.C
}
timers.Put(timer)
case <-timer.C:
timers.Put(timer)
@@ -134,20 +132,19 @@ func (p *ConnPool) popFree() *Conn {
// Get returns existed connection from the pool or creates a new one.
func (p *ConnPool) Get() (*Conn, bool, error) {
if p.closed() {
if p.Closed() {
return nil, false, ErrClosed
}
atomic.AddUint32(&p.stats.Requests, 1)
timer := timers.Get().(*time.Timer)
timer.Reset(p.poolTimeout)
if !timer.Reset(p.poolTimeout) {
<-timer.C
}
select {
case p.queue <- struct{}{}:
if !timer.Stop() {
<-timer.C
}
timers.Put(timer)
case <-timer.C:
timers.Put(timer)
@@ -244,7 +241,7 @@ func (p *ConnPool) Stats() *Stats {
}
}
func (p *ConnPool) closed() bool {
func (p *ConnPool) Closed() bool {
return atomic.LoadInt32(&p._closed) == 1
}
@@ -321,7 +318,7 @@ func (p *ConnPool) reaper(frequency time.Duration) {
defer ticker.Stop()
for _ = range ticker.C {
if p.closed() {
if p.Closed() {
break
}
n, err := p.ReapStaleConns()

View File

@@ -12,6 +12,10 @@ func NewSingleConnPool(cn *Conn) *SingleConnPool {
}
}
func (p *SingleConnPool) First() *Conn {
return p.cn
}
func (p *SingleConnPool) Get() (*Conn, bool, error) {
return p.cn, false, nil
}
@@ -45,3 +49,7 @@ func (p *SingleConnPool) Stats() *Stats {
func (p *SingleConnPool) Close() error {
return nil
}
func (p *SingleConnPool) Closed() bool {
return false
}

View File

@@ -11,7 +11,7 @@ type StickyConnPool struct {
cn *Conn
closed bool
mu sync.Mutex
mx sync.Mutex
}
var _ Pooler = (*StickyConnPool)(nil)
@@ -23,9 +23,16 @@ func NewStickyConnPool(pool *ConnPool, reusable bool) *StickyConnPool {
}
}
func (p *StickyConnPool) First() *Conn {
p.mx.Lock()
cn := p.cn
p.mx.Unlock()
return cn
}
func (p *StickyConnPool) Get() (*Conn, bool, error) {
p.mu.Lock()
defer p.mu.Unlock()
defer p.mx.Unlock()
p.mx.Lock()
if p.closed {
return nil, false, ErrClosed
@@ -49,12 +56,14 @@ func (p *StickyConnPool) putUpstream() (err error) {
}
func (p *StickyConnPool) Put(cn *Conn) error {
p.mu.Lock()
defer p.mu.Unlock()
defer p.mx.Unlock()
p.mx.Lock()
if p.closed {
return ErrClosed
}
if p.cn != cn {
panic("p.cn != cn")
}
return nil
}
@@ -65,19 +74,23 @@ func (p *StickyConnPool) removeUpstream(reason error) error {
}
func (p *StickyConnPool) Remove(cn *Conn, reason error) error {
p.mu.Lock()
defer p.mu.Unlock()
defer p.mx.Unlock()
p.mx.Lock()
if p.closed {
return nil
}
if p.cn == nil {
panic("p.cn == nil")
}
if cn != nil && p.cn != cn {
panic("p.cn != cn")
}
return p.removeUpstream(reason)
}
func (p *StickyConnPool) Len() int {
p.mu.Lock()
defer p.mu.Unlock()
defer p.mx.Unlock()
p.mx.Lock()
if p.cn == nil {
return 0
}
@@ -85,9 +98,8 @@ func (p *StickyConnPool) Len() int {
}
func (p *StickyConnPool) FreeLen() int {
p.mu.Lock()
defer p.mu.Unlock()
defer p.mx.Unlock()
p.mx.Lock()
if p.cn == nil {
return 1
}
@@ -99,9 +111,8 @@ func (p *StickyConnPool) Stats() *Stats {
}
func (p *StickyConnPool) Close() error {
p.mu.Lock()
defer p.mu.Unlock()
defer p.mx.Unlock()
p.mx.Lock()
if p.closed {
return ErrClosed
}
@@ -117,3 +128,10 @@ func (p *StickyConnPool) Close() error {
}
return err
}
func (p *StickyConnPool) Closed() bool {
p.mx.Lock()
closed := p.closed
p.mx.Unlock()
return closed
}