"""Compute depth maps for images in the input folder or from a QGC video stream."""
import os
import torch
import cv2
import argparse
import time
import numpy as np
from midas.model_loader import default_models, load_model
# True until process() has printed its one-time diagnostics; process() clears it.
first_execution = True


def process(device, model, model_type, image, input_size, target_size, optimize, use_camera):
    """Run inference on one image and resize the prediction to ``target_size``.

    Two back-ends are supported, selected by ``model_type``:

    * model types containing ``"openvino"`` are called directly on a reshaped
      NumPy array and the result is resized with OpenCV;
    * everything else is treated as a PyTorch model and resized with bicubic
      interpolation on the tensor.

    Args:
        device (torch.device): Device the input tensor is moved to (PyTorch path only).
        model: The network. For OpenVINO it must be callable and expose
            ``output(0)``; otherwise it must expose ``forward``.
        model_type (str): Model identifier; only checked for the substring ``"openvino"``.
        image (numpy.ndarray): Pre-processed network input (channels first).
        input_size (tuple): Two sizes the OpenVINO input is reshaped to
            (the caller in this file passes ``(net_w, net_h)``).
        target_size (tuple): ``(width, height)`` of the returned prediction.
        optimize (bool): Convert the input to channels-last half floats when
            running on CUDA.
        use_camera (bool): When True the "Input resized" message is printed only
            on the first call instead of on every call.

    Returns:
        numpy.ndarray: The prediction resized to ``target_size``.
    """
    global first_execution

    if "openvino" in model_type:
        if first_execution or not use_camera:
            print(f" Input resized to {input_size[0]}x{input_size[1]} before entering the encoder")
            first_execution = False

        sample = [np.reshape(image, (1, 3, *input_size))]
        prediction = model(sample)[model.output(0)][0]
        prediction = cv2.resize(prediction, dsize=target_size, interpolation=cv2.INTER_CUBIC)
    else:
        sample = torch.from_numpy(image).to(device).unsqueeze(0)

        if optimize and device == torch.device("cuda"):
            if first_execution:
                print(" Optimization to half-floats activated. Use with caution, because models like Swin require\n"
                      " float precision to work properly and may yield non-finite depth values to some extent for\n"
                      " half-floats.")
            sample = sample.to(memory_format=torch.channels_last)
            sample = sample.half()

        if first_execution or not use_camera:
            height, width = sample.shape[2:]
            print(f" Input resized to {width}x{height} before entering the encoder")
            first_execution = False

        prediction = model.forward(sample)
        # interpolate() expects (N, C, H, W) and a (height, width) size, whereas
        # target_size is (width, height) -- hence unsqueeze(1) and the reversal.
        prediction = (
            torch.nn.functional.interpolate(
                prediction.unsqueeze(1),
                size=target_size[::-1],
                mode="bicubic",
                align_corners=False,
            )
            .squeeze()
            .cpu()
            .numpy()
        )

    return prediction
def create_side_by_side(image, depth, grayscale):
    """Combine RGB image and depth map side-by-side for visualization.

    The depth map is min-max normalized to the range 0..255 and expanded to
    three channels. Unless ``grayscale`` is set it is then colorized with
    OpenCV's INFERNO colormap.

    Args:
        image (numpy.ndarray | None): Image placed to the left of the depth
            map (the caller in this file passes the BGR frame). If None, only
            the depth visualization is returned.
        depth (numpy.ndarray): 2-D depth map.
        grayscale (bool): Keep the normalized depth as gray values instead of
            applying the colormap.

    Returns:
        numpy.ndarray: The depth visualization, horizontally concatenated to
        ``image`` when one is given. Values are in the range 0..255.
    """
    depth_min = depth.min()
    depth_range = depth.max() - depth_min

    # A constant depth map (or one whose min/max is NaN) has no usable range;
    # dividing by it would fill the output with NaN, so render it as zeros.
    if depth_range > np.finfo("float").eps:
        normalized_depth = 255 * (depth - depth_min) / depth_range
    else:
        normalized_depth = np.zeros(depth.shape, dtype=np.float64)

    right_side = np.repeat(np.expand_dims(normalized_depth, 2), 3, axis=2)
    if not grayscale:
        right_side = cv2.applyColorMap(np.uint8(right_side), cv2.COLORMAP_INFERNO)

    if image is None:
        return right_side
    return np.concatenate((image, right_side), axis=1)
def run_qgc_stream(output_path, model_path, model_type="dpt_swin2_tiny_256", optimize=False, side=False, height=None,
                   square=False, grayscale=False, stream_url="udp://"):
    """Process video stream from QGC (QGroundControl) for depth estimation.

    Frames are read from ``stream_url``, run through the depth model, shown in
    an OpenCV window and, when ``output_path`` is given, appended to
    ``output_<model_type>.avi`` inside that directory. Processing stops when
    the stream ends, ESC is pressed or the window is closed.

    Args:
        output_path (str): Directory to save output images. None disables saving.
        model_path (str): Path to model weights.
        model_type (str): Model type.
        optimize (bool): Optimize model for half-precision floats on CUDA?
        side (bool): Show side-by-side RGB and depth map in output images?
        height (int): Preferred image height for inference.
        square (bool): Resize input to square?
        grayscale (bool): Use grayscale colormap?
        stream_url (str): URL of the QGC video stream.

    Raises:
        ValueError: If the stream opens but its first frame cannot be read.
    """
    window_title = 'QGC Depth Estimation - Press ESC to close'

    # Select device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("Device: %s" % device)

    model, transform, net_w, net_h = load_model(device, model_path, model_type, optimize, height, square)

    # Create output directory
    if output_path is not None:
        os.makedirs(output_path, exist_ok=True)

    print("Start processing video stream from QGC")

    video = cv2.VideoCapture(stream_url)  # Connect to QGC video stream
    video_writer = None
    try:
        if not video.isOpened():
            print("Failed to open video stream.")
            return

        # Read the first frame to determine resolution
        ret, frame = video.read()
        if not ret:
            raise ValueError("Failed to read video frame. Check your QGC video stream URL.")
        frame_height, frame_width = frame.shape[:2]

        # Initialize video writer
        if output_path is not None:
            video_filename = os.path.join(output_path, f"output_{model_type}.avi")
            fourcc = cv2.VideoWriter_fourcc(*'XVID')
            video_writer = cv2.VideoWriter(video_filename, fourcc, 20.0, (frame_width, frame_height))

        fps = 1
        alpha = 0.1  # smoothing factor of the exponential moving average
        time_start = time.time()

        with torch.no_grad():
            while True:
                ret, frame = video.read()
                if not ret:
                    break

                original_image_rgb = np.flip(frame, 2)  # Convert BGR to RGB
                image = transform({"image": original_image_rgb / 255})["image"]

                prediction = process(device, model, model_type, image, (net_w, net_h),
                                     original_image_rgb.shape[1::-1], optimize, True)

                # Generate side-by-side visualization; OpenCV wants BGR, which
                # is exactly the captured frame.
                original_image_bgr = frame if side else None
                content = create_side_by_side(original_image_bgr, prediction, grayscale)
                cv2.imshow(window_title, content / 255)

                # Write video output
                if video_writer is not None:
                    # content is already scaled to 0..255, so only clamp and cast.
                    frame_to_write = np.clip(content, 0, 255).astype(np.uint8)
                    # The writer only accepts the size it was opened with
                    # (side-by-side output is twice as wide).
                    if frame_to_write.shape[1::-1] != (frame_width, frame_height):
                        frame_to_write = cv2.resize(frame_to_write, (frame_width, frame_height))
                    video_writer.write(frame_to_write)

                # Update FPS
                now = time.time()
                elapsed = now - time_start
                if elapsed > 0:
                    fps = (1 - alpha) * fps + alpha * 1 / elapsed
                    time_start = now
                print(f"\rFPS: {round(fps, 2)}", end="")

                # Check for ESC key or a closed window
                if cv2.waitKey(1) == 27 or cv2.getWindowProperty(window_title, cv2.WND_PROP_VISIBLE) < 1:
                    break

        print()  # finish the in-place "\rFPS" line
    finally:
        # Release resources even when reading or inference fails
        if video_writer is not None:
            video_writer.release()
        video.release()
        cv2.destroyAllWindows()
if __name__ == "__main__":
    # Command-line entry point: parse the options and start the stream loop.
    parser = argparse.ArgumentParser()

    parser.add_argument('-o', '--output_path',
                        default=None,
                        help='Directory to save output images.')
    parser.add_argument('-m', '--model_weights',
                        default=None,
                        help='Path to trained model weights.')
    # Default matches run_qgc_stream() so the default_models lookup below never
    # receives None when -t is omitted.
    parser.add_argument('-t', '--model_type',
                        default='dpt_swin2_tiny_256',
                        help='Model type: dpt_beit_large_512, dpt_swin2_large_384, etc.')
    parser.add_argument('-s', '--side', action='store_true',
                        help='Show side-by-side RGB and depth map in output images.')
    parser.add_argument('--optimize', dest='optimize', action='store_true', help='Optimize model for half-precision.')
    parser.add_argument('--height', type=int, default=None,
                        help='Preferred image height for inference.')
    parser.add_argument('--square', action='store_true', help='Resize input to square.')
    parser.add_argument('--grayscale', action='store_true', help='Use grayscale colormap.')
    parser.add_argument('--stream_url', type=str, default="udp://",
                        help='QGC video stream URL (e.g., udp://<host>:<port>).')

    args = parser.parse_args()

    # Fall back to the default weights registered for the chosen model type
    if args.model_weights is None:
        args.model_weights = default_models[args.model_type]

    torch.backends.cudnn.enabled = True
    torch.backends.cudnn.benchmark = True

    # Run depth estimation. Keyword arguments keep every option bound to the
    # parameter of the same name regardless of the signature's order.
    run_qgc_stream(
        output_path=args.output_path,
        model_path=args.model_weights,
        model_type=args.model_type,
        optimize=args.optimize,
        side=args.side,
        height=args.height,
        square=args.square,
        grayscale=args.grayscale,
        stream_url=args.stream_url,
    )