Vision-based obstacle avoidance in autonomous mode

Hello guys, I am trying to develop new capabilities for an ROV on my own (Raspberry Pi 3 + Pixhawk 2.4.8), such as vision-based obstacle avoidance.
The basic idea, borrowed from drones, is that in autonomous mode the ROV will follow a series of pre-planned waypoints and adjust its waypoints to avoid a collision when it recognizes an obstacle with a depth camera.

My questions are:

  1. Is a depth camera usable in an underwater environment (let's suppose the ROV drives in clear water, like a swimming pool)? I have heard that infrared light performs badly underwater and that these cameras are really expensive.
  2. Is one camera enough? In my plan the ROV should recognize objects (like humans or fish) in addition to getting depth information about obstacles.
  3. Where should I implement a custom obstacle avoidance algorithm once I have the depth information: on the Raspberry Pi or on the Pixhawk? Can anybody point me to the specific code files? It seems I need to use this information to re-plan local waypoints.

I’ve tried to find answers to these questions on the Internet but got nothing, and I’m new to ROV development. Thank you very much for your help!


Hi @Alex001, welcome to the forum :slight_smile:

To follow waypoints the vehicle needs a position estimate, which requires some form of positioning sensor.

You’ll likely need to use a stereo camera for this, and get the depth from the stereo difference. The camera itself can be inside an enclosure, so doesn’t necessarily need to have its own waterproofing.

There’s a thread here about OAK cameras, which may be of interest.

That depends what approach you’re taking. A stereo camera has (at least) two cameras. If you’re doing your own stereo differencing then that could be two separate cameras, or you could have a package that includes the cameras and does the differencing for you.
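
If you do end up doing the stereo differencing yourself, a minimal sketch of that step with OpenCV might look like the following (the focal length, baseline, and image file names are placeholder values you would replace with your own calibration and captures):

import cv2
import numpy as np

# Placeholder calibration values -- replace with your own stereo calibration.
FOCAL_LENGTH_PX = 700.0   # focal length in pixels (fx)
BASELINE_M = 0.06         # distance between the two cameras, in metres

# A rectified left/right image pair (hypothetical file names).
left = cv2.imread("left.png", cv2.IMREAD_GRAYSCALE)
right = cv2.imread("right.png", cv2.IMREAD_GRAYSCALE)

# Semi-global block matching; numDisparities must be a multiple of 16.
stereo = cv2.StereoSGBM_create(minDisparity=0, numDisparities=64, blockSize=5)

# compute() returns fixed-point disparities scaled by 16.
disparity = stereo.compute(left, right).astype(np.float32) / 16.0

# Depth (metres) = fx * baseline / disparity, for valid (positive) disparities.
valid = disparity > 0
depth = np.zeros_like(disparity)
depth[valid] = FOCAL_LENGTH_PX * BASELINE_M / disparity[valid]

if valid.any():
    print(f"Closest valid point: {depth[valid].min():.2f} m")

Note that underwater you would also want to calibrate the cameras behind their enclosure window, since refraction changes the effective intrinsics.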

Object detection requires some form of classification model. It may be possible to run that on the Raspberry Pi at low framerates and resolutions, but that kind of functionality is also possible on an OAK camera. I’d recommend avoiding excessive processing on the Raspberry Pi, because it could overheat and end up throttling the performance of the vehicle operation components.
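
If you do experiment with detection on the Raspberry Pi, a hedged sketch using OpenCV's DNN module with a pre-trained MobileNet-SSD could look like this (the prototxt/caffemodel paths and the input image are placeholders for files you would supply yourself):

import cv2

# Hypothetical paths to a pre-trained MobileNet-SSD -- download the files separately.
net = cv2.dnn.readNetFromCaffe("MobileNetSSD_deploy.prototxt",
                               "MobileNetSSD_deploy.caffemodel")

frame = cv2.imread("frame.png")   # placeholder input image
h, w = frame.shape[:2]

# MobileNet-SSD expects 300x300 inputs with this scale factor and mean.
blob = cv2.dnn.blobFromImage(cv2.resize(frame, (300, 300)),
                             scalefactor=0.007843, size=(300, 300), mean=127.5)
net.setInput(blob)
detections = net.forward()        # shape: (1, 1, N, 7)

for i in range(detections.shape[2]):
    confidence = float(detections[0, 0, i, 2])
    if confidence > 0.5:
        class_id = int(detections[0, 0, i, 1])
        box = (detections[0, 0, i, 3:7] * [w, h, w, h]).astype(int)
        print(f"class {class_id} at {box} with confidence {confidence:.2f}")

On an OAK camera the same idea runs on the device itself via the DepthAI pipeline, which keeps the Raspberry Pi free for navigation logic.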

ArduPilot has support for this kind of thing, although I don’t believe it has been transferred to ArduSub yet.
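
For context on what that support looks like: on ArduPilot vehicles the companion computer usually feeds proximity data to the autopilot as MAVLink DISTANCE_SENSOR (or OBSTACLE_DISTANCE) messages, which the avoidance code then acts on. A minimal pymavlink sketch of that idea is below; the connection string and the distance value are assumptions for illustration, and as mentioned above ArduSub may not yet act on these messages:

import time
from pymavlink import mavutil

# Assumed connection string -- adjust to where your script runs (topside vs. companion).
master = mavutil.mavlink_connection('udpin:0.0.0.0:14550')
master.wait_heartbeat()

while True:
    # Distance to the nearest obstacle from your depth camera, in centimetres (dummy value here).
    distance_cm = 250

    master.mav.distance_sensor_send(
        int(time.monotonic() * 1000) & 0xFFFFFFFF,      # time_boot_ms
        20,                                             # min_distance (cm)
        700,                                            # max_distance (cm)
        distance_cm,                                    # current_distance (cm)
        mavutil.mavlink.MAV_DISTANCE_SENSOR_UNKNOWN,    # sensor type
        0,                                              # sensor id
        mavutil.mavlink.MAV_SENSOR_ROTATION_NONE,       # orientation: forward-facing
        0)                                              # covariance (unknown)
    time.sleep(0.1)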

Hey there, I hope you are doing well. Is there any update on your vision-based obstacle avoidance in autonomous mode? Could you please share the code that you have implemented for your AUV?
Thank you,
Best regards,
Sharath

Hello, I am also doing related research (visual obstacle avoidance and navigation), and I plan to use MiDaS for underwater depth estimation (in a swimming pool I built myself), but an error occurred when I used the following code to capture the QGC video stream.
I would be grateful if you could help me.

"""
Compute depth maps for images in the input folder or from QGC video stream.
"""
import os
import torch
import cv2
import argparse
import time

import numpy as np
from midas.model_loader import default_models, load_model

first_execution = True

def process(device, model, model_type, image, input_size, target_size, optimize, use_camera):
    global first_execution

    if "openvino" in model_type:
        if first_execution or not use_camera:
            print(f"    Input resized to {input_size[0]}x{input_size[1]} before entering the encoder")
            first_execution = False

        sample = [np.reshape(image, (1, 3, *input_size))]
        prediction = model(sample)[model.output(0)][0]
        prediction = cv2.resize(prediction, dsize=target_size, interpolation=cv2.INTER_CUBIC)
    else:
        sample = torch.from_numpy(image).to(device).unsqueeze(0)

        if optimize and device == torch.device("cuda"):
            if first_execution:
                print("  Optimization to half-floats activated. Use with caution, because models like Swin require\n"
                      "  float precision to work properly and may yield non-finite depth values to some extent for\n"
                      "  half-floats.")
            sample = sample.to(memory_format=torch.channels_last)
            sample = sample.half()

        if first_execution or not use_camera:
            height, width = sample.shape[2:]
            print(f"    Input resized to {width}x{height} before entering the encoder")
            first_execution = False

        prediction = model.forward(sample)
        prediction = (
            torch.nn.functional.interpolate(
                prediction.unsqueeze(1),
                size=target_size[::-1],
                mode="bicubic",
                align_corners=False,
            )
            .squeeze()
            .cpu()
            .numpy()
        )

    return prediction


def create_side_by_side(image, depth, grayscale):
    """
    Combine RGB image and depth map side-by-side for visualization.
    """
    depth_min = depth.min()
    depth_max = depth.max()
    normalized_depth = 255 * (depth - depth_min) / (depth_max - depth_min)
    normalized_depth *= 3

    right_side = np.repeat(np.expand_dims(normalized_depth, 2), 3, axis=2) / 3
    if not grayscale:
        right_side = cv2.applyColorMap(np.uint8(right_side), cv2.COLORMAP_INFERNO)

    if image is None:
        return right_side
    else:
        return np.concatenate((image, right_side), axis=1)


def run_qgc_stream(output_path, model_path, model_type="dpt_swin2_tiny_256", optimize=False, side=False, height=None,
                   square=False, grayscale=False, stream_url="udp://192.168.2.1:5600"):
    """
    Process video stream from QGC (QGroundControl) for depth estimation.

    Args:
        output_path (str): Directory to save output images.
        model_path (str): Path to model weights.
        model_type (str): Model type.
        optimize (bool): Optimize model for half-precision floats on CUDA?
        side (bool): Show side-by-side RGB and depth map in output images?
        height (int): Preferred image height for inference.
        square (bool): Resize input to square?
        grayscale (bool): Use grayscale colormap?
        stream_url (str): URL of the QGC video stream.
    """
    print("Initialize")

    # Select device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("Device: %s" % device)

    model, transform, net_w, net_h = load_model(device, model_path, model_type, optimize, height, square)

    # Create output directory
    if output_path is not None:
        os.makedirs(output_path, exist_ok=True)

    print("Start processing video stream from QGC")

    with torch.no_grad():
        fps = 1
        video = cv2.VideoCapture(stream_url)  # Connect to QGC video stream
        if not video.isOpened():
            print("Failed to open video stream.")
            exit(1)
        time_start = time.time()

        frame_index = 0

        # Read the first frame to determine resolution
        ret, frame = video.read()
        if not ret:
            raise ValueError("Failed to read video frame. Check your QGC video stream URL.")

        # Get frame dimensions
        height, width = frame.shape[:2]

        # Initialize video writer
        if output_path is not None:
            video_filename = os.path.join(output_path, f"output_{model_type}.avi")
            fourcc = cv2.VideoWriter_fourcc(*'XVID')
            video_writer = cv2.VideoWriter(video_filename, fourcc, 20.0, (width, height))

        while True:
            ret, frame = video.read()
            if not ret:
                break

            original_image_rgb = np.flip(frame, 2)  # Convert BGR to RGB
            image = transform({"image": original_image_rgb / 255})["image"]
            prediction = process(device, model, model_type, image, (net_w, net_h),
                                 original_image_rgb.shape[1::-1], optimize, True)

            # Generate side-by-side visualization
            original_image_bgr = np.flip(original_image_rgb, 2) if side else None
            content = create_side_by_side(original_image_bgr, prediction, grayscale)
            cv2.imshow('QGC Depth Estimation - Press ESC to close', content / 255)

            # Write video output
            if output_path is not None:
                frame_to_write = np.clip(content, 0, 255).astype(np.uint8)  # content is already in the 0-255 range
                if frame_to_write.shape[1::-1] != (width, height):  # Resize frame if needed
                    frame_to_write = cv2.resize(frame_to_write, (width, height))
                video_writer.write(frame_to_write)

            # Update FPS
            alpha = 0.1
            if time.time() - time_start > 0:
                fps = (1 - alpha) * fps + alpha * 1 / (time.time() - time_start)
                time_start = time.time()
            print(f"\rFPS: {round(fps, 2)}", end="")

            # Check for ESC key
            if cv2.waitKey(1) == 27 or cv2.getWindowProperty(
                    'QGC Depth Estimation - Press ESC to close', cv2.WND_PROP_VISIBLE) < 1:
                break

            frame_index += 1

        # Release resources
        if output_path is not None:
            video_writer.release()
        video.release()

    print("\nFinished")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()

    parser.add_argument('-o', '--output_path',
                        default='output/qgc',
                        help='Directory to save output images.')

    parser.add_argument('-m', '--model_weights',
                        default=None,
                        help='Path to trained model weights.')

    parser.add_argument('-t', '--model_type',
                        default='dpt_swin2_tiny_256',
                        help='Model type: dpt_beit_large_512, dpt_swin2_large_384, etc.')

    parser.add_argument('--optimize', dest='optimize', action='store_true', help='Optimize model for half-precision.')
    parser.set_defaults(optimize=False)

    parser.add_argument('-s', '--side', action='store_true',
                        help='Output images contain RGB and depth map side by side.')

    parser.add_argument('--height', type=int, default=None,
                        help='Preferred image height for inference.')
    parser.add_argument('--square', action='store_true', help='Resize input to square.')
    parser.add_argument('--grayscale', action='store_true', help='Use grayscale colormap.')
    parser.add_argument('--stream_url', type=str, default="udp://192.168.2.1:5600",
                        help='QGC video stream URL (e.g., udp://192.168.2.1:5600).')

    args = parser.parse_args()
    if args.model_weights is None:
        args.model_weights = default_models[args.model_type]

    torch.backends.cudnn.enabled = True
    torch.backends.cudnn.benchmark = True

    # Run depth estimation (keyword arguments so `side` and `stream_url` bind correctly)
    run_qgc_stream(args.output_path, args.model_weights, args.model_type, optimize=args.optimize,
                   side=args.side, height=args.height, square=args.square,
                   grayscale=args.grayscale, stream_url=args.stream_url)
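
Regarding the error when opening the stream: the video the companion computer sends to QGC is an RTP-wrapped H.264 stream on UDP port 5600, and cv2.VideoCapture with a plain udp:// URL often fails to open or decode it (and, depending on your setup, QGC and your script may conflict over the same UDP port). If your OpenCV build includes GStreamer, a commonly used workaround is to pass an explicit pipeline instead of the URL, for example:

import cv2

# Assumes OpenCV was built with GStreamer support (check cv2.getBuildInformation()).
# Receives the RTP/H.264 stream the companion computer sends to UDP port 5600.
pipeline = (
    "udpsrc port=5600 ! application/x-rtp,payload=96 ! "
    "rtph264depay ! h264parse ! avdec_h264 ! "
    "videoconvert ! video/x-raw,format=BGR ! appsink drop=true"
)

video = cv2.VideoCapture(pipeline, cv2.CAP_GSTREAMER)
if not video.isOpened():
    raise RuntimeError("Could not open the GStreamer pipeline; check that OpenCV "
                       "has GStreamer support and that the stream is running.")

ret, frame = video.read()
print("Got frame:", ret, frame.shape if ret else None)

In the script above you could then pass this pipeline string as stream_url and add cv2.CAP_GSTREAMER as the second argument to cv2.VideoCapture in run_qgc_stream.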