Commit e629ad7f authored by Thomas Coffy's avatar Thomas Coffy
Browse files

Initial commit

parents
# IVS ROS2 Process example job
## Simple Automatic line chart generation from ROS2 recording example.
The aim of this IVS job example is to provide a quick and automated processing example solution for the IVS platform.
Chart data generation is performed using a python ROS2 node.
Outputs:
```
{
'axis': {
'x': 'minutes',
'y': 'm/s'
},
'series': [
{
'name': 'Speed',
'data': [
{
'x': 1,
'y': 0.06021586592173569
}
]
}
]
}
```
Building:
process example job:
export REPO_NAME=499527426393.dkr.ecr.eu-west-3.amazonaws.com
docker build -f docker/job_ros2_process_example.Dockerfile -t ivs/jobs/ros2/process_example:latest .
docker tag ivs/jobs/ros2/process_example:latest $REPO_NAME/ivs/jobs/ros2/process_example:latest
docker push $REPO_NAME/ivs/jobs/ros2/process_example:latest
Release:
export REPO_NAME=499527426393.dkr.ecr.eu-west-3.amazonaws.com
export TAG_BASE_NAME=ivs-releases/jobs/ros2/process_example
export TAG_VERSION=1.0.0
aws ecr create-repository --repository-name $TAG_BASE_NAME --image-tag-mutability IMMUTABLE
docker tag ivs/jobs/ros2/process_example:latest $REPO_NAME/$TAG_BASE_NAME:$TAG_VERSION
docker push $REPO_NAME/$TAG_BASE_NAME:$TAG_VERSION
FROM ivs/jobs/ros2/ros2-ros1_bridge:latest
# User "job" from parent
ARG DOCKER_USER_NAME="job"
ARG DOCKER_USER_HOME="/home/${DOCKER_USER_NAME}"
ENV ROS2_JOB_DIR="/home/${DOCKER_USER_NAME}/embedded_resources/"
RUN mkdir -p ${ROS2_JOB_DIR}
ENV ROS_DISTRO=foxy
# define ROS2 package
ENV ROS2_PACKAGE=ivs_ros2_process_example_pkg
ENV ROS2_LAUNCH_FILE=process_example_launch.py
ENV ROS2_REPLAY_RATE=1
# Copy python tagger
RUN mkdir -p ${ROS2_JOB_DIR}/${ROS2_PACKAGE}
COPY ${ROS2_PACKAGE} ${ROS2_JOB_DIR}/${ROS2_PACKAGE}
RUN sudo chown -R ${DOCKER_USER_NAME} "/home/${DOCKER_USER_NAME}/"
# Set the new user/home
USER "${DOCKER_USER_NAME}"
WORKDIR "${DOCKER_USER_HOME}"
# Build ROS2 package
RUN . /opt/ros/${ROS_DISTRO}/setup.sh && cd ${ROS2_JOB_DIR}/${ROS2_PACKAGE} && colcon build
CMD echo "Launching ${ROS2_PACKAGE}:" \
&& cd ${ROS2_JOB_DIR} \
&& . ${ROS2_PACKAGE}/install/setup.sh \
&& /home/job/scripts/run.sh
\ No newline at end of file
#!/usr/bin/env python3
import json
import rclpy
import signal
import sys
import os
import datetime
import numpy as np
import math
from rclpy.node import Node
import gps_msgs.msg
JSON_OUT_DIRECTORY = '/ivsci/output/'
JSON_FILE_PREFIX = 'astronomical_'
JSON_OUTPUT_PERIOD = 1000000 # 1 seconds (ts are in ms)
ASTRAL_MAX_PERIOD = 1*60*1000000 # 1 min between checks
class ivs_ros2_process_example_node(Node):
def __init__(self, config_inputs):
super().__init__('ivs_ros2_process_example_node')
self.verbose = True
self.subscription = self.create_subscription(gps_msgs.msg.GPSFix, self.get_topic_from_config(config_inputs, 'gps_fix'),
self.listener_callback_gpsfix, 10)
self.last_request_ts = 0
self.last_info = 0
self.first_ts = 0
self.start_ts = 0
self.end_ts = 0
self.tag_raw_position_lat = 0
self.tag_raw_position_lon = 0
self.tag_time = 0
self.speed_values = []
self.speed_values_tag = []
self.bar_data = {
"axis": {
"x": "minutes",
"y": "m/s"
},
"series": [{
"name": "Speed",
"data": []
}
#,{
#"name": "Serie 2",
#"data": []
#}
]
}
def __del__(self):
if len(self.speed_values) > 0 and self.first_ts is not None:
mean = np.mean(self.speed_values)
self.bar_data["series"][0]["data"].append({
"x": abs(math.floor((self.start_ts - self.first_ts) / 1000000)),
"y": mean})
self.speed_values = []
self.start_ts += 10000000
self.speed_values.append(self.tag_speed)
self.speed_values_tag.append(self.tag_speed)
print("Writing means.json output file: {}".format(self.bar_data))
with open(os.path.join(JSON_OUT_DIRECTORY, "means.json"), 'w') as fp:
json.dump(self.bar_data, fp)
def get_topic_from_config(self, inputs, name):
topic = ""
try:
for i in inputs:
if i['name'] == name:
topic = i['stream']
break
except Exception as e:
print('Exception: ' + str(e))
if self.verbose:
print("get_topic_from_config: name=" + name + " => topic: " + topic)
return topic
def listener_callback_gpsfix(self, msg):
#self.get_logger().info('I heard gps_lat_lon! "%s"' % msg)
self.tag_raw_position_lat = msg.latitude
self.tag_raw_position_lon = msg.longitude
self.tag_speed = msg.speed
self.tag_time = self.get_time(msg.header.stamp)
self.process()
# convert ROS2 timestamps (secs, nanosec) into microseconds since epoch
def get_time(self, stamp):
return stamp.sec*1000000 + stamp.nanosec/1000
def process(self):
if self.tag_time is None:
return
now = self.tag_time
try:
if self.first_ts == 0:
self.first_ts = now
if self.start_ts is None or self.start_ts == 0: # first run
self.start_ts = (int(now / JSON_OUTPUT_PERIOD)) * JSON_OUTPUT_PERIOD
self.end_ts = self.start_ts + JSON_OUTPUT_PERIOD
# Fill bar data every 10 seconds
if now > (self.start_ts + 10000000):
mean = np.mean(self.speed_values)
self.bar_data["series"][0]["data"].append({
"x": abs(math.floor((self.start_ts - self.first_ts) /1000000)),
"y": mean})
self.speed_values = []
self.start_ts += 10000000
self.speed_values.append(self.tag_speed)
self.speed_values_tag.append(self.tag_speed)
except Exception as e:
print('Exception: ' + str(e))
# convert output from GPS frame decoder (NMEA GPRMC)
def decode_time(self, in_time_ms):
return datetime.datetime.fromtimestamp(in_time_ms / 1000.0, tz=datetime.timezone.utc)
def signal_handler(sig, frame):
print('SIGINT received, leaving.')
sys.exit(0)
def parse_config_inputs(inputs_file):
try:
f = open(inputs_file,)
config_inputs = json.load(f)
f.close()
return config_inputs
except ValueError as err: # includes simplejson.decoder.JSONDecodeError
print('Decoding JSON has failed: ', err)
exit(1)
def main(args=None):
# create json directory if needed
if not os.path.exists(JSON_OUT_DIRECTORY):
os.makedirs(JSON_OUT_DIRECTORY)
# get input config
config_inputs = parse_config_inputs('/ivsci/config/inputs')
if len(config_inputs) <= 0:
exit(1)
print("Setting signal handler for SIGINT")
signal.signal(signal.SIGINT, signal_handler)
rclpy.init(args=args)
process_example_node = ivs_ros2_process_example_node(config_inputs)
rclpy.spin(process_example_node)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
process_example_node.destroy_node()
rclpy.shutdown()
return 0
if __name__ == '__main__':
main()
#!/usr/bin/env python3
# IVS ROS2 rosbag player
# (C) 2021 Intempora
import subprocess
import json
import os
import time
def get_topic_from_config(inputs, name):
try:
for i in inputs:
if i['name'] == name:
return i['stream']
except Exception as e:
print('Exception: ' + str(e))
return None
def parse_config_inputs(inputs_file):
try:
f = open(inputs_file, )
config_inputs = json.load(f)
f.close()
return config_inputs
except ValueError as err: # includes simplejson.decoder.JSONDecodeError
print('Decoding JSON has failed: ', err)
return []
def parse_config_parameters(param_file):
try:
f = open(param_file, )
params = json.load(f)
f.close()
print('params: ', params)
for p in params:
if 'name' in p and p['name'] == 'replay_rate':
return p['value']
except Exception as err: # includes simplejson.decoder.JSONDecodeError
print('Decoding parameters JSON has failed: ', err)
return '1'
def main(args=None):
print("#########################################################")
print("# IVS ROS2 bag player #")
print("#########################################################")
# get input config
config_inputs = parse_config_inputs('/ivsci/config/inputs')
replay_rate = parse_config_parameters('/ivsci/config/parameters')
print(config_inputs)
print("replay_rate: ", replay_rate)
topics_wanted = ""
reliability_override_file_content = ""
for i in config_inputs:
topics_wanted += i['stream'] + " "
reliability_override_file_content += i['stream'] + ':\n reliability: reliable\n history: keep_all\n'
# change QoS to reliable by creating a file reliability_override.yaml
f = open("/tmp/reliability_override.yaml", "w")
f.write(reliability_override_file_content)
f.close()
rosbag_file = config_inputs[0]['recording_file']
# if rosbag_file ends with .bag then we use rosbag_v2 plugin
rosbag_plugin_or_not = ' '
if rosbag_file.lower().endswith(".bag"):
rosbag_plugin_or_not = ' -s rosbag_v2 '
# if rosbag_file ends with .db3 use directory
if rosbag_file.lower().endswith(".db3"):
rosbag_file = os.path.realpath(rosbag_file)
rosbag_file = os.path.dirname(rosbag_file)
ros2_command = 'ros2 bag play -r ' + str(replay_rate) + rosbag_plugin_or_not + rosbag_file \
+ ' --topics ' + topics_wanted \
+ ' --qos-profile-overrides-path /tmp/reliability_override.yaml'
command = ["/bin/sh", "-c", ros2_command]
print("rosbag_player: file to replay: ", rosbag_file)
print("rosbag_player: topics_wanted: ", topics_wanted)
print("rosbag_player: command to launch: ", command)
result = subprocess.Popen(command)
result.wait()
print("rosbag_player: replay done, waiting 5 sec.")
time.sleep(5) # TODO make it a parameter
print("rosbag_player: ", command, " returned:", result.returncode)
return result.returncode
if __name__ == '__main__':
main()
\ No newline at end of file
import launch
from launch import LaunchDescription
from launch_ros.actions import Node
import os
import sys
from glob import glob
import time
import signal
import launch_ros
from launch_ros.actions import Node
from launch_ros.actions import LifecycleNode
from launch_ros.events.lifecycle import ChangeState
from launch_ros.event_handlers import OnStateTransition
import lifecycle_msgs.msg
def generate_launch_description():
ld = LaunchDescription()
process_node = Node(
emulate_tty=True,
package="ivs_ros2_process_example_pkg",
executable="process_example",
output={
'stdout': 'screen',
'stderr': 'screen',
}
)
# there may exist a better way to execute rosbag_player.py
rosbag_player_cmd = launch.substitutions.Command(command='/bin/sh -c "find . -name rosbag_player.py|head -n 1"')
rosbag_player = launch.actions.ExecuteProcess(
emulate_tty=True,
cmd=["python", rosbag_player_cmd],
shell=True,
on_exit=[
launch.actions.LogInfo(msg=["========== rosbag_player done. Stopping everything"]),
launch.actions.Shutdown(reason="rosbag_player ended.")
],
output={
'stdout': 'screen',
'stderr': 'screen',
}
)
ld.add_action(process_node)
ld.add_action(rosbag_player)
return ld
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>ivs_ros2_process_example_pkg</name>
<version>0.0.1</version>
<description>IVS ROS2 processing example job</description>
<maintainer email="thomas.coffy@intempora.com">thomas</maintainer>
<license>TODO: License declaration</license>
<buildtool_depend>ament_python</buildtool_depend>
<depend>rclpy</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>
[develop]
script-dir=$base/lib/ivs_ros2_process_example_pkg
[install]
install-scripts=$base/lib/ivs_ros2_process_example_pkg
import os
from glob import glob
from setuptools import setup
package_name = 'ivs_ros2_process_example_pkg'
setup(
name=package_name,
version='0.0.1',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
(os.path.join('share', package_name, 'launch'), glob('launch/*_launch.py')),
(os.path.join('share', package_name, 'package_name'), glob('*.py')),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='thomas',
maintainer_email='thomas.coffy@intempora.com',
description='IVS ROS2 Processing Example',
license='Copyright Intempora (c) 2022',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'process_example = ivs_ros2_process_example_pkg.ivs_ros2_process_example:main'
],
}
)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment