build(deps): bump github.com/jackc/pgx/v4 from 4.13.0 to 4.15.0

Bumps [github.com/jackc/pgx/v4](https://github.com/jackc/pgx) from 4.13.0 to 4.15.0.
- [Release notes](https://github.com/jackc/pgx/releases)
- [Changelog](https://github.com/jackc/pgx/blob/master/CHANGELOG.md)
- [Commits](https://github.com/jackc/pgx/compare/v4.13.0...v4.15.0)

---
updated-dependencies:
- dependency-name: github.com/jackc/pgx/v4
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot] 2022-04-07 18:09:49 +00:00 committed by Ondřej Budai
parent a3c207945f
commit 2c2668f493
29 changed files with 605 additions and 150 deletions

View file

@ -1,3 +1,17 @@
# 1.11.0 (February 7, 2022)
* Support port in ip from LookupFunc to override config (James Hartig)
* Fix TLS connection timeout (Blake Embrey)
* Add support for read-only, primary, standby, prefer-standby target_session_attributes (Oscar)
* Fix connect when receiving NoticeResponse
# 1.10.1 (November 20, 2021)
* Close without waiting for response (Kei Kamikawa)
* Save waiting for network round-trip in CopyFrom (Rueian)
* Fix concurrency issue with ContextWatcher
* LRU.Get always checks context for cancellation / expiration (Georges Varouchas)
# 1.10.0 (July 24, 2021)
* net.Timeout errors are no longer returned when a query is canceled via context. A wrapped context error is returned.

View file

@ -248,21 +248,21 @@ func ParseConfig(connString string) (*Config, error) {
config.LookupFunc = makeDefaultResolver().LookupHost
notRuntimeParams := map[string]struct{}{
"host": struct{}{},
"port": struct{}{},
"database": struct{}{},
"user": struct{}{},
"password": struct{}{},
"passfile": struct{}{},
"connect_timeout": struct{}{},
"sslmode": struct{}{},
"sslkey": struct{}{},
"sslcert": struct{}{},
"sslrootcert": struct{}{},
"target_session_attrs": struct{}{},
"min_read_buffer_size": struct{}{},
"service": struct{}{},
"servicefile": struct{}{},
"host": {},
"port": {},
"database": {},
"user": {},
"password": {},
"passfile": {},
"connect_timeout": {},
"sslmode": {},
"sslkey": {},
"sslcert": {},
"sslrootcert": {},
"target_session_attrs": {},
"min_read_buffer_size": {},
"service": {},
"servicefile": {},
}
for k, v := range settings {
@ -329,10 +329,19 @@ func ParseConfig(connString string) (*Config, error) {
}
}
if settings["target_session_attrs"] == "read-write" {
switch tsa := settings["target_session_attrs"]; tsa {
case "read-write":
config.ValidateConnect = ValidateConnectTargetSessionAttrsReadWrite
} else if settings["target_session_attrs"] != "any" {
return nil, &parseConfigError{connString: connString, msg: fmt.Sprintf("unknown target_session_attrs value: %v", settings["target_session_attrs"])}
case "read-only":
config.ValidateConnect = ValidateConnectTargetSessionAttrsReadOnly
case "primary":
config.ValidateConnect = ValidateConnectTargetSessionAttrsPrimary
case "standby":
config.ValidateConnect = ValidateConnectTargetSessionAttrsStandby
case "any", "prefer-standby":
// do nothing
default:
return nil, &parseConfigError{connString: connString, msg: fmt.Sprintf("unknown target_session_attrs value: %v", tsa)}
}
return config, nil
@ -727,3 +736,48 @@ func ValidateConnectTargetSessionAttrsReadWrite(ctx context.Context, pgConn *PgC
return nil
}
// ValidateConnectTargetSessionAttrsReadOnly is an ValidateConnectFunc that implements libpq compatible
// target_session_attrs=read-only.
func ValidateConnectTargetSessionAttrsReadOnly(ctx context.Context, pgConn *PgConn) error {
result := pgConn.ExecParams(ctx, "show transaction_read_only", nil, nil, nil, nil).Read()
if result.Err != nil {
return result.Err
}
if string(result.Rows[0][0]) != "on" {
return errors.New("connection is not read only")
}
return nil
}
// ValidateConnectTargetSessionAttrsStandby is an ValidateConnectFunc that implements libpq compatible
// target_session_attrs=standby.
func ValidateConnectTargetSessionAttrsStandby(ctx context.Context, pgConn *PgConn) error {
result := pgConn.ExecParams(ctx, "select pg_is_in_recovery()", nil, nil, nil, nil).Read()
if result.Err != nil {
return result.Err
}
if string(result.Rows[0][0]) != "t" {
return errors.New("server is not in hot standby mode")
}
return nil
}
// ValidateConnectTargetSessionAttrsPrimary is an ValidateConnectFunc that implements libpq compatible
// target_session_attrs=primary.
func ValidateConnectTargetSessionAttrsPrimary(ctx context.Context, pgConn *PgConn) error {
result := pgConn.ExecParams(ctx, "select pg_is_in_recovery()", nil, nil, nil, nil).Read()
if result.Err != nil {
return result.Err
}
if string(result.Rows[0][0]) == "t" {
return errors.New("server is in standby mode")
}
return nil
}

View file

@ -2,6 +2,7 @@ package ctxwatch
import (
"context"
"sync"
)
// ContextWatcher watches a context and performs an action when the context is canceled. It can watch one context at a
@ -10,8 +11,10 @@ type ContextWatcher struct {
onCancel func()
onUnwatchAfterCancel func()
unwatchChan chan struct{}
watchInProgress bool
onCancelWasCalled bool
lock sync.Mutex
watchInProgress bool
onCancelWasCalled bool
}
// NewContextWatcher returns a ContextWatcher. onCancel will be called when a watched context is canceled.
@ -29,6 +32,9 @@ func NewContextWatcher(onCancel func(), onUnwatchAfterCancel func()) *ContextWat
// Watch starts watching ctx. If ctx is canceled then the onCancel function passed to NewContextWatcher will be called.
func (cw *ContextWatcher) Watch(ctx context.Context) {
cw.lock.Lock()
defer cw.lock.Unlock()
if cw.watchInProgress {
panic("Watch already in progress")
}
@ -54,6 +60,9 @@ func (cw *ContextWatcher) Watch(ctx context.Context) {
// Unwatch stops watching the previously watched context. If the onCancel function passed to NewContextWatcher was
// called then onUnwatchAfterCancel will also be called.
func (cw *ContextWatcher) Unwatch() {
cw.lock.Lock()
defer cw.lock.Unlock()
if cw.watchInProgress {
cw.unwatchChan <- struct{}{}
if cw.onCancelWasCalled {

View file

@ -11,6 +11,7 @@ import (
"io"
"math"
"net"
"strconv"
"strings"
"sync"
"time"
@ -44,7 +45,8 @@ type Notification struct {
// DialFunc is a function that can be used to connect to a PostgreSQL server.
type DialFunc func(ctx context.Context, network, addr string) (net.Conn, error)
// LookupFunc is a function that can be used to lookup IPs addrs from host.
// LookupFunc is a function that can be used to lookup IPs addrs from host. Optionally an ip:port combination can be
// returned in order to override the connection string's port.
type LookupFunc func(ctx context.Context, host string) (addrs []string, err error)
// BuildFrontendFunc is a function that can be used to create Frontend implementation for connection.
@ -196,11 +198,24 @@ func expandWithIPs(ctx context.Context, lookupFn LookupFunc, fallbacks []*Fallba
}
for _, ip := range ips {
configs = append(configs, &FallbackConfig{
Host: ip,
Port: fb.Port,
TLSConfig: fb.TLSConfig,
})
splitIP, splitPort, err := net.SplitHostPort(ip)
if err == nil {
port, err := strconv.ParseUint(splitPort, 10, 16)
if err != nil {
return nil, fmt.Errorf("error parsing port (%s) from lookup: %w", splitPort, err)
}
configs = append(configs, &FallbackConfig{
Host: splitIP,
Port: uint16(port),
TLSConfig: fb.TLSConfig,
})
} else {
configs = append(configs, &FallbackConfig{
Host: ip,
Port: fb.Port,
TLSConfig: fb.TLSConfig,
})
}
}
}
@ -215,7 +230,7 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig
var err error
network, address := NetworkAddress(fallbackConfig.Host, fallbackConfig.Port)
pgConn.conn, err = config.DialFunc(ctx, network, address)
netConn, err := config.DialFunc(ctx, network, address)
if err != nil {
var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
@ -224,24 +239,27 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig
return nil, &connectError{config: config, msg: "dial error", err: err}
}
pgConn.parameterStatuses = make(map[string]string)
pgConn.conn = netConn
pgConn.contextWatcher = newContextWatcher(netConn)
pgConn.contextWatcher.Watch(ctx)
if fallbackConfig.TLSConfig != nil {
if err := pgConn.startTLS(fallbackConfig.TLSConfig); err != nil {
pgConn.conn.Close()
tlsConn, err := startTLS(netConn, fallbackConfig.TLSConfig)
pgConn.contextWatcher.Unwatch() // Always unwatch `netConn` after TLS.
if err != nil {
netConn.Close()
return nil, &connectError{config: config, msg: "tls error", err: err}
}
pgConn.conn = tlsConn
pgConn.contextWatcher = newContextWatcher(tlsConn)
pgConn.contextWatcher.Watch(ctx)
}
pgConn.status = connStatusConnecting
pgConn.contextWatcher = ctxwatch.NewContextWatcher(
func() { pgConn.conn.SetDeadline(time.Date(1, 1, 1, 1, 1, 1, 1, time.UTC)) },
func() { pgConn.conn.SetDeadline(time.Time{}) },
)
pgConn.contextWatcher.Watch(ctx)
defer pgConn.contextWatcher.Unwatch()
pgConn.parameterStatuses = make(map[string]string)
pgConn.status = connStatusConnecting
pgConn.frontend = config.BuildFrontend(pgConn.conn, pgConn.conn)
startupMsg := pgproto3.StartupMessage{
@ -317,7 +335,7 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig
}
}
return pgConn, nil
case *pgproto3.ParameterStatus:
case *pgproto3.ParameterStatus, *pgproto3.NoticeResponse:
// handled by ReceiveMessage
case *pgproto3.ErrorResponse:
pgConn.conn.Close()
@ -329,24 +347,29 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig
}
}
func (pgConn *PgConn) startTLS(tlsConfig *tls.Config) (err error) {
err = binary.Write(pgConn.conn, binary.BigEndian, []int32{8, 80877103})
func newContextWatcher(conn net.Conn) *ctxwatch.ContextWatcher {
return ctxwatch.NewContextWatcher(
func() { conn.SetDeadline(time.Date(1, 1, 1, 1, 1, 1, 1, time.UTC)) },
func() { conn.SetDeadline(time.Time{}) },
)
}
func startTLS(conn net.Conn, tlsConfig *tls.Config) (net.Conn, error) {
err := binary.Write(conn, binary.BigEndian, []int32{8, 80877103})
if err != nil {
return
return nil, err
}
response := make([]byte, 1)
if _, err = io.ReadFull(pgConn.conn, response); err != nil {
return
if _, err = io.ReadFull(conn, response); err != nil {
return nil, err
}
if response[0] != 'S' {
return errors.New("server refused TLS connection")
return nil, errors.New("server refused TLS connection")
}
pgConn.conn = tls.Client(pgConn.conn, tlsConfig)
return nil
return tls.Client(conn, tlsConfig), nil
}
func (pgConn *PgConn) txPasswordMessage(password string) (err error) {
@ -578,7 +601,6 @@ func (pgConn *PgConn) Close(ctx context.Context) error {
//
// See https://github.com/jackc/pgx/issues/637
pgConn.conn.Write([]byte{'X', 0, 0, 0, 4})
pgConn.conn.Read(make([]byte, 1))
return pgConn.conn.Close()
}
@ -605,7 +627,6 @@ func (pgConn *PgConn) asyncClose() {
pgConn.conn.SetDeadline(deadline)
pgConn.conn.Write([]byte{'X', 0, 0, 0, 4})
pgConn.conn.Read(make([]byte, 1))
}()
}
@ -1187,27 +1208,6 @@ func (pgConn *PgConn) CopyFrom(ctx context.Context, r io.Reader, sql string) (Co
return nil, &writeError{err: err, safeToRetry: n == 0}
}
// Read until copy in response or error.
var commandTag CommandTag
var pgErr error
pendingCopyInResponse := true
for pendingCopyInResponse {
msg, err := pgConn.receiveMessage()
if err != nil {
pgConn.asyncClose()
return nil, preferContextOverNetTimeoutError(ctx, err)
}
switch msg := msg.(type) {
case *pgproto3.CopyInResponse:
pendingCopyInResponse = false
case *pgproto3.ErrorResponse:
pgErr = ErrorResponseToPgError(msg)
case *pgproto3.ReadyForQuery:
return commandTag, pgErr
}
}
// Send copy data
abortCopyChan := make(chan struct{})
copyErrChan := make(chan error, 1)
@ -1246,6 +1246,7 @@ func (pgConn *PgConn) CopyFrom(ctx context.Context, r io.Reader, sql string) (Co
}
}()
var pgErr error
var copyErr error
for copyErr == nil && pgErr == nil {
select {
@ -1282,6 +1283,7 @@ func (pgConn *PgConn) CopyFrom(ctx context.Context, r io.Reader, sql string) (Co
}
// Read results
var commandTag CommandTag
for {
msg, err := pgConn.receiveMessage()
if err != nil {
@ -1715,10 +1717,7 @@ func Construct(hc *HijackedConn) (*PgConn, error) {
cleanupDone: make(chan struct{}),
}
pgConn.contextWatcher = ctxwatch.NewContextWatcher(
func() { pgConn.conn.SetDeadline(time.Date(1, 1, 1, 1, 1, 1, 1, time.UTC)) },
func() { pgConn.conn.SetDeadline(time.Time{}) },
)
pgConn.contextWatcher = newContextWatcher(pgConn.conn)
return pgConn, nil
}

View file

@ -42,6 +42,14 @@ func NewLRU(conn *pgconn.PgConn, mode int, cap int) *LRU {
// Get returns the prepared statement description for sql preparing or describing the sql on the server as needed.
func (c *LRU) Get(ctx context.Context, sql string) (*pgconn.StatementDescription, error) {
if ctx != context.Background() {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
}
// flush an outstanding bad statements
txStatus := c.conn.TxStatus()
if (txStatus == 'I' || txStatus == 'T') && len(c.stmtsToClear) > 0 {