1
0
mirror of https://github.com/VCMP-SqMod/SqMod.git synced 2025-12-24 00:47:17 +01:00

Update ZMQ to current git.

This commit is contained in:
Sandu Liviu Catalin
2021-08-22 20:09:16 +03:00
parent b78b3e8ede
commit fa4644d00f
72 changed files with 1309 additions and 884 deletions

View File

@@ -38,6 +38,7 @@ zmq::client_t::client_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
{
options.type = ZMQ_CLIENT;
options.can_send_hello_msg = true;
options.can_recv_hiccup_msg = true;
}
zmq::client_t::~client_t ()

View File

@@ -126,9 +126,11 @@ static f_compatible_get_tick_count64 my_get_tick_count64 =
init_compatible_get_tick_count64 ();
#endif
#ifndef ZMQ_HAVE_WINDOWS
const uint64_t usecs_per_msec = 1000;
const uint64_t usecs_per_sec = 1000000;
const uint64_t nsecs_per_usec = 1000;
#endif
const uint64_t usecs_per_sec = 1000000;
zmq::clock_t::clock_t () :
_last_tsc (rdtsc ()),
@@ -193,6 +195,7 @@ uint64_t zmq::clock_t::now_us ()
#else
LIBZMQ_UNUSED (nsecs_per_usec);
// Use POSIX gettimeofday function to get precise time.
struct timeval tv;
int rc = gettimeofday (&tv, NULL);

View File

@@ -33,6 +33,7 @@
#include <string>
#include "stdint.hpp"
#include "endpoint.hpp"
#include "platform.hpp"
namespace zmq
{
@@ -44,12 +45,7 @@ class socket_base_t;
// This structure defines the commands that can be sent between threads.
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable : 4324) // C4324: alignment padding warnings
__declspec(align (64))
#endif
struct command_t
struct command_t
{
// Object to process the command.
zmq::object_t *destination;
@@ -216,9 +212,12 @@ __declspec(align (64))
} args;
#ifdef _MSC_VER
};
#pragma warning(pop)
#else
} __attribute__ ((aligned (64)));
}
#ifdef HAVE_POSIX_MEMALIGN
__attribute__ ((aligned (ZMQ_CACHELINE_SIZE)))
#endif
;
#endif
}

View File

@@ -180,6 +180,12 @@ struct curve_client_tools_t
// Create Box [C + vouch + metadata](C'->S')
std::fill (initiate_plaintext.begin (),
initiate_plaintext.begin () + crypto_box_ZEROBYTES, 0);
// False positives due to https://gcc.gnu.org/bugzilla/show_bug.cgi?id=99578
#if __GNUC__ >= 11
#pragma GCC diagnostic ignored "-Warray-bounds"
#pragma GCC diagnostic ignored "-Wstringop-overflow="
#endif
memcpy (&initiate_plaintext[crypto_box_ZEROBYTES], public_key_, 32);
memcpy (&initiate_plaintext[crypto_box_ZEROBYTES + 32], vouch_nonce + 8,
16);
@@ -189,6 +195,10 @@ struct curve_client_tools_t
memcpy (&initiate_plaintext[crypto_box_ZEROBYTES + 48 + 80],
metadata_plaintext_, metadata_length_);
}
#if __GNUC__ >= 11
#pragma GCC diagnostic pop
#pragma GCC diagnostic pop
#endif
memcpy (initiate_nonce, "CurveZMQINITIATE", 16);
put_uint64 (initiate_nonce + 16, cn_nonce_);

View File

@@ -39,6 +39,7 @@ zmq::dealer_t::dealer_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
{
options.type = ZMQ_DEALER;
options.can_send_hello_msg = true;
options.can_recv_hiccup_msg = true;
}
zmq::dealer_t::~dealer_t ()

View File

@@ -39,7 +39,6 @@
zmq::dgram_t::dgram_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
_pipe (NULL),
_last_in (NULL),
_more_out (false)
{
options.type = ZMQ_DGRAM;
@@ -71,9 +70,6 @@ void zmq::dgram_t::xattach_pipe (pipe_t *pipe_,
void zmq::dgram_t::xpipe_terminated (pipe_t *pipe_)
{
if (pipe_ == _pipe) {
if (_last_in == _pipe) {
_last_in = NULL;
}
_pipe = NULL;
}
}
@@ -147,7 +143,6 @@ int zmq::dgram_t::xrecv (msg_t *msg_)
errno = EAGAIN;
return -1;
}
_last_in = _pipe;
return 0;
}

View File

@@ -62,8 +62,6 @@ class dgram_t ZMQ_FINAL : public socket_base_t
private:
zmq::pipe_t *_pipe;
zmq::pipe_t *_last_in;
// If true, more outgoing message parts are expected.
bool _more_out;

View File

@@ -64,6 +64,18 @@ void zmq::dist_t::attach (pipe_t *pipe_)
}
}
bool zmq::dist_t::has_pipe (pipe_t *pipe_)
{
std::size_t claimed_index = _pipes.index (pipe_);
// If pipe claims to be outside the available index space it can't be in the distributor.
if (claimed_index >= _pipes.size ()) {
return false;
}
return _pipes[claimed_index] == pipe_;
}
void zmq::dist_t::match (pipe_t *pipe_)
{
// If pipe is already matching do nothing.

View File

@@ -51,6 +51,9 @@ class dist_t
// Adds the pipe to the distributor object.
void attach (zmq::pipe_t *pipe_);
// Checks if this pipe is present in the distributor.
bool has_pipe (zmq::pipe_t *pipe_);
// Activates pipe that have previously reached high watermark.
void activated (zmq::pipe_t *pipe_);

View File

@@ -131,7 +131,7 @@ void zmq::epoll_t::reset_pollin (handle_t handle_)
{
check_thread ();
poll_entry_t *pe = static_cast<poll_entry_t *> (handle_);
pe->ev.events &= ~(static_cast<short> (EPOLLIN));
pe->ev.events &= ~(static_cast<uint32_t> (EPOLLIN));
const int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
errno_assert (rc != -1);
}
@@ -149,7 +149,7 @@ void zmq::epoll_t::reset_pollout (handle_t handle_)
{
check_thread ();
poll_entry_t *pe = static_cast<poll_entry_t *> (handle_);
pe->ev.events &= ~(static_cast<short> (EPOLLOUT));
pe->ev.events &= ~(static_cast<uint32_t> (EPOLLOUT));
const int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
errno_assert (rc != -1);
}

View File

@@ -33,7 +33,7 @@
#include "err.hpp"
#include "msg.hpp"
zmq::fq_t::fq_t () : _active (0), _last_in (NULL), _current (0), _more (false)
zmq::fq_t::fq_t () : _active (0), _current (0), _more (false)
{
}
@@ -62,10 +62,6 @@ void zmq::fq_t::pipe_terminated (pipe_t *pipe_)
_current = 0;
}
_pipes.erase (pipe_);
if (_last_in == pipe_) {
_last_in = NULL;
}
}
void zmq::fq_t::activated (pipe_t *pipe_)
@@ -100,7 +96,6 @@ int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_)
*pipe_ = _pipes[_current];
_more = (msg_->flags () & msg_t::more) != 0;
if (!_more) {
_last_in = _pipes[_current];
_current = (_current + 1) % _active;
}
return 0;

View File

@@ -65,11 +65,6 @@ class fq_t
// beginning of the pipes array.
pipes_t::size_type _active;
// Pointer to the last pipe we received message from.
// NULL when no message has been received or the pipe
// has terminated.
pipe_t *_last_in;
// Index of the next bound pipe to read a message from.
pipes_t::size_type _current;

32
vendor/ZMQ/src/ip.cpp vendored
View File

@@ -868,24 +868,48 @@ void zmq::assert_success_or_recoverable (zmq::fd_t s_, int rc_)
}
#ifdef ZMQ_HAVE_IPC
#if defined ZMQ_HAVE_WINDOWS
char *widechar_to_utf8 (const wchar_t *widestring)
{
int nch, n;
char *utf8 = 0;
nch = WideCharToMultiByte (CP_UTF8, 0, widestring, -1, 0, 0, NULL, NULL);
if (nch > 0) {
utf8 = (char *) malloc ((nch + 1) * sizeof (char));
n = WideCharToMultiByte (CP_UTF8, 0, widestring, -1, utf8, nch, NULL,
NULL);
utf8[nch] = 0;
}
return utf8;
}
#endif
int zmq::create_ipc_wildcard_address (std::string &path_, std::string &file_)
{
#if defined ZMQ_HAVE_WINDOWS
char buffer[MAX_PATH];
wchar_t buffer[MAX_PATH];
{
const errno_t rc = tmpnam_s (buffer);
const errno_t rc = _wtmpnam_s (buffer);
errno_assert (rc == 0);
}
// TODO or use CreateDirectoryA and specify permissions?
const int rc = _mkdir (buffer);
const int rc = _wmkdir (buffer);
if (rc != 0) {
return -1;
}
path_.assign (buffer);
char *tmp = widechar_to_utf8 (buffer);
if (tmp == 0) {
return -1;
}
path_.assign (tmp);
file_ = path_ + "/socket";
free (tmp);
#else
std::string tmp_path;

View File

@@ -35,7 +35,7 @@
// Mutex class encapsulates OS mutex in a platform-independent way.
#ifdef ZMQ_HAVE_WINDOWS
#if defined(ZMQ_HAVE_WINDOWS) && !defined(ZMQ_USE_CV_IMPL_PTHREADS)
#include "windows.hpp"

View File

@@ -254,7 +254,10 @@ zmq::options_t::options_t () :
hello_msg (),
can_send_hello_msg (false),
disconnect_msg (),
can_recv_disconnect_msg (false)
can_recv_disconnect_msg (false),
hiccup_msg (),
can_recv_hiccup_msg (false),
busy_poll (0)
{
memset (curve_public_key, 0, CURVE_KEYSIZE);
memset (curve_secret_key, 0, CURVE_KEYSIZE);
@@ -802,6 +805,12 @@ int zmq::options_t::setsockopt (int option_,
}
break;
case ZMQ_BUSY_POLL:
if (is_int) {
busy_poll = value;
return 0;
}
break;
#ifdef ZMQ_HAVE_WSS
case ZMQ_WSS_KEY_PEM:
// TODO: check if valid certificate
@@ -852,6 +861,18 @@ int zmq::options_t::setsockopt (int option_,
}
break;
case ZMQ_HICCUP_MSG:
if (optvallen_ > 0) {
unsigned char *bytes = (unsigned char *) optval_;
hiccup_msg =
std::vector<unsigned char> (bytes, bytes + optvallen_);
} else {
hiccup_msg = std::vector<unsigned char> ();
}
return 0;
#endif
default:
@@ -1285,6 +1306,12 @@ int zmq::options_t::getsockopt (int option_,
return 0;
}
break;
case ZMQ_BUSY_POLL:
if (is_int) {
*value = busy_poll;
}
break;
#endif

View File

@@ -308,6 +308,13 @@ struct options_t
// Disconnect msg
std::vector<unsigned char> disconnect_msg;
bool can_recv_disconnect_msg;
// Hiccup msg
std::vector<unsigned char> hiccup_msg;
bool can_recv_hiccup_msg;
// This option removes several delays caused by scheduling, interrupts and context switching.
int busy_poll;
};
inline bool get_effective_conflate_option (const options_t &options)

View File

@@ -36,8 +36,7 @@
zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
_pipe (NULL),
_last_in (NULL)
_pipe (NULL)
{
options.type = ZMQ_PAIR;
}
@@ -67,9 +66,6 @@ void zmq::pair_t::xattach_pipe (pipe_t *pipe_,
void zmq::pair_t::xpipe_terminated (pipe_t *pipe_)
{
if (pipe_ == _pipe) {
if (_last_in == _pipe) {
_last_in = NULL;
}
_pipe = NULL;
}
}
@@ -117,7 +113,6 @@ int zmq::pair_t::xrecv (msg_t *msg_)
errno = EAGAIN;
return -1;
}
_last_in = _pipe;
return 0;
}

View File

@@ -62,8 +62,6 @@ class pair_t ZMQ_FINAL : public socket_base_t
private:
zmq::pipe_t *_pipe;
zmq::pipe_t *_last_in;
ZMQ_NON_COPYABLE_NOR_MOVABLE (pair_t)
};
}

View File

@@ -42,6 +42,7 @@ zmq::peer_t::peer_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
options.type = ZMQ_PEER;
options.can_send_hello_msg = true;
options.can_recv_disconnect_msg = true;
options.can_recv_hiccup_msg = true;
}
uint32_t zmq::peer_t::connect_peer (const char *endpoint_uri_)

View File

@@ -597,7 +597,7 @@ void zmq::pipe_t::process_pipe_peer_stats (uint64_t queue_count_,
void zmq::pipe_t::send_disconnect_msg ()
{
if (_disconnect_msg.size () > 0) {
if (_disconnect_msg.size () > 0 && _out_pipe) {
// Rollback any incomplete message in the pipe, and push the disconnect message.
rollback ();
@@ -615,3 +615,15 @@ void zmq::pipe_t::set_disconnect_msg (
_disconnect_msg.init_buffer (&disconnect_[0], disconnect_.size ());
errno_assert (rc == 0);
}
void zmq::pipe_t::send_hiccup_msg (const std::vector<unsigned char> &hiccup_)
{
if (!hiccup_.empty () && _out_pipe) {
msg_t msg;
const int rc = msg.init_buffer (&hiccup_[0], hiccup_.size ());
errno_assert (rc == 0);
_out_pipe->write (msg, false);
flush ();
}
}

View File

@@ -150,6 +150,8 @@ class pipe_t ZMQ_FINAL : public object_t,
void send_disconnect_msg ();
void set_disconnect_msg (const std::vector<unsigned char> &disconnect_);
void send_hiccup_msg (const std::vector<unsigned char> &hiccup_);
private:
// Type of the underlying lock-free pipe.
typedef ypipe_base_t<msg_t> upipe_t;

View File

@@ -46,7 +46,7 @@ template <typename T, size_t S> class fast_vector_t
explicit fast_vector_t (const size_t nitems_)
{
if (nitems_ > S) {
_buf = static_cast<T *> (malloc (nitems_ * sizeof (T)));
_buf = new (std::nothrow) T[nitems_];
// TODO since this function is called by a client, we could return errno == ENOMEM here
alloc_assert (_buf);
} else {
@@ -59,7 +59,7 @@ template <typename T, size_t S> class fast_vector_t
~fast_vector_t ()
{
if (_buf != _static_buf)
free (_buf);
delete[] _buf;
}
private:
@@ -109,9 +109,9 @@ timeout_t
compute_timeout (bool first_pass_, long timeout_, uint64_t now_, uint64_t end_);
#elif defined ZMQ_POLL_BASED_ON_SELECT
#if defined ZMQ_HAVE_WINDOWS
inline size_t valid_pollset_bytes (const fd_set &pollset_)
{
#if defined ZMQ_HAVE_WINDOWS
// On Windows we don't need to copy the whole fd_set.
// SOCKETS are continuous from the beginning of fd_array in fd_set.
// We just need to copy fd_count elements of fd_array.
@@ -119,10 +119,14 @@ inline size_t valid_pollset_bytes (const fd_set &pollset_)
return reinterpret_cast<const char *> (
&pollset_.fd_array[pollset_.fd_count])
- reinterpret_cast<const char *> (&pollset_);
#else
return sizeof (fd_set);
#endif
}
#else
inline size_t valid_pollset_bytes (const fd_set & /*pollset_*/)
{
return sizeof (fd_set);
}
#endif
#if defined ZMQ_HAVE_WINDOWS
// struct fd_set {

View File

@@ -558,7 +558,7 @@ visit_keys (node_t node_,
for (size_t i = 0, edgecount = node_.edgecount (); i < edgecount; ++i) {
visit_keys (node_.node_at (i), buffer_, func_, arg_);
}
buffer_.resize (buffer_.size () - prefix_length);
buffer_.resize (static_cast<uint32_t> (buffer_.size () - prefix_length));
}
void zmq::radix_tree_t::apply (

View File

@@ -151,8 +151,13 @@ static void manage_random (bool init_)
if (init_) {
int rc = sodium_init ();
zmq_assert (rc != -1);
#if defined(ZMQ_LIBSODIUM_RANDOMBYTES_CLOSE)
} else {
// randombytes_close either a no-op or not threadsafe
// doing this without refcounting can cause crashes
// if called while a context is active
randombytes_close ();
#endif
}
#else
LIBZMQ_UNUSED (init_);

View File

@@ -460,14 +460,18 @@ void zmq::session_base_t::engine_error (bool handshaked_,
if (_pipe) {
clean_pipes ();
#ifdef ZMQ_BUILD_DRAFT_API
// Only send disconnect message if socket was accepted and handshake was completed
if (!_active && handshaked_ && options.can_recv_disconnect_msg
&& !options.disconnect_msg.empty ()) {
_pipe->set_disconnect_msg (options.disconnect_msg);
_pipe->send_disconnect_msg ();
}
#endif
// Only send hiccup message if socket was connected and handshake was completed
if (_active && handshaked_ && options.can_recv_hiccup_msg
&& !options.hiccup_msg.empty ()) {
_pipe->send_hiccup_msg (options.hiccup_msg);
}
}
zmq_assert (reason_ == i_engine::connection_error

View File

@@ -341,6 +341,21 @@ void zmq::tcp_tune_loopback_fast_path (const fd_t socket_)
#endif
}
void zmq::tune_tcp_busy_poll (fd_t socket_, int busy_poll_)
{
#if defined(ZMQ_HAVE_BUSY_POLL)
if (busy_poll_ > 0) {
const int rc =
setsockopt (socket_, SOL_SOCKET, SO_BUSY_POLL,
reinterpret_cast<char *> (&busy_poll_), sizeof (int));
assert_success_or_recoverable (socket_, rc);
}
#else
LIBZMQ_UNUSED (socket_);
LIBZMQ_UNUSED (busy_poll_);
#endif
}
zmq::fd_t zmq::tcp_open_socket (const char *address_,
const zmq::options_t &options_,
bool local_,
@@ -398,6 +413,9 @@ zmq::fd_t zmq::tcp_open_socket (const char *address_,
if (options_.rcvbuf >= 0)
set_tcp_receive_buffer (s, options_.rcvbuf);
// This option removes several delays caused by scheduling, interrupts and context switching.
if (options_.busy_poll)
tune_tcp_busy_poll (s, options_.busy_poll);
return s;
setsockopt_error:

View File

@@ -68,6 +68,8 @@ int tcp_read (fd_t s_, void *data_, size_t size_);
void tcp_tune_loopback_fast_path (fd_t socket_);
void tune_tcp_busy_poll (fd_t socket_, int busy_poll_);
// Resolves the given address_ string, opens a socket and sets socket options
// according to the passed options_. On success, returns the socket
// descriptor and assigns the resolved address to out_tcp_addr_. In case of

View File

@@ -28,7 +28,9 @@
*/
#include "precompiled.hpp"
#include "ip.hpp"
#include "vmci.hpp"
#include "vmci_address.hpp"
#if defined ZMQ_HAVE_VMCI
@@ -97,4 +99,23 @@ void zmq::tune_vmci_connect_timeout (ctx_t *context_,
#endif
}
zmq::fd_t zmq::vmci_open_socket (const char *address_,
const zmq::options_t &options_,
zmq::vmci_address_t *out_vmci_addr_)
{
// Convert the textual address into address structure.
int rc = out_vmci_addr_->resolve (address_);
if (rc != 0)
return retired_fd;
// Create the socket.
fd_t s = open_socket (out_vmci_addr_->family (), SOCK_STREAM, 0);
if (s == retired_fd) {
return retired_fd;
}
return s;
}
#endif

View File

@@ -59,6 +59,10 @@ void tune_vmci_connect_timeout (ctx_t *context_,
fd_t sockfd_,
struct timeval timeout_);
#endif
fd_t vmci_open_socket (const char *address_,
const options_t &options_,
vmci_address_t *out_vmci_addr_);
}
#endif

View File

@@ -39,6 +39,11 @@
#include "err.hpp"
zmq::vmci_address_t::vmci_address_t ()
{
memset (&address, 0, sizeof address);
}
zmq::vmci_address_t::vmci_address_t (ctx_t *parent_) : parent (parent_)
{
memset (&address, 0, sizeof address);
@@ -56,10 +61,6 @@ zmq::vmci_address_t::vmci_address_t (const sockaddr *sa,
memcpy (&address, sa, sa_len);
}
zmq::vmci_address_t::~vmci_address_t ()
{
}
int zmq::vmci_address_t::resolve (const char *path_)
{
// Find the ':' at end that separates address from the port number.
@@ -125,7 +126,7 @@ int zmq::vmci_address_t::resolve (const char *path_)
return 0;
}
int zmq::vmci_address_t::to_string (std::string &addr_)
int zmq::vmci_address_t::to_string (std::string &addr_) const
{
if (address.svm_family != parent->get_vmci_socket_family ()) {
addr_.clear ();
@@ -164,4 +165,13 @@ socklen_t zmq::vmci_address_t::addrlen () const
return static_cast<socklen_t> (sizeof address);
}
#if defined ZMQ_HAVE_WINDOWS
unsigned short zmq::vmci_address_t::family () const
#else
sa_family_t zmq::vmci_address_t::family () const
#endif
{
return parent->get_vmci_socket_family ();
}
#endif

View File

@@ -43,16 +43,21 @@ namespace zmq
class vmci_address_t
{
public:
vmci_address_t ();
vmci_address_t (ctx_t *parent_);
vmci_address_t (const sockaddr *sa, socklen_t sa_len, ctx_t *parent_);
~vmci_address_t ();
// This function sets up the address for VMCI transport.
int resolve (const char *path_);
// The opposite to resolve()
int to_string (std::string &addr_);
int to_string (std::string &addr_) const;
#if defined ZMQ_HAVE_WINDOWS
unsigned short family () const;
#else
sa_family_t family () const;
#endif
const sockaddr *addr () const;
socklen_t addrlen () const;
@@ -60,8 +65,6 @@ class vmci_address_t
struct sockaddr_vm address;
ctx_t *parent;
vmci_address_t ();
ZMQ_NON_COPYABLE_NOR_MOVABLE (vmci_address_t)
};
}

View File

@@ -35,69 +35,41 @@
#include <new>
#include "stream_engine.hpp"
#include "io_thread.hpp"
#include "platform.hpp"
#include "random.hpp"
#include "err.hpp"
#include "ip.hpp"
#include "address.hpp"
#include "session_base.hpp"
#include "vmci_address.hpp"
#include "vmci.hpp"
#include "session_base.hpp"
zmq::vmci_connecter_t::vmci_connecter_t (class io_thread_t *io_thread_,
class session_base_t *session_,
const options_t &options_,
const address_t *addr_,
address_t *addr_,
bool delayed_start_) :
own_t (io_thread_, options_),
io_object_t (io_thread_),
addr (addr_),
s (retired_fd),
handle_valid (false),
delayed_start (delayed_start_),
timer_started (false),
session (session_),
current_reconnect_ivl (options.reconnect_ivl)
stream_connecter_base_t (
io_thread_, session_, options_, addr_, delayed_start_),
_connect_timer_started (false)
{
zmq_assert (addr);
zmq_assert (addr->protocol == "vmci");
addr->to_string (endpoint);
socket = session->get_socket ();
zmq_assert (_addr->protocol == protocol_name::vmci);
}
zmq::vmci_connecter_t::~vmci_connecter_t ()
{
zmq_assert (!timer_started);
zmq_assert (!handle_valid);
zmq_assert (s == retired_fd);
}
void zmq::vmci_connecter_t::process_plug ()
{
if (delayed_start)
add_reconnect_timer ();
else
start_connecting ();
zmq_assert (!_connect_timer_started);
}
void zmq::vmci_connecter_t::process_term (int linger_)
{
if (timer_started) {
cancel_timer (reconnect_timer_id);
timer_started = false;
if (_connect_timer_started) {
cancel_timer (connect_timer_id);
_connect_timer_started = false;
}
if (handle_valid) {
rm_fd (handle);
handle_valid = false;
}
if (s != retired_fd)
close ();
own_t::process_term (linger_);
stream_connecter_base_t::process_term (linger_);
}
void zmq::vmci_connecter_t::in_event ()
@@ -110,9 +82,26 @@ void zmq::vmci_connecter_t::in_event ()
void zmq::vmci_connecter_t::out_event ()
{
fd_t fd = connect ();
rm_fd (handle);
handle_valid = false;
if (_connect_timer_started) {
cancel_timer (connect_timer_id);
_connect_timer_started = false;
}
// TODO this is still very similar to (t)ipc_connecter_t, maybe the
// differences can be factored out
rm_handle ();
const fd_t fd = connect ();
if (fd == retired_fd
&& ((options.reconnect_stop & ZMQ_RECONNECT_STOP_CONN_REFUSED)
&& errno == ECONNREFUSED)) {
send_conn_failed (_session);
close ();
terminate ();
return;
}
// Handle the error condition by attempt to reconnect.
if (fd == retired_fd) {
@@ -135,148 +124,154 @@ void zmq::vmci_connecter_t::out_event ()
#endif
}
// Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow) stream_engine_t (
fd, options, make_unconnected_bind_endpoint_pair (endpoint));
alloc_assert (engine);
create_engine (
fd, zmq::vmci_connecter_t::get_socket_name (fd, socket_end_local));
}
// Attach the engine to the corresponding session object.
send_attach (session, engine);
std::string
zmq::vmci_connecter_t::get_socket_name (zmq::fd_t fd_,
socket_end_t socket_end_) const
{
struct sockaddr_storage ss;
const zmq_socklen_t sl = get_socket_address (fd_, socket_end_, &ss);
if (sl == 0) {
return std::string ();
}
// Shut the connecter down.
terminate ();
socket->event_connected (make_unconnected_bind_endpoint_pair (endpoint),
fd);
const vmci_address_t addr (reinterpret_cast<struct sockaddr *> (&ss), sl,
this->get_ctx ());
std::string address_string;
addr.to_string (address_string);
return address_string;
}
void zmq::vmci_connecter_t::timer_event (int id_)
{
zmq_assert (id_ == reconnect_timer_id);
timer_started = false;
start_connecting ();
if (id_ == connect_timer_id) {
_connect_timer_started = false;
rm_handle ();
close ();
add_reconnect_timer ();
} else
stream_connecter_base_t::timer_event (id_);
}
void zmq::vmci_connecter_t::start_connecting ()
{
// Open the connecting socket.
int rc = open ();
const int rc = open ();
// Connect may succeed in synchronous manner.
if (rc == 0) {
handle = add_fd (s);
handle_valid = true;
_handle = add_fd (_s);
out_event ();
}
// Connection establishment may be delayed. Poll for its completion.
else if (rc == -1 && errno == EINPROGRESS) {
_handle = add_fd (_s);
set_pollout (_handle);
_socket->event_connect_delayed (
make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ());
// add userspace connect timeout
add_connect_timer ();
}
// Handle any other error condition by eventual reconnect.
else {
if (s != retired_fd)
if (_s != retired_fd)
close ();
add_reconnect_timer ();
}
}
void zmq::vmci_connecter_t::add_reconnect_timer ()
void zmq::vmci_connecter_t::add_connect_timer ()
{
if (options.reconnect_ivl > 0) {
int rc_ivl = get_new_reconnect_ivl ();
add_timer (rc_ivl, reconnect_timer_id);
socket->event_connect_retried (
make_unconnected_bind_endpoint_pair (endpoint), rc_ivl);
timer_started = true;
if (options.connect_timeout > 0) {
add_timer (options.connect_timeout, connect_timer_id);
_connect_timer_started = true;
}
}
int zmq::vmci_connecter_t::get_new_reconnect_ivl ()
{
// The new interval is the current interval + random value.
int this_interval =
current_reconnect_ivl + (generate_random () % options.reconnect_ivl);
// Only change the current reconnect interval if the maximum reconnect
// interval was set and if it's larger than the reconnect interval.
if (options.reconnect_ivl_max > 0
&& options.reconnect_ivl_max > options.reconnect_ivl) {
// Calculate the next interval
current_reconnect_ivl = current_reconnect_ivl * 2;
if (current_reconnect_ivl >= options.reconnect_ivl_max) {
current_reconnect_ivl = options.reconnect_ivl_max;
}
}
return this_interval;
}
int zmq::vmci_connecter_t::open ()
{
zmq_assert (s == retired_fd);
zmq_assert (_s == retired_fd);
int family = this->get_ctx ()->get_vmci_socket_family ();
if (family == -1)
return -1;
// Resolve the address
if (_addr->resolved.vmci_addr != NULL) {
LIBZMQ_DELETE (_addr->resolved.vmci_addr);
}
// Create the socket.
s = open_socket (family, SOCK_STREAM, 0);
#ifdef ZMQ_HAVE_WINDOWS
if (s == INVALID_SOCKET) {
errno = wsa_error_to_errno (WSAGetLastError ());
_addr->resolved.vmci_addr =
new (std::nothrow) vmci_address_t (this->get_ctx ());
alloc_assert (_addr->resolved.vmci_addr);
_s = vmci_open_socket (_addr->address.c_str (), options,
_addr->resolved.vmci_addr);
if (_s == retired_fd) {
// TODO we should emit some event in this case!
LIBZMQ_DELETE (_addr->resolved.vmci_addr);
return -1;
}
#else
if (s == -1)
return -1;
#endif
zmq_assert (_addr->resolved.vmci_addr != NULL);
// Set the socket to non-blocking mode so that we get async connect().
unblock_socket (_s);
const vmci_address_t *const vmci_addr = _addr->resolved.vmci_addr;
int rc;
// Connect to the remote peer.
int rc = ::connect (s, addr->resolved.vmci_addr->addr (),
addr->resolved.vmci_addr->addrlen ());
// Connect was successful immediately.
if (rc == 0)
return 0;
// Forward the error.
return -1;
}
void zmq::vmci_connecter_t::close ()
{
zmq_assert (s != retired_fd);
#ifdef ZMQ_HAVE_WINDOWS
const int rc = closesocket (s);
wsa_assert (rc != SOCKET_ERROR);
#if defined ZMQ_HAVE_VXWORKS
rc = ::connect (_s, (sockaddr *) vmci_addr->addr (), vmci_addr->addrlen ());
#else
const int rc = ::close (s);
errno_assert (rc == 0);
rc = ::connect (_s, vmci_addr->addr (), vmci_addr->addrlen ());
#endif
socket->event_closed (make_unconnected_bind_endpoint_pair (endpoint), s);
s = retired_fd;
// Connect was successful immediately.
if (rc == 0) {
return 0;
}
// Translate error codes indicating asynchronous connect has been
// launched to a uniform EINPROGRESS.
#ifdef ZMQ_HAVE_WINDOWS
const int last_error = WSAGetLastError ();
if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
errno = EINPROGRESS;
else
errno = wsa_error_to_errno (last_error);
#else
if (errno == EINTR)
errno = EINPROGRESS;
#endif
return -1;
}
zmq::fd_t zmq::vmci_connecter_t::connect ()
{
// Following code should handle both Berkeley-derived socket
// implementations and Solaris.
// Async connect has finished. Check whether an error occurred
int err = 0;
#if defined ZMQ_HAVE_HPUX
int len = sizeof (err);
#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
int len = sizeof err;
#else
socklen_t len = sizeof (err);
socklen_t len = sizeof err;
#endif
int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char *) &err, &len);
const int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR,
reinterpret_cast<char *> (&err), &len);
// Assert if the error was caused by 0MQ bug.
// Networking problems are OK. No need to assert.
#ifdef ZMQ_HAVE_WINDOWS
zmq_assert (rc == 0);
if (err != 0) {
if (err != WSAECONNREFUSED && err != WSAETIMEDOUT
&& err != WSAECONNABORTED && err != WSAEHOSTUNREACH
&& err != WSAENETUNREACH && err != WSAENETDOWN && err != WSAEACCES
&& err != WSAEINVAL && err != WSAEADDRINUSE
&& err != WSAECONNRESET) {
if (err == WSAEBADF || err == WSAENOPROTOOPT || err == WSAENOTSOCK
|| err == WSAENOBUFS) {
wsa_assert_no (err);
}
errno = wsa_error_to_errno (err);
return retired_fd;
}
#else
@@ -286,16 +281,20 @@ zmq::fd_t zmq::vmci_connecter_t::connect ()
err = errno;
if (err != 0) {
errno = err;
errno_assert (errno == ECONNREFUSED || errno == ECONNRESET
|| errno == ETIMEDOUT || errno == EHOSTUNREACH
|| errno == ENETUNREACH || errno == ENETDOWN
|| errno == EINVAL);
#if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
errno_assert (errno != EBADF && errno != ENOPROTOOPT
&& errno != ENOTSOCK && errno != ENOBUFS);
#else
errno_assert (errno != ENOPROTOOPT && errno != ENOTSOCK
&& errno != ENOBUFS);
#endif
return retired_fd;
}
#endif
fd_t result = s;
s = retired_fd;
// Return the newly connected socket.
const fd_t result = _s;
_s = retired_fd;
return result;
}

View File

@@ -38,6 +38,7 @@
#include "own.hpp"
#include "stdint.hpp"
#include "io_object.hpp"
#include "stream_connecter_base.hpp"
namespace zmq
{
@@ -45,8 +46,7 @@ class io_thread_t;
class session_base_t;
struct address_t;
// TODO consider refactoring this to derive from stream_connecter_base_t
class vmci_connecter_t ZMQ_FINAL : public own_t, public io_object_t
class vmci_connecter_t ZMQ_FINAL : public stream_connecter_base_t
{
public:
// If 'delayed_start' is true connecter first waits for a while,
@@ -54,19 +54,21 @@ class vmci_connecter_t ZMQ_FINAL : public own_t, public io_object_t
vmci_connecter_t (zmq::io_thread_t *io_thread_,
zmq::session_base_t *session_,
const options_t &options_,
const address_t *addr_,
address_t *addr_,
bool delayed_start_);
~vmci_connecter_t ();
protected:
std::string get_socket_name (fd_t fd_, socket_end_t socket_end_) const;
private:
// ID of the timer used to delay the reconnection.
// ID of the timer used to check the connect timeout, must be different from stream_connecter_base_t::reconnect_timer_id.
enum
{
reconnect_timer_id = 1
connect_timer_id = 2
};
// Handlers for incoming commands.
void process_plug ();
void process_term (int linger_);
// Handlers for I/O events.
@@ -77,8 +79,8 @@ class vmci_connecter_t ZMQ_FINAL : public own_t, public io_object_t
// Internal function to start the actual connection establishment.
void start_connecting ();
// Internal function to add a reconnect timer
void add_reconnect_timer ();
// Internal function to add a connect timer
void add_connect_timer ();
// Internal function to return a reconnect backoff delay.
// Will modify the current_reconnect_ivl used for next call
@@ -90,43 +92,12 @@ class vmci_connecter_t ZMQ_FINAL : public own_t, public io_object_t
// EAGAIN errno if async connect was launched.
int open ();
// Close the connecting socket.
void close ();
// Get the file descriptor of newly created connection. Returns
// retired_fd if the connection was unsuccessful.
fd_t connect ();
// Address to connect to. Owned by session_base_t.
const address_t *addr;
// Underlying socket.
fd_t s;
// Handle corresponding to the listening socket.
handle_t handle;
// If true file descriptor is registered with the poller and 'handle'
// contains valid value.
bool handle_valid;
// If true, connecter is waiting a while before trying to connect.
const bool delayed_start;
// True iff a timer has been started.
bool timer_started;
// Reference to the session we belong to.
zmq::session_base_t *session;
// Current reconnect ivl, updated for backoff strategy
int current_reconnect_ivl;
// String representation of endpoint to connect to
std::string endpoint;
// Socket
zmq::socket_base_t *socket;
bool _connect_timer_started;
ZMQ_NON_COPYABLE_NOR_MOVABLE (vmci_connecter_t)
};

View File

@@ -35,7 +35,7 @@
#include <new>
#include "stream_engine.hpp"
//#include "stream_engine.hpp"
#include "vmci_address.hpp"
#include "io_thread.hpp"
#include "session_base.hpp"
@@ -55,40 +55,18 @@
zmq::vmci_listener_t::vmci_listener_t (io_thread_t *io_thread_,
socket_base_t *socket_,
const options_t &options_) :
own_t (io_thread_, options_),
io_object_t (io_thread_),
s (retired_fd),
socket (socket_)
stream_listener_base_t (io_thread_, socket_, options_)
{
}
zmq::vmci_listener_t::~vmci_listener_t ()
{
zmq_assert (s == retired_fd);
}
void zmq::vmci_listener_t::process_plug ()
{
// Start polling for incoming connections.
handle = add_fd (s);
set_pollin (handle);
}
void zmq::vmci_listener_t::process_term (int linger_)
{
rm_fd (handle);
close ();
own_t::process_term (linger_);
}
void zmq::vmci_listener_t::in_event ()
{
fd_t fd = accept ();
// If connection was reset by the peer in the meantime, just ignore it.
if (fd == retired_fd) {
socket->event_accept_failed (
make_unconnected_bind_endpoint_pair (endpoint), zmq_errno ());
_socket->event_accept_failed (
make_unconnected_bind_endpoint_pair (_endpoint), zmq_errno ());
return;
}
@@ -107,41 +85,24 @@ void zmq::vmci_listener_t::in_event ()
}
// Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow) stream_engine_t (
fd, options, make_unconnected_bind_endpoint_pair (endpoint));
alloc_assert (engine);
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_assert (io_thread);
// Create and launch a session object.
session_base_t *session =
session_base_t::create (io_thread, false, socket, options, NULL);
errno_assert (session);
session->inc_seqnum ();
launch_child (session);
send_attach (session, engine, false);
socket->event_accepted (make_unconnected_bind_endpoint_pair (endpoint), fd);
create_engine (fd);
}
int zmq::vmci_listener_t::get_local_address (std::string &addr_)
std::string
zmq::vmci_listener_t::get_socket_name (zmq::fd_t fd_,
socket_end_t socket_end_) const
{
struct sockaddr_storage ss;
#ifdef ZMQ_HAVE_HPUX
int sl = sizeof (ss);
#else
socklen_t sl = sizeof (ss);
#endif
int rc = getsockname (s, (sockaddr *) &ss, &sl);
if (rc != 0) {
addr_.clear ();
return rc;
const zmq_socklen_t sl = get_socket_address (fd_, socket_end_, &ss);
if (sl == 0) {
return std::string ();
}
vmci_address_t addr ((struct sockaddr *) &ss, sl, this->get_ctx ());
return addr.to_string (addr_);
const vmci_address_t addr (reinterpret_cast<struct sockaddr *> (&ss), sl,
this->get_ctx ());
std::string address_string;
addr.to_string (address_string);
return address_string;
}
int zmq::vmci_listener_t::set_local_address (const char *addr_)
@@ -156,7 +117,7 @@ int zmq::vmci_listener_t::set_local_address (const char *addr_)
return -1;
// Create a listening socket.
s =
_s =
open_socket (this->get_ctx ()->get_vmci_socket_family (), SOCK_STREAM, 0);
#ifdef ZMQ_HAVE_WINDOWS
if (s == INVALID_SOCKET) {
@@ -165,18 +126,18 @@ int zmq::vmci_listener_t::set_local_address (const char *addr_)
}
#if !defined _WIN32_WCE
// On Windows, preventing sockets to be inherited by child processes.
BOOL brc = SetHandleInformation ((HANDLE) s, HANDLE_FLAG_INHERIT, 0);
BOOL brc = SetHandleInformation ((HANDLE) _s, HANDLE_FLAG_INHERIT, 0);
win_assert (brc);
#endif
#else
if (s == -1)
if (_s == -1)
return -1;
#endif
address.to_string (endpoint);
address.to_string (_endpoint);
// Bind the socket.
rc = bind (s, address.addr (), address.addrlen ());
rc = bind (_s, address.addr (), address.addrlen ());
#ifdef ZMQ_HAVE_WINDOWS
if (rc == SOCKET_ERROR) {
errno = wsa_error_to_errno (WSAGetLastError ());
@@ -188,7 +149,7 @@ int zmq::vmci_listener_t::set_local_address (const char *addr_)
#endif
// Listen for incoming connections.
rc = listen (s, options.backlog);
rc = listen (_s, options.backlog);
#ifdef ZMQ_HAVE_WINDOWS
if (rc == SOCKET_ERROR) {
errno = wsa_error_to_errno (WSAGetLastError ());
@@ -199,7 +160,8 @@ int zmq::vmci_listener_t::set_local_address (const char *addr_)
goto error;
#endif
socket->event_listening (make_unconnected_bind_endpoint_pair (endpoint), s);
_socket->event_listening (make_unconnected_bind_endpoint_pair (_endpoint),
_s);
return 0;
error:
@@ -209,27 +171,13 @@ error:
return -1;
}
void zmq::vmci_listener_t::close ()
{
zmq_assert (s != retired_fd);
#ifdef ZMQ_HAVE_WINDOWS
int rc = closesocket (s);
wsa_assert (rc != SOCKET_ERROR);
#else
int rc = ::close (s);
errno_assert (rc == 0);
#endif
socket->event_closed (make_unconnected_bind_endpoint_pair (endpoint), s);
s = retired_fd;
}
zmq::fd_t zmq::vmci_listener_t::accept ()
{
// Accept one connection and deal with different failure modes.
// The situation where connection cannot be accepted due to insufficient
// resources is considered valid and treated by ignoring the connection.
zmq_assert (s != retired_fd);
fd_t sock = ::accept (s, NULL, NULL);
zmq_assert (_s != retired_fd);
fd_t sock = ::accept (_s, NULL, NULL);
#ifdef ZMQ_HAVE_WINDOWS
if (sock == INVALID_SOCKET) {

View File

@@ -37,57 +37,37 @@
#include <string>
#include "fd.hpp"
#include "own.hpp"
#include "stdint.hpp"
#include "io_object.hpp"
#include "vmci_address.hpp"
#include "stream_listener_base.hpp"
namespace zmq
{
class io_thread_t;
class socket_base_t;
// TODO consider refactoring this to derive from stream_listener_base_t
class vmci_listener_t ZMQ_FINAL : public own_t, public io_object_t
class vmci_listener_t ZMQ_FINAL : public stream_listener_base_t
{
public:
vmci_listener_t (zmq::io_thread_t *io_thread_,
zmq::socket_base_t *socket_,
const options_t &options_);
~vmci_listener_t ();
// Set address to listen on.
int set_local_address (const char *addr_);
// Get the bound address for use with wildcards
int get_local_address (std::string &addr_);
protected:
std::string get_socket_name (fd_t fd_, socket_end_t socket_end_) const;
private:
// Handlers for incoming commands.
void process_plug ();
void process_term (int linger_);
// Handlers for I/O events.
void in_event ();
// Close the listening socket.
void close ();
// Accept the new connection. Returns the file descriptor of the
// newly created connection. The function may return retired_fd
// if the connection was dropped while waiting in the listen backlog.
fd_t accept ();
// Underlying socket.
fd_t s;
int create_socket (const char *addr_);
// Handle corresponding to the listening socket.
handle_t handle;
// Socket the listerner belongs to.
zmq::socket_base_t *socket;
// String representation of endpoint to bind to
std::string endpoint;
// Address to listen on.
vmci_address_t _address;
ZMQ_NON_COPYABLE_NOR_MOVABLE (vmci_listener_t)
};

View File

@@ -453,20 +453,20 @@ bool zmq::ws_engine_t::server_handshake ()
if (strcasecmp ("upgrade", _header_name) == 0)
_header_upgrade_websocket =
strcasecmp ("websocket", _header_value) == 0;
else if (strcasecmp ("connection", _header_name) == 0){
char *element = strtok (_header_value, ",");
while (element != NULL){
else if (strcasecmp ("connection", _header_name) == 0) {
char *rest = NULL;
char *element = strtok_r (_header_value, ",", &rest);
while (element != NULL) {
while (*element == ' ')
element++;
if (strcasecmp ("upgrade", element) == 0){
if (strcasecmp ("upgrade", element) == 0) {
_header_connection_upgrade = true;
break;
}
element = strtok (NULL, ",");
element = strtok_r (NULL, ",", &rest);
}
}
else if (strcasecmp ("Sec-WebSocket-Key", _header_name)
== 0)
} else if (strcasecmp ("Sec-WebSocket-Key", _header_name)
== 0)
strcpy_s (_websocket_key, _header_value);
else if (strcasecmp ("Sec-WebSocket-Protocol", _header_name)
== 0) {
@@ -474,7 +474,7 @@ bool zmq::ws_engine_t::server_handshake ()
// Sec-WebSocket-Protocol can appear multiple times or be a comma separated list
// if _websocket_protocol is already set we skip the check
if (_websocket_protocol[0] == '\0') {
char *rest = 0;
char *rest = NULL;
char *p = strtok_r (_header_value, ",", &rest);
while (p != NULL) {
if (*p == ' ')

View File

@@ -272,6 +272,12 @@ void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
// care of by the manual call above. subscriptions is the real mtrie,
// so the pipe must be removed from there or it will be left over.
_subscriptions.rm (pipe_, stub, static_cast<void *> (NULL), false);
// In case the pipe is currently set as last we must clear it to prevent
// subscriptions from being re-added.
if (pipe_ == _last_pipe) {
_last_pipe = NULL;
}
} else {
// Remove the pipe from the trie. If there are topics that nobody
// is interested in anymore, send corresponding unsubscriptions
@@ -348,6 +354,12 @@ int zmq::xpub_t::xrecv (msg_t *msg_)
if (_manual && !_pending_pipes.empty ()) {
_last_pipe = _pending_pipes.front ();
_pending_pipes.pop_front ();
// If the distributor doesn't know about this pipe it must have already
// been terminated and thus we can't allow manual subscriptions.
if (_last_pipe != NULL && !_dist.has_pipe (_last_pipe)) {
_last_pipe = NULL;
}
}
int rc = msg_->close ();

View File

@@ -35,6 +35,7 @@
#include "err.hpp"
#include "atomic_ptr.hpp"
#include "platform.hpp"
namespace zmq
{
@@ -50,7 +51,7 @@ namespace zmq
// T is the type of the object in the queue.
// N is granularity of the queue (how many pushes have to be done till
// actual memory allocation is required).
#ifdef HAVE_POSIX_MEMALIGN
#if defined HAVE_POSIX_MEMALIGN
// ALIGN is the memory alignment size to use in the case where we have
// posix_memalign available. Default value is 64, this alignment will
// prevent two queue chunks from occupying the same CPU cache line on
@@ -181,7 +182,7 @@ template <typename T, int N> class yqueue_t
static inline chunk_t *allocate_chunk ()
{
#ifdef HAVE_POSIX_MEMALIGN
#if defined HAVE_POSIX_MEMALIGN
void *pv;
if (posix_memalign (&pv, ALIGN, sizeof (chunk_t)) == 0)
return (chunk_t *) pv;

View File

@@ -69,11 +69,13 @@
#define ZMQ_HELLO_MSG 110
#define ZMQ_DISCONNECT_MSG 111
#define ZMQ_PRIORITY 112
#define ZMQ_BUSY_POLL 113
#define ZMQ_HICCUP_MSG 114
/* DRAFT ZMQ_RECONNECT_STOP options */
#define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1
#define ZMQ_RECONNECT_STOP_HANDSHAKE_FAILED 0x2
#define ZMQ_RECONNECT_STOP_AFTER_DISCONNECT 0x3
#define ZMQ_RECONNECT_STOP_AFTER_DISCONNECT 0x4
/* DRAFT Context options */
#define ZMQ_ZERO_COPY_RECV 10