Python脚本接收ROS sensor_msgs/msg/CompressedImage并显示
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import CompressedImage
import cv2
import numpy as np
class CompressedImageSubscriber(Node):
def __init__(self):
super().__init__('compressed_image_subscriber')
self.subscription = self.create_subscription(
CompressedImage,
'/image/detected/front_01', # Replace with your actual topic name
self.image_callback,
10)
self.subscription # prevent unused variable warning
def image_callback(self, msg):
self.get_logger().info(f'Received compressed image with format: {msg.format}')
try:
# Convert the byte array to a numpy array
np_arr = np.frombuffer(msg.data, np.uint8)
# Decode the compressed image data using OpenCV
# The '1' flag means to load the image in color (BGR format for OpenCV)
cv_image = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
if cv_image is not None:
# Display the image in a window
cv2.imshow("ROS2 Compressed Image View", cv_image)
cv2.waitKey(1) # Refresh the window every 1ms
else:
self.get_logger().error("Failed to decode image")
except Exception as e:
self.get_logger().error(f"Error processing image: {e}")
def main(args=None):
rclpy.init(args=args)
compressed_image_subscriber = CompressedImageSubscriber()
rclpy.spin(compressed_image_subscriber)
# Destroy the node explicitly
compressed_image_subscriber.destroy_node()
rclpy.shutdown()
cv2.destroyAllWindows() # Close all OpenCV windows
if __name__ == '__main__':
main()
from rclpy.node import Node
from sensor_msgs.msg import CompressedImage
import cv2
import numpy as np
class CompressedImageSubscriber(Node):
def __init__(self):
super().__init__('compressed_image_subscriber')
self.subscription = self.create_subscription(
CompressedImage,
'/image/detected/front_01', # Replace with your actual topic name
self.image_callback,
10)
self.subscription # prevent unused variable warning
def image_callback(self, msg):
self.get_logger().info(f'Received compressed image with format: {msg.format}')
try:
# Convert the byte array to a numpy array
np_arr = np.frombuffer(msg.data, np.uint8)
# Decode the compressed image data using OpenCV
# The '1' flag means to load the image in color (BGR format for OpenCV)
cv_image = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
if cv_image is not None:
# Display the image in a window
cv2.imshow("ROS2 Compressed Image View", cv_image)
cv2.waitKey(1) # Refresh the window every 1ms
else:
self.get_logger().error("Failed to decode image")
except Exception as e:
self.get_logger().error(f"Error processing image: {e}")
def main(args=None):
rclpy.init(args=args)
compressed_image_subscriber = CompressedImageSubscriber()
rclpy.spin(compressed_image_subscriber)
# Destroy the node explicitly
compressed_image_subscriber.destroy_node()
rclpy.shutdown()
cv2.destroyAllWindows() # Close all OpenCV windows
if __name__ == '__main__':
main()
评论已关闭