type Transport struct { idleMu sync.Mutex wantIdle bool// user has requested to close all idle conns // 空闲的连接 缓存的地方 idleConn map[connectMethodKey][]*persistConn // most recently used at end // connectMethodKey => 空闲连接的chan 形成的map // 有空闲连接放入的时候,首先尝试放入这个chan,方便另一个可能需要连接的goroutine直接使用,如果没有goroutine需要连接,就放入到上面的idleConn里面,便于后面的请求连接复用 idleConnCh map[connectMethodKey]chan *persistConn // DisableKeepAlives, if true, disables HTTP keep-alives and // will only use the connection to the server for a single // HTTP request. // // This is unrelated to the similarly named TCP keep-alives. // 是否开启 keepAlive,为true的话,连接不会被复用 DisableKeepAlives bool // MaxIdleConns controls the maximum number of idle (keep-alive) // connections across all hosts. Zero means no limit. // 所有hosts对应的最大的连接总数 MaxIdleConns int // 每一个host对应的最大的空闲连接数 MaxIdleConnsPerHost int // 每一个host对应的最大连接数 MaxConnsPerHost int }
type persistConn struct { // alt optionally specifies the TLS NextProto RoundTripper. // This is used for HTTP/2 today and future protocols later. // If it's non-nil, the rest of the fields are unused. alt RoundTripper
t *Transport cacheKey connectMethodKey conn net.Conn tlsState *tls.ConnectionState br *bufio.Reader // from conn bw *bufio.Writer // to conn nwrite int64// bytes written // roundTrip 往 这个chan 里写入request,readLoop从这个 chan 读取request reqch chan requestAndChan // written by roundTrip; read by readLoop // roundTrip 往 这个chan 里写入request 和 writeErrCh,writeLoop从这个 chan 读取request写入大盘 连接 里,并写入 err 到 writeErrCh chan writech chan writeRequest // written by roundTrip; read by writeLoop closech chanstruct{} // closed when conn closed // 判断body是否读取完 sawEOF bool// whether we've seen EOF from conn; owned by readLoop // writeErrCh passes the request write error (usually nil) // from the writeLoop goroutine to the readLoop which passes // it off to the res.Body reader, which then uses it to decide // whether or not a connection can be reused. Issue 7569. // writeLoop 写入 err的 chan writeErrCh chanerror // writeLoop 结束的时候关闭 writeLoopDone chanstruct{} // closed when write loop ends }
writeRequest
1 2 3 4 5 6 7 8 9
type writeRequest struct { req *transportRequest ch chan<- error
// Optional blocking chan for Expect: 100-continue (for receive). // If not nil, writeLoop blocks sending request body until // it receives from this chan. continueCh <-chanstruct{} }
requestAndChan
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
type requestAndChan struct { req *Request ch chan responseAndError // unbuffered; always send in select on callerGone
// whether the Transport (as opposed to the user client code) // added the Accept-Encoding gzip header. If the Transport // set it, only then do we transparently decode the gzip. addedGzip bool
// Optional blocking chan for Expect: 100-continue (for send). // If the request has an "Expect: 100-continue" header and // the server responds 100 Continue, readLoop send a value // to writeLoop via this chan. continueCh chan<- struct{}
callerGone <-chanstruct{} // closed when roundTrip caller has returned }
var ( deadline = c.deadline() reqs []*Request resp *Response copyHeaders = c.makeHeadersCopier(req) reqBodyClosed = false// have we closed the current req.Body?
// Redirect behavior: redirectMethod string includeBody bool ) // 错误自定义处理,忽略.... for { // 省略无关的代码.... reqs = append(reqs, req) var err error var didTimeout func()bool // 调用 client.send 方法来获取response,主要逻辑 if resp, didTimeout, err = c.send(req, deadline); err != nil { // c.send() always closes req.Body reqBodyClosed = true if !deadline.IsZero() && didTimeout() { err = &httpError{ // TODO: early in cycle: s/Client.Timeout exceeded/timeout or context cancelation/ err: err.Error() + " (Client.Timeout exceeded while awaiting headers)", timeout: true, } } returnnil, uerr(err) } // 判断是否需要跳转,进而进一步请求 var shouldRedirect bool redirectMethod, shouldRedirect, includeBody = redirectBehavior(req.Method, resp, reqs[0]) if !shouldRedirect { return resp, nil }
funcsend(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func()bool, err error) { req := ireq // req is either the original request, or a modified fork
// URL Hader 等判断及请求fork,忽略.... stopTimer, didTimeout := setRequestCancel(req, rt, deadline) // 调用 Transport.RoundTrip 来处理请求 resp, err = rt.RoundTrip(req) if err != nil { stopTimer() if resp != nil { log.Printf("RoundTripper returned a response & error; ignoring response") } if tlsErr, ok := err.(tls.RecordHeaderError); ok { // If we get a bad TLS record header, check to see if the // response looks like HTTP and give a more helpful error. // See golang.org/issue/11111. ifstring(tlsErr.RecordHeader[:]) == "HTTP/" { err = errors.New("http: server gave HTTP response to HTTPS client") } } returnnil, didTimeout, err } if !deadline.IsZero() { resp.Body = &cancelTimerBody{ stop: stopTimer, rc: resp.Body, reqDidTimeout: didTimeout, } } return resp, nil, nil }
for { // 判断context 是否完成,超时等 select { case <-ctx.Done(): req.closeBody() returnnil, ctx.Err() default: }
// treq gets modified by roundTrip, so we need to recreate for each retry. // treq会被 roundTrip 方法修改,所有每一次循环需要创建一个新的 treq := &transportRequest{Request: req, trace: trace} // 根据当前的请求获取 connectMethod,包含schema和address,方便请求的复用,这里不重要,不做详细分析 cm, err := t.connectMethodForRequest(treq) if err != nil { req.closeBody() returnnil, err }
// Get the cached or newly-created connection to either the // host (for http or https), the http proxy, or the http proxy // pre-CONNECTed to https server. In any case, we'll be ready // to send it requests. // 根据请求和connectMethod获取一个可用的连接,重要,后面会具体分析 pconn, err := t.getConn(treq, cm) if err != nil { t.setReqCanceler(req, nil) req.closeBody() returnnil, err }
var resp *Response if pconn.alt != nil { // HTTP/2 path. // http2 使用,这里不展开 t.decHostConnCount(cm.key()) // don't count cached http2 conns toward conns per host t.setReqCanceler(req, nil) // not cancelable with CancelRequest resp, err = pconn.alt.RoundTrip(req) } else { // 获取response,这里是重点,后面展开 resp, err = pconn.roundTrip(treq) } // 判断获取response是否有误及错误处理等操作,无关紧要,忽略 } }
func(t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistConn, error) { // trace相关的忽略... req := treq.Request trace := treq.trace ctx := req.Context() if trace != nil && trace.GetConn != nil { trace.GetConn(cm.addr()) } // 从idleConn里面获取一个 connectMethod对应的空闲的 连接,获取到了直接返回 if pc, idleSince := t.getIdleConn(cm); pc != nil { if trace != nil && trace.GotConn != nil { trace.GotConn(pc.gotIdleConnTrace(idleSince)) } // set request canceler to some non-nil function so we // can detect whether it was cleared between now and when // we enter roundTrip t.setReqCanceler(req, func(error) {}) return pc, nil }
// 开启一个goroutine,去创建一个连接,dialConn 是重点,后面深入分析 gofunc() { pc, err := t.dialConn(ctx, cm) dialc <- dialRes{pc, err} }() // 获取 idleChan中对应connectMethod的 channel idleConnCh := t.getIdleConnCh(cm) // 从多个chan中获取连接,获取取消信号,先来的先处理 select { case v := <-dialc: // 上面 goroutine首先创建完成了一个连接,使用这个链接 // Our dial finished. if v.pc != nil { if trace != nil && trace.GotConn != nil && v.pc.alt == nil { trace.GotConn(httptrace.GotConnInfo{Conn: v.pc.conn}) } return v.pc, nil } // Our dial failed. See why to return a nicer error // value. t.decHostConnCount(cmKey) select { case <-req.Cancel: // It was an error due to cancelation, so prioritize that // error value. (Issue 16049) returnnil, errRequestCanceledConn case <-req.Context().Done(): returnnil, req.Context().Err() case err := <-cancelc: if err == errRequestCanceled { err = errRequestCanceledConn } returnnil, err default: // It wasn't an error due to cancelation, so // return the original error message: returnnil, v.err } case pc := <-idleConnCh: // 另一个goroutine的request首先完成了,然后会把这个链接首先尝试放入对应connectMethod对应的 chan,如果放入不了,则放入idleConns的map中,进入这里说明,另一个goroutine把连接放入了chan,并被当前goroutine捕获了,那么上面 // go func() { // pc, err := t.dialConn(ctx, cm) // dialc <- dialRes{pc, err} // }() // 生成的连接就暂时没用了,这时候就用到上面 handlePendingDial 定义的方法,去处理这个多余的连接 // Another request finished first and its net.Conn // became available before our dial. Or somebody // else's dial that they didn't use. // But our dial is still going, so give it away // when it finishes: handlePendingDial() if trace != nil && trace.GotConn != nil { trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()}) } return pc, nil case <-req.Cancel: handlePendingDial() returnnil, errRequestCanceledConn case <-req.Context().Done(): handlePendingDial() returnnil, req.Context().Err() case err := <-cancelc: handlePendingDial() if err == errRequestCanceled { err = errRequestCanceledConn } returnnil, err } }
// eofc is used to block caller goroutines reading from Response.Body // at EOF until this goroutines has (potentially) added the connection // back to the idle pool. eofc := make(chanstruct{}) deferclose(eofc) // unblock reader on errors // 省略部分...
hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0 if resp.Close || rc.req.Close || resp.StatusCode <= 199 { // Don't do keep-alive on error if either party requested a close // or we get an unexpected informational (1xx) response. // StatusCode 100 is already handled above. alive = false }
// body为空的处理,忽略....
waitForBodyRead := make(chanbool, 2) body := &bodyEOFSignal{ body: resp.Body, // resp.Body.Close() 的最终调用的函数, Close()影响readLoop 和 writeLoop 两个goroutine 这两个goroutine的关闭,在后面讲close的时候具体介绍 earlyCloseFn: func()error { waitForBodyRead <- false <-eofc // will be closed by deferred call at the end of the function returnnil
// Before looping back to the top of this function and peeking on // the bufio.Reader, wait for the caller goroutine to finish // reading the response body. (or for cancelation or death) // 阻塞在这里,等待 请求 body close 或 请求cancel 或 context done 或 pc.closech select { case bodyEOF := <-waitForBodyRead: pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool alive = alive && bodyEOF && !pc.sawEOF && pc.wroteRequest() && tryPutIdleConn(trace) if bodyEOF { eofc <- struct{}{} } case <-rc.req.Cancel: alive = false pc.t.CancelRequest(rc.req) case <-rc.req.Context().Done(): alive = false pc.t.cancelRequest(rc.req, rc.req.Context().Err()) case <-pc.closech: alive = false }
func(pc *persistConn) writeLoop() { deferclose(pc.writeLoopDone) for { select { // 首先通过pc.writech chan 从 persistConn.roundTrip 函数中获取 writeRequest, 可以简单理解为 request case wr := <-pc.writech: startBytesWritten := pc.nwrite err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh)) if bre, ok := err.(requestBodyReadError); ok { err = bre.error // Errors reading from the user's // Request.Body are high priority. // Set it here before sending on the // channels below or calling // pc.close() which tears town // connections and causes other // errors. wr.req.setError(err) } if err == nil { err = pc.bw.Flush() } if err != nil { wr.req.Request.closeBody() if pc.nwrite == startBytesWritten { err = nothingWrittenError{err} } } // 把 err 通过 chan 返回给 persistConn.roundTrip 函数,persistConn.roundTrip 函数判断 err是否为 nil及相应的处理 pc.writeErrCh <- err // to the body reader, which might recycle us wr.ch <- err // to the roundTrip function if err != nil { // 如果 写入请求出现错误,这里关闭,pc.closech chan,readLoop的第151行就会停止阻塞,将alive设为false,进而结束循环,终止 readLoop的goroutine pc.close(err) return } case <-pc.closech: // 这里结束阻塞,是由 readLoop 结束是,调用 第3行的 defer函数,关闭 pc.closech chan 导致的 return } } }
body := &bodyEOFSignal{ body: resp.Body, earlyCloseFn: func()error { // 向 waiForBody 的chan 中写入 false waitForBodyRead <- false <-eofc // will be closed by deferred call at the end of the function returnnil
select { case wr := <-pc.writech: startBytesWritten := pc.nwrite err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh)) if bre, ok := err.(requestBodyReadError); ok { err = bre.error // Errors reading from the user's // Request.Body are high priority. // Set it here before sending on the // channels below or calling // pc.close() which tears town // connections and causes other // errors. wr.req.setError(err) } if err == nil { err = pc.bw.Flush() } if err != nil { wr.req.Request.closeBody() if pc.nwrite == startBytesWritten { err = nothingWrittenError{err} } } pc.writeErrCh <- err // to the body reader, which might recycle us wr.ch <- err // to the roundTrip function if err != nil { pc.close(err) return } // 在 上面 readLoop 关闭 pc.closech chan 后,这里就直接return了,循环终止,结束当前goroutine case <-pc.closech: return }