Skip to main content
Version: 0.9.0

Custom Tasks

Users can create their custom tasks for their application following a specific template. When creating these tasks, the user should begin by creating a python file in the ~/cpr_outdoornav_launch/custom_tasks directory. The file should be written following the instructions provided below:

  1. Import the custom_task_base package.
#!/usr/bin/python3

from onav_tasks.custom_task_base import *
  1. The user should then create a class name to replace CustomTask and initialize it with the CustomTaskBase's init function and the action server name for the task.
class CustomTask(CustomTaskBase):
def __init__(self):
# The derived class must always call the super() and provide the action server name.
# This name will need to be unique among custom tasks and must match what is in the
# UI.
super().__init__("custom_task_name")
note

The CustomTaskBase exposes a SimpleActionServer (see here for further details) object as _as, a UITaskFeedback object as _feedback and a UITaskResults object as _result to be used as part of the tasks functionality.

  1. The last requirement is that the CustomTask needs to have the run_task(self, goal) function be defined, which takes in the UITaskGoal.
  def run_task(self, goal):
note

When running the task users may handle errors through the action servers set_aborted() function. When this function is called the entire mission will be aborted.

  1. Restarting the UGV will trigger the system to load the custom task that was created, making it available for mission use. If a custom task is not configured properly the custom task manager will ignore the task and proceed loading in the next task while logging an error with ROS.

Sample Custom Tasks

Input Looper

Below is a sample template file named "input_looper.py" which iterates throught the two data variables and publishes them to the feedback topic. If neither of the variables have any data in them the task is aborted.

#!/usr/bin/python3

from onav_tasks.custom_task_base import *

class InputLooperTask(CustomTaskBase):
def __init__(self):
# The derived class must always call the super() and provide the action server name.
# This name will need to be unique among custom tasks and must match what is in the
# UI.
super().__init__("input_looper")

def run_task(self, goal):
if len(goal.strings) == 0 and len(goal.floats) == 0:
# Task and running mission will be aborted in this case
self._as.set_aborted()
return False

# Loop through the strings and float values and publish them each to the /input_looper/feedback topic
for string in goal.strings:
self._feedback.state = string
self._as.publish_feedback(self._feedback)

for num in goal.floats:
self._feedback.state = str(num)
self._as.publish_feedback(self._feedback)

# Returning True or False will not currently impact the mission but will write the current state to the
# /task/result topic accordingly.
return True

Record GNSS Data

Below is a sample custom task file named "record_gnss.py" which outputs the current GNSS data to the console.

#!/usr/bin/python3

from onav_tasks.custom_task_base import *
from sensor_msgs.msg import NavSatFix
from threading import Lock
import rospy

class RecorGNSSTask(CustomTaskBase):
def __init__(self):
super().__init__("record_gnss")
self._sub = rospy.Subscriber("/sensors/gps_0/fix", NavSatFix, self.gpsSubscriberCallback)
self.gps_lat = 0.0
self.gps_lon = 0.0
self._gps_coordinates_lock = Lock()

def run_task(self, goal):
feedback = UITaskFeedback()
feedback.state = 'Recording GNSS lat/lon'
self._as.publish_feedback(feedback)
msg = ""
with self._gps_coordinates_lock:
msg = "ID: %f Name: %s Latitude: %f Longitude: %f" % (
goal.floats[0], goal.strings[0], self.gps_lat, self.gps_lon)
rospy.loginfo(msg)
return True

def gpsSubscriberCallback(self, msg):
with self._gps_coordinates_lock:
self.gps_lat = msg.latitude
self.gps_lon = msg.longitude

Move PTZ camera to a Lat/Lon

Below is a more advanced custom task. The file is "move_ptz_lat_lon.py" which pans a PTZ camera to point to a specific lat/lon coordinate.

from onav_tasks.custom_task_base import *
import actionlib
from clearpath_localization_msgs.srv import *
from clearpath_navigation_msgs.msg import *
from nav_msgs.msg import Odometry
from ptz_action_server_msgs.msg import PtzAction
import ptz_action_server_msgs.msg
import math
from math import remainder, tau
import rospy
from sensor_msgs import *
from tf.transformations import euler_from_quaternion, quaternion_from_euler



class MovePtzLatLon(CustomTaskBase):
def __init__(self):
super().__init__("move_ptz_lat_lon")
self.localization_subscriber_ = rospy.Subscriber("/localization/odom", Odometry, self.localizationCallback)
self.move_ptz_client_ = actionlib.SimpleActionClient('/sensors/camera_0/move_ptz', PtzAction)
self.service_ = rospy.ServiceProxy('/localization/lat_lon_to_xy', ConvertLatLonToCartesian)
self.current_pose = Odometry()

def localizationCallback(self, odom_msg):
self.current_pose = odom_msg

def run_task(self, goal):
if len(goal.strings) == 0 and len(goal.floats) == 0:
rospy.logwarn('Warning')
self._as.set_aborted()
return False
goal_latitude = goal.floats[0]
goal_longitude = goal.floats[1]
goal_zoom = goal.floats[2]
str2 = 'Received goal latitude: ' + str(goal_latitude) + ', goal longitude: ' + str(goal_longitude) + ', zoom: ' + str(goal_zoom)
feedback = UITaskFeedback()
feedback.state = 'Aiming camera at lat-lon (' + str(goal_latitude) + ', ' + str(goal_longitude)+')'
self._as.publish_feedback(feedback)
orientation_q = self.current_pose.pose.pose.orientation
orientation_list = [orientation_q.x, orientation_q.y, orientation_q.z, orientation_q.w]
(roll, pitch, yaw) = euler_from_quaternion (orientation_list)

gps_msg = sensor_msgs.msg.NavSatFix()
gps_msg.latitude = goal_latitude
gps_msg.longitude = goal_longitude
goal_utm = self.service_(gps_msg)

goal_x = goal_utm.pose.pose.position.x
goal_y = goal_utm.pose.pose.position.y

goal_angle = math.atan2(goal_y - self.current_pose.pose.pose.position.y, goal_x - self.current_pose.pose.pose.position.x)
pan_angle = math.remainder(goal_angle - yaw, math.tau)
print(pan_angle)

self.move_ptz_client_.wait_for_server()
goal = ptz_action_server_msgs.msg.PtzGoal()
goal.pan=pan_angle
goal.tilt=0
goal.zoom=goal_zoom
self.move_ptz_client_.send_goal(goal)
self.move_ptz_client_.wait_for_result()
print(self.move_ptz_client_.get_result())
return True