Program Listing for File async_subscriber.cpp
↰ Return to documentation for file (src/o3ds/async_subscriber.cpp
)
#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <time.h>
#include "async_subscriber.h"
namespace O3DS
{
bool AsyncSubscriber::start(const char *url)
{
int ret;
ret = nng_sub0_open(&mSocket);
NNG_ERROR("Async Subscriber open socket");
ret = nng_setopt(mSocket, NNG_OPT_SUB_SUBSCRIBE, "", 0);
NNG_ERROR("Setting subscribe flag")
ret = nng_dialer_create(&mDialer, mSocket, url);
NNG_ERROR("Creating dialer")
//ret = nng_pipe_notify(mSocket, nng_pipe_ev::NNG_PIPE_EV_ADD_POST, AsyncSubscriber::pipeEvent, this);
//NNG_ERROR("Setting pipe notify")
// Async dial - pipe will be created on connection
ret = nng_dialer_start(mDialer, NNG_FLAG_NONBLOCK);
NNG_ERROR("Subscriber async dialier")
ret = nng_aio_alloc(&aio, AsyncSubscriber::callback, this);
NNG_ERROR("Subscriber async aio alloc")
nng_recv_aio(mSocket, aio);
mState = Connector::STARTED;
return true;
}
void AsyncSubscriber::pipeEvent_(nng_pipe pipe, nng_pipe_ev pipe_ev)
{
// int ret;
if (pipe_ev == nng_pipe_ev::NNG_PIPE_EV_ADD_POST)
{
nng_socket s= nng_pipe_socket(pipe);
//int ret = nng_ctx_open(&ctx, mSocket);
//if (ret != 0) return;
//nng_ctx_recv(ctx, aio);
//in_pipe();
}
if (pipe_ev == nng_pipe_ev::NNG_PIPE_EV_REM_POST)
return; // TODO
}
}