Basic ESC Response Time (Ardusub and Pymavlink)

Hi!

I have a setup similar to the Blue ROV2 Heavy, with 8 T200 thrusters and 8 Basic ESCs. I am trying to control the thrusters directly using Ardusub and Pymavlink. I would prefer to control the thruster independently, so I am using the MAV_CMD_DO_SET_SERVO function. I have the following code:

start_time = time.time()
for thruster_index in range(self.num_thrusters):
      self.thruster_cmd(thruster_index+1, pwm[thruster_index])

action_published = False
while not action_published:                
      if np.array_equal(pwm, np.array(get_thruster_outputs())):
            action_published = True

delay = time.time() - start_time

The get_thruster_outputs() function checks the SERVO_OUTPUT_RAW message.

However, I am observing a significant and inconsistent delay (never less than 0.1-0.2 s and that can go up to 0.8s) in applying the PWMs in the thrusters, and I need a high and consistent control frequency. Is this a physical characteristic of the ESCs/thrusters, or are there Ardusub parameters that can be adjusted to improve this response time?

Thank you in advance!

Best regards!

Hi @DionisioEng -
Welcome to the forums!

If you’re using ArduSub, use it! If you want to command each motor with your own software, I’d suggest something like Navigator web assistant to minimize latency / complexity (disabling ArduSub)

I’m very curious why controlling individual motors is desirable for your application - are the built in stabilize / depth hold ArduSub modes not desirable for your vehicle? Abstracting similar functionality to another layer will definitely introduce latency… I would bet your pymavlink code is not processing the firehose of mavlink messages fast enough to achieve the latency you desire. Is it possible to share your entire script?

Hi @tony-white!

Thank you for the response!

I want to test custom control code, that is why I need to control the thrusters individually. But I also require Ardusub to control the vehicle manually and have all its functionalities, while I am not testing my code. What I provided is basically the code I want to use to control the thrusters. So, do you think that with pymavlink I will not be able to achieve low latency?

Hi @DionisioEng -
Please share the portions of your code where you receive mavlink messages, and where they are sent. Improving the latency should definitely be possible.

If the vehicle operates satisfactorily in stabilize/alt_hold mode when running ArduSub, your custom control code may be simplified by commanding “pilot input” - or virtually moving the joysticks (sending the same signals that a user would) to achieve the desired position or heading change. This lets the ArduSub control loops to run, which do so at hundreds of Hz! Tuning these control loops is also already possible if you need to adjust the “feel” of the responses to fit your vehicle. This also applies to auto mode navigation control loop gains…

So, basically I am using standard pymavlink code. This is my function to sent PWM commands to the motors:

def thruster_cmd(servo_id, pwm, conn):
    rospy.loginfo(f'Sending {pwm} PWM for thruster {servo_id}')
    cmd = conn.mav.command_long_encode(
          conn.target_system, conn.target_component,
         mavutil.mavlink.MAV_CMD_DO_SET_SERVO,
         0,
         servo_id,
         pwm,
         0,0,0,0,0
     )
     conn.mav.send(cmd)

Since the motors need to be disabled from Ardusub to be able to send commands, I don’t think those parameters have an effect (but correct me if I am wrong).

And this function to get the thrusters outputs:

def get_thruster_outputs(self):
     servo_outputs = self.recv_match(type='SERVO_OUTPUT_RAW',
                                         blocking=True).to_dict()
     thruster_outputs = [servo_outputs[f'servo{i+1}_raw'] - 1500
                             for i in range(self.thrusters)]
     return thruster_outputs

Essentially, I want to set the PWM of the thrusters as fast as possible, since the thruster commands will change dinamically at a rate of about 4 to 5 Hz.

Hi @DionisioEng -
Correct, those parameters won’t effect your script based control, I was just pointing out they were available as an alternative to scripting at all - still chasing an understand of what the application is that makes custom control necessary!

Mavlink messages from the vehicle are sent as quite a firehose of data, so how your parse and throw away what you’re not using will affect the rates /latency your script will send and receive data at. This thread may be relevant!

@tony-white, thank you very much for the suggestions, I will definitely check them out and give feedback.

In that sense, you believe the latency comes solely from the MAVLink communication when receiving the messages? You don’t think the delay could stem from the time between sending the commands and the thrusters actually applying them (physical or software delay)?

Hi @DionisioEng -
Yes, I believe your latency issue stems from how you’re accomplishing mavlink communications, although I still don’t know if you’ve shared the most relevant code to that - the latency from a mavlink command to motor action should be on the order of <~100ms, not the 0.8s you sometimes see. That said, the latency will be higher with this approach than one that relies on the autopilot for motor control, and sends higher level position or target orientation commands - not necessarily to the response to this command, but to the system’s speed in responding to external forces.

Hi, @tony-white!

I tried to create an example of what I am trying to do:

class MAVLinkTest:
    def __init__(self):
        # Define initial parameters #
        self.num_thrusters = 8
        self.loop_time = 5.0 # seconds
        self.action = 1530
        self.active = True

        # Create a ROS node #
        rospy.init_node('mavlink_actions_test_node', anonymous=False)

        # Define the loop rate #
        rospy.Rate(1000)

        # Create the connection with Kame #
        self.master = mavutil.mavlink_connection('udp:192.168.2.1:14550')

        # Make sure the connection is valid #
        self.master.wait_heartbeat()
        print('Connection established!!!')

        # Initialize the thread #
        self.keeper_thread = threading.Thread(target=self._keeper_loop, daemon=True)
        self.keeper_thread.start()

        # Wait until arming confirmed #
        print("Waiting for the vehicle to arm...")
        self.master.motors_armed_wait()
        print('Armed!')

        # Set the rate of the SERVO_OUTPUT_RAW message #
        self.request_message_interval(50)
        time.sleep(2)

        # Set the thrusters to autonomous mode #
        self.set_thrusters_mode()
        self.set_thrusters_mode('autonomous')

    def _keeper_loop(self):
        while self.active:
            self.servo_outputs = self.master.recv_match(type='SERVO_OUTPUT_RAW', blocking=True)

    def request_message_interval(self, frequency_hz: float):
        self.master.mav.command_long_send(
            self.master.target_system,
            self.master.target_component,
            mavutil.mavlink.MAV_CMD_SET_MESSAGE_INTERVAL,
            0,
            mavutil.mavlink.MAVLINK_MSG_ID_SERVO_OUTPUT_RAW, # The MAVLink message ID
            1e6 / frequency_hz, # The interval between two messages in microseconds. Set to -1 to disable and 0 to request default rate.
            0, 0, 0, 0, # Unused parameters
            0, # Target address of message stream (if message has target address fields). 0: Flight-stack default (recommended), 1: address of requestor, 2: broadcast.
        )

    # Function to set motors to autonomous mode #
    def set_thrusters_mode(self, mode='normal'):
        for thruster_index in range(self.num_thrusters):
            if mode == 'autonomous':
                self.master.mav.param_set_send(self.master.target_system, 
                                               self.master.target_component, 
                                               ("SERVO" + str(thruster_index+1) + "_FUNCTION\0").encode(), 
                                               0, 
                                               mavutil.mavlink.MAV_PARAM_TYPE_INT32)
            elif mode == 'normal':
                self.master.mav.param_set_send(self.master.target_system, 
                                               self.master.target_component,
                                               ("SERVO" + str(thruster_index+1) + "_FUNCTION\0").encode(), 
                                               33+thruster_index, 
                                               mavutil.mavlink.MAV_PARAM_TYPE_INT32)
                
    # Function to send a PWM command to a thruster #
    def thruster_cmd(self, servo_id):
        cmd = self.master.mav.command_long_encode(
            self.master.target_system, 
            self.master.target_component,
            mavutil.mavlink.MAV_CMD_DO_SET_SERVO,
            0,
            servo_id,
            self.action,
            0,0,0,0,0
        )

        # Send the command to the thruster #
        self.master.mav.send(cmd)
                
    def finish_test(self):
        # Return the thrusters to normal mode #
        self.set_thrusters_mode()

        # Stop the thread cleanly #
        self.active = False
        self.keeper_thread.join()

    def run(self):
        loop_start_time = time.time()
        while (time.time() - loop_start_time) < self.loop_time:
            # Change the action #
            self.action += 2

            # Send the action to the thrusters #
            action_start_time = time.time()
            for i in range(8):
                thr_id = i + 1
                self.thruster_cmd(thr_id)
            action_send_time = time.time()
            print(f"Send time -> {np.around(action_send_time - action_start_time, 4)}")

            while True:
                count_check = 0
                # Request the SERVO_OUTPUT_RAW message #
                if self.servo_outputs is not None:
                    servo_outputs = self.servo_outputs.to_dict()
                    thruster_outputs = [servo_outputs[f'servo{i+1}_raw'] for i in range(self.num_thrusters)]

                # Check if the action was applied in the thrusters #
                for i in range(8):
                    thr = thruster_outputs[i]
                    if (int(self.action) - 1 < int(thr) < int(self.action) + 1):
                        count_check += 1
                if count_check == 8:
                    break

            action_apply_time = time.time()
            print(f"Apply time -> {np.around(action_apply_time - action_send_time, 4)}")
            print()

        # Finish the test #
        self.finish_test()

if __name__ == '__main__':
    # Initialize the test #
    mavlink_test = MAVLinkTest()

    # Run the test #
    mavlink_test.run()

I checked your suggestions and increased the loop time. However, my apply_time is still considerable and superior to 0.2 s, which is too much for me. Do you think there is a way of reducing this time? Preferably the 0.1 s you talked about.

Thank you very much for the attention!

How are you measuring this?

This is a measurement of your code/communication performance, and is not at all related to the response time of the actual ESCs.

Is this relevant to your program? It doesn’t seem to be used elsewhere in your posted code, but if you happen to have other ROS code running elsewhere then it’s possible for the ROS communications to compete with the MAVLink communications for bus time / processing attention, which could potentially intermittently increase your communication latency.

1 Like

Hi @EliotBR!

Thank you for the response!

I am measuring the delay through those time counters, comparing the PWM I asked for and the PWM applied to the thrusters (SERVO_OUTPUT_RAW). Just like @tony-white explained, part of my delay is due to the communications and part due to the ESCs. What I am trying to reduce as much as possible is the communication time (my knowledge is not vast, as I am fairly new in Ardusub, sorry for that!).

No, ROS is not used in any other portion of the code (the code presented represents what I want to do). In fact, I was not even using ROS, I just used to try to increase the control loop, as I seen in one of the threads @tony-white recommended.

Basically, what I am trying to achieve is a control loop where I change the thruster commands every ~0.2 s, and the action persists during that time, preferably using Pymavlink, as it is easily and fastly applied. Do you have recommendations on how I can improve my control loop with this framework?

The poster in that thread specifically wanted to read MAVLink messages to integrate telemetry data with their ROS project, and the solution was making sure they were receiving messages quickly enough while also allocating time for the ROS part of their code. If ROS isn’t already part of your project then you don’t need to add it to try to solve a MAVLink issue.

It’s perhaps worth noting that the loop_time in your example is the full duration you’re running your test loop for, before you stop the test, whereas the loop time in their example was the time for each iteration, which was particularly relevant because they were reading messages and doing their own processing sequentially, in a single thread.

If you want performant control over individual thrusters then the best way of achieving that is code running on the autopilot, which has minimal latency and high frequency access to both the onboard sensors and the output controls.

Your example receives the MAVLink messages in a separate thread, with no meaningful added delay between messages, so reducing latency for you is likely more about your network connection, and perhaps digging into where the extra time is being taken. As an example, if you receive the command acknowledgements from the autopilot for all your DO_SET_SERVO commands (which you’re not currently checking for), and then there’s still a significant delay before the command results are confirmed in the SERVO_OUTPUT_RAW message then the delay is confined to inside the autopilot and/or on the return side of the network.

1 Like

Thank you for the response, @EliotBR!

I removed the ROS portion of my code and increased the frequency of the SERVO_OUTPUT_RAW message to 50 Hz (through the request_message_interval() function). However, what I observed was that some messages took considerably longer time to arrive than others (80 to 160 ms), and I don’t understand why.

Since then, I also tried to use the send_rc() function, which sends all the thruster commands simultaneously, but obtained the same results. Another attempt I tried was to use MAVROS (purely based on C++), through the /mavros/rc/override topic, but the behavior is similar. And even without checking the message, it seems to be a delay in the motor actuation.

All the tests were performed on a topside computer, which communicates with the vehicle via tether. But testing on the onboard computer didn’t result in considerable improvements.

I am a little bit lost to where the problem might originate from, but it seems to me it might be the Ardupilot, like you suggested. Considering that as true (which I am not sure), is there anything to be done to solve this problem? You suggested running the code on Ardupilot (which I don’t think it will be possible in my case), but how would that be done? Is there other option?

It’s a bit of a stretch, but this issue may also be relevant - you can try disabling MAVLink from the LOG_BACKEND parameter, or changing which MAVLink router you’re using in BlueOS.

  1. The firmware is open source, so you can access the code and build a custom version of the firmware, either replacing the existing control system or adding an additional one as a separate operating mode (depending on what you’re actually trying to achieve).
  2. Alternatively there is also Lua scripting functionality, which can be moved onto and off the vehicle without needing to modify the firmware.

The documentation for these is currently focused on ArduPilot more generally, so may include some information that’s not directly relevant. We’re hoping/intending to improve that documentation quite soon, but have some other higher priorities at the moment.

That depends largely on what you’re trying to achieve, and what functionality you need.

As a simple example, if you don’t need the provided autopilot functionality then you could write a simpler custom control program using our Navigator Library to control the outputs directly.