💾 Archived View for godocs.io › github.com › mediocregopher › radix › v4 captured on 2024-05-26 at 14:35:51. Gemini links have been rewritten to link to archived content
import "github.com/mediocregopher/radix/v4"
Package radix implements all functionality needed to work with redis and all things related to it, including redis cluster, pubsub, sentinel, scanning, lua scripting, and more.
This package has extensive examples documenting advanced behavior not covered here.
For a single redis instance use PoolConfig to create a connection pool. The connection pool implements the Client interface. It is thread-safe and will automatically create, reuse, and recreate connections as needed:
	client, err := (radix.PoolConfig{}).New(ctx, "tcp", "127.0.0.1:6379")
	if err != nil {
		// handle error
	}
If you're using sentinel or cluster you should use SentinelConfig or ClusterConfig (respectively) to create your Client instead.
Any redis command can be performed by passing a Cmd into a Client's Do method. Each Cmd instance should only be used once. The return from the Cmd can be captured into any appropriate Go primitive type, or into a slice, map, or struct if the command returns an array.
	// discard the result
	err := client.Do(ctx, radix.Cmd(nil, "SET", "foo", "someval"))

	var fooVal string
	err = client.Do(ctx, radix.Cmd(&fooVal, "GET", "foo"))

	var fooValB []byte
	err = client.Do(ctx, radix.Cmd(&fooValB, "GET", "foo"))

	var barI int
	err = client.Do(ctx, radix.Cmd(&barI, "INCR", "bar"))

	var bazEls []string
	err = client.Do(ctx, radix.Cmd(&bazEls, "LRANGE", "baz", "0", "-1"))

	var buzMap map[string]string
	err = client.Do(ctx, radix.Cmd(&buzMap, "HGETALL", "buz"))
FlatCmd can also be used if you wish to use non-string arguments like integers, slices, maps, or structs, and have them automatically be flattened into a single string slice.
Cmd and FlatCmd both implement the Action interface. Other Actions include Pipeline, WithConn, and EvalScript.Cmd. Any of these may be passed into any Client's Do method.
	var fooVal string
	p := radix.NewPipeline()
	p.Append(radix.FlatCmd(nil, "SET", "foo", 1))
	p.Append(radix.Cmd(&fooVal, "GET", "foo"))

	if err := client.Do(ctx, p); err != nil {
		panic(err)
	}
	fmt.Printf("fooVal: %q\n", fooVal)
There are two ways to perform transactions in redis. The first is with the MULTI/EXEC commands, which can be done using the WithConn Action (see its example). The second is using EVAL with lua scripting, which can be done using the EvalScript Action (again, see its example).
EVAL with lua scripting is recommended in almost all cases. It only requires a single round-trip, it's far more flexible than MULTI/EXEC, it's simpler to code, and for complex transactions, which would otherwise need a WATCH statement with MULTI/EXEC, it's significantly faster.
Dialer has fields like AuthPass and SelectDB which can be used to configure Conns at creation.
PoolConfig takes a Dialer as one of its fields, so that all Conns the Pool creates will be created with those settings.
Other Clients which create their own Pools, like Cluster and Sentinel, will take in a PoolConfig which can be used to configure the Pools they create.
For example, to create a Cluster instance which uses a particular AUTH password on all Conns:
	cfg := radix.ClusterConfig{
		PoolConfig: radix.PoolConfig{
			Dialer: radix.Dialer{
				AuthPass: "mySuperSecretPassword",
			},
		},
	}

	client, err := cfg.New(ctx, []string{redisAddr1, redisAddr2, redisAddr3})
All interfaces in this package were designed such that they could have custom implementations. There is no dependency within radix that demands any interface be implemented by a particular underlying type, so feel free to create your own Pools or Conns or Actions or whatever makes your life easier.
Errors returned from redis can be explicitly checked for using the resp3.SimpleError type. Note that the errors.As or errors.Is functions, introduced in Go 1.13, should be used.
	var redisErr resp3.SimpleError
	err := client.Do(ctx, radix.Cmd(nil, "AUTH", "wrong password"))
	if errors.As(err, &redisErr) {
		log.Printf("redis error returned: %s", redisErr.S)
	}
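To illustrate why errors.As is preferred over a plain type assertion, here is a minimal, standard-library-only sketch. The simpleError type and the doAuth helper are hypothetical stand-ins (they are not part of radix), and the sketch assumes the error gets wrapped somewhere between redis and the caller.

```go
package main

import (
	"errors"
	"fmt"
)

// simpleError is a hypothetical stand-in for resp3.SimpleError, defined
// locally so that this sketch does not depend on radix.
type simpleError struct {
	S string
}

func (e simpleError) Error() string { return e.S }

// doAuth is a hypothetical helper which mimics a client returning a redis
// error that has been wrapped with additional context.
func doAuth() error {
	return fmt.Errorf("performing AUTH: %w", simpleError{S: "WRONGPASS invalid username-password pair"})
}

// redisErrMsg returns the message of the first simpleError found anywhere in
// err's chain, and whether one was found.
func redisErrMsg(err error) (string, bool) {
	var redisErr simpleError
	if errors.As(err, &redisErr) {
		return redisErr.S, true
	}
	return "", false
}

func main() {
	err := doAuth()

	// A direct type assertion does not match, because err is a wrapper.
	_, direct := err.(simpleError)
	fmt.Println("type assertion matched:", direct)

	// errors.As walks the chain of wrapped errors and finds the simpleError.
	if msg, ok := redisErrMsg(err); ok {
		fmt.Println("redis error returned:", msg)
	}
}
```

The same pattern applies to sentinel values such as ErrNoStreamEntries, where errors.Is takes the place of errors.As.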
var ErrNoStreamEntries = errors.New("no stream entries")
ErrNoStreamEntries is returned by StreamReader's Next method to indicate that there were no stream entries left to be read.
func CRC16(buf []byte) uint16
CRC16 returns checksum for a given set of bytes based on the crc algorithm defined for hashing redis keys in a cluster setup.
func ClusterSlot(key []byte) uint16
ClusterSlot returns the slot number the key belongs to in any redis cluster, taking into account key hash tags.
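As a rough illustration of what CRC16 and ClusterSlot compute, the sketch below follows the key hashing rules of the Redis Cluster specification: a CRC16 (XMODEM variant) of the key, modulo 16384, where only the contents of the first non-empty {...} hash tag are hashed if one is present. The lowercase function names are hypothetical and this is not radix's own implementation, which may be optimized differently.

```go
package main

import (
	"bytes"
	"fmt"
)

// numSlots is the fixed number of hash slots in a redis cluster.
const numSlots = 16384

// crc16 computes the CRC16 XMODEM checksum (polynomial 0x1021, initial value
// 0, no reflection). It works bit by bit for clarity rather than speed.
func crc16(buf []byte) uint16 {
	var crc uint16
	for _, b := range buf {
		crc ^= uint16(b) << 8
		for i := 0; i < 8; i++ {
			if crc&0x8000 != 0 {
				crc = crc<<1 ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// clusterSlot returns the slot for key. If the key contains a '{' followed
// later by a '}' with at least one byte in between, only the bytes between
// them (the hash tag) are hashed.
func clusterSlot(key []byte) uint16 {
	if start := bytes.IndexByte(key, '{'); start >= 0 {
		if end := bytes.IndexByte(key[start+1:], '}'); end > 0 {
			key = key[start+1 : start+1+end]
		}
	}
	return crc16(key) % numSlots
}

func main() {
	fmt.Printf("crc16(123456789) = %#x\n", crc16([]byte("123456789")))
	fmt.Println("slot(foo) =", clusterSlot([]byte("foo")))

	// Both keys share the hash tag "user1000", so they map to the same slot.
	fmt.Println(clusterSlot([]byte("{user1000}.following")) == clusterSlot([]byte("{user1000}.followers")))
}
```

Hash tags are what make multi-key operations possible in a cluster: keys which share a tag are guaranteed to live on the same node.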
type Action interface {
	// Properties returns an ActionProperties value for the Action. Multiple
	// calls to Properties should always yield the same ActionProperties value.
	Properties() ActionProperties

	// Perform actually performs the Action using an existing Conn.
	Perform(ctx context.Context, c Conn) error
}
Action performs a task using a Conn.
func Cmd(rcv interface{}, cmd string, args ...string) Action
Cmd is used to perform a redis command and retrieve a result. It should not be passed into Do more than once.
If the receiver value of Cmd is nil then the result is discarded.
If the receiver value of Cmd is a primitive, a slice/map, or a struct then a pointer must be passed in. It may also be an io.Writer, an encoding.Text/BinaryUnmarshaler, or a resp.Unmarshaler.
The Action returned by Cmd also implements resp.Marshaler.
See CmdConfig's documentation if more configurability is required, e.g. if using commands provided by Redis Modules.
Code:
	{
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		client, err := (PoolConfig{}).New(ctx, "tcp", "127.0.0.1:6379") // or any other client
		if err != nil {
			panic(err)
		}

		if err := client.Do(ctx, Cmd(nil, "SET", "foo", "bar")); err != nil {
			panic(err)
		}

		var fooVal string
		if err := client.Do(ctx, Cmd(&fooVal, "GET", "foo")); err != nil {
			panic(err)
		}
		fmt.Println(fooVal)
		// Output: bar
	}
Output:
bar
func FlatCmd(rcv interface{}, cmd string, args ...interface{}) Action
FlatCmd is like Cmd, but the arguments can be of almost any type, and FlatCmd will automatically flatten them into a single array of strings. Like Cmd, a FlatCmd should not be passed into Do more than once.
FlatCmd supports using a resp.LenReader (an io.Reader with a Len() method) as an argument. *bytes.Buffer is an example of a LenReader, and the resp package has a NewLenReader function which can wrap an existing io.Reader.
FlatCmd supports encoding.Text/BinaryMarshalers, big.Float, and big.Int.
The receiver to FlatCmd follows the same rules as for Cmd.
The Action returned by FlatCmd implements resp.Marshaler.
See CmdConfig's documentation if more configurability is required, e.g. if using commands provided by Redis Modules.
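The idea of flattening can be sketched without radix as follows. This flatten function is a hypothetical, heavily simplified illustration covering only a handful of types; the real FlatCmd supports many more, as described above. Map keys are sorted here purely to make the output deterministic, which is an assumption of this sketch and not a documented property of FlatCmd.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
)

// flatten converts a mixed list of arguments into a single slice of strings.
// It is a hypothetical illustration, not radix's implementation.
func flatten(args ...interface{}) []string {
	out := make([]string, 0, len(args))
	for _, arg := range args {
		switch v := arg.(type) {
		case string:
			out = append(out, v)
		case int:
			out = append(out, strconv.Itoa(v))
		case []string:
			// Each element of a slice becomes its own argument.
			out = append(out, v...)
		case map[string]int:
			// Each entry becomes a key argument followed by a value
			// argument. Keys are sorted only for deterministic output.
			keys := make([]string, 0, len(v))
			for k := range v {
				keys = append(keys, k)
			}
			sort.Strings(keys)
			for _, k := range keys {
				out = append(out, k, strconv.Itoa(v[k]))
			}
		default:
			out = append(out, fmt.Sprint(v))
		}
	}
	return out
}

func main() {
	fmt.Printf("%q\n", flatten("SET", "foo", 1))
	fmt.Printf("%q\n", flatten("SADD", "fooSet", []string{"1", "2", "3"}))
	fmt.Printf("%q\n", flatten("HMSET", "fooHash", map[string]int{"a": 1, "b": 2}))
}
```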
Code:
	{
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		client, err := (PoolConfig{}).New(ctx, "tcp", "127.0.0.1:6379") // or any other client
		if err != nil {
			panic(err)
		}

		// performs "SET" "foo" "1"
		err = client.Do(ctx, FlatCmd(nil, "SET", "foo", 1))
		if err != nil {
			panic(err)
		}

		// performs "SADD" "fooSet" "1" "2" "3"
		err = client.Do(ctx, FlatCmd(nil, "SADD", "fooSet", []string{"1", "2", "3"}))
		if err != nil {
			panic(err)
		}

		// performs "HMSET" "fooHash" "a" "1" "b" "2" "c" "3"
		// (the order of the key/value pairs may vary, since Go maps are unordered)
		m := map[string]int{"a": 1, "b": 2, "c": 3}
		err = client.Do(ctx, FlatCmd(nil, "HMSET", "fooHash", m))
		if err != nil {
			panic(err)
		}
	}
Code:
	{
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		client, err := (PoolConfig{}).New(ctx, "tcp", "127.0.0.1:6379") // or any other client
		if err != nil {
			panic(err)
		}

		// FlatCmd can marshal structs into a key/value array. Exported field names
		// will be used as keys, using similar rules as the json package.
		type ExampleStruct struct {
			Foo string // The key "Foo" will be used.
			Bar string `redis:"BAR"` // The key "BAR" will be used
			Baz string `redis:"-"`   // This field will be skipped
		}

		type OuterExampleStruct struct {
			// adds fields "Foo" and "BAR" to OuterExampleStruct
			ExampleStruct
			Biz int
		}

		s := OuterExampleStruct{
			ExampleStruct: ExampleStruct{
				Foo: "1",
				Bar: "2",
				Baz: "3",
			},
			Biz: 4,
		}
		err = client.Do(ctx, FlatCmd(nil, "HMSET", "barHash", s))
		if err != nil {
			panic(err)
		}

		// Cmd and FlatCmd can also unmarshal results into a struct.
		var s2 OuterExampleStruct
		err = client.Do(ctx, Cmd(&s2, "HGETALL", "barHash"))
		if err != nil {
			panic(err)
		}

		fmt.Printf("s2: %+v\n", s2)
		// Output: s2: {ExampleStruct:{Foo:1 Bar:2 Baz:} Biz:4}
	}
Output:
s2: {ExampleStruct:{Foo:1 Bar:2 Baz:} Biz:4}
func WithConn(key string, fn func(context.Context, Conn) error) Action
WithConn is used to perform a set of independent Actions on the same Conn.
key should be a key which one or more of the inner Actions is going to act on, or "" if no keys are being acted on or the keys aren't yet known. key is generally only necessary when using Cluster.
The callback function is what should actually carry out the inner actions, and the error it returns will be passed back up immediately.
NOTE that WithConn only ensures all inner Actions are performed on the same Conn, it doesn't make them transactional. Use MULTI/WATCH/EXEC within a WithConn for transactions, or use EvalScript.
Code:
	{
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		client, err := (PoolConfig{}).New(ctx, "tcp", "127.0.0.1:6379") // or any other client
		if err != nil {
			// handle error
		}

		// This example retrieves the current integer value of `key` and sets its
		// new value to be the increment of that, all using the same connection
		// instance. NOTE that it does not do this atomically like the INCR command
		// would.
		key := "someKey"
		err = client.Do(ctx, WithConn(key, func(ctx context.Context, conn Conn) error {
			var curr int
			if err := conn.Do(ctx, Cmd(&curr, "GET", key)); err != nil {
				return err
			}

			curr++
			return conn.Do(ctx, FlatCmd(nil, "SET", key, curr))
		}))
		if err != nil {
			// handle error
		}
	}
Code:
	{
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		client, err := (PoolConfig{}).New(ctx, "tcp", "127.0.0.1:6379") // or any other client
		if err != nil {
			// handle error
		}

		// This example retrieves the current value of `key` and then sets a new
		// value on it in an atomic transaction.
		key := "someKey"
		var prevVal string

		err = client.Do(ctx, WithConn(key, func(ctx context.Context, c Conn) error {

			// Begin the transaction with a MULTI command
			if err := c.Do(ctx, Cmd(nil, "MULTI")); err != nil {
				return err
			}

			// If any of the calls after the MULTI call error it's important that
			// the transaction is discarded. This isn't strictly necessary if the
			// only possible error is a network error, as the connection would be
			// closed by the client anyway.
			var err error
			defer func() {
				if err != nil {
					// The return from DISCARD doesn't matter. If it's an error then
					// it's a network error and the Conn will be closed by the
					// client.
					_ = c.Do(ctx, Cmd(nil, "DISCARD"))
				}
			}()

			// queue up the transaction's commands
			if err = c.Do(ctx, Cmd(nil, "GET", key)); err != nil {
				return err
			}
			if err = c.Do(ctx, Cmd(nil, "SET", key, "someOtherValue")); err != nil {
				return err
			}

			// execute the transaction, capturing the result in a Tuple. We only
			// care about the first element (the result from GET), so we discard the
			// second by setting nil.
			result := Tuple{&prevVal, nil}
			return c.Do(ctx, Cmd(&result, "EXEC"))
		}))
		if err != nil {
			// handle error
		}

		fmt.Printf("the value of key %q was %q\n", key, prevVal)
	}
type ActionProperties struct {
	// Keys describes which redis keys an Action will act on. An empty/nil slice
	// may be used if no keys are being acted on. The slice may contain duplicate
	// values.
	Keys []string

	// CanRetry indicates, in the event of a cluster node returning a MOVED or
	// ASK error, the Action can be retried on a different node.
	CanRetry bool

	// CanPipeline indicates that an Action can be pipelined alongside other
	// Actions for which this property is also true.
	CanPipeline bool

	// CanShareConn indicates that an Action can be Perform'd on a Conn
	// concurrently with other Actions for which this property is also true.
	CanShareConn bool
}
ActionProperties describes various properties of an Action. It should be expected that more fields will be added to this struct as time goes forward, though the zero values of those new fields will have sane default behaviors.
func DefaultActionProperties(cmd string, args ...string) ActionProperties
DefaultActionProperties returns an ActionProperties instance for the given Redis command and its arguments.
The returned ActionProperties should work well with all standard Redis commands, including allowing the command to be used in pipelines and for connection sharing if the command is non-blocking, but may not return correct results for custom commands provided by Redis Modules or unreleased commands.
type Client interface {
	// Addr returns the address of the redis instance which the Client was
	// initialized against.
	Addr() net.Addr

	// Do performs an Action on a Conn connected to the redis instance.
	Do(context.Context, Action) error

	// Once Close() is called all future method calls on the Client will return
	// an error
	Close() error
}
Client describes an entity which can carry out Actions on a single redis instance. Conn and Pool are Clients.
Implementations of Client are expected to be thread-safe.
type Cluster struct {
	// contains filtered or unexported fields
}
Cluster is a MultiClient which contains all information about a redis cluster needed to interact with it, including a set of pools to each of its instances.
All methods on Cluster are thread-safe.
Cluster will automatically attempt to handle MOVED/ASK errors.
func (c *Cluster) Clients() (map[string]ReplicaSet, error)
Clients implements the method for the MultiClient interface.
func (c *Cluster) Close() error
Close cleans up all goroutines spawned by Cluster and closes all of its Pools.
func (c *Cluster) Do(ctx context.Context, a Action) error
Do performs an Action on a redis instance in the cluster, with the instance being determined by the keys returned from the Action's Properties() method.
This method handles MOVED and ASK errors automatically in most cases.
func (c *Cluster) DoSecondary(ctx context.Context, a Action) error
DoSecondary implements the method for the MultiClient interface. It will perform the Action on a random secondary for the affected keys, or the primary if no secondary is available.
For DoSecondary to work, all connections must be created in read-only mode by using the READONLY command. See the PoolConfig field of ClusterConfig for more details.
func (c *Cluster) Sync(ctx context.Context) error
Sync will synchronize the Cluster with the actual cluster, making new pools to new instances and removing ones from instances no longer in the cluster. This will be called periodically automatically, but you can manually call it at any time as well.
func (c *Cluster) Topo() ClusterTopo
Topo returns the Cluster's topology as it currently knows it. See ClusterTopo's docs for more on its default order.
type ClusterConfig struct {
	// PoolConfig is used by Cluster to create Clients for redis instances in
	// the cluster set.
	//
	// If PoolConfig.CustomPool and PoolConfig.Dialer.CustomConn are unset
	// then all Conns created by Cluster will have the READONLY command
	// performed on them upon creation. For Conns to primary instances this will
	// have no effect, but for secondaries this will allow DoSecondary to
	// function properly.
	//
	// If PoolConfig.CustomPool or PoolConfig.Dialer.CustomConn are set then
	// READONLY must be called by whichever is set in order for DoSecondary to
	// work.
	PoolConfig PoolConfig

	// SyncEvery tells the Cluster to synchronize itself with the cluster's
	// topology at the given interval. On every synchronization Cluster will ask
	// the cluster for its topology and make/destroy its Clients as necessary.
	//
	// Defaults to 5 * time.Second. Set to -1 to disable.
	SyncEvery time.Duration

	// OnDownDelayActionsBy tells the Cluster to delay all commands by the given
	// duration while the cluster is seen to be in the CLUSTERDOWN state. This
	// allows fewer Actions to be affected by brief outages, e.g. during a
	// failover.
	//
	// Calls to Sync will not be delayed regardless of this option.
	//
	// Defaults to 100 * time.Millisecond. Set to -1 to disable.
	OnDownDelayActionsBy time.Duration

	// Trace contains callbacks that a Cluster can use to trace itself.
	//
	// All callbacks are blocking.
	Trace trace.ClusterTrace
}
ClusterConfig is used to create Cluster instances with particular settings. All fields are optional, all methods are thread-safe.
func (cfg ClusterConfig) New(ctx context.Context, clusterAddrs []string) (*Cluster, error)
New initializes and returns a Cluster instance using the ClusterConfig. It will try every address given until it finds a usable one. From there it uses CLUSTER SLOTS to discover the cluster topology and make all the necessary connections.
type ClusterNode struct {
	// older versions of redis might not actually send back the id, so it may be
	// blank
	Addr, ID string

	// start is inclusive, end is exclusive
	Slots [][2]uint16

	// address and id this node is the secondary of, if it's a secondary
	SecondaryOfAddr, SecondaryOfID string
}
ClusterNode describes a single node in a redis cluster at a moment in time.
type ClusterTopo []ClusterNode
ClusterTopo describes the topology of a redis cluster at a given moment. It will be sorted first by slot number of each node and then by secondary status, so primaries will come before secondaries.
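The ordering described above can be sketched with the standard library as follows. The node type is a hypothetical, trimmed-down mirror of ClusterNode, and comparing only the start of each node's first slot range is an assumption of this sketch; radix's actual comparison may consider more than that.

```go
package main

import (
	"fmt"
	"sort"
)

// node is a hypothetical, trimmed-down mirror of ClusterNode.
type node struct {
	Addr            string
	Slots           [][2]uint16 // start is inclusive, end is exclusive
	SecondaryOfAddr string      // empty for primaries
}

// sortTopo orders nodes by the start of their first slot range, placing a
// primary before the secondaries which cover the same slots. It assumes that
// every node has at least one slot range.
func sortTopo(nodes []node) {
	sort.SliceStable(nodes, func(i, j int) bool {
		a, b := nodes[i], nodes[j]
		if a.Slots[0][0] != b.Slots[0][0] {
			return a.Slots[0][0] < b.Slots[0][0]
		}
		// Same slots: the primary sorts first.
		return a.SecondaryOfAddr == "" && b.SecondaryOfAddr != ""
	})
}

func main() {
	nodes := []node{
		{Addr: "10.0.0.3:6379", Slots: [][2]uint16{{8192, 16384}}},
		{Addr: "10.0.0.2:6379", Slots: [][2]uint16{{0, 8192}}, SecondaryOfAddr: "10.0.0.1:6379"},
		{Addr: "10.0.0.1:6379", Slots: [][2]uint16{{0, 8192}}},
	}
	sortTopo(nodes)
	for _, n := range nodes {
		fmt.Println(n.Addr, n.Slots, "secondary:", n.SecondaryOfAddr != "")
	}
}
```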
func (tt ClusterTopo) Map() map[string]ClusterNode
Map returns the topology as a mapping of node address to its ClusterNode.
func (tt ClusterTopo) MarshalRESP(w io.Writer, o *resp.Opts) error
MarshalRESP implements the resp.Marshaler interface, and will marshal the ClusterTopo in the same format as the return from CLUSTER SLOTS.
func (tt ClusterTopo) Primaries() ClusterTopo
Primaries returns a ClusterTopo instance containing only the primary nodes from the ClusterTopo being called on.
func (tt *ClusterTopo) UnmarshalRESP(br resp.BufferedReader, o *resp.Opts) error
UnmarshalRESP implements the resp.Unmarshaler interface, but only supports unmarshaling the return from CLUSTER SLOTS. The unmarshaled nodes will be sorted before they are returned.
type CmdConfig struct {
	// ActionProperties is an optional callback that will be called when
	// creating a new Action using CmdConfig.Cmd or CmdConfig.FlatCmd and is
	// used to set the ActionProperties for the new Action.
	//
	// If ActionProperties is nil, DefaultActionProperties will be used.
	ActionProperties func(cmd string, args ...string) ActionProperties
}
CmdConfig is used to create redis command Actions with particular settings. All fields are optional, all methods are thread-safe.
The global Cmd and FlatCmd functions are shortcuts for using an empty CmdConfig{}.
This can be useful for working with custom commands provided by Redis modules for which the built in logic may return sub-optimal properties or in case some commands are known to be slow in some cases and therefore should not use connection sharing.
All methods on CmdConfig are safe for concurrent use from different goroutines.
ExampleCmdConfig_Cmd shows how to use the CmdConfig type to handle an imaginary CUSTOM.MZADD command which allows adding members to multiple sorted sets at once.
The assumed syntax for the command is:
CUSTOM.MZADD key score member [key score member ...]
Code:
	{
		cfg := CmdConfig{
			// Specify a custom ActionProperties resolver which will be called for
			// actions created using cfg.
			ActionProperties: func(cmd string, args ...string) ActionProperties {
				keys := make([]string, 0, len(args)/3)

				// The first value of each group of 3 arguments is the key.
				for i := 0; i < len(args); i += 3 {
					keys = append(keys, args[i])
				}

				return ActionProperties{
					Keys:         keys,
					CanRetry:     true,
					CanPipeline:  true,
					CanShareConn: true,
				}
			},
		}

		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		client, err := (PoolConfig{}).New(ctx, "tcp", "127.0.0.1:6379") // or any other client
		if err != nil {
			// handle error
		}

		// Use cfg to create the Action instead of the global Cmd function.
		if err := client.Do(ctx, cfg.Cmd(nil, "CUSTOM.MZADD", "key1", "100", "member1", "key2", "200", "member2", "key3", "300", "member3")); err != nil {
			// handle error
		}
	}
func (cfg CmdConfig) Cmd(rcv interface{}, cmd string, args ...string) Action
Cmd works like the global Cmd function but can be additionally configured using fields on CmdConfig. See the global Cmd's documentation for further details.
func (cfg CmdConfig) FlatCmd(rcv interface{}, cmd string, args ...interface{}) Action
FlatCmd works like the global FlatCmd function but can be additionally configured using fields on CmdConfig. See the global FlatCmd's documentation for further details.
type Conn interface {
	// The Do method merely calls the Action's Perform method with the Conn as
	// the argument.
	Client

	// EncodeDecode will encode marshal onto the connection, then decode a
	// response into unmarshalInto (see resp3.Marshal and resp3.Unmarshal,
	// respectively). If either parameter is nil then that step is skipped.
	//
	// If EncodeDecode is called concurrently on the same Conn then the order of
	// decode steps will match the order of encode steps.
	//
	// NOTE If ctx is canceled then marshaling, and possibly unmarshaling, might
	// still occur in the background even though EncodeDecode has returned.
	EncodeDecode(ctx context.Context, marshal, unmarshalInto interface{}) error
}
Conn is a Client wrapping a single network connection which synchronously reads/writes data using redis's RESP protocol.
A Conn can be used directly as a Client, but in general you probably want to use a Pool instead.
func Dial(ctx context.Context, network, addr string) (Conn, error)
Dial is a shortcut for calling Dial on a zero-value Dialer.
func NewPubSubConnStub(remoteNetwork, remoteAddr string, fn func(context.Context, []string) interface{}) (Conn, chan<- PubSubMessage)
NewPubSubConnStub returns a stubbed Conn, much like NewStubConn does, which pretends it is a Conn to a real redis instance, but is instead using the given callback to service requests. It is primarily useful for writing tests.
NewPubSubConnStub differs from NewStubConn in that EncodeDecode calls for the (P)SUBSCRIBE, (P)UNSUBSCRIBE, and PING commands will be intercepted and handled as per redis' expected pubsub functionality. A PubSubMessage may be written to the returned channel at any time, and if the returned Conn has had (P)SUBSCRIBE called matching that PubSubMessage then the PubSubMessage will be written to the Conn's internal buffer.
This is intended to be used for mocking services which can perform both normal redis commands and pubsub (e.g. a real redis instance, redis sentinel). The returned Conn can be passed into NewPubSubConn.
remoteNetwork and remoteAddr can be empty, but if given will be used as the return from the Addr method.
Code:
	{
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		// Make a pubsub stub conn which will return nil for everything except
		// pubsub commands (which will be handled automatically)
		stub, stubCh := NewPubSubConnStub("tcp", "127.0.0.1:6379", func(context.Context, []string) interface{} {
			return nil
		})

		// These writes shouldn't do anything, initially, since we haven't
		// subscribed to anything
		go func() {
			for {
				stubCh <- PubSubMessage{
					Channel: "foo",
					Message: []byte("bar"),
				}
				time.Sleep(1 * time.Second)
			}
		}()

		// Use PubSubConfig to wrap the stub like we would for a normal redis
		// connection
		pstub := (PubSubConfig{}).New(stub)

		// Subscribe to "foo"
		if err := pstub.Subscribe(ctx, "foo"); err != nil {
			panic(err)
		}

		// Now that pstub is subscribed, the messages being published by the
		// goroutine above will start being returned from Next.
		for {
			msg, err := pstub.Next(ctx)
			if errors.Is(err, context.DeadlineExceeded) {
				break
			} else if err != nil {
				panic(err)
			}
			log.Printf("read msg: %#v", msg)
		}
	}
func NewStubConn(remoteNetwork, remoteAddr string, fn func(context.Context, []string) interface{}) Conn
NewStubConn returns a (fake) Conn which pretends it is a Conn to a real redis instance, but is instead using the given callback to service requests. It is primarily useful for writing tests.
When EncodeDecode is called the value to be marshaled is converted into a []string and passed to the callback. The return from the callback is then marshaled into an internal buffer. The value to be decoded is unmarshaled into using the internal buffer. If the internal buffer is empty at this step then the call will block.
remoteNetwork and remoteAddr can be empty, but if given will be used as the return from the Addr method.
Code:
	{
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		m := map[string]string{}
		stub := NewStubConn("tcp", "127.0.0.1:6379", func(_ context.Context, args []string) interface{} {
			switch args[0] {
			case "GET":
				return m[args[1]]
			case "SET":
				m[args[1]] = args[2]
				return nil
			default:
				return fmt.Errorf("this stub doesn't support command %q", args[0])
			}
		})

		if err := stub.Do(ctx, Cmd(nil, "SET", "foo", "1")); err != nil {
			panic(err)
		}

		var foo int
		if err := stub.Do(ctx, Cmd(&foo, "GET", "foo")); err != nil {
			panic(err)
		}

		fmt.Printf("foo: %d\n", foo)
	}
type Dialer struct {
	// CustomConn indicates that this callback should be used in place of Dial
	// when Dial is called. All behavior of Dialer/Dial is superseded when this
	// is set.
	CustomConn func(ctx context.Context, network, addr string) (Conn, error)

	// AuthPass will cause Dial to perform an AUTH command once the connection
	// is created, using AuthUser (if given) and AuthPass.
	//
	// If this is set and a redis URI is passed to Dial which also has a password
	// set, this takes precedence.
	AuthUser, AuthPass string

	// SelectDB will cause Dial to perform a SELECT command once the connection
	// is created, using the given database index.
	//
	// If this is set and a redis URI is passed to Dial which also has a
	// database index set, this takes precedence.
	SelectDB string

	// Protocol can be used to automatically set the RESP protocol version.
	//
	// If Protocol is not empty the Dialer will send a HELLO command with the
	// value of Protocol as version, otherwise no HELLO command will be sent.
	Protocol string

	// NetDialer is used to create the underlying network connection.
	//
	// Defaults to net.Dialer.
	NetDialer interface {
		DialContext(context.Context, string, string) (net.Conn, error)
	}

	// WriteFlushInterval indicates how often the Conn should flush writes
	// to the underlying net.Conn.
	//
	// Conn uses a bufio.Writer to write data to the underlying net.Conn, and so
	// requires Flush to be called on that bufio.Writer in order for the data to
	// be fully written. By delaying Flush calls until multiple concurrent
	// EncodeDecode calls have been made Conn can reduce system calls and
	// significantly improve performance in that case.
	//
	// All EncodeDecode calls will be delayed up to WriteFlushInterval, with one
	// exception: if more than WriteFlushInterval has elapsed since the last
	// EncodeDecode call then the next EncodeDecode will Flush immediately. This
	// allows Conns to behave well during both low and high activity periods.
	//
	// Defaults to 0, indicating Flush will be called upon each EncodeDecode
	// call without delay.
	WriteFlushInterval time.Duration

	// NewRespOpts returns a fresh instance of a *resp.Opts to be used by the
	// underlying connection. This may be called more than once.
	//
	// Defaults to resp.NewOpts.
	NewRespOpts func() *resp.Opts
}
Dialer is used to create Conns with particular settings. All fields are optional, all methods are thread-safe.
func (d Dialer) Dial(ctx context.Context, network, addr string) (Conn, error)
Dial creates a Conn using the Dialer configuration.
In place of a host:port address, Dial also accepts a URI, as per:
https://www.iana.org/assignments/uri-schemes/prov/redis
If the URI has an AUTH password or db specified Dial will attempt to perform the AUTH and/or SELECT as well.
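To make the URI form concrete, here is a small sketch which pulls the address, credentials, and database index out of a redis URI using net/url. The dialTarget type and parseRedisURI function are hypothetical, and the sketch ignores any query parameters; how Dial itself parses URIs is not shown in this documentation, so treat this only as an illustration of the URI's shape.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// dialTarget is a hypothetical container for the pieces of a redis URI.
type dialTarget struct {
	Addr     string // host:port to connect to
	User     string // username for AUTH, if any
	Password string // password for AUTH, if any
	DB       string // database index for SELECT, if any
}

// parseRedisURI splits a URI of the form redis://user:password@host:port/db
// into its parts. Query parameters are ignored by this sketch.
func parseRedisURI(raw string) (dialTarget, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return dialTarget{}, err
	}
	if u.Scheme != "redis" {
		return dialTarget{}, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}

	target := dialTarget{
		Addr: u.Host,
		DB:   strings.TrimPrefix(u.Path, "/"),
	}
	if u.User != nil {
		target.User = u.User.Username()
		target.Password, _ = u.User.Password()
	}
	return target, nil
}

func main() {
	target, err := parseRedisURI("redis://user:secret@127.0.0.1:6379/3")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", target)
}
```

Keep in mind that, per the Dialer field documentation, AuthPass and SelectDB take precedence over values given in the URI.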
type EvalScript struct {
	// contains filtered or unexported fields
}
EvalScript contains the body of a script to be used with redis' EVAL functionality. Call Cmd on an EvalScript to actually create an Action which can be run.
Code:
	{
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		// set as a global variable, this script is equivalent to the builtin GETSET
		// redis command
		var getSet = NewEvalScript(`
			local prev = redis.call("GET", KEYS[1])
			redis.call("SET", KEYS[1], ARGV[1])
			return prev
		`)

		client, err := (PoolConfig{}).New(ctx, "tcp", "127.0.0.1:6379") // or any other client
		if err != nil {
			// handle error
		}

		key := "someKey"
		var prevVal string
		if err := client.Do(ctx, getSet.Cmd(&prevVal, []string{key}, "myVal")); err != nil {
			// handle error
		}

		fmt.Printf("value of key %q used to be %q\n", key, prevVal)
	}
func NewEvalScript(script string) EvalScript
NewEvalScript initializes an EvalScript instance with the given script.
func (es EvalScript) Cmd(rcv interface{}, keys []string, args ...string) Action
Cmd is like the top-level Cmd but it uses the EvalScript to perform an EVALSHA command (and will automatically fallback to EVAL as necessary).
func (es EvalScript) FlatCmd(rcv interface{}, keys []string, args ...interface{}) Action
FlatCmd is like the top level FlatCmd except it uses the EvalScript to perform an EVALSHA command (and will automatically fallback to EVAL as necessary).
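The EVALSHA-then-EVAL fallback can be pictured with the following standard-library-only simulation. The fakeServer type, errNoScript value, and run function are all hypothetical stand-ins: a real redis instance identifies cached scripts by the SHA1 hex digest of their body and replies with a NOSCRIPT error when the digest is unknown, and this sketch only mimics that behavior in memory. It is not radix's implementation.

```go
package main

import (
	"crypto/sha1"
	"encoding/hex"
	"errors"
	"fmt"
)

// errNoScript is a hypothetical stand-in for the NOSCRIPT error which redis
// returns when EVALSHA names a script it has not cached.
var errNoScript = errors.New("NOSCRIPT No matching script")

// fakeServer is a hypothetical in-memory stand-in for a redis script cache.
type fakeServer struct {
	cache map[string]string // SHA1 hex digest -> script body
	calls []string          // commands received, in order
}

// scriptSHA returns the SHA1 hex digest used to identify a script.
func scriptSHA(script string) string {
	sum := sha1.Sum([]byte(script))
	return hex.EncodeToString(sum[:])
}

// evalSHA succeeds only if the script has been cached already.
func (s *fakeServer) evalSHA(sum string) error {
	s.calls = append(s.calls, "EVALSHA")
	if _, ok := s.cache[sum]; !ok {
		return errNoScript
	}
	return nil
}

// eval runs the full script body and caches it for later EVALSHA calls.
func (s *fakeServer) eval(script string) error {
	s.calls = append(s.calls, "EVAL")
	s.cache[scriptSHA(script)] = script
	return nil
}

// run tries the cheap EVALSHA first and falls back to EVAL on NOSCRIPT.
func run(s *fakeServer, script string) error {
	err := s.evalSHA(scriptSHA(script))
	if errors.Is(err, errNoScript) {
		return s.eval(script)
	}
	return err
}

func main() {
	srv := &fakeServer{cache: map[string]string{}}
	script := `return redis.call("GET", KEYS[1])`

	for i := 0; i < 2; i++ {
		if err := run(srv, script); err != nil {
			panic(err)
		}
	}
	// The first run needs the fallback, the second run does not.
	fmt.Println(srv.calls)
}
```

The benefit of this approach is that the full script body only needs to be sent over the network when the server has not seen it before.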
type Maybe struct {
	// Rcv is the receiver which will be unmarshaled into.
	Rcv interface{}

	// Null will be true if a null RESP value is unmarshaled.
	Null bool

	// Empty will be true if an empty aggregated RESP type (array, set, map,
	// push, or attribute) is unmarshaled.
	Empty bool
}
Maybe is a type which wraps a receiver being unmarshaled into. When unmarshaling takes place Maybe will also populate its other fields accordingly.
Code:
	{
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		client, err := (PoolConfig{}).New(ctx, "tcp", "127.0.0.1:6379") // or any other client
		if err != nil {
			// handle error
		}

		var rcv int64
		mb := Maybe{Rcv: &rcv}
		if err := client.Do(ctx, Cmd(&mb, "GET", "foo")); err != nil {
			// handle error
		} else if mb.Null {
			fmt.Println("rcv is null")
		} else {
			fmt.Printf("rcv is %d\n", rcv)
		}
	}
func (mb *Maybe) UnmarshalRESP(br resp.BufferedReader, o *resp.Opts) error
UnmarshalRESP implements the method for the resp.Unmarshaler interface.
type MultiClient interface {
	// Do performs an Action on a Conn from a primary instance.
	Do(context.Context, Action) error

	// DoSecondary performs the Action on a Conn from a secondary instance. If
	// no secondary instance is available then this is equivalent to Do.
	DoSecondary(context.Context, Action) error

	// Clients returns all Clients held by MultiClient, formatted as a mapping
	// of primary redis instance address to a ReplicaSet instance for that
	// primary.
	Clients() (map[string]ReplicaSet, error)

	// Once Close() is called all future method calls on the Client will return
	// an error
	Close() error
}
MultiClient wraps one or more underlying Clients for different redis instances. MultiClient methods are thread-safe and may return the same Client instance to different callers at the same time. All returned Clients should _not_ have Close called on them.
If the topology backing a MultiClient changes (e.g. a failover occurs) while the Clients it returned are still being used then those Clients may return errors related to that change.
Sentinel and Cluster are both MultiClients.
func NewMultiClient(rs ReplicaSet) MultiClient
NewMultiClient wraps a ReplicaSet such that it implements MultiClient.
type PersistentPubSubConnConfig struct {
	// Dialer is used to create new Conns.
	Dialer Dialer

	// PubSubConfig is used to create PubSubConns from the Conns created by
	// Dialer.
	PubSubConfig PubSubConfig

	// Trace contains callbacks that a persistent PubSubConn can use to trace
	// itself.
	//
	// All callbacks are blocking.
	Trace trace.PersistentPubSubTrace
}
PersistentPubSubConnConfig is used to create a persistent PubSubConn with particular settings. All fields are optional, all methods are thread-safe.
Code:
{
    // Example of how to use a persistent PubSubConn with a Cluster.
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // Initialize the cluster in any way you see fit
    cluster, err := (ClusterConfig{}).New(ctx, []string{"127.0.0.1:6379"})
    if err != nil {
        panic(err)
    }

    // Have PersistentPubSubConnConfig pick a random cluster node every time it
    // wants to make a new connection. If the node fails, the persistent
    // PubSubConn will automatically pick a new node to connect to.
    pconn, err := (PersistentPubSubConnConfig{}).New(ctx, func() (string, string, error) {
        clients, err := cluster.Clients()
        if err != nil {
            return "", "", err
        }
        for addr := range clients {
            return "tcp", addr, nil
        }
        return "", "", errors.New("no clients in the cluster")
    })
    if err != nil {
        panic(err)
    }

    // Use the PubSubConn as normal.
    if err := pconn.Subscribe(ctx, "myChannel"); err != nil {
        panic(err)
    }

    for {
        msg, err := pconn.Next(ctx)
        if errors.Is(err, context.DeadlineExceeded) {
            break
        } else if err != nil {
            panic(err)
        }
        log.Printf("publish to channel %q received: %q", msg.Channel, msg.Message)
    }
}
func (cfg PersistentPubSubConnConfig) New(ctx context.Context, cb func() (network, addr string, err error)) (PubSubConn, error)
New is like PubSubConfig.New, but instead of taking in an existing Conn to wrap it will create its own using the network/address returned from the given callback.
If the Conn is ever severed then the callback will be re-called, a new Conn will be created, and that Conn will be reset to the previous Conn's state.
This is effectively a way to have a permanent PubSubConn established which supports subscribing/unsubscribing but without the hassle of implementing reconnect/re-subscribe logic.
type Pipeline struct {
    // contains filtered or unexported fields
}
Pipeline is an Action which combines multiple commands into a single network round-trip. Pipeline accumulates commands via its Append method. When Pipeline is performed (i.e. passed into a Client's Do method) it will first write all commands as a single write operation and then read all command responses with a single read operation.
Pipeline may be Reset in order to re-use an instance for multiple sets of commands. A Pipeline may _not_ be performed multiple times without being Reset in between.
NOTE that, while a Pipeline performs all commands on a single Conn, it shouldn't be used by itself for MULTI/EXEC transactions, because if there's an error it won't discard the incomplete transaction. Use WithConn or EvalScript for transactional functionality instead.
Code:
{
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    client, err := (PoolConfig{}).New(ctx, "tcp", "127.0.0.1:6379") // or any other client
    if err != nil {
        // handle error
    }

    var fooVal string
    p := NewPipeline()
    p.Append(FlatCmd(nil, "SET", "foo", 1))
    p.Append(Cmd(&fooVal, "GET", "foo"))
    if err := client.Do(ctx, p); err != nil {
        // handle error
    }
    fmt.Printf("fooVal: %q\n", fooVal)

    // At this point the Pipeline cannot be used again, unless Reset is called.
    var barVal string
    p.Reset()
    p.Append(FlatCmd(nil, "SET", "bar", 2))
    p.Append(Cmd(&barVal, "GET", "bar"))
    if err := client.Do(ctx, p); err != nil {
        // handle error
    }
    fmt.Printf("barVal: %q\n", barVal)

    // Output: fooVal: "1"
    // barVal: "2"
}
Output:
fooVal: "1"
barVal: "2"
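To make the "single write operation" described for Pipeline concrete, the following is a minimal standalone sketch of what two pipelined commands look like when encoded back to back in the RESP2 wire format. The helpers `appendCmd` and `encodePipeline` are hypothetical names for this illustration; this is not radix's encoder, only a picture of the bytes a pipeline would plausibly send in one go.

```go
package main

import (
	"bytes"
	"fmt"
	"strconv"
)

// appendCmd encodes one command as a RESP array of bulk strings and appends
// it to buf. Hypothetical helper, not part of radix.
func appendCmd(buf *bytes.Buffer, args ...string) {
	buf.WriteString("*" + strconv.Itoa(len(args)) + "\r\n")
	for _, a := range args {
		buf.WriteString("$" + strconv.Itoa(len(a)) + "\r\n" + a + "\r\n")
	}
}

// encodePipeline concatenates the encodings of all commands into a single
// payload, which could then be handed to the connection in one write.
func encodePipeline(cmds ...[]string) []byte {
	var buf bytes.Buffer
	for _, c := range cmds {
		appendCmd(&buf, c...)
	}
	return buf.Bytes()
}

func main() {
	payload := encodePipeline(
		[]string{"SET", "foo", "1"},
		[]string{"GET", "foo"},
	)
	// %q makes the \r\n separators visible.
	fmt.Printf("%q\n", payload)
}
```

The server answers each command in order, which is why the replies can be read back in a single pass after the write.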
func NewPipeline() *Pipeline
NewPipeline returns a Pipeline instance to which Actions can be Appended.
func (p *Pipeline) Append(a Action)
Append adds the Action to the end of the list of Actions to pipeline together. This will panic if given an Action without the CanPipeline property set to true.
func (p *Pipeline) Perform(ctx context.Context, c Conn) error
Perform implements the method for the Action interface.
func (p *Pipeline) Properties() ActionProperties
Properties implements the method for the Action interface.
func (p *Pipeline) Reset()
Reset discards all Actions and resets all internal state. A Pipeline with Reset called on it is equivalent to one returned by NewPipeline.
type PoolConfig struct {
    // CustomPool indicates that this callback should be used in place of New
    // when New is called. All behavior of New is superseded when this is set.
    CustomPool func(ctx context.Context, network, addr string) (Client, error)

    // Dialer is used by Pool to create new Conns to the Pool's redis instance.
    Dialer Dialer

    // Size indicates the number of Conns the Pool will attempt to maintain.
    //
    // Defaults to 4.
    Size int

    // PingInterval specifies the interval at which Pool will pick a random Conn
    // and call PING on it.
    //
    // If not given then the default value is calculated to be:
    //  5*seconds / Size.
    //
    // Can be set to -1 to disable periodic pings.
    PingInterval time.Duration

    // MinReconnectInterval describes the minimum amount of time the Pool will
    // wait between creating new Conns when previous Conns in the Pool have been
    // closed due to errors.
    //
    // Failure to create new Conns will result in the time between creation
    // attempts increasing exponentially, up to MaxReconnectInterval.
    // MinReconnectInterval and MaxReconnectInterval can be set to equal values
    // to disable exponential backoff.
    //
    // MinReconnectInterval defaults to 125 * time.Millisecond.
    // MaxReconnectInterval defaults to 4 * time.Second.
    MinReconnectInterval, MaxReconnectInterval time.Duration

    // Trace contains callbacks that a Pool can use to trace itself.
    //
    // All callbacks are blocking.
    Trace trace.PoolTrace
}
PoolConfig is used to create Pool instances with particular settings. All fields are optional, all methods are thread-safe.
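The documented defaults can be worked through with a little arithmetic. The sketch below is standalone and does not call radix; `defaultPingInterval` and `backoffSchedule` are hypothetical helpers. The doubling factor is an assumption, since the field comments only say the wait increases "exponentially" between MinReconnectInterval and MaxReconnectInterval.

```go
package main

import (
	"fmt"
	"time"
)

// defaultPingInterval mirrors the documented default of 5 seconds / Size,
// falling back to the documented default Size of 4.
func defaultPingInterval(size int) time.Duration {
	if size <= 0 {
		size = 4
	}
	return 5 * time.Second / time.Duration(size)
}

// backoffSchedule lists the first n waits between reconnect attempts,
// starting at lo and capped at hi. Doubling is an assumption of this sketch.
func backoffSchedule(lo, hi time.Duration, n int) []time.Duration {
	out := make([]time.Duration, 0, n)
	wait := lo
	for i := 0; i < n; i++ {
		out = append(out, wait)
		wait *= 2
		if wait > hi {
			wait = hi
		}
	}
	return out
}

func main() {
	fmt.Println(defaultPingInterval(0))
	fmt.Println(backoffSchedule(125*time.Millisecond, 4*time.Second, 8))
}
```

Passing equal values for `lo` and `hi` yields a constant schedule, which matches the note that equal intervals disable exponential backoff.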
func (cfg PoolConfig) New(ctx context.Context, network, addr string) (Client, error)
New creates and returns a pool instance using the PoolConfig.
type PubSubConfig struct {
    // PingInterval is the interval at which PING will be called on the
    // PubSubConn in the background.
    //
    // Defaults to 5 * time.Second. Can be set to -1 to disable periodic pings.
    PingInterval time.Duration
}
PubSubConfig is used to create a PubSubConn with particular settings. All fields are optional, all methods are thread-safe.
func (cfg PubSubConfig) New(conn Conn) PubSubConn
New returns a PubSubConn instance using the given PubSubConfig.
type PubSubConn interface {
    // Subscribe subscribes the PubSubConn to the given set of channels.
    Subscribe(ctx context.Context, channels ...string) error

    // Unsubscribe unsubscribes the PubSubConn from the given set of channels.
    Unsubscribe(ctx context.Context, channels ...string) error

    // PSubscribe is like Subscribe, but it subscribes to a set of patterns and
    // not individual channels.
    PSubscribe(ctx context.Context, patterns ...string) error

    // PUnsubscribe is like Unsubscribe, but it unsubscribes from a set of
    // patterns and not individual channels.
    PUnsubscribe(ctx context.Context, patterns ...string) error

    // Ping performs a simple Ping command on the PubSubConn, returning an error
    // if it failed for some reason.
    //
    // Ping will be periodically called by Next in the default PubSubConn
    // implementations.
    Ping(ctx context.Context) error

    // Next blocks until a message is published to the PubSubConn or an error is
    // encountered. If the context is canceled then the resulting error is
    // returned immediately.
    Next(ctx context.Context) (PubSubMessage, error)

    // Close closes the PubSubConn and cleans up all resources it holds.
    Close() error
}
PubSubConn wraps an existing Conn to support redis' pubsub system. Unlike Conn, a PubSubConn's methods are _not_ thread-safe.
Code:
{
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // Create a normal redis connection
    conn, err := Dial(ctx, "tcp", "127.0.0.1:6379")
    if err != nil {
        panic(err)
    }

    // Pass that connection into PubSub, conn should never get used after this
    pconn := (PubSubConfig{}).New(conn)
    defer pconn.Close() // this will close Conn as well

    // Subscribe to a channel called "myChannel".
    if err := pconn.Subscribe(ctx, "myChannel"); err != nil {
        panic(err)
    }

    for {
        msg, err := pconn.Next(ctx)
        if errors.Is(err, context.DeadlineExceeded) {
            break
        } else if err != nil {
            panic(err)
        }
        log.Printf("publish to channel %q received: %q", msg.Channel, msg.Message)
    }
}
type PubSubMessage struct {
    Type    string // "message" or "pmessage"
    Pattern string // will be set if Type is "pmessage"
    Channel string
    Message []byte
}
PubSubMessage describes a message being published to a redis pubsub channel.
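The relationship between Type and Pattern follows from how redis delivers published messages: a "message" push carries a channel and a payload, while a "pmessage" push additionally carries the pattern that matched. The standalone sketch below maps those two shapes onto a local `message` struct that mirrors the fields above. `message` and `parseMessage` are hypothetical and are not how radix decodes messages internally.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a local stand-in with the same fields as PubSubMessage.
type message struct {
	Type    string
	Pattern string
	Channel string
	Message []byte
}

// parseMessage maps the elements of a pubsub push onto message.
//   ["message", channel, payload]
//   ["pmessage", pattern, channel, payload]
func parseMessage(elems []string) (message, error) {
	switch {
	case len(elems) == 3 && elems[0] == "message":
		return message{
			Type:    "message",
			Channel: elems[1],
			Message: []byte(elems[2]),
		}, nil
	case len(elems) == 4 && elems[0] == "pmessage":
		return message{
			Type:    "pmessage",
			Pattern: elems[1],
			Channel: elems[2],
			Message: []byte(elems[3]),
		}, nil
	}
	return message{}, errors.New("not a published message")
}

func main() {
	m, err := parseMessage([]string{"pmessage", "news.*", "news.tech", "hello"})
	fmt.Printf("%+v %v\n", m, err)
}
```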
func (m PubSubMessage) MarshalRESP(w io.Writer, o *resp.Opts) error
MarshalRESP implements the Marshaler interface.
func (m *PubSubMessage) UnmarshalRESP(br resp.BufferedReader, o *resp.Opts) error
UnmarshalRESP implements the Unmarshaler interface.
type ReplicaSet struct {
    Primary     Client
    Secondaries []Client
}
ReplicaSet holds the Clients of a redis replica set, consisting of a single primary (read+write) instance and zero or more secondary (read-only) instances.
type Scanner interface {
    Next(context.Context, *string) bool
    Close() error
}
Scanner is used to iterate through the results of a SCAN call (or HSCAN, SSCAN, etc.).
Once created, repeatedly call Next on it to fill the passed-in string pointer with the next result. Next returns false when there are no more results to retrieve or when an error occurred, at which point Close should be called to retrieve any error.
Code:
{
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // Initialize the cluster in any way you see fit
    cluster, err := (ClusterConfig{}).New(ctx, []string{"127.0.0.1:6379"})
    if err != nil {
        panic(err)
    }

    s := (ScannerConfig{Command: "SCAN"}).NewMulti(cluster)
    var key string
    for s.Next(ctx, &key) {
        log.Printf("key: %q", key)
    }
    if err := s.Close(); err != nil {
        log.Fatal(err)
    }
}
Code:
{
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    client, err := (PoolConfig{}).New(ctx, "tcp", "127.0.0.1:6379")
    if err != nil {
        log.Fatal(err)
    }

    s := (ScannerConfig{Command: "HSCAN", Key: "somekey"}).New(client)
    var key string
    for s.Next(ctx, &key) {
        log.Printf("key: %q", key)
    }
    if err := s.Close(); err != nil {
        log.Fatal(err)
    }
}
Code:
{
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    client, err := (PoolConfig{}).New(ctx, "tcp", "127.0.0.1:6379")
    if err != nil {
        log.Fatal(err)
    }

    s := (ScannerConfig{}).New(client)
    var key string
    for s.Next(ctx, &key) {
        log.Printf("key: %q", key)
    }
    if err := s.Close(); err != nil {
        log.Fatal(err)
    }
}
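The Next/Close contract described for Scanner can be tried without a redis server. The sketch below is a standalone illustration: `scanner` is a local copy of the interface shape, and `sliceScanner`, `collect`, and `demo` are hypothetical names backed by an in-memory slice. It shows why Close must still be checked after the loop ends, since Next returning false does not distinguish "finished" from "failed".

```go
package main

import (
	"context"
	"fmt"
)

// scanner has the same method set as the Scanner interface above.
type scanner interface {
	Next(context.Context, *string) bool
	Close() error
}

// sliceScanner is a hypothetical in-memory scanner used for illustration.
type sliceScanner struct {
	items []string
	err   error
}

// Next fills out with the next item. It returns false when the items are
// exhausted or when the context has ended, recording the error for Close.
func (s *sliceScanner) Next(ctx context.Context, out *string) bool {
	if s.err != nil {
		return false
	}
	if err := ctx.Err(); err != nil {
		s.err = err
		return false
	}
	if len(s.items) == 0 {
		return false
	}
	*out, s.items = s.items[0], s.items[1:]
	return true
}

// Close reports any error encountered while iterating.
func (s *sliceScanner) Close() error { return s.err }

// collect drains a scanner using the loop-then-Close idiom.
func collect(ctx context.Context, s scanner) ([]string, error) {
	var keys []string
	var key string
	for s.Next(ctx, &key) {
		keys = append(keys, key)
	}
	return keys, s.Close()
}

// demo runs collect over the given items with a background context.
func demo(items ...string) ([]string, error) {
	return collect(context.Background(), &sliceScanner{items: items})
}

func main() {
	keys, err := demo("foo", "bar", "baz")
	fmt.Println(keys, err)
}
```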
type ScannerConfig struct {
    // The scan command to do, e.g. "SCAN", "HSCAN", etc...
    //
    // Defaults to "SCAN".
    Command string

    // The key to perform the scan on. Only necessary when Command isn't "SCAN"
    Key string

    // An optional pattern to filter returned keys by
    Pattern string

    // An optional count hint to send to redis to indicate number of keys to
    // return per call. This does not affect the actual results of the scan
    // command, but it may be useful for optimizing certain datasets
    Count int

    // An optional type name to filter for values of the given type.
    // The type names are the same as returned by the "TYPE" command.
    // This is only available in Redis 6 or newer and only works with "SCAN".
    // If used with an older version of Redis or with a Command other than
    // "SCAN", scanning will fail.
    Type string
}
ScannerConfig is used to create Scanner instances with particular settings. All fields are optional, all methods are thread-safe.
func (cfg ScannerConfig) New(c Client) Scanner
New creates a new Scanner instance which will iterate over the redis instance's Client using the ScannerConfig.
func (cfg ScannerConfig) NewMulti(mc MultiClient) Scanner
NewMulti returns a Scanner which will scan over every primary instance in the MultiClient. This will panic if the ScannerConfig's Command isn't "SCAN".
NOTE this is primarily useful for scanning over all keys in a Cluster. It is not necessary to use this otherwise, unless you have implemented your own MultiClient which holds multiple primary Clients.
type Sentinel struct {
    // contains filtered or unexported fields
}
Sentinel is a MultiClient which contains all information needed to interact with a redis replica set managed by redis sentinel, including a set of pools to each of its instances. All methods on Sentinel are thread-safe.
func (sc *Sentinel) Clients() (map[string]ReplicaSet, error)
Clients implements the method for the MultiClient interface. The returned map will only ever have one key/value pair.
func (sc *Sentinel) Close() error
Close implements the method for the MultiClient interface.
func (sc *Sentinel) Do(ctx context.Context, a Action) error
Do implements the method for the MultiClient interface. It will perform the given Action on the current primary.
func (sc *Sentinel) DoSecondary(ctx context.Context, a Action) error
DoSecondary implements the method for the MultiClient interface. It will perform the given Action on a random secondary, or the primary if no secondary is available.
For DoSecondary to work, replicas must be configured with replica-read-only enabled, otherwise calls to DoSecondary may be rejected by the replica.
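The selection rule described for DoSecondary (a random secondary, else the primary) is small enough to sketch in isolation. The example below is standalone and uses plain address strings in place of Clients; `replicaSet` and `target` are hypothetical names, and this is only a picture of the documented rule, not Sentinel's actual implementation.

```go
package main

import (
	"fmt"
	"math/rand"
)

// replicaSet is a local stand-in for ReplicaSet that holds addresses
// instead of Clients.
type replicaSet struct {
	Primary     string
	Secondaries []string
}

// target returns the address a secondary-preferring call would use: a
// randomly chosen secondary, or the primary when there are none.
func (rs replicaSet) target() string {
	if len(rs.Secondaries) == 0 {
		return rs.Primary
	}
	return rs.Secondaries[rand.Intn(len(rs.Secondaries))]
}

func main() {
	withReplicas := replicaSet{
		Primary:     "10.0.0.1:6379",
		Secondaries: []string{"10.0.0.2:6379", "10.0.0.3:6379"},
	}
	alone := replicaSet{Primary: "10.0.0.1:6379"}

	fmt.Println(withReplicas.target()) // one of the two secondaries
	fmt.Println(alone.target())        // falls back to the primary
}
```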
func (sc *Sentinel) SentinelAddrs() ([]string, error)
SentinelAddrs returns the addresses of all known sentinels.
type SentinelConfig struct {
    // PoolConfig is used by Sentinel to create Clients for redis instances in
    // the replica set.
    PoolConfig PoolConfig

    // SentinelDialer is the Dialer instance used to create Conns to sentinels.
    SentinelDialer Dialer

    // Trace contains callbacks that a Sentinel can use to trace itself.
    //
    // All callbacks are blocking.
    Trace trace.SentinelTrace
}
SentinelConfig is used to create Sentinel instances with particular settings. All fields are optional, all methods are thread-safe.
func (cfg SentinelConfig) New(ctx context.Context, primaryName string, sentinelAddrs []string) (*Sentinel, error)
New creates and returns a *Sentinel instance using the SentinelConfig.
type StreamConfig struct {
    // After indicates that only entries newer than the given ID will be
    // returned. If Group is set on the outer StreamReaderConfig then only
    // pending entries newer than the given ID will be returned.
    //
    // The zero StreamEntryID value is a valid value here.
    After StreamEntryID

    // Latest indicates that only entries added after the first call to Next
    // should be returned. If Group is set on the outer StreamReaderConfig then
    // only entries which haven't been delivered to other consumers will be
    // returned.
    Latest bool

    // PendingThenLatest can only be used if Group is set on the outer
    // StreamReaderConfig. The reader will first return entries which are marked
    // as pending for the consumer. Once all pending entries are consumed then
    // the reader will switch to returning entries which haven't been delivered
    // to other consumers.
    PendingThenLatest bool
}
StreamConfig is used to configure the reading behavior of individual streams being read by a StreamReader. Exactly one field should be filled in.
type StreamEntries struct {
    Stream  string
    Entries []StreamEntry
}
StreamEntries is a stream name and set of entries as returned by XREAD and XREADGROUP. The results from a call to XREAD(GROUP) can be unmarshaled into a []StreamEntries.
func (s *StreamEntries) UnmarshalRESP(br resp.BufferedReader, o *resp.Opts) error
UnmarshalRESP implements the resp.Unmarshaler interface.
type StreamEntry struct {
    // ID is the ID of the entry in a stream.
    ID StreamEntryID

    // Fields contains the fields and values for the stream entry.
    Fields [][2]string
}
StreamEntry is an entry in a stream as returned by XRANGE, XREAD and XREADGROUP.
func (s *StreamEntry) UnmarshalRESP(br resp.BufferedReader, o *resp.Opts) error
UnmarshalRESP implements the resp.Unmarshaler interface.
type StreamEntryID struct {
    // Time is the first part of the ID, which is based on the time of the
    // server that Redis runs on.
    Time uint64

    // Seq is the sequence number of the ID for entries with the same Time value.
    Seq uint64
}
StreamEntryID represents an ID used in a Redis stream with the format <time>-<seq>.
func (s StreamEntryID) Before(o StreamEntryID) bool
Before returns true if s comes before o in a stream (is less than o).
func (s *StreamEntryID) MarshalRESP(w io.Writer, o *resp.Opts) error
MarshalRESP implements the resp.Marshaler interface.
func (s StreamEntryID) Next() StreamEntryID
Next returns the next stream entry ID or s if there is no higher id (s is 18446744073709551615-18446744073709551615).
func (s StreamEntryID) Prev() StreamEntryID
Prev returns the previous stream entry ID or s if there is no prior id (s is 0-0).
func (s StreamEntryID) String() string
String returns the ID in the format <time>-<seq> (the same format used by Redis).
String implements the fmt.Stringer interface.
func (s *StreamEntryID) UnmarshalRESP(br resp.BufferedReader, o *resp.Opts) error
UnmarshalRESP implements the resp.Unmarshaler interface.
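The ordering and boundary behavior described for Before, Next, Prev, and String can be sketched on a local type. The code below is standalone; `entryID` is a stand-in with the same two fields as StreamEntryID, and the method bodies are one plausible reading of the descriptions above rather than the library's source.

```go
package main

import (
	"fmt"
	"math"
)

// entryID is a local stand-in for StreamEntryID; it is not radix's type.
type entryID struct {
	Time uint64
	Seq  uint64
}

// String renders the ID as <time>-<seq>.
func (id entryID) String() string {
	return fmt.Sprintf("%d-%d", id.Time, id.Seq)
}

// Before reports whether id sorts strictly before o, comparing Time first
// and Seq second.
func (id entryID) Before(o entryID) bool {
	if id.Time != o.Time {
		return id.Time < o.Time
	}
	return id.Seq < o.Seq
}

// Next returns the smallest ID greater than id, or id itself when id is
// already the maximum possible ID.
func (id entryID) Next() entryID {
	if id.Seq != math.MaxUint64 {
		return entryID{Time: id.Time, Seq: id.Seq + 1}
	}
	if id.Time != math.MaxUint64 {
		return entryID{Time: id.Time + 1, Seq: 0}
	}
	return id
}

// Prev returns the largest ID smaller than id, or id itself when id is 0-0.
func (id entryID) Prev() entryID {
	if id.Seq > 0 {
		return entryID{Time: id.Time, Seq: id.Seq - 1}
	}
	if id.Time > 0 {
		return entryID{Time: id.Time - 1, Seq: math.MaxUint64}
	}
	return id
}

func main() {
	id := entryID{Time: 1526919030474, Seq: 0}
	fmt.Println(id, id.Next(), id.Prev())
	fmt.Println(entryID{}.Prev())
}
```

Note how Prev on an ID whose Seq is 0 rolls over into the previous Time value with the maximum Seq, mirroring how Next rolls over into the next Time value.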
type StreamReader interface {
    // Next returns a new entry for any of the configured streams. If no new
    // entries are available then Next uses the context's deadline to determine
    // how long to block for (via the BLOCK argument to XREAD(GROUP)). If the
    // context has no deadline then Next will block indefinitely.
    //
    // Next returns ErrNoStreamEntries if there were no entries to be returned.
    // In general Next should be called again after receiving this error.
    //
    // The StreamReader should not be used again if an error which is not
    // ErrNoStreamEntries is returned.
    Next(context.Context) (stream string, entry StreamEntry, err error)
}
StreamReader allows reading StreamEntrys sequentially from one or more streams.
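The error contract on Next (retry on ErrNoStreamEntries, stop on anything else) leads to a particular loop shape. The sketch below is standalone and simplified: `errNoEntries` stands in for ErrNoStreamEntries, `fakeReader` is a hypothetical scripted reader, and the context argument is omitted for brevity. With the real interface, the same loop would pass a context and call the reader's Next.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoEntries stands in for ErrNoStreamEntries in this sketch.
var errNoEntries = errors.New("no stream entries")

// fakeReader is a hypothetical reader driven by a script. An empty string
// in the script means "no entries available on this call".
type fakeReader struct {
	script []string
}

// next returns the next scripted entry, errNoEntries for an empty slot, or
// a terminal error once the script runs out.
func (r *fakeReader) next() (string, error) {
	if len(r.script) == 0 {
		return "", errors.New("reader closed")
	}
	v := r.script[0]
	r.script = r.script[1:]
	if v == "" {
		return "", errNoEntries
	}
	return v, nil
}

// readN collects n entries, calling next again whenever it reports that no
// entries were available, and giving up on any other error.
func readN(r *fakeReader, n int) ([]string, error) {
	var out []string
	for len(out) < n {
		entry, err := r.next()
		if errors.Is(err, errNoEntries) {
			continue // nothing yet; the reader is still usable
		}
		if err != nil {
			return out, err // the reader must not be used again
		}
		out = append(out, entry)
	}
	return out, nil
}

func main() {
	r := &fakeReader{script: []string{"a", "", "b"}}
	fmt.Println(readN(r, 2))
}
```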
type StreamReaderConfig struct {
    // Group is an optional consumer group name.
    //
    // If Group is not empty reads will use XREADGROUP with the Group as the
    // group name and Consumer as the consumer name. XREAD will be used
    // otherwise.
    Group string

    // Consumer is an optional consumer name for use with Group.
    Consumer string

    // NoAck enables passing the NOACK flag to XREADGROUP.
    NoAck bool

    // NoBlock disables blocking when no new data is available.
    NoBlock bool

    // Count can be used to limit the number of entries retrieved by each
    // internal redis call to XREAD(GROUP). Can be set to -1 to indicate no
    // limit.
    //
    // Defaults to 20.
    Count int
}
StreamReaderConfig is used to create StreamReader instances with particular settings. All fields are optional, all methods are thread-safe.
func (cfg StreamReaderConfig) New(c Client, streamCfgs map[string]StreamConfig) StreamReader
New returns a new StreamReader for the given Client. The StreamReader will read from the streams given as the keys of the map.
type Tuple []interface{}
Tuple is a helper type which can be used when unmarshaling a RESP array. Each element of Tuple should be a pointer receiver which the corresponding element of the RESP array will be unmarshaled into, or nil to skip that element. The length of Tuple must match the length of the RESP array being unmarshaled.
Tuple is useful when unmarshaling the results from commands like EXEC and EVAL.
func (t Tuple) UnmarshalRESP(br resp.BufferedReader, o *resp.Opts) error
UnmarshalRESP implements the method for the resp.Unmarshaler interface.
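The positional rules for Tuple (one receiver per array element, nil to skip, lengths must match) can be modeled without any RESP decoding. The sketch below is standalone: `fill` is a hypothetical helper that copies already-decoded string elements into pointer receivers, supporting only `*string` and `*int` for brevity. It illustrates the contract only and is not how radix unmarshals.

```go
package main

import (
	"fmt"
	"strconv"
)

// fill copies src into the receivers in dst by position. A nil receiver
// skips its element, and the two lengths must match.
func fill(dst []interface{}, src []string) error {
	if len(dst) != len(src) {
		return fmt.Errorf("expected %d elements, got %d", len(dst), len(src))
	}
	for i, d := range dst {
		switch p := d.(type) {
		case nil:
			// nil receiver: skip this element
		case *string:
			*p = src[i]
		case *int:
			n, err := strconv.Atoi(src[i])
			if err != nil {
				return fmt.Errorf("element %d: %w", i, err)
			}
			*p = n
		default:
			return fmt.Errorf("element %d: unsupported receiver %T", i, d)
		}
	}
	return nil
}

func main() {
	var status string
	var count int
	// Three elements arrive; the middle one is skipped with nil.
	err := fill([]interface{}{&status, nil, &count}, []string{"OK", "ignored", "3"})
	fmt.Println(status, count, err)
}
```

This shape is what makes Tuple convenient for results such as EXEC, where each position in the reply corresponds to a different queued command with its own result type.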
Package resp contains types and utilities useful for interacting with RESP protocols, without actually implementing any RESP protocol.
Package resp3 implements the upgraded redis RESP3 protocol, a plaintext protocol which is also binary safe and backwards compatible with the original RESP2 protocol.
Package trace contains all the types provided for tracing within the radix package.