"""
Compute depth maps for images in the input folder or from QGC video stream.
"""
import os
import torch
import cv2
import argparse
import time
import numpy as np
from midas.model_loader import default_models, load_model
first_execution = True
def process(device, model, model_type, image, input_size, target_size, optimize, use_camera):
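    """
    Run the inference and interpolate the prediction to the target size.

    Args:
        device (torch.device): device used for inference
        model: loaded MiDaS model
        model_type (str): model type string
        image: preprocessed input image
        input_size: (width, height) of the network input (used for OpenVINO models)
        target_size: (width, height) to which the prediction is resized
        optimize (bool): use half-precision floats on CUDA?
        use_camera (bool): is a camera/stream used (suppresses repeated log output)?

    Returns:
        np.ndarray: the depth prediction
    """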
global first_execution
if "openvino" in model_type:
if first_execution or not use_camera:
print(f" Input resized to {input_size[0]}x{input_size[1]} before entering the encoder")
first_execution = False
sample = [np.reshape(image, (1, 3, *input_size))]
prediction = model(sample)[model.output(0)][0]
prediction = cv2.resize(prediction, dsize=target_size, interpolation=cv2.INTER_CUBIC)
else:
sample = torch.from_numpy(image).to(device).unsqueeze(0)
if optimize and device == torch.device("cuda"):
if first_execution:
print(" Optimization to half-floats activated. Use with caution, because models like Swin require\n"
" float precision to work properly and may yield non-finite depth values to some extent for\n"
" half-floats.")
sample = sample.to(memory_format=torch.channels_last)
sample = sample.half()
if first_execution or not use_camera:
height, width = sample.shape[2:]
print(f" Input resized to {width}x{height} before entering the encoder")
first_execution = False
prediction = model.forward(sample)
prediction = (
torch.nn.functional.interpolate(
prediction.unsqueeze(1),
size=target_size[::-1],
mode="bicubic",
align_corners=False,
)
.squeeze()
.cpu()
.numpy()
)
return prediction
def create_side_by_side(image, depth, grayscale):
"""
Combine RGB image and depth map side-by-side for visualization.
"""
depth_min = depth.min()
depth_max = depth.max()
normalized_depth = 255 * (depth - depth_min) / (depth_max - depth_min)
normalized_depth *= 3
right_side = np.repeat(np.expand_dims(normalized_depth, 2), 3, axis=2) / 3
if not grayscale:
right_side = cv2.applyColorMap(np.uint8(right_side), cv2.COLORMAP_INFERNO)
if image is None:
return right_side
else:
return np.concatenate((image, right_side), axis=1)
def run_qgc_stream(output_path, model_path, model_type="dpt_swin2_tiny_256", optimize=False, side=False, height=None,
square=False, grayscale=False, stream_url="udp://192.168.2.1:5600"):
"""
Process video stream from QGC (QGroundControl) for depth estimation.
Args:
output_path (str): Directory to save output images.
model_path (str): Path to model weights.
model_type (str): Model type.
optimize (bool): Optimize model for half-precision floats on CUDA?
side (bool): Show side-by-side RGB and depth map in output images?
height (int): Preferred image height for inference.
square (bool): Resize input to square?
grayscale (bool): Use grayscale colormap?
stream_url (str): URL of the QGC video stream.
"""
print("Initialize")
# Select device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device: %s" % device)
model, transform, net_w, net_h = load_model(device, model_path, model_type, optimize, height, square)
# Create output directory
if output_path is not None:
os.makedirs(output_path, exist_ok=True)
print("Start processing video stream from QGC")
with torch.no_grad():
fps = 1
video = cv2.VideoCapture(stream_url) # Connect to QGC video stream
if not video.isOpened():
print("Failed to open video stream.")
exit(1)
time_start = time.time()
frame_index = 0
# Read the first frame to determine resolution
ret, frame = video.read()
if not ret:
raise ValueError("Failed to read video frame. Check your QGC video stream URL.")
# Get frame dimensions
height, width = frame.shape[:2]
# Initialize video writer
if output_path is not None:
video_filename = os.path.join(output_path, f"output_{model_type}.avi")
fourcc = cv2.VideoWriter_fourcc(*'XVID')
video_writer = cv2.VideoWriter(video_filename, fourcc, 20.0, (width, height))
while True:
ret, frame = video.read()
if not ret:
break
original_image_rgb = np.flip(frame, 2) # Convert BGR to RGB
image = transform({"image": original_image_rgb / 255})["image"]
prediction = process(device, model, model_type, image, (net_w, net_h),
original_image_rgb.shape[1::-1], optimize, True)
# Generate side-by-side visualization
original_image_bgr = np.flip(original_image_rgb, 2) if side else None
content = create_side_by_side(original_image_bgr, prediction, grayscale)
cv2.imshow('QGC Depth Estimation - Press ESC to close', content / 255)
# Write video output
if output_path is not None:
                frame_to_write = np.clip(content, 0, 255).astype(np.uint8)  # content is already scaled to [0, 255]
if frame_to_write.shape[1::-1] != (width, height): # Resize frame if needed
frame_to_write = cv2.resize(frame_to_write, (width, height))
video_writer.write(frame_to_write)
# Update FPS
alpha = 0.1
if time.time() - time_start > 0:
fps = (1 - alpha) * fps + alpha * 1 / (time.time() - time_start)
time_start = time.time()
print(f"\rFPS: {round(fps, 2)}", end="")
# Check for ESC key
if cv2.waitKey(1) == 27 or cv2.getWindowProperty(
'QGC Depth Estimation - Press ESC to close', cv2.WND_PROP_VISIBLE) < 1:
break
frame_index += 1
# Release resources
if output_path is not None:
video_writer.release()
video.release()
print("\nFinished")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('-o', '--output_path',
default='output/qgc',
help='Directory to save output images.')
parser.add_argument('-m', '--model_weights',
default=None,
help='Path to trained model weights.')
parser.add_argument('-t', '--model_type',
default='dpt_swin2_tiny_256',
help='Model type: dpt_beit_large_512, dpt_swin2_large_384, etc.')
    parser.add_argument('--optimize', dest='optimize', action='store_true', help='Optimize model for half-precision.')
    parser.set_defaults(optimize=False)
    parser.add_argument('-s', '--side', action='store_true',
                        help='Show RGB image and depth map side-by-side in the output.')
    parser.add_argument('--height', type=int, default=None,
                        help='Preferred image height for inference.')
    parser.add_argument('--square', action='store_true', help='Resize input to square.')
    parser.add_argument('--grayscale', action='store_true', help='Use grayscale colormap.')
parser.add_argument('--stream_url', type=str, default="udp://192.168.2.1:5600",
help='QGC video stream URL (e.g., udp://192.168.2.1:5600).')
args = parser.parse_args()
if args.model_weights is None:
args.model_weights = default_models[args.model_type]
torch.backends.cudnn.enabled = True
torch.backends.cudnn.benchmark = True
# Run depth estimation
    run_qgc_stream(args.output_path, args.model_weights, args.model_type, args.optimize,
                   args.side, args.height, args.square, args.grayscale, args.stream_url)
The above is the script I use to capture the QGC video stream in Python (using MiDaS for depth estimation), but it always reports the following error, which puzzles me. Could it be that the stream cannot be captured because UDP port 5600 is already occupied by QGC? If so, how can I capture the video stream correctly?
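For reference, this is a minimal check I can run to see whether OpenCV opens the stream at all, independent of the MiDaS code. The GStreamer pipeline is only an assumption that port 5600 carries RTP-wrapped H.264 (the usual setup when the stream is forwarded to QGC), and it requires an OpenCV build with GStreamer support; the URL and port are the same ones used in the script above.

import cv2

STREAM_URL = "udp://192.168.2.1:5600"

# Attempt 1: plain UDP URL via the FFmpeg backend.
cap = cv2.VideoCapture(STREAM_URL, cv2.CAP_FFMPEG)

if not cap.isOpened():
    # Attempt 2: explicit GStreamer pipeline (assumes RTP/H.264 on port 5600).
    # Only one process can bind the UDP port at a time, so QGC may have to
    # release it (or the stream be forwarded to a second port) while this runs.
    pipeline = (
        "udpsrc port=5600 ! application/x-rtp,encoding-name=H264,payload=96 ! "
        "rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! appsink"
    )
    cap = cv2.VideoCapture(pipeline, cv2.CAP_GSTREAMER)

print("Stream opened:", cap.isOpened())
ret, frame = cap.read()
print("First frame read:", ret, None if frame is None else frame.shape)
cap.release()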