Multithreading
This advanced-level tutorial explains the concept of multithreaded node in the context of libuavcan, and shows an example application that runs a multithreaded node.
The reader must be familiar with the specification, particularly with the specification of the transport layer.
Motivation
An application that runs a DroneCAN node is often required to run hard real-time UAVCAN-related processes side by side with less time-critical ones. Making both co-exist in the same execution thread can be challenging because of radically different requirements and constraints. For example, part of the application that controls actuators over UAVCAN may require very low latency, whereas a non-realtime component (e.g. a firmware update trigger) does not require real-time priority, but in turn may have to perform long blocking I/O operations, such as file system access, etc.
A radical solution to this problem is to run the node in two (or more) execution threads or processes. This enables the designer to put hard real-time tasks in a dedicated thread, while running all low-priority and/or blocking processes in lower-priority threads.
Architecture
Libuavcan allows to add low-priority threads by means of adding sub-nodes, decoupled from the main node via a virtual CAN driver. In this tutorial we’ll be reviewing a use case with just one sub-node, but the approach can be scaled to more sub-nodes if necessary by means of daisy-chaining them together with extra virtual drivers.
A virtual CAN driver is a class that implements uavcan::ICanDriver
(see libuavcan porting guide for details).
An object of this class is fed to the sub-node in place of a real CAN driver.
Transmission
The sub-node emits its outgoing (TX) CAN frames into the virtual driver, where they get enqueued in a synchronized prioritized queue. The main node periodically unloads the enqueued TX frames from the virtual driver into its own prioritized TX queue, which then gets flushed into the CAN driver, thus completing the pipeline.
Injection of TX frames from sub-node to the main node’s queue is done via uavcan::INode::injectTxFrame(..)
.
Reception
The main node simply duplicates all incoming (RX) CAN frames to the virtual CAN driver.
This is done via the interface uavcan::IRxFrameListener
,
which is installed via the method uavcan::INode::installRxFrameListener(uavcan::IRxFrameListener*)
.
Multiprocessing
A node may be implemented not only in multiple threads, but in multiple processes as well (with isolated address spaces).
In this case, every sub-node will be accessing the CAN hardware as an independent node, but it will be using the same node ID in all communications, therefore all sub-nodes and the main node will appear on the bus as the same network participant.
We introduce a new term here - compound node - which refers to either a multithreaded or a multiprocessed node.
Limitations
A curious reader could have noticed that the above proposed approaches are in conflict with requirements of the transport layer specification. Indeed, there is one corner case that has to be kept in mind when working with compound nodes.
The transport layer specification introduces the concept of transfer ID map, which is mandatory in order to assign proper transfer ID values to outgoing transfers. The main node and its sub-nodes cannot use a shared transfer ID map, therefore multiple sub-nodes must not simultaneously publish transfers with identical descriptors.
This point can be expressed in higher-level terms:
- Every sub-node of a compound node may receive any incoming transfers without limitations.
- Only one sub-node can implement a certain publisher or service server.
Note that the class uavcan::SubNode
does not publish uavcan.protocol.NodeStatus
and
does not provide service uavcan.protocol.GetNodeInfo
and such (actually it doesn’t implement any
publishers or servers or such at all), because this functionality is already provided by the main node class.
Example
Virtual CAN driver
/**
* This header implements a virtual CAN driver for multi-threaded libuavcan nodes.
* It is supposed to be connected to the secondary node (aka sub-node) in place of a real CAN driver.
* Once connected, the virtual driver will be redirecting outgoing CAN frames to the main node, and
* it will be copying RX frames received by the main node to the secondary node (i.e. every RX frame will
* go into the main node AND the secondary node).
*
* @file uavcan_virtual_driver.hpp
* @author Pavel Kirienko <pavel.kirienko@zubax.com>
*
* The source code contained in this file is distributed under the terms of CC0 (public domain dedication).
*/
#pragma once
#include <iostream> // For std::cout
#include <thread> // For std::mutex
#include <condition_variable> // For std::condition_variable
#include <uavcan/uavcan.hpp> // Main libuavcan header
namespace uavcan_virtual_driver
{
/**
* Generic queue based on the linked list class defined in libuavcan.
* It does not use heap memory, but uses the libuavcan's deterministic pool allocator.
*
* This class is used to implement synchronized RX queue of the secondary thread.
*/
template <typename T>
class Queue
{
struct Item : public uavcan::LinkedListNode<Item>
{
T payload;
template <typename... Args>
Item(Args... args) : payload(args...) { }
};
uavcan::LimitedPoolAllocator allocator_;
uavcan::LinkedListRoot<Item> list_;
public:
/**
* @param arg_allocator Used to allocate memory for stored items.
*
* @param block_allocation_quota Maximum number of memory blocks this queue can take from the allocator.
* Defines the depth of the queue.
*/
Queue(uavcan::IPoolAllocator& arg_allocator,
std::size_t block_allocation_quota) :
allocator_(arg_allocator, block_allocation_quota)
{
uavcan::IsDynamicallyAllocatable<Item>::check();
}
bool isEmpty() const { return list_.isEmpty(); }
/**
* Creates one item in-place at the end of the list.
* Returns true if the item has been appended successfully, false if there's not enough memory.
* Complexity is O(N) where N is queue length.
*/
template <typename... Args>
bool tryEmplace(Args... args)
{
// Allocating memory
void* const ptr = allocator_.allocate(sizeof(Item));
if (ptr == nullptr)
{
return false;
}
// Constructing the new item
Item* const item = new (ptr) Item(args...);
assert(item != nullptr);
// Inserting the new item at the end of the list
Item* p = list_.get();
if (p == nullptr)
{
list_.insert(item);
}
else
{
while (p->getNextListNode() != nullptr)
{
p = p->getNextListNode();
}
assert(p->getNextListNode() == nullptr);
p->setNextListNode(item);
assert(p->getNextListNode()->getNextListNode() == nullptr);
}
return true;
}
/**
* Accesses the first element.
* Nullptr will be returned if the queue is empty.
* Complexity is O(1).
*/
T* peek() { return isEmpty() ? nullptr : &list_.get()->payload; }
const T* peek() const { return isEmpty() ? nullptr : &list_.get()->payload; }
/**
* Removes the first element.
* If the queue is empty, nothing will be done and assertion failure will be triggered.
* Complexity is O(1).
*/
void pop()
{
Item* const item = list_.get();
assert(item != nullptr);
if (item != nullptr)
{
list_.remove(item);
item->~Item();
allocator_.deallocate(item);
}
}
};
/**
* This class implements one virtual interface.
*
* Objects of this class are owned by the secondary thread.
* This class does not use heap memory, instead it uses a block allocator provided by reference to the constructor.
*/
class Iface final : public uavcan::ICanIface,
uavcan::Noncopyable
{
struct RxItem
{
const uavcan::CanRxFrame frame;
const uavcan::CanIOFlags flags;
RxItem(const uavcan::CanRxFrame& arg_frame,
uavcan::CanIOFlags arg_flags) :
frame(arg_frame),
flags(arg_flags)
{ }
};
std::mutex& mutex_;
uavcan::CanTxQueue prioritized_tx_queue_;
Queue<RxItem> rx_queue_;
/**
* Implements uavcan::ICanDriver. Will be invoked by the sub-node.
*/
std::int16_t send(const uavcan::CanFrame& frame,
uavcan::MonotonicTime tx_deadline,
uavcan::CanIOFlags flags) override
{
std::lock_guard<std::mutex> lock(mutex_);
prioritized_tx_queue_.push(frame, tx_deadline, uavcan::CanTxQueue::Volatile, flags);
return 1;
}
/**
* Implements uavcan::ICanDriver. Will be invoked by the sub-node.
*/
std::int16_t receive(uavcan::CanFrame& out_frame,
uavcan::MonotonicTime& out_ts_monotonic,
uavcan::UtcTime& out_ts_utc,
uavcan::CanIOFlags& out_flags) override
{
std::lock_guard<std::mutex> lock(mutex_);
if (rx_queue_.isEmpty())
{
return 0;
}
const auto item = *rx_queue_.peek();
rx_queue_.pop();
out_frame = item.frame;
out_ts_monotonic = item.frame.ts_mono;
out_ts_utc = item.frame.ts_utc;
out_flags = item.flags;
return 1;
}
/**
* Stubs for uavcan::ICanDriver. Will be invoked by the sub-node.
* These methods are meaningless for a virtual interface.
*/
std::int16_t configureFilters(const uavcan::CanFilterConfig*, std::uint16_t) override { return -uavcan::ErrDriver; }
std::uint16_t getNumFilters() const override { return 0; }
std::uint64_t getErrorCount() const override { return 0; }
public:
/**
* This class should be instantiated by the secondary thread.
*
* @param allocator Needed to provide storage for the prioritized TX queue and the RX queue.
*
* @param clock Needed for the TX queue so it can discard stale frames.
*
* @param arg_mutex Needed because the main and the secondary threads communicate through this class,
* therefore it must be thread-safe.
*
* @param quota_per_queue Defines how many frames the queues, both RX and TX, can accommodate.
*/
Iface(uavcan::IPoolAllocator& allocator,
uavcan::ISystemClock& clock,
std::mutex& arg_mutex,
unsigned quota_per_queue) :
mutex_(arg_mutex),
prioritized_tx_queue_(allocator, clock, quota_per_queue),
rx_queue_(allocator, quota_per_queue)
{ }
/**
* This method adds one frame to the RX queue of the secondary thread.
* It is invoked by the main thread when the node receives a frame from the bus.
*
* Note that RX queue overwrites oldest items when overflowed.
* No additional locking is required - this method is thread-safe. Call from the main thread only.
*
* @param frame The frame to be received by the sub-node.
*
* @param flags Flags associated with the frame. See @ref uavcan::CanIOFlags for available flags.
*/
void addRxFrame(const uavcan::CanRxFrame& frame,
uavcan::CanIOFlags flags)
{
std::lock_guard<std::mutex> lock(mutex_);
if (!rx_queue_.tryEmplace(frame, flags) && !rx_queue_.isEmpty())
{
rx_queue_.pop();
(void)rx_queue_.tryEmplace(frame, flags);
}
}
/**
* This method flushes frames from the sub-node's TX queue into the main node's TX queue.
*
* No additional locking is required - this method is thread-safe. Call from the main thread only.
*
* @param main_node Reference to the main node, which will receive the frames.
*
* @param iface_index Index of the interface in which the frames will be sent.
*/
void flushTxQueueTo(uavcan::INode& main_node,
std::uint8_t iface_index)
{
std::lock_guard<std::mutex> lock(mutex_);
const std::uint8_t iface_mask = static_cast<std::uint8_t>(1U << iface_index);
while (auto e = prioritized_tx_queue_.peek())
{
#if !NDEBUG && UAVCAN_TOSTRING
std::cout << "uavcan_virtual_driver::Iface: TX injection [iface_index=" << int(iface_index) << "]: "
<< e->toString() << std::endl;
#endif
const int res = main_node.injectTxFrame(e->frame, e->deadline, iface_mask,
uavcan::CanTxQueue::Qos(e->qos), e->flags);
prioritized_tx_queue_.remove(e);
if (res <= 0)
{
break;
}
}
}
/**
* This method reports whether there's data for the sub-node to read from the RX thread.
*
* No additional locking is required - this method is thread-safe. Call from the secondary thread only.
*/
bool hasDataInRxQueue() const
{
std::lock_guard<std::mutex> lock(mutex_);
return !rx_queue_.isEmpty();
}
};
/**
* This interface defines one method that will be called by the main node thread periodically in order to
* transfer contents of TX queue of the sub-node into the TX queue of the main node.
*/
class ITxQueueInjector
{
public:
virtual ~ITxQueueInjector() { }
/**
* Flush contents of TX queues into the main node's prioritized TX queue.
* @param main_node Reference to the main node.
*/
virtual void injectTxFramesInto(uavcan::INode& main_node) = 0;
};
/**
* This is the main, user-facing part of the virtual driver.
* This class will be instantiated by the application and passed into the sub-node as its CAN interface.
*
* Objects of this class are owned by the secondary thread.
* This class does not use heap memory, instead it uses a block allocator provided by reference to the constructor.
*/
class Driver final : public uavcan::ICanDriver,
public uavcan::IRxFrameListener,
public ITxQueueInjector,
uavcan::Noncopyable
{
/**
* Basic synchronization object. Can be replaced with whatever is appropriate for the target platform.
*/
class Event
{
std::mutex m_;
std::condition_variable cv_;
public:
/**
* Note that in this implementation this method may return spuriously, which is OK.
*/
void waitFor(uavcan::MonotonicDuration duration)
{
std::unique_lock<std::mutex> lk(m_);
(void)cv_.wait_for(lk, std::chrono::microseconds(duration.toUSec()));
}
void signal() { cv_.notify_all(); }
};
Event event_; ///< Used to unblock the sub-node's select() call when IO happens.
std::mutex mutex_; ///< Shared across all ifaces
uavcan::LazyConstructor<Iface> ifaces_[uavcan::MaxCanIfaces];
const unsigned num_ifaces_;
uavcan::ISystemClock& clock_;
/**
* Implements uavcan::ICanDriver. Will be invoked by the sub-node.
*/
uavcan::ICanIface* getIface(std::uint8_t iface_index) override
{
return (iface_index < num_ifaces_) ? ifaces_[iface_index].operator Iface*() : nullptr;
}
/**
* Implements uavcan::ICanDriver. Will be invoked by the sub-node.
*/
std::uint8_t getNumIfaces() const override { return num_ifaces_; }
/**
* Implements uavcan::ICanDriver. Will be invoked by the sub-node.
*/
std::int16_t select(uavcan::CanSelectMasks& inout_masks,
const uavcan::CanFrame* (&)[uavcan::MaxCanIfaces],
uavcan::MonotonicTime blocking_deadline) override
{
bool need_block = (inout_masks.write == 0); // Write queue is infinite
for (unsigned i = 0; need_block && (i < num_ifaces_); i++)
{
const bool need_read = inout_masks.read & (1U << i);
if (need_read && ifaces_[i]->hasDataInRxQueue())
{
need_block = false;
}
}
if (need_block)
{
event_.waitFor(blocking_deadline - clock_.getMonotonic());
}
inout_masks = uavcan::CanSelectMasks();
for (unsigned i = 0; i < num_ifaces_; i++)
{
const std::uint8_t iface_mask = 1U << i;
inout_masks.write |= iface_mask; // Always ready to write
if (ifaces_[i]->hasDataInRxQueue())
{
inout_masks.read |= iface_mask;
}
}
return num_ifaces_; // We're always ready to write, hence > 0.
}
/**
* Implements uavcan::IRxFrameListener. Will be invoked by the main node.
*/
void handleRxFrame(const uavcan::CanRxFrame& frame,
uavcan::CanIOFlags flags) override
{
#if !NDEBUG && UAVCAN_TOSTRING
std::cout << "uavcan_virtual_driver::Driver: RX [flags=" << flags << "]: "
<< frame.toString(uavcan::CanFrame::StrAligned) << std::endl;
#endif
if (frame.iface_index < num_ifaces_)
{
ifaces_[frame.iface_index]->addRxFrame(frame, flags);
event_.signal();
}
}
/**
* Implements ITxQueueInjector. Will be invoked by the main thread.
*/
void injectTxFramesInto(uavcan::INode& main_node) override
{
for (unsigned i = 0; i < num_ifaces_; i++)
{
ifaces_[i]->flushTxQueueTo(main_node, i);
}
event_.signal();
}
public:
/**
* This class should be instantiated by the secondary thread.
*
* @param arg_num_ifaces The number of interfaces defines how many virtual interfaces will be instantiated
* by this class. IT DOESN'T HAVE TO MATCH THE NUMBER OF PHYSICAL INTERFACES THE MAIN
* NODE IS USING - it can be less than that (but not greater), in which case the
* sub-node will be using only interfaces with lower indices, which may be fine
* (depending on the application's requirements). For example, if the main node has
* three interfaces with indices 0, 1, 2, and the virtual driver implements only two,
* the sub-node will only have access to interfaces 0 and 1.
*
* @param clock Needed by the virtual iface class, and for select() timing.
*
* @param shared_allocator This allocator will be used to keep inter-thread queues.
*
* @param block_allocation_quota_per_virtual_iface Maximum number of blocks that can be allocated per virtual
* iface. Ifaces use dynamic memory to keep RX/TX queues, so
* higher quota enables deeper queues. Note that ifaces DO NOT
* allocate memory unless they need it, i.e. the memory will not
* be taken unless there is data to enqueue; once a queue is
* flushed, the memory will be immediately freed.
* One block fits exactly one CAN frame, i.e. a quota of 64
* blocks allows the interface to keep up to 64 RX+TX CAN frames.
*/
Driver(unsigned arg_num_ifaces,
uavcan::ISystemClock& clock,
uavcan::IPoolAllocator& shared_allocator,
unsigned block_allocation_quota_per_virtual_iface) :
num_ifaces_(arg_num_ifaces),
clock_(clock)
{
assert(num_ifaces_ > 0 && num_ifaces_ <= uavcan::MaxCanIfaces);
const unsigned quota_per_queue = block_allocation_quota_per_virtual_iface; // 2x overcommit
for (unsigned i = 0; i < num_ifaces_; i++)
{
ifaces_[i].template
construct<uavcan::IPoolAllocator&, uavcan::ISystemClock&, std::mutex&, unsigned>
(shared_allocator, clock_, mutex_, quota_per_queue);
}
}
};
}
Demo application
This demo also shows how to use a thread-safe heap-based shared block memory allocator. This allocator enables lower memory footprint for compound nodes than the default one.
/*
* This demo shows how to implement a multi-threaded DroneCAN node with libuavcan.
*
* The main thread is publishing KeyValue messages, simulating a hard real-time task.
*
* The secondary thread is running an active node monitor (based on uavcan::NodeInfoRetriever),
* which performs blocking filesystem I/O and therefore cannot be implemented in the main thread.
*/
/*
* Standard C++ headers.
*/
#include <iostream> // For std::cout and std::cerr
#include <fstream> // For std::ofstream, which is used in the demo payload logic
#include <thread> // For std::thread
/*
* Libuavcan headers.
*/
#include <uavcan/uavcan.hpp> // Main libuavcan header
#include <uavcan/node/sub_node.hpp> // For uavcan::SubNode, which is essential for multithreaded nodes
#include <uavcan/helpers/heap_based_pool_allocator.hpp> // In this example we're going to use heap-based allocator
/*
* These are purely for demonstrational purposes - they have no relation to multithreading.
*/
#include <uavcan/protocol/debug/KeyValue.hpp>
#include <uavcan/protocol/node_info_retriever.hpp>
/*
* Demo implementation of a virtual CAN driver.
*/
#include "uavcan_virtual_driver.hpp"
/*
* These functions are explained in one of the first tutorials.
*/
extern uavcan::ICanDriver& getCanDriver();
extern uavcan::ISystemClock& getSystemClock();
/**
* This class demonstrates a simple main node that is supposed to run some hard real-time tasks.
*/
class MainNodeDemo
{
/**
* This value will be reported by the allocator (read below) via @ref getBlockCapacity().
* The real limit (hard limit) can be configured separately, and its default value is twice the soft limit.
*/
static constexpr unsigned AllocatorBlockCapacitySoftLimit = 250;
/**
* Libuavcan's allocators can be made thread-safe by means of providing optional template argument - a type
* that will be instantiated within every thread-critical context.
* Since this allocator will be shared between nodes working in different threads, it must be thread-safe.
*/
struct AllocatorSynchronizer
{
std::mutex& getMutex()
{
static std::mutex m;
return m;
}
AllocatorSynchronizer() { getMutex().lock(); }
~AllocatorSynchronizer() { getMutex().unlock(); }
};
/**
* It is also possible to use the default pool-based allocator here, but we'll use the heap-based one for
* demonstrational purposes. The difference is that the heap-based allocator doesn't reserve the pool statically,
* instead it takes the memory from the heap using std::malloc(), and then keeps it for future use. This allocator
* has the following advantages over the default one:
*
* - Lower memory footprint. This is because it only allocates memory when the node needs it. Once a block is
* allocated, it will be kept for future re-use, therefore it doesn't need to constantly access heap (otherwise
* that would be highly non-deterministic and cause heap fragmentation).
*
* - Ability to shrink if the memory is no longer needed. For example, after the node has finished a certain
* operation that required a lot of memory, it can be reclaimed back by means of calling
* @ref uavcan::HeapBasedPoolAllocator::shrink(). This method will call std::free() for every block that is
* not currently in use by the node.
*
* Needless to say, initial allocations depend on std::malloc(), which may cause problems for real-time
* applications, so this allocator should be used with care. If in doubt, use traditional one, since it also
* can be made thread-safe. Or even use independent allocators per every (sub)node object, this is even more
* deterministic, but takes much more memory.
*/
uavcan::HeapBasedPoolAllocator<uavcan::MemPoolBlockSize, AllocatorSynchronizer> allocator_;
/**
* Note that we don't provide the pool size parameter to the @ref uavcan::Node template.
* If no value is provided, it defaults to zero, making the node expect a reference to @ref uavcan::IPoolAllocator
* to the constructor. This allows us to install a custom allocator, instantiated above.
*/
uavcan::Node<> node_;
public:
MainNodeDemo(uavcan::NodeID self_node_id, const std::string& self_node_name) :
allocator_(AllocatorBlockCapacitySoftLimit),
node_(getCanDriver(), getSystemClock(), allocator_) // Installing our custom allocator.
{
node_.setNodeID(self_node_id);
node_.setName(self_node_name.c_str());
}
uavcan::INode& getNode() { return node_; }
/**
* Needed only for demonstration.
*/
unsigned getMemoryAllocatorFootprint() const
{
return allocator_.getNumReservedBlocks() * uavcan::MemPoolBlockSize;
}
void runForever()
{
const int start_res = node_.start();
if (start_res < 0)
{
throw std::runtime_error("Failed to start the main node: " + std::to_string(start_res));
}
/*
* Initializing a KV publisher.
* This publication simulates a hard-real time task for the main node.
*/
uavcan::Publisher<uavcan::protocol::debug::KeyValue> kv_pub(node_);
uavcan::Timer kv_pub_timer(node_);
kv_pub_timer.startPeriodic(uavcan::MonotonicDuration::fromMSec(1000));
kv_pub_timer.setCallback([&kv_pub](const uavcan::TimerEvent&)
{
uavcan::protocol::debug::KeyValue msg;
msg.key = "Bob";
msg.value = 1.0F / static_cast<float>(std::rand());
(void)kv_pub.broadcast(msg); // TODO: Error handling
});
/*
* We know that in this implementation the class uavcan_virtual_driver::Driver inherits uavcan::IRxFrameListener,
* so we can simply restore the reference to uavcan_virtual_driver::ITxQueueInjector using dynamic_cast<>.
*
* In other implementations this approach may be unacceptable (e.g. RTTI, which is required for dynamic_cast<>,
* is often unavailable on deeply embedded systems), so a reference to uavcan_virtual_driver::ITxQueueInjector
* will have to be passed here using some other means (e.g. as a reference to the constructor).
*/
while (node_.getDispatcher().getRxFrameListener() == nullptr)
{
// Waiting for the sub-node to install the RX listener.
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
auto& tx_injector =
dynamic_cast<uavcan_virtual_driver::ITxQueueInjector&>(*node_.getDispatcher().getRxFrameListener());
/*
* Running the node ALMOST normally.
*
* Spinning must break every once in a while in order to unload TX queue of the sub-node into the
* TX queue of the main node. Duration of spinning defines worst-case transmission delay for sub-node's
* outgoing frames. Since sub-nodes are not supposed to run hard real-time processes, transmission delays
* introduced by periodic queue synchronizations are acceptable.
*/
node_.setModeOperational();
while (true)
{
const int res = node_.spin(uavcan::MonotonicDuration::fromMSec(2));
if (res < 0)
{
std::cerr << "Transient failure: " << res << std::endl;
}
// CAN frame transfer from sub-node to the main node occurs here.
tx_injector.injectTxFramesInto(node_);
}
}
};
/**
* This is just some demo logic, it has nothing to do with multithreading.
* This class implements uavcan::INodeInfoListener, storing node info on the file system, one file per node.
* Please refer to the tutorial "Node discovery" to learn more.
*/
class FileBasedNodeInfoCollector final : public uavcan::INodeInfoListener
{
void handleNodeInfoRetrieved(uavcan::NodeID node_id,
const uavcan::protocol::GetNodeInfo::Response& node_info) override
{
std::cout << "Node info for " << int(node_id.get()) << ":\n" << node_info << std::endl;
std::ofstream(std::to_string(node_id.get())) << node_info;
}
void handleNodeInfoUnavailable(uavcan::NodeID node_id) override
{
std::cout << "Node info for " << int(node_id.get()) << " is unavailable" << std::endl;
std::ofstream(std::to_string(node_id.get())) << uavcan::protocol::GetNodeInfo::Response();
}
void handleNodeStatusChange(const uavcan::NodeStatusMonitor::NodeStatusChangeEvent& event) override
{
if (event.status.mode == uavcan::protocol::NodeStatus::MODE_OFFLINE)
{
std::cout << "Node " << int(event.node_id.get()) << " went offline" << std::endl;
(void)std::remove(std::to_string(event.node_id.get()).c_str());
}
}
};
/**
* This class demonstrates a simple sub-node that is supposed to run CPU-intensive, blocking, non-realtime tasks.
*/
class SubNodeDemo
{
static constexpr unsigned BlockAllocationQuotaPerIface = 80;
uavcan_virtual_driver::Driver driver_;
uavcan::SubNode<> node_;
uavcan::NodeInfoRetriever retriever_;
FileBasedNodeInfoCollector collector_;
public:
/**
* Sub-node needs a reference to the main node in order to bind its virtual CAN driver to it.
* Also, the sub-node and the virtual iface all use the same allocator from the main node (it is thread-safe).
* It is also possible to use dedicated allocators for every entity, but that would lead to higher memory
* footprint.
*/
SubNodeDemo(uavcan::INode& main_node) :
driver_(main_node.getDispatcher().getCanIOManager().getCanDriver().getNumIfaces(), // Nice?
getSystemClock(),
main_node.getAllocator(), // Installing our custom allocator from the main node.
BlockAllocationQuotaPerIface),
node_(driver_,
getSystemClock(),
main_node.getAllocator()), // Installing our custom allocator from the main node.
retriever_(node_)
{
node_.setNodeID(main_node.getNodeID()); // Obviously, we must use the same node ID.
main_node.getDispatcher().installRxFrameListener(&driver_); // RX frames will be copied to the virtual driver.
}
void runForever()
{
/*
* Initializing the demo payload.
* Note that the payload doesn't know that it's being runned by a secondary node - on the application level,
* there's no difference between a sub-node and the main node.
*/
const int retriever_res = retriever_.start();
if (retriever_res < 0)
{
throw std::runtime_error("Failed to start the retriever; error: " + std::to_string(retriever_res));
}
const int add_listener_res = retriever_.addListener(&collector_);
if (add_listener_res < 0)
{
throw std::runtime_error("Failed to add listener; error: " + std::to_string(add_listener_res));
}
/*
* Running the node normally.
* Note that the SubNode class does not implement the start() method - there's nothing to start.
*/
while (true)
{
const int res = node_.spin(uavcan::MonotonicDuration::getInfinite());
if (res < 0)
{
std::cerr << "Transient failure: " << res << std::endl;
}
}
}
};
int main(int argc, const char** argv)
{
if (argc < 2)
{
std::cerr << "Usage: " << argv[0] << " <node-id>" << std::endl;
return 1;
}
const int self_node_id = std::stoi(argv[1]);
MainNodeDemo main_node(self_node_id, "org.uavcan.tutorial.multithreading");
SubNodeDemo sub_node(main_node.getNode());
std::thread secondary_thread(std::bind(&SubNodeDemo::runForever, &sub_node));
// This thread is only needed for demo purposes; can be removed freely.
std::thread allocator_stat_reporting_thread([&main_node]()
{
unsigned last_peak = 0;
for (;;)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
const auto usage = main_node.getMemoryAllocatorFootprint();
if (usage != last_peak)
{
last_peak = usage;
std::cout << "Memory footprint: " << last_peak << " bytes" << std::endl;
}
}
});
main_node.runForever();
if (secondary_thread.joinable())
{
secondary_thread.join();
}
}
Possible output:
uavcan_virtual_iface::Driver: Total memory blocks: 468, blocks per queue: 468
uavcan_virtual_iface::Driver: RX [flags=0]: 0x1001557f 05 00 00 00 00 00 00 c5 '........' ts_m=120912.553326 ts_utc=1443574957.568273 iface=0
uavcan_virtual_iface::Iface: TX injection [iface_index=0]: <volat> 0x1e01ff81 c0 '.'
uavcan_virtual_iface::Driver: RX [flags=0]: 0x1e0101ff 18 ef 05 00 00 00 00 80 '........' ts_m=120912.594184 ts_utc=1443574957.609083 iface=0
uavcan_virtual_iface::Driver: RX [flags=0]: 0x1e0101ff 00 00 00 00 00 00 00 20 '....... ' ts_m=120912.594188 ts_utc=1443574957.609096 iface=0
uavcan_virtual_iface::Driver: RX [flags=0]: 0x1e0101ff 00 00 00 00 00 00 00 00 '........' ts_m=120912.594189 ts_utc=1443574957.609102 iface=0
uavcan_virtual_iface::Driver: RX [flags=0]: 0x1e0101ff 00 00 00 00 00 00 00 20 '....... ' ts_m=120912.594189 ts_utc=1443574957.609110 iface=0
uavcan_virtual_iface::Driver: RX [flags=0]: 0x1e0101ff 00 00 00 00 00 00 00 00 '........' ts_m=120912.594190 ts_utc=1443574957.609116 iface=0
uavcan_virtual_iface::Driver: RX [flags=0]: 0x1e0101ff 00 00 00 00 00 00 00 20 '....... ' ts_m=120912.594191 ts_utc=1443574957.609121 iface=0
uavcan_virtual_iface::Driver: RX [flags=0]: 0x1e0101ff 00 6f 72 67 2e 75 61 00 '.org.ua.' ts_m=120912.594192 ts_utc=1443574957.609128 iface=0
uavcan_virtual_iface::Driver: RX [flags=0]: 0x1e0101ff 76 63 61 6e 2e 6c 69 20 'vcan.li ' ts_m=120912.594193 ts_utc=1443574957.609131 iface=0
uavcan_virtual_iface::Driver: RX [flags=0]: 0x1e0101ff 6e 75 78 5f 61 70 70 00 'nux_app.' ts_m=120912.594194 ts_utc=1443574957.609135 iface=0
uavcan_virtual_iface::Driver: RX [flags=0]: 0x1e0101ff 2e 6e 6f 64 65 74 6f 20 '.nodeto ' ts_m=120912.594195 ts_utc=1443574957.609139 iface=0
uavcan_virtual_iface::Driver: RX [flags=0]: 0x1e0101ff 6f 6c 40 'ol@' ts_m=120912.594195 ts_utc=1443574957.609143 iface=0
Node info for 127:
status:
uptime_sec: 5
health: 0
mode: 0
sub_mode: 0
vendor_specific_status_code: 0
software_version:
major: 0
minor: 0
optional_field_flags: 0
vcs_commit: 0
image_crc: 0
hardware_version:
major: 0
minor: 0
unique_id: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
certificate_of_authenticity: ""
name: "org.uavcan.linux_app.nodetool"
uavcan_virtual_iface::Driver: RX [flags=0]: 0x1001557f 06 00 00 00 00 00 00 c6 '........' ts_m=120913.553339 ts_utc=1443574958.568299 iface=0
uavcan_virtual_iface::Driver: RX [flags=0]: 0x1001557f 07 00 00 00 00 00 00 c7 '........' ts_m=120914.553385 ts_utc=1443574959.568286 iface=0
uavcan_virtual_iface::Driver: RX [flags=0]: 0x1001557f 08 00 00 00 00 00 00 c8 '........' ts_m=120915.553300 ts_utc=1443574960.568262 iface=0
uavcan_virtual_iface::Driver: RX [flags=0]: 0x1001557f 09 00 00 00 00 00 00 c9 '........' ts_m=120916.553426 ts_utc=1443574961.568343 iface=0
uavcan_virtual_iface::Driver: RX [flags=0]: 0x1001557f 0a 00 00 00 00 00 00 ca '........' ts_m=120917.553359 ts_utc=1443574962.568282 iface=0
Node 127 went offline
Running on Linux
Build the application using the following CMake script:
cmake_minimum_required(VERSION 2.8)
project(tutorial_project)
find_library(UAVCAN_LIB uavcan REQUIRED)
set(CMAKE_CXX_FLAGS "-pthread -Wall -Wextra -pedantic -std=c++11")
add_executable(node node.cpp ${CMAKE_SOURCE_DIR}/../2._Node_initialization_and_startup/platform_linux.cpp)
target_link_libraries(node ${UAVCAN_LIB} rt)