修正依赖问题
This commit is contained in:
223
vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go
generated
vendored
223
vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go
generated
vendored
@@ -52,6 +52,12 @@ type LookupFunc func(ctx context.Context, host string) (addrs []string, err erro
|
||||
// BuildFrontendFunc is a function that can be used to create Frontend implementation for connection.
|
||||
type BuildFrontendFunc func(r io.Reader, w io.Writer) *pgproto3.Frontend
|
||||
|
||||
// PgErrorHandler is a function that handles errors returned from Postgres. This function must return true to keep
|
||||
// the connection open. Returning false will cause the connection to be closed immediately. You should return
|
||||
// false on any FATAL-severity errors. This will not receive network errors. The *PgConn is provided so the handler is
|
||||
// aware of the origin of the error, but it must not invoke any query method.
|
||||
type PgErrorHandler func(*PgConn, *PgError) bool
|
||||
|
||||
// NoticeHandler is a function that can handle notices received from the PostgreSQL server. Notices can be received at
|
||||
// any time, usually during handling of a query response. The *PgConn is provided so the handler is aware of the origin
|
||||
// of the notice, but it must not invoke any query method. Be aware that this is distinct from LISTEN/NOTIFY
|
||||
@@ -74,6 +80,7 @@ type PgConn struct {
|
||||
frontend *pgproto3.Frontend
|
||||
bgReader *bgreader.BGReader
|
||||
slowWriteTimer *time.Timer
|
||||
bgReaderStarted chan struct{}
|
||||
|
||||
config *Config
|
||||
|
||||
@@ -145,11 +152,11 @@ func ConnectConfig(octx context.Context, config *Config) (pgConn *PgConn, err er
|
||||
ctx := octx
|
||||
fallbackConfigs, err = expandWithIPs(ctx, config.LookupFunc, fallbackConfigs)
|
||||
if err != nil {
|
||||
return nil, &connectError{config: config, msg: "hostname resolving error", err: err}
|
||||
return nil, &ConnectError{Config: config, msg: "hostname resolving error", err: err}
|
||||
}
|
||||
|
||||
if len(fallbackConfigs) == 0 {
|
||||
return nil, &connectError{config: config, msg: "hostname resolving error", err: errors.New("ip addr wasn't found")}
|
||||
return nil, &ConnectError{Config: config, msg: "hostname resolving error", err: errors.New("ip addr wasn't found")}
|
||||
}
|
||||
|
||||
foundBestServer := false
|
||||
@@ -171,7 +178,7 @@ func ConnectConfig(octx context.Context, config *Config) (pgConn *PgConn, err er
|
||||
foundBestServer = true
|
||||
break
|
||||
} else if pgerr, ok := err.(*PgError); ok {
|
||||
err = &connectError{config: config, msg: "server error", err: pgerr}
|
||||
err = &ConnectError{Config: config, msg: "server error", err: pgerr}
|
||||
const ERRCODE_INVALID_PASSWORD = "28P01" // wrong password
|
||||
const ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION = "28000" // wrong password or bad pg_hba.conf settings
|
||||
const ERRCODE_INVALID_CATALOG_NAME = "3D000" // db does not exist
|
||||
@@ -182,7 +189,7 @@ func ConnectConfig(octx context.Context, config *Config) (pgConn *PgConn, err er
|
||||
pgerr.Code == ERRCODE_INSUFFICIENT_PRIVILEGE {
|
||||
break
|
||||
}
|
||||
} else if cerr, ok := err.(*connectError); ok {
|
||||
} else if cerr, ok := err.(*ConnectError); ok {
|
||||
if _, ok := cerr.err.(*NotPreferredError); ok {
|
||||
fallbackConfig = fc
|
||||
}
|
||||
@@ -192,7 +199,7 @@ func ConnectConfig(octx context.Context, config *Config) (pgConn *PgConn, err er
|
||||
if !foundBestServer && fallbackConfig != nil {
|
||||
pgConn, err = connect(ctx, config, fallbackConfig, true)
|
||||
if pgerr, ok := err.(*PgError); ok {
|
||||
err = &connectError{config: config, msg: "server error", err: pgerr}
|
||||
err = &ConnectError{Config: config, msg: "server error", err: pgerr}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -204,7 +211,7 @@ func ConnectConfig(octx context.Context, config *Config) (pgConn *PgConn, err er
|
||||
err := config.AfterConnect(ctx, pgConn)
|
||||
if err != nil {
|
||||
pgConn.conn.Close()
|
||||
return nil, &connectError{config: config, msg: "AfterConnect error", err: err}
|
||||
return nil, &ConnectError{Config: config, msg: "AfterConnect error", err: err}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -276,7 +283,7 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig
|
||||
network, address := NetworkAddress(fallbackConfig.Host, fallbackConfig.Port)
|
||||
netConn, err := config.DialFunc(ctx, network, address)
|
||||
if err != nil {
|
||||
return nil, &connectError{config: config, msg: "dial error", err: normalizeTimeoutError(ctx, err)}
|
||||
return nil, &ConnectError{Config: config, msg: "dial error", err: normalizeTimeoutError(ctx, err)}
|
||||
}
|
||||
|
||||
pgConn.conn = netConn
|
||||
@@ -288,7 +295,7 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig
|
||||
pgConn.contextWatcher.Unwatch() // Always unwatch `netConn` after TLS.
|
||||
if err != nil {
|
||||
netConn.Close()
|
||||
return nil, &connectError{config: config, msg: "tls error", err: err}
|
||||
return nil, &ConnectError{Config: config, msg: "tls error", err: normalizeTimeoutError(ctx, err)}
|
||||
}
|
||||
|
||||
pgConn.conn = nbTLSConn
|
||||
@@ -301,8 +308,14 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig
|
||||
pgConn.parameterStatuses = make(map[string]string)
|
||||
pgConn.status = connStatusConnecting
|
||||
pgConn.bgReader = bgreader.New(pgConn.conn)
|
||||
pgConn.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64), pgConn.bgReader.Start)
|
||||
pgConn.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64),
|
||||
func() {
|
||||
pgConn.bgReader.Start()
|
||||
pgConn.bgReaderStarted <- struct{}{}
|
||||
},
|
||||
)
|
||||
pgConn.slowWriteTimer.Stop()
|
||||
pgConn.bgReaderStarted = make(chan struct{})
|
||||
pgConn.frontend = config.BuildFrontend(pgConn.bgReader, pgConn.conn)
|
||||
|
||||
startupMsg := pgproto3.StartupMessage{
|
||||
@@ -323,7 +336,7 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig
|
||||
pgConn.frontend.Send(&startupMsg)
|
||||
if err := pgConn.flushWithPotentialWriteReadDeadlock(); err != nil {
|
||||
pgConn.conn.Close()
|
||||
return nil, &connectError{config: config, msg: "failed to write startup message", err: normalizeTimeoutError(ctx, err)}
|
||||
return nil, &ConnectError{Config: config, msg: "failed to write startup message", err: normalizeTimeoutError(ctx, err)}
|
||||
}
|
||||
|
||||
for {
|
||||
@@ -333,7 +346,7 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig
|
||||
if err, ok := err.(*PgError); ok {
|
||||
return nil, err
|
||||
}
|
||||
return nil, &connectError{config: config, msg: "failed to receive message", err: normalizeTimeoutError(ctx, err)}
|
||||
return nil, &ConnectError{Config: config, msg: "failed to receive message", err: normalizeTimeoutError(ctx, err)}
|
||||
}
|
||||
|
||||
switch msg := msg.(type) {
|
||||
@@ -346,26 +359,26 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig
|
||||
err = pgConn.txPasswordMessage(pgConn.config.Password)
|
||||
if err != nil {
|
||||
pgConn.conn.Close()
|
||||
return nil, &connectError{config: config, msg: "failed to write password message", err: err}
|
||||
return nil, &ConnectError{Config: config, msg: "failed to write password message", err: err}
|
||||
}
|
||||
case *pgproto3.AuthenticationMD5Password:
|
||||
digestedPassword := "md5" + hexMD5(hexMD5(pgConn.config.Password+pgConn.config.User)+string(msg.Salt[:]))
|
||||
err = pgConn.txPasswordMessage(digestedPassword)
|
||||
if err != nil {
|
||||
pgConn.conn.Close()
|
||||
return nil, &connectError{config: config, msg: "failed to write password message", err: err}
|
||||
return nil, &ConnectError{Config: config, msg: "failed to write password message", err: err}
|
||||
}
|
||||
case *pgproto3.AuthenticationSASL:
|
||||
err = pgConn.scramAuth(msg.AuthMechanisms)
|
||||
if err != nil {
|
||||
pgConn.conn.Close()
|
||||
return nil, &connectError{config: config, msg: "failed SASL auth", err: err}
|
||||
return nil, &ConnectError{Config: config, msg: "failed SASL auth", err: err}
|
||||
}
|
||||
case *pgproto3.AuthenticationGSS:
|
||||
err = pgConn.gssAuth()
|
||||
if err != nil {
|
||||
pgConn.conn.Close()
|
||||
return nil, &connectError{config: config, msg: "failed GSS auth", err: err}
|
||||
return nil, &ConnectError{Config: config, msg: "failed GSS auth", err: err}
|
||||
}
|
||||
case *pgproto3.ReadyForQuery:
|
||||
pgConn.status = connStatusIdle
|
||||
@@ -383,7 +396,7 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig
|
||||
return pgConn, nil
|
||||
}
|
||||
pgConn.conn.Close()
|
||||
return nil, &connectError{config: config, msg: "ValidateConnect failed", err: err}
|
||||
return nil, &ConnectError{Config: config, msg: "ValidateConnect failed", err: err}
|
||||
}
|
||||
}
|
||||
return pgConn, nil
|
||||
@@ -394,7 +407,7 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig
|
||||
return nil, ErrorResponseToPgError(msg)
|
||||
default:
|
||||
pgConn.conn.Close()
|
||||
return nil, &connectError{config: config, msg: "received unexpected message", err: err}
|
||||
return nil, &ConnectError{Config: config, msg: "received unexpected message", err: err}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -540,11 +553,12 @@ func (pgConn *PgConn) receiveMessage() (pgproto3.BackendMessage, error) {
|
||||
case *pgproto3.ParameterStatus:
|
||||
pgConn.parameterStatuses[msg.Name] = msg.Value
|
||||
case *pgproto3.ErrorResponse:
|
||||
if msg.Severity == "FATAL" {
|
||||
err := ErrorResponseToPgError(msg)
|
||||
if pgConn.config.OnPgError != nil && !pgConn.config.OnPgError(pgConn, err) {
|
||||
pgConn.status = connStatusClosed
|
||||
pgConn.conn.Close() // Ignore error as the connection is already broken and there is already an error to return.
|
||||
close(pgConn.cleanupDone)
|
||||
return nil, ErrorResponseToPgError(msg)
|
||||
return nil, err
|
||||
}
|
||||
case *pgproto3.NoticeResponse:
|
||||
if pgConn.config.OnNotice != nil {
|
||||
@@ -593,7 +607,7 @@ func (pgConn *PgConn) Frontend() *pgproto3.Frontend {
|
||||
return pgConn.frontend
|
||||
}
|
||||
|
||||
// Close closes a connection. It is safe to call Close on a already closed connection. Close attempts a clean close by
|
||||
// Close closes a connection. It is safe to call Close on an already closed connection. Close attempts a clean close by
|
||||
// sending the exit message to PostgreSQL. However, this could block so ctx is available to limit the time to wait. The
|
||||
// underlying net.Conn.Close() will always be called regardless of any other errors.
|
||||
func (pgConn *PgConn) Close(ctx context.Context) error {
|
||||
@@ -806,6 +820,9 @@ type StatementDescription struct {
|
||||
|
||||
// Prepare creates a prepared statement. If the name is empty, the anonymous prepared statement will be used. This
|
||||
// allows Prepare to also to describe statements without creating a server-side prepared statement.
|
||||
//
|
||||
// Prepare does not send a PREPARE statement to the server. It uses the PostgreSQL Parse and Describe protocol messages
|
||||
// directly.
|
||||
func (pgConn *PgConn) Prepare(ctx context.Context, name, sql string, paramOIDs []uint32) (*StatementDescription, error) {
|
||||
if err := pgConn.lock(); err != nil {
|
||||
return nil, err
|
||||
@@ -862,6 +879,52 @@ readloop:
|
||||
return psd, nil
|
||||
}
|
||||
|
||||
// Deallocate deallocates a prepared statement.
|
||||
//
|
||||
// Deallocate does not send a DEALLOCATE statement to the server. It uses the PostgreSQL Close protocol message
|
||||
// directly. This has slightly different behavior than executing DEALLOCATE statement.
|
||||
// - Deallocate can succeed in an aborted transaction.
|
||||
// - Deallocating a non-existent prepared statement is not an error.
|
||||
func (pgConn *PgConn) Deallocate(ctx context.Context, name string) error {
|
||||
if err := pgConn.lock(); err != nil {
|
||||
return err
|
||||
}
|
||||
defer pgConn.unlock()
|
||||
|
||||
if ctx != context.Background() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return newContextAlreadyDoneError(ctx)
|
||||
default:
|
||||
}
|
||||
pgConn.contextWatcher.Watch(ctx)
|
||||
defer pgConn.contextWatcher.Unwatch()
|
||||
}
|
||||
|
||||
pgConn.frontend.SendClose(&pgproto3.Close{ObjectType: 'S', Name: name})
|
||||
pgConn.frontend.SendSync(&pgproto3.Sync{})
|
||||
err := pgConn.flushWithPotentialWriteReadDeadlock()
|
||||
if err != nil {
|
||||
pgConn.asyncClose()
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
msg, err := pgConn.receiveMessage()
|
||||
if err != nil {
|
||||
pgConn.asyncClose()
|
||||
return normalizeTimeoutError(ctx, err)
|
||||
}
|
||||
|
||||
switch msg := msg.(type) {
|
||||
case *pgproto3.ErrorResponse:
|
||||
return ErrorResponseToPgError(msg)
|
||||
case *pgproto3.ReadyForQuery:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ErrorResponseToPgError converts a wire protocol error message to a *PgError.
|
||||
func ErrorResponseToPgError(msg *pgproto3.ErrorResponse) *PgError {
|
||||
return &PgError{
|
||||
@@ -935,16 +998,21 @@ func (pgConn *PgConn) CancelRequest(ctx context.Context) error {
|
||||
buf := make([]byte, 16)
|
||||
binary.BigEndian.PutUint32(buf[0:4], 16)
|
||||
binary.BigEndian.PutUint32(buf[4:8], 80877102)
|
||||
binary.BigEndian.PutUint32(buf[8:12], uint32(pgConn.pid))
|
||||
binary.BigEndian.PutUint32(buf[12:16], uint32(pgConn.secretKey))
|
||||
// Postgres will process the request and close the connection
|
||||
// so when don't need to read the reply
|
||||
// https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.6.7.10
|
||||
_, err = cancelConn.Write(buf)
|
||||
return err
|
||||
binary.BigEndian.PutUint32(buf[8:12], pgConn.pid)
|
||||
binary.BigEndian.PutUint32(buf[12:16], pgConn.secretKey)
|
||||
|
||||
if _, err := cancelConn.Write(buf); err != nil {
|
||||
return fmt.Errorf("write to connection for cancellation: %w", err)
|
||||
}
|
||||
|
||||
// Wait for the cancel request to be acknowledged by the server.
|
||||
// It copies the behavior of the libpq: https://github.com/postgres/postgres/blob/REL_16_0/src/interfaces/libpq/fe-connect.c#L4946-L4960
|
||||
_, _ = cancelConn.Read(buf)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// WaitForNotification waits for a LISTON/NOTIFY message to be received. It returns an error if a notification was not
|
||||
// WaitForNotification waits for a LISTEN/NOTIFY message to be received. It returns an error if a notification was not
|
||||
// received.
|
||||
func (pgConn *PgConn) WaitForNotification(ctx context.Context) error {
|
||||
if err := pgConn.lock(); err != nil {
|
||||
@@ -1606,25 +1674,55 @@ func (rr *ResultReader) concludeCommand(commandTag CommandTag, err error) {
|
||||
// Batch is a collection of queries that can be sent to the PostgreSQL server in a single round-trip.
|
||||
type Batch struct {
|
||||
buf []byte
|
||||
err error
|
||||
}
|
||||
|
||||
// ExecParams appends an ExecParams command to the batch. See PgConn.ExecParams for parameter descriptions.
|
||||
func (batch *Batch) ExecParams(sql string, paramValues [][]byte, paramOIDs []uint32, paramFormats []int16, resultFormats []int16) {
|
||||
batch.buf = (&pgproto3.Parse{Query: sql, ParameterOIDs: paramOIDs}).Encode(batch.buf)
|
||||
if batch.err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
batch.buf, batch.err = (&pgproto3.Parse{Query: sql, ParameterOIDs: paramOIDs}).Encode(batch.buf)
|
||||
if batch.err != nil {
|
||||
return
|
||||
}
|
||||
batch.ExecPrepared("", paramValues, paramFormats, resultFormats)
|
||||
}
|
||||
|
||||
// ExecPrepared appends an ExecPrepared e command to the batch. See PgConn.ExecPrepared for parameter descriptions.
|
||||
func (batch *Batch) ExecPrepared(stmtName string, paramValues [][]byte, paramFormats []int16, resultFormats []int16) {
|
||||
batch.buf = (&pgproto3.Bind{PreparedStatement: stmtName, ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats}).Encode(batch.buf)
|
||||
batch.buf = (&pgproto3.Describe{ObjectType: 'P'}).Encode(batch.buf)
|
||||
batch.buf = (&pgproto3.Execute{}).Encode(batch.buf)
|
||||
if batch.err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
batch.buf, batch.err = (&pgproto3.Bind{PreparedStatement: stmtName, ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats}).Encode(batch.buf)
|
||||
if batch.err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
batch.buf, batch.err = (&pgproto3.Describe{ObjectType: 'P'}).Encode(batch.buf)
|
||||
if batch.err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
batch.buf, batch.err = (&pgproto3.Execute{}).Encode(batch.buf)
|
||||
if batch.err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// ExecBatch executes all the queries in batch in a single round-trip. Execution is implicitly transactional unless a
|
||||
// transaction is already in progress or SQL contains transaction control statements. This is a simpler way of executing
|
||||
// multiple queries in a single round trip than using pipeline mode.
|
||||
func (pgConn *PgConn) ExecBatch(ctx context.Context, batch *Batch) *MultiResultReader {
|
||||
if batch.err != nil {
|
||||
return &MultiResultReader{
|
||||
closed: true,
|
||||
err: batch.err,
|
||||
}
|
||||
}
|
||||
|
||||
if err := pgConn.lock(); err != nil {
|
||||
return &MultiResultReader{
|
||||
closed: true,
|
||||
@@ -1650,7 +1748,13 @@ func (pgConn *PgConn) ExecBatch(ctx context.Context, batch *Batch) *MultiResultR
|
||||
pgConn.contextWatcher.Watch(ctx)
|
||||
}
|
||||
|
||||
batch.buf = (&pgproto3.Sync{}).Encode(batch.buf)
|
||||
batch.buf, batch.err = (&pgproto3.Sync{}).Encode(batch.buf)
|
||||
if batch.err != nil {
|
||||
multiResult.closed = true
|
||||
multiResult.err = batch.err
|
||||
pgConn.unlock()
|
||||
return multiResult
|
||||
}
|
||||
|
||||
pgConn.enterPotentialWriteReadDeadlock()
|
||||
defer pgConn.exitPotentialWriteReadDeadlock()
|
||||
@@ -1732,10 +1836,16 @@ func (pgConn *PgConn) enterPotentialWriteReadDeadlock() {
|
||||
|
||||
// exitPotentialWriteReadDeadlock must be called after a call to enterPotentialWriteReadDeadlock.
|
||||
func (pgConn *PgConn) exitPotentialWriteReadDeadlock() {
|
||||
// The state of the timer is not relevant upon exiting the potential slow write. It may both
|
||||
// fire (due to a slow write), or not fire (due to a fast write).
|
||||
_ = pgConn.slowWriteTimer.Stop()
|
||||
pgConn.bgReader.Stop()
|
||||
if !pgConn.slowWriteTimer.Stop() {
|
||||
// The timer starts its function in a separate goroutine. It is necessary to ensure the background reader has
|
||||
// started before calling Stop. Otherwise, the background reader may not be stopped. That on its own is not a
|
||||
// serious problem. But what is a serious problem is that the background reader may start at an inopportune time in
|
||||
// a subsequent query. For example, if a subsequent query was canceled then a deadline may be set on the net.Conn to
|
||||
// interrupt an in-progress read. After the read is interrupted, but before the deadline is cleared, the background
|
||||
// reader could start and read a deadline error. Then the next query would receive the an unexpected deadline error.
|
||||
<-pgConn.bgReaderStarted
|
||||
pgConn.bgReader.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
func (pgConn *PgConn) flushWithPotentialWriteReadDeadlock() error {
|
||||
@@ -1764,7 +1874,7 @@ func (pgConn *PgConn) SyncConn(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
// This should never happen. Only way I can imagine this occuring is if the server is constantly sending data such as
|
||||
// This should never happen. Only way I can imagine this occurring is if the server is constantly sending data such as
|
||||
// LISTEN/NOTIFY or log notifications such that we never can get an empty buffer.
|
||||
return errors.New("SyncConn: conn never synchronized")
|
||||
}
|
||||
@@ -1830,8 +1940,14 @@ func Construct(hc *HijackedConn) (*PgConn, error) {
|
||||
|
||||
pgConn.contextWatcher = newContextWatcher(pgConn.conn)
|
||||
pgConn.bgReader = bgreader.New(pgConn.conn)
|
||||
pgConn.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64), pgConn.bgReader.Start)
|
||||
pgConn.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64),
|
||||
func() {
|
||||
pgConn.bgReader.Start()
|
||||
pgConn.bgReaderStarted <- struct{}{}
|
||||
},
|
||||
)
|
||||
pgConn.slowWriteTimer.Stop()
|
||||
pgConn.bgReaderStarted = make(chan struct{})
|
||||
pgConn.frontend = hc.Config.BuildFrontend(pgConn.bgReader, pgConn.conn)
|
||||
|
||||
return pgConn, nil
|
||||
@@ -1973,6 +2089,13 @@ func (p *Pipeline) Flush() error {
|
||||
|
||||
// Sync establishes a synchronization point and flushes the queued requests.
|
||||
func (p *Pipeline) Sync() error {
|
||||
if p.closed {
|
||||
if p.err != nil {
|
||||
return p.err
|
||||
}
|
||||
return errors.New("pipeline closed")
|
||||
}
|
||||
|
||||
p.conn.frontend.SendSync(&pgproto3.Sync{})
|
||||
err := p.Flush()
|
||||
if err != nil {
|
||||
@@ -1989,14 +2112,28 @@ func (p *Pipeline) Sync() error {
|
||||
// *PipelineSync. If an ErrorResponse is received from the server, results will be nil and err will be a *PgError. If no
|
||||
// results are available, results and err will both be nil.
|
||||
func (p *Pipeline) GetResults() (results any, err error) {
|
||||
if p.closed {
|
||||
if p.err != nil {
|
||||
return nil, p.err
|
||||
}
|
||||
return nil, errors.New("pipeline closed")
|
||||
}
|
||||
|
||||
if p.expectedReadyForQueryCount == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return p.getResults()
|
||||
}
|
||||
|
||||
func (p *Pipeline) getResults() (results any, err error) {
|
||||
for {
|
||||
msg, err := p.conn.receiveMessage()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
p.closed = true
|
||||
p.err = err
|
||||
p.conn.asyncClose()
|
||||
return nil, normalizeTimeoutError(p.ctx, err)
|
||||
}
|
||||
|
||||
switch msg := msg.(type) {
|
||||
@@ -2018,7 +2155,8 @@ func (p *Pipeline) GetResults() (results any, err error) {
|
||||
case *pgproto3.ParseComplete:
|
||||
peekedMsg, err := p.conn.peekMessage()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
p.conn.asyncClose()
|
||||
return nil, normalizeTimeoutError(p.ctx, err)
|
||||
}
|
||||
if _, ok := peekedMsg.(*pgproto3.ParameterDescription); ok {
|
||||
return p.getResultsPrepare()
|
||||
@@ -2078,6 +2216,7 @@ func (p *Pipeline) Close() error {
|
||||
if p.closed {
|
||||
return p.err
|
||||
}
|
||||
|
||||
p.closed = true
|
||||
|
||||
if p.pendingSync {
|
||||
@@ -2090,7 +2229,7 @@ func (p *Pipeline) Close() error {
|
||||
}
|
||||
|
||||
for p.expectedReadyForQueryCount > 0 {
|
||||
_, err := p.GetResults()
|
||||
_, err := p.getResults()
|
||||
if err != nil {
|
||||
p.err = err
|
||||
var pgErr *PgError
|
||||
|
||||
Reference in New Issue
Block a user