All Other Endpoints
Topics
autonomy/config
The autonomy configuration
-
Topic Name: autonomy/config
-
Topic Type: clearpath_navigation_msgs/msg/AutonomyConfig
-
Topic QoS: System Default
- Python
- C++
import rclpy
from rclpy.node import Node
from clearpath_navigation_msgs.msg import AutonomyConfig
class AutonomyConfig(Node):
def __init__(self):
super().__init__('config_sub')
self.config_sub = self.create_subscription(
AutonomyConfig, 'autonomy/config', self.config_cb, 10)
self.config_sub # prevent unused variable warning
def config_cb(self, msg):
self.last_config_msg = msg
if __name__ == '__main__':
rclpy.init(args=args)
config_sub = AutonomyConfig()
rclpy.spin(config_sub)
config_sub.destroy_node()
rclpy.shutdown()
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "clearpath_navigation_msgs/msg/autonomy_config.hpp"
class AutonomyConfig : public rclcpp::Node
{
public:
AutonomyConfig()
: Node("config_sub")
{
auto config_cb =
[this](clearpath_navigation_msgs::msg::AutonomyConfig::UniquePtr msg) -> void {
last_config_msg_ = msg;
};
config_sub_ = this->create_subscription<clearpath_navigation_msgs::msg::AutonomyConfig>("autonomy/config", 10, config_cb);
}
private:
rclcpp::Subscription<clearpath_navigation_msgs::msg::AutonomyConfig>::SharedPtr config_sub_;
clearpath_navigation_msgs::msg::AutonomyConfig::UniquePtr last_config_msg_;
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<AutonomyConfig>());
rclcpp::shutdown();
return 0;
}
autonomy/initial_path
The initial Path computed by autonomy
-
Topic Name: autonomy/initial_path
-
Topic Type: nav_msgs/msg/Path
-
Topic QoS: System Default
- Python
- C++
import rclpy
from rclpy.node import Node
from nav_msgs.msg import Path
class AutonomyInitialPath(Node):
def __init__(self):
super().__init__('path_sub')
self.path_sub = self.create_subscription(
Path, 'autonomy/initial_path', self.path_cb, 10)
self.path_sub # prevent unused variable warning
def path_cb(self, msg):
self.last_path_msg = msg
if __name__ == '__main__':
rclpy.init(args=args)
path_sub = AutonomyInitialPath()
rclpy.spin(path_sub)
path_sub.destroy_node()
rclpy.shutdown()
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "nav_msgs/msg/Path.hpp"
class AutonomyInitialPath : public rclcpp::Node
{
public:
AutonomyInitialPath()
: Node("path_sub")
{
auto path_sub =
[this](nav_msgs::msg::Path::UniquePtr msg) -> void {
last_path_msg_ = msg;
};
path_sub_ = this->create_subscription<nav_msgs::msg::Path>("autonomy/initial_path", 10, path_cb);
}
private:
rclcpp::Subscription<nav_msgs::msg::Path>::SharedPtr path_sub_;
nav_msgs::msg::Path::UniquePtr last_path_msg_;
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<AutonomyInitialPath>());
rclcpp::shutdown();
return 0;
}
autonomy/status
The status of the autonomy
-
Topic Name: autonomy/status
-
Topic Type: clearpath_navigation_msgs/msg/AutonomyStatus
-
Topic QoS: System Default
- Python
- C++
import rclpy
from rclpy.node import Node
from clearpath_navigation_msgs.msg import AutonomyStatus
class AutonomyStatus(Node):
def __init__(self):
super().__init__('status_sub')
self.status_sub = self.create_subscription(
AutonomyStatus, 'autonomy/status', self.status_cb, 10)
self.status_sub # prevent unused variable warning
def status_cb(self, msg):
self.last_status_msg = msg
if __name__ == '__main__':
rclpy.init(args=args)
status_sub = AutonomyStatus()
rclpy.spin(status_sub)
status_sub.destroy_node()
rclpy.shutdown()
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "clearpath_navigation_msgs/msg/autonomy_status.hpp"
class AutonomyStatus : public rclcpp::Node
{
public:
AutonomyStatus()
: Node("status_sub")
{
auto status_cb =
[this](clearpath_navigation_msgs::msg::AutonomyStatus::UniquePtr msg) -> void {
last_status_msg_ = msg;
};
status_sub_ = this->create_subscription<clearpath_navigation_msgs::msg::AutonomyStatus>("autonomy/status", 10, status_cb);
}
private:
rclcpp::Subscription<clearpath_navigation_msgs::msg::AutonomyStatus>::SharedPtr status_sub_;
clearpath_navigation_msgs::msg::AutonomyStatus::UniquePtr last_status_msg_;
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<AutonomyStatus>());
rclcpp::shutdown();
return 0;
}
control_selection/current_mode
Current control mode (NEUTRAL, MANUAL, AUTONOMY).
-
Topic Name: control_selection/current_mode
-
Topic Type: clearpath_control_msgs/msg/ControlMode
-
Topic QoS: System Default
- Python
- C++
import rclpy
from rclpy.node import Node
from clearpath_control_msgs.msg import ControlMode
class CurrentMode(Node):
def __init__(self):
super().__init__('mode_sub')
self.mode_sub = self.create_subscription(
CurrentMode, 'control_selection/current_mode', self.mode_cb, 10)
self.mode_sub # prevent unused variable warning
def mode_cb(self, msg):
self.last_mode_msg = msg
if __name__ == '__main__':
rclpy.init(args=args)
mode_sub = CurrentMode()
rclpy.spin(mode_sub)
mode_sub.destroy_node()
rclpy.shutdown()
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "clearpath_control_msgs/msg/current_mode.hpp"
class CurrentMode : public rclcpp::Node
{
public:
CurrentMode()
: Node("mode_sub")
{
auto mode_cb =
[this](clearpath_control_msgs::msg::CurrentMode::UniquePtr msg) -> void {
last_mode_msg_ = msg;
};
mode_sub_ = this->create_subscription<clearpath_control_msgs::msg::CurrentMode>("control_selection/current_mode", 10, mode_cb);
}
private:
rclcpp::Subscription<clearpath_control_msgs::msg::CurrentMode>::SharedPtr mode_sub_;
clearpath_control_msgs::msg::CurrentMode::UniquePtr last_mode_msg_;
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<CurrentMode>());
rclcpp::shutdown();
return 0;
}
control_selection/control_state
The complete state of control selection node.
-
Topic Name: control_selection/control_state
-
Topic Type: clearpath_control_msgs/msg/ControlSelectionState
-
Topic QoS: System Default
- Python
- C++
import rclpy
from rclpy.node import Node
from clearpath_control_msgs.msg import ControlSelectionState
class ControlSelectionState(Node):
def __init__(self):
super().__init__('state_sub')
self.state_sub = self.create_subscription(
ControlSelectionState, 'control_selection/control_state', self.state_cb, 10)
self.state_sub # prevent unused variable warning
def state_cb(self, msg):
self.last_status_msg = msg
if __name__ == '__main__':
rclpy.init(args=args)
state_sub = ControlSelectionState()
rclpy.spin(state_sub)
state_sub.destroy_node()
rclpy.shutdown()
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "clearpath_control_msgs/msg/control_selection_state.hpp"
class ControlSelectionState : public rclcpp::Node
{
public:
ControlSelectionState()
: Node("state_sub")
{
auto state_cb =
[this](clearpath_control_msgs::msg::ControlSelectionState::UniquePtr msg) -> void {
last_state_msg_ = msg;
};
state_sub_ = this->create_subscription<clearpath_control_msgs::msg::ControlSelectionState>("control_selection/control_state", 10, state_cb);
}
private:
rclcpp::Subscription<clearpath_control_msgs::msg::ControlSelectionState>::SharedPtr state_sub_;
clearpath_control_msgs::msg::ControlSelectionState::UniquePtr last_state_msg_;
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<ControlSelectionState>());
rclcpp::shutdown();
return 0;
}
docking/docking_server/path
The docking path.
-
Topic Name: docking/docking_server/path
-
Topic Type: nav_msgs/msg/Path
-
Topic QoS: System Default
- Python
- C++
import rclpy
from rclpy.node import Node
from nav_msgs.msg import Path
class DockingPath(Node):
def __init__(self):
super().__init__('path_sub')
self.path_sub = self.create_subscription(
Path, 'docking/docking_server/path', self.path_cb, 10)
self.path_sub # prevent unused variable warning
def path_cb(self, msg):
self.last_path_msg = msg
if __name__ == '__main__':
rclpy.init(args=args)
path_sub = DockingPath()
rclpy.spin(path_sub)
path_sub.destroy_node()
rclpy.shutdown()
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "nav_msgs/msg/path.hpp"
class DockingPath : public rclcpp::Node
{
public:
DockingPath()
: Node("path_sub")
{
auto path_cb =
[this](nav_msgs::msg::Path::UniquePtr msg) -> void {
last_path_msg_ = msg;
};
path_sub_ = this->create_subscription<nav_msgs::msg::Path>("docking/docking_server/path", 10, path_cb);
}
private:
rclcpp::Subscription<nav_msgs::msg::Path>::SharedPtr path_sub_;
nav_msgs::msg::Path::UniquePtr last_path_msg_;
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<DockingPath>());
rclcpp::shutdown();
return 0;
}
docking/undocking_server/path
The undocking path.
-
Topic Name: docking/undocking_server/path
-
Topic Type: nav_msgs/msg/Path
-
Topic QoS: System Default
- Python
- C++
import rclpy
from rclpy.node import Node
from nav_msgs.msg import Path
class UndockingPath(Node):
def __init__(self):
super().__init__('path_sub')
self.path_sub = self.create_subscription(
Path, 'docking/undocking_server/path', self.path_cb, 10)
self.path_sub # prevent unused variable warning
def path_cb(self, msg):
self.last_path_msg = msg
if __name__ == '__main__':
rclpy.init(args=args)
path_sub = UndockingPath()
rclpy.spin(path_sub)
path_sub.destroy_node()
rclpy.shutdown()
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "nav_msgs/msg/path.hpp"
class UndockingPath : public rclcpp::Node
{
public:
UndockingPath()
: Node("path_sub")
{
auto path_cb =
[this](nav_msgs::msg::Path::UniquePtr msg) -> void {
last_path_msg_ = msg;
};
path_sub_ = this->create_subscription<nav_msgs::msg::Path>("docking/undocking_server/path", 10, path_cb);
}
private:
rclcpp::Subscription<nav_msgs::msg::Path>::SharedPtr path_sub_;
nav_msgs::msg::Path::UniquePtr last_path_msg_;
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<UndockingPath>());
rclcpp::shutdown();
return 0;
}
goto/preview
The preview points for a GoTo execution.
-
Topic Name: goto/preview
-
Topic Type: clearpath_navigation_msgs/msg/GoToPreview
-
Topic QoS: System Default
- Python
- C++
import rclpy
from rclpy.node import Node
from clearpath_navigation_msgs.msg import GoToPreview
class GoToPreview(Node):
def __init__(self):
super().__init__('preview_sub')
self.preview_sub = self.create_subscription(
GoToPreview, 'goto/preview', self.preview_cb, 10)
self.preview_sub # prevent unused variable warning
def preview_cb(self, msg):
self.last_preview_msg = msg
if __name__ == '__main__':
rclpy.init(args=args)
preview_sub = GoToPreview()
rclpy.spin(preview_sub)
preview_sub.destroy_node()
rclpy.shutdown()
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "nav_msgs/msg/path.hpp"
class GoToPreview : public rclcpp::Node
{
public:
GoToPreview()
: Node("preview_sub")
{
auto preview_cb =
[this](clearpath_navigation_msgs::msg::GoToPreview::UniquePtr msg) -> void {
last_preview_msg_ = msg;
};
preview_sub_ = this->create_subscription<clearpath_navigation_msgs::msg::GoToPreview>("goto/preview", 10, preview_cb);
}
private:
rclcpp::Subscription<clearpath_navigation_msgs::msg::GoToPreview>::SharedPtr preview_sub_;
clearpath_navigation_msgs::msg::GoToPreview::UniquePtr last_preview_msg_;
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<GoToPreview>());
rclcpp::shutdown();
return 0;
}
localization/datum
The lat/lon coordinate of the datum. This value is sotred in persistent memory and loaded at runtime.
-
Topic Name: localization/datum
-
Topic Type: sensor_msgs/msg/NavSatFix
-
Topic QoS:
- Python
- C++
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import NavSatFix
class Datum(Node):
def __init__(self):
super().__init__('datum_sub')
self.datum_sub = self.create_subscription(
NavSatFix, 'localization/datum', self.datum_cb, 10)
self.datum_sub # prevent unused variable warning
def datum_cb(self, msg):
self.last_datum_msg = msg
if __name__ == '__main__':
rclpy.init(args=args)
datum_sub = Datum()
rclpy.spin(datum_sub)
datum_sub.destroy_node()
rclpy.shutdown()
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "sensor_msgs/msg/nav_sat_fix.hpp"
class Datum : public rclcpp::Node
{
public:
Datum()
: Node("datum_sub")
{
auto datum_cb =
[this](sensor_msgs::msg::NavSatFix::UniquePtr msg) -> void {
last_datum_msg_ = msg;
};
datum_sub_ = this->create_subscription<sensor_msgs::msg::NavSatFix>("localization/datum", 10, datum_cb);
}
private:
rclcpp::Subscription<sensor_msgs::msg::NavSatFix>::SharedPtr datum_sub_;
sensor_msgs::msg::NavSatFix::UniquePtr last_datum_msg_;
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<Datum>());
rclcpp::shutdown();
return 0;
}
mission/preview
The preview points for a Mission execution.
-
Topic Name: mission/preview
-
Topic Type: clearpath_navigation_msgs/msg/MissionPreview
-
Topic QoS: System Default
- Python
- C++
import rclpy
from rclpy.node import Node
from clearpath_navigation_msgs.msg import MissionPreview
class MissionPreview(Node):
def __init__(self):
super().__init__('preview_sub')
self.preview_sub = self.create_subscription(
MissionPreview, 'mission/preview', self.preview_cb, 10)
self.preview_sub # prevent unused variable warning
def preview_cb(self, msg):
self.last_preview_msg = msg
if __name__ == '__main__':
rclpy.init(args=args)
preview_sub = MissionPreview()
rclpy.spin(preview_sub)
preview_sub.destroy_node()
rclpy.shutdown()
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "nav_msgs/msg/path.hpp"
class MissionPreview : public rclcpp::Node
{
public:
MissionPreview()
: Node("preview_sub")
{
auto preview_cb =
[this](clearpath_navigation_msgs::msg::MissionPreview::UniquePtr msg) -> void {
last_preview_msg_ = msg;
};
preview_sub_ = this->create_subscription<clearpath_navigation_msgs::msg::MissionPreview>("mission/preview", 10, preview_cb);
}
private:
rclcpp::Subscription<clearpath_navigation_msgs::msg::MissionPreview>::SharedPtr preview_sub_;
clearpath_navigation_msgs::msg::MissionPreview::UniquePtr last_preview_msg_;
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<MissionPreview>());
rclcpp::shutdown();
return 0;
}
speed_limit
The current navigation speed limit.
-
Topic Name: speed_limit
-
Topic Type: nav2_msgs/msg/SpeedLimit
-
Topic QoS: System Default
- Python
- C++
import rclpy
from rclpy.node import Node
from nav2_msgs.msg import SpeedLimit
class SpeedLimit(Node):
def __init__(self):
super().__init__('speed_limit_sub')
self.speed_limit_sub = self.create_subscription(
SpeedLimit, 'speed_limit', self.datum_cb, 10)
self.speed_limit_sub # prevent unused variable warning
def datum_cb(self, msg):
self.last_speed_limit_msg = msg
if __name__ == '__main__':
rclpy.init(args=args)
speed_limit_sub = SpeedLimit()
rclpy.spin(speed_limit_sub)
speed_limit_sub.destroy_node()
rclpy.shutdown()
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "nav2_msgs/msg/speed_limit.hpp"
class SpeedLimit : public rclcpp::Node
{
public:
SpeedLimit()
: Node("speed_limit_sub")
{
auto speed_limit_cb =
[this](nav2_msgs::msg::SpeedLimit::UniquePtr msg) -> void {
last_speed_limit_msg_ = msg;
};
speed_limit_sub_ = this->create_subscription<nav2_msgs::msg::SpeedLimit>("speed_limit", 10, speed_limit_cb);
}
private:
rclcpp::Subscription<nav2_msgs::msg::SpeedLimit>::SharedPtr speed_limit_sub_;
nav2_msgs::msg::SpeedLimit::UniquePtr last_speed_limit_msg_;
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<SpeedLimit>());
rclcpp::shutdown();
return 0;
}
ui/heartbeat
The heartbeat of the UI.
-
Topic Name: ui/heartbeat
-
Topic Type: std_msgs/msg/Header
-
Topic QoS: System Default
- Python
- C++
import rclpy
from rclpy.node import Node
from std_msgs.msg import Header
class Heartbeat(Node):
def __init__(self):
super().__init__('heartbeat_sub')
self.heartbeat_sub = self.create_subscription(
Header, 'ui/heartbeat', self.heartbeat_cb, 10)
self.heartbeat_sub # prevent unused variable warning
def heartbeat_cb(self, msg):
self.last_heartbeat_msg = msg
if __name__ == '__main__':
rclpy.init(args=args)
heartbeat_sub = Heartbeat()
rclpy.spin(heartbeat_sub)
heartbeat_sub.destroy_node()
rclpy.shutdown()
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/header.hpp"
class Heartbeat : public rclcpp::Node
{
public:
Heartbeat()
: Node("heartbeat_sub")
{
auto heartbeat_cb =
[this](std_msgs::msg::Header::UniquePtr msg) -> void {
last_heartbeat_msg_ = msg;
};
heartbeat_sub_ = this->create_subscription<std_msgs::msg::Header>("ui/heartbeat", 10, heartbeat_cb);
}
private:
rclcpp::Subscription<std_msgs::msg::Header>::SharedPtr heartbeat_sub_;
std_msgs::msg::Header::UniquePtr last_heartbeat_msg_;
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<Heartbeat>());
rclcpp::shutdown();
return 0;
}
ui_teleop/cmd_vel
The velocity commands from the UI joystick.
-
Topic Name: ui_teleop/cmd_vel
-
Topic Type: geometry_msgs/msg/TwistStamped
-
Topic QoS: System Default
- Python
- C++
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import TwistStamped
class UICmdVel(Node):
def __init__(self):
super().__init__('ui_cmd_vel_sub')
self.ui_cmd_vel_sub = self.create_subscription(
TwistStamped, 'ui_teleop/cmd_vel', self.ui_cmd_vel_cb, 10)
self.ui_cmd_vel_sub # prevent unused variable warning
def ui_cmd_vel_cb(self, msg):
self.last_ui_cmd_vel_msg = msg
if __name__ == '__main__':
rclpy.init(args=args)
ui_cmd_vel_sub = UICmdVel()
rclpy.spin(ui_cmd_vel_sub)
ui_cmd_vel_sub.destroy_node()
rclpy.shutdown()
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "geometry_msgs/msg/twist_stamped.hpp"
class UICmdVel : public rclcpp::Node
{
public:
UICmdVel()
: Node("ui_cmd_vel_sub")
{
auto ui_cmd_vel_cb =
[this](geometry_msgs::msg::TwistStamped::UniquePtr msg) -> void {
last_ui_cmd_vel_msg_ = msg;
};
ui_cmd_vel_sub_ = this->create_subscription<geometry_msgs::msg::TwistStamped>("ui_teleop/cmd_vel", 10, ui_cmd_vel_cb);
}
private:
rclcpp::Subscription<geometry_msgs::msg::TwistStamped>::SharedPtr ui_cmd_vel_sub_;
geometry_msgs::msg::TwistStamped::UniquePtr last_ui_cmd_vel_msg_;
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<UICmdVel>());
rclcpp::shutdown();
return 0;
}
Services
control_selection/set_mode
Switch the control mode of the platform. Options are: ('AUTONOMY', 'MANUAL', 'NEUTRAL')
-
Service Name: control_selection/set_mode
-
Service Type: clearpath_control_msgs/srv/SetControlMode
- Python
- C++
import rclpy
from rclpy.node import Node
from clearpath_control_msgs.srv import SetControlMode
from clearpath_control_msgs.msg import ControlMode
class SetControlMode(Node):
def __init__(self):
super().__init__('set_control_mode_client')
self._srv_client = self.create_client(SetControlMode, 'control_selection/set_mode')
while not self._srv_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('service not available, waiting again...')
self.req = SetControlMode.Request()
def send_request(self, mode):
# set to manual mode
self.req.mode.mode = mode
self.future = self._srv_client.call_async(self.req)
rclpy.spin_until_future_complete(self, self.future)
return self.future.result()
if __name__ == '__main__':
rclpy.init(args=args)
set_control_mode_client = SetControlMode()
response = set_control_mode_client.send_request(ControlMode.MANUAL)
set_control_mode_client.get_logger().info('Control mode set to MANUAL mode.'))
set_control_mode_client.destroy_node()
rclpy.shutdown()
#include "rclcpp/rclcpp.hpp"
#include "clearpath_control_msgs/srv/set_control_mode.hpp"
#include "clearpath_control_msgs/msg/control_mode.hpp"
#include <chrono>
#include <cstdlib>
#include <memory>
using namespace std::chrono_literals;
int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
std::shared_ptr<rclcpp::Node> node = rclcpp::Node::make_shared("set_control_mode_client");
rclcpp::Client<std_srvs::srv::SetControlMode>::SharedPtr client =
node->create_client<std_srvs::srv::SetControlMode>("control_selection/set_mode");
auto request = std::make_shared<std_srvs::srv::SetControlMode::Request>();
request-> mode.mode = clearpath_control_msgs::ControlMode::MANUAL;
while (!client->wait_for_service(1s)) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(rclcpp::get_logger("rclcpp"), "Interrupted while waiting for the service. Exiting.");
return 0;
}
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "service not available, waiting again...");
}
auto result = client->async_send_request(request);
// Wait for the result.
if (rclcpp::spin_until_future_complete(node, result) ==
rclcpp::FutureReturnCode::SUCCESS)
{
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "Control mode set to MANUAL mode");
} else {
RCLCPP_ERROR(rclcpp::get_logger("rclcpp"), "Failed to call service control_selection/set_mode");
}
rclcpp::shutdown();
return 0;
}
docking/dock_manager/clear_data
Clears all dock data.
-
Service Name: docking/dock_manager/clear_data
-
Service Type: std_srvs/srv/Trigger
- Python
- C++
from clearpath_config.common.utils.yaml import read_yaml
import rclpy
from rclpy.node import Node
from std_srvs.srv import Trigger
ROBOT_CONFIG_PATH = '/etc/clearpath/robot.yaml'
LOGGING_NAME = 'ClearDockData'
class ClearDockData(Node):
def __init__(self, namespace: str):
super().__init__('clear_dock_data_client')
self._srv_client = self.create_client(Trigger, f'/{namespace}/docking/dock_manager/clear_data')
while not self._srv_client.wait_for_service(timeout_sec=2.0):
self.get_logger().info(f'[{LOGGING_NAME}] /{namespace}/docking/dock_manager/clear_data service not available, waiting again...')
def send_request(self):
req = Trigger.Request()
self.future = self._srv_client.call_async(req)
self.future.add_done_callback(self.clear_dock_data_srv_response_cb)
def clear_dock_data_srv_response_cb(self, future):
try:
response = future.result()
if response.success:
self.get_logger().info(f'[{LOGGING_NAME}] Dock data cleared!')
else:
self.get_logger().error(f'[{LOGGING_NAME}] Failed to clear dock data')
except Exception as e:
self.get_logger().error(f'[{LOGGING_NAME}] Service call failed: {e}')
self.destroy_node()
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
robot_config = read_yaml(ROBOT_CONFIG_PATH)
namespace = robot_config['system']['ros2']['namespace']
clear_dock_data_client = ClearDockData(namespace)
clear_dock_data_client.send_request()
rclpy.spin(clear_dock_data_client)
if __name__ == '__main__':
main()
#include "rclcpp/rclcpp.hpp"
#include "std_srvs/srv/trigger.hpp"
#include <chrono>
#include <cstdlib>
#include <memory>
#include <yaml-cpp/yaml.h>
using namespace std::chrono_literals;
int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
YAML::Node robot_config = YAML::LoadFile("/etc/clearpath/robot.yaml");
std::string namespace_ = robot_config["system"]["ros2"]["namespace"].as<std::string>();
std::shared_ptr<rclcpp::Node> node = rclcpp::Node::make_shared("clear_dock_data_client");
rclcpp::Client<std_srvs::srv::Trigger>::SharedPtr client =
node->create_client<std_srvs::srv::Trigger>("/" + namespace_ + "/" + std::string("docking/dock_manager/clear_data"));
auto request = std::make_shared<std_srvs::srv::Trigger::Request>();
while (!client->wait_for_service(2s)) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(rclcpp::get_logger("clear_dock_data_client"), "Interrupted while waiting for the service. Exiting.");
return 0;
}
RCLCPP_INFO(rclcpp::get_logger("clear_dock_data_client"), "/%s/docking/dock_manager/clear_data service not available, waiting again...", namespace_.c_str());
}
auto result = client->async_send_request(request);
// Wait for the result.
if (rclcpp::spin_until_future_complete(node, result) ==
rclcpp::FutureReturnCode::SUCCESS)
{
if (result.get()->success)
{
RCLCPP_INFO(rclcpp::get_logger("clear_dock_data_client"), "Dock data cleared!");
}
else
{
RCLCPP_ERROR(rclcpp::get_logger("clear_dock_data_client"), "Failed to clear dock data!");
}
} else {
RCLCPP_ERROR(rclcpp::get_logger("clear_dock_data_client"), "Failed to call service /%s/docking/dock_manager/clear_data", namespace_.c_str());
}
rclcpp::shutdown();
return 0;
}
docking/dock_manager/delete_dock
Delete a dock by its ID.
-
Service Name: docking/dock_manager/delete_dock
-
Service Type: clearpath_dock_msgs/srv/RemoveDock
- Python
- C++
from clearpath_config.common.utils.yaml import read_yaml
import rclpy
from rclpy.node import Node
from clearpath_dock_msgs.srv import RemoveDock
ROBOT_CONFIG_PATH = '/etc/clearpath/robot.yaml'
LOGGING_NAME = 'DeleteDock'
# TODO: Enter name of the dock to be removed
DOCK_NAME: str = ''
class DeleteDock(Node):
def __init__(self, namespace: str):
super().__init__('delete_dock_client')
self._srv_client = self.create_client(RemoveDock, f'/{namespace}/docking/dock_manager/delete_dock')
while not self._srv_client.wait_for_service(timeout_sec=2.0):
self.get_logger().info(f'[{LOGGING_NAME}] /{namespace}/docking/dock_manager/delete_dock service not available, waiting again...')
def send_request(self):
req = RemoveDock.Request()
req.name = DOCK_NAME
self.future = self._srv_client.call_async(req)
self.future.add_done_callback(self.delete_dock_srv_response_cb)
def delete_dock_srv_response_cb(self, future):
try:
response = future.result()
if response.success:
self.get_logger().info(f'[{LOGGING_NAME}] Deleted dock succesfully!')
else:
self.get_logger().error(f'[{LOGGING_NAME}] Failed to delete dock!')
except Exception as e:
self.get_logger().error(f'[{LOGGING_NAME}] Service call failed: {e}')
self.destroy_node()
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
robot_config = read_yaml(ROBOT_CONFIG_PATH)
namespace = robot_config['system']['ros2']['namespace']
delete_dock_client = DeleteDock(namespace)
delete_dock_client.send_request()
rclpy.spin(delete_dock_client)
if __name__ == '__main__':
main()
#include "rclcpp/rclcpp.hpp"
#include "clearpath_dock_msgs/srv/remove_dock.hpp"
#include <chrono>
#include <cstdlib>
#include <memory>
#include <yaml-cpp/yaml.h>
using namespace std::chrono_literals;
// TODO: Enter your map id and waypoint information
const std::string DOCK_NAME = "";
int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
YAML::Node robot_config = YAML::LoadFile("/etc/clearpath/robot.yaml");
std::string namespace_ = robot_config["system"]["ros2"]["namespace"].as<std::string>();
std::shared_ptr<rclcpp::Node> node = rclcpp::Node::make_shared("delete_dock_client");
rclcpp::Client<clearpath_dock_msgs::srv::RemoveDock>::SharedPtr client =
node->create_client<clearpath_dock_msgs::srv::RemoveDock>("/" + namespace_ + "/" + std::string("docking/dock_manager/delete_dock"));
auto request = std::make_shared<clearpath_dock_msgs::srv::RemoveDock::Request>();
request->name = DOCK_NAME;
while (!client->wait_for_service(2s)) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(rclcpp::get_logger("delete_dock_client"), "Interrupted while waiting for the service. Exiting.");
return 0;
}
RCLCPP_INFO(rclcpp::get_logger("delete_dock_client"), "/%s/docking/dock_manager/delete_dock service not available, waiting again...", namespace_.c_str());
}
auto result = client->async_send_request(request);
// Wait for the result.
if (rclcpp::spin_until_future_complete(node, result) ==
rclcpp::FutureReturnCode::SUCCESS)
{
if (result.get()->success)
{
RCLCPP_INFO(rclcpp::get_logger("delete_dock_client"), "Dock deleted successfully!");
}
else
{
RCLCPP_ERROR(rclcpp::get_logger("delete_dock_client"), "Failed to delete dock!");
}
} else {
RCLCPP_ERROR(rclcpp::get_logger("delete_dock_client"), "Failed to call service /%s/docking/dock_manager/delete_dock", namespace_.c_str());
}
rclcpp::shutdown();
return 0;
}
docking/dock_manager/export
Delete a dock by its ID.
-
Service Name: docking/dock_manager/export
-
Service Type: clearpath_dock_msgs/srv/ExportData
- Python
- C++
from clearpath_config.common.utils.yaml import read_yaml
import rclpy
from rclpy.node import Node
from clearpath_dock_msgs.srv import ExportData
ROBOT_CONFIG_PATH = '/etc/clearpath/robot.yaml'
LOGGING_NAME = 'ExportDock'
class ExportDock(Node):
def __init__(self, namespace: str):
super().__init__('export_dock_client')
self._srv_client = self.create_client(ExportData, f'/{namespace}/docking/dock_manager/export')
while not self._srv_client.wait_for_service(timeout_sec=2.0):
self.get_logger().info(f'[{LOGGING_NAME}] /{namespace}/docking/dock_manager/export service not available, waiting again...')
def send_request(self):
req = ExportData.Request()
self.future = self._srv_client.call_async(req)
self.future.add_done_callback(self.export_dock_srv_response_cb)
def export_dock_srv_response_cb(self, future):
try:
response = future.result()
if response.success:
self.get_logger().info(f'[{LOGGING_NAME}] Exported dock succesfully!')
else:
self.get_logger().error(f'[{LOGGING_NAME}] Failed to export dock info!')
except Exception as e:
self.get_logger().error(f'[{LOGGING_NAME}] Service call failed: {e}')
self.destroy_node()
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
robot_config = read_yaml(ROBOT_CONFIG_PATH)
namespace = robot_config['system']['ros2']['namespace']
export_client = ExportDock(namespace)
export_dock_client.send_request()
rclpy.spin(export_dock_client)
if __name__ == '__main__':
main()
#include "rclcpp/rclcpp.hpp"
#include "clearpath_dock_msgs/srv/export_data.hpp"
#include <chrono>
#include <cstdlib>
#include <memory>
#include <yaml-cpp/yaml.h>
using namespace std::chrono_literals;
int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
YAML::Node robot_config = YAML::LoadFile("/etc/clearpath/robot.yaml");
std::string namespace_ = robot_config["system"]["ros2"]["namespace"].as<std::string>();
std::shared_ptr<rclcpp::Node> node = rclcpp::Node::make_shared("export_dock_client");
rclcpp::Client<clearpath_dock_msgs::srv::ExportData>::SharedPtr client =
node->create_client<clearpath_dock_msgs::srv::ExportData>("/" + namespace_ + "/" + std::string("docking/dock_manager/export"));
auto request = std::make_shared<clearpath_dock_msgs::srv::ExportData::Request>();
while (!client->wait_for_service(2s)) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(rclcpp::get_logger("export_dock_client"), "Interrupted while waiting for the service. Exiting.");
return 0;
}
RCLCPP_INFO(rclcpp::get_logger("export_dock_client"), "/%s/docking/dock_manager/export service not available, waiting again...", namespace_.c_str());
}
auto result = client->async_send_request(request);
// Wait for the result.
if (rclcpp::spin_until_future_complete(node, result) ==
rclcpp::FutureReturnCode::SUCCESS)
{
if (result.get()->success)
{
RCLCPP_INFO(rclcpp::get_logger("export_dock_client"), "Dock exported successfully!");
}
else
{
RCLCPP_ERROR(rclcpp::get_logger("export_dock_client"), "Failed to export dock!");
}
} else {
RCLCPP_ERROR(rclcpp::get_logger("export_dock_client"), "Failed to call service /%s/docking/dock_manager/export", namespace_.c_str());
}
rclcpp::shutdown();
return 0;
}
docking/dock_manager/export
Delete a dock by its ID.
-
Service Name: docking/dock_manager/export
-
Service Type: clearpath_dock_msgs/srv/ExportData
- Python
- C++
from clearpath_config.common.utils.yaml import read_yaml
import rclpy
from rclpy.node import Node
from clearpath_dock_msgs.srv import ExportData
ROBOT_CONFIG_PATH = '/etc/clearpath/robot.yaml'
LOGGING_NAME = 'ExportDock'
class ExportDock(Node):
def __init__(self, namespace: str):
super().__init__('export_dock_client')
self._srv_client = self.create_client(ExportData, f'/{namespace}/docking/dock_manager/export')
while not self._srv_client.wait_for_service(timeout_sec=2.0):
self.get_logger().info(f'[{LOGGING_NAME}] /{namespace}/docking/dock_manager/export service not available, waiting again...')
def send_request(self):
req = ExportData.Request()
self.future = self._srv_client.call_async(req)
self.future.add_done_callback(self.export_dock_srv_response_cb)
def export_dock_srv_response_cb(self, future):
try:
response = future.result()
if response.success:
self.get_logger().info(f'[{LOGGING_NAME}] Exported dock data succesfully!')
else:
self.get_logger().error(f'[{LOGGING_NAME}] Failed to export dock data!')
except Exception as e:
self.get_logger().error(f'[{LOGGING_NAME}] Service call failed: {e}')
self.destroy_node()
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
robot_config = read_yaml(ROBOT_CONFIG_PATH)
namespace = robot_config['system']['ros2']['namespace']
export_dock_client = ExportDock(namespace)
export_dock_client.send_request()
rclpy.spin(export_dock_client)
if __name__ == '__main__':
main()
#include "rclcpp/rclcpp.hpp"
#include "clearpath_dock_msgs/srv/export_data.hpp"
#include <chrono>
#include <cstdlib>
#include <memory>
#include <yaml-cpp/yaml.h>
using namespace std::chrono_literals;
int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
YAML::Node robot_config = YAML::LoadFile("/etc/clearpath/robot.yaml");
std::string namespace_ = robot_config["system"]["ros2"]["namespace"].as<std::string>();
std::shared_ptr<rclcpp::Node> node = rclcpp::Node::make_shared("export_dock_client");
rclcpp::Client<clearpath_dock_msgs::srv::ExportData>::SharedPtr client =
node->create_client<clearpath_dock_msgs::srv::ExportData>("/" + namespace_ + "/" + std::string("docking/dock_manager/export"));
auto request = std::make_shared<clearpath_dock_msgs::srv::ExportData::Request>();
while (!client->wait_for_service(2s)) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(rclcpp::get_logger("export_dock_client"), "Interrupted while waiting for the service. Exiting.");
return 0;
}
RCLCPP_INFO(rclcpp::get_logger("export_dock_client"), "/%s/docking/dock_manager/export service not available, waiting again...", namespace_.c_str());
}
auto result = client->async_send_request(request);
// Wait for the result.
if (rclcpp::spin_until_future_complete(node, result) ==
rclcpp::FutureReturnCode::SUCCESS)
{
if (result.get()->success)
{
RCLCPP_INFO(rclcpp::get_logger("export_dock_client"), "Dock data exported successfully!");
}
else
{
RCLCPP_ERROR(rclcpp::get_logger("export_dock_client"), "Failed to export dock data!");
}
} else {
RCLCPP_ERROR(rclcpp::get_logger("export_dock_client"), "Failed to call service /%s/docking/dock_manager/export", namespace_.c_str());
}
rclcpp::shutdown();
return 0;
}
docking/dock_manager/get_database
Returns the entire dock database.
-
Service Name: docking/dock_manager/get_database
-
Service Type: clearpath_dock_msgs/srv/GetDockDatabase
- Python
- C++
from clearpath_config.common.utils.yaml import read_yaml
import rclpy
from rclpy.node import Node
from clearpath_dock_msgs.srv import GetDockDatabase
ROBOT_CONFIG_PATH = '/etc/clearpath/robot.yaml'
LOGGING_NAME = 'GetDockDatabase'
class GetDockDatabase(Node):
def __init__(self, namespace: str):
super().__init__('get_dock_database_client')
self._srv_client = self.create_client(GetDockDatabase, f'/{namespace}/docking/dock_manager/get_database')
while not self._srv_client.wait_for_service(timeout_sec=2.0):
self.get_logger().info(f'[{LOGGING_NAME}] /{namespace}/docking/dock_manager/get_database service not available, waiting again...')
def send_request(self):
req = GetDockDatabase.Request()
self.future = self._srv_client.call_async(req)
self.future.add_done_callback(self.get_dock_database_srv_response_cb)
def get_dock_database_srv_response_cb(self, future):
try:
response = future.result()
if response.success:
self.get_logger().info(f'[{LOGGING_NAME}] Retrieved dock database succesfully!')
else:
self.get_logger().error(f'[{LOGGING_NAME}] Failed to retrieve dock database!')
except Exception as e:
self.get_logger().error(f'[{LOGGING_NAME}] Service call failed: {e}')
self.destroy_node()
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
robot_config = read_yaml(ROBOT_CONFIG_PATH)
namespace = robot_config['system']['ros2']['namespace']
get_dock_database_client = GetDockDatabase(namespace)
get_dock_database_client.send_request()
rclpy.spin(get_dock_database_client)
if __name__ == '__main__':
main()
#include "rclcpp/rclcpp.hpp"
#include "clearpath_dock_msgs/srv/get_dock_database.hpp"
#include <chrono>
#include <cstdlib>
#include <memory>
#include <yaml-cpp/yaml.h>
using namespace std::chrono_literals;
int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
YAML::Node robot_config = YAML::LoadFile("/etc/clearpath/robot.yaml");
std::string namespace_ = robot_config["system"]["ros2"]["namespace"].as<std::string>();
std::shared_ptr<rclcpp::Node> node = rclcpp::Node::make_shared("get_dock_database_client");
rclcpp::Client<clearpath_dock_msgs::srv::GetDockDatabase>::SharedPtr client =
node->create_client<clearpath_dock_msgs::srv::GetDockDatabase>("/" + namespace_ + "/" + std::string("docking/dock_manager/get_database"));
auto request = std::make_shared<clearpath_dock_msgs::srv::GetDockDatabase::Request>();
while (!client->wait_for_service(2s)) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(rclcpp::get_logger("get_dock_database_client"), "Interrupted while waiting for the service. Exiting.");
return 0;
}
RCLCPP_INFO(rclcpp::get_logger("get_dock_database_client"), "/%s/docking/dock_manager/get_database service not available, waiting again...", namespace_.c_str());
}
auto result = client->async_send_request(request);
// Wait for the result.
if (rclcpp::spin_until_future_complete(node, result) ==
rclcpp::FutureReturnCode::SUCCESS)
{
if (result.get()->success)
{
RCLCPP_INFO(rclcpp::get_logger("get_dock_database_client"), "Retrieved dock database successfully!");
}
else
{
RCLCPP_ERROR(rclcpp::get_logger("get_dock_database_client"), "Failed to retrieve dock database!");
}
} else {
RCLCPP_ERROR(rclcpp::get_logger("get_dock_database_client"), "Failed to call service /%s/docking/dock_manager/get_database", namespace_.c_str());
}
rclcpp::shutdown();
return 0;
}
docking/dock_manager/import
Import a dock.
-
Service Name: docking/dock_manager/import
-
Service Type: clearpath_dock_msgs/srv/ImportData
- Python
- C++
from clearpath_config.common.utils.yaml import read_yaml
import rclpy
from rclpy.node import Node
from clearpath_dock_msgs.srv import ImportData
ROBOT_CONFIG_PATH = '/etc/clearpath/robot.yaml'
LOGGING_NAME = 'ImportDock'
# TODO: Enter your dock info as a JSON string
DOCK_STR: str = ''
class ImportDock(Node):
def __init__(self, namespace: str):
super().__init__('import_dock_client')
self._srv_client = self.create_client(ImportData, f'/{namespace}/docking/dock_manager/import')
while not self._srv_client.wait_for_service(timeout_sec=2.0):
self.get_logger().info(f'[{LOGGING_NAME}] /{namespace}/docking/dock_manager/import service not available, waiting again...')
def send_request(self):
req = ImportData.Request()
req.data = DOCK_STR
self.future = self._srv_client.call_async(req)
self.future.add_done_callback(self.import_dock_srv_response_cb)
def import_dock_srv_response_cb(self, future):
try:
response = future.result()
if response.success:
self.get_logger().info(f'[{LOGGING_NAME}] Imported dock succesfully!')
else:
self.get_logger().error(f'[{LOGGING_NAME}] Failed to import dock!')
except Exception as e:
self.get_logger().error(f'[{LOGGING_NAME}] Service call failed: {e}')
self.destroy_node()
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
robot_config = read_yaml(ROBOT_CONFIG_PATH)
namespace = robot_config['system']['ros2']['namespace']
import_dock_client = ImportDock(namespace)
import_dock_client.send_request()
rclpy.spin(import_dock_client)
if __name__ == '__main__':
main()
#include "rclcpp/rclcpp.hpp"
#include "clearpath_dock_msgs/srv/import_data.hpp"
#include <chrono>
#include <cstdlib>
#include <memory>
#include <yaml-cpp/yaml.h>
using namespace std::chrono_literals;
int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
YAML::Node robot_config = YAML::LoadFile("/etc/clearpath/robot.yaml");
std::string namespace_ = robot_config["system"]["ros2"]["namespace"].as<std::string>();
std::shared_ptr<rclcpp::Node> node = rclcpp::Node::make_shared("import_dock_client");
rclcpp::Client<clearpath_dock_msgs::srv::ImportData>::SharedPtr client =
node->create_client<clearpath_dock_msgs::srv::ImportData>("/" + namespace_ + "/" + std::string("docking/dock_manager/import"));
auto request = std::make_shared<clearpath_dock_msgs::srv::ImportData::Request>();
while (!client->wait_for_service(2s)) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(rclcpp::get_logger("import_dock_client"), "Interrupted while waiting for the service. Exiting.");
return 0;
}
RCLCPP_INFO(rclcpp::get_logger("import_dock_client"), "/%s/docking/dock_manager/import service not available, waiting again...", namespace_.c_str());
}
auto result = client->async_send_request(request);
// Wait for the result.
if (rclcpp::spin_until_future_complete(node, result) ==
rclcpp::FutureReturnCode::SUCCESS)
{
if (result.get()->success)
{
RCLCPP_INFO(rclcpp::get_logger("import_dock_client"), "Imported dock successfully!");
}
else
{
RCLCPP_ERROR(rclcpp::get_logger("import_dock_client"), "Failed to import dock!");
}
} else {
RCLCPP_ERROR(rclcpp::get_logger("import_dock_client"), "Failed to call service /%s/docking/dock_manager/import", namespace_.c_str());
}
rclcpp::shutdown();
return 0;
}
docking/dock_manager/update_dock
Import a dock.
-
Service Name: docking/dock_manager/update_dock
-
Service Type: clearpath_dock_msgs/srv/UpdateDock
- Python
- C++
from clearpath_config.common.utils.yaml import read_yaml
import rclpy
from rclpy.node import Node
from clearpath_dock_msgs.srv import UpdateDock
ROBOT_CONFIG_PATH = '/etc/clearpath/robot.yaml'
LOGGING_NAME = 'UpdateDock'
# TODO: Enter your dock info
DOCK_NAME: str = ''
DOCK_INFO = DockInfo(
name=DOCK_NAME,
dock_template='a300_side_dock',
latitude=,
longitude=,
orientation=Quaternion()
)
class UpdateDock(Node):
def __init__(self, namespace: str):
super().__init__('update_dock_client')
self._srv_client = self.create_client(UpdateDock, f'/{namespace}/docking/dock_manager/update_dock')
while not self._srv_client.wait_for_service(timeout_sec=2.0):
self.get_logger().info(f'[{LOGGING_NAME}] /{namespace}/docking/dock_manager/update_dock service not available, waiting again...')
def send_request(self):
req = UpdateDock.Request()
req.name = DOCK_NAME
req.dock = DOCK_INFO
self.future = self._srv_client.call_async(req)
self.future.add_done_callback(self.update_dock_srv_response_cb)
def update_dock_srv_response_cb(self, future):
try:
response = future.result()
if response.success:
self.get_logger().info(f'[{LOGGING_NAME}] Updated dock succesfully!')
else:
self.get_logger().error(f'[{LOGGING_NAME}] Failed to update dock!')
except Exception as e:
self.get_logger().error(f'[{LOGGING_NAME}] Service call failed: {e}')
self.destroy_node()
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
robot_config = read_yaml(ROBOT_CONFIG_PATH)
namespace = robot_config['system']['ros2']['namespace']
update_dock_client = UpdateDock(namespace)
update_dock_client.send_request()
rclpy.spin(update_dock_client)
if __name__ == '__main__':
main()
#include "rclcpp/rclcpp.hpp"
#include "clearpath_dock_msgs/srv/update_dock.hpp"
#include <chrono>
#include <cstdlib>
#include <memory>
#include <yaml-cpp/yaml.h>
using namespace std::chrono_literals;
int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
YAML::Node robot_config = YAML::LoadFile("/etc/clearpath/robot.yaml");
std::string namespace_ = robot_config["system"]["ros2"]["namespace"].as<std::string>();
std::shared_ptr<rclcpp::Node> node = rclcpp::Node::make_shared("update_dock_client");
rclcpp::Client<clearpath_dock_msgs::srv::UpdateDock>::SharedPtr client =
node->create_client<clearpath_dock_msgs::srv::UpdateDock>("/" + namespace_ + "/" + std::string("docking/dock_manager/update_dock"));
auto request = std::make_shared<clearpath_dock_msgs::srv::UpdateDock::Request>();
request->name = ""
auto dock_info = clearpath_dock_msgs::msg::DockInfo();
dock_info.name = request->name;
dock_info.dock_template = "a300_side_dock";
dock_info.latitude = ;
dock_info.longitude = ;
dock_info.orientation = ;
request->dock = dock_info;
while (!client->wait_for_service(2s)) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(rclcpp::get_logger("update_dock_client"), "Interrupted while waiting for the service. Exiting.");
return 0;
}
RCLCPP_INFO(rclcpp::get_logger("update_dock_client"), "/%s/docking/dock_manager/update_dock service not available, waiting again...", namespace_.c_str());
}
auto result = client->async_send_request(request);
// Wait for the result.
if (rclcpp::spin_until_future_complete(node, result) ==
rclcpp::FutureReturnCode::SUCCESS)
{
if (result.get()->success)
{
RCLCPP_INFO(rclcpp::get_logger("update_dock_client"), "Updated dock successfully!");
}
else
{
RCLCPP_ERROR(rclcpp::get_logger("update_dock_client"), "Failed to update dock!");
}
} else {
RCLCPP_ERROR(rclcpp::get_logger("update_dock_client"), "Failed to call service /%s/docking/dock_manager/update_dock", namespace_.c_str());
}
rclcpp::shutdown();
return 0;
}
docking/dock_localizer/add_dock_current_pose
Add a dock with the current pose.
-
Service Name: docking/dock_localizer/add_dock_current_pose
-
Service Type: clearpath_dock_msgs/srv/AddDockCurrentPose
- Python
- C++
from clearpath_config.common.utils.yaml import read_yaml
import rclpy
from rclpy.node import Node
from clearpath_dock_msgs.srv import AddDockCurrentPose
ROBOT_CONFIG_PATH = '/etc/clearpath/robot.yaml'
LOGGING_NAME = 'AddDockCurrentPose'
# TODO: Enter your dock info
DOCK_NAME: str = ''
DOCK_TEMPLATE: str = 'a300_side_dock'
class AddDockCurrentPose(Node):
def __init__(self, namespace: str):
super().__init__('add_dock_curr_pose_client')
self._srv_client = self.create_client(AddDockCurrentPose, f'/{namespace}/docking/dock_localizer/add_dock_current_pose')
while not self._srv_client.wait_for_service(timeout_sec=2.0):
self.get_logger().info(f'[{LOGGING_NAME}] /{namespace}/docking/dock_localizer/add_dock_current_pose service not available, waiting again...')
def send_request(self):
req = AddDockCurrentPose.Request()
req.dock_name = DOCK_NAME
req.dock_template = DOCK_TEMPLATE
self.future = self._srv_client.call_async(req)
self.future.add_done_callback(self.add_dock_curr_pose_srv_response_cb)
def add_dock_curr_pose__srv_response_cb(self, future):
try:
response = future.result()
if response.success:
self.get_logger().info(f'[{LOGGING_NAME}] Added dock succesfully!')
else:
self.get_logger().error(f'[{LOGGING_NAME}] Failed to add dock!')
except Exception as e:
self.get_logger().error(f'[{LOGGING_NAME}] Service call failed: {e}')
self.destroy_node()
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
robot_config = read_yaml(ROBOT_CONFIG_PATH)
namespace = robot_config['system']['ros2']['namespace']
add_dock_curr_pose_client = AddDockCurrentPose(namespace)
add_dock_curr_pose_client.send_request()
rclpy.spin(add_dock_curr_pose_client)
if __name__ == '__main__':
main()
#include "rclcpp/rclcpp.hpp"
#include "clearpath_dock_msgs/srv/add_dock_current_pose.hpp"
#include <chrono>
#include <cstdlib>
#include <memory>
#include <yaml-cpp/yaml.h>
using namespace std::chrono_literals;
int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
YAML::Node robot_config = YAML::LoadFile("/etc/clearpath/robot.yaml");
std::string namespace_ = robot_config["system"]["ros2"]["namespace"].as<std::string>();
std::shared_ptr<rclcpp::Node> node = rclcpp::Node::make_shared("add_dock_current_pose_client");
rclcpp::Client<clearpath_dock_msgs::srv::AddDockCurrentPose>::SharedPtr client =
node->create_client<clearpath_dock_msgs::srv::AddDockCurrentPose>("/" + namespace_ + "/" + std::string("docking/dock_localizer/add_dock_current_pose"));
auto request = std::make_shared<clearpath_dock_msgs::srv::AddDockCurrentPose::Request>();
request->dock_name = ""
request->dock_template = "a300_side_dock";
while (!client->wait_for_service(2s)) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(rclcpp::get_logger("add_dock_current_pose_client"), "Interrupted while waiting for the service. Exiting.");
return 0;
}
RCLCPP_INFO(rclcpp::get_logger("add_dock_current_pose_client"), "/%s/docking/dock_localizer/add_dock_current_pose service not available, waiting again...", namespace_.c_str());
}
auto result = client->async_send_request(request);
// Wait for the result.
if (rclcpp::spin_until_future_complete(node, result) ==
rclcpp::FutureReturnCode::SUCCESS)
{
if (result.get()->success)
{
RCLCPP_INFO(rclcpp::get_logger("add_dock_current_pose_client"), "Added dock successfully!");
}
else
{
RCLCPP_ERROR(rclcpp::get_logger("add_dock_current_pose_client"), "Failed to add dock!");
}
} else {
RCLCPP_ERROR(rclcpp::get_logger("add_dock_current_pose_client"), "Failed to call service /%s/docking/dock_localizer/add_dock_current_pose", namespace_.c_str());
}
rclcpp::shutdown();
return 0;
}
docking/dock_localizer/get_dock_poses
Return the dock and predock poses of a specific dock.
-
Service Name: docking/dock_localizer/get_dock_poses
-
Service Type: clearpath_dock_msgs/srv/GetDockPoses
- Python
- C++
from clearpath_config.common.utils.yaml import read_yaml
import rclpy
from rclpy.node import Node
from clearpath_dock_msgs.srv import GetDockPoses
ROBOT_CONFIG_PATH = '/etc/clearpath/robot.yaml'
LOGGING_NAME = 'GetDockPoses'
# TODO: Enter your dock name
DOCK_NAME: str = ''
class GetDockPoses(Node):
def __init__(self, namespace: str):
super().__init__('get_dock_poses_client')
self._srv_client = self.create_client(GetDockPoses, f'/{namespace}/docking/dock_localizer/get_dock_poses')
while not self._srv_client.wait_for_service(timeout_sec=2.0):
self.get_logger().info(f'[{LOGGING_NAME}] /{namespace}/docking/dock_localizer/get_dock_poses service not available, waiting again...')
def send_request(self):
req = GetDockPoses.Request()
req.name = DOCK_NAME
self.future = self._srv_client.call_async(req)
self.future.add_done_callback(self.get_dock_poses_srv_response_cb)
def get_dock_poses_srv_response_cb(self, future):
try:
response = future.result()
if response.success:
self.get_logger().info(f'[{LOGGING_NAME}] Retrieved dock poses succesfully!')
else:
self.get_logger().error(f'[{LOGGING_NAME}] Failed to retriee dock poses!')
except Exception as e:
self.get_logger().error(f'[{LOGGING_NAME}] Service call failed: {e}')
self.destroy_node()
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
robot_config = read_yaml(ROBOT_CONFIG_PATH)
namespace = robot_config['system']['ros2']['namespace']
get_dock_poses_client = GetDockPoses(namespace)
get_dock_poses_client.send_request()
rclpy.spin(get_dock_poses_client)
if __name__ == '__main__':
main()
#include "rclcpp/rclcpp.hpp"
#include "clearpath_dock_msgs/srv/get_dock_poses.hpp"
#include <chrono>
#include <cstdlib>
#include <memory>
#include <yaml-cpp/yaml.h>
using namespace std::chrono_literals;
int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
YAML::Node robot_config = YAML::LoadFile("/etc/clearpath/robot.yaml");
std::string namespace_ = robot_config["system"]["ros2"]["namespace"].as<std::string>();
std::shared_ptr<rclcpp::Node> node = rclcpp::Node::make_shared("add_dock_current_pose_client");
rclcpp::Client<clearpath_dock_msgs::srv::GetDockPoses>::SharedPtr client =
node->create_client<clearpath_dock_msgs::srv::GetDockPoses>("/" + namespace_ + "/" + std::string("docking/dock_localizer/get_dock_poses"));
auto request = std::make_shared<clearpath_dock_msgs::srv::GetDockPoses::Request>();
request->name = ""
while (!client->wait_for_service(2s)) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(rclcpp::get_logger("get_dock_poses_client"), "Interrupted while waiting for the service. Exiting.");
return 0;
}
RCLCPP_INFO(rclcpp::get_logger("get_dock_poses_client"), "/%s/docking/dock_localizer/get_dock_poses service not available, waiting again...", namespace_.c_str());
}
auto result = client->async_send_request(request);
// Wait for the result.
if (rclcpp::spin_until_future_complete(node, result) ==
rclcpp::FutureReturnCode::SUCCESS)
{
if (result.get()->success)
{
RCLCPP_INFO(rclcpp::get_logger("get_dock_poses_client"), "Retrieved dock poses successfully!");
}
else
{
RCLCPP_ERROR(rclcpp::get_logger("get_dock_poses_client"), "Failed to retrieve dock poses!");
}
} else {
RCLCPP_ERROR(rclcpp::get_logger("get_dock_poses_client"), "Failed to call service /%s/docking/dock_localizer/get_dock_poses", namespace_.c_str());
}
rclcpp::shutdown();
return 0;
}
localization/lat_lon_to_xy
Convert lat/lon coordinate to map XY coordinate.
-
Service Name: localization/lat_lon_to_xy
-
Service Type: clearpath_localization_msgs/srv/ConvertLatLonToCartesian
- Python
- C++
from clearpath_config.common.utils.yaml import read_yaml
import rclpy
from rclpy.node import Node
from clearpath_localization_msgs.srv import ConvertLatLonToCartesian
from sensor_msgs.msg import NavSatFix
ROBOT_CONFIG_PATH = '/etc/clearpath/robot.yaml'
LOGGING_NAME = 'ConvertLatLonToCartesian'
# TODO: Enter your lat/lon coordinates
LAT_LON_MSG = NavSatFix(
latitude=,
longitude=,
)
class ConvertLatLonToCartesian(Node):
def __init__(self, namespace: str):
super().__init__('convert_latlon_client')
self._srv_client = self.create_client(ConvertLatLonToCartesian, f'/{namespace}/localization/lat_lon_to_xy')
while not self._srv_client.wait_for_service(timeout_sec=2.0):
self.get_logger().info(f'[{LOGGING_NAME}] /{namespace}/localization/lat_lon_to_xy service not available, waiting again...')
def send_request(self):
req = ConvertLatLonToCartesian.Request()
req.msg = LAT_LON_MSG
self.future = self._srv_client.call_async(req)
self.future.add_done_callback(self.convert_latlon_client_srv_response_cb)
def convert_latlon_client_srv_response_cb(self, future):
try:
response = future.result()
if response.success:
self.get_logger().info(f'[{LOGGING_NAME}] Converted lat/lon to xy succesfully!')
else:
self.get_logger().error(f'[{LOGGING_NAME}] Failed to convert lat/lon to xy!')
except Exception as e:
self.get_logger().error(f'[{LOGGING_NAME}] Service call failed: {e}')
self.destroy_node()
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
robot_config = read_yaml(ROBOT_CONFIG_PATH)
namespace = robot_config['system']['ros2']['namespace']
convert_latlon_client = ConvertLatLonToCartesian(namespace)
convert_latlon_client.send_request()
rclpy.spin(convert_latlon_client)
if __name__ == '__main__':
main()
#include "rclcpp/rclcpp.hpp"
#include "clearpath_localization_msgs/srv/convert_lat_lon_to_cartesian.hpp"
#include <chrono>
#include <cstdlib>
#include <memory>
#include <yaml-cpp/yaml.h>
using namespace std::chrono_literals;
int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
YAML::Node robot_config = YAML::LoadFile("/etc/clearpath/robot.yaml");
std::string namespace_ = robot_config["system"]["ros2"]["namespace"].as<std::string>();
std::shared_ptr<rclcpp::Node> node = rclcpp::Node::make_shared("convert_latlon_client");
rclcpp::Client<clearpath_localization_msgs::srv::ConvertLatLonToCartesian>::SharedPtr client =
node->create_client<clearpath_localization_msgs::srv::ConvertLatLonToCartesian>("/" + namespace_ + "/" + std::string("localization/lat_lon_to_xy"));
auto request = std::make_shared<clearpath_localization_msgs::srv::ConvertLatLonToCartesian::Request>();
auto latlon_msg = sensor_msgs::msg::NavSatFix;
latlon_msg.latitude = ;
latlon_msg.longitude = ;
request->msg = latlon_msg
while (!client->wait_for_service(2s)) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(rclcpp::get_logger("convert_latlon_client"), "Interrupted while waiting for the service. Exiting.");
return 0;
}
RCLCPP_INFO(rclcpp::get_logger("convert_latlon_client"), "/%s/localization/lat_lon_to_xy service not available, waiting again...", namespace_.c_str());
}
auto result = client->async_send_request(request);
// Wait for the result.
if (rclcpp::spin_until_future_complete(node, result) ==
rclcpp::FutureReturnCode::SUCCESS)
{
if (result.get()->success)
{
RCLCPP_INFO(rclcpp::get_logger("convert_latlon_client"), "Converted lat/lon to xy succesfully!");
}
else
{
RCLCPP_ERROR(rclcpp::get_logger("convert_latlon_client"), "Failed to convert lat/lon to xy!");
}
} else {
RCLCPP_ERROR(rclcpp::get_logger("convert_latlon_client"), "Failed to call service /%s/localization/lat_lon_to_xy", namespace_.c_str());
}
rclcpp::shutdown();
return 0;
}
localization/lat_lon_to_xy_array
Convert a set of lat/lon coordinates to map XY coordinates.
-
Service Name: localization/lat_lon_to_xy_array
-
Service Type: clearpath_localization_msgs/srv/ConvertLatLonToCartesianArray
- Python
- C++
from clearpath_config.common.utils.yaml import read_yaml
import rclpy
from rclpy.node import Node
from clearpath_localization_msgs.srv import ConvertLatLonToCartesianArray
from sensor_msgs.msg import NavSatFix
ROBOT_CONFIG_PATH = '/etc/clearpath/robot.yaml'
LOGGING_NAME = 'ConvertLatLonToCartesianArray'
# TODO: Enter your lat/lon coordinates
LAT_LON_MSG1 = NavSatFix(
latitude=,
longitude=,
)
LAT_LON_MSG2 = NavSatFix(
latitude=,
longitude=,
)
class ConvertLatLonToCartesianArray(Node):
def __init__(self, namespace: str):
super().__init__('convert_latlon_client')
self._srv_client = self.create_client(ConvertLatLonToCartesianArray, f'/{namespace}/localization/lat_lon_to_xy_array')
while not self._srv_client.wait_for_service(timeout_sec=2.0):
self.get_logger().info(f'[{LOGGING_NAME}] /{namespace}/localization/lat_lon_to_xy_array service not available, waiting again...')
def send_request(self):
req = ConvertLatLonToCartesianArray.Request()
req.msg = [LAT_LON_MSG1, LAT_LON_MSG2]
self.future = self._srv_client.call_async(req)
self.future.add_done_callback(self.convert_latlon_client_srv_response_cb)
def convert_latlon_client_srv_response_cb(self, future):
try:
response = future.result()
if response.success:
self.get_logger().info(f'[{LOGGING_NAME}] Converted lat/lon to xy succesfully!')
else:
self.get_logger().error(f'[{LOGGING_NAME}] Failed to convert lat/lon to xy!')
except Exception as e:
self.get_logger().error(f'[{LOGGING_NAME}] Service call failed: {e}')
self.destroy_node()
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
robot_config = read_yaml(ROBOT_CONFIG_PATH)
namespace = robot_config['system']['ros2']['namespace']
convert_latlon_client = ConvertLatLonToCartesianArray(namespace)
convert_latlon_client.send_request()
rclpy.spin(convert_latlon_client)
if __name__ == '__main__':
main()
#include "rclcpp/rclcpp.hpp"
#include "clearpath_localization_msgs/srv/convert_lat_lon_to_cartesian_array.hpp"
#include <chrono>
#include <cstdlib>
#include <memory>
#include <yaml-cpp/yaml.h>
using namespace std::chrono_literals;
int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
YAML::Node robot_config = YAML::LoadFile("/etc/clearpath/robot.yaml");
std::string namespace_ = robot_config["system"]["ros2"]["namespace"].as<std::string>();
std::shared_ptr<rclcpp::Node> node = rclcpp::Node::make_shared("convert_latlon_client");
rclcpp::Client<clearpath_localization_msgs::srv::ConvertLatLonToCartesianArray>::SharedPtr client =
node->create_client<clearpath_localization_msgs::srv::ConvertLatLonToCartesianArray>("/" + namespace_ + "/" + std::string("localization/lat_lon_to_xy_array"));
auto request = std::make_shared<clearpath_localization_msgs::srv::ConvertLatLonToCartesianArray::Request>();
auto latlon_msg1 = sensor_msgs::msg::NavSatFix;
latlon_msg1.latitude = ;
latlon_msg1.longitude = ;
auto latlon_msg1 = sensor_msgs::msg::NavSatFix;
latlon_msg2.latitude = ;
latlon_msg2.longitude = ;
request->msg = std::vector<sensor_msgs::msg::NavSatFix>{latlon_msg1, latlon_msg2}
while (!client->wait_for_service(2s)) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(rclcpp::get_logger("convert_latlon_client"), "Interrupted while waiting for the service. Exiting.");
return 0;
}
RCLCPP_INFO(rclcpp::get_logger("convert_latlon_client"), "/%s/localization/lat_lon_to_xy_array service not available, waiting again...", namespace_.c_str());
}
auto result = client->async_send_request(request);
// Wait for the result.
if (rclcpp::spin_until_future_complete(node, result) ==
rclcpp::FutureReturnCode::SUCCESS)
{
if (result.get()->success)
{
RCLCPP_INFO(rclcpp::get_logger("convert_latlon_client"), "Converted lat/lon to xy succesfully!");
}
else
{
RCLCPP_ERROR(rclcpp::get_logger("convert_latlon_client"), "Failed to convert lat/lon to xy!");
}
} else {
RCLCPP_ERROR(rclcpp::get_logger("convert_latlon_client"), "Failed to call service /%s/localization/lat_lon_to_xy_array", namespace_.c_str());
}
rclcpp::shutdown();
return 0;
}
localization/xy_to_lat_lon
Convert map xy coordinate to lat/lon coordinates.
-
Service Name: localization/xy_to_lat_lon
-
Service Type: clearpath_localization_msgs/srv/ConvertCartesianToLatLon
- Python
- C++
from clearpath_config.common.utils.yaml import read_yaml
import rclpy
from rclpy.node import Node
from clearpath_localization_msgs.srv import ConvertCartesianToLatLon
from geometry_msgs.msg import PoseStamped, Pose, Point
ROBOT_CONFIG_PATH = '/etc/clearpath/robot.yaml'
LOGGING_NAME = 'ConvertCartesianToLatLon'
# TODO: Enter your xy coordinates
POSE_MSG = PoseStamped(
pose=Pose(
position=Point(
x=,
y=
)
)
)
class ConvertCartesianToLatLon(Node):
def __init__(self, namespace: str):
super().__init__('convert_xy_client')
self._srv_client = self.create_client(ConvertCartesianToLatLon, f'/{namespace}/localization/xy_to_lat_lon')
while not self._srv_client.wait_for_service(timeout_sec=2.0):
self.get_logger().info(f'[{LOGGING_NAME}] /{namespace}/localization/xy_to_lat_lon service not available, waiting again...')
def send_request(self):
req = ConvertCartesianToLatLon.Request()
req.pose = POSE_MSG
self.future = self._srv_client.call_async(req)
self.future.add_done_callback(self.convert_xy_client_srv_response_cb)
def convert_xy_client_srv_response_cb(self, future):
try:
response = future.result()
if response.success:
self.get_logger().info(f'[{LOGGING_NAME}] Converted xy to lat/lon succesfully!')
else:
self.get_logger().error(f'[{LOGGING_NAME}] Failed to convert xy to lat/lon!')
except Exception as e:
self.get_logger().error(f'[{LOGGING_NAME}] Service call failed: {e}')
self.destroy_node()
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
robot_config = read_yaml(ROBOT_CONFIG_PATH)
namespace = robot_config['system']['ros2']['namespace']
convert_xy_client = ConvertCartesianToLatLon(namespace)
convert_xy_client.send_request()
rclpy.spin(convert_xy_client)
if __name__ == '__main__':
main()
#include "rclcpp/rclcpp.hpp"
#include "clearpath_localization_msgs/srv/convert_cartesian_to_lat_lon.hpp"
#include <chrono>
#include <cstdlib>
#include <memory>
#include <yaml-cpp/yaml.h>
using namespace std::chrono_literals;
int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
YAML::Node robot_config = YAML::LoadFile("/etc/clearpath/robot.yaml");
std::string namespace_ = robot_config["system"]["ros2"]["namespace"].as<std::string>();
std::shared_ptr<rclcpp::Node> node = rclcpp::Node::make_shared("convert_xy_client");
rclcpp::Client<clearpath_localization_msgs::srv::ConvertCartesianToLatLon>::SharedPtr client =
node->create_client<clearpath_localization_msgs::srv::ConvertCartesianToLatLon>("/" + namespace_ + "/" + std::string("localization/xy_to_lat_lon"));
auto request = std::make_shared<clearpath_localization_msgs::srv::ConvertCartesianToLatLon::Request>();
auto pose_msg = geometry_msgs::msg::PoseStamped;
pose_msg.pose.position.x = ;
pose_msg.pose.position.y = ;
request->pose = pose_msg
while (!client->wait_for_service(2s)) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(rclcpp::get_logger("convert_xy_client"), "Interrupted while waiting for the service. Exiting.");
return 0;
}
RCLCPP_INFO(rclcpp::get_logger("convert_xy_client"), "/%s/localization/xy_to_lat_lon service not available, waiting again...", namespace_.c_str());
}
auto result = client->async_send_request(request);
// Wait for the result.
if (rclcpp::spin_until_future_complete(node, result) ==
rclcpp::FutureReturnCode::SUCCESS)
{
if (result.get()->success)
{
RCLCPP_INFO(rclcpp::get_logger("convert_xy_client"), "Converted xy to lat/lon succesfully!");
}
else
{
RCLCPP_ERROR(rclcpp::get_logger("convert_xy_client"), "Failed to convert xy to lat/lon!");
}
} else {
RCLCPP_ERROR(rclcpp::get_logger("convert_xy_client"), "Failed to call service /%s/localization/xy_to_lat_lon", namespace_.c_str());
}
rclcpp::shutdown();
return 0;
}
localization/xy_to_lat_lon_array
Convert map xy coordinate to lat/lon coordinates.
-
Service Name: localization/xy_to_lat_lon_array
-
Service Type: clearpath_localization_msgs/srv/ConvertCartesianToLatLonArray
- Python
- C++
from clearpath_config.common.utils.yaml import read_yaml
import rclpy
from rclpy.node import Node
from clearpath_localization_msgs.srv import ConvertCartesianToLatLonArray
from geometry_msgs.msg import PoseStamped, Pose, Point
ROBOT_CONFIG_PATH = '/etc/clearpath/robot.yaml'
LOGGING_NAME = 'ConvertCartesianToLatLonArray'
# TODO: Enter your xy coordinates
POSE_MSG1 = PoseStamped(
pose=Pose(
position=Point(
x=,
y=
)
)
)
POSE_MSG2 = PoseStamped(
pose=Pose(
position=Point(
x=,
y=
)
)
)
class ConvertCartesianToLatLonArray(Node):
def __init__(self, namespace: str):
super().__init__('convert_xy_client')
self._srv_client = self.create_client(ConvertCartesianToLatLonArray, f'/{namespace}/localization/xy_to_lat_lon_array')
while not self._srv_client.wait_for_service(timeout_sec=2.0):
self.get_logger().info(f'[{LOGGING_NAME}] /{namespace}/localization/xy_to_lat_lon_array service not available, waiting again...')
def send_request(self):
req = ConvertCartesianToLatLonArray.Request()
req.pose = [POSE_MSG1, POSE_MSG2]
self.future = self._srv_client.call_async(req)
self.future.add_done_callback(self.convert_xy_client_srv_response_cb)
def convert_xy_client_srv_response_cb(self, future):
try:
response = future.result()
if response.success:
self.get_logger().info(f'[{LOGGING_NAME}] Converted xy to lat/lon succesfully!')
else:
self.get_logger().error(f'[{LOGGING_NAME}] Failed to convert xy to lat/lon!')
except Exception as e:
self.get_logger().error(f'[{LOGGING_NAME}] Service call failed: {e}')
self.destroy_node()
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
robot_config = read_yaml(ROBOT_CONFIG_PATH)
namespace = robot_config['system']['ros2']['namespace']
convert_xy_client = ConvertCartesianToLatLonArray(namespace)
convert_xy_client.send_request()
rclpy.spin(convert_xy_client)
if __name__ == '__main__':
main()
#include "rclcpp/rclcpp.hpp"
#include "clearpath_localization_msgs/srv/convert_cartesian_to_lat_lon_array.hpp"
#include <chrono>
#include <cstdlib>
#include <memory>
#include <yaml-cpp/yaml.h>
using namespace std::chrono_literals;
int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
YAML::Node robot_config = YAML::LoadFile("/etc/clearpath/robot.yaml");
std::string namespace_ = robot_config["system"]["ros2"]["namespace"].as<std::string>();
std::shared_ptr<rclcpp::Node> node = rclcpp::Node::make_shared("convert_xy_client");
rclcpp::Client<clearpath_localization_msgs::srv::ConvertCartesianToLatLonArray>::SharedPtr client =
node->create_client<clearpath_localization_msgs::srv::ConvertCartesianToLatLonArray>("/" + namespace_ + "/" + std::string("localization/xy_to_lat_lon_array"));
auto request = std::make_shared<clearpath_localization_msgs::srv::ConvertCartesianToLatLonArray::Request>();
auto pose_msg1 = geometry_msgs::msg::PoseStamped;
pose_msg1.pose.position.x = ;
pose_msg1.pose.position.y = ;
auto pose_msg2 = geometry_msgs::msg::PoseStamped;
pose_msg2.pose.position.x = ;
pose_msg2.pose.position.y = ;
request->pose = std::vector<geometry_msgs::msg::PoseStamped>(pose_msg1, pose_msg2)
while (!client->wait_for_service(2s)) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(rclcpp::get_logger("convert_xy_client"), "Interrupted while waiting for the service. Exiting.");
return 0;
}
RCLCPP_INFO(rclcpp::get_logger("convert_xy_client"), "/%s/localization/xy_to_lat_lon_array service not available, waiting again...", namespace_.c_str());
}
auto result = client->async_send_request(request);
// Wait for the result.
if (rclcpp::spin_until_future_complete(node, result) ==
rclcpp::FutureReturnCode::SUCCESS)
{
if (result.get()->success)
{
RCLCPP_INFO(rclcpp::get_logger("convert_xy_client"), "Converted xy to lat/lon succesfully!");
}
else
{
RCLCPP_ERROR(rclcpp::get_logger("convert_xy_client"), "Failed to convert xy to lat/lon!");
}
} else {
RCLCPP_ERROR(rclcpp::get_logger("convert_xy_client"), "Failed to call service /%s/localization/xy_to_lat_lon_array", namespace_.c_str());
}
rclcpp::shutdown();
return 0;
}