Skip to main content
Version: Next

Mission with Custom Tasks

Before being able to run the examples similar to the one explained below, it is required that you have written your own custom tasks. See the Custom Tasks section for information on how to develop tasks for your application.

The Code

#! /usr/bin/env python3

import rospy
import actionlib
from cpr_outdoornav_api_examples_lib.ros_node import RosNode
from cpr_outdoornav_api_examples_lib.waypoint import Waypoint
from cpr_outdoornav_api_examples_lib.mission import Mission
from clearpath_navigation_msgs.msg import Task, UITaskResult, UITaskAction


CUSTOM_ACTION_NAME = "/record_gnss"
UNSPECIFIED_HEADING = -1


class MissionWithCustomTask(RosNode):
"""Create and run a mission with 3 waypoints, each of which executes a custom task.

Our goal is to set up 3 waypoints, then create a mission such that the robot drives
to each waypoint in order. In addition, at each of the waypoints, a custom task will be
called to record the timestamp, goal name, and GPS coordinates. Since this is a custom task,
it is necessary to create an actionlib server to implement the custom task.

The main component of this example is the mission builder, which builds up the set of goals,
including information on the custom tasks, and runs the mission, to execute the custom task.

The coordinates used in this example are based on the cpr_agriculture_gazebo
world and should be updated to match the location in which the user's
robot will be operating.
"""

class MissionBuilder:
"""Creates and runs the mission, including custom tasks."""

def __init__(self):
"""Creates the mission with 3 waypoints and custom tasks."""

# First waypoint, with custom task
waypoint_a_name = "A"
custom_task_a = Task()
custom_task_a.name = "my_custom_task_a" # choose any name here
custom_task_a.action_server_name = CUSTOM_ACTION_NAME # must match server name
custom_task_a.version = "1.0" # choose any version
custom_task_a.floats = [1000] # param: unique ID
custom_task_a.strings = [waypoint_a_name] # param: goal name

# Second waypoint, with custom task
waypoint_b_name = "B"
custom_task_b = Task()
custom_task_b.name = "my_custom_task_b" # choose any name here
custom_task_b.action_server_name = CUSTOM_ACTION_NAME # must match server name
custom_task_b.version = "1.0" # choose any version
custom_task_b.floats = [1001] # param: unique ID
custom_task_b.strings = [waypoint_b_name] # param: goal name

# Third waypoint, with custom task
waypoint_c_name = "C"
custom_task_c = Task()
custom_task_c.name = "my_custom_task_c" # choose any name here
custom_task_c.action_server_name = CUSTOM_ACTION_NAME # must match server name
custom_task_c.version = "1.0" # choose any version
custom_task_c.floats = [1002] # param: unique ID
custom_task_c.strings = [waypoint_c_name] # param: goal name

waypoints = [
Waypoint(waypoint_a_name, "uuid-waypoint-01", 43.50076203007681, -80.546296281104, UNSPECIFIED_HEADING),
Waypoint(waypoint_b_name, "uuid-waypoint-02", 43.50073600330896, -80.54621215801687, UNSPECIFIED_HEADING, [custom_task_b]),
Waypoint(waypoint_c_name, "uuid-waypoint-03", 43.50067256664828, -80.5462159469465, UNSPECIFIED_HEADING, [custom_task_c]),
]

# Build mission from two waypoints with custom tasks
self._mission = Mission("Custom task mission", "uuid-mission-1", waypoints)

def getMission(self):
"""Gets a reference to the mission.

Returns
-------
A reference to the mission.
"""
return self._mission

def __init__(self):
"""Initializes ROS, the custom task server, GPS subscriber and mission."""

RosNode.__init__(self, 'mission_with_custom_task')
self._mission_builder = MissionWithCustomTask.MissionBuilder()


def run(self):
"""Execute the mission.

This function blocks until the mission is complete.

Returns
-------
True on successful execution of the mission; else False on any error.
"""

if not self._mission_builder.getMission().startMission():
return False
while not self._mission_builder.getMission().isMissionComplete():
rospy.sleep(1.0)
return self._mission_builder.getMission().getMissionSuccess()


if __name__ == '__main__':
if MissionWithCustomTask().run():
print("Mission completed successfully")
else:
print("Mission failed")
rospy.signal_shutdown('Done')

The Code Explained

import rospy
import actionlib
from cpr_outdoornav_api_examples_lib.ros_node import RosNode
from cpr_outdoornav_api_examples_lib.waypoint import Waypoint
from cpr_outdoornav_api_examples_lib.mission import Mission
from clearpath_navigation_msgs.msg import Task, UITaskResult, UITaskAction

Import the RosNode, Waypoint and Mission classes which will be used to create the mission to be executed. We also import the Task and UITask messages which are used to generate the action servers.

    class MissionBuilder:
"""Creates and runs the mission, including custom tasks."""

def __init__(self):
"""Creates the mission with 3 waypoints and custom tasks."""

# First waypoint, with custom task
waypoint_a_name = "A"
custom_task_a = Task()
custom_task_a.name = "my_custom_task_a" # choose any name here
custom_task_a.action_server_name = CUSTOM_ACTION_NAME # must match server name
custom_task_a.version = "1.0" # choose any version
custom_task_a.floats = [1000] # param: unique ID
custom_task_a.strings = [waypoint_a_name] # param: goal name

# Second waypoint, with custom task
waypoint_b_name = "B"
custom_task_b = Task()
custom_task_b.name = "my_custom_task_b" # choose any name here
custom_task_b.action_server_name = CUSTOM_ACTION_NAME # must match server name
custom_task_b.version = "1.0" # choose any version
custom_task_b.floats = [1001] # param: unique ID
custom_task_b.strings = [waypoint_b_name] # param: goal name

# Third waypoint, with custom task
waypoint_c_name = "C"
custom_task_c = Task()
custom_task_c.name = "my_custom_task_c" # choose any name here
custom_task_c.action_server_name = CUSTOM_ACTION_NAME # must match server name
custom_task_c.version = "1.0" # choose any version
custom_task_c.floats = [1002] # param: unique ID
custom_task_c.strings = [waypoint_c_name] # param: goal name


waypoints = [
Waypoint(waypoint_a_name, "uuid-waypoint-01", 50.1095255, -97.3192484, UNSPECIFIED_HEADING, [custom_task_a]),
Waypoint(waypoint_b_name, "uuid-waypoint-02", 50.1094938, -97.3191085, UNSPECIFIED_HEADING, [custom_task_b]),
Waypoint(waypoint_c_name, "uuid-waypoint-03", 50.1094600, -97.3192100, UNSPECIFIED_HEADING, [custom_task_c]),
]

# Build mission from two waypoints with custom tasks
self._mission = Mission("Custom task mission", "uuid-mission-1", waypoints)

We create the MissionBuilder subclass that will create the mission to be executed. We initialize custom task objects where we specify the action_server_name field with the specific action of the custom task. These are then added to the Waypoints as a list of tasks and the Mission object is created.

    def __init__(self):
"""Initializes ROS, the custom task server, GPS subscriber and mission."""

RosNode.__init__(self, 'mission_with_custom_task')
self._mission_builder = MissionWithCustomTask.MissionBuilder()

We initialize the example class by starting a ROS node and initialize the mission to be executed.

    def run(self):
"""Execute the mission.

This function blocks until the mission is complete.

Returns
-------
True on successful execution of the mission; else False on any error.
"""

if not self._mission_builder.getMission().startMission():
return False
while not self._mission_builder.getMission().isMissionComplete():
time.sleep(1.0)
return self._mission_builder.getMission().getMissionSuccess()

The run() function starts the mission and checks for completion. Once complete we terminate the example code.

if __name__ == '__main__':
if MissionWithCustomTask().run():
print("Mission completed successfully")
else:
print("Mission failed")
rospy.signal_shutdown('Done')

The main python execution block. We first initialize the example class (runs the init function) and then execute the run() function which executes the mission and outputs the status information.