Italiano - English

pkQueueTS - Coda thread safe per OpenCV Mat

A single lock, multiple producer/consumer (MPMC) thread safe queue with wait/timeout Pop. It also provides tools to manage elements like cv::Mat and an interface for custom OnPush event handler.

pkQueueTS is our thread safe queue based on std::queue.


  • It can be used as generic container;
  • Single lock, multiple producer multiple consumer;
  • It uses a condition_variable with time-out to wait for new data;
  • It provides tools to manage OpenCV Mat objects;
  • It provides an abstract interface to create custom OnPush event handler (the event could be used to perform any custom actions like memory analysis on the queue);
There are many implementations of thread safe queue. Some of them are lock free. Overall performance also depends on number of threads, number of processors, cache. Up to 4 processors the single-lock algorithm is quite performant. See  research by Maged M. Michael and Michael L. Scott on non-blocking and blocking concurrent queue.

class pkQueueTS

template <class TElement>
class pkQueueTS
    // Default constructor
    /** \overload
     *  \brief "OnPush" Constructor
     * If used, let's you to define your own OnPush event handler;
     * \param onPushHandler a pointer to a valid stats object see pkQueueOnPushBase
    pkQueueTS(pkQueueOnPushBase* onPushHandler);
    /** \brief Default destructor
     * Calls element destructor for all elements in the queue (if any).
     * \warning If element is a pointer, the pointed to objects are not destroyed.
    /** \brief Retrieves the oldest element in the queue, if any or if wait.
     * Blocks the caller thread until a new element is available or after the specified
     * time-out duration. When/If the element has been retrieved correctly is it's
     * removed from the queue.
     * \param [out] element If PK_QTS_OK is returned, contains the oldest element in the queue.
     * \param timeoutMs You can specify a time (milliseconds) to wait for a valid elements
     *        - `timeoutMs = 0` [default] means no wait at all
     *        - `timeoutMs > 0` means wait for given time out
     * \return pkQueueResults One of the following:
     *         - PK_QTS_OK The element has been retrieved correctly
     *         - PK_QTS_TIMEOUT  The queue is empty but we wont wait
     *         - PK_QTS_EMPTY Time out has reached waiting a valid element
    pkQueueResults Pop(TElement &element, unsigned timeoutMs = 0);
    /** \brief Inserts an element at the end of the queue
    * \param element The element to add.
    * \return The queue size after that the element has been added
    size_t Push(const TElement &element);
    size_t Size();
    bool Empty();
    std::queue<TElement>    m_queue;
    std::condition_variable m_dataReady;
    mutable std::mutex      m_mutex;
    pkQueueOnPushBase*      m_onPushHandler;

You can easily create a queue of your class just writing:

pkQueueTS<MyClass> myQueue;

How to use the OnPush feature

If the OnPush constructor is used, a custom object will be invoked each time pkQueueTS::Push() is called. In other words you can have a custom OnPush event handler that can be used to perform any custom actions on the queue.

The custom object must be derived from the abstract class pkQueueOnPushBase that is defined as below:

class pkQueueOnPushBase
    /** \brief OnPush event handler interface. 
     *  Must be defined in your own derived class.
     * \param queueSize Queue size after current push.
     * \param elementPtr Pointer to the last pushed element in the queue <`&m_queue.back()`>
     * \note This is called by pkQueueTS::Push() within a proper lock on the queue
     * \warning Do not use `elementPtr` out of here because it will becomes NULL on next Pop().
    virtual void OnPush(size_t queueSize, const void *elementPtr) = 0;

Because the pkQueueOnPushBase class is pure virtual, you have to derive your own class and write your own OnPush() .

In details, to use the OnPush feature you have to:

  1. Create your own class and implement the method OnPush(). For example, to know max queue size:
    class myOnPush: public pkQueueOnPushBase
        void OnPush(size_t queueSize, const void *elementPtr)
            lock_guard<mutex> lock(m_mutex);
            m_maxSize= max(m_maxSize,queueSize);
        size_t GetMaxSize()
            lock_guard<mutex> lock(m_mutex);
            return m_maxSize;
        size_t m_maxSize = 0;
        std::mutex m_mutex;
  2. Declare an instance of your own class
    myOnPush onPushObj;
  3. Declare your queue using the OnPush constructor
    pkQueueTS<MyClass> myQueue(&onPushObj);

In this case myQueue.Push() automatically calls onPushObj.OnPush().

Take care of shared resource ! Remember that the queue is used within a multithread environment. pkQueueTS::Push() calls  YourClass::OnPush() within a proper lock on the queue. Therefore the OnPush() method is thread safe vs those threads that are writing /reading the queue (also in case of multiple producer/consumer). The above example uses mutex because it's supposed that you can read m_maxSize  at any time from anywhere. In case you use myOnPush members only after that the queue has been terminated (or you won't access to at all) the mutex can be avoided.

A thread safe queue for OpenCV Mats

pkQueueTS can be used to manage queue of OpenCV Mats. Thanks to Automatic Memory Management of cv::Mat and C++/STL memory recycling, use of std::queue with cv::Mat is memory effective.

The OpenCV Mat data type

Unfortunately cv::Mat can't be used directly as element because its assignment operator cv::Mat::operator= and the copy constructor cv::Mat::Mat(const Mat &m), just copy the Mat header but not the matrix data.

OpenCV Mat Assignment  vs Clone

Given that std::queue::push calls the copy constructor of the element, using cv::Mat as element will push only the mat header but not the matrix data.

std::queue<cv::Mat> can't be used because it doesn't store the matrix data

A simple solution is to encapsulates cv::Mat into a structure or class and to write adequate assignment and constructor. The new schema becomes:

A queue of encapsulated cv::Mat with pkQueueTS

The template class MatCapsule_ is provided for the above reason.

The MatCapsule_ class
template <class TAdditionalStorage = int>
class MatCapsule_
    //! The OpenCV Mat
    Mat mat;
    /** \brief Storage for additional information (Template)
     * You can define you own data type to hold any kind of information.
     * - In case your `TAdditionalStorage` holds just simple types you can forget about copying.
     * - In case `TAdditionalStorage` holds any pointers or complex data type,
     *   you have to take care of copy constructor and assignment operator.
    TAdditionalStorage vars;
    /** \brief Default constructor */
    MatCapsule_() { }
    /** \brief Class destructor. Called by queue::pop */
    ~MatCapsule_() { }
    /** \brief The copy constructor. Called by queue::push
     * Performs a deep copy of the `mat` member and its data.
     * Calls the `TAdditionalStorage` copy constructor if any.
    MatCapsule_(const MatCapsule_& src)
        vars = TAdditionalStorage(src.vars); //Calls the copy constructor for the data type
    /** \brief The assignment operators. Called by queue::front
     * \warning In case you have any member pointers you need to clone them.
     * This is because, next calls to queue::pop calls the class destructor destroying all data.
     * With `cv::Mat` the matrix data is still valid thanks to its internal memory counter.
    MatCapsule_& operator=(const MatCapsule_& src)
        //Sanity check
        if (this == &src) return *this;
        vars = src.vars;    // Calls assignment operator for the data type
        mat = src.mat;      // Just copy the Mat header
        return *this;
/** \brief For your convenience, If you don't need any additional storage */
typedef MatCapsule_<int> MatCapsule;

As you can see MatCapsule_ is a simple structure for an OpenCV Mat with deep copy constructor and assignment operator.

MatCapsule_ is a template class so that you can attach your own data type to hold any kind of information. If you don't need of any additional storage you can use the MatCapsule type alias.

Therefore you can create a thread safe queue for OpenCV Mats easily:

pkQueueTS<MatCapsule> myQueueOfMats;

Additional Storage

To use additional storage, for example to keep some frame info, you can write:

/* Define your own storage class or structure */
struct MyImageInfo
    int64 timestamp = 0;
    int64 frameNum = 0;
/* Create a queue of your enhanced Mat capsule */
pkQueueTS<MatCapsule_<MyImageInfo>> queueOfMatsEx;

Even if it's really useful to define a type alias for your enhanced Mat capsule:

/* Create a Short-cut for your enhanced Mat capsule */
typedef MatCapsule_<MyImageInfo> myMatCapsule;
/* Create a queue of your enhanced Mat capsule */
pkQueueTS<myMatCapsule> queueOfMatsEx;

How to use pkQueueTS with OpenCV

Ok let's go to create a simple test case using:

  • a grabber thread captures frames from a camera and pushes them on the queue;
  • a main thread retrieves the frames from the queue and shows a live video;
pkQueueTS< MatCapsule > queueOfMats;    /* A thread safe queue of Mats */
atomic<bool> grabOn;                    /* Lock free var to control the grab thread */
/* A Simple grabbing thread writes on the queue */
void grabbingThread(int device)
    cv::VideoCapture cap(device);
    grabOn = true;
    MatCapsule elem;
    while (grabOn)
        cap >> elem.mat;
/* Main application thread reads from the queue and shows the image */
int Main_pkQueueTS_TestSimple()
    std::thread thGrabbing(grabbingThread, 0);
    MatCapsule elem;
    char ch = -1;
    while (ch<0)
        // pop with timeout
        pkQueueResults res = queueOfMats.Pop(elem, 1000);
        if (res != PK_QTS_OK)
            if (res == PK_QTS_TIMEOUT)
                cout << "WARNING: time out reading from the queue!" << endl;
            if (res == PK_QTS_EMPTY)
                cout << "INFO: the queue is empty!" << endl;
            // pass the control to other threads
        // process data
        cv::imshow("Image from the queue", elem.mat);
        ch = cv::waitKey(5);
        if (ch < 0)
        // terminate grabbing and flush the queue
        grabOn = false;
        while (PK_QTS_EMPTY != queueOfMats.Pop(elem, 0))
            cv::imshow("Image from the queue", elem.mat);
    return 0;

It looks easy and, trust me, it works fine... try it on your machine !

Memory analysis with pkQueueTS and OpenCV Mats

Using pkQueueTS with cv::Mat it's easy but what about memory usage ? We know that a new element is created on each Push(). In addiction, because we are managing images, a copy of queued images is also created.

Should we be worried about memory requirement considering so many allocations/deallocations ?

Our tests shows that the pkQueueTS is memory effective. Required memory depends on the size (length) of the queue despite of how many "push" we will perform. Details about the test are here: Memory analysis on std::queue as buffer of cv::Mat. Part II, a real test

Vedi anche:

Analisi di memoria pkQueueTS come buffer di cv::Mat. Parte 2, caso reale.

10-12-2016 676

In Analisi di Memoria-Parte 1 si conclude che una coda std::queue con OpenCV Mats utilizza la memoria in modo efficiente. In questo articolo testiamo la nostra coda pkQueueTS con un grabber thread e un processor thread e analizziamo l'utilizzo della memoria per confermare le conclusioni preliminari su riciclo e efficienza.

Vota questa pagina:

0 Commenti:

Lascia il tuo commento:

  • La tua email non è obligatoria e non sarà visibile in alcun modo
  • Si prega di inviare solo commenti relativi a questa pagina
  • Commenti inappropriati o offensivi saranno modificati o eliminati
  • Codici HTML non sono consentiti. Prego usare i BB code:
    [b]bold[/b], [u]underline[/u], [i]italic[/i], [code]code[/code]
Il codice, le illustrazioni e gli esempi riportati in questa pagina sono solo a scopo illustrativo. L'autore non prende alcuna responsabilità per il loro utilizzo da parte dell'utente finale.
Questo materiale è di proprietà di Pk Lab ed è utilizzabile liberamente a condizione di citarne la fonte.