处理AI胡乱生成的乱摊子

This commit is contained in:
2025-09-07 20:31:38 +08:00
parent c377a0784d
commit ba513e0827
202 changed files with 11751 additions and 67183 deletions

View File

@@ -1,51 +0,0 @@
#
# This Dockerfile builds a recent curl with HTTP/2 client support, using
# a recent nghttp2 build.
#
# See the Makefile for how to tag it. If Docker and that image is found, the
# Go tests use this curl binary for integration tests.
#
FROM ubuntu:trusty
RUN apt-get update && \
apt-get upgrade -y && \
apt-get install -y git-core build-essential wget
RUN apt-get install -y --no-install-recommends \
autotools-dev libtool pkg-config zlib1g-dev \
libcunit1-dev libssl-dev libxml2-dev libevent-dev \
automake autoconf
# The list of packages nghttp2 recommends for h2load:
RUN apt-get install -y --no-install-recommends make binutils \
autoconf automake autotools-dev \
libtool pkg-config zlib1g-dev libcunit1-dev libssl-dev libxml2-dev \
libev-dev libevent-dev libjansson-dev libjemalloc-dev \
cython python3.4-dev python-setuptools
# Note: setting NGHTTP2_VER before the git clone, so an old git clone isn't cached:
ENV NGHTTP2_VER 895da9a
RUN cd /root && git clone https://github.com/tatsuhiro-t/nghttp2.git
WORKDIR /root/nghttp2
RUN git reset --hard $NGHTTP2_VER
RUN autoreconf -i
RUN automake
RUN autoconf
RUN ./configure
RUN make
RUN make install
WORKDIR /root
RUN wget https://curl.se/download/curl-7.45.0.tar.gz
RUN tar -zxvf curl-7.45.0.tar.gz
WORKDIR /root/curl-7.45.0
RUN ./configure --with-ssl --with-nghttp2=/usr/local
RUN make
RUN make install
RUN ldconfig
CMD ["-h"]
ENTRYPOINT ["/usr/local/bin/curl"]

View File

@@ -1,3 +0,0 @@
curlimage:
docker build -t gohttp2/curl .

View File

@@ -20,41 +20,44 @@ import (
// TODO: Benchmark to determine if the pools are necessary. The GC may have
// improved enough that we can instead allocate chunks like this:
// make([]byte, max(16<<10, expectedBytesRemaining))
var (
dataChunkSizeClasses = []int{
1 << 10,
2 << 10,
4 << 10,
8 << 10,
16 << 10,
}
dataChunkPools = [...]sync.Pool{
{New: func() interface{} { return make([]byte, 1<<10) }},
{New: func() interface{} { return make([]byte, 2<<10) }},
{New: func() interface{} { return make([]byte, 4<<10) }},
{New: func() interface{} { return make([]byte, 8<<10) }},
{New: func() interface{} { return make([]byte, 16<<10) }},
}
)
var dataChunkPools = [...]sync.Pool{
{New: func() interface{} { return new([1 << 10]byte) }},
{New: func() interface{} { return new([2 << 10]byte) }},
{New: func() interface{} { return new([4 << 10]byte) }},
{New: func() interface{} { return new([8 << 10]byte) }},
{New: func() interface{} { return new([16 << 10]byte) }},
}
func getDataBufferChunk(size int64) []byte {
i := 0
for ; i < len(dataChunkSizeClasses)-1; i++ {
if size <= int64(dataChunkSizeClasses[i]) {
break
}
switch {
case size <= 1<<10:
return dataChunkPools[0].Get().(*[1 << 10]byte)[:]
case size <= 2<<10:
return dataChunkPools[1].Get().(*[2 << 10]byte)[:]
case size <= 4<<10:
return dataChunkPools[2].Get().(*[4 << 10]byte)[:]
case size <= 8<<10:
return dataChunkPools[3].Get().(*[8 << 10]byte)[:]
default:
return dataChunkPools[4].Get().(*[16 << 10]byte)[:]
}
return dataChunkPools[i].Get().([]byte)
}
func putDataBufferChunk(p []byte) {
for i, n := range dataChunkSizeClasses {
if len(p) == n {
dataChunkPools[i].Put(p)
return
}
switch len(p) {
case 1 << 10:
dataChunkPools[0].Put((*[1 << 10]byte)(p))
case 2 << 10:
dataChunkPools[1].Put((*[2 << 10]byte)(p))
case 4 << 10:
dataChunkPools[2].Put((*[4 << 10]byte)(p))
case 8 << 10:
dataChunkPools[3].Put((*[8 << 10]byte)(p))
case 16 << 10:
dataChunkPools[4].Put((*[16 << 10]byte)(p))
default:
panic(fmt.Sprintf("unexpected buffer len=%v", len(p)))
}
panic(fmt.Sprintf("unexpected buffer len=%v", len(p)))
}
// dataBuffer is an io.ReadWriter backed by a list of data chunks.

View File

@@ -490,6 +490,9 @@ func terminalReadFrameError(err error) bool {
// returned error is ErrFrameTooLarge. Other errors may be of type
// ConnectionError, StreamError, or anything else from the underlying
// reader.
//
// If ReadFrame returns an error and a non-nil Frame, the Frame's StreamID
// indicates the stream responsible for the error.
func (fr *Framer) ReadFrame() (Frame, error) {
fr.errDetail = nil
if fr.lastFrame != nil {
@@ -1510,19 +1513,18 @@ func (mh *MetaHeadersFrame) checkPseudos() error {
}
func (fr *Framer) maxHeaderStringLen() int {
v := fr.maxHeaderListSize()
if uint32(int(v)) == v {
return int(v)
v := int(fr.maxHeaderListSize())
if v < 0 {
// If maxHeaderListSize overflows an int, use no limit (0).
return 0
}
// They had a crazy big number for MaxHeaderBytes anyway,
// so give them unlimited header lengths:
return 0
return v
}
// readMetaFrame returns 0 or more CONTINUATION frames from fr and
// merge them into the provided hf and returns a MetaHeadersFrame
// with the decoded hpack values.
func (fr *Framer) readMetaFrame(hf *HeadersFrame) (*MetaHeadersFrame, error) {
func (fr *Framer) readMetaFrame(hf *HeadersFrame) (Frame, error) {
if fr.AllowIllegalReads {
return nil, errors.New("illegal use of AllowIllegalReads with ReadMetaHeaders")
}
@@ -1565,6 +1567,7 @@ func (fr *Framer) readMetaFrame(hf *HeadersFrame) (*MetaHeadersFrame, error) {
if size > remainSize {
hdec.SetEmitEnabled(false)
mh.Truncated = true
remainSize = 0
return
}
remainSize -= size
@@ -1577,8 +1580,38 @@ func (fr *Framer) readMetaFrame(hf *HeadersFrame) (*MetaHeadersFrame, error) {
var hc headersOrContinuation = hf
for {
frag := hc.HeaderBlockFragment()
// Avoid parsing large amounts of headers that we will then discard.
// If the sender exceeds the max header list size by too much,
// skip parsing the fragment and close the connection.
//
// "Too much" is either any CONTINUATION frame after we've already
// exceeded the max header list size (in which case remainSize is 0),
// or a frame whose encoded size is more than twice the remaining
// header list bytes we're willing to accept.
if int64(len(frag)) > int64(2*remainSize) {
if VerboseLogs {
log.Printf("http2: header list too large")
}
// It would be nice to send a RST_STREAM before sending the GOAWAY,
// but the structure of the server's frame writer makes this difficult.
return mh, ConnectionError(ErrCodeProtocol)
}
// Also close the connection after any CONTINUATION frame following an
// invalid header, since we stop tracking the size of the headers after
// an invalid one.
if invalid != nil {
if VerboseLogs {
log.Printf("http2: invalid header: %v", invalid)
}
// It would be nice to send a RST_STREAM before sending the GOAWAY,
// but the structure of the server's frame writer makes this difficult.
return mh, ConnectionError(ErrCodeProtocol)
}
if _, err := hdec.Write(frag); err != nil {
return nil, ConnectionError(ErrCodeCompression)
return mh, ConnectionError(ErrCodeCompression)
}
if hc.HeadersEnded() {
@@ -1595,7 +1628,7 @@ func (fr *Framer) readMetaFrame(hf *HeadersFrame) (*MetaHeadersFrame, error) {
mh.HeadersFrame.invalidate()
if err := hdec.Close(); err != nil {
return nil, ConnectionError(ErrCodeCompression)
return mh, ConnectionError(ErrCodeCompression)
}
if invalid != nil {
fr.errDetail = invalid

View File

@@ -1,30 +0,0 @@
// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.11
// +build go1.11
package http2
import (
"net/http/httptrace"
"net/textproto"
)
func traceHasWroteHeaderField(trace *httptrace.ClientTrace) bool {
return trace != nil && trace.WroteHeaderField != nil
}
func traceWroteHeaderField(trace *httptrace.ClientTrace, k, v string) {
if trace != nil && trace.WroteHeaderField != nil {
trace.WroteHeaderField(k, []string{v})
}
}
func traceGot1xxResponseFunc(trace *httptrace.ClientTrace) func(int, textproto.MIMEHeader) error {
if trace != nil {
return trace.Got1xxResponse
}
return nil
}

View File

@@ -1,27 +0,0 @@
// Copyright 2021 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.15
// +build go1.15
package http2
import (
"context"
"crypto/tls"
)
// dialTLSWithContext uses tls.Dialer, added in Go 1.15, to open a TLS
// connection.
func (t *Transport) dialTLSWithContext(ctx context.Context, network, addr string, cfg *tls.Config) (*tls.Conn, error) {
dialer := &tls.Dialer{
Config: cfg,
}
cn, err := dialer.DialContext(ctx, network, addr)
if err != nil {
return nil, err
}
tlsCn := cn.(*tls.Conn) // DialContext comment promises this will always succeed
return tlsCn, nil
}

View File

@@ -1,17 +0,0 @@
// Copyright 2021 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.18
// +build go1.18
package http2
import (
"crypto/tls"
"net"
)
func tlsUnderlyingConn(tc *tls.Conn) net.Conn {
return tc.NetConn()
}

View File

@@ -44,7 +44,7 @@ func init() {
// HTTP/1, but unlikely to occur in practice and (2) Upgrading from HTTP/1 to
// h2c - this works by using the HTTP/1 Upgrade header to request an upgrade to
// h2c. When either of those situations occur we hijack the HTTP/1 connection,
// convert it to a HTTP/2 connection and pass the net.Conn to http2.ServeConn.
// convert it to an HTTP/2 connection and pass the net.Conn to http2.ServeConn.
type h2cHandler struct {
Handler http.Handler
s *http2.Server

View File

@@ -17,15 +17,18 @@ package http2 // import "golang.org/x/net/http2"
import (
"bufio"
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"net"
"net/http"
"os"
"sort"
"strconv"
"strings"
"sync"
"time"
"golang.org/x/net/http/httpguts"
)
@@ -210,12 +213,6 @@ type stringWriter interface {
WriteString(s string) (n int, err error)
}
// A gate lets two goroutines coordinate their activities.
type gate chan struct{}
func (g gate) Done() { g <- struct{}{} }
func (g gate) Wait() { <-g }
// A closeWaiter is like a sync.WaitGroup but only goes 1 to 0 (open to closed).
type closeWaiter chan struct{}
@@ -241,13 +238,19 @@ func (cw closeWaiter) Wait() {
// Its buffered writer is lazily allocated as needed, to minimize
// idle memory usage with many connections.
type bufferedWriter struct {
_ incomparable
w io.Writer // immutable
bw *bufio.Writer // non-nil when data is buffered
_ incomparable
group synctestGroupInterface // immutable
conn net.Conn // immutable
bw *bufio.Writer // non-nil when data is buffered
byteTimeout time.Duration // immutable, WriteByteTimeout
}
func newBufferedWriter(w io.Writer) *bufferedWriter {
return &bufferedWriter{w: w}
func newBufferedWriter(group synctestGroupInterface, conn net.Conn, timeout time.Duration) *bufferedWriter {
return &bufferedWriter{
group: group,
conn: conn,
byteTimeout: timeout,
}
}
// bufWriterPoolBufferSize is the size of bufio.Writer's
@@ -274,7 +277,7 @@ func (w *bufferedWriter) Available() int {
func (w *bufferedWriter) Write(p []byte) (n int, err error) {
if w.bw == nil {
bw := bufWriterPool.Get().(*bufio.Writer)
bw.Reset(w.w)
bw.Reset((*bufferedWriterTimeoutWriter)(w))
w.bw = bw
}
return w.bw.Write(p)
@@ -292,6 +295,38 @@ func (w *bufferedWriter) Flush() error {
return err
}
type bufferedWriterTimeoutWriter bufferedWriter
func (w *bufferedWriterTimeoutWriter) Write(p []byte) (n int, err error) {
return writeWithByteTimeout(w.group, w.conn, w.byteTimeout, p)
}
// writeWithByteTimeout writes to conn.
// If more than timeout passes without any bytes being written to the connection,
// the write fails.
func writeWithByteTimeout(group synctestGroupInterface, conn net.Conn, timeout time.Duration, p []byte) (n int, err error) {
if timeout <= 0 {
return conn.Write(p)
}
for {
var now time.Time
if group == nil {
now = time.Now()
} else {
now = group.Now()
}
conn.SetWriteDeadline(now.Add(timeout))
nn, err := conn.Write(p[n:])
n += nn
if n == len(p) || nn == 0 || !errors.Is(err, os.ErrDeadlineExceeded) {
// Either we finished the write, made no progress, or hit the deadline.
// Whichever it is, we're done now.
conn.SetWriteDeadline(time.Time{})
return n, err
}
}
}
func mustUint31(v int32) uint32 {
if v < 0 || v > 2147483647 {
panic("out of range")
@@ -383,3 +418,14 @@ func validPseudoPath(v string) bool {
// makes that struct also non-comparable, and generally doesn't add
// any size (as long as it's first).
type incomparable [0]func()
// synctestGroupInterface is the methods of synctestGroup used by Server and Transport.
// It's defined as an interface here to let us keep synctestGroup entirely test-only
// and not a part of non-test builds.
type synctestGroupInterface interface {
Join()
Now() time.Time
NewTimer(d time.Duration) timer
AfterFunc(d time.Duration, f func()) timer
ContextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc)
}

View File

@@ -1,21 +0,0 @@
// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build !go1.11
// +build !go1.11
package http2
import (
"net/http/httptrace"
"net/textproto"
)
func traceHasWroteHeaderField(trace *httptrace.ClientTrace) bool { return false }
func traceWroteHeaderField(trace *httptrace.ClientTrace, k, v string) {}
func traceGot1xxResponseFunc(trace *httptrace.ClientTrace) func(int, textproto.MIMEHeader) error {
return nil
}

View File

@@ -1,31 +0,0 @@
// Copyright 2021 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build !go1.15
// +build !go1.15
package http2
import (
"context"
"crypto/tls"
)
// dialTLSWithContext opens a TLS connection.
func (t *Transport) dialTLSWithContext(ctx context.Context, network, addr string, cfg *tls.Config) (*tls.Conn, error) {
cn, err := tls.Dial(network, addr, cfg)
if err != nil {
return nil, err
}
if err := cn.Handshake(); err != nil {
return nil, err
}
if cfg.InsecureSkipVerify {
return cn, nil
}
if err := cn.VerifyHostname(cfg.ServerName); err != nil {
return nil, err
}
return cn, nil
}

View File

@@ -1,17 +0,0 @@
// Copyright 2021 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build !go1.18
// +build !go1.18
package http2
import (
"crypto/tls"
"net"
)
func tlsUnderlyingConn(tc *tls.Conn) net.Conn {
return nil
}

View File

@@ -77,7 +77,10 @@ func (p *pipe) Read(d []byte) (n int, err error) {
}
}
var errClosedPipeWrite = errors.New("write on closed buffer")
var (
errClosedPipeWrite = errors.New("write on closed buffer")
errUninitializedPipeWrite = errors.New("write on uninitialized buffer")
)
// Write copies bytes from p into the buffer and wakes a reader.
// It is an error to write more data than the buffer can hold.
@@ -91,6 +94,12 @@ func (p *pipe) Write(d []byte) (n int, err error) {
if p.err != nil || p.breakErr != nil {
return 0, errClosedPipeWrite
}
// pipe.setBuffer is never invoked, leaving the buffer uninitialized.
// We shouldn't try to write to an uninitialized pipe,
// but returning an error is better than panicking.
if p.b == nil {
return 0, errUninitializedPipeWrite
}
return p.b.Write(d)
}

View File

@@ -29,6 +29,7 @@ import (
"bufio"
"bytes"
"context"
"crypto/rand"
"crypto/tls"
"errors"
"fmt"
@@ -52,10 +53,14 @@ import (
)
const (
prefaceTimeout = 10 * time.Second
firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway
handlerChunkWriteSize = 4 << 10
defaultMaxStreams = 250 // TODO: make this 100 as the GFE seems to?
prefaceTimeout = 10 * time.Second
firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway
handlerChunkWriteSize = 4 << 10
defaultMaxStreams = 250 // TODO: make this 100 as the GFE seems to?
// maxQueuedControlFrames is the maximum number of control frames like
// SETTINGS, PING and RST_STREAM that will be queued for writing before
// the connection is closed to prevent memory exhaustion attacks.
maxQueuedControlFrames = 10000
)
@@ -124,8 +129,25 @@ type Server struct {
// IdleTimeout specifies how long until idle clients should be
// closed with a GOAWAY frame. PING frames are not considered
// activity for the purposes of IdleTimeout.
// If zero or negative, there is no timeout.
IdleTimeout time.Duration
// ReadIdleTimeout is the timeout after which a health check using a ping
// frame will be carried out if no frame is received on the connection.
// If zero, no health check is performed.
ReadIdleTimeout time.Duration
// PingTimeout is the timeout after which the connection will be closed
// if a response to a ping is not received.
// If zero, a default of 15 seconds is used.
PingTimeout time.Duration
// WriteByteTimeout is the timeout after which a connection will be
// closed if no data can be written to it. The timeout begins when data is
// available to write, and is extended whenever any bytes are written.
// If zero or negative, there is no timeout.
WriteByteTimeout time.Duration
// MaxUploadBufferPerConnection is the size of the initial flow
// control window for each connections. The HTTP/2 spec does not
// allow this to be smaller than 65535 or larger than 2^32-1.
@@ -153,57 +175,39 @@ type Server struct {
// so that we don't embed a Mutex in this struct, which will make the
// struct non-copyable, which might break some callers.
state *serverInternalState
// Synchronization group used for testing.
// Outside of tests, this is nil.
group synctestGroupInterface
}
func (s *Server) initialConnRecvWindowSize() int32 {
if s.MaxUploadBufferPerConnection >= initialWindowSize {
return s.MaxUploadBufferPerConnection
func (s *Server) markNewGoroutine() {
if s.group != nil {
s.group.Join()
}
return 1 << 20
}
func (s *Server) initialStreamRecvWindowSize() int32 {
if s.MaxUploadBufferPerStream > 0 {
return s.MaxUploadBufferPerStream
func (s *Server) now() time.Time {
if s.group != nil {
return s.group.Now()
}
return 1 << 20
return time.Now()
}
func (s *Server) maxReadFrameSize() uint32 {
if v := s.MaxReadFrameSize; v >= minMaxFrameSize && v <= maxFrameSize {
return v
// newTimer creates a new time.Timer, or a synthetic timer in tests.
func (s *Server) newTimer(d time.Duration) timer {
if s.group != nil {
return s.group.NewTimer(d)
}
return defaultMaxReadFrameSize
return timeTimer{time.NewTimer(d)}
}
func (s *Server) maxConcurrentStreams() uint32 {
if v := s.MaxConcurrentStreams; v > 0 {
return v
// afterFunc creates a new time.AfterFunc timer, or a synthetic timer in tests.
func (s *Server) afterFunc(d time.Duration, f func()) timer {
if s.group != nil {
return s.group.AfterFunc(d, f)
}
return defaultMaxStreams
}
func (s *Server) maxDecoderHeaderTableSize() uint32 {
if v := s.MaxDecoderHeaderTableSize; v > 0 {
return v
}
return initialHeaderTableSize
}
func (s *Server) maxEncoderHeaderTableSize() uint32 {
if v := s.MaxEncoderHeaderTableSize; v > 0 {
return v
}
return initialHeaderTableSize
}
// maxQueuedControlFrames is the maximum number of control frames like
// SETTINGS, PING and RST_STREAM that will be queued for writing before
// the connection is closed to prevent memory exhaustion attacks.
func (s *Server) maxQueuedControlFrames() int {
// TODO: if anybody asks, add a Server field, and remember to define the
// behavior of negative values.
return maxQueuedControlFrames
return timeTimer{time.AfterFunc(d, f)}
}
type serverInternalState struct {
@@ -399,16 +403,22 @@ func (o *ServeConnOpts) handler() http.Handler {
//
// The opts parameter is optional. If nil, default values are used.
func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
s.serveConn(c, opts, nil)
}
func (s *Server) serveConn(c net.Conn, opts *ServeConnOpts, newf func(*serverConn)) {
baseCtx, cancel := serverConnBaseContext(c, opts)
defer cancel()
http1srv := opts.baseConfig()
conf := configFromServer(http1srv, s)
sc := &serverConn{
srv: s,
hs: opts.baseConfig(),
hs: http1srv,
conn: c,
baseCtx: baseCtx,
remoteAddrStr: c.RemoteAddr().String(),
bw: newBufferedWriter(c),
bw: newBufferedWriter(s.group, c, conf.WriteByteTimeout),
handler: opts.handler(),
streams: make(map[uint32]*stream),
readFrameCh: make(chan readFrameResult),
@@ -418,13 +428,19 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
bodyReadCh: make(chan bodyReadMsg), // buffering doesn't matter either way
doneServing: make(chan struct{}),
clientMaxStreams: math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value"
advMaxStreams: s.maxConcurrentStreams(),
advMaxStreams: conf.MaxConcurrentStreams,
initialStreamSendWindowSize: initialWindowSize,
initialStreamRecvWindowSize: conf.MaxUploadBufferPerStream,
maxFrameSize: initialMaxFrameSize,
pingTimeout: conf.PingTimeout,
countErrorFunc: conf.CountError,
serveG: newGoroutineLock(),
pushEnabled: true,
sawClientPreface: opts.SawClientPreface,
}
if newf != nil {
newf(sc)
}
s.state.registerConn(sc)
defer s.state.unregisterConn(sc)
@@ -434,14 +450,14 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
// passes the connection off to us with the deadline already set.
// Write deadlines are set per stream in serverConn.newStream.
// Disarm the net.Conn write deadline here.
if sc.hs.WriteTimeout != 0 {
if sc.hs.WriteTimeout > 0 {
sc.conn.SetWriteDeadline(time.Time{})
}
if s.NewWriteScheduler != nil {
sc.writeSched = s.NewWriteScheduler()
} else {
sc.writeSched = NewPriorityWriteScheduler(nil)
sc.writeSched = newRoundRobinWriteScheduler()
}
// These start at the RFC-specified defaults. If there is a higher
@@ -450,15 +466,15 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
sc.flow.add(initialWindowSize)
sc.inflow.init(initialWindowSize)
sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
sc.hpackEncoder.SetMaxDynamicTableSizeLimit(s.maxEncoderHeaderTableSize())
sc.hpackEncoder.SetMaxDynamicTableSizeLimit(conf.MaxEncoderHeaderTableSize)
fr := NewFramer(sc.bw, c)
if s.CountError != nil {
fr.countError = s.CountError
if conf.CountError != nil {
fr.countError = conf.CountError
}
fr.ReadMetaHeaders = hpack.NewDecoder(s.maxDecoderHeaderTableSize(), nil)
fr.ReadMetaHeaders = hpack.NewDecoder(conf.MaxDecoderHeaderTableSize, nil)
fr.MaxHeaderListSize = sc.maxHeaderListSize()
fr.SetMaxReadFrameSize(s.maxReadFrameSize())
fr.SetMaxReadFrameSize(conf.MaxReadFrameSize)
sc.framer = fr
if tc, ok := c.(connectionStater); ok {
@@ -491,7 +507,7 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
// So for now, do nothing here again.
}
if !s.PermitProhibitedCipherSuites && isBadCipher(sc.tlsState.CipherSuite) {
if !conf.PermitProhibitedCipherSuites && isBadCipher(sc.tlsState.CipherSuite) {
// "Endpoints MAY choose to generate a connection error
// (Section 5.4.1) of type INADEQUATE_SECURITY if one of
// the prohibited cipher suites are negotiated."
@@ -528,7 +544,7 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
opts.UpgradeRequest = nil
}
sc.serve()
sc.serve(conf)
}
func serverConnBaseContext(c net.Conn, opts *ServeConnOpts) (ctx context.Context, cancel func()) {
@@ -568,6 +584,7 @@ type serverConn struct {
tlsState *tls.ConnectionState // shared by all handlers, like net/http
remoteAddrStr string
writeSched WriteScheduler
countErrorFunc func(errType string)
// Everything following is owned by the serve loop; use serveG.check():
serveG goroutineLock // used to verify funcs are on serve()
@@ -581,10 +598,13 @@ type serverConn struct {
advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client
curClientStreams uint32 // number of open streams initiated by the client
curPushedStreams uint32 // number of open streams initiated by server push
curHandlers uint32 // number of running handler goroutines
maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests
maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes
streams map[uint32]*stream
unstartedHandlers []unstartedHandler
initialStreamSendWindowSize int32
initialStreamRecvWindowSize int32
maxFrameSize int32
peerMaxHeaderListSize uint32 // zero means unknown (default)
canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case
@@ -595,9 +615,14 @@ type serverConn struct {
inGoAway bool // we've started to or sent GOAWAY
inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop
needToSendGoAway bool // we need to schedule a GOAWAY frame write
pingSent bool
sentPingData [8]byte
goAwayCode ErrCode
shutdownTimer *time.Timer // nil until used
idleTimer *time.Timer // nil if unused
shutdownTimer timer // nil until used
idleTimer timer // nil if unused
readIdleTimeout time.Duration
pingTimeout time.Duration
readIdleTimer timer // nil if unused
// Owned by the writeFrameAsync goroutine:
headerWriteBuf bytes.Buffer
@@ -612,11 +637,7 @@ func (sc *serverConn) maxHeaderListSize() uint32 {
if n <= 0 {
n = http.DefaultMaxHeaderBytes
}
// http2's count is in a slightly different unit and includes 32 bytes per pair.
// So, take the net/http.Server value and pad it up a bit, assuming 10 headers.
const perFieldOverhead = 32 // per http2 spec
const typicalHeaders = 10 // conservative
return uint32(n + typicalHeaders*perFieldOverhead)
return uint32(adjustHTTP1MaxHeaderSize(int64(n)))
}
func (sc *serverConn) curOpenStreams() uint32 {
@@ -646,12 +667,12 @@ type stream struct {
flow outflow // limits writing from Handler to client
inflow inflow // what the client is allowed to POST/etc to us
state streamState
resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
gotTrailerHeader bool // HEADER frame for trailers was seen
wroteHeaders bool // whether we wrote headers (not status 100)
readDeadline *time.Timer // nil if unused
writeDeadline *time.Timer // nil if unused
closeErr error // set before cw is closed
resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
gotTrailerHeader bool // HEADER frame for trailers was seen
wroteHeaders bool // whether we wrote headers (not status 100)
readDeadline timer // nil if unused
writeDeadline timer // nil if unused
closeErr error // set before cw is closed
trailer http.Header // accumulated trailers
reqTrailer http.Header // handler's Request.Trailer
@@ -729,11 +750,7 @@ func isClosedConnError(err error) bool {
return false
}
// TODO: remove this string search and be more like the Windows
// case below. That might involve modifying the standard library
// to return better error types.
str := err.Error()
if strings.Contains(str, "use of closed network connection") {
if errors.Is(err, net.ErrClosed) {
return true
}
@@ -812,8 +829,9 @@ type readFrameResult struct {
// consumer is done with the frame.
// It's run on its own goroutine.
func (sc *serverConn) readFrames() {
gate := make(gate)
gateDone := gate.Done
sc.srv.markNewGoroutine()
gate := make(chan struct{})
gateDone := func() { gate <- struct{}{} }
for {
f, err := sc.framer.ReadFrame()
select {
@@ -844,6 +862,7 @@ type frameWriteResult struct {
// At most one goroutine can be running writeFrameAsync at a time per
// serverConn.
func (sc *serverConn) writeFrameAsync(wr FrameWriteRequest, wd *writeData) {
sc.srv.markNewGoroutine()
var err error
if wd == nil {
err = wr.write.writeFrame(sc)
@@ -882,7 +901,7 @@ func (sc *serverConn) notePanic() {
}
}
func (sc *serverConn) serve() {
func (sc *serverConn) serve(conf http2Config) {
sc.serveG.check()
defer sc.notePanic()
defer sc.conn.Close()
@@ -896,18 +915,18 @@ func (sc *serverConn) serve() {
sc.writeFrame(FrameWriteRequest{
write: writeSettings{
{SettingMaxFrameSize, sc.srv.maxReadFrameSize()},
{SettingMaxFrameSize, conf.MaxReadFrameSize},
{SettingMaxConcurrentStreams, sc.advMaxStreams},
{SettingMaxHeaderListSize, sc.maxHeaderListSize()},
{SettingHeaderTableSize, sc.srv.maxDecoderHeaderTableSize()},
{SettingInitialWindowSize, uint32(sc.srv.initialStreamRecvWindowSize())},
{SettingHeaderTableSize, conf.MaxDecoderHeaderTableSize},
{SettingInitialWindowSize, uint32(sc.initialStreamRecvWindowSize)},
},
})
sc.unackedSettings++
// Each connection starts with initialWindowSize inflow tokens.
// If a higher value is configured, we add more tokens.
if diff := sc.srv.initialConnRecvWindowSize() - initialWindowSize; diff > 0 {
if diff := conf.MaxUploadBufferPerConnection - initialWindowSize; diff > 0 {
sc.sendWindowUpdate(nil, int(diff))
}
@@ -922,16 +941,23 @@ func (sc *serverConn) serve() {
sc.setConnState(http.StateActive)
sc.setConnState(http.StateIdle)
if sc.srv.IdleTimeout != 0 {
sc.idleTimer = time.AfterFunc(sc.srv.IdleTimeout, sc.onIdleTimer)
if sc.srv.IdleTimeout > 0 {
sc.idleTimer = sc.srv.afterFunc(sc.srv.IdleTimeout, sc.onIdleTimer)
defer sc.idleTimer.Stop()
}
if conf.SendPingTimeout > 0 {
sc.readIdleTimeout = conf.SendPingTimeout
sc.readIdleTimer = sc.srv.afterFunc(conf.SendPingTimeout, sc.onReadIdleTimer)
defer sc.readIdleTimer.Stop()
}
go sc.readFrames() // closed by defer sc.conn.Close above
settingsTimer := time.AfterFunc(firstSettingsTimeout, sc.onSettingsTimer)
settingsTimer := sc.srv.afterFunc(firstSettingsTimeout, sc.onSettingsTimer)
defer settingsTimer.Stop()
lastFrameTime := sc.srv.now()
loopNum := 0
for {
loopNum++
@@ -945,6 +971,7 @@ func (sc *serverConn) serve() {
case res := <-sc.wroteFrameCh:
sc.wroteFrame(res)
case res := <-sc.readFrameCh:
lastFrameTime = sc.srv.now()
// Process any written frames before reading new frames from the client since a
// written frame could have triggered a new stream to be started.
if sc.writingFrameAsync {
@@ -976,11 +1003,15 @@ func (sc *serverConn) serve() {
case idleTimerMsg:
sc.vlogf("connection is idle")
sc.goAway(ErrCodeNo)
case readIdleTimerMsg:
sc.handlePingTimer(lastFrameTime)
case shutdownTimerMsg:
sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr())
return
case gracefulShutdownMsg:
sc.startGracefulShutdownInternal()
case handlerDoneMsg:
sc.handlerDone()
default:
panic("unknown timer")
}
@@ -996,7 +1027,7 @@ func (sc *serverConn) serve() {
// If the peer is causing us to generate a lot of control frames,
// but not reading them from us, assume they are trying to make us
// run out of memory.
if sc.queuedControlFrames > sc.srv.maxQueuedControlFrames() {
if sc.queuedControlFrames > maxQueuedControlFrames {
sc.vlogf("http2: too many control frames in send queue, closing connection")
return
}
@@ -1012,12 +1043,30 @@ func (sc *serverConn) serve() {
}
}
func (sc *serverConn) awaitGracefulShutdown(sharedCh <-chan struct{}, privateCh chan struct{}) {
select {
case <-sc.doneServing:
case <-sharedCh:
close(privateCh)
func (sc *serverConn) handlePingTimer(lastFrameReadTime time.Time) {
if sc.pingSent {
sc.vlogf("timeout waiting for PING response")
sc.conn.Close()
return
}
pingAt := lastFrameReadTime.Add(sc.readIdleTimeout)
now := sc.srv.now()
if pingAt.After(now) {
// We received frames since arming the ping timer.
// Reset it for the next possible timeout.
sc.readIdleTimer.Reset(pingAt.Sub(now))
return
}
sc.pingSent = true
// Ignore crypto/rand.Read errors: It generally can't fail, and worse case if it does
// is we send a PING frame containing 0s.
_, _ = rand.Read(sc.sentPingData[:])
sc.writeFrame(FrameWriteRequest{
write: &writePing{data: sc.sentPingData},
})
sc.readIdleTimer.Reset(sc.pingTimeout)
}
type serverMessage int
@@ -1026,12 +1075,15 @@ type serverMessage int
var (
settingsTimerMsg = new(serverMessage)
idleTimerMsg = new(serverMessage)
readIdleTimerMsg = new(serverMessage)
shutdownTimerMsg = new(serverMessage)
gracefulShutdownMsg = new(serverMessage)
handlerDoneMsg = new(serverMessage)
)
func (sc *serverConn) onSettingsTimer() { sc.sendServeMsg(settingsTimerMsg) }
func (sc *serverConn) onIdleTimer() { sc.sendServeMsg(idleTimerMsg) }
func (sc *serverConn) onReadIdleTimer() { sc.sendServeMsg(readIdleTimerMsg) }
func (sc *serverConn) onShutdownTimer() { sc.sendServeMsg(shutdownTimerMsg) }
func (sc *serverConn) sendServeMsg(msg interface{}) {
@@ -1063,10 +1115,10 @@ func (sc *serverConn) readPreface() error {
errc <- nil
}
}()
timer := time.NewTimer(prefaceTimeout) // TODO: configurable on *Server?
timer := sc.srv.newTimer(prefaceTimeout) // TODO: configurable on *Server?
defer timer.Stop()
select {
case <-timer.C:
case <-timer.C():
return errPrefaceTimeout
case err := <-errc:
if err == nil {
@@ -1284,6 +1336,10 @@ func (sc *serverConn) wroteFrame(res frameWriteResult) {
sc.writingFrame = false
sc.writingFrameAsync = false
if res.err != nil {
sc.conn.Close()
}
wr := res.wr
if writeEndsStream(wr.write) {
@@ -1431,7 +1487,7 @@ func (sc *serverConn) goAway(code ErrCode) {
func (sc *serverConn) shutDownIn(d time.Duration) {
sc.serveG.check()
sc.shutdownTimer = time.AfterFunc(d, sc.onShutdownTimer)
sc.shutdownTimer = sc.srv.afterFunc(d, sc.onShutdownTimer)
}
func (sc *serverConn) resetStream(se StreamError) {
@@ -1484,6 +1540,11 @@ func (sc *serverConn) processFrameFromReader(res readFrameResult) bool {
sc.goAway(ErrCodeFlowControl)
return true
case ConnectionError:
if res.f != nil {
if id := res.f.Header().StreamID; id > sc.maxClientStreamID {
sc.maxClientStreamID = id
}
}
sc.logf("http2: server connection error from %v: %v", sc.conn.RemoteAddr(), ev)
sc.goAway(ErrCode(ev))
return true // goAway will handle shutdown
@@ -1553,6 +1614,11 @@ func (sc *serverConn) processFrame(f Frame) error {
func (sc *serverConn) processPing(f *PingFrame) error {
sc.serveG.check()
if f.IsAck() {
if sc.pingSent && sc.sentPingData == f.Data {
// This is a response to a PING we sent.
sc.pingSent = false
sc.readIdleTimer.Reset(sc.readIdleTimeout)
}
// 6.7 PING: " An endpoint MUST NOT respond to PING frames
// containing this flag."
return nil
@@ -1640,7 +1706,7 @@ func (sc *serverConn) closeStream(st *stream, err error) {
delete(sc.streams, st.id)
if len(sc.streams) == 0 {
sc.setConnState(http.StateIdle)
if sc.srv.IdleTimeout != 0 {
if sc.srv.IdleTimeout > 0 && sc.idleTimer != nil {
sc.idleTimer.Reset(sc.srv.IdleTimeout)
}
if h1ServerKeepAlivesDisabled(sc.hs) {
@@ -1662,6 +1728,7 @@ func (sc *serverConn) closeStream(st *stream, err error) {
}
}
st.closeErr = err
st.cancelCtx()
st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc
sc.writeSched.CloseStream(st.id)
}
@@ -1900,9 +1967,11 @@ func (st *stream) copyTrailersToHandlerRequest() {
// onReadTimeout is run on its own goroutine (from time.AfterFunc)
// when the stream's ReadTimeout has fired.
func (st *stream) onReadTimeout() {
// Wrap the ErrDeadlineExceeded to avoid callers depending on us
// returning the bare error.
st.body.CloseWithError(fmt.Errorf("%w", os.ErrDeadlineExceeded))
if st.body != nil {
// Wrap the ErrDeadlineExceeded to avoid callers depending on us
// returning the bare error.
st.body.CloseWithError(fmt.Errorf("%w", os.ErrDeadlineExceeded))
}
}
// onWriteTimeout is run on its own goroutine (from time.AfterFunc)
@@ -2018,15 +2087,12 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
// similar to how the http1 server works. Here it's
// technically more like the http1 Server's ReadHeaderTimeout
// (in Go 1.8), though. That's a more sane option anyway.
if sc.hs.ReadTimeout != 0 {
if sc.hs.ReadTimeout > 0 {
sc.conn.SetReadDeadline(time.Time{})
if st.body != nil {
st.readDeadline = time.AfterFunc(sc.hs.ReadTimeout, st.onReadTimeout)
}
st.readDeadline = sc.srv.afterFunc(sc.hs.ReadTimeout, st.onReadTimeout)
}
go sc.runHandler(rw, req, handler)
return nil
return sc.scheduleHandler(id, rw, req, handler)
}
func (sc *serverConn) upgradeRequest(req *http.Request) {
@@ -2042,10 +2108,14 @@ func (sc *serverConn) upgradeRequest(req *http.Request) {
// Disable any read deadline set by the net/http package
// prior to the upgrade.
if sc.hs.ReadTimeout != 0 {
if sc.hs.ReadTimeout > 0 {
sc.conn.SetReadDeadline(time.Time{})
}
// This is the first request on the connection,
// so start the handler directly rather than going
// through scheduleHandler.
sc.curHandlers++
go sc.runHandler(rw, req, sc.handler.ServeHTTP)
}
@@ -2115,9 +2185,9 @@ func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream
st.cw.Init()
st.flow.conn = &sc.flow // link to conn-level counter
st.flow.add(sc.initialStreamSendWindowSize)
st.inflow.init(sc.srv.initialStreamRecvWindowSize())
if sc.hs.WriteTimeout != 0 {
st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
st.inflow.init(sc.initialStreamRecvWindowSize)
if sc.hs.WriteTimeout > 0 {
st.writeDeadline = sc.srv.afterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
}
sc.streams[id] = st
@@ -2286,8 +2356,63 @@ func (sc *serverConn) newResponseWriter(st *stream, req *http.Request) *response
return &responseWriter{rws: rws}
}
type unstartedHandler struct {
streamID uint32
rw *responseWriter
req *http.Request
handler func(http.ResponseWriter, *http.Request)
}
// scheduleHandler starts a handler goroutine,
// or schedules one to start as soon as an existing handler finishes.
func (sc *serverConn) scheduleHandler(streamID uint32, rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) error {
sc.serveG.check()
maxHandlers := sc.advMaxStreams
if sc.curHandlers < maxHandlers {
sc.curHandlers++
go sc.runHandler(rw, req, handler)
return nil
}
if len(sc.unstartedHandlers) > int(4*sc.advMaxStreams) {
return sc.countError("too_many_early_resets", ConnectionError(ErrCodeEnhanceYourCalm))
}
sc.unstartedHandlers = append(sc.unstartedHandlers, unstartedHandler{
streamID: streamID,
rw: rw,
req: req,
handler: handler,
})
return nil
}
func (sc *serverConn) handlerDone() {
sc.serveG.check()
sc.curHandlers--
i := 0
maxHandlers := sc.advMaxStreams
for ; i < len(sc.unstartedHandlers); i++ {
u := sc.unstartedHandlers[i]
if sc.streams[u.streamID] == nil {
// This stream was reset before its goroutine had a chance to start.
continue
}
if sc.curHandlers >= maxHandlers {
break
}
sc.curHandlers++
go sc.runHandler(u.rw, u.req, u.handler)
sc.unstartedHandlers[i] = unstartedHandler{} // don't retain references
}
sc.unstartedHandlers = sc.unstartedHandlers[i:]
if len(sc.unstartedHandlers) == 0 {
sc.unstartedHandlers = nil
}
}
// Run on its own goroutine.
func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) {
sc.srv.markNewGoroutine()
defer sc.sendServeMsg(handlerDoneMsg)
didPanic := true
defer func() {
rw.rws.stream.cancelCtx()
@@ -2429,7 +2554,7 @@ type requestBody struct {
conn *serverConn
closeOnce sync.Once // for use by Close only
sawEOF bool // for use by Read only
pipe *pipe // non-nil if we have a HTTP entity message body
pipe *pipe // non-nil if we have an HTTP entity message body
needsContinue bool // need to send a 100-continue
}
@@ -2495,7 +2620,6 @@ type responseWriterState struct {
wroteHeader bool // WriteHeader called (explicitly or implicitly). Not necessarily sent to user yet.
sentHeader bool // have we sent the header frame?
handlerDone bool // handler has finished
dirty bool // a Write failed; don't reuse this responseWriterState
sentContentLen int64 // non-zero if handler set a Content-Length header
wroteBytes int64
@@ -2569,7 +2693,8 @@ func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
clen = ""
}
}
if clen == "" && rws.handlerDone && bodyAllowedForStatus(rws.status) && (len(p) > 0 || !isHeadResp) {
_, hasContentLength := rws.snapHeader["Content-Length"]
if !hasContentLength && clen == "" && rws.handlerDone && bodyAllowedForStatus(rws.status) && (len(p) > 0 || !isHeadResp) {
clen = strconv.Itoa(len(p))
}
_, hasContentType := rws.snapHeader["Content-Type"]
@@ -2583,7 +2708,7 @@ func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
var date string
if _, ok := rws.snapHeader["Date"]; !ok {
// TODO(bradfitz): be faster here, like net/http? measure.
date = time.Now().UTC().Format(http.TimeFormat)
date = rws.conn.srv.now().UTC().Format(http.TimeFormat)
}
for _, v := range rws.snapHeader["Trailer"] {
@@ -2614,7 +2739,6 @@ func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
date: date,
})
if err != nil {
rws.dirty = true
return 0, err
}
if endStream {
@@ -2635,7 +2759,6 @@ func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
if len(p) > 0 || endStream {
// only send a 0 byte DATA frame if we're ending the stream.
if err := rws.conn.writeDataFromHandler(rws.stream, p, endStream); err != nil {
rws.dirty = true
return 0, err
}
}
@@ -2647,9 +2770,6 @@ func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
trailers: rws.trailers,
endStream: true,
})
if err != nil {
rws.dirty = true
}
return len(p), err
}
return len(p), nil
@@ -2710,7 +2830,7 @@ func (rws *responseWriterState) promoteUndeclaredTrailers() {
func (w *responseWriter) SetReadDeadline(deadline time.Time) error {
st := w.rws.stream
if !deadline.IsZero() && deadline.Before(time.Now()) {
if !deadline.IsZero() && deadline.Before(w.rws.conn.srv.now()) {
// If we're setting a deadline in the past, reset the stream immediately
// so writes after SetWriteDeadline returns will fail.
st.onReadTimeout()
@@ -2726,9 +2846,9 @@ func (w *responseWriter) SetReadDeadline(deadline time.Time) error {
if deadline.IsZero() {
st.readDeadline = nil
} else if st.readDeadline == nil {
st.readDeadline = time.AfterFunc(deadline.Sub(time.Now()), st.onReadTimeout)
st.readDeadline = sc.srv.afterFunc(deadline.Sub(sc.srv.now()), st.onReadTimeout)
} else {
st.readDeadline.Reset(deadline.Sub(time.Now()))
st.readDeadline.Reset(deadline.Sub(sc.srv.now()))
}
})
return nil
@@ -2736,7 +2856,7 @@ func (w *responseWriter) SetReadDeadline(deadline time.Time) error {
func (w *responseWriter) SetWriteDeadline(deadline time.Time) error {
st := w.rws.stream
if !deadline.IsZero() && deadline.Before(time.Now()) {
if !deadline.IsZero() && deadline.Before(w.rws.conn.srv.now()) {
// If we're setting a deadline in the past, reset the stream immediately
// so writes after SetWriteDeadline returns will fail.
st.onWriteTimeout()
@@ -2752,9 +2872,9 @@ func (w *responseWriter) SetWriteDeadline(deadline time.Time) error {
if deadline.IsZero() {
st.writeDeadline = nil
} else if st.writeDeadline == nil {
st.writeDeadline = time.AfterFunc(deadline.Sub(time.Now()), st.onWriteTimeout)
st.writeDeadline = sc.srv.afterFunc(deadline.Sub(sc.srv.now()), st.onWriteTimeout)
} else {
st.writeDeadline.Reset(deadline.Sub(time.Now()))
st.writeDeadline.Reset(deadline.Sub(sc.srv.now()))
}
})
return nil
@@ -2774,7 +2894,7 @@ func (w *responseWriter) FlushError() error {
err = rws.bw.Flush()
} else {
// The bufio.Writer won't call chunkWriter.Write
// (writeChunk with zero bytes, so we have to do it
// (writeChunk with zero bytes), so we have to do it
// ourselves to force the HTTP response header and/or
// final DATA frame (with END_STREAM) to be sent.
_, err = chunkWriter{rws}.Write(nil)
@@ -2865,14 +2985,12 @@ func (rws *responseWriterState) writeHeader(code int) {
h.Del("Transfer-Encoding")
}
if rws.conn.writeHeaders(rws.stream, &writeResHeaders{
rws.conn.writeHeaders(rws.stream, &writeResHeaders{
streamID: rws.stream.id,
httpResCode: code,
h: h,
endStream: rws.handlerDone && !rws.hasTrailers(),
}) != nil {
rws.dirty = true
}
})
return
}
@@ -2937,19 +3055,10 @@ func (w *responseWriter) write(lenData int, dataB []byte, dataS string) (n int,
func (w *responseWriter) handlerDone() {
rws := w.rws
dirty := rws.dirty
rws.handlerDone = true
w.Flush()
w.rws = nil
if !dirty {
// Only recycle the pool if all prior Write calls to
// the serverConn goroutine completed successfully. If
// they returned earlier due to resets from the peer
// there might still be write goroutines outstanding
// from the serverConn referencing the rws memory. See
// issue 20704.
responseWriterStatePool.Put(rws)
}
responseWriterStatePool.Put(rws)
}
// Push errors.
@@ -3132,6 +3241,7 @@ func (sc *serverConn) startPush(msg *startPushRequest) {
panic(fmt.Sprintf("newWriterAndRequestNoBody(%+v): %v", msg.url, err))
}
sc.curHandlers++
go sc.runHandler(rw, req, sc.handler.ServeHTTP)
return promisedID, nil
}
@@ -3216,7 +3326,7 @@ func (sc *serverConn) countError(name string, err error) error {
if sc == nil || sc.srv == nil {
return err
}
f := sc.srv.CountError
f := sc.countErrorFunc
if f == nil {
return err
}

View File

@@ -19,12 +19,12 @@ import (
"io/fs"
"log"
"math"
"math/bits"
mathrand "math/rand"
"net"
"net/http"
"net/http/httptrace"
"net/textproto"
"os"
"sort"
"strconv"
"strings"
@@ -146,6 +146,12 @@ type Transport struct {
// waiting for their turn.
StrictMaxConcurrentStreams bool
// IdleConnTimeout is the maximum amount of time an idle
// (keep-alive) connection will remain idle before closing
// itself.
// Zero means no limit.
IdleConnTimeout time.Duration
// ReadIdleTimeout is the timeout after which a health check using ping
// frame will be carried out if no frame is received on the connection.
// Note that a ping response will is considered a received frame, so if
@@ -177,43 +183,69 @@ type Transport struct {
connPoolOnce sync.Once
connPoolOrDef ClientConnPool // non-nil version of ConnPool
*transportTestHooks
}
// Hook points used for testing.
// Outside of tests, t.transportTestHooks is nil and these all have minimal implementations.
// Inside tests, see the testSyncHooks function docs.
type transportTestHooks struct {
newclientconn func(*ClientConn)
group synctestGroupInterface
}
func (t *Transport) markNewGoroutine() {
if t != nil && t.transportTestHooks != nil {
t.transportTestHooks.group.Join()
}
}
// newTimer creates a new time.Timer, or a synthetic timer in tests.
func (t *Transport) newTimer(d time.Duration) timer {
if t.transportTestHooks != nil {
return t.transportTestHooks.group.NewTimer(d)
}
return timeTimer{time.NewTimer(d)}
}
// afterFunc creates a new time.AfterFunc timer, or a synthetic timer in tests.
func (t *Transport) afterFunc(d time.Duration, f func()) timer {
if t.transportTestHooks != nil {
return t.transportTestHooks.group.AfterFunc(d, f)
}
return timeTimer{time.AfterFunc(d, f)}
}
func (t *Transport) contextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) {
if t.transportTestHooks != nil {
return t.transportTestHooks.group.ContextWithTimeout(ctx, d)
}
return context.WithTimeout(ctx, d)
}
func (t *Transport) maxHeaderListSize() uint32 {
if t.MaxHeaderListSize == 0 {
n := int64(t.MaxHeaderListSize)
if t.t1 != nil && t.t1.MaxResponseHeaderBytes != 0 {
n = t.t1.MaxResponseHeaderBytes
if n > 0 {
n = adjustHTTP1MaxHeaderSize(n)
}
}
if n <= 0 {
return 10 << 20
}
if t.MaxHeaderListSize == 0xffffffff {
if n >= 0xffffffff {
return 0
}
return t.MaxHeaderListSize
}
func (t *Transport) maxFrameReadSize() uint32 {
if t.MaxReadFrameSize == 0 {
return 0 // use the default provided by the peer
}
if t.MaxReadFrameSize < minMaxFrameSize {
return minMaxFrameSize
}
if t.MaxReadFrameSize > maxFrameSize {
return maxFrameSize
}
return t.MaxReadFrameSize
return uint32(n)
}
func (t *Transport) disableCompression() bool {
return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression)
}
func (t *Transport) pingTimeout() time.Duration {
if t.PingTimeout == 0 {
return 15 * time.Second
}
return t.PingTimeout
}
// ConfigureTransport configures a net/http HTTP/1 Transport to use HTTP/2.
// It returns an error if t1 has already been HTTP/2-enabled.
//
@@ -290,8 +322,7 @@ func (t *Transport) initConnPool() {
// HTTP/2 server.
type ClientConn struct {
t *Transport
tconn net.Conn // usually *tls.Conn, except specialized impls
tconnClosed bool
tconn net.Conn // usually *tls.Conn, except specialized impls
tlsState *tls.ConnectionState // nil only for specialized impls
reused uint32 // whether conn is being reused; atomic
singleUse bool // whether being used for a single http.Request
@@ -302,7 +333,7 @@ type ClientConn struct {
readerErr error // set before readerDone is closed
idleTimeout time.Duration // or 0 for never
idleTimer *time.Timer
idleTimer timer
mu sync.Mutex // guards following
cond *sync.Cond // hold mu; broadcast on flow/closed changes
@@ -324,11 +355,14 @@ type ClientConn struct {
lastActive time.Time
lastIdle time.Time // time last idle
// Settings from peer: (also guarded by wmu)
maxFrameSize uint32
maxConcurrentStreams uint32
peerMaxHeaderListSize uint64
peerMaxHeaderTableSize uint32
initialWindowSize uint32
maxFrameSize uint32
maxConcurrentStreams uint32
peerMaxHeaderListSize uint64
peerMaxHeaderTableSize uint32
initialWindowSize uint32
initialStreamRecvWindowSize int32
readIdleTimeout time.Duration
pingTimeout time.Duration
// reqHeaderMu is a 1-element semaphore channel controlling access to sending new requests.
// Write to reqHeaderMu to lock it, read from it to unlock.
@@ -446,12 +480,14 @@ func (cs *clientStream) closeReqBodyLocked() {
cs.reqBodyClosed = make(chan struct{})
reqBodyClosed := cs.reqBodyClosed
go func() {
cs.cc.t.markNewGoroutine()
cs.reqBody.Close()
close(reqBodyClosed)
}()
}
type stickyErrWriter struct {
group synctestGroupInterface
conn net.Conn
timeout time.Duration
err *error
@@ -461,22 +497,9 @@ func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
if *sew.err != nil {
return 0, *sew.err
}
for {
if sew.timeout != 0 {
sew.conn.SetWriteDeadline(time.Now().Add(sew.timeout))
}
nn, err := sew.conn.Write(p[n:])
n += nn
if n < len(p) && nn > 0 && errors.Is(err, os.ErrDeadlineExceeded) {
// Keep extending the deadline so long as we're making progress.
continue
}
if sew.timeout != 0 {
sew.conn.SetWriteDeadline(time.Time{})
}
*sew.err = err
return n, err
}
n, err = writeWithByteTimeout(sew.group, sew.conn, sew.timeout, p)
*sew.err = err
return n, err
}
// noCachedConnError is the concrete type of ErrNoCachedConn, which
@@ -518,11 +541,14 @@ func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {
func authorityAddr(scheme string, authority string) (addr string) {
host, port, err := net.SplitHostPort(authority)
if err != nil { // authority didn't have a port
host = authority
port = ""
}
if port == "" { // authority's port was empty
port = "443"
if scheme == "http" {
port = "80"
}
host = authority
}
if a, err := idna.ToASCII(host); err == nil {
host = a
@@ -534,15 +560,6 @@ func authorityAddr(scheme string, authority string) (addr string) {
return net.JoinHostPort(host, port)
}
var retryBackoffHook func(time.Duration) *time.Timer
func backoffNewTimer(d time.Duration) *time.Timer {
if retryBackoffHook != nil {
return retryBackoffHook(d)
}
return time.NewTimer(d)
}
// RoundTripOpt is like RoundTrip, but takes options.
func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Response, error) {
if !(req.URL.Scheme == "https" || (req.URL.Scheme == "http" && t.AllowHTTP)) {
@@ -570,13 +587,13 @@ func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Res
backoff := float64(uint(1) << (uint(retry) - 1))
backoff += backoff * (0.1 * mathrand.Float64())
d := time.Second * time.Duration(backoff)
timer := backoffNewTimer(d)
tm := t.newTimer(d)
select {
case <-timer.C:
case <-tm.C():
t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
continue
case <-req.Context().Done():
timer.Stop()
tm.Stop()
err = req.Context().Err()
}
}
@@ -655,6 +672,9 @@ func canRetryError(err error) bool {
}
func (t *Transport) dialClientConn(ctx context.Context, addr string, singleUse bool) (*ClientConn, error) {
if t.transportTestHooks != nil {
return t.newClientConn(nil, singleUse)
}
host, _, err := net.SplitHostPort(addr)
if err != nil {
return nil, err
@@ -714,43 +734,36 @@ func (t *Transport) expectContinueTimeout() time.Duration {
return t.t1.ExpectContinueTimeout
}
func (t *Transport) maxDecoderHeaderTableSize() uint32 {
if v := t.MaxDecoderHeaderTableSize; v > 0 {
return v
}
return initialHeaderTableSize
}
func (t *Transport) maxEncoderHeaderTableSize() uint32 {
if v := t.MaxEncoderHeaderTableSize; v > 0 {
return v
}
return initialHeaderTableSize
}
func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) {
return t.newClientConn(c, t.disableKeepAlives())
}
func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) {
conf := configFromTransport(t)
cc := &ClientConn{
t: t,
tconn: c,
readerDone: make(chan struct{}),
nextStreamID: 1,
maxFrameSize: 16 << 10, // spec default
initialWindowSize: 65535, // spec default
maxConcurrentStreams: initialMaxConcurrentStreams, // "infinite", per spec. Use a smaller value until we have received server settings.
peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
streams: make(map[uint32]*clientStream),
singleUse: singleUse,
wantSettingsAck: true,
pings: make(map[[8]byte]chan struct{}),
reqHeaderMu: make(chan struct{}, 1),
t: t,
tconn: c,
readerDone: make(chan struct{}),
nextStreamID: 1,
maxFrameSize: 16 << 10, // spec default
initialWindowSize: 65535, // spec default
initialStreamRecvWindowSize: conf.MaxUploadBufferPerStream,
maxConcurrentStreams: initialMaxConcurrentStreams, // "infinite", per spec. Use a smaller value until we have received server settings.
peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
streams: make(map[uint32]*clientStream),
singleUse: singleUse,
wantSettingsAck: true,
readIdleTimeout: conf.SendPingTimeout,
pingTimeout: conf.PingTimeout,
pings: make(map[[8]byte]chan struct{}),
reqHeaderMu: make(chan struct{}, 1),
}
if d := t.idleConnTimeout(); d != 0 {
cc.idleTimeout = d
cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout)
var group synctestGroupInterface
if t.transportTestHooks != nil {
t.markNewGoroutine()
t.transportTestHooks.newclientconn(cc)
c = cc.tconn
group = t.group
}
if VerboseLogs {
t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
@@ -762,30 +775,25 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
// TODO: adjust this writer size to account for frame size +
// MTU + crypto/tls record padding.
cc.bw = bufio.NewWriter(stickyErrWriter{
group: group,
conn: c,
timeout: t.WriteByteTimeout,
timeout: conf.WriteByteTimeout,
err: &cc.werr,
})
cc.br = bufio.NewReader(c)
cc.fr = NewFramer(cc.bw, cc.br)
if t.maxFrameReadSize() != 0 {
cc.fr.SetMaxReadFrameSize(t.maxFrameReadSize())
}
cc.fr.SetMaxReadFrameSize(conf.MaxReadFrameSize)
if t.CountError != nil {
cc.fr.countError = t.CountError
}
maxHeaderTableSize := t.maxDecoderHeaderTableSize()
maxHeaderTableSize := conf.MaxDecoderHeaderTableSize
cc.fr.ReadMetaHeaders = hpack.NewDecoder(maxHeaderTableSize, nil)
cc.fr.MaxHeaderListSize = t.maxHeaderListSize()
cc.henc = hpack.NewEncoder(&cc.hbuf)
cc.henc.SetMaxDynamicTableSizeLimit(t.maxEncoderHeaderTableSize())
cc.henc.SetMaxDynamicTableSizeLimit(conf.MaxEncoderHeaderTableSize)
cc.peerMaxHeaderTableSize = initialHeaderTableSize
if t.AllowHTTP {
cc.nextStreamID = 3
}
if cs, ok := c.(connectionStater); ok {
state := cs.ConnectionState()
cc.tlsState = &state
@@ -793,11 +801,9 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
initialSettings := []Setting{
{ID: SettingEnablePush, Val: 0},
{ID: SettingInitialWindowSize, Val: transportDefaultStreamFlow},
}
if max := t.maxFrameReadSize(); max != 0 {
initialSettings = append(initialSettings, Setting{ID: SettingMaxFrameSize, Val: max})
{ID: SettingInitialWindowSize, Val: uint32(cc.initialStreamRecvWindowSize)},
}
initialSettings = append(initialSettings, Setting{ID: SettingMaxFrameSize, Val: conf.MaxReadFrameSize})
if max := t.maxHeaderListSize(); max != 0 {
initialSettings = append(initialSettings, Setting{ID: SettingMaxHeaderListSize, Val: max})
}
@@ -807,23 +813,29 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
cc.bw.Write(clientPreface)
cc.fr.WriteSettings(initialSettings...)
cc.fr.WriteWindowUpdate(0, transportDefaultConnFlow)
cc.inflow.init(transportDefaultConnFlow + initialWindowSize)
cc.fr.WriteWindowUpdate(0, uint32(conf.MaxUploadBufferPerConnection))
cc.inflow.init(conf.MaxUploadBufferPerConnection + initialWindowSize)
cc.bw.Flush()
if cc.werr != nil {
cc.Close()
return nil, cc.werr
}
// Start the idle timer after the connection is fully initialized.
if d := t.idleConnTimeout(); d != 0 {
cc.idleTimeout = d
cc.idleTimer = t.afterFunc(d, cc.onIdleTimeout)
}
go cc.readLoop()
return cc, nil
}
func (cc *ClientConn) healthCheck() {
pingTimeout := cc.t.pingTimeout()
pingTimeout := cc.pingTimeout
// We don't need to periodically ping in the health check, because the readLoop of ClientConn will
// trigger the healthCheck again if there is no frame received.
ctx, cancel := context.WithTimeout(context.Background(), pingTimeout)
ctx, cancel := cc.t.contextWithTimeout(context.Background(), pingTimeout)
defer cancel()
cc.vlogf("http2: Transport sending health check")
err := cc.Ping(ctx)
@@ -858,7 +870,20 @@ func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
}
last := f.LastStreamID
for streamID, cs := range cc.streams {
if streamID > last {
if streamID <= last {
// The server's GOAWAY indicates that it received this stream.
// It will either finish processing it, or close the connection
// without doing so. Either way, leave the stream alone for now.
continue
}
if streamID == 1 && cc.goAway.ErrCode != ErrCodeNo {
// Don't retry the first stream on a connection if we get a non-NO error.
// If the server is sending an error on a new connection,
// retrying the request on a new one probably isn't going to work.
cs.abortStreamLocked(fmt.Errorf("http2: Transport received GOAWAY from server ErrCode:%v", cc.goAway.ErrCode))
} else {
// Aborting the stream with errClentConnGotGoAway indicates that
// the request should be retried on a new connection.
cs.abortStreamLocked(errClientConnGotGoAway)
}
}
@@ -1015,7 +1040,7 @@ func (cc *ClientConn) forceCloseConn() {
if !ok {
return
}
if nc := tlsUnderlyingConn(tc); nc != nil {
if nc := tc.NetConn(); nc != nil {
nc.Close()
}
}
@@ -1054,6 +1079,7 @@ func (cc *ClientConn) Shutdown(ctx context.Context) error {
done := make(chan struct{})
cancelled := false // guarded by cc.mu
go func() {
cc.t.markNewGoroutine()
cc.mu.Lock()
defer cc.mu.Unlock()
for {
@@ -1212,6 +1238,10 @@ func (cc *ClientConn) decrStreamReservationsLocked() {
}
func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
return cc.roundTrip(req, nil)
}
func (cc *ClientConn) roundTrip(req *http.Request, streamf func(*clientStream)) (*http.Response, error) {
ctx := req.Context()
cs := &clientStream{
cc: cc,
@@ -1226,7 +1256,28 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
respHeaderRecv: make(chan struct{}),
donec: make(chan struct{}),
}
go cs.doRequest(req)
// TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere?
if !cc.t.disableCompression() &&
req.Header.Get("Accept-Encoding") == "" &&
req.Header.Get("Range") == "" &&
!cs.isHead {
// Request gzip only, not deflate. Deflate is ambiguous and
// not as universally supported anyway.
// See: https://zlib.net/zlib_faq.html#faq39
//
// Note that we don't request this for HEAD requests,
// due to a bug in nginx:
// http://trac.nginx.org/nginx/ticket/358
// https://golang.org/issue/5522
//
// We don't request gzip if the request is for a range, since
// auto-decoding a portion of a gzipped document will just fail
// anyway. See https://golang.org/issue/8923
cs.requestedGzip = true
}
go cs.doRequest(req, streamf)
waitDone := func() error {
select {
@@ -1268,21 +1319,23 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
cancelRequest := func(cs *clientStream, err error) error {
cs.cc.mu.Lock()
defer cs.cc.mu.Unlock()
cs.abortStreamLocked(err)
if cs.ID != 0 {
// This request may have failed because of a problem with the connection,
// or for some unrelated reason. (For example, the user might have canceled
// the request without waiting for a response.) Mark the connection as
// not reusable, since trying to reuse a dead connection is worse than
// unnecessarily creating a new one.
//
// If cs.ID is 0, then the request was never allocated a stream ID and
// whatever went wrong was unrelated to the connection. We might have
// timed out waiting for a stream slot when StrictMaxConcurrentStreams
// is set, for example, in which case retrying on a different connection
// will not help.
cs.cc.doNotReuse = true
bodyClosed := cs.reqBodyClosed
cs.cc.mu.Unlock()
// Wait for the request body to be closed.
//
// If nothing closed the body before now, abortStreamLocked
// will have started a goroutine to close it.
//
// Closing the body before returning avoids a race condition
// with net/http checking its readTrackingBody to see if the
// body was read from or closed. See golang/go#60041.
//
// The body is closed in a separate goroutine without the
// connection mutex held, but dropping the mutex before waiting
// will keep us from holding it indefinitely if the body
// close is slow for some reason.
if bodyClosed != nil {
<-bodyClosed
}
return err
}
@@ -1301,11 +1354,14 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
return handleResponseHeaders()
default:
waitDone()
return nil, cancelRequest(cs, cs.abortErr)
return nil, cs.abortErr
}
case <-ctx.Done():
return nil, cancelRequest(cs, ctx.Err())
err := ctx.Err()
cs.abortStream(err)
return nil, cancelRequest(cs, err)
case <-cs.reqCancel:
cs.abortStream(errRequestCanceled)
return nil, cancelRequest(cs, errRequestCanceled)
}
}
@@ -1314,8 +1370,9 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
// doRequest runs for the duration of the request lifetime.
//
// It sends the request and performs post-request cleanup (closing Request.Body, etc.).
func (cs *clientStream) doRequest(req *http.Request) {
err := cs.writeRequest(req)
func (cs *clientStream) doRequest(req *http.Request, streamf func(*clientStream)) {
cs.cc.t.markNewGoroutine()
err := cs.writeRequest(req, streamf)
cs.cleanupWriteRequest(err)
}
@@ -1326,7 +1383,7 @@ func (cs *clientStream) doRequest(req *http.Request) {
//
// It returns non-nil if the request ends otherwise.
// If the returned error is StreamError, the error Code may be used in resetting the stream.
func (cs *clientStream) writeRequest(req *http.Request) (err error) {
func (cs *clientStream) writeRequest(req *http.Request, streamf func(*clientStream)) (err error) {
cc := cs.cc
ctx := cs.ctx
@@ -1364,24 +1421,8 @@ func (cs *clientStream) writeRequest(req *http.Request) (err error) {
}
cc.mu.Unlock()
// TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere?
if !cc.t.disableCompression() &&
req.Header.Get("Accept-Encoding") == "" &&
req.Header.Get("Range") == "" &&
!cs.isHead {
// Request gzip only, not deflate. Deflate is ambiguous and
// not as universally supported anyway.
// See: https://zlib.net/zlib_faq.html#faq39
//
// Note that we don't request this for HEAD requests,
// due to a bug in nginx:
// http://trac.nginx.org/nginx/ticket/358
// https://golang.org/issue/5522
//
// We don't request gzip if the request is for a range, since
// auto-decoding a portion of a gzipped document will just fail
// anyway. See https://golang.org/issue/8923
cs.requestedGzip = true
if streamf != nil {
streamf(cs)
}
continueTimeout := cc.t.expectContinueTimeout()
@@ -1444,9 +1485,9 @@ func (cs *clientStream) writeRequest(req *http.Request) (err error) {
var respHeaderTimer <-chan time.Time
var respHeaderRecv chan struct{}
if d := cc.responseHeaderTimeout(); d != 0 {
timer := time.NewTimer(d)
timer := cc.t.newTimer(d)
defer timer.Stop()
respHeaderTimer = timer.C
respHeaderTimer = timer.C()
respHeaderRecv = cs.respHeaderRecv
}
// Wait until the peer half-closes its end of the stream,
@@ -1672,7 +1713,27 @@ func (cs *clientStream) frameScratchBufferLen(maxFrameSize int) int {
return int(n) // doesn't truncate; max is 512K
}
var bufPool sync.Pool // of *[]byte
// Seven bufPools manage different frame sizes. This helps to avoid scenarios where long-running
// streaming requests using small frame sizes occupy large buffers initially allocated for prior
// requests needing big buffers. The size ranges are as follows:
// {0 KB, 16 KB], {16 KB, 32 KB], {32 KB, 64 KB], {64 KB, 128 KB], {128 KB, 256 KB],
// {256 KB, 512 KB], {512 KB, infinity}
// In practice, the maximum scratch buffer size should not exceed 512 KB due to
// frameScratchBufferLen(maxFrameSize), thus the "infinity pool" should never be used.
// It exists mainly as a safety measure, for potential future increases in max buffer size.
var bufPools [7]sync.Pool // of *[]byte
func bufPoolIndex(size int) int {
if size <= 16384 {
return 0
}
size -= 1
bits := bits.Len(uint(size))
index := bits - 14
if index >= len(bufPools) {
return len(bufPools) - 1
}
return index
}
func (cs *clientStream) writeRequestBody(req *http.Request) (err error) {
cc := cs.cc
@@ -1690,12 +1751,13 @@ func (cs *clientStream) writeRequestBody(req *http.Request) (err error) {
// Scratch buffer for reading into & writing from.
scratchLen := cs.frameScratchBufferLen(maxFrameSize)
var buf []byte
if bp, ok := bufPool.Get().(*[]byte); ok && len(*bp) >= scratchLen {
defer bufPool.Put(bp)
index := bufPoolIndex(scratchLen)
if bp, ok := bufPools[index].Get().(*[]byte); ok && len(*bp) >= scratchLen {
defer bufPools[index].Put(bp)
buf = *bp
} else {
buf = make([]byte, scratchLen)
defer bufPool.Put(&buf)
defer bufPools[index].Put(&buf)
}
var sawEOF bool
@@ -1846,6 +1908,22 @@ func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error)
}
}
func validateHeaders(hdrs http.Header) string {
for k, vv := range hdrs {
if !httpguts.ValidHeaderFieldName(k) {
return fmt.Sprintf("name %q", k)
}
for _, v := range vv {
if !httpguts.ValidHeaderFieldValue(v) {
// Don't include the value in the error,
// because it may be sensitive.
return fmt.Sprintf("value for header %q", k)
}
}
}
return ""
}
var errNilRequestURL = errors.New("http2: Request.URI is nil")
// requires cc.wmu be held.
@@ -1863,6 +1941,9 @@ func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trail
if err != nil {
return nil, err
}
if !httpguts.ValidHostHeader(host) {
return nil, errors.New("http2: invalid Host header")
}
var path string
if req.Method != "CONNECT" {
@@ -1880,26 +1961,21 @@ func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trail
}
}
// Check for any invalid headers and return an error before we
// Check for any invalid headers+trailers and return an error before we
// potentially pollute our hpack state. (We want to be able to
// continue to reuse the hpack encoder for future requests)
for k, vv := range req.Header {
if !httpguts.ValidHeaderFieldName(k) {
return nil, fmt.Errorf("invalid HTTP header name %q", k)
}
for _, v := range vv {
if !httpguts.ValidHeaderFieldValue(v) {
// Don't include the value in the error, because it may be sensitive.
return nil, fmt.Errorf("invalid HTTP header value for header %q", k)
}
}
if err := validateHeaders(req.Header); err != "" {
return nil, fmt.Errorf("invalid HTTP header %s", err)
}
if err := validateHeaders(req.Trailer); err != "" {
return nil, fmt.Errorf("invalid HTTP trailer %s", err)
}
enumerateHeaders := func(f func(name, value string)) {
// 8.1.2.3 Request Pseudo-Header Fields
// The :path pseudo-header field includes the path and query parts of the
// target URI (the path-absolute production and optionally a '?' character
// followed by the query production (see Sections 3.3 and 3.4 of
// followed by the query production, see Sections 3.3 and 3.4 of
// [RFC3986]).
f(":authority", host)
m := req.Method
@@ -2088,7 +2164,7 @@ type resAndError struct {
func (cc *ClientConn) addStreamLocked(cs *clientStream) {
cs.flow.add(int32(cc.initialWindowSize))
cs.flow.setConnFlow(&cc.flow)
cs.inflow.init(transportDefaultStreamFlow)
cs.inflow.init(cc.initialStreamRecvWindowSize)
cs.ID = cc.nextStreamID
cc.nextStreamID += 2
cc.streams[cs.ID] = cs
@@ -2133,6 +2209,7 @@ type clientConnReadLoop struct {
// readLoop runs in its own goroutine and reads and dispatches frames.
func (cc *ClientConn) readLoop() {
cc.t.markNewGoroutine()
rl := &clientConnReadLoop{cc: cc}
defer rl.cleanup()
cc.readerErr = rl.run()
@@ -2233,11 +2310,10 @@ func (cc *ClientConn) countReadFrameError(err error) {
func (rl *clientConnReadLoop) run() error {
cc := rl.cc
gotSettings := false
readIdleTimeout := cc.t.ReadIdleTimeout
var t *time.Timer
readIdleTimeout := cc.readIdleTimeout
var t timer
if readIdleTimeout != 0 {
t = time.AfterFunc(readIdleTimeout, cc.healthCheck)
defer t.Stop()
t = cc.t.afterFunc(readIdleTimeout, cc.healthCheck)
}
for {
f, err := cc.fr.ReadFrame()
@@ -2652,7 +2728,7 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
})
return nil
}
if !cs.firstByte {
if !cs.pastHeaders {
cc.logf("protocol error: received DATA before a HEADERS frame")
rl.endStreamError(cs, StreamError{
StreamID: f.StreamID,
@@ -2879,6 +2955,15 @@ func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
fl = &cs.flow
}
if !fl.add(int32(f.Increment)) {
// For stream, the sender sends RST_STREAM with an error code of FLOW_CONTROL_ERROR
if cs != nil {
rl.endStreamError(cs, StreamError{
StreamID: f.StreamID,
Code: ErrCodeFlowControl,
})
return nil
}
return ConnectionError(ErrCodeFlowControl)
}
cc.cond.Broadcast()
@@ -2923,24 +3008,26 @@ func (cc *ClientConn) Ping(ctx context.Context) error {
}
cc.mu.Unlock()
}
errc := make(chan error, 1)
var pingError error
errc := make(chan struct{})
go func() {
cc.t.markNewGoroutine()
cc.wmu.Lock()
defer cc.wmu.Unlock()
if err := cc.fr.WritePing(false, p); err != nil {
errc <- err
if pingError = cc.fr.WritePing(false, p); pingError != nil {
close(errc)
return
}
if err := cc.bw.Flush(); err != nil {
errc <- err
if pingError = cc.bw.Flush(); pingError != nil {
close(errc)
return
}
}()
select {
case <-c:
return nil
case err := <-errc:
return err
case <-errc:
return pingError
case <-ctx.Done():
return ctx.Err()
case <-cc.readerDone:
@@ -3109,9 +3196,17 @@ func (rt noDialH2RoundTripper) RoundTrip(req *http.Request) (*http.Response, err
}
func (t *Transport) idleConnTimeout() time.Duration {
// to keep things backwards compatible, we use non-zero values of
// IdleConnTimeout, followed by using the IdleConnTimeout on the underlying
// http1 transport, followed by 0
if t.IdleConnTimeout != 0 {
return t.IdleConnTimeout
}
if t.t1 != nil {
return t.t1.IdleConnTimeout
}
return 0
}
@@ -3169,3 +3264,34 @@ func traceFirstResponseByte(trace *httptrace.ClientTrace) {
trace.GotFirstResponseByte()
}
}
func traceHasWroteHeaderField(trace *httptrace.ClientTrace) bool {
return trace != nil && trace.WroteHeaderField != nil
}
func traceWroteHeaderField(trace *httptrace.ClientTrace, k, v string) {
if trace != nil && trace.WroteHeaderField != nil {
trace.WroteHeaderField(k, []string{v})
}
}
func traceGot1xxResponseFunc(trace *httptrace.ClientTrace) func(int, textproto.MIMEHeader) error {
if trace != nil {
return trace.Got1xxResponse
}
return nil
}
// dialTLSWithContext uses tls.Dialer, added in Go 1.15, to open a TLS
// connection.
func (t *Transport) dialTLSWithContext(ctx context.Context, network, addr string, cfg *tls.Config) (*tls.Conn, error) {
dialer := &tls.Dialer{
Config: cfg,
}
cn, err := dialer.DialContext(ctx, network, addr)
if err != nil {
return nil, err
}
tlsCn := cn.(*tls.Conn) // DialContext comment promises this will always succeed
return tlsCn, nil
}

View File

@@ -131,6 +131,16 @@ func (se StreamError) writeFrame(ctx writeContext) error {
func (se StreamError) staysWithinBuffer(max int) bool { return frameHeaderLen+4 <= max }
type writePing struct {
data [8]byte
}
func (w writePing) writeFrame(ctx writeContext) error {
return ctx.Framer().WritePing(false, w.data)
}
func (w writePing) staysWithinBuffer(max int) bool { return frameHeaderLen+len(w.data) <= max }
type writePingAck struct{ pf *PingFrame }
func (w writePingAck) writeFrame(ctx writeContext) error {

View File

@@ -184,7 +184,8 @@ func (wr *FrameWriteRequest) replyToWriter(err error) {
// writeQueue is used by implementations of WriteScheduler.
type writeQueue struct {
s []FrameWriteRequest
s []FrameWriteRequest
prev, next *writeQueue
}
func (q *writeQueue) empty() bool { return len(q.s) == 0 }

View File

@@ -443,8 +443,8 @@ func (ws *priorityWriteScheduler) addClosedOrIdleNode(list *[]*priorityNode, max
}
func (ws *priorityWriteScheduler) removeNode(n *priorityNode) {
for k := n.kids; k != nil; k = k.next {
k.setParent(n.parent)
for n.kids != nil {
n.kids.setParent(n.parent)
}
n.setParent(nil)
delete(ws.nodes, n.id)