NOTE: These gozmq bindings are in maintenance mode. Only critical bugs will be fixed. Henceforth I would suggest using @pebbe's actively maintained bindings for zmq2, zmq3 and zmq4.
This package implements Go (golang) bindings for the 0mq C API.
GoZMQ does not support zero-copy.
A full list of examples is included in the zguide.
Note that this is not the same as this implementation or this implementation.
GoZMQ has made some public changes that will break old code. Fortunately, we've also written a tool based on go fix that will upgrade your code for you! Here's how to run it over your source (after making a backup of course):
go get github.com/alecthomas/gozmq/gozmqfix
cd $YOUR_SOURCE_DIR
gozmqfix .
GoZMQ currently supports ZMQ 2.1.x, 2.2.x, 3.x and 4.x. Following are instructions on how to compile against these versions.
For ZeroMQ 2.2.x install with:
go get github.com/alecthomas/gozmq
For 2.1.x install with:
go get -tags zmq_2_1 github.com/alecthomas/gozmq
For 3.x install with:
go get -tags zmq_3_x github.com/alecthomas/gozmq
For 4.x install with:
go get -tags zmq_4_x github.com/alecthomas/gozmq
If the go tool can't find zmq and you know it is installed, you may need to override the C compiler/linker flags.
For example, if you installed zmq into /opt/zmq, you might try:
CGO_CFLAGS=-I/opt/zmq/include CGO_LDFLAGS=-L/opt/zmq/lib \
  go get github.com/alecthomas/gozmq
If you get errors like this with 'go get' or 'go build':
1: error: 'ZMQ_FOO' undeclared (first use in this function)
There are two possibilities:
- Your version of zmq is very old. In this case you will need to download and build zmq yourself.
- You are building gozmq against the wrong version of zmq. See the installation instructions for details on how to target the correct version.
The API implemented by this package does not attempt to expose
zmq_msg_t at all. Instead, Recv() and Send() both operate on byte
slices, allocating and freeing the memory automatically. Currently this
requires copying to/from C malloced memory, but a future implementation
may be able to avoid this to a certain extent.
All major features are supported: contexts, sockets, devices, and polls.
Here are direct translations of some of the examples from this blog post.
A simple echo server:
package main

import zmq "github.com/alecthomas/gozmq"

func main() {
	// Errors are ignored here for brevity; check them in real code.
	context, _ := zmq.NewContext()
	socket, _ := context.NewSocket(zmq.REP)
	// A single socket can listen on several endpoints.
	socket.Bind("tcp://127.0.0.1:5000")
	socket.Bind("tcp://127.0.0.1:6000")

	for {
		// Echo each request back to the sender.
		msg, _ := socket.Recv(0)
		println("Got", string(msg))
		socket.Send(msg, 0)
	}
}

A simple client for the above server:
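package main

import (
	"fmt"

	zmq "github.com/alecthomas/gozmq"
)

func main() {
	// Errors are ignored here for brevity; check them in real code.
	context, _ := zmq.NewContext()
	socket, _ := context.NewSocket(zmq.REQ)
	// With two endpoints connected, requests are distributed round-robin.
	socket.Connect("tcp://127.0.0.1:5000")
	socket.Connect("tcp://127.0.0.1:6000")

	for i := 0; i < 10; i++ {
		msg := fmt.Sprintf("msg %d", i)
		socket.Send([]byte(msg), 0)
		println("Sending", msg)
		// Wait for the echoed reply before sending the next request.
		socket.Recv(0)
	}
}

GoZMQ does not support zero-copy.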
GoZMQ does not attempt to expose zmq_msg_t at all. Instead, Recv() and Send()
both operate on byte slices, allocating and freeing the memory automatically.
Currently this requires copying to/from C malloced memory, but a future
implementation may be able to avoid this to a certain extent.
It's not entirely clear from the 0mq documentation how memory for
zmq_msg_t and packet data is managed once 0mq takes ownership. After
digging into the source a little, this package operates under the
following (educated) assumptions:
- References to zmq_msg_t structures are not held by the C API beyond the duration of any function call.
- Packet data is reference counted internally by the C API. The count is incremented when a packet is queued for delivery to a destination (the inference being that for delivery to N destinations, the reference count will be incremented N times) and decremented once the packet has either been delivered or errored.
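
The second assumption can be pictured with a small stand-alone model. This is only a sketch of the inferred behaviour described above, not 0mq's actual implementation: the `packet` type, its methods and `fanOut` are hypothetical names invented for illustration, and nothing here calls the C API.

```go
package main

import "fmt"

// packet is a hypothetical stand-in for packet data owned by the C API.
type packet struct {
	data  []byte
	refs  int  // deliveries still outstanding
	freed bool // set once the last delivery completes
}

// queue records that the packet has been queued for one more destination.
func (p *packet) queue() { p.refs++ }

// done records that one delivery finished (delivered or errored) and
// releases the data once no deliveries remain.
func (p *packet) done() {
	if p.refs == 0 {
		return // nothing outstanding; ignore spurious completions
	}
	p.refs--
	if p.refs == 0 {
		p.data = nil
		p.freed = true
	}
}

// fanOut queues p for n destinations and returns the resulting count.
func fanOut(p *packet, n int) int {
	for i := 0; i < n; i++ {
		p.queue()
	}
	return p.refs
}

func main() {
	p := &packet{data: []byte("hello")}
	fmt.Println("refs after queueing to 3 destinations:", fanOut(p, 3))
	p.done() // delivered
	p.done() // errored
	fmt.Println("freed after 2 completions:", p.freed)
	p.done() // delivered
	fmt.Println("freed after 3 completions:", p.freed)
}
```

In this model the data is released only when the last of the N deliveries completes, whether it succeeded or failed.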
const (
	// NewSocket types
	PAIR   = SocketType(C.ZMQ_PAIR)
	PUB    = SocketType(C.ZMQ_PUB)
	SUB    = SocketType(C.ZMQ_SUB)
	REQ    = SocketType(C.ZMQ_REQ)
	REP    = SocketType(C.ZMQ_REP)
	DEALER = SocketType(C.ZMQ_DEALER)
	ROUTER = SocketType(C.ZMQ_ROUTER)
	PULL   = SocketType(C.ZMQ_PULL)
	PUSH   = SocketType(C.ZMQ_PUSH)
	XPUB   = SocketType(C.ZMQ_XPUB)
	XSUB   = SocketType(C.ZMQ_XSUB)

	// Deprecated aliases
	XREQ       = DEALER
	XREP       = ROUTER
	UPSTREAM   = PULL
	DOWNSTREAM = PUSH

	// NewSocket options
	AFFINITY          = UInt64SocketOption(C.ZMQ_AFFINITY)
	IDENTITY          = StringSocketOption(C.ZMQ_IDENTITY)
	SUBSCRIBE         = StringSocketOption(C.ZMQ_SUBSCRIBE)
	UNSUBSCRIBE       = StringSocketOption(C.ZMQ_UNSUBSCRIBE)
	RATE              = Int64SocketOption(C.ZMQ_RATE)
	RECOVERY_IVL      = Int64SocketOption(C.ZMQ_RECOVERY_IVL)
	SNDBUF            = UInt64SocketOption(C.ZMQ_SNDBUF)
	RCVBUF            = UInt64SocketOption(C.ZMQ_RCVBUF)
	FD                = Int64SocketOption(C.ZMQ_FD)
	EVENTS            = UInt64SocketOption(C.ZMQ_EVENTS)
	TYPE              = UInt64SocketOption(C.ZMQ_TYPE)
	LINGER            = IntSocketOption(C.ZMQ_LINGER)
	RECONNECT_IVL     = IntSocketOption(C.ZMQ_RECONNECT_IVL)
	RECONNECT_IVL_MAX = IntSocketOption(C.ZMQ_RECONNECT_IVL_MAX)
	BACKLOG           = IntSocketOption(C.ZMQ_BACKLOG)

	// Send/recv options
	SNDMORE = SendRecvOption(C.ZMQ_SNDMORE)
)

const (
	POLLIN  = PollEvents(C.ZMQ_POLLIN)
	POLLOUT = PollEvents(C.ZMQ_POLLOUT)
	POLLERR = PollEvents(C.ZMQ_POLLERR)
)

const (
	STREAMER  = DeviceType(C.ZMQ_STREAMER)
	FORWARDER = DeviceType(C.ZMQ_FORWARDER)
	QUEUE     = DeviceType(C.ZMQ_QUEUE)
)

const (
	RCVTIMEO = IntSocketOption(C.ZMQ_RCVTIMEO)
	SNDTIMEO = IntSocketOption(C.ZMQ_SNDTIMEO)
)

const (
	RCVMORE           = UInt64SocketOption(C.ZMQ_RCVMORE)
	RECOVERY_IVL_MSEC = Int64SocketOption(C.ZMQ_RECOVERY_IVL_MSEC)
	SWAP              = Int64SocketOption(C.ZMQ_SWAP)
	MCAST_LOOP        = Int64SocketOption(C.ZMQ_MCAST_LOOP)
	HWM               = UInt64SocketOption(C.ZMQ_HWM)
	NOBLOCK           = SendRecvOption(C.ZMQ_NOBLOCK)

	// Forwards-compatible aliases:
	DONTWAIT = NOBLOCK
)

const (
	RCVMORE = IntSocketOption(C.ZMQ_RCVMORE)
	SNDHWM  = IntSocketOption(C.ZMQ_SNDHWM)
	RCVHWM  = IntSocketOption(C.ZMQ_RCVHWM)

	// TODO Not documented in the man page...
	//LAST_ENDPOINT = UInt64SocketOption(C.ZMQ_LAST_ENDPOINT)
	FAIL_UNROUTABLE     = BoolSocketOption(C.ZMQ_FAIL_UNROUTABLE)
	TCP_KEEPALIVE       = IntSocketOption(C.ZMQ_TCP_KEEPALIVE)
	TCP_KEEPALIVE_CNT   = IntSocketOption(C.ZMQ_TCP_KEEPALIVE_CNT)
	TCP_KEEPALIVE_IDLE  = IntSocketOption(C.ZMQ_TCP_KEEPALIVE_IDLE)
	TCP_KEEPALIVE_INTVL = IntSocketOption(C.ZMQ_TCP_KEEPALIVE_INTVL)
	TCP_ACCEPT_FILTER   = StringSocketOption(C.ZMQ_TCP_ACCEPT_FILTER)

	// Message options
	MORE = MessageOption(C.ZMQ_MORE)

	// Send/recv options
	DONTWAIT = SendRecvOption(C.ZMQ_DONTWAIT)

	// Deprecated aliases
	NOBLOCK = DONTWAIT
)

var (
	// Additional ZMQ errors
	ENOTSOCK       error = zmqErrno(C.ENOTSOCK)
	EFSM           error = zmqErrno(C.EFSM)
	EINVAL         error = zmqErrno(C.EINVAL)
	ENOCOMPATPROTO error = zmqErrno(C.ENOCOMPATPROTO)
	ETERM          error = zmqErrno(C.ETERM)
	EMTHREAD       error = zmqErrno(C.EMTHREAD)
)

func Device(t DeviceType, in, out *Socket) error
run a zmq_device passing messages between in and out

func Poll(items []PollItem, timeout time.Duration) (count int, err error)
Poll ZmqSockets and file descriptors for I/O readiness. The timeout is a time.Duration; the smallest possible timeout is time.Millisecond for ZeroMQ version 3 and above, and time.Microsecond for earlier versions.

func Proxy(in, out, capture *Socket) error
run a zmq_proxy with in, out and capture sockets

func Version() (int, int, int)
void zmq_version (int *major, int *minor, int *patch);

type BoolSocketOption int

type Context struct {
}
A context handles socket creation and asynchronous message delivery. There should generally be one context per application.

func NewContext() (*Context, error)
Create a new context.

func (c *Context) Close()

func (c *Context) IOThreads() (int, error)
Get a context option.

func (c *Context) MaxSockets() (int, error)

func (c *Context) NewSocket(t SocketType) (*Socket, error)
Create a new socket. void *zmq_socket (void *context, int type);

func (c *Context) SetIOThreads(value int) error
Set a context option.

func (c *Context) SetMaxSockets(value int) error

type DeviceType int

type Int64SocketOption int

type IntSocketOption int

type MessageOption int

type PollEvents C.short

type PollItem struct {
	Socket  *Socket         // socket to poll for events on
	Fd      ZmqOsSocketType // fd to poll for events on as returned from os.File.Fd()
	Events  PollEvents      // event set to poll for
	REvents PollEvents      // events that were present
}
Item to poll for read/write events on, either a *Socket or a file descriptor

type PollItems []PollItem
a set of items to poll for events on
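
After a poll returns, REvents is a bitmask that can be tested against the event constants. The sketch below shows that test in isolation. It is an illustration under assumptions: `pollItem`, `readable` and `describe` are hypothetical names, the types are local stand-ins so the snippet compiles without cgo, and the numeric values are assumed to mirror ZMQ_POLLIN, ZMQ_POLLOUT and ZMQ_POLLERR from zmq.h.

```go
package main

import "fmt"

// PollEvents and its constants are local stand-ins for the gozmq definitions.
// The values are assumed to match zmq.h.
type PollEvents int16

const (
	POLLIN  PollEvents = 1
	POLLOUT PollEvents = 2
	POLLERR PollEvents = 4
)

// pollItem mirrors only the event fields of PollItem.
type pollItem struct {
	Events  PollEvents // events requested
	REvents PollEvents // events reported
}

// readable reports whether input was reported as ready on the item.
func readable(it pollItem) bool { return it.REvents&POLLIN != 0 }

// describe lists the names of the events set in ev.
func describe(ev PollEvents) []string {
	var names []string
	if ev&POLLIN != 0 {
		names = append(names, "POLLIN")
	}
	if ev&POLLOUT != 0 {
		names = append(names, "POLLOUT")
	}
	if ev&POLLERR != 0 {
		names = append(names, "POLLERR")
	}
	return names
}

func main() {
	it := pollItem{Events: POLLIN | POLLOUT, REvents: POLLIN}
	fmt.Println(readable(it), describe(it.REvents)) // true [POLLIN]
}
```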

type SendRecvOption int

type Socket struct {
}

func (s *Socket) Affinity() (uint64, error)
ZMQ_AFFINITY: Retrieve I/O thread affinity.
See: http://api.zeromq.org/2.1:zmq-getsockopt#toc7

func (s *Socket) Backlog() (int, error)
ZMQ_BACKLOG: Retrieve maximum length of the queue of outstanding connections.
See: http://api.zeromq.org/2.1:zmq-getsockopt#toc18

func (s *Socket) Bind(address string) error
Bind the socket to a listening address. int zmq_bind (void *s, const char *addr);

func (s *Socket) Close() error
Shutdown the socket. int zmq_close (void *s);

func (s *Socket) Connect(address string) error
Connect the socket to an address. int zmq_connect (void *s, const char *addr);

func (s *Socket) Events() (uint64, error)
ZMQ_EVENTS: Retrieve socket event state.
See: http://api.zeromq.org/2.1:zmq-getsockopt#toc20

func (s *Socket) GetSockOptBool(option BoolSocketOption) (value bool, err error)

func (s *Socket) GetSockOptInt(option IntSocketOption) (value int, err error)
Get an int option from the socket. int zmq_getsockopt (void *s, int option, void *optval, size_t *optvallen);

func (s *Socket) GetSockOptInt64(option Int64SocketOption) (value int64, err error)
Get an int64 option from the socket. int zmq_getsockopt (void *s, int option, void *optval, size_t *optvallen);

func (s *Socket) GetSockOptString(option StringSocketOption) (value string, err error)
Get a string option from the socket. int zmq_getsockopt (void *s, int option, void *optval, size_t *optvallen);

func (s *Socket) GetSockOptUInt64(option UInt64SocketOption) (value uint64, err error)
Get a uint64 option from the socket. int zmq_getsockopt (void *s, int option, void *optval, size_t *optvallen);

func (s *Socket) HWM() (uint64, error)
ZMQ_HWM: Retrieve high water mark.
See: http://api.zeromq.org/2.1:zmq-getsockopt#toc5

func (s *Socket) Identity() (string, error)
ZMQ_IDENTITY: Retrieve socket identity.
See: http://api.zeromq.org/2.1:zmq-getsockopt#toc8

func (s *Socket) Linger() (time.Duration, error)
ZMQ_LINGER: Retrieve linger period for socket shutdown.
See: http://api.zeromq.org/2.1:zmq-getsockopt#toc15

func (s *Socket) McastLoop() (bool, error)
ZMQ_MCAST_LOOP: Control multicast loop-back.
See: http://api.zeromq.org/2.1:zmq-getsockopt#toc12

func (s *Socket) Rate() (int64, error)
ZMQ_RATE: Retrieve multicast data rate.
See: http://api.zeromq.org/2.1:zmq-getsockopt#toc9

func (s *Socket) RcvBuf() (uint64, error)
ZMQ_RCVBUF: Retrieve kernel receive buffer size.
See: http://api.zeromq.org/2.1:zmq-getsockopt#toc14

func (s *Socket) RcvHWM() (int, error)
ZMQ_RCVHWM: Retrieve high water mark for inbound messages.
See: http://api.zeromq.org/3.2:zmq-getsockopt#toc6

func (s *Socket) RcvMore() (bool, error)
ZMQ_RCVMORE: More message parts to follow.
See: http://api.zeromq.org/2.1:zmq-getsockopt#toc4

func (s *Socket) RcvTimeout() (time.Duration, error)
ZMQ_RCVTIMEO: Maximum time before a socket operation returns with EAGAIN.
See: http://api.zeromq.org/2.2:zmq-getsockopt#toc6

func (s *Socket) ReconnectIvl() (time.Duration, error)
ZMQ_RECONNECT_IVL: Retrieve reconnection interval.
See: http://api.zeromq.org/2.1:zmq-getsockopt#toc16

func (s *Socket) ReconnectIvlMax() (time.Duration, error)
ZMQ_RECONNECT_IVL_MAX: Retrieve maximum reconnection interval.
See: http://api.zeromq.org/2.1:zmq-getsockopt#toc17

func (s *Socket) RecoveryIvl() (time.Duration, error)
ZMQ_RECOVERY_IVL_MSEC: Get multicast recovery interval in milliseconds.
See: http://api.zeromq.org/2.1:zmq-getsockopt#toc11

func (s *Socket) Recv(flags SendRecvOption) (data []byte, err error)
Receive a message from the socket. int zmq_recv (void *s, zmq_msg_t *msg, int flags);

func (s *Socket) RecvMultipart(flags SendRecvOption) (parts [][]byte, err error)
Receive a multipart message.

func (s *Socket) Send(data []byte, flags SendRecvOption) error
Send a message to the socket. int zmq_send (void *s, zmq_msg_t *msg, int flags);

func (s *Socket) SendMultipart(parts [][]byte, flags SendRecvOption) (err error)
Send a multipart message.
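
A multipart message is a sequence of frames in which every frame except the last is sent with the SNDMORE flag. The sketch below shows how a multipart send can be layered on a single-frame Send. It is an illustration, not necessarily how gozmq implements SendMultipart: `frameSender`, `sendMultipart` and `recorder` are hypothetical names, and the flag values are local stand-ins assumed to mirror ZMQ_DONTWAIT and ZMQ_SNDMORE so the snippet compiles without cgo.

```go
package main

import "fmt"

// Stand-in flag values, assumed to mirror ZMQ_DONTWAIT and ZMQ_SNDMORE.
const (
	DONTWAIT = 1
	SNDMORE  = 2
)

// frameSender is a hypothetical interface capturing the shape of Send.
type frameSender interface {
	Send(data []byte, flags int) error
}

// sendMultipart sends every part but the last with SNDMORE set.
func sendMultipart(s frameSender, parts [][]byte, flags int) error {
	for i, part := range parts {
		f := flags
		if i < len(parts)-1 {
			f |= SNDMORE
		}
		if err := s.Send(part, f); err != nil {
			return fmt.Errorf("sending part %d: %w", i, err)
		}
	}
	return nil
}

// recorder is a fake socket that records the flags used for each frame.
type recorder struct{ flags []int }

func (r *recorder) Send(_ []byte, flags int) error {
	r.flags = append(r.flags, flags)
	return nil
}

func main() {
	r := &recorder{}
	parts := [][]byte{[]byte("envelope"), []byte(""), []byte("body")}
	if err := sendMultipart(r, parts, 0); err != nil {
		panic(err)
	}
	fmt.Println(r.flags) // [2 2 0]
}
```

On the receiving side the mirror image applies: keep receiving frames while RcvMore reports that more parts follow.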

func (s *Socket) SetAffinity(value uint64) error
ZMQ_AFFINITY: Set I/O thread affinity.
See: http://api.zeromq.org/2.1:zmq-setsockopt#toc5

func (s *Socket) SetBacklog(value int) error
ZMQ_BACKLOG: Set maximum length of the queue of outstanding connections.
See: http://api.zeromq.org/2.1:zmq-setsockopt#toc18

func (s *Socket) SetHWM(value uint64) error
ZMQ_HWM: Set high water mark.
See: http://api.zeromq.org/2.1:zmq-setsockopt#toc3

func (s *Socket) SetIdentity(value string) error
ZMQ_IDENTITY: Set socket identity.
See: http://api.zeromq.org/2.1:zmq-setsockopt#toc6

func (s *Socket) SetLinger(value time.Duration) error
ZMQ_LINGER: Set linger period for socket shutdown.
See: http://api.zeromq.org/2.1:zmq-setsockopt#toc15

func (s *Socket) SetMcastLoop(value bool) error
ZMQ_MCAST_LOOP: Control multicast loop-back.
See: http://api.zeromq.org/2.1:zmq-setsockopt#toc12

func (s *Socket) SetRate(value int64) error
ZMQ_RATE: Set multicast data rate.
See: http://api.zeromq.org/2.1:zmq-setsockopt#toc9

func (s *Socket) SetRcvBuf(value uint64) error
ZMQ_RCVBUF: Set kernel receive buffer size.
See: http://api.zeromq.org/2.1:zmq-setsockopt#toc14

func (s *Socket) SetRcvHWM(value int) error
ZMQ_RCVHWM: Set high water mark for inbound messages.
See: http://api.zeromq.org/3.2:zmq-setsockopt#toc4

func (s *Socket) SetRcvTimeout(value time.Duration) error
ZMQ_RCVTIMEO: Maximum time before a recv operation returns with EAGAIN.
See: http://api.zeromq.org/2.2:zmq-setsockopt#toc9

func (s *Socket) SetReconnectIvl(value time.Duration) error
ZMQ_RECONNECT_IVL: Set reconnection interval.
See: http://api.zeromq.org/2.1:zmq-setsockopt#toc16

func (s *Socket) SetReconnectIvlMax(value time.Duration) error
ZMQ_RECONNECT_IVL_MAX: Set maximum reconnection interval.
See: http://api.zeromq.org/2.1:zmq-setsockopt#toc17

func (s *Socket) SetRecoveryIvl(value time.Duration) error
ZMQ_RECOVERY_IVL_MSEC: Set multicast recovery interval in milliseconds.
See: http://api.zeromq.org/2.1:zmq-setsockopt#toc11

func (s *Socket) SetSndBuf(value uint64) error
ZMQ_SNDBUF: Set kernel transmit buffer size.
See: http://api.zeromq.org/2.1:zmq-setsockopt#toc13

func (s *Socket) SetSndHWM(value int) error
ZMQ_SNDHWM: Set high water mark for outbound messages.
See: http://api.zeromq.org/3.2:zmq-setsockopt#toc3

func (s *Socket) SetSndTimeout(value time.Duration) error
ZMQ_SNDTIMEO: Maximum time before a send operation returns with EAGAIN.
See: http://api.zeromq.org/2.2:zmq-setsockopt#toc10

func (s *Socket) SetSockOptInt(option IntSocketOption, value int) error
Set an int option on the socket. int zmq_setsockopt (void *s, int option, const void *optval, size_t optvallen);

func (s *Socket) SetSockOptInt64(option Int64SocketOption, value int64) error
Set an int64 option on the socket. int zmq_setsockopt (void *s, int option, const void *optval, size_t optvallen);

func (s *Socket) SetSockOptString(option StringSocketOption, value string) error
Set a string option on the socket. int zmq_setsockopt (void *s, int option, const void *optval, size_t optvallen);

func (s *Socket) SetSockOptStringNil(option StringSocketOption) error
Set a string option on the socket to nil. int zmq_setsockopt (void *s, int option, const void *optval, size_t optvallen);

func (s *Socket) SetSockOptUInt64(option UInt64SocketOption, value uint64) error
Set a uint64 option on the socket. int zmq_setsockopt (void *s, int option, const void *optval, size_t optvallen);

func (s *Socket) SetSubscribe(value string) error
ZMQ_SUBSCRIBE: Establish message filter.
See: http://api.zeromq.org/2.1:zmq-setsockopt#toc7

func (s *Socket) SetSwap(value int64) error
ZMQ_SWAP: Set disk offload size.
See: http://api.zeromq.org/2.1:zmq-setsockopt#toc4

func (s *Socket) SetTCPKeepalive(value int) error
ZMQ_TCP_KEEPALIVE: Override SO_KEEPALIVE socket option.
See: http://api.zeromq.org/3.2:zmq-setsockopt#toc25

func (s *Socket) SetTCPKeepaliveCnt(value int) error
ZMQ_TCP_KEEPALIVE_CNT: Override TCP_KEEPCNT socket option.
See: http://api.zeromq.org/3.2:zmq-setsockopt#toc27

func (s *Socket) SetTCPKeepaliveIdle(value int) error
ZMQ_TCP_KEEPALIVE_IDLE: Override TCP_KEEPIDLE (or TCP_KEEPALIVE on some OS).
See: http://api.zeromq.org/3.2:zmq-setsockopt#toc26

func (s *Socket) SetTCPKeepaliveIntvl(value int) error
ZMQ_TCP_KEEPALIVE_INTVL: Override TCP_KEEPINTVL socket option.
See: http://api.zeromq.org/3.2:zmq-setsockopt#toc28

func (s *Socket) SetUnsubscribe(value string) error
ZMQ_UNSUBSCRIBE: Remove message filter.
See: http://api.zeromq.org/2.1:zmq-setsockopt#toc8

func (s *Socket) SndBuf() (uint64, error)
ZMQ_SNDBUF: Retrieve kernel transmit buffer size.
See: http://api.zeromq.org/2.1:zmq-getsockopt#toc13

func (s *Socket) SndHWM() (int, error)
ZMQ_SNDHWM: Retrieves high water mark for outbound messages.
See: http://api.zeromq.org/3.2:zmq-getsockopt#toc5

func (s *Socket) SndTimeout() (time.Duration, error)
ZMQ_SNDTIMEO: Maximum time before a socket operation returns with EAGAIN.
See: http://api.zeromq.org/2.2:zmq-getsockopt#toc7

func (s *Socket) Swap() (int64, error)
ZMQ_SWAP: Retrieve disk offload size.
See: http://api.zeromq.org/2.1:zmq-getsockopt#toc6

func (s *Socket) TCPKeepalive() (int, error)
ZMQ_TCP_KEEPALIVE: Override SO_KEEPALIVE socket option.
See: http://api.zeromq.org/3.2:zmq-getsockopt#toc26

func (s *Socket) TCPKeepaliveCnt() (int, error)
ZMQ_TCP_KEEPALIVE_CNT: Override TCP_KEEPCNT socket option.
See: http://api.zeromq.org/3.2:zmq-getsockopt#toc28

func (s *Socket) TCPKeepaliveIdle() (int, error)
ZMQ_TCP_KEEPALIVE_IDLE: Override TCP_KEEPIDLE (or TCP_KEEPALIVE on some OS).
See: http://api.zeromq.org/3.2:zmq-getsockopt#toc27

func (s *Socket) TCPKeepaliveIntvl() (int, error)
ZMQ_TCP_KEEPALIVE_INTVL: Override TCP_KEEPINTVL socket option.
See: http://api.zeromq.org/3.2:zmq-getsockopt#toc29

func (s *Socket) Type() (SocketType, error)
ZMQ_TYPE: Retrieve socket type.
See: http://api.zeromq.org/2.1:zmq-getsockopt#toc3

type SocketType int

type StringSocketOption int

type UInt64SocketOption int

type ZmqOsSocketType C.SOCKET

func (self ZmqOsSocketType) ToRaw() C.SOCKET

(generated from .godocdown.md with godocdown github.com/alecthomas/gozmq > README.md)
