Automated ESC enumeration
This example shows how the automated ESC enumeration feature works at the protocol level. If the concept of automated enumeration is unfamiliar, please read the UAVCAN specification first.
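The order of operations that the script follows can be sketched without any hardware. The simulation below is only an illustration: `FakeEsc` and `enumerate_in_order` are hypothetical names, no `dronecan` call is involved, and it assumes that a zero timeout in the begin request stops enumeration (the script stops it by sending a begin request with default field values).

```python
# Pure-Python simulation of the enumeration order; no CAN hardware needed.
# FakeEsc and enumerate_in_order are hypothetical names used for illustration only.

class FakeEsc:
    """Stand-in for an ESC node that stores integer parameters."""

    def __init__(self, node_id):
        self.node_id = node_id
        self.enumerating = False
        self.params = {}    # volatile parameter storage
        self.saved = {}     # what survives a "save" opcode

    def begin(self, timeout_sec):
        # Assumption: a non-zero timeout starts enumeration, zero stops it.
        self.enumerating = timeout_sec > 0

    def set_param(self, name, value):
        self.params[name] = value
        return value

    def save(self):
        self.saved = dict(self.params)
        return True


def enumerate_in_order(escs, feedback_order, param_name='esc_index'):
    """Assign consecutive indices in the order the user touches the ESCs."""
    by_id = {esc.node_id: esc for esc in escs}
    for esc in escs:                        # step 1: start enumeration on every ESC
        esc.begin(timeout_sec=60)

    assigned = []
    for next_index, node_id in enumerate(feedback_order):
        esc = by_id[node_id]                # step 2: an indication arrives from this node
        assert esc.enumerating, 'indication from a node that is not enumerating'
        esc.begin(timeout_sec=0)            # step 3: stop enumeration on that node
        assert esc.set_param(param_name, next_index) == next_index  # step 4: set the index
        assert esc.save()                   # step 5: persist the new index
        assigned.append(node_id)
    return assigned


escs = [FakeEsc(124), FakeEsc(125), FakeEsc(126)]
order = enumerate_in_order(escs, feedback_order=[125, 124, 126])
print(order)                                                    # [125, 124, 126]
print({esc.node_id: esc.saved['esc_index'] for esc in escs})    # {124: 1, 125: 0, 126: 2}
```

The index an ESC receives depends only on the order in which the user provides feedback, not on its node ID.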
Writing the script
#!/usr/bin/env python3
import dronecan, time
from dronecan import uavcan
# get command line arguments
from argparse import ArgumentParser
parser = ArgumentParser(description='ESC enumeration example')
parser.add_argument("--bitrate", default=1000000, type=int, help="CAN bit rate")
parser.add_argument("--node-id", default=100, type=int, help="CAN node ID")
parser.add_argument("--dna-server", action='store_true', default=False, help="run DNA server")
parser.add_argument("port", default=None, type=str, help="serial port")
args = parser.parse_args()
# Determining how many ESC nodes are present.
# In real use cases though the number of ESC should be obtained from elsewhere, e.g. from control mixer settings.
# There is a helper class in PyDroneCAN that allows one to automate what we're doing here,
# but we're not using it for the purposes of greater clarity of what's going on on the protocol level.
def detect_esc_nodes():
    esc_nodes = set()
    handle = node.add_handler(uavcan.equipment.esc.Status, lambda event: esc_nodes.add(event.transfer.source_node_id))
    try:
        node.spin(timeout=3)  # Collecting ESC status messages, thus determining which nodes are ESC
    finally:
        handle.remove()

    return esc_nodes
# Enumerating ESC.
# In this example we're using blocking code for simplicity reasons,
# but real applications will most likely resort either to asynchronous code (callback-based),
# or implement the logic in a dedicated thread.
# Conversion of the code from synchronous to asynchronous/multithreaded pertains to the domain of general
# programming issues, so these questions are not covered in this demo.
def enumerate_all_esc(esc_nodes, timeout=60):
    begin_responses_succeeded = 0

    def begin_response_checker(event):
        nonlocal begin_responses_succeeded
        if not event:
            raise Exception('Request timed out')

        if event.response.error != event.response.ERROR_OK:
            raise Exception('Enumeration rejected\n' + dronecan.to_yaml(event))

        begin_responses_succeeded += 1

    overall_deadline = time.monotonic() + timeout

    print('Starting enumeration on all nodes...')
    begin_request = uavcan.protocol.enumeration.Begin.Request(timeout_sec=timeout)
    for node_id in esc_nodes:
        print('Sending enumeration begin request to', node_id)
        node.request(begin_request, node_id, begin_response_checker)

    while begin_responses_succeeded < len(esc_nodes):
        node.spin(0.1)

    print('Listening for indications...')
    enumerated_nodes = []
    next_index = 0
    while set(enumerated_nodes) != esc_nodes:
        received_indication = None

        def indication_callback(event):
            nonlocal received_indication
            if event.transfer.source_node_id in enumerated_nodes:
                print('Indication callback from node %d ignored - already enumerated' % event.transfer.source_node_id)
            else:
                print(dronecan.to_yaml(event))
                received_indication = event

        indication_handler = node.add_handler(uavcan.protocol.enumeration.Indication, indication_callback)
        print('=== PROVIDE ENUMERATION FEEDBACK ON ESC INDEX %d NOW ===' % next_index)
        print('=== e.g. turn the motor, press the button, etc, depending on your equipment ===')
        try:
            while received_indication is None:
                node.spin(0.1)
                if time.monotonic() > overall_deadline:
                    raise Exception('Process timed out')
        finally:
            indication_handler.remove()

        target_node_id = received_indication.transfer.source_node_id
        print('Indication received from node', target_node_id)

        print('Stopping enumeration on node', target_node_id)
        begin_responses_succeeded = 0
        node.request(uavcan.protocol.enumeration.Begin.Request(), target_node_id, begin_response_checker)
        while begin_responses_succeeded < 1:
            node.spin(0.1)

        print('Setting config param %r to %r...' % (received_indication.message.parameter_name.decode(), next_index))
        configuration_finished = False

        def param_set_response(event):
            if not event:
                raise Exception('Request timed out')

            assert event.response.name == received_indication.message.parameter_name
            assert event.response.value.integer_value == next_index
            print(dronecan.to_yaml(event))
            node.request(uavcan.protocol.param.ExecuteOpcode.Request(
                             opcode=uavcan.protocol.param.ExecuteOpcode.Request().OPCODE_SAVE),
                         target_node_id,
                         param_opcode_response)

        def param_opcode_response(event):
            nonlocal configuration_finished
            if not event:
                raise Exception('Request timed out')

            print(dronecan.to_yaml(event))
            if not event.response.ok:
                raise Exception('Param opcode execution rejected\n' + dronecan.to_yaml(event))
            else:
                configuration_finished = True

        node.request(uavcan.protocol.param.GetSet.Request(value=uavcan.protocol.param.Value(integer_value=next_index),
                                                         name=received_indication.message.parameter_name),
                     target_node_id,
                     param_set_response)

        while not configuration_finished:
            node.spin(0.1)

        print('Node', target_node_id, 'assigned ESC index', next_index)

        next_index += 1
        enumerated_nodes.append(target_node_id)
        print('Enumerated so far:', enumerated_nodes)

    return enumerated_nodes
# Initializing a DroneCAN node instance.
node = dronecan.make_node(args.port, node_id=args.node_id, bitrate=args.bitrate)
# Initializing a node monitor
node_monitor = dronecan.app.node_monitor.NodeMonitor(node)
if args.dna_server:
    # optionally start a DNA server
    dynamic_node_id_allocator = dronecan.app.dynamic_node_id.CentralizedServer(node, node_monitor)
# Waiting for at least one other node to appear online
while len(node_monitor.get_all_node_id()) <= 1:
    print('Waiting for other nodes to become online...')
    node.spin(timeout=1)
print("There are %u nodes online" % len(node_monitor.get_all_node_id()))
print('Detecting ESC nodes...')
esc_nodes = detect_esc_nodes()
print('ESC nodes:', esc_nodes)
enumerated_esc = enumerate_all_esc(esc_nodes)
print('All ESC enumerated successfully; index order is as follows:', enumerated_esc)
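The script repeats the same "spin until a condition holds" loop several times, and only the indication loop checks `overall_deadline`. One possible way to factor that pattern out is sketched below. `spin_until` and `FakeNode` are hypothetical names, not part of the library; the helper relies only on the node having a `spin(timeout)` method, as used in the script.

```python
import time


def spin_until(node, predicate, deadline, step=0.1):
    """Spin `node` until predicate() is true; raise TimeoutError once the deadline has passed."""
    while not predicate():
        if time.monotonic() > deadline:
            raise TimeoutError('Condition not met before the deadline')
        node.spin(step)


class FakeNode:
    """Hypothetical stand-in for a DroneCAN node; it only counts spin() calls."""

    def __init__(self):
        self.spins = 0

    def spin(self, timeout):
        self.spins += 1


fake = FakeNode()
spin_until(fake, lambda: fake.spins >= 3, deadline=time.monotonic() + 5)
print('spun', fake.spins, 'times')  # spun 3 times

try:
    # A deadline in the past makes the helper give up before spinning again.
    spin_until(fake, lambda: False, deadline=time.monotonic() - 1)
except TimeoutError as ex:
    print('timed out:', ex)
```

With a real node, a call such as `spin_until(node, lambda: configuration_finished, overall_deadline)` could stand in for one of the bare `while` loops, under the same assumptions.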
Running the code
Connect the ESCs to the CAN bus (use more than one ESC, otherwise the auto-enumeration procedure becomes rather pointless), start the script, and follow its instructions.
For each ESC you will see output similar to this:
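To see how the script's command line options are interpreted, the parser can be exercised on its own. The sketch below rebuilds the same parser as in the script and feeds it a made-up argument list; the device path `/dev/ttyACM0` and the bit rate are hypothetical values chosen for illustration.

```python
from argparse import ArgumentParser

# Same options as in the script; the argument list below is a made-up example.
parser = ArgumentParser(description='ESC enumeration example')
parser.add_argument("--bitrate", default=1000000, type=int, help="CAN bit rate")
parser.add_argument("--node-id", default=100, type=int, help="CAN node ID")
parser.add_argument("--dna-server", action='store_true', default=False, help="run DNA server")
parser.add_argument("port", default=None, type=str, help="serial port")

example_args = parser.parse_args(['--bitrate', '500000', '--dna-server', '/dev/ttyACM0'])
print(example_args.port, example_args.bitrate, example_args.node_id, example_args.dna_server)
# /dev/ttyACM0 500000 100 True
```

The positional `port` argument is required; `--node-id` falls back to 100 when it is not given.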
=== PROVIDE ENUMERATION FEEDBACK ON ESC INDEX 0 NOW ===
=== e.g. turn the motor, press the button, etc, depending on your equipment ===
### Message from 124 to All ts_mono=94836.894482 ts_real=1470647890.850445
value:
  empty:
    {}
parameter_name: 'esc_index' # [101, 115, 99, 95, 105, 110, 100, 101, 120]
Indication received from node 124
Stopping enumeration on node 124
Setting config param 'esc_index' to 0...
### Response from 124 to 10 ts_mono=94837.077415 ts_real=1470647891.033378
value:
  integer_value: 0
default_value:
  integer_value: 0
max_value:
  integer_value: 15
min_value:
  integer_value: 0
name: 'esc_index' # [101, 115, 99, 95, 105, 110, 100, 101, 120]
### Response from 124 to 10 ts_mono=94837.122424 ts_real=1470647891.078387
argument: 0
ok: true
Node 124 assigned ESC index 0
Enumerated so far: [124]
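The bracketed list printed next to `'esc_index'` in the output above is the same parameter name as raw byte values, which is why the script calls `.decode()` before printing it. A minimal check, assuming the name is plain ASCII:

```python
# Byte values copied from the sample output above.
raw = [101, 115, 99, 95, 105, 110, 100, 101, 120]

name = bytes(raw).decode()
print(name)                        # esc_index
print(list(name.encode()) == raw)  # True
```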
A relevant video
The following video demonstrates how automated enumeration works at the user level. Although it was not recorded with the script shown in this example, the core principles are the same.