Custom Tasks
Users can create their custom tasks for their application following a specific template. When creating these tasks, the user should begin by creating a python file in the /opt/onav/<onav_version>/app/custom_tasks/ directory, where onav_version is the currently running version of OutdoorNav. The file should be written following the instructions provided below:
- Import the
custom_task_base
package.
#!/usr/bin/python3
from onav_tasks.custom_task_base import *
- The user should then create a class name to replace
CustomTask
and initialize it with theCustomTaskBase
's init function and the action server name for the task.
class CustomTask(CustomTaskBase):
def __init__(self):
# The derived class must always call the super() and provide the action server name.
# This name will need to be unique among custom tasks and must match what is in the
# UI.
super().__init__("custom_task_name")
The CustomTaskBase
exposes a SimpleActionServer
(see here
for further details) object as _as
, a UITaskFeedback
object as _feedback
and a UITaskResults
object as _result
to be used as part of
the tasks functionality.
- The last requirement is that the
CustomTask
needs to have therun_task(self, goal)
function be defined, which takes in the UITaskGoal.
def run_task(self, goal):
When running the task users may handle errors through the action servers set_aborted()
function. When this function is called the entire mission
will be aborted.
- Restarting the UGV will trigger the system to load the custom task that was created, making it available for mission use. If a custom task is not configured properly the custom task manager will ignore the task and proceed loading in the next task while logging an error with ROS.
Sample Custom Tasks
Input Looper
Below is a sample template file named "input_looper.py" which iterates throught the two data variables and publishes them to the feedback topic. If neither of the variables have any data in them the task is aborted.
#!/usr/bin/python3
from onav_tasks.custom_task_base import *
class InputLooperTask(CustomTaskBase):
def __init__(self):
# The derived class must always call the super() and provide the action server name.
# This name will need to be unique among custom tasks and must match what is in the
# UI.
super().__init__("input_looper")
def run_task(self, goal):
if len(goal.strings) == 0 and len(goal.floats) == 0:
# Task and running mission will be aborted in this case
self._as.set_aborted()
return False
# Loop through the strings and float values and publish them each to the /input_looper/feedback topic
for string in goal.strings:
self._feedback.state = string
self._as.publish_feedback(self._feedback)
for num in goal.floats:
self._feedback.state = str(num)
self._as.publish_feedback(self._feedback)
# Returning True or False will not currently impact the mission but will write the current state to the
# /task/result topic accordingly.
return True
Record GNSS Data
Below is a sample custom task file named "record_gnss.py" which outputs the current GNSS data to the console.
#!/usr/bin/python3
from onav_tasks.custom_task_base import *
from sensor_msgs.msg import NavSatFix
from threading import Lock
import rospy
class RecorGNSSTask(CustomTaskBase):
def __init__(self):
super().__init__("record_gnss")
self._sub = rospy.Subscriber("/sensors/gps_0/fix", NavSatFix, self.gpsSubscriberCallback)
self.gps_lat = 0.0
self.gps_lon = 0.0
self._gps_coordinates_lock = Lock()
def run_task(self, goal):
feedback = UITaskFeedback()
feedback.state = 'Recording GNSS lat/lon'
self._as.publish_feedback(feedback)
msg = ""
with self._gps_coordinates_lock:
msg = "ID: %f Name: %s Latitude: %f Longitude: %f" % (
goal.floats[0], goal.strings[0], self.gps_lat, self.gps_lon)
rospy.loginfo(msg)
return True
def gpsSubscriberCallback(self, msg):
with self._gps_coordinates_lock:
self.gps_lat = msg.latitude
self.gps_lon = msg.longitude
Move PTZ camera to a Lat/Lon
Below is a more advanced custom task. The file is "move_ptz_lat_lon.py" which pans a PTZ camera to point to a specific lat/lon coordinate.
from onav_tasks.custom_task_base import *
import actionlib
from clearpath_localization_msgs.srv import *
from clearpath_navigation_msgs.msg import *
from nav_msgs.msg import Odometry
from ptz_action_server_msgs.msg import PtzAction
import ptz_action_server_msgs.msg
import math
from math import remainder, tau
import rospy
from sensor_msgs import *
from tf.transformations import euler_from_quaternion, quaternion_from_euler
class MovePtzLatLon(CustomTaskBase):
def __init__(self):
super().__init__("move_ptz_lat_lon")
self.localization_subscriber_ = rospy.Subscriber("/localization/odom", Odometry, self.localizationCallback)
self.move_ptz_client_ = actionlib.SimpleActionClient('/sensors/camera_0/move_ptz', PtzAction)
self.service_ = rospy.ServiceProxy('/localization/lat_lon_to_xy', ConvertLatLonToCartesian)
self.current_pose = Odometry()
def localizationCallback(self, odom_msg):
self.current_pose = odom_msg
def run_task(self, goal):
if len(goal.strings) == 0 and len(goal.floats) == 0:
rospy.logwarn('Warning')
self._as.set_aborted()
return False
goal_latitude = goal.floats[0]
goal_longitude = goal.floats[1]
goal_zoom = goal.floats[2]
str2 = 'Received goal latitude: ' + str(goal_latitude) + ', goal longitude: ' + str(goal_longitude) + ', zoom: ' + str(goal_zoom)
feedback = UITaskFeedback()
feedback.state = 'Aiming camera at lat-lon (' + str(goal_latitude) + ', ' + str(goal_longitude)+')'
self._as.publish_feedback(feedback)
orientation_q = self.current_pose.pose.pose.orientation
orientation_list = [orientation_q.x, orientation_q.y, orientation_q.z, orientation_q.w]
(roll, pitch, yaw) = euler_from_quaternion (orientation_list)
gps_msg = sensor_msgs.msg.NavSatFix()
gps_msg.latitude = goal_latitude
gps_msg.longitude = goal_longitude
goal_utm = self.service_(gps_msg)
goal_x = goal_utm.pose.pose.position.x
goal_y = goal_utm.pose.pose.position.y
goal_angle = math.atan2(goal_y - self.current_pose.pose.pose.position.y, goal_x - self.current_pose.pose.pose.position.x)
pan_angle = math.remainder(goal_angle - yaw, math.tau)
print(pan_angle)
self.move_ptz_client_.wait_for_server()
goal = ptz_action_server_msgs.msg.PtzGoal()
goal.pan=pan_angle
goal.tilt=0
goal.zoom=goal_zoom
self.move_ptz_client_.send_goal(goal)
self.move_ptz_client_.wait_for_result()
print(self.move_ptz_client_.get_result())
return True