Monitor Status
The Code
#! /usr/bin/env python3
import rospy
from cpr_outdoornav_api_examples_lib.ros_node import RosNode
from cpr_outdoornav_api_examples_lib.waypoint import Waypoint
from cpr_outdoornav_api_examples_lib.mission import Mission
from cpr_outdoornav_api_examples_lib.platform_status import PlatformStatusMonitor
from cpr_outdoornav_api_examples_lib.control_status import ControlStatusMonitor
from cpr_outdoornav_api_examples_lib.localization_status import LocalizationStatusMonitor
from cpr_outdoornav_api_examples_lib.navigation_status import NavigationStatusMonitor
class MonitorStatus(RosNode):
"""Run a simple mission and report status throughout.
The coordinates used in this example are based on the cpr_agriculture_gazebo
world and should be updated to match the location in which the user's
robot will be operating.
"""
def __init__(self):
"""Initialize the mission details and server connection."""
RosNode.__init__(self, 'monitor_status')
waypoints = [
Waypoint("A", "uuid-waypoint-1", 50.1094938, -97.3191085),
Waypoint("B", "uuid-waypoint-2", 50.1095100, -97.3192000),
Waypoint("C", "uuid-waypoint-3", 50.1095255, -97.3192484),
]
self.mission = Mission("Monitor status mission", "uuid-mission-1", waypoints)
self._platform_status = PlatformStatusMonitor()
self._control_status = ControlStatusMonitor()
self._localization_status = LocalizationStatusMonitor()
self._navigation_status = NavigationStatusMonitor()
def _reportStatus(self):
"""Report the current status."""
rospy.loginfo("--------------------------------------------------------")
self._platform_status.report()
self._control_status.report()
self._localization_status.report()
self._navigation_status.report()
def run(self):
"""Execute the mission and report status."""
if not self.mission.startMission():
rospy.logerr("Failed to start mission")
return False
while not self.mission.isMissionComplete():
self._reportStatus()
rospy.sleep(1.0)
return self.mission.getMissionSuccess()
if __name__ == '__main__':
if MonitorStatus().run():
print("Mission completed successfully")
else:
print("Mission failed")
The Code Explained
import rospy
from cpr_outdoornav_api_examples_lib.ros_node import RosNode
from cpr_outdoornav_api_examples_lib.waypoint import Waypoint
from cpr_outdoornav_api_examples_lib.mission import Mission
from cpr_outdoornav_api_examples_lib.platform_status import PlatformStatusMonitor
from cpr_outdoornav_api_examples_lib.control_status import ControlStatusMonitor
from cpr_outdoornav_api_examples_lib.localization_status import LocalizationStatusMonitor
from cpr_outdoornav_api_examples_lib.navigation_status import NavigationStatusMonitor
Import the RosNode
, Waypoint
, Mission
classes which will be used to create the mission to be executed. We also import the
PlatformStatusMonitor
, ControlStatusMonitor
, LocalizationStatusMonitor
, NavigationStatusMonitor
classes which are set up to monitor the topics
relavant to the platform, localization, contrle selection an navigation modules respectively.
waypoints = [
Waypoint("A", "uuid-waypoint-1", 50.1094938, -97.3191085),
Waypoint("B", "uuid-waypoint-2", 50.1095100, -97.3192000),
Waypoint("C", "uuid-waypoint-3", 50.1095255, -97.3192484),
]
self.mission = Mission("Monitor status mission", "uuid-mission-1", waypoints)
After initializing the ROS node, we create a mission manually by first creating a list of Waypoint objects and then adding then using the waypoint list to construct the Mission object.
self._platform_status = PlatformStatusMonitor()
self._control_status = ControlStatusMonitor()
self._localization_status = LocalizationStatusMonitor()
self._navigation_status = NavigationStatusMonitor()
We then initialize the platform, control, localization and navigation monitors, which subscribes to several API topics.
def _reportStatus(self):
"""Report the current status."""
rospy.loginfo("--------------------------------------------------------")
self._platform_status.report()
self._control_status.report()
self._localization_status.report()
self._navigation_status.report()
The _reportStatus()
function outputs the results of the monitors to the ROS logs. This includes both the internal ROS logs as well as terminal output.
def run(self):
"""Execute the mission and report status."""
if not self.mission.startMission():
rospy.logerr("Failed to start mission")
return False
while not self.mission.isMissionComplete():
self._reportStatus()
rospy.sleep(1.0)
return self.mission.getMissionSuccess()
The run()
function starts the mission and checks for completion. Every seconds we run the reportStatus()
function to output the status information.
Once complete we terminate the example code.
if __name__ == '__main__':
if MonitorStatus().run():
print("Mission completed successfully")
else:
print("Mission failed")
The main python execution block. We first initialize the example class (runs the init function) and then execute the run() function which executes the mission and outputs the status information.