HTTP
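The Hypertext Transfer Protocol (HTTP) is the most widely used application-layer protocol today. First drafted by Tim Berners-Lee at CERN in 1989, it has become the core of data transfer on the internet [1]. In recent years HTTP/2 and HTTP/3 have revised the protocol to make transfers faster and more secure. Most programming languages implement HTTP/1.1 and HTTP/2 in their standard libraries to cover everyday development needs, and Go's networking library, the subject of this article, implements both of these major versions as well.
HTTP is an application-layer protocol that normally uses TCP as its transport. HTTP/3, however, runs over QUIC, a new transport protocol implemented on top of UDP, so HTTP can now run over either TCP or UDP.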

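Go's standard library provides HTTP client and server implementations in the net/http package. Before analyzing how it works internally, we first look at a few design aspects of the HTTP protocol, then at the layering inside the standard library and how its modules relate to each other.
Requests and responses
The most familiar concepts in HTTP are the request and the response. Both can be understood as messages passed between client and server: the client sends an HTTP request to the server, and the server processes it and sends the result back as an HTTP response.

Unlike binary protocols, HTTP/1.x is a text-based protocol, so its headers are plain text. The first line of an HTTP request contains the method, the path, and the protocol version; it is followed by a number of header fields and then by the payload.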
```
GET / HTTP/1.1
User-Agent: Mozilla/4.0 (compatible; MSIE5.01; Windows NT)
Host: draveness.me
Accept-Language: en-us
Accept-Encoding: gzip, deflate
Content-Length: <length>
Connection: Keep-Alive

<html>
...
</html>
```
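HTTP defines a Connection header. A client uses it when sending a request to tell the server whether the connection should be kept open; see RFC 2616 for details. The header commonly takes one of two values (see the MDN documentation):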
```
Connection: keep-alive
Connection: close
```
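With keep-alive, the server keeps the connection open until it times out. With close, the server closes the TCP connection once it has finished sending the response. Before HTTP/1.1 the default was close; from HTTP/1.1 on it is keep-alive. net/http speaks HTTP/1.1 by default, so keep-alive is its default as well; connection reuse can be turned off with the Transport's DisableKeepAlives field.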
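Message boundaries
HTTP still runs mostly over TCP. TCP is a connection-oriented, reliable, byte-stream transport protocol: data handed to TCP by the application layer is not delivered to the destination host message by message, and in some cases several writes are combined into a single segment. Because TCP is a byte stream, every application-layer protocol built on it has to mark message boundaries itself.

Application-layer protocols usually solve this in one of two ways: with an explicit length or with a delimiter. HTTP implements both. In most cases it adds a Content-Length header that gives the size of the payload; once the receiver has parsed this header it knows where the current request or response ends and can separate one HTTP message from the next. Here is an example that uses Content-Length to mark the boundary: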
```
HTTP/1.1 200 OK
Content-Type: text/html; charset=UTF-8
Content-Length: 138
...
Connection: close

<html>
<head>
<title>An Example Page</title>
</head>
<body>
<p>Hello World, this is a very simple HTML document.</p>
</body>
</html>
```
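Besides length-based framing, HTTP also uses a delimiter-based strategy. With chunked transfer encoding (Transfer-Encoding: chunked), the header no longer contains Content-Length; instead the body is sent as a series of chunks, and a chunk of size 0 acts as the terminator that marks the end of the message.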
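Both framing strategies can be tried out with http.ReadResponse, which parses a response from a byte stream. The sketch below is only an illustration: the `stream` constant is made-up data that imitates two responses arriving back to back on one connection, and `readBodies` is a hypothetical helper.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// Two responses back to back on one byte stream: the first is framed by
// Content-Length, the second by chunked encoding with a final zero-size chunk.
const stream = "HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello" +
	"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n" +
	"6\r\nchunk1\r\n6\r\nchunk2\r\n0\r\n\r\n"

// readBodies parses n consecutive responses from raw and returns their bodies.
func readBodies(raw string, n int) ([]string, error) {
	br := bufio.NewReader(strings.NewReader(raw))
	bodies := make([]string, 0, n)
	for i := 0; i < n; i++ {
		resp, err := http.ReadResponse(br, nil)
		if err != nil {
			return nil, err
		}
		// The body has to be consumed before the next message can be parsed.
		body, err := io.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			return nil, err
		}
		bodies = append(bodies, string(body))
	}
	return bodies, nil
}

func main() {
	bodies, err := readBodies(stream, 2)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%q\n", bodies)
}
```

The parser never sees message boundaries from TCP itself; it finds the end of the first body by counting 5 bytes and the end of the second by reading the zero-size chunk.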
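Layered structure
Go's net/http contains both an HTTP client and an HTTP server. To make them extensible it introduces two interfaces, net/http.RoundTripper and net/http.Handler. net/http.RoundTripper represents the ability to execute a single HTTP request: the caller passes in a request and gets back the corresponding response. net/http.Handler is what an HTTP server uses to respond to client requests: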
```go
type RoundTripper interface {
	RoundTrip(*Request) (*Response, error)
}
```
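The receiving side of an HTTP request implements net/http.Handler, which holds the logic for handling the request. While handling it, the handler calls methods of the net/http.ResponseWriter interface to build the response. Its three methods, Header, Write, and WriteHeader, return the response header map, write data to the body, and send the response header with a status code, respectively: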
```go
type Handler interface {
	ServeHTTP(ResponseWriter, *Request)
}

type ResponseWriter interface {
	Header() Header
	Write([]byte) (int, error)
	WriteHeader(statusCode int)
}
```
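A small handler makes the role of the three ResponseWriter methods concrete. This is a sketch under assumptions: `helloHandler`, `serve`, and the `/greet` path are invented for illustration, and net/http/httptest is used so the handler can run without a network listener.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// helloHandler is a hypothetical Handler used only for illustration.
type helloHandler struct{}

func (helloHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// Header returns the header map; change it before WriteHeader or Write.
	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
	// WriteHeader sends the status code together with the headers.
	w.WriteHeader(http.StatusOK)
	// Write appends data to the response body.
	fmt.Fprintf(w, "hello, %s", r.URL.Query().Get("name"))
}

// serve runs the handler against an in-memory recorder and returns the
// status code, the Content-Type header, and the body.
func serve(target string) (int, string, string) {
	req := httptest.NewRequest(http.MethodGet, target, nil)
	rec := httptest.NewRecorder()
	helloHandler{}.ServeHTTP(rec, req)

	res := rec.Result()
	defer res.Body.Close()
	return res.StatusCode, res.Header.Get("Content-Type"), rec.Body.String()
}

func main() {
	status, contentType, body := serve("/greet?name=gopher")
	fmt.Println(status, contentType, body)
}
```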
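Client and server both deal with the two directions of HTTP, requests and responses: the client builds a request and waits for the response, while the server handles the request and returns a response. Neither side has a single implementation in the standard library; each is organized in layers, and net/http.RoundTripper in particular has a hierarchy of implementations.

Every implementation of net/http.RoundTripper encapsulates one way of issuing a request to a remote endpoint; likewise, the standard library ships several implementations of net/http.Handler that serve client requests in different ways.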
HTTP client
Example code for an HTTP request:
```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	url := "http://localhost:8080/login?name=zhouwei1&password=123456"
	// 1. Create the client; the default client is used here.
	client := http.DefaultClient
	// 2. Create the request.
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		panic(err)
	}
	// 3. Send the request.
	resp, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	// 4. Close the body when done; it is guaranteed to be non-nil when err is nil.
	defer resp.Body.Close()
	// io.ReadAll needs Go 1.16+; older versions use ioutil.ReadAll.
	data, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Printf("request succeeded, data: %s\n", data)
}
```
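HTTP request flow
- Create an http.Client object, client
- Create an http.Request object, req
- Send the request with client.Do(req)
- Close the response body with resp.Body.Close()
Even when client.Get() or client.Post() is called directly, a request is still created internally, and the call always ends up in client.Do(), which invokes the private client.do() method to execute the request.

The HTTP client is built from a few important structs: net/http.Client, net/http.Transport, and net/http.persistConn:
- net/http.Client is the HTTP client; its zero value is a usable client that relies on net/http.DefaultTransport;
- net/http.Transport is an implementation of the net/http.RoundTripper interface; its main job is to support HTTP/HTTPS requests and HTTP proxies;
- net/http.persistConn wraps a persistent TCP connection and is the handle through which we exchange messages with the remote end.
net/http.Client is the higher-level abstraction: it takes care of HTTP details such as cookies and redirects. net/http.Transport handles the lower-level details of HTTP/HTTPS, including connection reuse, building requests, and sending them.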
Request
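net/http.Request represents a request received by an HTTP server or a request to be sent by an HTTP client. It contains the method, URL, protocol version, headers, and body of the request. In addition to these fields it holds a reference to an HTTP response: the redirect response that caused this request to be created, which is only populated while the client follows redirects.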
```go
// A Request represents an HTTP request received by a server
// or to be sent by a client.
//
// The field semantics differ slightly between client and server
// usage. In addition to the notes on the fields below, see the
// documentation for Request.Write and RoundTripper.
type Request struct {
	// Method specifies the HTTP method (GET, POST, PUT, etc.).
	// For client requests, an empty string means GET.
	//
	// Go's HTTP client does not support sending a request with
	// the CONNECT method. See the documentation on Transport for
	// details.
	Method string

	// URL specifies either the URI being requested (for server
	// requests) or the URL to access (for client requests).
	//
	// For server requests, the URL is parsed from the URI
	// supplied on the Request-Line as stored in RequestURI. For
	// most requests, fields other than Path and RawQuery will be
	// empty. (See RFC 7230, Section 5.3)
	//
	// For client requests, the URL's Host specifies the server to
	// connect to, while the Request's Host field optionally
	// specifies the Host header value to send in the HTTP
	// request.
	URL *url.URL

	// The protocol version for incoming server requests.
	//
	// For client requests, these fields are ignored. The HTTP
	// client code always uses either HTTP/1.1 or HTTP/2.
	// See the docs on Transport for details.
	Proto      string // "HTTP/1.0"
	ProtoMajor int    // 1
	ProtoMinor int    // 0

	// Header contains the request header fields either received
	// by the server or to be sent by the client.
	//
	// If a server received a request with header lines,
	//
	//	Host: example.com
	//	accept-encoding: gzip, deflate
	//	Accept-Language: en-us
	//	fOO: Bar
	//	foo: two
	//
	// then
	//
	//	Header = map[string][]string{
	//		"Accept-Encoding": {"gzip, deflate"},
	//		"Accept-Language": {"en-us"},
	//		"Foo": {"Bar", "two"},
	//	}
	//
	// For incoming requests, the Host header is promoted to the
	// Request.Host field and removed from the Header map.
	//
	// HTTP defines that header names are case-insensitive. The
	// request parser implements this by using CanonicalHeaderKey,
	// making the first character and any characters following a
	// hyphen uppercase and the rest lowercase.
	//
	// For client requests, certain headers such as Content-Length
	// and Connection are automatically written when needed and
	// values in Header may be ignored. See the documentation
	// for the Request.Write method.
	Header Header

	// Body is the request's body.
	//
	// For client requests, a nil body means the request has no
	// body, such as a GET request. The HTTP Client's Transport
	// is responsible for calling the Close method.
	//
	// For server requests, the Request Body is always non-nil
	// but will return EOF immediately when no body is present.
	// The Server will close the request body. The ServeHTTP
	// Handler does not need to.
	Body io.ReadCloser

	// GetBody defines an optional func to return a new copy of
	// Body. It is used for client requests when a redirect requires
	// reading the body more than once. Use of GetBody still
	// requires setting Body.
	//
	// For server requests, it is unused.
	GetBody func() (io.ReadCloser, error)

	// ContentLength records the length of the associated content.
	// The value -1 indicates that the length is unknown.
	// Values >= 0 indicate that the given number of bytes may
	// be read from Body.
	//
	// For client requests, a value of 0 with a non-nil Body is
	// also treated as unknown.
	ContentLength int64

	// TransferEncoding lists the transfer encodings from outermost to
	// innermost. An empty list denotes the "identity" encoding.
	// TransferEncoding can usually be ignored; chunked encoding is
	// automatically added and removed as necessary when sending and
	// receiving requests.
	TransferEncoding []string

	// Close indicates whether to close the connection after
	// replying to this request (for servers) or after sending this
	// request and reading its response (for clients).
	//
	// For server requests, the HTTP server handles this automatically
	// and this field is not needed by Handlers.
	//
	// For client requests, setting this field prevents re-use of
	// TCP connections between requests to the same hosts, as if
	// Transport.DisableKeepAlives were set.
	Close bool

	// For server requests, Host specifies the host on which the
	// URL is sought. For HTTP/1 (per RFC 7230, section 5.4), this
	// is either the value of the "Host" header or the host name
	// given in the URL itself. For HTTP/2, it is the value of the
	// ":authority" pseudo-header field.
	// It may be of the form "host:port". For international domain
	// names, Host may be in Punycode or Unicode form. Use
	// golang.org/x/net/idna to convert it to either format if
	// needed.
	// To prevent DNS rebinding attacks, server Handlers should
	// validate that the Host header has a value for which the
	// Handler considers itself authoritative. The included
	// ServeMux supports patterns registered to particular host
	// names and thus protects its registered Handlers.
	//
	// For client requests, Host optionally overrides the Host
	// header to send. If empty, the Request.Write method uses
	// the value of URL.Host. Host may contain an international
	// domain name.
	Host string

	// Form contains the parsed form data, including both the URL
	// field's query parameters and the PATCH, POST, or PUT form data.
	// This field is only available after ParseForm is called.
	// The HTTP client ignores Form and uses Body instead.
	Form url.Values

	// PostForm contains the parsed form data from PATCH, POST
	// or PUT body parameters.
	//
	// This field is only available after ParseForm is called.
	// The HTTP client ignores PostForm and uses Body instead.
	PostForm url.Values

	// MultipartForm is the parsed multipart form, including file uploads.
	// This field is only available after ParseMultipartForm is called.
	// The HTTP client ignores MultipartForm and uses Body instead.
	MultipartForm *multipart.Form

	// Trailer specifies additional headers that are sent after the request
	// body.
	//
	// For server requests, the Trailer map initially contains only the
	// trailer keys, with nil values. (The client declares which trailers it
	// will later send.) While the handler is reading from Body, it must
	// not reference Trailer. After reading from Body returns EOF, Trailer
	// can be read again and will contain non-nil values, if they were sent
	// by the client.
	//
	// For client requests, Trailer must be initialized to a map containing
	// the trailer keys to later send. The values may be nil or their final
	// values. The ContentLength must be 0 or -1, to send a chunked request.
	// After the HTTP request is sent the map values can be updated while
	// the request body is read. Once the body returns EOF, the caller must
	// not mutate Trailer.
	//
	// Few HTTP clients, servers, or proxies support HTTP trailers.
	Trailer Header

	// RemoteAddr allows HTTP servers and other software to record
	// the network address that sent the request, usually for
	// logging. This field is not filled in by ReadRequest and
	// has no defined format. The HTTP server in this package
	// sets RemoteAddr to an "IP:port" address before invoking a
	// handler.
	// This field is ignored by the HTTP client.
	RemoteAddr string

	// RequestURI is the unmodified request-target of the
	// Request-Line (RFC 7230, Section 3.1.1) as sent by the client
	// to a server. Usually the URL field should be used instead.
	// It is an error to set this field in an HTTP client request.
	RequestURI string

	// TLS allows HTTP servers and other software to record
	// information about the TLS connection on which the request
	// was received. This field is not filled in by ReadRequest.
	// The HTTP server in this package sets the field for
	// TLS-enabled connections before invoking a handler;
	// otherwise it leaves the field nil.
	// This field is ignored by the HTTP client.
	TLS *tls.ConnectionState

	// Cancel is an optional channel whose closure indicates that the client
	// request should be regarded as canceled. Not all implementations of
	// RoundTripper may support Cancel.
	//
	// For server requests, this field is not applicable.
	//
	// Deprecated: Set the Request's context with NewRequestWithContext
	// instead. If a Request's Cancel field and context are both
	// set, it is undefined whether Cancel is respected.
	Cancel <-chan struct{}

	// Response is the redirect response which caused this request
	// to be created. This field is only populated during client
	// redirects.
	Response *Response

	// ctx is either the client or server context. It should only
	// be modified via copying the whole Request using WithContext.
	// It is unexported to prevent people from using Context wrong
	// and mutating the contexts held by callers of the same request.
	ctx context.Context
}
```
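net/http.NewRequest is the function the standard library provides for creating requests. It is a thin wrapper that calls net/http.NewRequestWithContext with a background context; that function validates the fields of the request and assembles a new request struct from its arguments.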
```go
// NewRequestWithContext returns a new Request given a method, URL, and
// optional body.
//
// If the provided body is also an io.Closer, the returned
// Request.Body is set to body and will be closed by the Client
// methods Do, Post, and PostForm, and Transport.RoundTrip.
//
// NewRequestWithContext returns a Request suitable for use with
// Client.Do or Transport.RoundTrip. To create a request for use with
// testing a Server Handler, either use the NewRequest function in the
// net/http/httptest package, use ReadRequest, or manually update the
// Request fields. For an outgoing client request, the context
// controls the entire lifetime of a request and its response:
// obtaining a connection, sending the request, and reading the
// response headers and body. See the Request type's documentation for
// the difference between inbound and outbound request fields.
//
// If body is of type *bytes.Buffer, *bytes.Reader, or
// *strings.Reader, the returned request's ContentLength is set to its
// exact value (instead of -1), GetBody is populated (so 307 and 308
// redirects can replay the body), and Body is set to NoBody if the
// ContentLength is 0.
func NewRequestWithContext(ctx context.Context, method, url string, body io.Reader) (*Request, error) {
	if method == "" {
		// We document that "" means "GET" for Request.Method, and people have
		// relied on that from NewRequest, so keep that working.
		// We still enforce validMethod for non-empty methods.
		method = "GET"
	}
	if !validMethod(method) {
		return nil, fmt.Errorf("net/http: invalid method %q", method)
	}
	if ctx == nil {
		return nil, errors.New("net/http: nil Context")
	}
	u, err := urlpkg.Parse(url)
	if err != nil {
		return nil, err
	}
	rc, ok := body.(io.ReadCloser)
	if !ok && body != nil {
		rc = ioutil.NopCloser(body)
	}
	// The host's colon:port should be normalized. See Issue 14836.
	u.Host = removeEmptyPort(u.Host)
	req := &Request{
		ctx:        ctx,
		Method:     method,
		URL:        u,
		Proto:      "HTTP/1.1",
		ProtoMajor: 1,
		ProtoMinor: 1,
		Header:     make(Header),
		Body:       rc,
		Host:       u.Host,
	}
	if body != nil {
		switch v := body.(type) {
		case *bytes.Buffer:
			req.ContentLength = int64(v.Len())
			buf := v.Bytes()
			req.GetBody = func() (io.ReadCloser, error) {
				r := bytes.NewReader(buf)
				return ioutil.NopCloser(r), nil
			}
		case *bytes.Reader:
			req.ContentLength = int64(v.Len())
			snapshot := *v
			req.GetBody = func() (io.ReadCloser, error) {
				r := snapshot
				return ioutil.NopCloser(&r), nil
			}
		case *strings.Reader:
			req.ContentLength = int64(v.Len())
			snapshot := *v
			req.GetBody = func() (io.ReadCloser, error) {
				r := snapshot
				return ioutil.NopCloser(&r), nil
			}
		default:
			// This is where we'd set it to -1 (at least
			// if body != NoBody) to mean unknown, but
			// that broke people during the Go 1.8 testing
			// period. People depend on it being 0 I
			// guess. Maybe retry later. See Issue 18117.
		}
		// For client requests, Request.ContentLength of 0
		// means either actually 0, or unknown. The only way
		// to explicitly say that the ContentLength is zero is
		// to set the Body to nil. But turns out too much code
		// depends on NewRequest returning a non-nil Body,
		// so we use a well-known ReadCloser variable instead
		// and have the http package also treat that sentinel
		// variable to mean explicitly zero.
		if req.GetBody != nil && req.ContentLength == 0 {
			req.Body = NoBody
			req.GetBody = func() (io.ReadCloser, error) { return NoBody, nil }
		}
	}

	return req, nil
}
```
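Assembling the request is straightforward: the function checks and validates the method, the URL, and the body, and then initializes a new net/http.Request. Handling the body is slightly more involved: depending on its concrete type, it is wrapped into an io.ReadCloser in different ways, and for *bytes.Buffer, *bytes.Reader, and *strings.Reader the function also fills in ContentLength and GetBody.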
Client
```go
type Client struct {
	// Transport specifies the mechanism by which individual
	// HTTP requests are made.
	// If nil, DefaultTransport is used.
	Transport RoundTripper

	// CheckRedirect specifies the policy for handling redirects.
	// If CheckRedirect is not nil, the client calls it before
	// following an HTTP redirect. The arguments req and via are
	// the upcoming request and the requests made already, oldest
	// first. If CheckRedirect returns an error, the Client's Get
	// method returns both the previous Response (with its Body
	// closed) and CheckRedirect's error (wrapped in a url.Error)
	// instead of issuing the Request req.
	// As a special case, if CheckRedirect returns ErrUseLastResponse,
	// then the most recent response is returned with its body
	// unclosed, along with a nil error.
	//
	// If CheckRedirect is nil, the Client uses its default policy,
	// which is to stop after 10 consecutive requests.
	CheckRedirect func(req *Request, via []*Request) error

	// Jar specifies the cookie jar.
	//
	// The Jar is used to insert relevant cookies into every
	// outbound Request and is updated with the cookie values
	// of every inbound Response. The Jar is consulted for every
	// redirect that the Client follows.
	//
	// If Jar is nil, cookies are only sent if they are explicitly
	// set on the Request.
	Jar CookieJar

	// Timeout specifies a time limit for requests made by this
	// Client. The timeout includes connection time, any
	// redirects, and reading the response body. The timer remains
	// running after Get, Head, Post, or Do return and will
	// interrupt reading of the Response.Body.
	//
	// A Timeout of zero means no timeout.
	//
	// The Client cancels requests to the underlying Transport
	// as if the Request's Context ended.
	//
	// For compatibility, the Client will also use the deprecated
	// CancelRequest method on Transport if found. New
	// RoundTripper implementations should use the Request's Context
	// for cancellation instead of implementing CancelRequest.
	//
	// Author's note: it covers the whole exchange, from dialing the
	// connection until the response has been fully received.
	Timeout time.Duration
}
```
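The fields above can be combined into a configured client. This is a sketch under assumptions: `alwaysRedirect` is a hypothetical RoundTripper that stands in for a server so the redirect policy can be observed without a network, and the 5 second timeout is an arbitrary illustrative value.

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// alwaysRedirect is a hypothetical RoundTripper that answers every request
// with a 302 redirect to /next.
type alwaysRedirect struct{}

func (alwaysRedirect) RoundTrip(req *http.Request) (*http.Response, error) {
	h := make(http.Header)
	h.Set("Location", "/next")
	return &http.Response{
		Status:     "302 Found",
		StatusCode: http.StatusFound,
		Proto:      "HTTP/1.1",
		ProtoMajor: 1,
		ProtoMinor: 1,
		Header:     h,
		Body:       http.NoBody,
		Request:    req,
	}, nil
}

// newClient builds a client that reports redirects instead of following them.
func newClient() *http.Client {
	return &http.Client{
		Transport: alwaysRedirect{},
		// ErrUseLastResponse makes Do return the redirect response itself.
		CheckRedirect: func(req *http.Request, via []*http.Request) error {
			return http.ErrUseLastResponse
		},
		// Upper bound for the whole exchange, including reading the body.
		Timeout: 5 * time.Second,
	}
}

func main() {
	resp, err := newClient().Get("http://example.invalid/start")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.StatusCode, resp.Header.Get("Location"))
}
```

With CheckRedirect left nil, the same transport would make the client give up with an error once the default limit of 10 consecutive requests is reached.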
Do
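Once the HTTP request has been built with the standard library, we start an HTTP transaction: the request is sent and we wait for the remote response. Client.Do performs the following steps:
- validates the arguments
- sets default values
- follows multi-hop requests (redirects)
- computes the timeout deadline
- calls c.send(req, deadline)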
```go
// Do sends an HTTP request and returns an HTTP response, following
// policy (such as redirects, cookies, auth) as configured on the
// client.
//
// An error is returned if caused by client policy (such as
// CheckRedirect), or failure to speak HTTP (such as a network
// connectivity problem). A non-2xx status code doesn't cause an
// error.
//
// If the returned error is nil, the Response will contain a non-nil
// Body which the user is expected to close. If the Body is not both
// read to EOF and closed, the Client's underlying RoundTripper
// (typically Transport) may not be able to re-use a persistent TCP
// connection to the server for a subsequent "keep-alive" request.
//
// The request Body, if non-nil, will be closed by the underlying
// Transport, even on errors.
//
// On error, any Response can be ignored. A non-nil Response with a
// non-nil error only occurs when CheckRedirect fails, and even then
// the returned Response.Body is already closed.
//
// Generally Get, Post, or PostForm will be used instead of Do.
//
// If the server replies with a redirect, the Client first uses the
// CheckRedirect function to determine whether the redirect should be
// followed. If permitted, a 301, 302, or 303 redirect causes
// subsequent requests to use HTTP method GET
// (or HEAD if the original request was HEAD), with no body.
// A 307 or 308 redirect preserves the original HTTP method and body,
// provided that the Request.GetBody function is defined.
// The NewRequest function automatically sets GetBody for common
// standard library body types.
//
// Any returned error will be of type *url.Error. The url.Error
// value's Timeout method will report true if request timed out or was
// canceled.
func (c *Client) Do(req *Request) (*Response, error) {
	return c.do(req)
}

var testHookClientDoResult func(retres *Response, reterr error)

func (c *Client) do(req *Request) (retres *Response, reterr error) {
	if testHookClientDoResult != nil {
		defer func() { testHookClientDoResult(retres, reterr) }()
	}
	if req.URL == nil {
		req.closeBody()
		return nil, &url.Error{
			Op:  urlErrorOp(req.Method),
			Err: errors.New("http: nil Request.URL"),
		}
	}

	var (
		deadline      = c.deadline()
		reqs          []*Request
		resp          *Response
		copyHeaders   = c.makeHeadersCopier(req)
		reqBodyClosed = false // have we closed the current req.Body?

		// Redirect behavior:
		redirectMethod string
		includeBody    bool
	)
	uerr := func(err error) error {
		// the body may have been closed already by c.send()
		if !reqBodyClosed {
			req.closeBody()
		}
		var urlStr string
		if resp != nil && resp.Request != nil {
			urlStr = stripPassword(resp.Request.URL)
		} else {
			urlStr = stripPassword(req.URL)
		}
		return &url.Error{
			Op:  urlErrorOp(reqs[0].Method),
			URL: urlStr,
			Err: err,
		}
	}
	for {
		// For all but the first request, create the next
		// request hop and replace req.
		if len(reqs) > 0 {
			loc := resp.Header.Get("Location")
			if loc == "" {
				resp.closeBody()
				return nil, uerr(fmt.Errorf("%d response missing Location header", resp.StatusCode))
			}
			u, err := req.URL.Parse(loc)
			if err != nil {
				resp.closeBody()
				return nil, uerr(fmt.Errorf("failed to parse Location header %q: %v", loc, err))
			}
			host := ""
			if req.Host != "" && req.Host != req.URL.Host {
				// If the caller specified a custom Host header and the
				// redirect location is relative, preserve the Host header
				// through the redirect. See issue #22233.
				if u, _ := url.Parse(loc); u != nil && !u.IsAbs() {
					host = req.Host
				}
			}
			ireq := reqs[0]
			req = &Request{
				Method:   redirectMethod,
				Response: resp,
				URL:      u,
				Header:   make(Header),
				Host:     host,
				Cancel:   ireq.Cancel,
				ctx:      ireq.ctx,
			}
			if includeBody && ireq.GetBody != nil {
				req.Body, err = ireq.GetBody()
				if err != nil {
					resp.closeBody()
					return nil, uerr(err)
				}
				req.ContentLength = ireq.ContentLength
			}

			// Copy original headers before setting the Referer,
			// in case the user set Referer on their first request.
			// If they really want to override, they can do it in
			// their CheckRedirect func.
			copyHeaders(req)

			// Add the Referer header from the most recent
			// request URL to the new one, if it's not https->http:
			if ref := refererForURL(reqs[len(reqs)-1].URL, req.URL); ref != "" {
				req.Header.Set("Referer", ref)
			}
			err = c.checkRedirect(req, reqs)

			// Sentinel error to let users select the
			// previous response, without closing its
			// body. See Issue 10069.
			if err == ErrUseLastResponse {
				return resp, nil
			}

			// Close the previous response's body. But
			// read at least some of the body so if it's
			// small the underlying TCP connection will be
			// re-used. No need to check for errors: if it
			// fails, the Transport won't reuse it anyway.
			const maxBodySlurpSize = 2 << 10
			if resp.ContentLength == -1 || resp.ContentLength <= maxBodySlurpSize {
				io.CopyN(ioutil.Discard, resp.Body, maxBodySlurpSize)
			}
			resp.Body.Close()

			if err != nil {
				// Special case for Go 1 compatibility: return both the response
				// and an error if the CheckRedirect function failed.
				// See https://golang.org/issue/3795
				// The resp.Body has already been closed.
				ue := uerr(err)
				ue.(*url.Error).URL = loc
				return resp, ue
			}
		}

		reqs = append(reqs, req)
		var err error
		var didTimeout func() bool
		if resp, didTimeout, err = c.send(req, deadline); err != nil {
			// c.send() always closes req.Body
			reqBodyClosed = true
			if !deadline.IsZero() && didTimeout() {
				err = &httpError{
					// TODO: early in cycle: s/Client.Timeout exceeded/timeout or context cancellation/
					err:     err.Error() + " (Client.Timeout exceeded while awaiting headers)",
					timeout: true,
				}
			}
			return nil, uerr(err)
		}

		var shouldRedirect bool
		redirectMethod, shouldRedirect, includeBody = redirectBehavior(req.Method, resp, reqs[0])
		if !shouldRedirect {
			return resp, nil
		}

		req.closeBody()
	}
}
```
send
This method mainly does the following:
- loads cookies from the Jar into the request
- obtains the Transport object
- calls send(req, c.transport(), deadline)
```go
// didTimeout is non-nil only if err != nil.
func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
	if c.Jar != nil {
		for _, cookie := range c.Jar.Cookies(req.URL) {
			req.AddCookie(cookie)
		}
	}
	resp, didTimeout, err = send(req, c.transport(), deadline)
	if err != nil {
		return nil, didTimeout, err
	}
	if c.Jar != nil {
		if rc := resp.Cookies(); len(rc) > 0 {
			c.Jar.SetCookies(req.URL, rc)
		}
	}
	return resp, nil, nil
}

// send issues an HTTP request.
// Caller should close resp.Body when done reading from it.
```
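The cookie handling in Client.send can be observed with the net/http/cookiejar package. The sketch below is only an illustration: `recordingTransport` is a hypothetical RoundTripper that replaces the real Transport, and the cookie `session=abc` is made-up data.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/cookiejar"
)

// recordingTransport is a hypothetical RoundTripper that records the Cookie
// header of each request and always replies with a Set-Cookie header.
type recordingTransport struct{ seen []string }

func (t *recordingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	t.seen = append(t.seen, req.Header.Get("Cookie"))
	h := make(http.Header)
	h.Add("Set-Cookie", "session=abc; Path=/")
	return &http.Response{
		Status:     "200 OK",
		StatusCode: http.StatusOK,
		Proto:      "HTTP/1.1",
		ProtoMajor: 1,
		ProtoMinor: 1,
		Header:     h,
		Body:       http.NoBody,
		Request:    req,
	}, nil
}

// cookiesSent performs two requests with a shared jar and returns the Cookie
// header that accompanied each of them.
func cookiesSent() ([]string, error) {
	jar, err := cookiejar.New(nil)
	if err != nil {
		return nil, err
	}
	rt := &recordingTransport{}
	client := &http.Client{Transport: rt, Jar: jar}
	for i := 0; i < 2; i++ {
		resp, err := client.Get("http://example.com/")
		if err != nil {
			return nil, err
		}
		resp.Body.Close()
	}
	return rt.seen, nil
}

func main() {
	seen, err := cookiesSent()
	if err != nil {
		panic(err)
	}
	fmt.Printf("%q\n", seen)
}
```

The first request carries no cookie; the response stores one in the jar, and the second request sends it back.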
http.send
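This function mainly does the following:
- validates its arguments: URL, header, RoundTripper
- arranges cancellation on timeout: setRequestCancel(req, rt, deadline)
- executes the request transaction: rt.RoundTrip(req)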
```go
func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
	req := ireq // req is either the original request, or a modified fork

	if rt == nil {
		req.closeBody()
		return nil, alwaysFalse, errors.New("http: no Client.Transport or DefaultTransport")
	}

	if req.URL == nil {
		req.closeBody()
		return nil, alwaysFalse, errors.New("http: nil Request.URL")
	}

	if req.RequestURI != "" {
		req.closeBody()
		return nil, alwaysFalse, errors.New("http: Request.RequestURI can't be set in client requests")
	}

	// forkReq forks req into a shallow clone of ireq the first
	// time it's called.
	forkReq := func() {
		if ireq == req {
			req = new(Request)
			*req = *ireq // shallow clone
		}
	}

	// Most the callers of send (Get, Post, et al) don't need
	// Headers, leaving it uninitialized. We guarantee to the
	// Transport that this has been initialized, though.
	if req.Header == nil {
		forkReq()
		req.Header = make(Header)
	}

	if u := req.URL.User; u != nil && req.Header.Get("Authorization") == "" {
		username := u.Username()
		password, _ := u.Password()
		forkReq()
		req.Header = cloneOrMakeHeader(ireq.Header)
		req.Header.Set("Authorization", "Basic "+basicAuth(username, password))
	}

	if !deadline.IsZero() {
		forkReq()
	}
	// Watch for the request timing out.
	stopTimer, didTimeout := setRequestCancel(req, rt, deadline)

	// This is where the request is actually sent.
	resp, err = rt.RoundTrip(req)
	if err != nil {
		stopTimer()
		if resp != nil {
			log.Printf("RoundTripper returned a response & error; ignoring response")
		}
		if tlsErr, ok := err.(tls.RecordHeaderError); ok {
			// If we get a bad TLS record header, check to see if the
			// response looks like HTTP and give a more helpful error.
			// See golang.org/issue/11111.
			if string(tlsErr.RecordHeader[:]) == "HTTP/" {
				err = errors.New("http: server gave HTTP response to HTTPS client")
			}
		}
		return nil, didTimeout, err
	}
	if resp == nil {
		return nil, didTimeout, fmt.Errorf("http: RoundTripper implementation (%T) returned a nil *Response with a nil error", rt)
	}
	if resp.Body == nil {
		// The documentation on the Body field says “The http Client and Transport
		// guarantee that Body is always non-nil, even on responses without a body
		// or responses with a zero-length body.” Unfortunately, we didn't document
		// that same constraint for arbitrary RoundTripper implementations, and
		// RoundTripper implementations in the wild (mostly in tests) assume that
		// they can use a nil Body to mean an empty one (similar to Request.Body).
		// (See https://golang.org/issue/38095.)
		//
		// If the ContentLength allows the Body to be empty, fill in an empty one
		// here to ensure that it is non-nil.
		if resp.ContentLength > 0 && req.Method != "HEAD" {
			return nil, didTimeout, fmt.Errorf("http: RoundTripper implementation (%T) returned a *Response with content length %d but a nil Body", rt, resp.ContentLength)
		}
		resp.Body = ioutil.NopCloser(strings.NewReader(""))
	}
	if !deadline.IsZero() {
		resp.Body = &cancelTimerBody{
			stop:          stopTimer,
			rc:            resp.Body,
			reqDidTimeout: didTimeout,
		}
	}
	return resp, nil, nil
}
```
http.setRequestCancel
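This function mainly does one thing: it starts a goroutine that blocks in a select over several channels and waits for a signal to cancel the request.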
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
// setRequestCancel sets req.Cancel and adds a deadline context to req
// if deadline is non-zero. The RoundTripper's type is used to
// determine whether the legacy CancelRequest behavior should be used.
//
// As background, there are three ways to cancel a request:
// First was Transport.CancelRequest. (deprecated)
// Second was Request.Cancel.
// Third was Request.Context.
// This function populates the second and third, and uses the first if it really needs to.
func setRequestCancel(req *Request, rt RoundTripper, deadline time.Time) (stopTimer func(), didTimeout func() bool) {
if deadline.IsZero() {
return nop, alwaysFalse
}
knownTransport := knownRoundTripperImpl(rt, req)
oldCtx := req.Context()
if req.Cancel == nil && knownTransport {
// If they already had a Request.Context that's
// expiring sooner, do nothing:
if !timeBeforeContextDeadline(deadline, oldCtx) {
return nop, alwaysFalse
}
var cancelCtx func()
req.ctx, cancelCtx = context.WithDeadline(oldCtx, deadline)
return cancelCtx, func() bool { return time.Now().After(deadline) }
}
initialReqCancel := req.Cancel // the user's original Request.Cancel, if any
var cancelCtx func()
if oldCtx := req.Context(); timeBeforeContextDeadline(deadline, oldCtx) {
req.ctx, cancelCtx = context.WithDeadline(oldCtx, deadline)
}
cancel := make(chan struct{})
req.Cancel = cancel
doCancel := func() {
// The second way in the func comment above:
close(cancel)
// The first way, used only for RoundTripper
// implementations written before Go 1.5 or Go 1.6.
type canceler interface{ CancelRequest(*Request) }
if v, ok := rt.(canceler); ok {
v.CancelRequest(req)
}
}
stopTimerCh := make(chan struct{})
var once sync.Once
stopTimer = func() {
once.Do(func() {
close(stopTimerCh)
if cancelCtx != nil {
cancelCtx()
}
})
}
timer := time.NewTimer(time.Until(deadline))
var timedOut atomicBool
go func() {
select {
case <-initialReqCancel: // cancellation requested by the caller
doCancel()
timer.Stop()
case <-timer.C: // deadline reached: cancel the request
timedOut.setTrue()
doCancel()
case <-stopTimerCh:
timer.Stop()
}
}()
return stopTimer, timedOut.isSet
}
|
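From the caller's side, the deadline handled by setRequestCancel usually comes from Client.Timeout. The following is a minimal sketch, not part of the original article: the helper name clientTimeoutFires and the 50 ms timeout are illustrative assumptions. It calls a local test server that never answers in time and checks that the returned *url.Error reports a timeout.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
	"time"
)

// clientTimeoutFires (hypothetical helper) calls a handler that never answers
// in time and reports whether the client error is a *url.Error whose Timeout
// method returns true.
func clientTimeoutFires() bool {
	release := make(chan struct{})
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Block until the demo is over or the client goes away.
		select {
		case <-release:
		case <-r.Context().Done():
		}
	}))
	defer srv.Close()
	defer close(release) // runs before srv.Close, so the handler can return

	// Client.Timeout becomes the deadline passed to setRequestCancel.
	client := &http.Client{Timeout: 50 * time.Millisecond}
	resp, err := client.Get(srv.URL)
	if err == nil {
		resp.Body.Close()
		return false
	}
	var uerr *url.Error
	return errors.As(err, &uerr) && uerr.Timeout()
}

func main() {
	fmt.Println("timed out:", clientTimeoutFires())
}
```

A context created with context.WithTimeout and attached via http.NewRequestWithContext would exercise the third cancellation mechanism (Request.Context) directly instead.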
Transport
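After this chain of calls we finally arrive at net/http.Transport, the struct in which the standard library implements the low-level HTTP protocol.
- Transport caches connections for future reuse; a Transport itself should also be reused rather than created on demand.
- Transport is safe for concurrent use by multiple goroutines.
- Transport is only a low-level primitive for sending HTTP and HTTPS requests; higher-level features such as cookies and redirects are implemented by http.Client.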
DefaultTransport
|
// DefaultTransport is the default implementation of Transport and is
// used by DefaultClient. It establishes network connections as needed
// and caches them for reuse by subsequent calls. It uses HTTP proxies
// as directed by the $HTTP_PROXY and $NO_PROXY (or $http_proxy and
// $no_proxy) environment variables.
var DefaultTransport RoundTripper = &Transport{
Proxy: ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
|
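- net.Dialer.Timeout limits the time spent establishing a TCP connection (when a new connection is needed).
- net.Dialer.KeepAlive sets the interval between TCP keep-alive probes on the connection. It is unrelated to HTTP keep-alive (persistent connections), which Transport enables by default unless DisableKeepAlives is set.
- http.Transport.MaxIdleConns: the maximum number of idle connections across all hosts is 100.
- http.Transport.IdleConnTimeout controls how long an idle connection is kept in the pool, regardless of which phase a client request may be blocked in.
- http.Transport.TLSHandshakeTimeout is the timeout for the TLS handshake.
- http.Transport.ExpectContinueTimeout limits how long the client waits for the server's go-ahead response after sending request headers that contain "Expect: 100-continue".
An HTTP client request normally starts in Do(req *Request) (*Response, error), but the request is actually dispatched by the transport's RoundTrip(*Request) (*Response, error) method. So how should a Transport be configured sensibly?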
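The idle pool is limited not only by MaxIdleConns but also by MaxIdleConnsPerHost. DefaultTransport does not set that field, so the default DefaultMaxIdleConnsPerHost of 2 applies. That is the number of persistent connections per client that RFC 2616 recommends, but in most cases it is not enough.
MaxIdleConnsPerHost limits the number of idle persistConn entries kept under one connectMethodKey (a key that distinguishes proxy, scheme and target address, that is, different kinds of requests).
Note in particular that MaxIdleConnsPerHost defaults to 2, so at most two idle connections are kept per target host. What does this lead to?
Under a traffic burst, a large number of connections are created at once. When they are released, the idle limit prevents most of them from entering the idle pool, so they are closed immediately. The result is a steady cycle of opening and closing many connections, and the number of TIME_WAIT sockets on the client machine spikes.
Some production architectures look like this: client ===> LVS ===> Nginx ===> service. LVS load balancing uses DR mode, and LVS and Nginx are configured with the same VIP. From the client's point of view there is only one IP address and one host, which makes the problem above even more pronounced.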
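One way to raise the per-host idle limit is to clone DefaultTransport and override only the relevant fields. This is a minimal sketch rather than a recommended configuration: the helper name newTunedTransport and the value 100 are assumptions for illustration, and Transport.Clone requires Go 1.13 or later.

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// newTunedTransport (hypothetical helper) copies DefaultTransport and raises
// the idle-connection limits. For a client that talks to a single host, the
// global and per-host limits are set to the same value.
func newTunedTransport(maxIdle int) *http.Transport {
	tr := http.DefaultTransport.(*http.Transport).Clone()
	tr.MaxIdleConns = maxIdle
	tr.MaxIdleConnsPerHost = maxIdle
	return tr
}

func main() {
	tr := newTunedTransport(100)
	client := &http.Client{Transport: tr, Timeout: 10 * time.Second}
	_ = client // reuse this client for all requests to the service

	fmt.Println("default per-host idle limit:", http.DefaultMaxIdleConnsPerHost)
	fmt.Println("tuned per-host idle limit:", tr.MaxIdleConnsPerHost)
}
```

The right value depends on the workload; load testing is the usual way to find it.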
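If we want short-lived connections instead, there are a few options:
- Set DisableKeepAlives = true: the client then sends Connection: close to the server, and the server actively closes the connection after responding, which leaves a large number of TIME_WAIT sockets on the server side.
- Set MaxIdleConnsPerHost < 0: the pool cannot hold any idle connection, so nothing is reused and the client closes the connection itself. Because the client initiates the close after every request, the server does not accumulate TIME_WAIT sockets.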
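The effect of DisableKeepAlives on connection reuse can be observed with net/http/httptrace. The sketch below is an illustration under assumptions: the helper name reusedOnSecondRequest is made up, and it runs against a local httptest server. It sends two requests over the same Transport and reports whether the second one got a reused connection.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"net/http/httptrace"
)

// reusedOnSecondRequest (hypothetical helper) sends two sequential requests
// and returns GotConnInfo.Reused as observed for the second one.
func reusedOnSecondRequest(disableKeepAlives bool) (bool, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		io.WriteString(w, "ok")
	}))
	defer srv.Close()

	tr := &http.Transport{DisableKeepAlives: disableKeepAlives}
	defer tr.CloseIdleConnections()
	client := &http.Client{Transport: tr}

	var reused bool
	trace := &httptrace.ClientTrace{
		GotConn: func(info httptrace.GotConnInfo) { reused = info.Reused },
	}
	for i := 0; i < 2; i++ {
		req, err := http.NewRequest(http.MethodGet, srv.URL, nil)
		if err != nil {
			return false, err
		}
		req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace))
		resp, err := client.Do(req)
		if err != nil {
			return false, err
		}
		// Drain and close the body so the connection can go back to the pool.
		io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}
	return reused, nil
}

func main() {
	for _, disable := range []bool{false, true} {
		reused, err := reusedOnSecondRequest(disable)
		fmt.Printf("DisableKeepAlives=%v reused=%v err=%v\n", disable, reused, err)
	}
}
```

Draining the body before closing it matters here: a connection only returns to the idle pool once its response body has been read to EOF.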
RoundTripper
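transport implements the RoundTripper interface, which has a single method, RoundTrip(), so RoundTrip() is the entry point of transport. The main job of transport is to cache persistent connections so that they can be reused when many HTTP requests are sent, which cuts the time spent establishing TCP (and TLS) connections. transport can also put limits on connections, such as connection timeouts and the maximum number of connections per host. This caching and limiting is aimed at TCP(+TLS)+HTTP/1; HTTP/2 connections are not cached and limited in the same way.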
|
// RoundTripper is an interface representing the ability to execute a
// single HTTP transaction, obtaining the Response for a given Request.
//
// A RoundTripper must be safe for concurrent use by multiple
// goroutines.
type RoundTripper interface {
// RoundTrip executes a single HTTP transaction, returning
// a Response for the provided Request.
//
// RoundTrip should not attempt to interpret the response. In
// particular, RoundTrip must return err == nil if it obtained
// a response, regardless of the response's HTTP status code.
// A non-nil err should be reserved for failure to obtain a
// response. Similarly, RoundTrip should not attempt to
// handle higher-level protocol details such as redirects,
// authentication, or cookies.
//
// RoundTrip should not modify the request, except for
// consuming and closing the Request's Body. RoundTrip may
// read fields of the request in a separate goroutine. Callers
// should not mutate or reuse the request until the Response's
// Body has been closed.
//
// RoundTrip must always close the body, including on errors,
// but depending on the implementation may do so in a separate
// goroutine even after RoundTrip returns. This means that
// callers wanting to reuse the body for subsequent requests
// must arrange to wait for the Close call before doing so.
//
// The Request's URL and Header fields must be initialized.
RoundTrip(*Request) (*Response, error)
}
// RoundTrip implements the RoundTripper interface.
//
// For higher-level HTTP client support (such as handling of cookies
// and redirects), see Get, Post, and the Client type.
//
// Like the RoundTripper interface, the error types returned
// by RoundTrip are unspecified.
func (t *Transport) RoundTrip(req *Request) (*Response, error) {
return t.roundTrip(req)
}
|
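Because RoundTripper is a one-method interface, it is easy to wrap. The following sketch shows a wrapper that counts round trips and delegates to another RoundTripper without touching the request, as the interface contract requires. The type countingTransport and the helper countRoundTrips are hypothetical names used only for this illustration.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
)

// countingTransport (hypothetical) counts calls and forwards them unchanged.
type countingTransport struct {
	calls int64 // accessed atomically; RoundTrip may run concurrently
	next  http.RoundTripper
}

// RoundTrip implements http.RoundTripper. It does not modify the request.
func (c *countingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	atomic.AddInt64(&c.calls, 1)
	return c.next.RoundTrip(req)
}

// countRoundTrips sends n requests to a local test server through the wrapper
// and returns how many round trips the wrapper saw.
func countRoundTrips(n int) (int64, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		io.WriteString(w, "ok")
	}))
	defer srv.Close()

	ct := &countingTransport{next: http.DefaultTransport}
	client := &http.Client{Transport: ct}
	for i := 0; i < n; i++ {
		resp, err := client.Get(srv.URL)
		if err != nil {
			return 0, err
		}
		io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}
	return atomic.LoadInt64(&ct.calls), nil
}

func main() {
	n, err := countRoundTrips(3)
	fmt.Println(n, err)
}
```

The same shape works for logging, metrics or adding tracing headers to a cloned request.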
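transport
transport is built around the following main concepts:
- Connection pool: idleConn stores connections (persistConn) grouped by request type (connectMethodKey). When a request arrives, the transport first tries to take a connection that matches its type from the pool.
- readLoop/writeLoop: run on top of a connection and loop to process requests of that type (send the request, return the response).
- roundTrip: the real entry point of a request; after receiving a request it hands the work to writeLoop and readLoop.
One readLoop/writeLoop pair serves exactly one connection. Once that connection has no more requests to serve and is closed, both loops exit and the system resources are released.
net/http.Transport implements the net/http.RoundTripper interface and is the most important and most complex struct in the whole request path.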
|
// Transport is an implementation of RoundTripper that supports HTTP,
// HTTPS, and HTTP proxies (for either HTTP or HTTPS with CONNECT).
//
// By default, Transport caches connections for future re-use.
// This may leave many open connections when accessing many hosts.
// This behavior can be managed using Transport's CloseIdleConnections method
// and the MaxIdleConnsPerHost and DisableKeepAlives fields.
//
// Transports should be reused instead of created as needed.
// Transports are safe for concurrent use by multiple goroutines.
//
// A Transport is a low-level primitive for making HTTP and HTTPS requests.
// For high-level functionality, such as cookies and redirects, see Client.
//
// Transport uses HTTP/1.1 for HTTP URLs and either HTTP/1.1 or HTTP/2
// for HTTPS URLs, depending on whether the server supports HTTP/2,
// and how the Transport is configured. The DefaultTransport supports HTTP/2.
// To explicitly enable HTTP/2 on a transport, use golang.org/x/net/http2
// and call ConfigureTransport. See the package docs for more about HTTP/2.
//
// Responses with status codes in the 1xx range are either handled
// automatically (100 expect-continue) or ignored. The one
// exception is HTTP status code 101 (Switching Protocols), which is
// considered a terminal status and returned by RoundTrip. To see the
// ignored 1xx responses, use the httptrace trace package's
// ClientTrace.Got1xxResponse.
//
// Transport only retries a request upon encountering a network error
// if the request is idempotent and either has no body or has its
// Request.GetBody defined. HTTP requests are considered idempotent if
// they have HTTP methods GET, HEAD, OPTIONS, or TRACE; or if their
// Header map contains an "Idempotency-Key" or "X-Idempotency-Key"
// entry. If the idempotency key value is a zero-length slice, the
// request is treated as idempotent but the header is not sent on the
// wire.
type Transport struct {
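// idleMu must be held when operating on idle connections
idleMu sync.Mutex
closeIdle bool // user has requested to close all idle conns
// pool of idle persistConns; the size per key is limited by MaxIdleConnsPerHost
idleConn map[connectMethodKey][]*persistConn // most recently used at end
// queue of requests waiting for an idle connection; slice-based, with no size limit
idleConnWait map[connectMethodKey]wantConnQueue // waiting getConns
// length is limited by MaxIdleConns; keeps every idle pconn in a queue
idleLRU connLRU
reqMu sync.Mutex
// request cancelers (for example, cancel on timeout)
reqCanceler map[cancelKey]func(error)
altMu sync.Mutex // guards changing altProto only
// nil or map[string]RoundTripper keyed by URI scheme: the RoundTripper that handles that scheme.
// Note the difference from TLSNextProto: altProto is keyed by the URI scheme, TLSNextProto by
// the protocol negotiated on top of TLS, so only a TLSNextProto key (such as "h2") names HTTP/2.
altProto atomic.Value // of nil or map[string]RoundTripper, key is URI scheme
// connsPerHostMu must be held when queuing to dial a connection
connsPerHostMu sync.Mutex
// number of connections established per host
connsPerHost map[connectMethodKey]int
// queue of requests waiting to dial; also slice-based, with no size limit
connsPerHostWait map[connectMethodKey]wantConnQueue // waiting getConns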
// Proxy specifies a function to return a proxy for a given
// Request. If the function returns a non-nil error, the
// request is aborted with the provided error.
//
// The proxy type is determined by the URL scheme. "http",
// "https", and "socks5" are supported. If the scheme is empty,
// "http" is assumed.
//
// If Proxy is nil or returns a nil *URL, no proxy is used.
// In short: Proxy returns the proxy URL to use for a request; nil means no proxy.
Proxy func(*Request) (*url.URL, error)
// DialContext specifies the dial function for creating unencrypted TCP connections.
// If DialContext is nil (and the deprecated Dial below is also nil),
// then the transport dials using package net.
//
// DialContext runs concurrently with calls to RoundTrip.
// A RoundTrip call that initiates a dial may end up using
// a connection dialed previously when the earlier connection
// becomes idle before the later DialContext completes.
// In short: creates unencrypted TCP connections; unlike Dial, it takes a context so the dial can be cancelled.
DialContext func(ctx context.Context, network, addr string) (net.Conn, error)
// Dial specifies the dial function for creating unencrypted TCP connections.
//
// Dial runs concurrently with calls to RoundTrip.
// A RoundTrip call that initiates a dial may end up using
// a connection dialed previously when the earlier connection
// becomes idle before the later Dial completes.
//
// Deprecated: Use DialContext instead, which allows the transport
// to cancel dials as soon as they are no longer needed.
// If both are set, DialContext takes priority.
// In short: creates unencrypted TCP connections; deprecated in favor of DialContext.
Dial func(network, addr string) (net.Conn, error)
// DialTLSContext specifies an optional dial function for creating
// TLS connections for non-proxied HTTPS requests.
//
// If DialTLSContext is nil (and the deprecated DialTLS below is also nil),
// DialContext and TLSClientConfig are used.
//
// If DialTLSContext is set, the Dial and DialContext hooks are not used for HTTPS
// requests and the TLSClientConfig and TLSHandshakeTimeout
// are ignored. The returned net.Conn is assumed to already be
// past the TLS handshake.
DialTLSContext func(ctx context.Context, network, addr string) (net.Conn, error)
// DialTLS specifies an optional dial function for creating
// TLS connections for non-proxied HTTPS requests.
//
// Deprecated: Use DialTLSContext instead, which allows the transport
// to cancel dials as soon as they are no longer needed.
// If both are set, DialTLSContext takes priority.
// In short: dials TLS connections for non-proxied HTTPS requests. If set, Dial is not used
// for HTTPS and TLSClientConfig and TLSHandshakeTimeout are ignored; if nil, Dial and
// TLSClientConfig are used. The returned net.Conn is assumed to be past the TLS handshake.
DialTLS func(network, addr string) (net.Conn, error)
// TLSClientConfig specifies the TLS configuration to use with
// tls.Client.
// If nil, the default configuration is used.
// If non-nil, HTTP/2 support may not be enabled by default.
// In short: the configuration the TLS client uses for the handshake.
TLSClientConfig *tls.Config
// TLSHandshakeTimeout specifies the maximum amount of time waiting to
// wait for a TLS handshake. Zero means no timeout.
// limits the time spent on the TLS handshake
TLSHandshakeTimeout time.Duration
// DisableKeepAlives, if true, disables HTTP keep-alives and
// will only use the connection to the server for a single
// HTTP request.
//
// This is unrelated to the similarly named TCP keep-alives.
// Whether to disable persistent (keep-alive) connections. Defaults to false;
// when set to true, none of the connection-reuse optimizations apply.
DisableKeepAlives bool
// DisableCompression, if true, prevents the Transport from
// requesting compression with an "Accept-Encoding: gzip"
// request header when the Request contains no existing
// Accept-Encoding value. If the Transport requests gzip on
// its own and gets a gzipped response, it's transparently
// decoded in the Response.Body. However, if the user
// explicitly requested gzip it is not automatically
// uncompressed.
// In short: whether to disable transparent HTTP compression.
DisableCompression bool
// MaxIdleConns controls the maximum number of idle (keep-alive)
// connections across all hosts. Zero means no limit.
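// The maximum number of idle connections across all hosts (a host being a destination IP),
// that is, the total number of connections in idleConn, kept around for reuse.
// Zero, the zero-value default, means no limit. In practice this number is usually capped
// for performance reasons, often by bisecting on load-test results. Too large a value can
// inflate the socket count on both client and server and exhaust memory or file descriptors;
// too small a value caps the pool's sockets and lowers resource utilization.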
MaxIdleConns int
// MaxIdleConnsPerHost, if non-zero, controls the maximum idle
// (keep-alive) connections to keep per-host. If zero,
// DefaultMaxIdleConnsPerHost is used.
// The maximum number of idle connections per host, that is, per key in idleConn.
// Note the default: DefaultMaxIdleConnsPerHost = 2.
// If the client only talks to a single host, setting MaxIdleConnsPerHost equal to
// MaxIdleConns keeps the logic clearer.
MaxIdleConnsPerHost int
// MaxConnsPerHost optionally limits the total number of
// connections per host, including connections in the dialing,
// active, and idle states. On limit violation, dials will block.
//
// Zero means no limit.
// The maximum number of connections per host, counting dialing, active and idle connections.
// Note carried over from an earlier version of this doc comment: for HTTP/2 this only
// controls how many new connections are created at a time, not the total; in practice
// a host using HTTP/2 has only about one idle connection.
MaxConnsPerHost int
// IdleConnTimeout is the maximum amount of time an idle
// (keep-alive) connection will remain idle before closing
// itself.
// Zero means no limit.
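// The maximum time a connection may stay idle before the pconn is closed.
// The timer starts each time the connection becomes idle and is reset by any activity.
// Typical values are on the order of minutes, for example 90 seconds.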
IdleConnTimeout time.Duration
// ResponseHeaderTimeout, if non-zero, specifies the amount of
// time to wait for a server's response headers after fully
// writing the request (including its body, if any). This
// time does not include the time to read the response body.
// limits the time spent reading the response headers
ResponseHeaderTimeout time.Duration
// ExpectContinueTimeout, if non-zero, specifies the amount of
// time to wait for a server's first response headers after fully
// writing the request headers if the request has an
// "Expect: 100-continue" header. Zero means no timeout and
// causes the body to be sent immediately, without
// waiting for the server to approve.
// This time does not include the time to send the request header.
// In short: how long to wait for the server's first response headers after sending
// "Expect: 100-continue"; zero sends the body immediately without waiting.
ExpectContinueTimeout time.Duration
// TLSNextProto specifies how the Transport switches to an
// alternate protocol (such as HTTP/2) after a TLS ALPN
// protocol negotiation. If Transport dials an TLS connection
// with a non-empty protocol name and TLSNextProto contains a
// map entry for that key (such as "h2"), then the func is
// called with the request's authority (such as "example.com"
// or "example.com:1234") and the TLS connection. The function
// must return a RoundTripper that then handles the request.
// If TLSNextProto is not nil, HTTP/2 support is not enabled
// automatically.
// In short: how the transport switches to another protocol after TLS NPN/ALPN negotiation;
// "next" refers to the protocol layered on top of TLS.
TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper
// ProxyConnectHeader optionally specifies headers to send to
// proxies during CONNECT requests.
ProxyConnectHeader Header
// MaxResponseHeaderBytes specifies a limit on how many
// response bytes are allowed in the server's response
// header.
//
// Zero means to use a default limit.
MaxResponseHeaderBytes int64
// WriteBufferSize specifies the size of the write buffer used
// when writing to the transport.
// If zero, a default (currently 4KB) is used.
WriteBufferSize int
// ReadBufferSize specifies the size of the read buffer used
// when reading from the transport.
// If zero, a default (currently 4KB) is used.
ReadBufferSize int
// nextProtoOnce guards initialization of TLSNextProto and
// h2transport (via onceSetNextProtoDefaults)
nextProtoOnce sync.Once
h2transport h2Transport // non-nil if http2 wired up
tlsNextProtoWasNil bool // whether TLSNextProto was nil when the Once fired
// ForceAttemptHTTP2 controls whether HTTP/2 is enabled when a non-zero
// Dial, DialTLS, or DialContext func or TLSClientConfig is provided.
// By default, use of any those fields conservatively disables HTTP/2.
// To use a custom dialer or TLS config and still attempt HTTP/2
// upgrades, set this to true.
ForceAttemptHTTP2 bool
}
|
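As an illustration of one of the timeout fields above, the sketch below sets ResponseHeaderTimeout on a fresh Transport and calls a local test server that never sends headers. The helper name headerTimeoutFires and the 50 ms value are assumptions made for this example.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
	"time"
)

// headerTimeoutFires (hypothetical helper) reports whether a request fails
// with a timeout error when the server does not send response headers within
// Transport.ResponseHeaderTimeout.
func headerTimeoutFires() bool {
	release := make(chan struct{})
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Never write headers; wait until the demo ends or the client leaves.
		select {
		case <-release:
		case <-r.Context().Done():
		}
	}))
	defer srv.Close()
	defer close(release) // runs before srv.Close, so the handler can return

	tr := &http.Transport{ResponseHeaderTimeout: 50 * time.Millisecond}
	defer tr.CloseIdleConnections()
	client := &http.Client{Transport: tr}

	resp, err := client.Get(srv.URL)
	if err == nil {
		resp.Body.Close()
		return false
	}
	var uerr *url.Error
	return errors.As(err, &uerr) && uerr.Timeout()
}

func main() {
	fmt.Println("header timeout fired:", headerTimeoutFires())
}
```

Unlike Client.Timeout, this limit covers only the wait for response headers, not the time spent reading the body.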
connectMethodKey
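A connectMethodKey identifies the destination a client connects to: it combines the proxy URL, the scheme, the server address, and whether the connection is restricted to HTTP/1.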
|
// connectMethodKey is the map key version of connectMethod, with a
// stringified proxy URL (or the empty string) instead of a pointer to
// a URL.
type connectMethodKey struct {
proxy, scheme, addr string
onlyH1 bool
}
func (k connectMethodKey) String() string {
// Only used by tests.
var h1 string
if k.onlyH1 {
h1 = ",h1"
}
return fmt.Sprintf("%s|%s%s|%s", k.proxy, k.scheme, h1, k.addr)
}
|
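Since connectMethodKey is a plain comparable struct, it can be used directly as a map key. The sketch below mirrors that idea with a hypothetical poolKey type and a simplified keyFor helper (both invented for this illustration, not the standard library's code) to show which URLs end up in the same pool bucket.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// poolKey is a hypothetical stand-in for the unexported connectMethodKey.
type poolKey struct {
	proxy, scheme, addr string
	onlyH1              bool
}

// keyFor builds a poolKey for a target URL. This is a simplification: the
// address always carries an explicit port, defaulting to 443 for https and
// 80 otherwise.
func keyFor(rawURL, proxy string) (poolKey, error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return poolKey{}, err
	}
	port := u.Port()
	if port == "" {
		if u.Scheme == "https" {
			port = "443"
		} else {
			port = "80"
		}
	}
	return poolKey{
		proxy:  proxy,
		scheme: u.Scheme,
		addr:   net.JoinHostPort(u.Hostname(), port),
	}, nil
}

func main() {
	a, _ := keyFor("http://example.com/a", "")
	b, _ := keyFor("http://example.com:80/b", "")
	c, _ := keyFor("https://example.com/a", "")
	fmt.Println(a == b) // true: same scheme and address share a bucket
	fmt.Println(a == c) // false: scheme and port differ
}
```

The path plays no part in the key, so all requests to one scheme and address compete for the same idle connections.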
roundTrip
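By default, HTTP requests are handled over a net/http.persistConn persistent connection: roundTrip first obtains a connection to send the request on and then calls net/http.persistConn.roundTrip.
We can also call net/http.Transport.RegisterProtocol on a net/http.Transport to register a net/http.RoundTripper implementation for a given scheme. roundTrip then picks the implementation that matches the scheme in the URL in place of the default logic. RegisterProtocol looks like this: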
|
// RegisterProtocol registers a new protocol with scheme.
// The Transport will pass requests using the given scheme to rt.
// It is rt's responsibility to simulate HTTP request semantics.
//
// RegisterProtocol can be used by other packages to provide
// implementations of protocol schemes like "ftp" or "file".
//
// If rt.RoundTrip returns ErrSkipAltProtocol, the Transport will
// handle the RoundTrip itself for that one request, as if the
// protocol were not registered.
func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
t.altMu.Lock()
defer t.altMu.Unlock()
oldMap, _ := t.altProto.Load().(map[string]RoundTripper)
if _, exists := oldMap[scheme]; exists {
panic("protocol " + scheme + " already registered")
}
newMap := make(map[string]RoundTripper)
for k, v := range oldMap {
newMap[k] = v
}
newMap[scheme] = rt
t.altProto.Store(newMap)
}
|
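To see RegisterProtocol in action without touching the network, the sketch below registers a RoundTripper for a made-up "mem" scheme that serves responses from a map. The scheme name, the memTransport type and the fetchMem helper are all assumptions for this example.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

// memTransport (hypothetical) answers requests from an in-memory map keyed
// by URL path. It simulates HTTP semantics, as RegisterProtocol requires.
type memTransport map[string]string

func (m memTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	body, ok := m[req.URL.Path]
	code := http.StatusOK
	if !ok {
		code = http.StatusNotFound
	}
	return &http.Response{
		Status:        fmt.Sprintf("%d %s", code, http.StatusText(code)),
		StatusCode:    code,
		Proto:         "HTTP/1.1",
		ProtoMajor:    1,
		ProtoMinor:    1,
		Header:        make(http.Header),
		Body:          io.NopCloser(strings.NewReader(body)),
		ContentLength: int64(len(body)),
		Request:       req,
	}, nil
}

// fetchMem registers memTransport for the "mem" scheme on a fresh Transport
// and fetches rawURL through an ordinary http.Client.
func fetchMem(rawURL string) (int, string, error) {
	tr := &http.Transport{}
	tr.RegisterProtocol("mem", memTransport{"/hello": "hello from memory"})
	client := &http.Client{Transport: tr}

	resp, err := client.Get(rawURL)
	if err != nil {
		return 0, "", err
	}
	defer resp.Body.Close()
	data, err := io.ReadAll(resp.Body)
	return resp.StatusCode, string(data), err
}

func main() {
	fmt.Println(fetchMem("mem://store/hello"))
	fmt.Println(fetchMem("mem://store/missing"))
}
```

The standard library uses the same mechanism for http.NewFileTransport, which can be registered under the "file" scheme.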
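Transport.roundTrip is the main entry point. It takes a request, picks a suitable persistent connection to send it on, and returns the response. The flow has two main steps:
call getConn to obtain the underlying TCP (or TLS) connection, then call roundTrip on that connection to handle the upper-layer protocol (HTTP).
The method mainly does the following:
- validates its arguments: scheme, host, method, protocol…
- obtains a cached or newly created connection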
|
// roundTrip implements a RoundTripper over HTTP.
func (t *Transport) roundTrip(req *Request) (*Response, error) {
t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
ctx := req.Context()
trace := httptrace.ContextClientTrace(ctx)
if req.URL == nil {
req.closeBody()
return nil, errors.New("http: nil Request.URL")
}
if req.Header == nil {
req.closeBody()
return nil, errors.New("http: nil Request.Header")
}
scheme := req.URL.Scheme
isHTTP := scheme == "http" || scheme == "https"
// validate the request headers
if isHTTP {
for k, vv := range req.Header {
if !httpguts.ValidHeaderFieldName(k) {
req.closeBody()
return nil, fmt.Errorf("net/http: invalid header field name %q", k)
}
for _, v := range vv {
if !httpguts.ValidHeaderFieldValue(v) {
req.closeBody()
return nil, fmt.Errorf("net/http: invalid header field value %q for key %v", v, k)
}
}
}
}
origReq := req
cancelKey := cancelKey{origReq}
req = setupRewindBody(req)
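// Decide whether a registered RoundTripper handles this scheme. Requests that must use
// TCP+TLS+HTTP/1 (for example a wss protocol upgrade) cannot use a registered RoundTripper;
// the code further down handles TCP+TLS+HTTP/1 and TCP+HTTP/1.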
if altRT := t.alternateRoundTripper(req); altRT != nil {
if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
return resp, err
}
var err error
req, err = rewindBody(req)
if err != nil {
return nil, err
}
}
// from here on only the URL schemes http and https are handled
if !isHTTP {
req.closeBody()
return nil, badStringError("unsupported protocol scheme", scheme)
}
if req.Method != "" && !validMethod(req.Method) {
req.closeBody()
return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
}
if req.URL.Host == "" {
req.closeBody()
return nil, errors.New("http: no Host in request URL")
}
// The for loop below retries the request when it fails. Not every failure is retried:
// a cancelled request (errRequestCanceled), for example, is not. See shouldRetryRequest.
for {
select {
case <-ctx.Done():
req.closeBody()
return nil, ctx.Err()
default:
}
// treq gets modified by roundTrip, so we need to recreate for each retry.
treq := &transportRequest{Request: req, trace: trace, cancelKey: cancelKey}
// connectMethodForRequest maps a request to a connectMethod (cm for short), which describes
// the request as {proxyURL, targetScheme, targetAddr, onlyH1}, that is, {proxy URL, server
// scheme, server address, HTTP/1 only}. A request that matches a connectMethod is matched
// to one class of persistent connections in Transport.idleConn.
cm, err := t.connectMethodForRequest(treq)
if err != nil {
req.closeBody()
return nil, err
}
// Get the cached or newly-created connection to either the
// host (for http or https), the http proxy, or the http proxy
// pre-CONNECTed to https server. In any case, we'll be ready
// to send it requests.
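// Obtain a persistent connection: an existing one from the pool if available, otherwise
// a newly dialed one. The connection may be an HTTP/2 connection, stored in persistConn.alt
// and served by its own registered RoundTripper (getConn is described later on).
// As getConn shows, for HTTP/1 a request only runs on an idle connection, and conversely
// a connection handles only one request at a time.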
pconn, err := t.getConn(treq, cm)
// without an underlying connection the request cannot proceed; return the error
if err != nil {
// every request sets a reqCanceler in getConn; clear it because no connection was obtained
t.setReqCanceler(cancelKey, nil)
req.closeBody()
return nil, err
}
var resp *Response
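// pconn.alt comes from Transport.TLSNextProto and represents the protocol on top of TLS,
// such as HTTP/2. According to the comment on persistConn.alt, it is used for HTTP/2 today
// and may serve other protocols later.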
if pconn.alt != nil {
// HTTP/2 path.
// clear the marker set in getConn; see getConn for details
t.setReqCanceler(cancelKey, nil) // not cancelable with CancelRequest
resp, err = pconn.alt.RoundTrip(req)
} else {
// pconn.roundTrip does the heavy lifting: it sends the request and returns the response,
// writing the request via writeLoop and reading the response via readLoop.
resp, err = pconn.roundTrip(treq)
}
// a response was obtained, so processing is complete
if err == nil {
resp.Request = origReq
return resp, nil
}
// Failed. Clean up and determine whether to retry.
if http2isNoCachedConnError(err) {
if t.removeIdleConn(pconn) {
t.decConnsPerHost(pconn.cacheKey)
}
} else if !pconn.shouldRetryRequest(req, err) {
// Issue 16465: return underlying net.Conn.Read error from peek,
// as we've historically done.
if e, ok := err.(transportReadFromServerError); ok {
err = e.err
}
return nil, err
}
testHookRoundTripRetried()
// Rewind the body if we're able to.
req, err = rewindBody(req)
if err != nil {
return nil, err
}
}
}
func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod, err error) {
cm.targetScheme = treq.URL.Scheme
cm.targetAddr = canonicalAddr(treq.URL)
if t.Proxy != nil {
cm.proxyURL, err = t.Proxy(treq.Request)
}
cm.onlyH1 = treq.requiresHTTP1()
return cm, err
}
|
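The early exits in roundTrip can be triggered without any network traffic. The sketch below sends three deliberately bad requests straight through a fresh Transport: one with an unsupported scheme, one with an invalid header name, and one whose context is already cancelled. All helper names are hypothetical, and example.invalid is a reserved name that never resolves.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net/http"
)

// roundTripErr sends req through a fresh Transport and returns only the error.
func roundTripErr(req *http.Request) error {
	tr := &http.Transport{}
	defer tr.CloseIdleConnections()
	resp, err := tr.RoundTrip(req)
	if err == nil {
		resp.Body.Close()
	}
	return err
}

// badScheme uses a scheme for which no RoundTripper is registered.
func badScheme() error {
	req, err := http.NewRequest(http.MethodGet, "ftp://example.invalid/file", nil)
	if err != nil {
		return err
	}
	return roundTripErr(req)
}

// badHeader sets a header name containing a space, which is not a valid token.
func badHeader() error {
	req, err := http.NewRequest(http.MethodGet, "http://example.invalid/", nil)
	if err != nil {
		return err
	}
	req.Header["bad name"] = []string{"x"}
	return roundTripErr(req)
}

// canceledErr cancels the context before the request is sent.
func canceledErr() error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://example.invalid/", nil)
	if err != nil {
		return err
	}
	return roundTripErr(req)
}

// isCanceled reports whether err wraps context.Canceled.
func isCanceled(err error) bool { return errors.Is(err, context.Canceled) }

func main() {
	fmt.Println("scheme:", badScheme())
	fmt.Println("header:", badHeader())
	fmt.Println("canceled:", isCanceled(canceledErr()))
}
```

Going through http.Client instead of calling RoundTrip directly would wrap each of these errors in a *url.Error.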
persistConn
|
// persistConn wraps a connection, usually a persistent one
// (but may be used for non-keep-alive requests as well)
type persistConn struct {
// alt optionally specifies the TLS NextProto RoundTripper.
// This is used for HTTP/2 today and future protocols later.
// If it's non-nil, the rest of the fields are unused.
alt RoundTripper
t *Transport
cacheKey connectMethodKey
conn net.Conn
tlsState *tls.ConnectionState
br *bufio.Reader // from conn
bw *bufio.Writer // to conn
nwrite int64 // bytes written
reqch chan requestAndChan // written by roundTrip; read by readLoop
writech chan writeRequest // written by roundTrip; read by writeLoop
closech chan struct{} // closed when conn closed
isProxy bool
sawEOF bool // whether we've seen EOF from conn; owned by readLoop
readLimit int64 // bytes allowed to be read; owned by readLoop
// writeErrCh passes the request write error (usually nil)
// from the writeLoop goroutine to the readLoop which passes
// it off to the res.Body reader, which then uses it to decide
// whether or not a connection can be reused. Issue 7569.
writeErrCh chan error
writeLoopDone chan struct{} // closed when write loop ends
// Both guarded by Transport.idleMu:
idleAt time.Time // time it last become idle
idleTimer *time.Timer // holding an AfterFunc to close it
mu sync.Mutex // guards following fields
numExpectedResponses int
closed error // set non-nil when conn is closed, before closech is closed
canceledErr error // set non-nil if conn is canceled
broken bool // an error has happened on this connection; marked broken so it's not reused.
reused bool // whether conn has had successful request/response and is being reused.
// mutateHeaderFunc is an optional func to modify extra
// headers on each outbound request before it's written. (the
// original Request given to RoundTrip is not modified)
mutateHeaderFunc func(Header)
}
|
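The interplay between roundTrip, writeLoop and readLoop through the writech and reqch channels can be modeled in a few lines. The following is a toy model under heavy simplification, not the standard library's implementation: toyConn, call and the upper-casing "server" are invented for illustration, and a buffered channel stands in for the network connection.

```go
package main

import (
	"fmt"
	"strings"
)

// call is one in-flight request; reply receives its response.
type call struct {
	payload string
	reply   chan string
}

// toyConn is a hypothetical, much simplified model of persistConn.
type toyConn struct {
	writech chan call     // written by roundTrip, read by writeLoop
	reqch   chan call     // written by roundTrip, read by readLoop
	wire    chan string   // stands in for the network connection
	closech chan struct{} // closed when the connection is closed
}

func newToyConn() *toyConn {
	c := &toyConn{
		writech: make(chan call),
		reqch:   make(chan call),
		wire:    make(chan string, 1),
		closech: make(chan struct{}),
	}
	go c.writeLoop()
	go c.readLoop()
	return c
}

// writeLoop "sends" each request; the fake server upper-cases the payload.
func (c *toyConn) writeLoop() {
	for {
		select {
		case w := <-c.writech:
			c.wire <- strings.ToUpper(w.payload)
		case <-c.closech:
			return
		}
	}
}

// readLoop waits for a pending request, then reads its response off the wire.
func (c *toyConn) readLoop() {
	for {
		select {
		case r := <-c.reqch:
			r.reply <- <-c.wire
		case <-c.closech:
			return
		}
	}
}

// roundTrip hands the request to both loops and waits for the reply.
// Like an HTTP/1 connection, it serves one request at a time.
func (c *toyConn) roundTrip(payload string) string {
	cl := call{payload: payload, reply: make(chan string, 1)}
	c.writech <- cl
	c.reqch <- cl
	return <-cl.reply
}

func (c *toyConn) close() { close(c.closech) }

func main() {
	c := newToyConn()
	defer c.close()
	fmt.Println(c.roundTrip("ping")) // PING
	fmt.Println(c.roundTrip("pong")) // PONG
}
```

The real persistConn additionally handles write errors, cancellation, body streaming and returning the connection to the idle pool, all of which this model leaves out.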
getConn
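getConn returns a persistent connection. The connection comes from one of two paths:
- calling net/http.Transport.queueForIdleConn to wait in a queue for an idle connection;
- calling net/http.Transport.queueForDial to wait in a queue to establish a new connection.

Connections are a relatively expensive resource. Establishing a new connection before every HTTP request can take considerable time and adds significant overhead. Allocating and reusing connections through a pool improves the overall performance of HTTP requests, and most network client libraries use a similar strategy to reuse resources.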
|
// getConn dials and creates a new persistConn to the target as
// specified in the connectMethod. This includes doing a proxy CONNECT
// and/or setting up TLS. If this doesn't return an error, the persistConn
// is ready to write requests to.
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
req := treq.Request
trace := treq.trace
ctx := req.Context()
if trace != nil && trace.GetConn != nil {
trace.GetConn(cm.addr())
}
w := &wantConn{
cm: cm,
key: cm.key(),
ctx: ctx,
ready: make(chan struct{}, 1),
beforeDial: testHookPrePendingDial,
afterDial: testHookPostPendingDial,
}
defer func() {
if err != nil {
w.cancel(t, err)
}
}()
// Queue for idle connection.
// look for a suitable connection in the pool; return it if found, otherwise dial a new one
if delivered := t.queueForIdleConn(w); delivered {
pc := w.pc
// Trace only for HTTP/1.
// HTTP/2 calls trace.GotConn itself.
if pc.alt == nil && trace != nil && trace.GotConn != nil {
trace.GotConn(pc.gotIdleConnTrace(pc.idleAt))
}
// set request canceler to some non-nil function so we
// can detect whether it was cleared between now and when
// we enter roundTrip
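// Setting Transport.reqCanceler here is subtle. It acts as a marker for detecting whether
// the request was cancelled (for example via Request.Cancel or Request.Context().Done())
// between now and the call to pconn.roundTrip; a cancelled request does not need to go
// through roundTrip.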
t.setReqCanceler(treq.cancelKey, func(error) {})
return pc, nil
}
cancelc := make(chan error, 1)
t.setReqCanceler(treq.cancelKey, func(err error) { cancelc <- err })
// Queue for permission to dial.
t.queueForDial(w)
// Wait for completion or cancellation.
select {
case <-w.ready:
// Trace success but only for HTTP/1.
// HTTP/2 calls trace.GotConn itself.
if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil {
trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()})
}
if w.err != nil {
// If the request has been cancelled, that's probably
// what caused w.err; if so, prefer to return the
// cancellation error (see golang.org/issue/16049).
select {
case <-req.Cancel:
return nil, errRequestCanceledConn
case <-req.Context().Done():
return nil, req.Context().Err()
case err := <-cancelc:
if err == errRequestCanceled {
err = errRequestCanceledConn
}
return nil, err
default:
// return below
}
}
return w.pc, w.err
case <-req.Cancel:
return nil, errRequestCanceledConn
case <-req.Context().Done():
return nil, req.Context().Err()
case err := <-cancelc:
if err == errRequestCanceled {
err = errRequestCanceledConn
}
return nil, err
}
}
|
queueForIdleConn
This function looks for a suitable connection in the connection pool.
|
// queueForIdleConn queues w to receive the next idle connection for w.cm.
// As an optimization hint to the caller, queueForIdleConn reports whether
// it successfully delivered an already-idle connection.
func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
// when keep-alives are disabled, the Transport never reads from the pool
if t.DisableKeepAlives {
return false
}
t.idleMu.Lock()
defer t.idleMu.Unlock()
// Stop closing connections that become idle - we might want one.
// (That is, undo the effect of t.CloseIdleConnections.)
t.closeIdle = false
if w == nil {
// Happens in test hook.
return false
}
// If IdleConnTimeout is set, calculate the oldest
// persistConn.idleAt time we're willing to use a cached idle
// conn.
var oldTime time.Time
// if an idle timeout is configured, compute the cutoff time; connections idle since before it are closed
if t.IdleConnTimeout > 0 {
oldTime = time.Now().Add(-t.IdleConnTimeout)
}
// Look for most recently-used idle connection.
if list, ok := t.idleConn[w.key]; ok {
stop := false
delivered := false
for len(list) > 0 && !stop {
pconn := list[len(list)-1]
// See whether this connection has been idle too long, considering
// only the wall time (the Round(0)), in case this is a laptop or VM
// coming out of suspend with previously cached idle connections.
tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
// The connection has been idle for too long: close it.
if tooOld {
// Async cleanup. Launch in its own goroutine (as if a
// time.AfterFunc called it); it acquires idleMu, which we're
// holding, and does a synchronous net.Conn.Close.
go pconn.closeConnIfStillIdle()
}
if pconn.isBroken() || tooOld {
// If either persistConn.readLoop has marked the connection
// broken, but Transport.removeIdleConn has not yet removed it
// from the idle list, or if this persistConn is too old (it was
// idle too long), then ignore it and look for another. In both
// cases it's already in the process of being closed.
list = list[:len(list)-1]
continue
}
// Hand the connection to the waiting wantConn.
delivered = w.tryDeliver(pconn, nil)
if delivered {
if pconn.alt != nil {
// HTTP/2: multiple clients can share pconn.
// Leave it in the list.
} else {
// HTTP/1: only one client can use pconn.
// Remove it from the list.
t.idleLRU.remove(pconn)
list = list[:len(list)-1]
}
}
stop = true
}
if len(list) > 0 {
t.idleConn[w.key] = list
} else {
delete(t.idleConn, w.key)
}
if stop {
return delivered
}
}
// Register to receive next connection that becomes idle.
if t.idleConnWait == nil {
t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
}
q := t.idleConnWait[w.key]
q.cleanFront()
q.pushBack(w)
t.idleConnWait[w.key] = q
return false
}
|
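The lookup performed by queueForIdleConn can be modelled in isolation. The following is a minimal sketch and not the standard library's code: `idleConn`, `takeIdle` and `demoTakeIdle` are hypothetical names, and locking, broken-connection checks and HTTP/2 sharing are left out. It shows only the most-recently-used-first scan combined with the idle-timeout cutoff.

```go
package main

import (
	"fmt"
	"time"
)

// idleConn is a hypothetical stand-in for persistConn: an id plus the time
// at which the connection became idle.
type idleConn struct {
	id     int
	idleAt time.Time
}

// takeIdle pops connections from the end of list (most recently used first),
// skips those that have been idle longer than timeout, and returns the first
// usable one together with the remaining list.
func takeIdle(list []idleConn, timeout time.Duration, now time.Time) (idleConn, []idleConn, bool) {
	var cutoff time.Time
	if timeout > 0 {
		cutoff = now.Add(-timeout)
	}
	for len(list) > 0 {
		c := list[len(list)-1]
		list = list[:len(list)-1]
		if !cutoff.IsZero() && c.idleAt.Before(cutoff) {
			continue // idle for too long; the real code closes it asynchronously
		}
		return c, list, true
	}
	return idleConn{}, list, false
}

// demoTakeIdle builds a small pool and takes one connection from it.
func demoTakeIdle() (id, remaining int, ok bool) {
	now := time.Now()
	pool := []idleConn{
		{id: 1, idleAt: now.Add(-10 * time.Second)},
		{id: 2, idleAt: now.Add(-2 * time.Second)},
		{id: 3, idleAt: now.Add(-90 * time.Second)},
	}
	c, rest, ok := takeIdle(pool, 30*time.Second, now)
	return c.id, len(rest), ok
}

func main() {
	fmt.Println(demoTakeIdle())
}
```

With a 30-second timeout, connection 3 is skipped as expired and connection 2 is returned, leaving one entry in the pool.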
queueForDial
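When net/http.Transport.queueForDial is called to establish a connection to the remote host, the standard library starts a new goroutine that runs net/http.Transport.dialConnFor to dial. In net/http.Transport.dialConn, which it eventually calls, we can see the TCP connection and the net package at work: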
|
// queueForDial queues w to wait for permission to begin dialing.
// Once w receives permission to dial, it will do so in a separate goroutine.
func (t *Transport) queueForDial(w *wantConn) {
w.beforeDial()
// No per-host connection limit is configured: dial right away.
if t.MaxConnsPerHost <= 0 {
go t.dialConnFor(w)
return
}
t.connsPerHostMu.Lock()
defer t.connsPerHostMu.Unlock()
// The per-host limit has not been reached yet: dial right away.
if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
if t.connsPerHost == nil {
t.connsPerHost = make(map[connectMethodKey]int)
}
t.connsPerHost[w.key] = n + 1
go t.dialConnFor(w)
return
}
if t.connsPerHostWait == nil {
t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
}
// Otherwise queue the wantConn until a connection slot becomes available.
q := t.connsPerHostWait[w.key]
q.cleanFront()
q.pushBack(w)
t.connsPerHostWait[w.key] = q
}
|
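The bookkeeping in queueForDial (count per host, dial if below the limit, otherwise wait in a queue) can be sketched on its own. This is a simplified model under stated assumptions: `hostLimiter`, `acquire`, `release` and `demoLimiter` are hypothetical names, and waiters are represented by channels rather than wantConn values.

```go
package main

import (
	"fmt"
	"sync"
)

// hostLimiter is a hypothetical, simplified model of the per-host accounting
// in queueForDial: a counter per key plus a FIFO of waiters.
type hostLimiter struct {
	mu      sync.Mutex
	max     int
	active  map[string]int
	waiters map[string][]chan struct{}
}

func newHostLimiter(max int) *hostLimiter {
	return &hostLimiter{
		max:     max,
		active:  make(map[string]int),
		waiters: make(map[string][]chan struct{}),
	}
}

// acquire reports whether the caller may dial immediately. If not, the
// returned channel is closed once a slot is handed over to the caller.
func (l *hostLimiter) acquire(key string) (bool, <-chan struct{}) {
	if l.max <= 0 {
		return true, nil // no limit configured
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.active[key] < l.max {
		l.active[key]++
		return true, nil
	}
	ch := make(chan struct{})
	l.waiters[key] = append(l.waiters[key], ch)
	return false, ch
}

// release frees a slot, passing it to the oldest waiter if there is one.
func (l *hostLimiter) release(key string) {
	if l.max <= 0 {
		return
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	if q := l.waiters[key]; len(q) > 0 {
		l.waiters[key] = q[1:]
		close(q[0]) // the slot moves to the waiter, so the count is unchanged
		return
	}
	l.active[key]--
}

// demoLimiter acquires three slots with a limit of two, then releases one.
func demoLimiter() (first, second, third, woken bool) {
	const key = "example.com:443"
	l := newHostLimiter(2)
	first, _ = l.acquire(key)
	second, _ = l.acquire(key)
	var wait <-chan struct{}
	third, wait = l.acquire(key)
	l.release(key)
	select {
	case <-wait:
		woken = true
	default:
	}
	return
}

func main() {
	fmt.Println(demoLimiter())
}
```

The third caller is not allowed to dial until an earlier slot is released, which mirrors how a wantConn waits in connsPerHostWait.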
|
// dialConnFor dials on behalf of w and delivers the result to w.
// dialConnFor has received permission to dial w.cm and is counted in t.connCount[w.cm.key()].
// If the dial is cancelled or unsuccessful, dialConnFor decrements t.connCount[w.cm.key()].
func (t *Transport) dialConnFor(w *wantConn) {
defer w.afterDial()
pc, err := t.dialConn(w.ctx, w.cm)
delivered := w.tryDeliver(pc, err)
if err == nil && (!delivered || pc.alt != nil) {
// pconn was not passed to w,
// or it is HTTP/2 and can be shared.
// Add to the idle connection pool.
t.putOrCloseIdleConn(pc)
}
if err != nil {
t.decConnsPerHost(w.key)
}
}
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
pconn = &persistConn{
t: t,
cacheKey: cm.key(),
reqch: make(chan requestAndChan, 1),
writech: make(chan writeRequest, 1),
closech: make(chan struct{}),
writeErrCh: make(chan error, 1),
writeLoopDone: make(chan struct{}),
}
trace := httptrace.ContextClientTrace(ctx)
wrapErr := func(err error) error {
if cm.proxyURL != nil {
// Return a typed error, per Issue 16997
return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
}
return err
}
// Call the registered DialTLS function to handle TLS. When a custom TLS dial function is registered,
// the Transport's TLSClientConfig and TLSHandshakeTimeout settings are ignored.
if cm.scheme() == "https" && t.hasCustomTLSDialer() {
var err error
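// Create a connection with the registered dial function. Note how cm.addr() works: if a proxy is configured,
// this establishes the TLS connection to the proxy; otherwise it connects directly to the server.
// With a proxy, reaching the server takes two steps: a TCP (TLS) connection to the proxy, then an HTTP (HTTPS) connection to the server.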
pconn.conn, err = t.customDialTLS(ctx, "tcp", cm.addr())
if err != nil {
return nil, wrapErr(err)
}
// If the connection is a *tls.Conn, make sure the TLS handshake has been performed.
if tc, ok := pconn.conn.(*tls.Conn); ok {
// Handshake here, in case DialTLS didn't. TLSNextProto below
// depends on it for knowing the connection state.
if trace != nil && trace.TLSHandshakeStart != nil {
trace.TLSHandshakeStart()
}
// Run the TLS handshake; close the connection if it fails.
if err := tc.Handshake(); err != nil {
go pconn.conn.Close()
if trace != nil && trace.TLSHandshakeDone != nil {
trace.TLSHandshakeDone(tls.ConnectionState{}, err)
}
return nil, err
}
cs := tc.ConnectionState()
if trace != nil && trace.TLSHandshakeDone != nil {
trace.TLSHandshakeDone(cs, nil)
}
// Save the resulting TLS connection state.
pconn.tlsState = &cs
}
} else {
// Create the connection the default way; here the Transport's TLSClientConfig and TLSHandshakeTimeout are used. Again, note cm.addr().
conn, err := t.dial(ctx, "tcp", cm.addr())
if err != nil {
return nil, wrapErr(err)
}
pconn.conn = conn
// If the scheme requires TLS, perform the handshake; otherwise this is a plain HTTP connection.
if cm.scheme() == "https" {
var firstTLSHost string
if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
return nil, wrapErr(err)
}
// Perform the TLS handshake; see addTLS below.
if err = pconn.addTLS(firstTLSHost, trace); err != nil {
return nil, wrapErr(err)
}
}
}
// Proxy setup.
// Handle the proxy cases.
switch {
// No proxy is configured: skip this step.
case cm.proxyURL == nil:
// Do nothing. Not using a proxy.
case cm.proxyURL.Scheme == "socks5":
conn := pconn.conn
d := socksNewDialer("tcp", conn.RemoteAddr().String())
if u := cm.proxyURL.User; u != nil {
auth := &socksUsernamePassword{
Username: u.Username(),
}
auth.Password, _ = u.Password()
d.AuthMethods = []socksAuthMethod{
socksAuthMethodNotRequired,
socksAuthMethodUsernamePassword,
}
d.Authenticate = auth.Authenticate
}
if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil {
conn.Close()
return nil, err
}
// A proxy is configured and the target scheme is "http": set the proxy credentials if authentication is required.
case cm.targetScheme == "http":
pconn.isProxy = true
if pa := cm.proxyAuth(); pa != "" {
pconn.mutateHeaderFunc = func(h Header) {
h.Set("Proxy-Authorization", pa)
}
}
// A proxy is configured and the target scheme is "https". Unlike the "http" case, a CONNECT request is sent
// to the proxy first; only if it succeeds (status 200) does the TLS handshake with the server proceed.
case cm.targetScheme == "https":
// conn is the connection established to the proxy.
conn := pconn.conn
hdr := t.ProxyConnectHeader
if hdr == nil {
hdr = make(Header)
}
if pa := cm.proxyAuth(); pa != "" {
hdr = hdr.Clone()
hdr.Set("Proxy-Authorization", pa)
}
connectReq := &Request{
Method: "CONNECT",
URL: &url.URL{Opaque: cm.targetAddr},
Host: cm.targetAddr,
Header: hdr,
}
// If there's no done channel (no deadline or cancellation
// from the caller possible), at least set some (long)
// timeout here. This will make sure we don't block forever
// and leak a goroutine if the connection stops replying
// after the TCP connect.
connectCtx := ctx
if ctx.Done() == nil {
newCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
connectCtx = newCtx
}
didReadResponse := make(chan struct{}) // closed after CONNECT write+read is done or fails
var (
resp *Response
err error // write or read error
)
// Write the CONNECT request & read the response.
go func() {
defer close(didReadResponse)
err = connectReq.Write(conn)
if err != nil {
return
}
// Okay to use and discard buffered reader here, because
// TLS server will not speak until spoken to.
br := bufio.NewReader(conn)
resp, err = ReadResponse(br, connectReq)
}()
select {
case <-connectCtx.Done():
conn.Close()
<-didReadResponse
return nil, connectCtx.Err()
case <-didReadResponse:
// resp or err now set
}
if err != nil {
conn.Close()
return nil, err
}
// A non-200 response from the proxy means the tunnel could not be established, for example because proxy authentication failed.
if resp.StatusCode != 200 {
f := strings.SplitN(resp.Status, " ", 2)
conn.Close()
if len(f) < 2 {
return nil, errors.New("unknown status code")
}
return nil, errors.New(f[1])
}
}
// Once the tunnel through the proxy is established, perform the TLS handshake with the server.
if cm.proxyURL != nil && cm.targetScheme == "https" {
if err := pconn.addTLS(cm.tlsHost(), trace); err != nil {
return nil, err
}
}
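// Handle the protocol running on top of TLS: if a handler is registered for it in TLSNextProto, that handler's RoundTripper takes over.
// The protocol is the one negotiated during the TLS handshake via the NPN/ALPN extension, e.g. HTTP/2 (see golang.org/x/net/http2).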
if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
alt := next(cm.targetAddr, pconn.conn.(*tls.Conn))
if e, ok := alt.(http2erringRoundTripper); ok {
// pconn.conn was closed by next (http2configureTransport.upgradeFn).
return nil, e.err
}
return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
}
}
pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())
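// Start the read and write loops: writeLoop sends requests and readLoop receives responses. roundTrip hands the request
// to writeLoop over a channel and receives the response from readLoop over a channel. Every connection has one readLoop and one writeLoop; both exit once the connection is closed.
// pconn.br is used by readLoop and pconn.bw by writeLoop. Note that the TCP connection is already established at this point.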
go pconn.readLoop()
go pconn.writeLoop()
return pconn, nil
}
|
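After a new TCP connection is created, two goroutines are started in the background for it: one reads data from the TCP connection and the other writes data to it. The dialing process shows that creating a new connection for every HTTP request, and starting goroutines to read and write on it, would consume a lot of resources.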
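Because each connection is comparatively expensive, applications usually share one Transport and let it pool connections. The fragment below is an illustrative configuration only: the field names are real net/http.Transport fields, while `newPooledClient` and every numeric value are assumptions chosen for the example rather than recommendations.

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// newPooledClient returns a client whose Transport keeps connections alive
// and bounds how many it opens. All numbers are illustrative assumptions.
func newPooledClient() *http.Client {
	tr := &http.Transport{
		MaxIdleConns:        100,              // idle connections kept across all hosts
		MaxIdleConnsPerHost: 10,               // idle connections kept per host
		MaxConnsPerHost:     20,               // limit checked by queueForDial
		IdleConnTimeout:     90 * time.Second, // cutoff used by queueForIdleConn
	}
	return &http.Client{Transport: tr, Timeout: 10 * time.Second}
}

func main() {
	c := newPooledClient()
	tr := c.Transport.(*http.Transport)
	fmt.Println(tr.MaxConnsPerHost, tr.DisableKeepAlives)
}
```

Reusing the same client for all requests is what allows the idle pool described above to take effect.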
addTLS
addTLS performs the TLS handshake when no custom TLS dial function has been registered.
|
// Add TLS to a persistent connection, i.e. negotiate a TLS session. If pconn is already a TLS
// tunnel, this function establishes a nested TLS session inside the encrypted channel.
// The remote endpoint's name may be overridden by TLSClientConfig.ServerName.
func (pconn *persistConn) addTLS(name string, trace *httptrace.ClientTrace) error {
// Initiate TLS and check remote host name against certificate.
cfg := cloneTLSConfig(pconn.t.TLSClientConfig)
if cfg.ServerName == "" {
cfg.ServerName = name
}
if pconn.cacheKey.onlyH1 {
cfg.NextProtos = nil
}
plainConn := pconn.conn
// Set up the TLS client from the underlying TCP connection and the TLS configuration.
tlsConn := tls.Client(plainConn, cfg)
errc := make(chan error, 2)
var timer *time.Timer // for canceling TLS handshake
// Arm the handshake timeout; when it fires, a tlsHandshakeTimeoutError{} is sent on errc.
if d := pconn.t.TLSHandshakeTimeout; d != 0 {
timer = time.AfterFunc(d, func() {
errc <- tlsHandshakeTimeoutError{}
})
}
go func() {
if trace != nil && trace.TLSHandshakeStart != nil {
trace.TLSHandshakeStart()
}
// Run the TLS handshake and send its result on errc (errc has capacity 2, so this send never blocks even if the timeout fired first).
err := tlsConn.Handshake()
if timer != nil {
timer.Stop()
}
errc <- err
}()
// Block until the handshake finishes or times out; on failure or timeout, close the underlying connection.
if err := <-errc; err != nil {
plainConn.Close()
if trace != nil && trace.TLSHandshakeDone != nil {
trace.TLSHandshakeDone(tls.ConnectionState{}, err)
}
return err
}
// Record the negotiated connection state in pconn.tlsState.
cs := tlsConn.ConnectionState()
if trace != nil && trace.TLSHandshakeDone != nil {
trace.TLSHandshakeDone(cs, nil)
}
pconn.tlsState = &cs
pconn.conn = tlsConn
return nil
}
|
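The timeout handling in addTLS follows a reusable pattern: a buffered error channel receives either the result of the operation or a timeout error, whichever comes first. The sketch below isolates that pattern; `runWithTimeout`, `errTimeout` and `demoTimeout` are hypothetical names, and a sleeping function stands in for the TLS handshake.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errTimeout = errors.New("operation timed out")

// runWithTimeout runs op in its own goroutine and returns its result, or
// errTimeout if d elapses first. A zero d disables the timeout.
func runWithTimeout(d time.Duration, op func() error) error {
	// Capacity 2: the timer and the worker may both send, and neither may block.
	errc := make(chan error, 2)
	var timer *time.Timer
	if d != 0 {
		timer = time.AfterFunc(d, func() { errc <- errTimeout })
	}
	go func() {
		err := op()
		if timer != nil {
			timer.Stop()
		}
		errc <- err
	}()
	return <-errc
}

// demoTimeout runs one operation that finishes in time and one that does not.
func demoTimeout() (fast, slow error) {
	fast = runWithTimeout(time.Second, func() error { return nil })
	slow = runWithTimeout(20*time.Millisecond, func() error {
		time.Sleep(200 * time.Millisecond) // stands in for a stalled handshake
		return nil
	})
	return fast, slow
}

func main() {
	fast, slow := demoTimeout()
	fmt.Println(fast, slow)
}
```

As in addTLS, the caller only reads the first value from the channel; the buffer absorbs the late second send so that no goroutine is leaked.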
wantConn
|
// A wantConn records state about a wanted connection
// (that is, an active call to getConn).
// The conn may be gotten by dialing or by finding an idle connection,
// or a cancellation may make the conn no longer wanted.
// These three options are racing against each other and use
// wantConn to coordinate and agree about the winning outcome.
type wantConn struct {
cm connectMethod
key connectMethodKey // cm.key()
ctx context.Context // context for dial
ready chan struct{} // closed when pc, err pair is delivered
// hooks for testing to know when dials are done
// beforeDial is called in the getConn goroutine when the dial is queued.
// afterDial is called when the dial is completed or cancelled.
beforeDial func()
afterDial func()
mu sync.Mutex // protects pc, err, close(ready)
pc *persistConn
err error
}
|
tryDeliver
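Once a connection has been established, tryDeliver is likewise called to hand it to the wantConn and to close the w.ready channel, which unblocks the goroutine waiting for the connection.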
|
// tryDeliver attempts to deliver pc, err to w and reports whether it succeeded.
func (w *wantConn) tryDeliver(pc *persistConn, err error) bool {
w.mu.Lock()
defer w.mu.Unlock()
if w.pc != nil || w.err != nil {
return false
}
w.pc = pc
w.err = err
if w.pc == nil && w.err == nil {
panic("net/http: internal error: misuse of tryDeliver")
}
close(w.ready)
return true
}
|
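The key property of tryDeliver is that only the first delivery wins, no matter whether it comes from the idle pool or from a finished dial. A minimal sketch of that behavior, assuming a hypothetical `want` type that carries a string instead of a *persistConn:

```go
package main

import (
	"fmt"
	"sync"
)

// want is a hypothetical, stripped-down wantConn.
type want struct {
	ready chan struct{} // closed when val or err has been delivered
	mu    sync.Mutex    // protects val and err
	val   string
	err   error
}

func newWant() *want { return &want{ready: make(chan struct{})} }

// tryDeliver stores the result if nothing was delivered before and reports
// whether this call was the one that succeeded.
func (w *want) tryDeliver(val string, err error) bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.val != "" || w.err != nil {
		return false // somebody else already delivered
	}
	if val == "" && err == nil {
		panic("tryDeliver called without a value or an error")
	}
	w.val, w.err = val, err
	close(w.ready)
	return true
}

// race lets an "idle pool" and a "dialer" compete to satisfy the same want.
func race() (winners int, val string) {
	w := newWant()
	results := make(chan bool, 2)
	for _, v := range []string{"idle-conn", "dialed-conn"} {
		go func(v string) { results <- w.tryDeliver(v, nil) }(v)
	}
	for i := 0; i < 2; i++ {
		if <-results {
			winners++
		}
	}
	<-w.ready // already closed by the winning delivery
	w.mu.Lock()
	defer w.mu.Unlock()
	return winners, w.val
}

func main() {
	fmt.Println(race())
}
```

Which of the two goroutines wins is not deterministic, but exactly one of them does, and the loser is expected to put its connection back into the pool.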
wantConnQueue
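The standard library implements this queue with two slices, head and tail: head serves dequeues and tail serves enqueues. On a dequeue, if head is exhausted, head and tail are swapped. This lets the queue reuse the underlying arrays.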
|
// A wantConnQueue is a queue of wantConns.
type wantConnQueue struct {
// This is a queue, not a deque.
// It is split into two stages - head[headPos:] and tail.
// popFront is trivial (headPos++) on the first stage, and
// pushBack is trivial (append) on the second stage.
// If the first stage is empty, popFront can swap the
// first and second stages to remedy the situation.
//
// This two-stage split is analogous to the use of two lists
// in Okasaki's purely functional queue but without the
// overhead of reversing the list when swapping stages.
head []*wantConn
headPos int
tail []*wantConn
}
// len returns the number of items in the queue.
func (q *wantConnQueue) len() int {
return len(q.head) - q.headPos + len(q.tail)
}
// pushBack adds w to the back of the queue.
func (q *wantConnQueue) pushBack(w *wantConn) {
q.tail = append(q.tail, w)
}
// popFront removes and returns the wantConn at the front of the queue.
func (q *wantConnQueue) popFront() *wantConn {
if q.headPos >= len(q.head) {
if len(q.tail) == 0 {
return nil
}
// Pick up tail as new head, clear tail.
q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
}
w := q.head[q.headPos]
q.head[q.headPos] = nil
q.headPos++
return w
}
// peekFront returns the wantConn at the front of the queue without removing it.
func (q *wantConnQueue) peekFront() *wantConn {
if q.headPos < len(q.head) {
return q.head[q.headPos]
}
if len(q.tail) > 0 {
return q.tail[0]
}
return nil
}
// cleanFront pops any wantConns that are no longer waiting from the head of the
// queue, reporting whether any were popped.
func (q *wantConnQueue) cleanFront() (cleaned bool) {
for {
w := q.peekFront()
if w == nil || w.waiting() {
return cleaned
}
q.popFront()
cleaned = true
}
}
|
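The two-stage layout is easy to try outside of net/http. The sketch below reimplements the same idea for strings; `queue` and `demoQueue` are hypothetical names, and the logic follows popFront and pushBack as shown in the listing.

```go
package main

import "fmt"

// queue is a two-stage FIFO: items are popped from head[headPos:] and
// pushed onto tail. When head runs out, the two slices are swapped.
type queue struct {
	head    []string
	headPos int
	tail    []string
}

func (q *queue) len() int { return len(q.head) - q.headPos + len(q.tail) }

func (q *queue) pushBack(v string) { q.tail = append(q.tail, v) }

func (q *queue) popFront() (string, bool) {
	if q.headPos >= len(q.head) {
		if len(q.tail) == 0 {
			return "", false
		}
		// Use tail as the new head and recycle the old head's array as tail.
		q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
	}
	v := q.head[q.headPos]
	q.head[q.headPos] = "" // drop the reference so it can be collected
	q.headPos++
	return v, true
}

// demoQueue interleaves pushes and pops and returns the pop order.
func demoQueue() []string {
	var q queue
	var out []string
	q.pushBack("a")
	q.pushBack("b")
	v, _ := q.popFront()
	out = append(out, v)
	q.pushBack("c")
	for q.len() > 0 {
		v, _ = q.popFront()
		out = append(out, v)
	}
	return out
}

func main() {
	fmt.Println(demoQueue())
}
```

Items come out in insertion order even though pushes and pops touch different slices, and the swap reuses the backing array instead of allocating a new one.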
roundTrip
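A persistent connection implements net/http.persistConn.roundTrip, which writes the HTTP request and waits for the response in a select statement.
Once the underlying TCP (TLS) connection has been obtained, roundTrip handles the protocol layered on top of it: it sends the HTTP request and returns the HTTP response. roundTrip hands the request to writeLoop and receives the response from readLoop.
Each call to roundTrip handles a single request.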
|
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
testHookEnterRoundTrip()
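// This pairs with "t.setReqCanceler(req, func(error) {})" in getConn and checks whether the request has been canceled.
// A false result means it was canceled: there is no point in continuing, so the connection is returned to the idle pool (or closed) and an error is returned.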
if !pc.t.replaceReqCanceler(req.cancelKey, pc.cancelRequest) {
pc.t.putOrCloseIdleConn(pc)
return nil, errRequestCanceled
}
pc.mu.Lock()
// Used together with readLoop: the number of responses expected on this connection.
pc.numExpectedResponses++
// Set in dialConn; it adds the proxy authentication header.
headerFn := pc.mutateHeaderFunc
pc.mu.Unlock()
if headerFn != nil {
headerFn(req.extraHeaders())
}
// Ask for a compressed version if the caller didn't set their
// own value for Accept-Encoding. We only attempt to
// uncompress the gzip stream if we were the layer that
// requested it.
requestedGzip := false
// Advertise the encodings that can be decoded by adding the header to the request. Only gzip is supported,
// and the header is added only when the caller has not set it.
if !pc.t.DisableCompression &&
req.Header.Get("Accept-Encoding") == "" &&
req.Header.Get("Range") == "" &&
req.Method != "HEAD" {
// Request gzip only, not deflate. Deflate is ambiguous and
// not as universally supported anyway.
// See: https://zlib.net/zlib_faq.html#faq39
//
// Note that we don't request this for HEAD requests,
// due to a bug in nginx:
// https://trac.nginx.org/nginx/ticket/358
// https://golang.org/issue/5522
//
// We don't request gzip if the request is for a range, since
// auto-decoding a portion of a gzipped document will just fail
// anyway. See https://golang.org/issue/8923
requestedGzip = true
req.extraHeaders().Set("Accept-Encoding", "gzip")
}
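// Handle requests carrying "Expect: 100-continue": the client uses this header to ask whether the server is willing
// to accept the request described by its headers (for example, one with a very large body) before sending the body.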
var continueCh chan struct{}
if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
continueCh = make(chan struct{}, 1)
}
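// HTTP/1.1 uses persistent connections by default; setting DisableKeepAlives on the Transport makes every request
// use a new connection. If DisableKeepAlives is set and the request does not already carry
// "Connection: close", add that header so that what goes on the wire matches the Transport's behavior.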
if pc.t.DisableKeepAlives && !req.wantsClose() {
req.extraHeaders().Set("Connection", "close")
}
// Tells readLoop, in abnormal cases such as a canceled request, that roundTrip has returned, so that readLoop does not block forever sending the response.
gone := make(chan struct{})
defer close(gone)
defer func() {
if err != nil {
pc.t.setReqCanceler(req.cancelKey, nil)
}
}()
const debugRoundTrip = false
// Write the request concurrently with waiting for a response,
// in case the server decides to reply before reading our full
// request body.
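// Number of bytes written on this connection before this request; used later to tell whether any part of the request was written.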
startBytesWritten := pc.nwrite
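// Package the request and send it to writeLoop. The order matters: the send to writeLoop blocks until
// writeLoop receives it, and only then is readLoop notified, so the request is always queued for writing before a response is awaited.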
writeErrCh := make(chan error, 1)
pc.writech <- writeRequest{req, writeErrCh, continueCh}
// Package the request information and send it to readLoop.
resc := make(chan responseAndError)
pc.reqch <- requestAndChan{
req: req.Request,
cancelKey: req.cancelKey,
ch: resc,
addedGzip: requestedGzip,
continueCh: continueCh,
callerGone: gone,
}
var respHeaderTimer <-chan time.Time
cancelChan := req.Request.Cancel
ctxDoneChan := req.Context().Done()
// This loop mainly handles response-header timeouts and request cancellation. In the normal case
// roundTrip returns as soon as the response arrives.
for {
testHookWaitResLoop()
select {
// writeLoop reports the result of sending the request.
case err := <-writeErrCh:
if debugRoundTrip {
req.logf("writeErrCh resv: %T/%#v", err, err)
}
if err != nil {
pc.close(fmt.Errorf("write error: %v", err))
return nil, pc.mapRoundTripError(req, startBytesWritten, err)
}
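// Start a timer for the response headers. If no response arrives in time (that is, the
// "case re := <-resc:" branch below is not taken), the "case <-respHeaderTimer:" branch fires and closes the connection,
// so that readLoop does not wait forever for a response. Otherwise the deferred timer.Stop releases the timer.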
if d := pc.t.ResponseHeaderTimeout; d > 0 {
if debugRoundTrip {
req.logf("starting timer for %v", d)
}
timer := time.NewTimer(d)
defer timer.Stop() // prevent leaks
respHeaderTimer = timer.C
}
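// The underlying connection was closed. "case <-cancelChan:" and "case <-ctxDoneChan:" cover cancellation of the request, which
// also closes the connection, but a close that was not triggered by the request itself must be handled as well.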
case <-pc.closech:
if debugRoundTrip {
req.logf("closech recv: %T %#v", pc.closed, pc.closed)
}
return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
// Timed out waiting for the response headers: close the connection.
case <-respHeaderTimer:
if debugRoundTrip {
req.logf("timeout waiting for response headers.")
}
pc.close(errTimeout)
return nil, errTimeout
// readLoop delivered the response (or an error).
case re := <-resc:
// Should never happen: exactly one of res and err must be set, so panic.
if (re.res == nil) == (re.err == nil) {
panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
}
if debugRoundTrip {
req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
}
if re.err != nil {
return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
}
return re.res, nil
// The request was canceled.
case <-cancelChan:
pc.t.cancelRequest(req.cancelKey, errRequestCanceled)
// Set the channel to nil so that select does not keep choosing this case (a receive from a closed channel never blocks and yields the zero value).
cancelChan = nil
case <-ctxDoneChan:
pc.t.cancelRequest(req.cancelKey, req.Context().Err())
cancelChan = nil
ctxDoneChan = nil
}
}
}
|
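The trick of setting a channel variable to nil inside the select loop of roundTrip is worth seeing in isolation. In this sketch, which uses the hypothetical names `waitResult` and `demoWait`, a closed cancel channel is handled exactly once; the `onCancel` callback plays the role of cancelRequest by causing an error result to arrive.

```go
package main

import "fmt"

// waitResult waits for a value on result. When cancel is closed it invokes
// onCancel once and then disables that case by setting the channel to nil.
func waitResult(result <-chan int, cancel <-chan struct{}, onCancel func()) (value, cancels int) {
	for {
		select {
		case v := <-result:
			return v, cancels
		case <-cancel:
			cancels++
			onCancel()
			// A nil channel blocks forever, so this case is never chosen again.
			// Without this line the closed channel would be selected repeatedly.
			cancel = nil
		}
	}
}

// demoWait cancels before any result exists; the cancellation itself makes
// an error value (-1) show up on the result channel.
func demoWait() (value, cancels int) {
	result := make(chan int, 1)
	cancel := make(chan struct{})
	close(cancel)
	return waitResult(result, cancel, func() { result <- -1 })
}

func main() {
	fmt.Println(demoWait())
}
```

The loop keeps running after the cancellation is observed because, as in roundTrip, the final outcome still has to be collected from another channel.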
writeLoop
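Each HTTP request is written by net/http.persistConn.writeLoop, which runs in a separate goroutine; the two goroutines run independently and communicate over channels. net/http.Request.write serializes the fields of the net/http.Request struct into the HTTP wire format and writes the result to the connection: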
|
func (pc *persistConn) writeLoop() {
defer close(pc.writeLoopDone)
for {
// writeLoop blocks on two cases:
// a writeRequest arriving from roundTrip, in which case the request has to be sent;
// the underlying connection being closed, in which case writeLoop exits.
select {
case wr := <-pc.writech:
startBytesWritten := pc.nwrite
// Serialize and send the request. waitForContinue handles requests carrying "Expect: 100-continue".
err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
if bre, ok := err.(requestBodyReadError); ok {
err = bre.error
// Errors reading from the user's
// Request.Body are high priority.
// Set it here before sending on the
// channels below or calling
// pc.close() which tears down
// connections and causes other
// errors.
wr.req.setError(err)
}
if err == nil {
err = pc.bw.Flush()
}
// If sending failed, close the request body; the connection itself is closed further below.
if err != nil {
wr.req.Request.closeBody()
if pc.nwrite == startBytesWritten {
err = nothingWrittenError{err}
}
}
// Report the result to readLoop, which consumes it in pc.wroteRequest().
pc.writeErrCh <- err // to the body reader, which might recycle us
// Report the result to roundTrip, which can then start the response-header timeout.
wr.ch <- err // to the roundTrip function
// If sending the request failed, close the connection. pc.close closes pc.conn and pc.closech,
// which also makes readLoop exit.
if err != nil {
pc.close(err)
return
}
case <-pc.closech:
return
}
}
}
|
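Calling net/http.Request.write writes the request data, through the buffered writer pc.bw, into the TCP connection wrapped by net/http.persistConnWriter; the TCP stack then delivers the contents of the HTTP request to the target server: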
|
type persistConnWriter struct {
pc *persistConn
}
func (w persistConnWriter) Write(p []byte) (n int, err error) {
n, err = w.pc.conn.Write(p)
w.pc.nwrite += int64(n)
return
}
|
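persistConnWriter is a small wrapper that counts bytes as they reach the connection. The same idea can be tried with any io.Writer. In the sketch below, `countingWriter` and `demoCount` are hypothetical names and a bytes.Buffer stands in for the TCP connection; placing a bufio.Writer in front of it shows why the counter only moves on Flush.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
)

// countingWriter forwards writes to w and records how many bytes were accepted.
type countingWriter struct {
	w io.Writer
	n int64
}

func (c *countingWriter) Write(p []byte) (int, error) {
	n, err := c.w.Write(p)
	c.n += int64(n)
	return n, err
}

// demoCount writes a small request through a buffered writer and returns the
// byte count before and after Flush, plus the length of the request.
func demoCount() (before, after, want int64) {
	const req = "GET / HTTP/1.1\r\nHost: example.com\r\n\r\n"
	var wire bytes.Buffer // stands in for the network connection
	cw := &countingWriter{w: &wire}
	bw := bufio.NewWriter(cw)

	bw.WriteString(req) // stays in the bufio buffer for now
	before = cw.n
	bw.Flush() // only now do the bytes reach the counting writer
	return before, cw.n, int64(len(req))
}

func main() {
	fmt.Println(demoCount())
}
```

This is why writeLoop compares pc.nwrite before and after a failed write: if the counter did not move, nothing reached the connection.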
readLoop
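readLoop receives responses in a loop. After it has successfully processed a response, it puts the connection back into the connection pool so that it can be reused;
once readLoop exits, the connection is closed and removed from the pool.
The other loop of a persistent connection, net/http.persistConn.readLoop, reads data from the TCP connection and sends it back to the caller of the HTTP request; the actual parsing of the HTTP protocol is done by net/http.ReadResponse: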
|
func (pc *persistConn) readLoop() {
closeErr := errReadLoopExiting // default value, if not changed below
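// Whenever writeLoop or readLoop leaves its loop (abnormally), the underlying connection must be closed. A connection consists of
// a writeLoop and a readLoop; once either exits (protocol upgrades aside), the connection is unusable.
// readLoop exits normally when the connection has no pending requests; the connection is then closed to release resources.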
defer func() {
pc.close(closeErr) // close the connection
pc.t.removeIdleConn(pc) // remove it from the idle pool
}()
// Try to put the connection back into the idle pool.
tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
if err := pc.t.tryPutIdleConn(pc); err != nil {
closeErr = err
if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
trace.PutIdleConn(err)
}
return false
}
if trace != nil && trace.PutIdleConn != nil {
trace.PutIdleConn(nil)
}
return true
}
// eofc is used to block caller goroutines reading from Response.Body
// at EOF until this goroutines has (potentially) added the connection
// back to the idle pool.
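// As the comment above says, eofc blocks the caller goroutine that has read resp.Body to EOF
// until the connection has been put back into the idle pool. As in the no-body path below, the connection is pooled first
// and the result is returned afterwards, so that the connection can be reused quickly.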
eofc := make(chan struct{})
// On errors, also release any goroutine reading resp.Body so that the caller does not hang.
defer close(eofc) // unblock reader on errors
// Read this once, before loop starts. (to avoid races in tests)
testHookMu.Lock()
testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
testHookMu.Unlock()
alive := true
for alive {
// Maximum number of bytes allowed for the response headers.
pc.readLimit = pc.maxHeaderResponseSize()
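// Peek one byte from the read buffer to detect whether a response has started to arrive. roundTrip guarantees that the request is sent first.
// Peek blocks until data arrives (which is why roundTrip arms a response-header timer). Reads and writes
// inside a goroutine are blocking.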
_, err := pc.br.Peek(1)
pc.mu.Lock()
// If no response is expected, exit readLoop and close the connection: there is nothing to process on it,
// so the system resources are released.
if pc.numExpectedResponses == 0 {
pc.readLoopPeekFailLocked(err)
pc.mu.Unlock()
return
}
pc.mu.Unlock()
// Block until roundTrip sends the request information.
rc := <-pc.reqch // receive the request from the channel that roundTrip sends on
trace := httptrace.ContextClientTrace(rc.req.Context())
var resp *Response
// If response data is available, read and parse it into a Response.
if err == nil {
resp, err = pc.readResponse(rc, trace) // mostly parses the headers
} else {
// A possible error here is the server closing the connection, which surfaces as EOF.
err = transportReadFromServerError{err}
closeErr = err
}
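// Nothing usable was received from the server, so drop the connection. A likely cause is that the server closed it
// just as the client sent the request. See the comment on transportReadFromServerError.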
if err != nil {
if pc.readLimit <= 0 {
err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
}
// Pass the error to roundTrip and exit the loop.
select {
case rc.ch <- responseAndError{err: err}:
case <-rc.callerGone:
return
}
return
}
pc.readLimit = maxInt64 // effectively no limit for response bodies
pc.mu.Lock()
pc.numExpectedResponses--
pc.mu.Unlock()
// Check whether the response body is writable: a 101 Switching Protocols upgrade must return a writable resp.Body.
// After such an upgrade the connection no longer belongs to the Transport (another protocol such as WebSocket takes over).
bodyWritable := resp.bodyIsWritable()
// Check whether the response has a body; if not, there is nothing to read (a response to HEAD never carries one).
hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0
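// If the server or the client asked to close the connection, the status code is an unexpected 1xx, or the protocol was upgraded,
// no further responses can be received on this connection, so readLoop exits.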
if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable {
// Don't do keep-alive on error if either party requested a close
// or we get an unexpected informational (1xx) response.
// StatusCode 100 is already handled above.
alive = false
}
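// Handle the empty-body and protocol-upgrade cases: try to put the connection back into the pool. In the upgrade case the caller owns the connection and readLoop exits.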
if !hasBody || bodyWritable {
pc.t.setReqCanceler(rc.cancelKey, nil)
// Put the idle conn back into the pool before we send the response
// so if they process it quickly and make another request, they'll
// get this same conn. But we use the unbuffered channel 'rc'
// to guarantee that persistConn.roundTrip got out of its select
// potentially waiting for this persistConn to close.
// but after
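// Put the connection back into the pool before returning the response so it can be reused quickly. The following are checked in order:
// 1. alive is true;
// 2. no EOF has been seen (an EOF means the underlying connection is closed and unusable);
// 3. the request was written successfully.
// The order matters: returning the connection to the pool comes last, so after a protocol upgrade, when the server
// will send no more data, or when writing the request failed, the connection is not pooled. In those cases
// alive becomes false, readLoop exits and the connection is closed (except after an upgrade, where it must stay open).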
alive = alive &&
!pc.sawEOF &&
pc.wroteRequest() &&
tryPutIdleConn(trace)
if bodyWritable {
// After a protocol upgrade the same connection keeps being used. Setting closeErr to errCallerOwnsConn
// keeps the deferred pc.close(closeErr) from closing it when readLoop returns.
closeErr = errCallerOwnsConn
}
select {
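// 1. If the response is delivered, go on to wait for the next response.
// 2. If roundTrip has already returned (so the response cannot be delivered), exit readLoop.
// roundTrip returning after it has received the response does not stop readLoop.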
case rc.ch <- responseAndError{res: resp}:
case <-rc.callerGone:
return
}
// Now that they've read from the unbuffered channel, they're safely
// out of the select that also waits on this goroutine to die, so
// we're allowed to exit now if needed (if alive is false)
testHookReadLoopBeforeNextRead()
continue
}
// From here on the response has a body; the logic mirrors the no-body case.
waitForBodyRead := make(chan bool, 2)
// Wrap the body. Reading the response to the end yields EOF, and such connections can be reused.
body := &bodyEOFSignal{
body: resp.Body,
// Closed early: sends false.
earlyCloseFn: func() error {
waitForBodyRead <- false
<-eofc // will be closed by deferred call at the end of the function
return nil
},
// Normal completion.
fn: func(err error) error {
isEOF := err == io.EOF
waitForBodyRead <- isEOF
if isEOF {
<-eofc // see comment above eofc declaration
} else if err != nil {
if cerr := pc.canceled(); cerr != nil {
return cerr
}
}
return err
},
}
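// resp.Body is now a bodyEOFSignal. If the caller neither reads resp.Body to EOF nor closes it,
// readLoop stays blocked in the select on waitForBodyRead below until the request is canceled or the connection is closed.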
resp.Body = body
if rc.addedGzip && strings.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
resp.Body = &gzipReader{body: body}
resp.Header.Del("Content-Encoding")
resp.Header.Del("Content-Length")
resp.ContentLength = -1
resp.Uncompressed = true
}
// Same as in the no-body case.
select {
case rc.ch <- responseAndError{res: resp}:
case <-rc.callerGone:
return
}
// Before looping back to the top of this function and peeking on
// the bufio.Reader, wait for the caller goroutine to finish
// reading the response body. (or for cancellation or death)
select {
case bodyEOF := <-waitForBodyRead:
pc.t.setReqCanceler(rc.cancelKey, nil) // before pc might return to idle pool
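// If alive is false, the loop cannot continue.
// The connection can be reused only if the response body was read to the end; otherwise it is closed, discarding any unread data.
// Note the difference between bodyEOF and pc.sawEOF: the former means the HTTP response body reached EOF, the latter that the underlying TCP connection did.
// The connection is put back into the pool only if all of the following hold:
// 1. keep-alive is enabled;
// 2. the body was read to EOF;
// 3. the TCP connection has not been closed (pc.sawEOF is false);
// 4. the request was written completely and successfully.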
alive = alive &&
bodyEOF &&
!pc.sawEOF &&
pc.wroteRequest() &&
tryPutIdleConn(trace)
// Release the blocked reader.
if bodyEOF {
eofc <- struct{}{}
}
case <-rc.req.Cancel:
alive = false
pc.t.CancelRequest(rc.req)
case <-rc.req.Context().Done():
alive = false
pc.t.cancelRequest(rc.cancelKey, rc.req.Context().Err())
case <-pc.closech:
alive = false
}
testHookReadLoopBeforeNextRead()
}
}
|
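resp.Body is actually a bodyEOFSignal struct:
The HTTP client starts two goroutines per connection, one for reading and one for writing, which communicate with roundTrip over reqch and writech respectively. The response.Body seen by the caller is wrapped more than once: the inner body reads directly from the net.Conn, and the outer wrapper, bodyEOFSignal, adds the close and EOF handling.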
|
// bodyEOFSignal is used by the HTTP/1 transport when reading response
// bodies to make sure we see the end of a response body before
// proceeding and reading on the connection again.
//
// It wraps a ReadCloser but runs fn (if non-nil) at most
// once, right before its final (error-producing) Read or Close call
// returns. fn should return the new error to return from Read or Close.
//
// If earlyCloseFn is non-nil and Close is called before io.EOF is
// seen, earlyCloseFn is called instead of fn, and its return value is
// the return value from Close.
type bodyEOFSignal struct {
body io.ReadCloser
mu sync.Mutex // guards following 4 fields
closed bool // whether Close has been called
rerr error // sticky Read error
fn func(error) error // err will be nil on Read io.EOF
earlyCloseFn func() error // optional alt Close func used if io.EOF not seen
}
var errReadOnClosedResBody = errors.New("http: read on closed response body")
func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
es.mu.Lock()
closed, rerr := es.closed, es.rerr
es.mu.Unlock()
if closed {
return 0, errReadOnClosedResBody
}
if rerr != nil {
return 0, rerr
}
n, err = es.body.Read(p)
if err != nil {
es.mu.Lock()
defer es.mu.Unlock()
if es.rerr == nil {
es.rerr = err
}
err = es.condfn(err)
}
return
}
|
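Closing the body before it has been read triggers the earlyCloseFn() callback; as its definition shows, it is called only when Close happens before io.EOF has been seen.
The earlyCloseFn defined in readLoop sends false on waitForBodyRead, which readLoop is waiting on. alive then becomes false, so the loop cannot go on to serve new requests; readLoop returns and runs its deferred function, which closes the connection and removes it from the pool.
It is well known that a Go HTTP client must close the response body. But if the client gets a non-nil response, reads only the headers and calls response.Body.Close() without reading the body, the connection is closed rather than reused.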
|
func (es *bodyEOFSignal) Close() error {
	es.mu.Lock()
	defer es.mu.Unlock()
	if es.closed {
		return nil
	}
	es.closed = true
	if es.earlyCloseFn != nil && es.rerr != io.EOF {
		return es.earlyCloseFn()
	}
	err := es.body.Close()
	return es.condfn(err)
}
|
This eventually calls persistConn.close, which closes the connection and the closech channel:
|
// close closes the underlying TCP connection and closes
// the pc.closech channel.
//
// The provided err is only for testing and debugging; in normal
// circumstances it should never be seen by users.
func (pc *persistConn) close(err error) {
	pc.mu.Lock()
	defer pc.mu.Unlock()
	pc.closeLocked(err)
}

func (pc *persistConn) closeLocked(err error) {
	if err == nil {
		panic("nil error")
	}
	pc.broken = true
	if pc.closed == nil {
		pc.closed = err
		pc.t.decConnsPerHost(pc.cacheKey)
		// Close HTTP/1 (pc.alt == nil) connection.
		// HTTP/2 closes its connection itself.
		if pc.alt == nil {
			if err != errCallerOwnsConn {
				pc.conn.Close() // close the underlying connection
			}
			close(pc.closech) // notify the read and write goroutines
		}
	}
	pc.mutateHeaderFunc = nil
}
|
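The effect on connection reuse can be observed from the outside with net/http/httptrace. The sketch below is an illustration, not part of the standard library: `fetch` and `secondRequestReused` are hypothetical helpers, the server is a throwaway httptest server, and the 1 MiB body size is an arbitrary choice to make sure the body is not consumed by accident.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"net/http/httptrace"
	"strings"
)

// fetch issues one GET and reports whether the transport reused an
// existing connection for it. If drain is true the body is read to EOF
// before Close; otherwise it is closed unread.
func fetch(client *http.Client, url string, drain bool) (reused bool, err error) {
	trace := &httptrace.ClientTrace{
		GotConn: func(info httptrace.GotConnInfo) { reused = info.Reused },
	}
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return false, err
	}
	req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace))
	resp, err := client.Do(req)
	if err != nil {
		return false, err
	}
	if drain {
		io.Copy(io.Discard, resp.Body)
	}
	return reused, resp.Body.Close()
}

// secondRequestReused sends two sequential requests through one client and
// reports whether the second one reused the first one's connection.
func secondRequestReused(drain bool) bool {
	body := strings.Repeat("x", 1<<20)
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		io.WriteString(w, body)
	}))
	defer srv.Close()

	tr := &http.Transport{}
	defer tr.CloseIdleConnections()
	client := &http.Client{Transport: tr}

	if _, err := fetch(client, srv.URL, drain); err != nil {
		panic(err)
	}
	reused, err := fetch(client, srv.URL, drain)
	if err != nil {
		panic(err)
	}
	return reused
}

func main() {
	fmt.Println("body drained before Close, reused:", secondRequestReused(true))
	fmt.Println("body closed unread, reused:", secondRequestReused(false))
}
```

Draining with `io.Copy(io.Discard, resp.Body)` before Close is the usual way to keep the connection eligible for the pool; for very large bodies, closing early and paying for a new connection may be the cheaper choice.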
tryPutIdleConn
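Once a request has been handled, tryPutIdleConn returns the connection to the pool. If a goroutine is already waiting for an idle connection, the connection is handed to it directly for reuse. When the connection is put back, the number of idle connections is also checked against the configured limits:

If DisableKeepAlives is true, connections are not reused and are closed after each request. Note, however, that in this case the request is also sent with a Connection: close header, so the server disconnects on its own once it has sent the response; the close is therefore actually initiated by the server.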
|
// tryPutIdleConn adds pconn to the list of idle persistent connections awaiting
// a new request.
// If pconn is no longer needed or not in a good state, tryPutIdleConn returns
// an error explaining why it wasn't registered.
// tryPutIdleConn does not close pconn. Use putOrCloseIdleConn instead for that.
func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
	// Keep-alives are disabled, or the per-host idle limit is invalid (negative).
	if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
		return errKeepAlivesDisabled
	}
	if pconn.isBroken() {
		return errConnBroken
	}
	pconn.markReused()

	t.idleMu.Lock()
	defer t.idleMu.Unlock()

	// HTTP/2 (pconn.alt != nil) connections do not come out of the idle list,
	// because multiple goroutines can use them simultaneously.
	// If this is an HTTP/2 connection being “returned,” we're done.
	if pconn.alt != nil && t.idleLRU.m[pconn] != nil {
		return nil
	}

	// Deliver pconn to goroutine waiting for idle connection, if any.
	// (They may be actively dialing, but this conn is ready first.
	// Chrome calls this socket late binding.
	// See https://www.chromium.org/developers/design-documents/network-stack#TOC-Connection-Management.)
	key := pconn.cacheKey
	if q, ok := t.idleConnWait[key]; ok {
		done := false
		if pconn.alt == nil {
			// HTTP/1.
			// Loop over the waiting list until we find a w that isn't done already, and hand it pconn.
			for q.len() > 0 {
				w := q.popFront()
				if w.tryDeliver(pconn, nil) {
					done = true
					break
				}
			}
		} else {
			// HTTP/2.
			// Can hand the same pconn to everyone in the waiting list,
			// and we still won't be done: we want to put it in the idle
			// list unconditionally, for any future clients too.
			// If the wait queue is not empty, deliver the connection.
			for q.len() > 0 {
				w := q.popFront()
				w.tryDeliver(pconn, nil)
			}
		}
		if q.len() == 0 {
			delete(t.idleConnWait, key)
		} else {
			t.idleConnWait[key] = q
		}
		if done {
			return nil
		}
	}

	if t.closeIdle {
		return errCloseIdle
	}
	if t.idleConn == nil {
		t.idleConn = make(map[connectMethodKey][]*persistConn)
	}
	idles := t.idleConn[key]
	// Too many idle connections for this host; the limit defaults to
	// DefaultMaxIdleConnsPerHost (2).
	if len(idles) >= t.maxIdleConnsPerHost() {
		return errTooManyIdleHost
	}
	for _, exist := range idles {
		if exist == pconn {
			log.Fatalf("dup idle pconn %p in freelist", pconn)
		}
	}
	t.idleConn[key] = append(idles, pconn)
	t.idleLRU.add(pconn)
	if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns {
		oldest := t.idleLRU.removeOldest()
		oldest.close(errTooManyIdle)
		t.removeIdleConnLocked(oldest)
	}

	// Set idle timer, but only for HTTP/1 (pconn.alt == nil).
	// The HTTP/2 implementation manages the idle timer itself
	// (see idleConnTimeout in h2_bundle.go).
	// When an idle connection is added to the pool, a timer is also armed;
	// the connection is closed when the timer fires.
	if t.IdleConnTimeout > 0 && pconn.alt == nil {
		if pconn.idleTimer != nil {
			pconn.idleTimer.Reset(t.IdleConnTimeout)
		} else {
			pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
		}
	}
	pconn.idleAt = time.Now()
	return nil
}
|
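The limits checked in tryPutIdleConn are all exported fields of http.Transport. The following sketch sets them explicitly and checks the DisableKeepAlives behavior described earlier, namely that the server sees the request marked Connection: close. The helper names `newTransport` and `sawConnectionClose` are hypothetical, and the numeric values are illustrative rather than recommendations.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"time"
)

// newTransport returns a Transport with explicit pool limits.
func newTransport(disableKeepAlives bool) *http.Transport {
	return &http.Transport{
		MaxIdleConns:        100,              // cap across all hosts, enforced via the idle LRU
		MaxIdleConnsPerHost: 10,               // per-host cap; 0 falls back to DefaultMaxIdleConnsPerHost
		IdleConnTimeout:     90 * time.Second, // idle timer armed when a connection is pooled
		DisableKeepAlives:   disableKeepAlives,
	}
}

// sawConnectionClose sends one GET through tr and reports whether the
// server saw the request flagged as "Connection: close".
func sawConnectionClose(tr *http.Transport) (bool, error) {
	closeCh := make(chan bool, 1)
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		closeCh <- r.Close // true when the client asked to close the connection
		io.WriteString(w, "ok")
	}))
	defer srv.Close()
	defer tr.CloseIdleConnections()

	resp, err := (&http.Client{Transport: tr}).Get(srv.URL)
	if err != nil {
		return false, err
	}
	io.Copy(io.Discard, resp.Body)
	resp.Body.Close()
	return <-closeCh, nil
}

func main() {
	for _, disabled := range []bool{false, true} {
		sawClose, err := sawConnectionClose(newTransport(disabled))
		if err != nil {
			panic(err)
		}
		fmt.Printf("DisableKeepAlives=%v, server saw Connection: close: %v\n", disabled, sawClose)
	}
}
```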
Response
|
// Response represents the response from an HTTP request.
//
// The Client and Transport return Responses from servers once
// the response headers have been received. The response body
// is streamed on demand as the Body field is read.
type Response struct {
	Status     string // e.g. "200 OK"
	StatusCode int    // e.g. 200
	Proto      string // e.g. "HTTP/1.0"
	ProtoMajor int    // e.g. 1
	ProtoMinor int    // e.g. 0

	// Header maps header keys to values. If the response had multiple
	// headers with the same key, they may be concatenated, with comma
	// delimiters. (RFC 7230, section 3.2.2 requires that multiple headers
	// be semantically equivalent to a comma-delimited sequence.) When
	// Header values are duplicated by other fields in this struct (e.g.,
	// ContentLength, TransferEncoding, Trailer), the field values are
	// authoritative.
	//
	// Keys in the map are canonicalized (see CanonicalHeaderKey).
	Header Header

	// Body represents the response body.
	//
	// The response body is streamed on demand as the Body field
	// is read. If the network connection fails or the server
	// terminates the response, Body.Read calls return an error.
	//
	// The http Client and Transport guarantee that Body is always
	// non-nil, even on responses without a body or responses with
	// a zero-length body. It is the caller's responsibility to
	// close Body. The default HTTP client's Transport may not
	// reuse HTTP/1.x "keep-alive" TCP connections if the Body is
	// not read to completion and closed.
	//
	// The Body is automatically dechunked if the server replied
	// with a "chunked" Transfer-Encoding.
	//
	// As of Go 1.12, the Body will also implement io.Writer
	// on a successful "101 Switching Protocols" response,
	// as used by WebSockets and HTTP/2's "h2c" mode.
	Body io.ReadCloser

	// ContentLength records the length of the associated content. The
	// value -1 indicates that the length is unknown. Unless Request.Method
	// is "HEAD", values >= 0 indicate that the given number of bytes may
	// be read from Body.
	ContentLength int64

	// Contains transfer encodings from outer-most to inner-most. Value is
	// nil, means that "identity" encoding is used.
	TransferEncoding []string

	// Close records whether the header directed that the connection be
	// closed after reading Body. The value is advice for clients: neither
	// ReadResponse nor Response.Write ever closes a connection.
	Close bool

	// Uncompressed reports whether the response was sent compressed but
	// was decompressed by the http package. When true, reading from
	// Body yields the uncompressed content instead of the compressed
	// content actually set from the server, ContentLength is set to -1,
	// and the "Content-Length" and "Content-Encoding" fields are deleted
	// from the responseHeader. To get the original response from
	// the server, set Transport.DisableCompression to true.
	Uncompressed bool

	// Trailer maps trailer keys to values in the same
	// format as Header.
	//
	// The Trailer initially contains only nil values, one for
	// each key specified in the server's "Trailer" header
	// value. Those values are not added to Header.
	//
	// Trailer must not be accessed concurrently with Read calls
	// on the Body.
	//
	// After Body.Read has returned io.EOF, Trailer will contain
	// any trailer values sent by the server.
	Trailer Header

	// Request is the request that was sent to obtain this Response.
	// Request's Body is nil (having already been consumed).
	// This is only populated for Client requests.
	Request *Request

	// TLS contains information about the TLS connection on which the
	// response was received. It is nil for unencrypted responses.
	// The pointer is shared between responses and should not be
	// modified.
	TLS *tls.ConnectionState
}
|
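To see how these fields are populated, the exported http.ReadResponse function can parse a raw response without any network involved. The sketch below feeds it two hand-written messages, one delimited by Content-Length and one using chunked transfer encoding; the helper `parse` and the two constants are names made up for this example.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// parse reads one HTTP/1.x response from raw text and returns it together
// with its fully read body. A nil request means a GET is assumed.
func parse(raw string) (*http.Response, string, error) {
	resp, err := http.ReadResponse(bufio.NewReader(strings.NewReader(raw)), nil)
	if err != nil {
		return nil, "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return resp, string(body), err
}

// fixedLength marks the end of the body with Content-Length.
const fixedLength = "HTTP/1.1 200 OK\r\n" +
	"Content-Type: text/plain\r\n" +
	"Content-Length: 5\r\n" +
	"\r\n" +
	"hello"

// chunked marks the end of the body with a zero-length chunk.
const chunked = "HTTP/1.1 200 OK\r\n" +
	"Transfer-Encoding: chunked\r\n" +
	"\r\n" +
	"5\r\nhello\r\n0\r\n\r\n"

func main() {
	for _, raw := range []string{fixedLength, chunked} {
		resp, body, err := parse(raw)
		if err != nil {
			panic(err)
		}
		fmt.Printf("status=%q proto=%s length=%d encoding=%v body=%q\n",
			resp.Status, resp.Proto, resp.ContentLength, resp.TransferEncoding, body)
	}
}
```

In the chunked case ContentLength is -1 and the body is dechunked transparently, which matches the field comments above.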
readResponse
|
// readResponse reads an HTTP response (or two, in the case of "Expect:
// 100-continue") from the server. It returns the final non-100 one.
// trace is optional.
func (pc *persistConn) readResponse(rc requestAndChan, trace *httptrace.ClientTrace) (resp *Response, err error) {
	if trace != nil && trace.GotFirstResponseByte != nil {
		if peek, err := pc.br.Peek(1); err == nil && len(peek) == 1 {
			trace.GotFirstResponseByte()
		}
	}
	num1xx := 0               // number of informational 1xx headers received
	const max1xxResponses = 5 // arbitrary bound on number of informational responses

	continueCh := rc.continueCh
	for {
		resp, err = ReadResponse(pc.br, rc.req)
		if err != nil {
			return
		}
		resCode := resp.StatusCode
		if continueCh != nil {
			if resCode == 100 {
				if trace != nil && trace.Got100Continue != nil {
					trace.Got100Continue()
				}
				continueCh <- struct{}{}
				continueCh = nil
			} else if resCode >= 200 {
				close(continueCh)
				continueCh = nil
			}
		}
		is1xx := 100 <= resCode && resCode <= 199
		// treat 101 as a terminal status, see issue 26161
		is1xxNonTerminal := is1xx && resCode != StatusSwitchingProtocols
		if is1xxNonTerminal {
			num1xx++
			if num1xx > max1xxResponses {
				return nil, errors.New("net/http: too many 1xx informational responses")
			}
			pc.readLimit = pc.maxHeaderResponseSize() // reset the limit
			if trace != nil && trace.Got1xxResponse != nil {
				if err := trace.Got1xxResponse(resCode, textproto.MIMEHeader(resp.Header)); err != nil {
					return nil, err
				}
			}
			continue
		}
		break
	}
	if resp.isProtocolSwitch() {
		resp.Body = newReadWriteCloserBody(pc.br, pc.conn)
	}

	resp.TLS = pc.tlsState
	return
}
|
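The method above shows the rough shape of an HTTP response, which carries the status code, the protocol version, the response headers, and so on. The response body itself is still parsed in the read loop, net/http.persistConn.readLoop, according to the HTTP headers.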
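The trace hooks that readResponse calls are reachable from user code through net/http/httptrace. The following sketch assumes Go 1.19 or newer, where a handler may send an informational status such as 103 Early Hints with WriteHeader; the `traceResult` type and the `tracedGet` helper are hypothetical names used only here.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"net/http/httptrace"
	"net/textproto"
)

// traceResult records what the client trace hooks observed.
type traceResult struct {
	informational []int // status codes passed to Got1xxResponse
	firstByte     bool  // whether GotFirstResponseByte fired
	finalStatus   int   // status code of the response handed to the caller
}

// tracedGet sends one GET to a throwaway server that emits a 103 Early
// Hints response before the final 200.
func tracedGet() (traceResult, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Link", "</style.css>; rel=preload")
		w.WriteHeader(http.StatusEarlyHints) // informational, not the final status
		io.WriteString(w, "ok")              // triggers the implicit 200
	}))
	defer srv.Close()

	var res traceResult
	trace := &httptrace.ClientTrace{
		GotFirstResponseByte: func() { res.firstByte = true },
		Got1xxResponse: func(code int, _ textproto.MIMEHeader) error {
			res.informational = append(res.informational, code)
			return nil
		},
	}
	req, err := http.NewRequest(http.MethodGet, srv.URL, nil)
	if err != nil {
		return res, err
	}
	req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace))
	resp, err := srv.Client().Do(req)
	if err != nil {
		return res, err
	}
	defer resp.Body.Close()
	io.Copy(io.Discard, resp.Body)
	res.finalStatus = resp.StatusCode
	return res, nil
}

func main() {
	res, err := tracedGet()
	if err != nil {
		panic(err)
	}
	fmt.Printf("1xx codes: %v, first byte seen: %v, final status: %d\n",
		res.informational, res.firstByte, res.finalStatus)
}
```

The caller only ever receives the final response; the informational one is consumed inside the loop and surfaces solely through the trace hook.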
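Server
The net/http package in the Go standard library offers a very easy-to-use interface. As shown below, the standard library alone is enough to stand up a new HTTP service quickly: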
|
func handler(w http.ResponseWriter, r *http.Request) {
	fmt.Fprintf(w, "Hi there, I love %s!", r.URL.Path[1:])
}

func main() {
	http.HandleFunc("/", handler)
	log.Fatal(http.ListenAndServe(":8080", nil))
}
|
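The main function above calls only two standard-library functions: net/http.HandleFunc, which registers a handler, and net/http.ListenAndServe, which listens for and handles requests. Most server frameworks expose these two kinds of interface, one for registering handlers and one for handling incoming requests. It is a very common pattern, and this section follows the same two dimensions to describe how the standard library supports an HTTP server.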
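A handler like this can be exercised without opening a port by using net/http/httptest. The sketch below repeats the handler so that it is self-contained; the helper `call` is a hypothetical name, and the path "/gophers" is just sample input.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// handler is the same handler as in the server example.
func handler(w http.ResponseWriter, r *http.Request) {
	fmt.Fprintf(w, "Hi there, I love %s!", r.URL.Path[1:])
}

// call runs handler against an in-memory request for path and returns the
// recorded status code and body.
func call(path string) (int, string) {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, path, nil)
	handler(rec, req)
	return rec.Code, rec.Body.String()
}

func main() {
	code, body := call("/gophers")
	fmt.Println(code, body)
}
```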
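Registering handlers
An HTTP service is made up of a set of handlers that implement the net/http.Handler interface; when a request arrives, a suitable handler is selected according to the request's route.

When we call net/http.HandleFunc directly to register a handler, the standard library uses the default request multiplexer, net/http.DefaultServeMux; the function simply calls net/http.ServeMux.HandleFunc on it: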
|
func (mux *ServeMux) HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {
	mux.Handle(pattern, HandlerFunc(handler))
}
|
This method converts the function to the net/http.Handler interface type (through net/http.HandlerFunc) and calls net/http.ServeMux.Handle to register it:
|
func (mux *ServeMux) Handle(pattern string, handler Handler) {
	if _, exist := mux.m[pattern]; exist {
		panic("http: multiple registrations for " + pattern)
	}

	e := muxEntry{h: handler, pattern: pattern}
	mux.m[pattern] = e
	if pattern[len(pattern)-1] == '/' {
		mux.es = appendSorted(mux.es, e)
	}

	if pattern[0] != '/' {
		mux.hosts = true
	}
}
|
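The routes and their handlers together make up net/http.DefaultServeMux. The structure holds a map of net/http.muxEntry values that stores the mapping from URL pattern to handler, and the HTTP server looks handlers up in this map when it handles a request.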
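Handling requests
net/http.ListenAndServe, provided by the standard library, listens for TCP connections and handles requests. It initializes an HTTP server, net/http.Server, from the given listen address and handler, and calls that server's net/http.Server.ListenAndServe method: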
|
func ListenAndServe(addr string, handler Handler) error {
	server := &Server{Addr: addr, Handler: handler}
	return server.ListenAndServe()
}
|
net/http.Server.ListenAndServe uses net.Listen from the net package to listen for TCP connections on the given address and handles client requests through net/http.Server.Serve:
|
func (srv *Server) ListenAndServe() error {
	addr := srv.Addr
	if addr == "" {
		addr = ":http"
	}
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return err
	}
	return srv.Serve(ln)
}
|
net/http.Server.Serve accepts incoming TCP connections in a loop and, for each one, calls net/http.Server.newConn to create a new net/http.conn, the server-side representation of an HTTP connection:
|
func (srv *Server) Serve(l net.Listener) error {
	l = &onceCloseListener{Listener: l}
	defer l.Close()

	baseCtx := context.Background()
	ctx := context.WithValue(baseCtx, ServerContextKey, srv)
	for {
		rw, err := l.Accept()
		if err != nil {
			select {
			case <-srv.getDoneChan():
				return ErrServerClosed
			default:
			}
			...
			return err
		}
		connCtx := ctx
		c := srv.newConn(rw)
		c.setState(c.rwc, StateNew) // before Serve can return
		go c.serve(connCtx)
	}
}
|
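Once the server-side connection has been created, the standard library starts a separate goroutine for each connection (not for each request) and runs net/http.conn.serve in it. If the service receives a very large number of connections, it creates a correspondingly large number of goroutines, which can noticeably degrade the service or even leave it unable to handle requests.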
|
// Serve a new connection.
func (c *conn) serve(ctx context.Context) {
	c.remoteAddr = c.rwc.RemoteAddr().String()
	ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
	defer func() {
		if err := recover(); err != nil && err != ErrAbortHandler {
			const size = 64 << 10
			buf := make([]byte, size)
			buf = buf[:runtime.Stack(buf, false)]
			c.server.logf("http: panic serving %v: %v\n%s", c.remoteAddr, err, buf)
		}
		if !c.hijacked() {
			c.close()
			c.setState(c.rwc, StateClosed)
		}
	}()

	if tlsConn, ok := c.rwc.(*tls.Conn); ok {
		if d := c.server.ReadTimeout; d != 0 {
			c.rwc.SetReadDeadline(time.Now().Add(d))
		}
		if d := c.server.WriteTimeout; d != 0 {
			c.rwc.SetWriteDeadline(time.Now().Add(d))
		}
		if err := tlsConn.Handshake(); err != nil {
			// If the handshake failed due to the client not speaking
			// TLS, assume they're speaking plaintext HTTP and write a
			// 400 response on the TLS conn's underlying net.Conn.
			if re, ok := err.(tls.RecordHeaderError); ok && re.Conn != nil && tlsRecordHeaderLooksLikeHTTP(re.RecordHeader) {
				io.WriteString(re.Conn, "HTTP/1.0 400 Bad Request\r\n\r\nClient sent an HTTP request to an HTTPS server.\n")
				re.Conn.Close()
				return
			}
			c.server.logf("http: TLS handshake error from %s: %v", c.rwc.RemoteAddr(), err)
			return
		}
		c.tlsState = new(tls.ConnectionState)
		*c.tlsState = tlsConn.ConnectionState()
		if proto := c.tlsState.NegotiatedProtocol; validNextProto(proto) {
			if fn := c.server.TLSNextProto[proto]; fn != nil {
				h := initALPNRequest{ctx, tlsConn, serverHandler{c.server}}
				fn(c.server, tlsConn, h)
			}
			return
		}
	}

	// HTTP/1.x from here on.

	ctx, cancelCtx := context.WithCancel(ctx)
	c.cancelCtx = cancelCtx
	defer cancelCtx()

	c.r = &connReader{conn: c}
	c.bufr = newBufioReader(c.r)
	c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)

	for {
		w, err := c.readRequest(ctx)
		if c.r.remain != c.server.initialReadLimitSize() {
			// If we read any bytes off the wire, we're active.
			c.setState(c.rwc, StateActive)
		}
		if err != nil {
			const errorHeaders = "\r\nContent-Type: text/plain; charset=utf-8\r\nConnection: close\r\n\r\n"

			switch {
			case err == errTooLarge:
				// Their HTTP client may or may not be
				// able to read this if we're
				// responding to them and hanging up
				// while they're still writing their
				// request. Undefined behavior.
				const publicErr = "431 Request Header Fields Too Large"
				fmt.Fprintf(c.rwc, "HTTP/1.1 "+publicErr+errorHeaders+publicErr)
				c.closeWriteAndWait()
				return

			case isUnsupportedTEError(err):
				// Respond as per RFC 7230 Section 3.3.1 which says,
				//      A server that receives a request message with a
				//      transfer coding it does not understand SHOULD
				//      respond with 501 (Unimplemented).
				code := StatusNotImplemented

				// We purposefully aren't echoing back the transfer-encoding's value,
				// so as to mitigate the risk of cross side scripting by an attacker.
				fmt.Fprintf(c.rwc, "HTTP/1.1 %d %s%sUnsupported transfer encoding", code, StatusText(code), errorHeaders)
				return

			case isCommonNetReadError(err):
				return // don't reply

			default:
				publicErr := "400 Bad Request"
				if v, ok := err.(badRequestError); ok {
					publicErr = publicErr + ": " + string(v)
				}

				fmt.Fprintf(c.rwc, "HTTP/1.1 "+publicErr+errorHeaders+publicErr)
				return
			}
		}

		// Expect 100 Continue support
		req := w.req
		if req.expectsContinue() {
			if req.ProtoAtLeast(1, 1) && req.ContentLength != 0 {
				// Wrap the Body reader with one that replies on the connection
				req.Body = &expectContinueReader{readCloser: req.Body, resp: w}
				w.canWriteContinue.setTrue()
			}
		} else if req.Header.get("Expect") != "" {
			w.sendExpectationFailed()
			return
		}

		c.curReq.Store(w)

		if requestBodyRemains(req.Body) {
			registerOnHitEOF(req.Body, w.conn.r.startBackgroundRead)
		} else {
			w.conn.r.startBackgroundRead()
		}

		// HTTP cannot have multiple simultaneous active requests.[*]
		// Until the server replies to this request, it can't read another,
		// so we might as well run the handler in this goroutine.
		// [*] Not strictly true: HTTP pipelining. We could let them all process
		// in parallel even if their responses need to be serialized.
		// But we're not going to implement HTTP pipelining because it
		// was never deployed in the wild and the answer is HTTP/2.
		serverHandler{c.server}.ServeHTTP(w, w.req)
		w.cancelCtx()
		if c.hijacked() {
			return
		}
		w.finishRequest()
		if !w.shouldReuseConnection() {
			if w.requestBodyLimitHit || w.closedRequestBodyEarly() {
				c.closeWriteAndWait()
			}
			return
		}
		c.setState(c.rwc, StateIdle)
		c.curReq.Store((*response)(nil))

		if !w.conn.server.doKeepAlives() {
			// We're in shutdown mode. We might've replied
			// to the user without "Connection: close" and
			// they might think they can send another
			// request, but such is life with HTTP/1.1.
			return
		}

		if d := c.server.idleTimeout(); d != 0 {
			c.rwc.SetReadDeadline(time.Now().Add(d))
			if _, err := c.bufr.Peek(4); err != nil {
				return
			}
		}
		c.rwc.SetReadDeadline(time.Time{})
	}
}
|
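net/http.conn.serve covers three steps: reading the HTTP request, calling the Handler to process it, and finishing the request. Reading is done by net/http.conn.readRequest, which takes the HTTP request off the connection and builds a net/http.response value that implements the net/http.ResponseWriter interface; data written to this struct is forwarded to the buffer it holds: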
|
// either dataB or dataS is non-zero.
func (w *response) write(lenData int, dataB []byte, dataS string) (n int, err error) {
	if w.conn.hijacked() {
		if lenData > 0 {
			caller := relevantCaller()
			w.conn.server.logf("http: response.Write on hijacked connection from %s (%s:%d)", caller.Function, path.Base(caller.File), caller.Line)
		}
		return 0, ErrHijacked
	}

	if w.canWriteContinue.isSet() {
		// Body reader wants to write 100 Continue but hasn't yet.
		// Tell it not to. The store must be done while holding the lock
		// because the lock makes sure that there is not an active write
		// this very moment.
		w.writeContinueMu.Lock()
		w.canWriteContinue.setFalse()
		w.writeContinueMu.Unlock()
	}

	if !w.wroteHeader {
		w.WriteHeader(StatusOK)
	}
	if lenData == 0 {
		return 0, nil
	}
	if !w.bodyAllowed() {
		return 0, ErrBodyNotAllowed
	}

	w.written += int64(lenData) // ignoring errors, for errorKludge
	if w.contentLength != -1 && w.written > w.contentLength {
		return 0, ErrContentLength
	}
	if dataB != nil {
		return w.w.Write(dataB)
	} else {
		return w.w.WriteString(dataS)
	}
}
|
With the HTTP request parsed and the net/http.ResponseWriter initialized, net/http.serverHandler.ServeHTTP is called to find a handler for the request:
|
type serverHandler struct {
	srv *Server
}

func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) {
	handler := sh.srv.Handler
	if handler == nil {
		handler = DefaultServeMux
	}
	if req.RequestURI == "*" && req.Method == "OPTIONS" {
		handler = globalOptionsHandler{}
	}
	handler.ServeHTTP(rw, req)
}
|
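If the HTTP server has no handler configured, the default net/http.DefaultServeMux handles incoming HTTP requests.
net/http.ServeMux is an HTTP request multiplexer: it receives incoming HTTP requests, matches the request URL against the registered patterns, and calls the most suitable handler: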
|
func (mux *ServeMux) ServeHTTP(w ResponseWriter, r *Request) {
	h, _ := mux.Handler(r)
	h.ServeHTTP(w, r)
}
|
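After a series of function calls, this ends up in net/http.ServeMux.match, which consults the routes registered earlier: an exact match in the map wins; otherwise the patterns ending in a slash are tried as prefixes, longest first: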
|
func (mux *ServeMux) match(path string) (h Handler, pattern string) {
	v, ok := mux.m[path]
	if ok {
		return v.h, v.pattern
	}

	for _, e := range mux.es {
		if strings.HasPrefix(path, e.pattern) {
			return e.h, e.pattern
		}
	}
	return nil, ""
}
|
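If the request path matches an entry in the routing table, the handler stored in that entry is called. The business logic in the handler builds the response to the HTTP request through net/http.ResponseWriter, and the response is sent back to the client over the TCP connection.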
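The matching precedence can be checked with the exported ServeMux.Handler method, which returns the pattern that was selected for a request. In this sketch the three patterns and the helpers `newMux` and `matched` are made up for illustration.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// newMux registers three overlapping patterns. The handlers do nothing
// because only the matching step matters here.
func newMux() *http.ServeMux {
	mux := http.NewServeMux()
	noop := func(http.ResponseWriter, *http.Request) {}
	mux.HandleFunc("/", noop)                // catch-all prefix
	mux.HandleFunc("/static/", noop)         // longer prefix
	mux.HandleFunc("/static/logo.png", noop) // exact path
	return mux
}

// matched returns the registered pattern that mux selects for path.
func matched(mux *http.ServeMux, path string) string {
	_, pattern := mux.Handler(httptest.NewRequest(http.MethodGet, path, nil))
	return pattern
}

func main() {
	mux := newMux()
	for _, path := range []string{"/static/logo.png", "/static/app.js", "/about"} {
		fmt.Printf("%-18s -> %s\n", path, matched(mux, path))
	}
}
```

The exact pattern beats both prefixes, and the longer prefix "/static/" beats the catch-all "/", regardless of registration order.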
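Summary
Go's HTTP standard library is very full-featured. In many languages the standard library offers only the basics, and building an HTTP client or server usually means reaching for an open-source framework; many Go projects, by contrast, implement their HTTP servers directly on the standard library, which says something about the library's value.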