结构体Transport

Transport结构定义如下:

type Transport struct {
    //操作空闲连接需要获取锁
    idleMu       sync.Mutex
    //空闲连接池,key为协议目标地址等组合
    idleConn     map[connectMethodKey][]*persistConn // most recently used at end
    //等待空闲连接的队列,基于切片实现,队列大小无限制
    idleConnWait map[connectMethodKey]wantConnQueue  // waiting getConns
    
    //排队等待建立连接需要获取锁
    connsPerHostMu   sync.Mutex
    //每个host建立的连接数
    connsPerHost     map[connectMethodKey]int
    //等待建立连接的队列,同样基于切片实现,队列大小无限制
    connsPerHostWait map[connectMethodKey]wantConnQueue // waiting getConns
    
    //最大空闲连接数
    MaxIdleConns int
    //每个目标host最大空闲连接数;默认为2(注意默认值)
    MaxIdleConnsPerHost int
    //每个host可建立的最大连接数
    MaxConnsPerHost int
    //连接多少时间没有使用则被关闭
    IdleConnTimeout time.Duration
    
    //禁用长连接,使用短连接
    DisableKeepAlives bool
}

可以看到,连接护着队列,都是一个map结构,而key为协议目标地址等组合,即同一种协议与同一个目标host可建立的连接或者空闲连接是有限制的。

需要特别注意的是,MaxIdleConnsPerHost默认等于2,即与目标主机最多只维护两个空闲连接。这会导致什么呢?

如果遇到突发流量,瞬间建立大量连接,但是回收连接时,由于最大空闲连接数的限制,该联机不能进入空闲连接池,只能直接关闭。结果是,一直新建大量连接,又关闭大量连,业务机器的TIME_WAIT连接数随之突增。

线上有些业务架构是这样的:客户端 ===> LVS ===> Nginx ===> 服务。LVS负载均衡方案采用DR模式,LVS与Nginx配置统一VIP。此时在客户端看来,只有一个IP地址,只有一个Host。上述问题更为明显。

最后,Transport也提供了配置DisableKeepAlives,禁用长连接,使用短连接访问第三方资源或者服务。

连接获取与回收

Transport结构提供下面两个方法实现连接的获取与回收操作。

func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {}
func (t *Transport) tryPutIdleConn(pconn *persistConn) error {}

连接的获取主要分为两步走:1)尝试获取空闲连接;2)尝试新建连接:

//getConn方法内部实现
if delivered := t.queueForIdleConn(w); delivered {
    return pc, nil
}
    
t.queueForDial(w)

当然,可能获取不到连接而需要排队,此时怎么办呢?当前会阻塞当前协程了,直到获取连接为止,或者httpclient超时取消请求:

select {
    case <-w.ready:
        return w.pc, w.err
        
    //超时被取消
    case <-req.Cancel:
        return nil, errRequestCanceledConn
    ……
}
var errRequestCanceledConn = errors.New("net/http: request canceled while waiting for connection"// TODO: unify?

排队等待空闲连接的逻辑如下:

func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
    //如果配置了空闲超时时间,获取到连接需要检测,超时则关闭连接
    if t.IdleConnTimeout > 0 {
        oldTime = time.Now().Add(-t.IdleConnTimeout)
    }
    
    if list, ok := t.idleConn[w.key]; ok {
        for len(list) > 0 && !stop {
            pconn := list[len(list)-1]
            tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
            //超时了,关闭连接
            if tooOld {
                go pconn.closeConnIfStillIdle()
            }
            
            //分发连接到wantConn
            delivered = w.tryDeliver(pconn, nil)
        }
    }
    
    //排队等待空闲连接
    q := t.idleConnWait[w.key]
    q.pushBack(w)
    t.idleConnWait[w.key] = q
}

排队等待新建连接的逻辑如下:

func (t *Transport) queueForDial(w *wantConn) {
    //如果没有限制最大连接数,直接建立连接
    if t.MaxConnsPerHost <= 0 {
        go t.dialConnFor(w)
        return
    }
    
    //如果没超过连接数限制,直接建立连接
    if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
        go t.dialConnFor(w)
        return
    }
    
    //排队等待连接建立
    q := t.connsPerHostWait[w.key]
    q.pushBack(w)
    t.connsPerHostWait[w.key] = q
}

连接建立完成后,同样会调用tryDeliver分发连接到wantConn,同时关闭通道w.ready,这样主协程纠接触阻塞了。

func (w *wantConn) tryDeliver(pc *persistConn, err error) bool {
    w.pc = pc
    close(w.ready)
}

请求处理完成后,通过tryPutIdleConn将连接放回连接池;这时候如果存在等待空闲连接的协程,则需要分发复用该连接。另外,在回收连接时,还需要校验空闲连接数目是否超过限制:

func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
    //禁用长连接;或者最大空闲连接数不合法
    if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
        return errKeepAlivesDisabled
    }
    
    if q, ok := t.idleConnWait[key]; ok {
        //如果等待队列不为空,分发连接
        for q.len() > 0 {
            w := q.popFront()
            if w.tryDeliver(pconn, nil) {
                done = true
                break
            }
        }
    }
    
    //空闲连接数目超过限制,默认为DefaultMaxIdleConnsPerHost=2
    idles := t.idleConn[key]
    if len(idles) >= t.maxIdleConnsPerHost() {
        return errTooManyIdleHost
    }
}

空闲连接超时关闭

限 时 特 惠: 本站每日持续更新海量各大内部创业教程,一年会员只需98元,全站资源免费下载 点击查看详情
站 长 微 信: lzxmw777

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注