I can't get the video read by the ROV's camera to be processed by two threads simultaneously

I use a BlueROV2 for development. I want the video read by the camera to be processed simultaneously by two Python threads running different functions. I instantiated two objects, cap1 and cap2, based on the “Video” class from the OpenCV · GitBook page. But when I run this code, the terminal outputs an error message. It looks like I can’t create two objects based on the “Video” class. How can I make the video read by the camera be processed by both threads?
My code is below:

"""
BlueRov video capture class
"""

import cv2
import gi
import numpy as np

gi.require_version('Gst', '1.0')
from gi.repository import Gst


class Video():
    """BlueRov video capture class constructor

    Attributes:
        port (int): Video UDP port
        video_codec (string): Source h264 parser
        video_decode (string): Transform YUV (12bits) to BGR (24bits)
        video_pipe (object): GStreamer top-level pipeline
        video_sink (object): Gstreamer sink element
        video_sink_conf (string): Sink configuration
        video_source (string): Udp source ip and port
        latest_frame (np.ndarray): Latest retrieved video frame
    """

    def __init__(self, port=5600):
        """Summary

        Args:
            port (int, optional): UDP port
        """

        Gst.init(None)

        self.port = port
        self.latest_frame = self._new_frame = None

        # [Software component diagram](https://www.ardusub.com/software/components.html)
        # UDP video stream (:5600)
        self.video_source = 'udpsrc port={}'.format(self.port)
        # [Rasp raw image](http://picamera.readthedocs.io/en/release-0.7/recipes2.html#raw-image-capture-yuv-format)
        # Cam -> CSI-2 -> H264 Raw (YUV 4-4-4 (12bits) I420)
        self.video_codec = '! application/x-rtp, payload=96 ! rtph264depay ! h264parse ! avdec_h264'
        # Python doesn't have a nibble type, so convert YUV nibbles (4-4-4) to OpenCV standard BGR bytes (8-8-8)
        self.video_decode = \
            '! decodebin ! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert'
        # Create a sink to get data
        self.video_sink_conf = \
            '! appsink emit-signals=true sync=false max-buffers=2 drop=true'

        self.video_pipe = None
        self.video_sink = None

        self.run()

    def start_gst(self, config=None):
        """ Start gstreamer pipeline and sink
        Pipeline description list e.g:
            [
                'videotestsrc ! decodebin', \
                '! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert',
                '! appsink'
            ]

        Args:
            config (list, optional): Gstreamer pipeline description list
        """

        if not config:
            config = \
                [
                    'videotestsrc ! decodebin',
                    '! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert',
                    '! appsink'
                ]

        command = ' '.join(config)
        self.video_pipe = Gst.parse_launch(command)
        self.video_pipe.set_state(Gst.State.PLAYING)
        self.video_sink = self.video_pipe.get_by_name('appsink0')

    @staticmethod
    def gst_to_opencv(sample):
        """Transform byte array into np array

        Args:
            sample (Gst.Sample): GStreamer sample pulled from the appsink

        Returns:
            np.ndarray: BGR image frame
        """
        buf = sample.get_buffer()
        caps_structure = sample.get_caps().get_structure(0)
        array = np.ndarray(
            (
                caps_structure.get_value('height'),
                caps_structure.get_value('width'),
                3
            ),
            buffer=buf.extract_dup(0, buf.get_size()), dtype=np.uint8)
        return array

    def frame(self):
        """ Get Frame

        Returns:
            np.ndarray: latest retrieved image frame
        """
        if self.frame_available():
            self.latest_frame = self._new_frame
            # reset to indicate latest frame has been 'consumed'
            self._new_frame = None
        return self.latest_frame

    def frame_available(self):
        """Check if a new frame is available

        Returns:
            bool: true if a new frame is available
        """
        return self._new_frame is not None

    def run(self):
        """ Get frame to update _new_frame
        """

        self.start_gst(
            [
                self.video_source,
                self.video_codec,
                self.video_decode,
                self.video_sink_conf
            ])

        self.video_sink.connect('new-sample', self.callback)

    def callback(self, sink):
        sample = sink.emit('pull-sample')
        self._new_frame = self.gst_to_opencv(sample)

        return Gst.FlowReturn.OK


cap1 = Video()
cap2 = Video()

The following is an error message from my terminal:


Thank you for your reply!

Hi @robotmark,

You can’t connect two receivers to the same UDP port at once - if you could, they would each only get some of the packets, and the stream wouldn’t be usable.

The simplest approach is for each thread to read the latest frame from the same Video instance whenever they need a new frame.
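As a minimal sketch of that (assuming the Video class from your post is defined above in the same script, and with process_a / process_b as placeholder names for your two processing functions):

import threading
import time

def process_a(frame):
    # placeholder for your first processing function
    print('thread A got a frame of shape', frame.shape)

def process_b(frame):
    # placeholder for your second processing function
    print('thread B got a frame of shape', frame.shape)

def worker(video, process):
    # each thread reads the latest frame from the shared Video instance
    while True:
        frame = video.frame()
        if frame is None:
            time.sleep(0.01)  # no frame received yet
            continue
        process(frame)

video = Video()  # a single instance, shared by both threads
threading.Thread(target=worker, args=(video, process_a), daemon=True).start()
threading.Thread(target=worker, args=(video, process_b), daemon=True).start()

while True:
    time.sleep(1)  # keep the main thread alive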

That approach means it’s possible for one processing thread to miss some frames, or, if it’s very fast, it could end up processing the same frame multiple times. If you want to guarantee that each processing thread processes every frame and/or only processes each frame once, then you can modify the Video class to pass each new frame into a separate queue.Queue for each processing thread, and the processing threads would then just get their frames from their respective Queues.

While @EliotBR is mostly right, you can sometimes share UDP ports. Check the “reuse” flag in the udpsrc documentation.

That still means you are decoding the video twice, which is not ideal.


Thank you for your answer. I copied the Python code from OpenCV · GitBook and tried to reuse its port, but it failed. I don’t know how to use the “reuse” option; can you give me an example? Thank you again.

Try changing self.video_source = 'udpsrc port={}'.format(self.port) to self.video_source = 'udpsrc reuse=true port={}'.format(self.port)

But it does say in there that ‘reuse=true’ is the default, so I’m not sure if that will work.


Thank you for your answer and please forgive me for replying to your message so late. I tried to do it the way you said, but I failed. Do you have any other ideas? Thank you very much.

Have you tried either of the approaches I mentioned in my first response?
Both of them should work, although which one works best for you depends on your usage requirements.


Thanks for your reply. I tried as you suggested and created an instance of the Video class. I read the latest frame from this instance in two threads, but it failed; maybe I didn’t understand what you meant.

Here is the code I used to create the instance using the Video class:

"""
BlueRov video capture class
"""

import cv2
import gi
import numpy as np

gi.require_version('Gst', '1.0')
from gi.repository import Gst


class Video():
    """BlueRov video capture class constructor

    Attributes:
        port (int): Video UDP port
        video_codec (string): Source h264 parser
        video_decode (string): Transform YUV (12bits) to BGR (24bits)
        video_pipe (object): GStreamer top-level pipeline
        video_sink (object): Gstreamer sink element
        video_sink_conf (string): Sink configuration
        video_source (string): Udp source ip and port
        latest_frame (np.ndarray): Latest retrieved video frame
    """

    def __init__(self, port=5600):
        """Summary

        Args:
            port (int, optional): UDP port
        """

        Gst.init(None)

        self.port = port
        self.latest_frame = self._new_frame = None

        # [Software component diagram](https://www.ardusub.com/software/components.html)
        # UDP video stream (:5600)
        self.video_source = 'udpsrc port={}'.format(self.port)
        # [Rasp raw image](http://picamera.readthedocs.io/en/release-0.7/recipes2.html#raw-image-capture-yuv-format)
        # Cam -> CSI-2 -> H264 Raw (YUV 4-4-4 (12bits) I420)
        self.video_codec = '! application/x-rtp, payload=96 ! rtph264depay ! h264parse ! avdec_h264'
        # Python doesn't have a nibble type, so convert YUV nibbles (4-4-4) to OpenCV standard BGR bytes (8-8-8)
        self.video_decode = \
            '! decodebin ! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert'
        # Create a sink to get data
        self.video_sink_conf = \
            '! appsink emit-signals=true sync=false max-buffers=2 drop=true'

        self.video_pipe = None
        self.video_sink = None

        self.run()

    def start_gst(self, config=None):
        """ Start gstreamer pipeline and sink
        Pipeline description list e.g:
            [
                'videotestsrc ! decodebin', \
                '! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert',
                '! appsink'
            ]

        Args:
            config (list, optional): Gstreamer pipeline description list
        """

        if not config:
            config = \
                [
                    'videotestsrc ! decodebin',
                    '! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert',
                    '! appsink'
                ]

        command = ' '.join(config)
        self.video_pipe = Gst.parse_launch(command)
        self.video_pipe.set_state(Gst.State.PLAYING)
        self.video_sink = self.video_pipe.get_by_name('appsink0')

    @staticmethod
    def gst_to_opencv(sample):
        """Transform byte array into np array

        Args:
            sample (Gst.Sample): GStreamer sample pulled from the appsink

        Returns:
            np.ndarray: BGR image frame
        """
        buf = sample.get_buffer()
        caps_structure = sample.get_caps().get_structure(0)
        array = np.ndarray(
            (
                caps_structure.get_value('height'),
                caps_structure.get_value('width'),
                3
            ),
            buffer=buf.extract_dup(0, buf.get_size()), dtype=np.uint8)
        return array

    def frame(self):
        """ Get Frame

        Returns:
            np.ndarray: latest retrieved image frame
        """
        if self.frame_available():
            self.latest_frame = self._new_frame
            # reset to indicate latest frame has been 'consumed'
            self._new_frame = None
        return self.latest_frame

    def frame_available(self):
        """Check if a new frame is available

        Returns:
            bool: true if a new frame is available
        """
        return self._new_frame is not None

    def run(self):
        """ Get frame to update _new_frame
        """

        self.start_gst(
            [
                self.video_source,
                self.video_codec,
                self.video_decode,
                self.video_sink_conf
            ])

        self.video_sink.connect('new-sample', self.callback)

    def callback(self, sink):
        sample = sink.emit('pull-sample')
        self._new_frame = self.gst_to_opencv(sample)

        return Gst.FlowReturn.OK



cap = Video()


""" 
if __name__ == '__main__':
    # Create the video object
    # Add port= if is necessary to use a different one
    video = Video()

    while True:
        # Wait for the next frame
        if not video.frame_available():
            continue

        frame = video.frame()
        cv2.imshow('frame', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

 """

Here is a function from one of my threads. I used the instance ‘cap’ that I created:

import queue
import time

import cv2
import numpy as np

from video import *  # provides the Video class and the shared `cap` instance

q1 = queue.LifoQueue(-1)

def TubeDilate(src, dilate_size, dilate_times):
    # Dilate the image dilate_times times with an elliptical kernel
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (dilate_size, dilate_size))
    for _ in range(dilate_times):
        src = cv2.dilate(src, kernel)
    return src

def TubeErode(src, erode_size, erode_times):
    # Erode the image erode_times times with an elliptical kernel
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (erode_size, erode_size))
    for _ in range(erode_times):
        src = cv2.erode(src, kernel)
    return src

def FollowMode_GetBottomTarget(src):

    dst = src.copy()
    # cv2.imshow('test', dst)

    bImg, gImg, rImg = cv2.split(dst)

    color_img1 = cv2.subtract(rImg, bImg)
    # color_img2 = cv2.subtract(color_img1, gImg)
    ret, bin = cv2.threshold(color_img1, 110, 255, cv2.THRESH_BINARY)
    # ret, bin2 = cv2.threshold(color_img1, 0, 255, cv2.THRESH_OTSU)

    gray_img = cv2.cvtColor(dst, cv2.COLOR_BGR2GRAY)
    ret, gray_bin = cv2.threshold(gray_img, 50, 255, cv2.THRESH_BINARY)


    bin = bin&gray_bin

    # cv2.imshow('test2', bin)

    bin = TubeErode(bin, 4, 1)
    # cv2.imshow('test3', bin)

    bin = TubeDilate(bin, 4, 3)
    cv2.imshow('test4', bin)


    no_father_contours = []


    binary, contours, hierarchy = cv2.findContours(bin, cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)

    if contours:
        for i in range(len(contours)):
            if hierarchy[0][i][3] == -1:
                contour_area = cv2.contourArea(contours[i])

                if (contour_area < 2000 or contour_area > 70000):
                    continue
                no_father_contours.append(contours[i])

        print('number', len(no_father_contours))

    if len(no_father_contours) != 0:
        hull_no_father_contours = [None]*len(no_father_contours)
        # cv2.drawContours(src, no_father_contours, -1, (255, 255, 0), 2)

        for i in range(len(no_father_contours)):
            contour_area = cv2.contourArea(no_father_contours[i])

            hull_no_father_contours[i] = cv2.convexHull(no_father_contours[i], False)

        lowest_ps = []

        for i in range(len(hull_no_father_contours)):
            if hull_no_father_contours[i].any():
                print("here", len(hull_no_father_contours[i]))
                cv2.polylines(src, [hull_no_father_contours[i]], True, (255, 255, 0), 2)

                for j in range(len(hull_no_father_contours[i])):
                    if j == 0:
                        temp_p = hull_no_father_contours[i][j][0]
                    else:

                        if hull_no_father_contours[i][j][0][1] > temp_p[1]:
                            temp_p = hull_no_father_contours[i][j][0]

                lowest_ps.append(temp_p)

        for i in range(len(lowest_ps)):            
            if i == 0:
                lowest_p = lowest_ps[i]
            else:
                if lowest_ps[i][1] > lowest_p[1]:
                    lowest_p = lowest_ps[i]

        #cv2.circle(src, lowest_p, 3, (0, 255, 255), 2, lineType=2)

        spectrum = 2 * (lowest_p[0] / src.shape[1]) - 1
        print('result', spectrum)


    else:
        spectrum = 0.0
        print('no tube')

    cv2.imshow('t', src)
    q1.put(spectrum)
    return spectrum

def patrol(Boolean):

    t = 0.1
    counter = 0
    fps = 0
    start_time = time.time()
    if Boolean:

        if cap.frame_available():
            frame = cap.frame()
            cv2.imshow("cap", frame)

            Spectrum = FollowMode_GetBottomTarget(frame)
            counter += 1
            if (time.time() - start_time) > t:
                fps = counter / (time.time() - start_time)
                fps = str(fps)
                counter = 0
                start_time = time.time()
            cv2.putText(frame, "FPS {0}".format(fps), (10, 30), 1, 1.5, (255, 0, 255), 2)

            cv2.imshow("cap", frame)
            cv2.waitKey(1)

    
    # cap.release()
    # cv2.destroyAllWindows()
if __name__=='__main__':
    while 1:
        patrol(True)

Maybe I didn’t catch your meaning; could you give me some more hints? Thank you very much.

What do you mean by “failed”? Was there an error message, or did something else occur?

The code you’ve presented there should work fine in a single thread, but because you’re checking whether a new frame is available before getting the latest frame, if you run that (or similar) code in multiple threads they will effectively fight for images, and can never receive the same image (it’s possible that one thread could end up getting most of the images).

As mentioned, if you want to receive frames in multiple threads then your main options are either

  • just use cap.frame() (with no guard to check for new frames) in each thread, at the cost of potentially processing the same image multiple times, and/or missing some images in one or more threads if they process too slowly
  • use a queue.Queue per thread, so that each thread processes every frame exactly once, with the risk that one or more threads may get behind if they process slower than the frames arrive
    • can be implemented by a modified Video class that takes in a list of Queues at initialisation (__init__) and dispatches each frame into all of the Queues as it arrives (in the callback function, instead of the current self._new_frame / self.frame() internal storage approach) - see the sketch below
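A rough sketch of that second option, assuming the Video class from earlier in this thread is importable from video.py (without the module-level cap = Video() line), and using a hypothetical VideoDispatch subclass whose constructor takes the list of Queues (process_a and process_b are again placeholder processing functions):

import queue
import threading
import time

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

from video import Video  # the Video class posted earlier in this thread


class VideoDispatch(Video):
    """Video subclass that pushes every decoded frame into each of the given Queues."""

    def __init__(self, queues, port=5600):
        # store the queues before the base class starts the pipeline,
        # since frames may start arriving as soon as run() is called
        self.queues = queues
        super().__init__(port=port)

    def callback(self, sink):
        sample = sink.emit('pull-sample')
        frame = self.gst_to_opencv(sample)
        for frame_queue in self.queues:
            frame_queue.put(frame)  # every thread gets every frame, exactly once
        return Gst.FlowReturn.OK


def worker(frame_queue, process):
    while True:
        frame = frame_queue.get()  # blocks until the next frame arrives
        process(frame)


def process_a(frame):
    print('thread A got a frame of shape', frame.shape)  # placeholder


def process_b(frame):
    print('thread B got a frame of shape', frame.shape)  # placeholder


q1, q2 = queue.Queue(), queue.Queue()
video = VideoDispatch([q1, q2])

threading.Thread(target=worker, args=(q1, process_a), daemon=True).start()
threading.Thread(target=worker, args=(q2, process_b), daemon=True).start()

while True:
    time.sleep(1)  # keep the main thread alive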

Separately, the code you provided doesn’t use threading.Thread, and isn’t a single function that could be called in a thread - presumably that’s something you’re doing in code that isn’t shown? If not, how are you creating your threads?

Also, I believe OpenCV doesn’t tend to enjoy handling user interface components from multiple threads, so there may be issues if you’re trying to use imshow and/or waitKey in more than one thread.
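If that does turn out to be a problem, one possible workaround (just a sketch, independent of the Video class) is to keep all imshow/waitKey calls in the main thread and have the processing threads send whatever they want displayed through a Queue:

import queue
import threading
import time

import cv2
import numpy as np

# processing threads put (window_name, image) pairs here instead of calling imshow themselves
display_queue = queue.Queue()

def fake_processing_thread(name):
    # stand-in for a real processing thread: produces a grey test image once per second
    while True:
        image = np.full((240, 320, 3), 128, dtype=np.uint8)
        display_queue.put((name, image))
        time.sleep(1)

threading.Thread(target=fake_processing_thread, args=('thread-a',), daemon=True).start()
threading.Thread(target=fake_processing_thread, args=('thread-b',), daemon=True).start()

# only the main thread touches the OpenCV GUI
while True:
    try:
        window_name, image = display_queue.get(timeout=0.05)
        cv2.imshow(window_name, image)
    except queue.Empty:
        pass
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break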