py_trees源码分析之组合节点
版本信息
Python: 3.9
py_trees: 2.2.3
本文的代码与示例绝大部分来自github,只分析了部分重要的代码,包含基础行为模块、通用行为模块、通用模块、组合节点模块、装饰节点模块、黑板模块,且较简单的模块用表格汇总。
组合节点(composites)
通常管理子行为,并在方式上应用一些逻辑,它们执行并返回结果,但通常自己不做任何事情。
执行在非组合行为中需要执行的检查或操作。
基类
组合基类 Composite
参数
- name: 组合节点名
- children: 需要添加到该组合的子节点
方法
方法名 | 参数 | 作用 | 返回 |
---|---|---|---|
tick | 纯虚函数,字类须重写 | ||
update | 暂未使用,字类重写 | 状态 | |
stop | 新状态 | 公共停止方法,将行为树置为新状态并终止行为树(如果新状态为INVALID则将所有子树都置为INVALID) | |
tip | 提取树的最后一个运行节点 | 行为节点 | |
add_child | 子节点 | 增加一个子节点 | 节点ID |
add_children | 子节点列表 | 增加多个子节点 | 本身 |
remove_child | 子节点 | 删除一个子节点(如果运行中,则调用stop置为INVALID) | 索引 |
remove_all_children | 删除所有子节点(如果有节点运行中,则调用stop置为INVALID) | ||
replace_child | 替代节点 新节点 |
删除一个旧节点remove_child,获取索引,然后插入一个新的节点insert_child | |
remove_child_by_id | 节点id | 根据id查到对应节点,然后调用remove_child方法 | |
prepend_child | 子节点 | 在头部插入一个子节点 | 节点ID |
insert_child | 节点 索引 |
在指定索引处插入一个子节点 | 节点ID |
组合节点(继承组合基类)
选择节点类
作用
选择器依次执行它的每个子行为,直到其中一个运行/成功则返回对应状态,或者遍历完子节点后返回失败。如果先执行的节点返回运行中,则即使后序节点有可能会成功,也仍会返回运行中状态。
参数
- name: 选择节点名称
- memory: 是否记录上次执行时的状态,如果为True,则下次tick会继续执行上次执行为RUNNING状态的节点,而不会重新执行所有子节点
- Children: 子节点列表
核心方法解析
-
前置判断和准备工作
- 如果当前状态为非RUNING状态则先进行初始化
- 如果当前选择节点下没有子节点,则以失败状态停止并返回
- 如果memory为True,则直接找到记忆的子节点索引,注意⚠️:这里有做了一个重置当前子节点前的所有子节点状态工作,它的意义在于假设当前子节点失败了,那么选择器可能会回退,重新尝试执行前面子节点,如果不做重置工作,那么节点会返回之前的结果。
-
核心工作:
- 创建
previous
变量,保存上次返回的节点(RUNNING/SUCCESS) - 遍历children并获取tick结果
node is child
为了获取当前运行节点- 如果节点状态为RUNNING/SUCCESS则准备返回
- 首先更新
self.current_child
和self.status
作为当前选择器正运行的节点和当前选择器的状态 - 判断
previous
变量是否为空或者等于当前节点,此判断的意义是上次返回的节点不是当前节点,比如上次选择器有两个节点A和B,上次tick中A返回RUNNING,则previous记录为A,当前tick中A返回FAILURE,B返回RUNNING,则仅此判断。继续往下看 - 如果当前节点和上一轮返回的节点不一致,则需要进行一些处理,分析37-42行。首先将
passed
置为False,遍历所有子节点,如果pased
为True,则将剩下节点状态重置INVALID。注意这里仅在当前节点=返回节点时更新passed
状态为True,后续将一直为True,证明获取的就是当前节点后序的所有节点。那么为什么要把后序节点重置呢?假设一共三轮tick,第一轮A子节点返回FAILURE,B节点返回RUNNING,则选择器会把B节点作为返回节点。第二轮A子节点返回RUNNING,如果此时不重置,选择器把A节点作为返回节点,B状态仍是RUNNING,在第三轮A节点返回FAILURE,则会直接把B节点的RUNNING状态返回,而不会重新运行B。
- 首先更新
- 所有子节点运行完也没有RUNNING/SUCCESS状态,则返回FAILURE,当前节点为最后一个节点
- 创建
def tick(self) -> typing.Iterator[behaviour.Behaviour]:
# initialise
if self.status != common.Status.RUNNING:
self.current_child = self.children[0] if self.children else None
self.initialise()
# nothing to do
if not self.children:
self.current_child = None
self.stop(common.Status.FAILURE)
yield self
return
# starting point
if self.memory:
assert self.current_child is not None # should never be true, help mypy out
index = self.children.index(self.current_child)
for child in itertools.islice(self.children, None, index):
child.stop(common.Status.INVALID)
else:
index = 0
# actual work
previous = self.current_child
for child in itertools.islice(self.children, index, None):
for node in child.tick():
yield node
if node is child:
if (
node.status == common.Status.RUNNING
or node.status == common.Status.SUCCESS
):
self.current_child = child
self.status = node.status
if previous is None or previous != self.current_child:
# we interrupted, invalidate everything at a lower priority
passed = False
for child in self.children:
if passed:
if child.status != common.Status.INVALID:
child.stop(common.Status.INVALID)
passed = True if child == self.current_child else passed
yield self
return
# all children failed, set failure ourselves and current child to the last bugger who failed us
self.status = common.Status.FAILURE
try:
self.current_child = self.children[-1]
except IndexError:
self.current_child = None
yield self
示例
# 1.创建一个选择节点
root = py_trees.composites.Selector(name="Selector", memory=False)
# 2.创建一个行为节点(队列)
ffs = py_trees.behaviours.StatusQueue(
name="FFS",
queue=[
py_trees.common.Status.FAILURE,
py_trees.common.Status.FAILURE,
py_trees.common.Status.SUCCESS,
],
eventually=py_trees.common.Status.SUCCESS,
)
# 3.创建一个一直返回RUNING的行为节点
always_running = py_trees.behaviours.Running(name="Running")
# 4.将这两个行为子节点加入到选择节点中
root.add_children([ffs, always_running])
root.tick()
# 第一次tick 返回结果 FFS -> FAILURE or Running -> RUNNING = Selector -> RUNNING
root.tick()
# 第二次tick 返回结果 FFS -> FAILURE or Running -> RUNNING = Selector -> RUNNING
root.tick()
# 第三次tick 返回结果 FFS -> SUCCESS or Running -> RUNNING = Selector -> SUCCESS
# 此时Selector将Running stop为无效状态,这是由于 选择器在tick时如果某个子节点已经返回成功,则将其他子节点置为INVALID, child.stop(common.Status.INVALID)
如果想要详细了解选择节点的运行原理,可以尝试使用不同的状态节点的返回顺序组合来测试。
顺序节点类
作用
顺序节点依次执行它的每个子行为,如果全部节点成功则返回SUCCESS,或者遍历完子节点后返回RUNNING。⚠️注意:如果某个节点返回RUNNING,那么顺序节点将直接返回RUNNING,不再遍历后续节点。
参数
- name: 顺序节点名称
- memory: 是否记录上次执行时的状态,如果为True,则下次tick会继续执行上次执行为RUNNING状态的节点,而不会重新执行所有子节点
- Children: 子节点列表
核心方法解析
- 前置判断
- 如果当前为非RUNNING状态,则进行所有节点重置、初始化、更新当前节点为第一个节点
- 如果有记忆状态,则更新index
- 如果没有记忆状态,则更新当前节点为第一个节点
- 如果没有子节点,直接返回SUCCESS
- 核心工作
- 遍历子节点进行tick工作
- 如果当前节点状态为非SUCCESS状态
- 更新顺序节点状态为此节点状态
- 如果没有记忆状态,则判断后序节点状态,非INVALID则直接重置后序节点
- 更新index
- 所有节点都为SUCCESS则返回SUCCESS
def tick(self) -> typing.Iterator[behaviour.Behaviour]:
"""
Tick over the children.
Yields:
:class:`~py_trees.behaviour.Behaviour`: a reference to itself or one of its children
"""
self.logger.debug("%s.tick()" % self.__class__.__name__)
# initialise
index = 0
if self.status != common.Status.RUNNING:
self.current_child = self.children[0] if self.children else None
for child in self.children:
if child.status != common.Status.INVALID:
child.stop(common.Status.INVALID)
self.initialise() # user specific initialisation
elif self.memory and common.Status.RUNNING:
assert self.current_child is not None # should never be true, help mypy out
index = self.children.index(self.current_child)
elif not self.memory and common.Status.RUNNING:
self.current_child = self.children[0] if self.children else None
else:
# previous conditional checks should cover all variations
raise RuntimeError("Sequence reached an unknown / invalid state")
# nothing to do
if not self.children:
self.current_child = None
self.stop(common.Status.SUCCESS)
yield self
return
# actual work
for child in itertools.islice(self.children, index, None):
for node in child.tick():
yield node
if node is child and node.status != common.Status.SUCCESS:
self.status = node.status
if not self.memory:
# invalidate the remainder of the sequence
# i.e. kill dangling runners
for child in itertools.islice(self.children, index + 1, None):
if child.status != common.Status.INVALID:
child.stop(common.Status.INVALID)
yield self
return
try:
# advance if there is 'next' sibling
self.current_child = self.children[index + 1]
index += 1
except IndexError:
pass
self.stop(common.Status.SUCCESS)
yield self
示例
# 1.创建一个顺序节点
root = py_trees.composites.Sequence(name="Sequence", memory=True)
# 2.创建一个行为节点(队列)
for action in ["Action 1", "Action 2", "Action 3"]:
rssss = py_trees.behaviours.StatusQueue(
name=action,
queue=[
py_trees.common.Status.RUNNING,
py_trees.common.Status.SUCCESS,
],
eventually=py_trees.common.Status.SUCCESS,
)
root.add_child(rssss)
return root
并行节点类
作用
依次执行所有子节点,获取到所有子节点状态后,根据设置的并行策略再确定最终返回的结果。注意虽然该节点名为并行节点
,但是由于行为树是在单线程上运行,并不会在运行中通过开启其他线程去实现节点并行执行,所以他与选择节点的区别是,无论子节点返回什么,都会保证全部运行完,然后再根据策略返回。
参数
- name: 并行节点名称
- policy: 并行策略对象,包含两个属性:策略<全部成功/一个成功/部分成功>;目标节点列表(仅当策略为部分节点成功使用,必须包含于Children)
- Children: 子节点列表
核心方法解析
- 此处就不展示
validate_policy_configuration
方法,此方法主要是为了检查policy
为部分成功时的节点情况,主要检查2个方面。1/目标节点不能为空列表 2/目标节点必须存在于Children
- 如果当前并行节点为非RUNNING状态,则进行所有节点重置、初始化、更新当前节点为None
- 如果没有子节点,直接返回SUCCESS
- 遍历子节点进行tick工作
- 获取所有失败节点并返回FAILURE
- 如果没有失败节点则判断是否满足策略<全部成功/一个成功/部分成功>,满足则返回SUCCESS,并将当前执行节点置为满足条件的最后一个节点。
- 如果并行节点非RUNNING,则stop
self.validate_policy_configuration()
# reset
if self.status != common.Status.RUNNING:
for child in self.children:
# reset the children, this ensures old SUCCESS/FAILURE status flags
# don't break the synchronisation logic below
if child.status != common.Status.INVALID:
child.stop(common.Status.INVALID)
self.current_child = None
# subclass (user) handling
self.initialise()
# nothing to do
if not self.children:
self.current_child = None
self.stop(common.Status.SUCCESS)
yield self
return
# process them all first
for child in self.children:
if self.policy.synchronise and child.status == common.Status.SUCCESS:
continue
for node in child.tick():
yield node
# determine new status
new_status = common.Status.RUNNING
self.current_child = self.children[-1]
try:
failed_child = next(
child
for child in self.children
if child.status == common.Status.FAILURE
)
self.current_child = failed_child
new_status = common.Status.FAILURE
except StopIteration:
if type(self.policy) is common.ParallelPolicy.SuccessOnAll:
if all([c.status == common.Status.SUCCESS for c in self.children]):
new_status = common.Status.SUCCESS
self.current_child = self.children[-1]
elif type(self.policy) is common.ParallelPolicy.SuccessOnOne:
successful = [
child
for child in self.children
if child.status == common.Status.SUCCESS
]
if successful:
new_status = common.Status.SUCCESS
self.current_child = successful[-1]
elif type(self.policy) is common.ParallelPolicy.SuccessOnSelected:
if all(
[c.status == common.Status.SUCCESS for c in self.policy.children]
):
new_status = common.Status.SUCCESS
self.current_child = self.policy.children[-1]
else:
raise RuntimeError(
"this parallel has been configured with an unrecognised policy [{}]".format(
type(self.policy)
)
)
# this parallel may have children that are still running
# so if the parallel itself has reached a final status, then
# these running children need to be terminated so they don't dangle
if new_status != common.Status.RUNNING:
self.stop(new_status)
self.status = new_status
yield self
示例
当策略为全部成功或一个成功比较简单,我们来举一个部分节点成功的例子
# 1.首先创建一个并行节点对象,策略为SuccessOnSelected,由于参数chilren是必填参数,我们先传入一个空列表
root = py_trees.composites.Parallel(
name="Parallel", policy=py_trees.common.ParallelPolicy.SuccessOnSelected([])
)
# 2.创建两个行为队列节点,其中一个在返回一次RUNNING后返回SUCCESS,另外一个在返回两次RUNNING后返回SUCCESS
success_after_one = py_trees.behaviours.StatusQueue(
name="SuccessAfterOne",
queue=[py_trees.common.Status.RUNNING],
eventually=py_trees.common.Status.SUCCESS,
)
success_after_two = py_trees.behaviours.StatusQueue(
name="SuccessAfterTwo",
queue=[py_trees.common.Status.RUNNING, py_trees.common.Status.RUNNING],
eventually=py_trees.common.Status.SUCCESS,
)
# 3.加入并行节点
root.add_child(success_after_one)
root.add_child(success_after_two)
# 4.设定目标节点为两次后成功节点
root.policy.children = [success_after_two]
for _ in range(3):
root.tick_once()
# 可以看出仅当SuccessAfterTwo返回成功后,并行节点才更新状态为SUCCESS
>>
--------- Tick 1 ---------
[DEBUG] Parallel : Parallel.tick()
[DEBUG] Parallel : Parallel.tick(): re-initialising
[DEBUG] SuccessAfterOne : StatusQueue.tick()
[DEBUG] SuccessAfterOne : StatusQueue.update()
[DEBUG] SuccessAfterTwo : StatusQueue.tick()
[DEBUG] SuccessAfterTwo : StatusQueue.update()
/_/ Parallel [*]
--> SuccessAfterOne [*]
--> SuccessAfterTwo [*]
--------- Tick 2 ---------
[DEBUG] Parallel : Parallel.tick()
[DEBUG] SuccessAfterOne : StatusQueue.tick()
[DEBUG] SuccessAfterOne : StatusQueue.update()
[DEBUG] SuccessAfterOne : StatusQueue.stop(Status.RUNNING->Status.SUCCESS)
[DEBUG] SuccessAfterOne : StatusQueue.terminate(Status.RUNNING->Status.SUCCESS)
[DEBUG] SuccessAfterTwo : StatusQueue.tick()
[DEBUG] SuccessAfterTwo : StatusQueue.update()
/_/ Parallel [*]
--> SuccessAfterOne [✓]
--> SuccessAfterTwo [*]
--------- Tick 3 ---------
[DEBUG] Parallel : Parallel.tick()
[DEBUG] SuccessAfterTwo : StatusQueue.tick()
[DEBUG] SuccessAfterTwo : StatusQueue.update()
[DEBUG] SuccessAfterTwo : StatusQueue.stop(Status.RUNNING->Status.SUCCESS)
[DEBUG] SuccessAfterTwo : StatusQueue.terminate(Status.RUNNING->Status.SUCCESS)
[DEBUG] Parallel : Parallel.stop()[Status.RUNNING->Status.SUCCESS]
/_/ Parallel [✓]
--> SuccessAfterOne [✓]
--> SuccessAfterTwo [✓]
>>