Program Listing for File nng_connector.cpp

Return to documentation for file (src/o3ds/nng_connector.cpp)

#include "nng_connector.h"

#include <string>
#include <string.h>
#include "nng/nng.h"


namespace O3DS
{
    BlockingNngConnector::~BlockingNngConnector()
    {
        std::lock_guard<std::mutex> guard(mutex);
        nng_close(mSocket);
    }

    void BlockingNngConnector::setError(const char* msg, int ret)
    {
        mError = msg;
        mError += ": ";
        mError += nng_strerror(ret);
        mState = Connector::STATE_ERROR;
    }

    bool BlockingNngConnector::write(const char* data, size_t len)
    {
        int ret;
        nng_msg* msg;

        std::lock_guard<std::mutex> guard(mutex);

        ret = nng_msg_alloc(&msg, 0);
        NNG_ERROR("Creating message")

        ret = nng_msg_append(msg, data, len);
        NNG_ERROR("Appending message")

        ret = nng_sendmsg(mSocket, msg, 0);
        NNG_ERROR("Sending message")

        return true;
    }

    // Read bytes - len is the fixed size of data
    size_t BlockingNngConnector::read(char* data, size_t len)
    {
        int ret;

        nng_msg* msg = nullptr;

        ret = nng_recvmsg(mSocket, &msg, 0);
        NNG_ERROR("Receiving message");

        size_t msglen = nng_msg_len(msg);
        if (msglen > len)
        {
            nng_msg_free(msg);
            Connector::setError("Message too large");
            return false;
        }

        void* msgBody = nng_msg_body(msg);
        if (!msgBody)
        {
            Connector::setError("Invalid Message");
            nng_msg_free(msg);
            return false;
        }

        memcpy(data, msgBody, msglen);

        nng_msg_free(msg);

        return msglen;
    }

    size_t BlockingNngConnector::read(char** data, size_t* len)
    {
        if (data == nullptr || len == nullptr)
        {
            Connector::setError("Invalid parameter");
            return 0;
        }

        int ret;

        nng_msg* msg = nullptr;

        ret = nng_recvmsg(mSocket, &msg, 0);
        NNG_ERROR("Receiving message");

        size_t msglen = nng_msg_len(msg);
        if (msglen > *len)
        {
            char* buf = (char*)realloc(*data, msglen);
            if (!buf) return 0;
            *len = msglen;
        }

        void* msgBody = nng_msg_body(msg);
        if (!msgBody)
        {
            Connector::setError("Invalid Message");
            nng_msg_free(msg);
            return false;
        }

        memcpy(*data, msgBody, msglen);

        nng_msg_free(msg);

        return msglen;

    }

    /*  ASYNC */

    void AsyncNngConnector::stop()
    {
        if(mSocket.id == 0)
            return;

        nng_close(mSocket);
        mSocket = NNG_SOCKET_INITIALIZER;
        nng_dialer_close(mDialer);
        if (aio) nng_aio_stop(aio);
        aio = nullptr;
    }

    void AsyncNngConnector::setError(const char* msg, int ret)
    {
        mError = msg;
        mError += ": ";
        mError += nng_strerror(ret);
        mState = Connector::STATE_ERROR;
    }


    bool AsyncNngConnector::write(const char* data, size_t len)
    {
        int ret;
        nng_msg* msg;

        std::lock_guard<std::mutex> guard(mutex);

        ret = nng_msg_alloc(&msg, 0);
        NNG_ERROR("Message alloc");

        ret = nng_msg_append(msg, data, len);
        NNG_ERROR("Creating message")

            ret = nng_sendmsg(mSocket, msg, NNG_FLAG_NONBLOCK);
        NNG_ERROR("Sending message")

            return true;
    }


    size_t AsyncNngConnector::read(char* data, size_t len)  // Read bytes - len is the size of data
    {
        int ret;
        std::lock_guard<std::mutex> guard(mutex);

        nng_msg* msg;

        ret = nng_recvmsg(mSocket, &msg, NNG_FLAG_NONBLOCK);
        if (ret == NNG_EAGAIN) { return 0; }
        NNG_ERROR("Receiving message");

        if (msg == nullptr)
        {
            return 0;
        }

        size_t msglen = nng_msg_len(msg);
        if (msglen > len)
        {
            Connector::setError("Message too large");
            return false;
        }

        void* msgBody = nng_msg_body(msg);
        if (!msgBody)
        {
            Connector::setError("Invalid Message");
            nng_msg_free(msg);
            return false;
        }

        memcpy(data, msgBody, len);

        nng_msg_free(msg);

        mState = Connector::READING;

        return msglen;
    }

    size_t AsyncNngConnector::read(char** data, size_t* len)
    {
        if (data == nullptr || len == nullptr)
        {
            Connector::setError("Invalid parameter");
            return 0;
        }

        int ret;

        nng_msg* msg = nullptr;

        ret = nng_recvmsg(mSocket, &msg, NNG_FLAG_NONBLOCK);
        if (ret == NNG_EAGAIN) { return 0; }
        NNG_ERROR("Receiving message");

        if (msg == nullptr)
            return 0;

        size_t msglen = nng_msg_len(msg);
        if (msglen > *len)
        {
            char* buf = (char*)realloc(*data, msglen);
            if (!buf) return 0;
            *len = msglen;
        }

        void* msgBody = nng_msg_body(msg);
        if (!msgBody)
        {
            Connector::setError("Invalid Message");
            nng_msg_free(msg);
            return false;
        }

        memcpy(*data, msgBody, msglen);

        nng_msg_free(msg);

        return msglen;
    }


    bool AsyncNngConnector::asyncReadMsg()
    {
        // Only calls nng_recv_aio if the message was okay.
        int ret;

        std::lock_guard<std::mutex> guard(mutex);

        ret = nng_aio_result(aio);
        if (ret != 0)
        {
            setError("Async read", ret);
            mState = Connector::STATE_ERROR;
            return false;
        }

        nng_msg* msg = nng_aio_get_msg(aio);
        if (msg == nullptr)
        {
            Connector::setError("No message wile doing an async read");
            mState = Connector::STATE_ERROR;
            return false;
        }

        if (mInDataFunc) mInDataFunc(mContext, nng_msg_body(msg), nng_msg_len(msg));

        nng_msg_free(msg);

        nng_recv_aio(mSocket, aio);

        mState = Connector::READING;

        return true;

    }
} // namespace