Skip to main content
Version: 2.2.0

All Other Endpoints

Topics

autonomy/config

The autonomy configuration

import rclpy
from rclpy.node import Node

from clearpath_navigation_msgs.msg import AutonomyConfig


class AutonomyConfig(Node):

def __init__(self):
super().__init__('config_sub')
self.config_sub = self.create_subscription(
AutonomyConfig, 'autonomy/config', self.config_cb, 10)
self.config_sub # prevent unused variable warning

def config_cb(self, msg):
self.last_config_msg = msg

if __name__ == '__main__':
rclpy.init(args=args)
config_sub = AutonomyConfig()
rclpy.spin(config_sub)
config_sub.destroy_node()
rclpy.shutdown()
autonomy/initial_path

The initial Path computed by autonomy

import rclpy
from rclpy.node import Node

from nav_msgs.msg import Path


class AutonomyInitialPath(Node):

def __init__(self):
super().__init__('path_sub')
self.path_sub = self.create_subscription(
Path, 'autonomy/initial_path', self.path_cb, 10)
self.path_sub # prevent unused variable warning

def path_cb(self, msg):
self.last_path_msg = msg

if __name__ == '__main__':
rclpy.init(args=args)
path_sub = AutonomyInitialPath()
rclpy.spin(path_sub)
path_sub.destroy_node()
rclpy.shutdown()
autonomy/status

The status of the autonomy

import rclpy
from rclpy.node import Node

from clearpath_navigation_msgs.msg import AutonomyStatus


class AutonomyStatus(Node):

def __init__(self):
super().__init__('status_sub')
self.status_sub = self.create_subscription(
AutonomyStatus, 'autonomy/status', self.status_cb, 10)
self.status_sub # prevent unused variable warning

def status_cb(self, msg):
self.last_status_msg = msg

if __name__ == '__main__':
rclpy.init(args=args)
status_sub = AutonomyStatus()
rclpy.spin(status_sub)
status_sub.destroy_node()
rclpy.shutdown()
control_selection/current_mode

Current control mode (NEUTRAL, MANUAL, AUTONOMY).

import rclpy
from rclpy.node import Node

from clearpath_control_msgs.msg import ControlMode


class CurrentMode(Node):

def __init__(self):
super().__init__('mode_sub')
self.mode_sub = self.create_subscription(
CurrentMode, 'control_selection/current_mode', self.mode_cb, 10)
self.mode_sub # prevent unused variable warning

def mode_cb(self, msg):
self.last_mode_msg = msg

if __name__ == '__main__':
rclpy.init(args=args)
mode_sub = CurrentMode()
rclpy.spin(mode_sub)
mode_sub.destroy_node()
rclpy.shutdown()
control_selection/control_state

The complete state of control selection node.

import rclpy
from rclpy.node import Node

from clearpath_control_msgs.msg import ControlSelectionState


class ControlSelectionState(Node):

def __init__(self):
super().__init__('state_sub')
self.state_sub = self.create_subscription(
ControlSelectionState, 'control_selection/control_state', self.state_cb, 10)
self.state_sub # prevent unused variable warning

def state_cb(self, msg):
self.last_status_msg = msg

if __name__ == '__main__':
rclpy.init(args=args)
state_sub = ControlSelectionState()
rclpy.spin(state_sub)
state_sub.destroy_node()
rclpy.shutdown()
docking/docking_server/path

The docking path.

import rclpy
from rclpy.node import Node

from nav_msgs.msg import Path


class DockingPath(Node):

def __init__(self):
super().__init__('path_sub')
self.path_sub = self.create_subscription(
Path, 'docking/docking_server/path', self.path_cb, 10)
self.path_sub # prevent unused variable warning

def path_cb(self, msg):
self.last_path_msg = msg

if __name__ == '__main__':
rclpy.init(args=args)
path_sub = DockingPath()
rclpy.spin(path_sub)
path_sub.destroy_node()
rclpy.shutdown()
docking/undocking_server/path

The undocking path.

import rclpy
from rclpy.node import Node

from nav_msgs.msg import Path


class UndockingPath(Node):

def __init__(self):
super().__init__('path_sub')
self.path_sub = self.create_subscription(
Path, 'docking/undocking_server/path', self.path_cb, 10)
self.path_sub # prevent unused variable warning

def path_cb(self, msg):
self.last_path_msg = msg

if __name__ == '__main__':
rclpy.init(args=args)
path_sub = UndockingPath()
rclpy.spin(path_sub)
path_sub.destroy_node()
rclpy.shutdown()
goto/preview

The preview points for a GoTo execution.

import rclpy
from rclpy.node import Node

from clearpath_navigation_msgs.msg import GoToPreview


class GoToPreview(Node):

def __init__(self):
super().__init__('preview_sub')
self.preview_sub = self.create_subscription(
GoToPreview, 'goto/preview', self.preview_cb, 10)
self.preview_sub # prevent unused variable warning

def preview_cb(self, msg):
self.last_preview_msg = msg

if __name__ == '__main__':
rclpy.init(args=args)
preview_sub = GoToPreview()
rclpy.spin(preview_sub)
preview_sub.destroy_node()
rclpy.shutdown()
localization/datum

The lat/lon coordinate of the datum. This value is sotred in persistent memory and loaded at runtime.

import rclpy
from rclpy.node import Node

from sensor_msgs.msg import NavSatFix


class Datum(Node):

def __init__(self):
super().__init__('datum_sub')
self.datum_sub = self.create_subscription(
NavSatFix, 'localization/datum', self.datum_cb, 10)
self.datum_sub # prevent unused variable warning

def datum_cb(self, msg):
self.last_datum_msg = msg

if __name__ == '__main__':
rclpy.init(args=args)
datum_sub = Datum()
rclpy.spin(datum_sub)
datum_sub.destroy_node()
rclpy.shutdown()
mission/preview

The preview points for a Mission execution.

import rclpy
from rclpy.node import Node

from clearpath_navigation_msgs.msg import MissionPreview


class MissionPreview(Node):

def __init__(self):
super().__init__('preview_sub')
self.preview_sub = self.create_subscription(
MissionPreview, 'mission/preview', self.preview_cb, 10)
self.preview_sub # prevent unused variable warning

def preview_cb(self, msg):
self.last_preview_msg = msg

if __name__ == '__main__':
rclpy.init(args=args)
preview_sub = MissionPreview()
rclpy.spin(preview_sub)
preview_sub.destroy_node()
rclpy.shutdown()
speed_limit

The current navigation speed limit.

import rclpy
from rclpy.node import Node

from nav2_msgs.msg import SpeedLimit


class SpeedLimit(Node):

def __init__(self):
super().__init__('speed_limit_sub')
self.speed_limit_sub = self.create_subscription(
SpeedLimit, 'speed_limit', self.datum_cb, 10)
self.speed_limit_sub # prevent unused variable warning

def datum_cb(self, msg):
self.last_speed_limit_msg = msg

if __name__ == '__main__':
rclpy.init(args=args)
speed_limit_sub = SpeedLimit()
rclpy.spin(speed_limit_sub)
speed_limit_sub.destroy_node()
rclpy.shutdown()
ui/heartbeat

The heartbeat of the UI.

import rclpy
from rclpy.node import Node

from std_msgs.msg import Header


class Heartbeat(Node):

def __init__(self):
super().__init__('heartbeat_sub')
self.heartbeat_sub = self.create_subscription(
Header, 'ui/heartbeat', self.heartbeat_cb, 10)
self.heartbeat_sub # prevent unused variable warning

def heartbeat_cb(self, msg):
self.last_heartbeat_msg = msg

if __name__ == '__main__':
rclpy.init(args=args)
heartbeat_sub = Heartbeat()
rclpy.spin(heartbeat_sub)
heartbeat_sub.destroy_node()
rclpy.shutdown()
ui_teleop/cmd_vel

The velocity commands from the UI joystick.

import rclpy
from rclpy.node import Node

from geometry_msgs.msg import TwistStamped


class UICmdVel(Node):

def __init__(self):
super().__init__('ui_cmd_vel_sub')
self.ui_cmd_vel_sub = self.create_subscription(
TwistStamped, 'ui_teleop/cmd_vel', self.ui_cmd_vel_cb, 10)
self.ui_cmd_vel_sub # prevent unused variable warning

def ui_cmd_vel_cb(self, msg):
self.last_ui_cmd_vel_msg = msg

if __name__ == '__main__':
rclpy.init(args=args)
ui_cmd_vel_sub = UICmdVel()
rclpy.spin(ui_cmd_vel_sub)
ui_cmd_vel_sub.destroy_node()
rclpy.shutdown()

Services

control_selection/set_mode

Switch the control mode of the platform. Options are: ('AUTONOMY', 'MANUAL', 'NEUTRAL')

import rclpy
from rclpy.node import Node

from clearpath_control_msgs.srv import SetControlMode
from clearpath_control_msgs.msg import ControlMode


class SetControlMode(Node):

def __init__(self):
super().__init__('set_control_mode_client')
self._srv_client = self.create_client(SetControlMode, 'control_selection/set_mode')
while not self._srv_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('service not available, waiting again...')
self.req = SetControlMode.Request()

def send_request(self, mode):
# set to manual mode
self.req.mode.mode = mode
self.future = self._srv_client.call_async(self.req)
rclpy.spin_until_future_complete(self, self.future)
return self.future.result()

if __name__ == '__main__':
rclpy.init(args=args)
set_control_mode_client = SetControlMode()
response = set_control_mode_client.send_request(ControlMode.MANUAL)
set_control_mode_client.get_logger().info('Control mode set to MANUAL mode.'))
set_control_mode_client.destroy_node()
rclpy.shutdown()
docking/dock_manager/clear_data

Clears all dock data.

from clearpath_config.common.utils.yaml import read_yaml
import rclpy
from rclpy.node import Node
from std_srvs.srv import Trigger

ROBOT_CONFIG_PATH = '/etc/clearpath/robot.yaml'
LOGGING_NAME = 'ClearDockData'

class ClearDockData(Node):

def __init__(self, namespace: str):
super().__init__('clear_dock_data_client')
self._srv_client = self.create_client(Trigger, f'/{namespace}/docking/dock_manager/clear_data')

while not self._srv_client.wait_for_service(timeout_sec=2.0):
self.get_logger().info(f'[{LOGGING_NAME}] /{namespace}/docking/dock_manager/clear_data service not available, waiting again...')

def send_request(self):
req = Trigger.Request()
self.future = self._srv_client.call_async(req)
self.future.add_done_callback(self.clear_dock_data_srv_response_cb)

def clear_dock_data_srv_response_cb(self, future):
try:
response = future.result()
if response.success:
self.get_logger().info(f'[{LOGGING_NAME}] Dock data cleared!')
else:
self.get_logger().error(f'[{LOGGING_NAME}] Failed to clear dock data')
except Exception as e:
self.get_logger().error(f'[{LOGGING_NAME}] Service call failed: {e}')

self.destroy_node()
rclpy.shutdown()

def main(args=None):
rclpy.init(args=args)
robot_config = read_yaml(ROBOT_CONFIG_PATH)
namespace = robot_config['system']['ros2']['namespace']

clear_dock_data_client = ClearDockData(namespace)
clear_dock_data_client.send_request()

rclpy.spin(clear_dock_data_client)


if __name__ == '__main__':
main()

docking/dock_manager/delete_dock

Delete a dock by its ID.

from clearpath_config.common.utils.yaml import read_yaml
import rclpy
from rclpy.node import Node
from clearpath_dock_msgs.srv import RemoveDock

ROBOT_CONFIG_PATH = '/etc/clearpath/robot.yaml'
LOGGING_NAME = 'DeleteDock'

# TODO: Enter name of the dock to be removed
DOCK_NAME: str = ''


class DeleteDock(Node):

def __init__(self, namespace: str):
super().__init__('delete_dock_client')
self._srv_client = self.create_client(RemoveDock, f'/{namespace}/docking/dock_manager/delete_dock')

while not self._srv_client.wait_for_service(timeout_sec=2.0):
self.get_logger().info(f'[{LOGGING_NAME}] /{namespace}/docking/dock_manager/delete_dock service not available, waiting again...')

def send_request(self):
req = RemoveDock.Request()
req.name = DOCK_NAME
self.future = self._srv_client.call_async(req)
self.future.add_done_callback(self.delete_dock_srv_response_cb)

def delete_dock_srv_response_cb(self, future):
try:
response = future.result()
if response.success:
self.get_logger().info(f'[{LOGGING_NAME}] Deleted dock succesfully!')
else:
self.get_logger().error(f'[{LOGGING_NAME}] Failed to delete dock!')
except Exception as e:
self.get_logger().error(f'[{LOGGING_NAME}] Service call failed: {e}')

self.destroy_node()
rclpy.shutdown()

def main(args=None):
rclpy.init(args=args)
robot_config = read_yaml(ROBOT_CONFIG_PATH)
namespace = robot_config['system']['ros2']['namespace']

delete_dock_client = DeleteDock(namespace)
delete_dock_client.send_request()

rclpy.spin(delete_dock_client)


if __name__ == '__main__':
main()

docking/dock_manager/export

Delete a dock by its ID.

from clearpath_config.common.utils.yaml import read_yaml
import rclpy
from rclpy.node import Node
from clearpath_dock_msgs.srv import ExportData

ROBOT_CONFIG_PATH = '/etc/clearpath/robot.yaml'
LOGGING_NAME = 'ExportDock'


class ExportDock(Node):

def __init__(self, namespace: str):
super().__init__('export_dock_client')
self._srv_client = self.create_client(ExportData, f'/{namespace}/docking/dock_manager/export')

while not self._srv_client.wait_for_service(timeout_sec=2.0):
self.get_logger().info(f'[{LOGGING_NAME}] /{namespace}/docking/dock_manager/export service not available, waiting again...')

def send_request(self):
req = ExportData.Request()
self.future = self._srv_client.call_async(req)
self.future.add_done_callback(self.export_dock_srv_response_cb)

def export_dock_srv_response_cb(self, future):
try:
response = future.result()
if response.success:
self.get_logger().info(f'[{LOGGING_NAME}] Exported dock succesfully!')
else:
self.get_logger().error(f'[{LOGGING_NAME}] Failed to export dock info!')
except Exception as e:
self.get_logger().error(f'[{LOGGING_NAME}] Service call failed: {e}')

self.destroy_node()
rclpy.shutdown()

def main(args=None):
rclpy.init(args=args)
robot_config = read_yaml(ROBOT_CONFIG_PATH)
namespace = robot_config['system']['ros2']['namespace']

export_client = ExportDock(namespace)
export_dock_client.send_request()

rclpy.spin(export_dock_client)


if __name__ == '__main__':
main()

docking/dock_manager/export

Delete a dock by its ID.

from clearpath_config.common.utils.yaml import read_yaml
import rclpy
from rclpy.node import Node
from clearpath_dock_msgs.srv import ExportData

ROBOT_CONFIG_PATH = '/etc/clearpath/robot.yaml'
LOGGING_NAME = 'ExportDock'


class ExportDock(Node):

def __init__(self, namespace: str):
super().__init__('export_dock_client')
self._srv_client = self.create_client(ExportData, f'/{namespace}/docking/dock_manager/export')

while not self._srv_client.wait_for_service(timeout_sec=2.0):
self.get_logger().info(f'[{LOGGING_NAME}] /{namespace}/docking/dock_manager/export service not available, waiting again...')

def send_request(self):
req = ExportData.Request()
self.future = self._srv_client.call_async(req)
self.future.add_done_callback(self.export_dock_srv_response_cb)

def export_dock_srv_response_cb(self, future):
try:
response = future.result()
if response.success:
self.get_logger().info(f'[{LOGGING_NAME}] Exported dock data succesfully!')
else:
self.get_logger().error(f'[{LOGGING_NAME}] Failed to export dock data!')
except Exception as e:
self.get_logger().error(f'[{LOGGING_NAME}] Service call failed: {e}')

self.destroy_node()
rclpy.shutdown()

def main(args=None):
rclpy.init(args=args)
robot_config = read_yaml(ROBOT_CONFIG_PATH)
namespace = robot_config['system']['ros2']['namespace']

export_dock_client = ExportDock(namespace)
export_dock_client.send_request()

rclpy.spin(export_dock_client)


if __name__ == '__main__':
main()

docking/dock_manager/get_database

Returns the entire dock database.

from clearpath_config.common.utils.yaml import read_yaml
import rclpy
from rclpy.node import Node
from clearpath_dock_msgs.srv import GetDockDatabase

ROBOT_CONFIG_PATH = '/etc/clearpath/robot.yaml'
LOGGING_NAME = 'GetDockDatabase'


class GetDockDatabase(Node):

def __init__(self, namespace: str):
super().__init__('get_dock_database_client')
self._srv_client = self.create_client(GetDockDatabase, f'/{namespace}/docking/dock_manager/get_database')

while not self._srv_client.wait_for_service(timeout_sec=2.0):
self.get_logger().info(f'[{LOGGING_NAME}] /{namespace}/docking/dock_manager/get_database service not available, waiting again...')

def send_request(self):
req = GetDockDatabase.Request()
self.future = self._srv_client.call_async(req)
self.future.add_done_callback(self.get_dock_database_srv_response_cb)

def get_dock_database_srv_response_cb(self, future):
try:
response = future.result()
if response.success:
self.get_logger().info(f'[{LOGGING_NAME}] Retrieved dock database succesfully!')
else:
self.get_logger().error(f'[{LOGGING_NAME}] Failed to retrieve dock database!')
except Exception as e:
self.get_logger().error(f'[{LOGGING_NAME}] Service call failed: {e}')

self.destroy_node()
rclpy.shutdown()

def main(args=None):
rclpy.init(args=args)
robot_config = read_yaml(ROBOT_CONFIG_PATH)
namespace = robot_config['system']['ros2']['namespace']

get_dock_database_client = GetDockDatabase(namespace)
get_dock_database_client.send_request()

rclpy.spin(get_dock_database_client)


if __name__ == '__main__':
main()

docking/dock_manager/import

Import a dock.

from clearpath_config.common.utils.yaml import read_yaml
import rclpy
from rclpy.node import Node
from clearpath_dock_msgs.srv import ImportData

ROBOT_CONFIG_PATH = '/etc/clearpath/robot.yaml'
LOGGING_NAME = 'ImportDock'

# TODO: Enter your dock info as a JSON string
DOCK_STR: str = ''


class ImportDock(Node):

def __init__(self, namespace: str):
super().__init__('import_dock_client')
self._srv_client = self.create_client(ImportData, f'/{namespace}/docking/dock_manager/import')

while not self._srv_client.wait_for_service(timeout_sec=2.0):
self.get_logger().info(f'[{LOGGING_NAME}] /{namespace}/docking/dock_manager/import service not available, waiting again...')

def send_request(self):
req = ImportData.Request()
req.data = DOCK_STR
self.future = self._srv_client.call_async(req)
self.future.add_done_callback(self.import_dock_srv_response_cb)

def import_dock_srv_response_cb(self, future):
try:
response = future.result()
if response.success:
self.get_logger().info(f'[{LOGGING_NAME}] Imported dock succesfully!')
else:
self.get_logger().error(f'[{LOGGING_NAME}] Failed to import dock!')
except Exception as e:
self.get_logger().error(f'[{LOGGING_NAME}] Service call failed: {e}')

self.destroy_node()
rclpy.shutdown()

def main(args=None):
rclpy.init(args=args)
robot_config = read_yaml(ROBOT_CONFIG_PATH)
namespace = robot_config['system']['ros2']['namespace']

import_dock_client = ImportDock(namespace)
import_dock_client.send_request()

rclpy.spin(import_dock_client)


if __name__ == '__main__':
main()

docking/dock_manager/update_dock

Import a dock.

from clearpath_config.common.utils.yaml import read_yaml
import rclpy
from rclpy.node import Node
from clearpath_dock_msgs.srv import UpdateDock

ROBOT_CONFIG_PATH = '/etc/clearpath/robot.yaml'
LOGGING_NAME = 'UpdateDock'

# TODO: Enter your dock info
DOCK_NAME: str = ''
DOCK_INFO = DockInfo(
name=DOCK_NAME,
dock_template='a300_side_dock',
latitude=,
longitude=,
orientation=Quaternion()
)


class UpdateDock(Node):

def __init__(self, namespace: str):
super().__init__('update_dock_client')
self._srv_client = self.create_client(UpdateDock, f'/{namespace}/docking/dock_manager/update_dock')

while not self._srv_client.wait_for_service(timeout_sec=2.0):
self.get_logger().info(f'[{LOGGING_NAME}] /{namespace}/docking/dock_manager/update_dock service not available, waiting again...')

def send_request(self):
req = UpdateDock.Request()
req.name = DOCK_NAME
req.dock = DOCK_INFO
self.future = self._srv_client.call_async(req)
self.future.add_done_callback(self.update_dock_srv_response_cb)

def update_dock_srv_response_cb(self, future):
try:
response = future.result()
if response.success:
self.get_logger().info(f'[{LOGGING_NAME}] Updated dock succesfully!')
else:
self.get_logger().error(f'[{LOGGING_NAME}] Failed to update dock!')
except Exception as e:
self.get_logger().error(f'[{LOGGING_NAME}] Service call failed: {e}')

self.destroy_node()
rclpy.shutdown()

def main(args=None):
rclpy.init(args=args)
robot_config = read_yaml(ROBOT_CONFIG_PATH)
namespace = robot_config['system']['ros2']['namespace']

update_dock_client = UpdateDock(namespace)
update_dock_client.send_request()

rclpy.spin(update_dock_client)


if __name__ == '__main__':
main()

docking/dock_localizer/add_dock_current_pose

Add a dock with the current pose.

from clearpath_config.common.utils.yaml import read_yaml
import rclpy
from rclpy.node import Node
from clearpath_dock_msgs.srv import AddDockCurrentPose

ROBOT_CONFIG_PATH = '/etc/clearpath/robot.yaml'
LOGGING_NAME = 'AddDockCurrentPose'

# TODO: Enter your dock info
DOCK_NAME: str = ''
DOCK_TEMPLATE: str = 'a300_side_dock'


class AddDockCurrentPose(Node):

def __init__(self, namespace: str):
super().__init__('add_dock_curr_pose_client')
self._srv_client = self.create_client(AddDockCurrentPose, f'/{namespace}/docking/dock_localizer/add_dock_current_pose')

while not self._srv_client.wait_for_service(timeout_sec=2.0):
self.get_logger().info(f'[{LOGGING_NAME}] /{namespace}/docking/dock_localizer/add_dock_current_pose service not available, waiting again...')

def send_request(self):
req = AddDockCurrentPose.Request()
req.dock_name = DOCK_NAME
req.dock_template = DOCK_TEMPLATE
self.future = self._srv_client.call_async(req)
self.future.add_done_callback(self.add_dock_curr_pose_srv_response_cb)

def add_dock_curr_pose__srv_response_cb(self, future):
try:
response = future.result()
if response.success:
self.get_logger().info(f'[{LOGGING_NAME}] Added dock succesfully!')
else:
self.get_logger().error(f'[{LOGGING_NAME}] Failed to add dock!')
except Exception as e:
self.get_logger().error(f'[{LOGGING_NAME}] Service call failed: {e}')

self.destroy_node()
rclpy.shutdown()

def main(args=None):
rclpy.init(args=args)
robot_config = read_yaml(ROBOT_CONFIG_PATH)
namespace = robot_config['system']['ros2']['namespace']

add_dock_curr_pose_client = AddDockCurrentPose(namespace)
add_dock_curr_pose_client.send_request()

rclpy.spin(add_dock_curr_pose_client)


if __name__ == '__main__':
main()

docking/dock_localizer/get_dock_poses

Return the dock and predock poses of a specific dock.

from clearpath_config.common.utils.yaml import read_yaml
import rclpy
from rclpy.node import Node
from clearpath_dock_msgs.srv import GetDockPoses

ROBOT_CONFIG_PATH = '/etc/clearpath/robot.yaml'
LOGGING_NAME = 'GetDockPoses'

# TODO: Enter your dock name
DOCK_NAME: str = ''


class GetDockPoses(Node):

def __init__(self, namespace: str):
super().__init__('get_dock_poses_client')
self._srv_client = self.create_client(GetDockPoses, f'/{namespace}/docking/dock_localizer/get_dock_poses')

while not self._srv_client.wait_for_service(timeout_sec=2.0):
self.get_logger().info(f'[{LOGGING_NAME}] /{namespace}/docking/dock_localizer/get_dock_poses service not available, waiting again...')

def send_request(self):
req = GetDockPoses.Request()
req.name = DOCK_NAME
self.future = self._srv_client.call_async(req)
self.future.add_done_callback(self.get_dock_poses_srv_response_cb)

def get_dock_poses_srv_response_cb(self, future):
try:
response = future.result()
if response.success:
self.get_logger().info(f'[{LOGGING_NAME}] Retrieved dock poses succesfully!')
else:
self.get_logger().error(f'[{LOGGING_NAME}] Failed to retriee dock poses!')
except Exception as e:
self.get_logger().error(f'[{LOGGING_NAME}] Service call failed: {e}')

self.destroy_node()
rclpy.shutdown()

def main(args=None):
rclpy.init(args=args)
robot_config = read_yaml(ROBOT_CONFIG_PATH)
namespace = robot_config['system']['ros2']['namespace']

get_dock_poses_client = GetDockPoses(namespace)
get_dock_poses_client.send_request()

rclpy.spin(get_dock_poses_client)


if __name__ == '__main__':
main()

localization/lat_lon_to_xy

Convert lat/lon coordinate to map XY coordinate.

from clearpath_config.common.utils.yaml import read_yaml
import rclpy
from rclpy.node import Node
from clearpath_localization_msgs.srv import ConvertLatLonToCartesian
from sensor_msgs.msg import NavSatFix

ROBOT_CONFIG_PATH = '/etc/clearpath/robot.yaml'
LOGGING_NAME = 'ConvertLatLonToCartesian'

# TODO: Enter your lat/lon coordinates
LAT_LON_MSG = NavSatFix(
latitude=,
longitude=,
)

class ConvertLatLonToCartesian(Node):

def __init__(self, namespace: str):
super().__init__('convert_latlon_client')
self._srv_client = self.create_client(ConvertLatLonToCartesian, f'/{namespace}/localization/lat_lon_to_xy')

while not self._srv_client.wait_for_service(timeout_sec=2.0):
self.get_logger().info(f'[{LOGGING_NAME}] /{namespace}/localization/lat_lon_to_xy service not available, waiting again...')

def send_request(self):
req = ConvertLatLonToCartesian.Request()
req.msg = LAT_LON_MSG
self.future = self._srv_client.call_async(req)
self.future.add_done_callback(self.convert_latlon_client_srv_response_cb)

def convert_latlon_client_srv_response_cb(self, future):
try:
response = future.result()
if response.success:
self.get_logger().info(f'[{LOGGING_NAME}] Converted lat/lon to xy succesfully!')
else:
self.get_logger().error(f'[{LOGGING_NAME}] Failed to convert lat/lon to xy!')
except Exception as e:
self.get_logger().error(f'[{LOGGING_NAME}] Service call failed: {e}')

self.destroy_node()
rclpy.shutdown()

def main(args=None):
rclpy.init(args=args)
robot_config = read_yaml(ROBOT_CONFIG_PATH)
namespace = robot_config['system']['ros2']['namespace']

convert_latlon_client = ConvertLatLonToCartesian(namespace)
convert_latlon_client.send_request()

rclpy.spin(convert_latlon_client)


if __name__ == '__main__':
main()

localization/lat_lon_to_xy_array

Convert a set of lat/lon coordinates to map XY coordinates.

from clearpath_config.common.utils.yaml import read_yaml
import rclpy
from rclpy.node import Node
from clearpath_localization_msgs.srv import ConvertLatLonToCartesianArray
from sensor_msgs.msg import NavSatFix

ROBOT_CONFIG_PATH = '/etc/clearpath/robot.yaml'
LOGGING_NAME = 'ConvertLatLonToCartesianArray'

# TODO: Enter your lat/lon coordinates
LAT_LON_MSG1 = NavSatFix(
latitude=,
longitude=,
)
LAT_LON_MSG2 = NavSatFix(
latitude=,
longitude=,
)


class ConvertLatLonToCartesianArray(Node):

def __init__(self, namespace: str):
super().__init__('convert_latlon_client')
self._srv_client = self.create_client(ConvertLatLonToCartesianArray, f'/{namespace}/localization/lat_lon_to_xy_array')

while not self._srv_client.wait_for_service(timeout_sec=2.0):
self.get_logger().info(f'[{LOGGING_NAME}] /{namespace}/localization/lat_lon_to_xy_array service not available, waiting again...')

def send_request(self):
req = ConvertLatLonToCartesianArray.Request()
req.msg = [LAT_LON_MSG1, LAT_LON_MSG2]
self.future = self._srv_client.call_async(req)
self.future.add_done_callback(self.convert_latlon_client_srv_response_cb)

def convert_latlon_client_srv_response_cb(self, future):
try:
response = future.result()
if response.success:
self.get_logger().info(f'[{LOGGING_NAME}] Converted lat/lon to xy succesfully!')
else:
self.get_logger().error(f'[{LOGGING_NAME}] Failed to convert lat/lon to xy!')
except Exception as e:
self.get_logger().error(f'[{LOGGING_NAME}] Service call failed: {e}')

self.destroy_node()
rclpy.shutdown()

def main(args=None):
rclpy.init(args=args)
robot_config = read_yaml(ROBOT_CONFIG_PATH)
namespace = robot_config['system']['ros2']['namespace']

convert_latlon_client = ConvertLatLonToCartesianArray(namespace)
convert_latlon_client.send_request()

rclpy.spin(convert_latlon_client)


if __name__ == '__main__':
main()

localization/xy_to_lat_lon

Convert map xy coordinate to lat/lon coordinates.

from clearpath_config.common.utils.yaml import read_yaml
import rclpy
from rclpy.node import Node
from clearpath_localization_msgs.srv import ConvertCartesianToLatLon
from geometry_msgs.msg import PoseStamped, Pose, Point

ROBOT_CONFIG_PATH = '/etc/clearpath/robot.yaml'
LOGGING_NAME = 'ConvertCartesianToLatLon'

# TODO: Enter your xy coordinates
POSE_MSG = PoseStamped(
pose=Pose(
position=Point(
x=,
y=
)
)
)

class ConvertCartesianToLatLon(Node):

def __init__(self, namespace: str):
super().__init__('convert_xy_client')
self._srv_client = self.create_client(ConvertCartesianToLatLon, f'/{namespace}/localization/xy_to_lat_lon')

while not self._srv_client.wait_for_service(timeout_sec=2.0):
self.get_logger().info(f'[{LOGGING_NAME}] /{namespace}/localization/xy_to_lat_lon service not available, waiting again...')

def send_request(self):
req = ConvertCartesianToLatLon.Request()
req.pose = POSE_MSG
self.future = self._srv_client.call_async(req)
self.future.add_done_callback(self.convert_xy_client_srv_response_cb)

def convert_xy_client_srv_response_cb(self, future):
try:
response = future.result()
if response.success:
self.get_logger().info(f'[{LOGGING_NAME}] Converted xy to lat/lon succesfully!')
else:
self.get_logger().error(f'[{LOGGING_NAME}] Failed to convert xy to lat/lon!')
except Exception as e:
self.get_logger().error(f'[{LOGGING_NAME}] Service call failed: {e}')

self.destroy_node()
rclpy.shutdown()

def main(args=None):
rclpy.init(args=args)
robot_config = read_yaml(ROBOT_CONFIG_PATH)
namespace = robot_config['system']['ros2']['namespace']

convert_xy_client = ConvertCartesianToLatLon(namespace)
convert_xy_client.send_request()

rclpy.spin(convert_xy_client)


if __name__ == '__main__':
main()

localization/xy_to_lat_lon_array

Convert map xy coordinate to lat/lon coordinates.

from clearpath_config.common.utils.yaml import read_yaml
import rclpy
from rclpy.node import Node
from clearpath_localization_msgs.srv import ConvertCartesianToLatLonArray
from geometry_msgs.msg import PoseStamped, Pose, Point

ROBOT_CONFIG_PATH = '/etc/clearpath/robot.yaml'
LOGGING_NAME = 'ConvertCartesianToLatLonArray'

# TODO: Enter your xy coordinates
POSE_MSG1 = PoseStamped(
pose=Pose(
position=Point(
x=,
y=
)
)
)

POSE_MSG2 = PoseStamped(
pose=Pose(
position=Point(
x=,
y=
)
)
)

class ConvertCartesianToLatLonArray(Node):

def __init__(self, namespace: str):
super().__init__('convert_xy_client')
self._srv_client = self.create_client(ConvertCartesianToLatLonArray, f'/{namespace}/localization/xy_to_lat_lon_array')

while not self._srv_client.wait_for_service(timeout_sec=2.0):
self.get_logger().info(f'[{LOGGING_NAME}] /{namespace}/localization/xy_to_lat_lon_array service not available, waiting again...')

def send_request(self):
req = ConvertCartesianToLatLonArray.Request()
req.pose = [POSE_MSG1, POSE_MSG2]
self.future = self._srv_client.call_async(req)
self.future.add_done_callback(self.convert_xy_client_srv_response_cb)

def convert_xy_client_srv_response_cb(self, future):
try:
response = future.result()
if response.success:
self.get_logger().info(f'[{LOGGING_NAME}] Converted xy to lat/lon succesfully!')
else:
self.get_logger().error(f'[{LOGGING_NAME}] Failed to convert xy to lat/lon!')
except Exception as e:
self.get_logger().error(f'[{LOGGING_NAME}] Service call failed: {e}')

self.destroy_node()
rclpy.shutdown()

def main(args=None):
rclpy.init(args=args)
robot_config = read_yaml(ROBOT_CONFIG_PATH)
namespace = robot_config['system']['ros2']['namespace']

convert_xy_client = ConvertCartesianToLatLonArray(namespace)
convert_xy_client.send_request()

rclpy.spin(convert_xy_client)


if __name__ == '__main__':
main()