py_trees源码分析之装饰节点
版本信息
Python: 3.9
py_trees: 2.2.3
本文的代码与示例绝大部分来自github,只分析了部分重要的代码,包含基础行为模块、通用行为模块、通用模块、组合节点模块、装饰节点模块、黑板模块,且较简单的模块用表格汇总。
装饰节点(decorators)
装饰节点是一个特殊的行为节点,它只有一个子节点,可以修改其子节点的行为
基类
装饰基类
参数
- name: 装饰节点名称
- child: 子节点
方法
方法名 | 参数 | 作用 | 返回 |
---|---|---|---|
tick | 修改子节点的状态 | 行为树 | |
stop | 新状态 | 停止方法,将行为树置为新状态并终止行为树(如果是RUNNING置为INVALID) | |
tip | 提取树的最后一个运行节点 | 行为节点 |
装饰节点(继承于装饰基类)
重复节点类
作用
重复执行子节点
如果子节点返回成功,则继续执行,直到返回成功的次数达到num_success次(注意这里如果num_success为-1,则无限次执行,直到返回失败或被外部终止),则返回成功;
如果返回失败,则装饰器立即返回失败,不再执行。
参数
- name:节点名称
- child: 子节点
- num_success: 执行次数
核心方法解析
- 如果子节点状态FAILURE则返回FAILURE
- 如果子节点状态SUCCESS则成功次数+1
- 如果成功次数等于num_success则返回SUCCESS
- 否则返回RUNNING
- 如果子节点状态RUNNING则返回RUNNING
if self.decorated.status == common.Status.FAILURE:
self.feedback_message = f"failed, aborting [status: {self.success} success from {self.num_success}]"
return common.Status.FAILURE
elif self.decorated.status == common.Status.SUCCESS:
self.success += 1
self.feedback_message = (
f"success [status: {self.success} success from {self.num_success}]"
)
if self.success == self.num_success:
return common.Status.SUCCESS
else:
return common.Status.RUNNING
else: # RUNNING
self.feedback_message = (
f"running [status: {self.success} success from {self.num_success}]"
)
return common.Status.RUNNING
示例
import py_trees, time
success = py_trees.behaviours.Success(name="Success")
repeat = py_trees.decorators.Repeat("repeat_2", child=success, num_success=2)
for i in range(2):
try:
print("\n--------- Tick {0} ---------\n".format(i))
repeat.tick_once()
print("\n")
print("{}".format(py_trees.display.unicode_tree(repeat, show_status=True)))
time.sleep(1.0)
except KeyboardInterrupt:
break
print("\n")
# 可以看出在尝试2次均返回成功后才返回成功
>>
--------- Tick 0 ---------
-^- repeat_2 [*] -- success [status: 1 success from 2]
--> Success [✓] -- success
--------- Tick 1 ---------
-^- repeat_2 [✓] -- success [status: 2 success from 2]
--> Success [✓] -- success
>>
重试节点类
作用
如果子节点失败,则返回运行中继续执行,直到返回失败的次数达到num_failures次,返回失败
如果成功或者运行中,则直接返回对应状态
参数
- name:节点名称
- child: 子节点
- num_failures: 失败重试次数
核心方法解析
- 如果子节点状态FAILURE
- 失败次数+1,然后判断是否小于num_failures
- 是,则返回RUNNING
- 否,则返回FAILURE
- 失败次数+1,然后判断是否小于num_failures
- 如果子节点状态SUCCESS则返回SUCCESS
- 如果子节点状态RUNNING则返回RUNNING
if self.decorated.status == common.Status.FAILURE:
self.failures += 1
if self.failures < self.num_failures:
self.feedback_message = f"attempt failed [status: {self.failures} failure from {self.num_failures}]"
return common.Status.RUNNING
else:
self.feedback_message = f"final failure [status: {self.failures} failure from {self.num_failures}]"
return common.Status.FAILURE
elif self.decorated.status == common.Status.RUNNING:
self.feedback_message = (
f"running [status: {self.failures} failure from {self.num_failures}]"
)
return common.Status.RUNNING
else: # SUCCESS
self.feedback_message = (
f"succeeded [status: {self.failures} failure from {self.num_failures}]"
)
return common.Status.SUCCESS
示例
import py_trees, time
failure = py_trees.behaviours.Failure(name="Failure")
retry = py_trees.decorators.Retry("retry_2", child=failure, num_failures=2)
for i in range(2):
try:
print("\n--------- Tick {0} ---------\n".format(i))
retry.tick_once()
print("\n")
print("{}".format(py_trees.display.unicode_tree(retry, show_status=True)))
time.sleep(1.0)
except KeyboardInterrupt:
break
# 可以看出在尝试2次均返回失败后才返回失败
>>
--------- Tick 0 ---------
-^- retry_2 [*] -- attempt failed [status: 1 failure from 2]
--> Failure [✕] -- failure
--------- Tick 1 ---------
-^- retry_2 [✕] -- final failure [status: 2 failure from 2]
--> Failure [✕] -- failure
>>
超时节点类
作用
给其子节点或子树添加超时限制
参数
- name: 节点名称
- child: 子节点
- duration: 超时时间,默认5s
核心方法解析
- 获取当前时间
- 如果当前子节点仍在运行状态&&当前时间>结束时间,直接重置状态INVALID并返回FAILURE
current_time = time.monotonic()
if (
self.decorated.status == common.Status.RUNNING
and current_time > self.finish_time
):
self.feedback_message = "timed out"
self.logger.debug(
"{}.update() {}".format(self.__class__.__name__, self.feedback_message)
)
# invalidate the decorated (i.e. cancel it), could also put this logic in a terminate() method
self.decorated.stop(common.Status.INVALID)
return common.Status.FAILURE
if self.decorated.status == common.Status.RUNNING:
self.feedback_message = "time still ticking ... [remaining: {}s]".format(
self.finish_time - current_time
)
else:
self.feedback_message = "child finished before timeout triggered"
return self.decorated.status
示例
import py_trees, time
# 1.创建一个2s后才返回成功的节点
class ReturnAfterTwoSec(py_trees.behaviour.Behaviour):
def __init__(self, name="ReturnAfterTwoSec"):
super(ReturnAfterTwoSec, self).__init__(name)
self.start_time = time.time()
def update(self):
if time.time() - self.start_time > 2:
return py_trees.common.Status.SUCCESS
else:
return py_trees.common.Status.RUNNING
# 2.创建一个1s后超时的装饰节点
timeout = py_trees.decorators.Timeout("timeout_1", child=ReturnAfterTwoSec(), duration=1)
for _ in range(3):
timeout.tick_once()
print("\n")
print("{}".format(py_trees.display.unicode_tree(timeout, show_status=True)))
time.sleep(0.5)
# 在1s之后子节点仍在运行状态,则返回失败。
>>
-^- timeout_1 [*] -- time still ticking ... [remaining: 0.999991708s]
--> ReturnAfterTwoSec [*]
-^- timeout_1 [*] -- time still ticking ... [remaining: 0.49481966600000005s]
--> ReturnAfterTwoSec [*]
-^- timeout_1 [✕] -- timed out
--> ReturnAfterTwoSec [-]
>>
计数节点类
作用
对所有状态进行计数并打印
参数
- name: 节点名称
- child: 子节点
核心方法解析
- 在update方法中对RUNNING状态进行计数
- 在terminate方法中对其他状态进行计数
def update(self) -> common.Status:
"""
Increment the counter.
Returns:
the behaviour's new status :class:`~py_trees.common.Status`
"""
self.total_tick_count += 1
if self.decorated.status == common.Status.RUNNING:
self.running_count += 1
return self.decorated.status
def terminate(self, new_status: common.Status) -> None:
"""Increment the completion / interruption counters."""
if new_status == common.Status.INVALID:
self.interrupt_count += 1
elif new_status == common.Status.SUCCESS:
self.success_count += 1
elif new_status == common.Status.FAILURE:
self.failure_count += 1
一次执行节点类
作用
在给定的策略下激活后,后续tick直接返回子节点返回的最终状态,而不再执行子节点。这样确保子节点只被执行一次(这里一次并非是一次tick,而是达到目标策略的完整周期)
参数
- name: 节点名称
- child: 子节点
- policy: 策略<ON_SUCCESSFUL_COMPLETION: 子节点返回成功激活><ON_COMPLETION: 子节点返回成功/失败激活>
核心方法解析
- update方法用于判断是否满足策略,如果满足则返回最终状态
- tick方法用于执行装饰器,如果子节点已经满足策略,则直接
def update(self) -> common.Status:
if self.final_status:
return self.final_status
return self.decorated.status
def tick(self) -> typing.Iterator[behaviour.Behaviour]:
if self.final_status:
for node in behaviour.Behaviour.tick(self):
yield node
else:
for node in Decorator.tick(self):
yield node
def terminate(self, new_status: common.Status) -> None:
if not self.final_status and new_status in self.policy.value:
self.feedback_message = "oneshot completed"
self.final_status = new_status
else:
self.logger.debug(
"{}.terminate({})".format(self.__class__.__name__, new_status)
)
示例
import time
import py_trees
if __name__ == '__main__':
sequence1 = py_trees.composites.Sequence("Sequence1", False)
sequence2 = py_trees.composites.Sequence("Sequence2", False)
# 设置行为队列节点,依次返回RUNNING、FAILURE、SUCCESS
ffs1, ffs2 = (py_trees.behaviours.StatusQueue(
name=f"FFS{i+1}",
queue=[
py_trees.common.Status.RUNNING,
py_trees.common.Status.FAILURE,
],
eventually=py_trees.common.Status.SUCCESS,
) for i in range(2))
sequence1.add_children([ffs1])
sequence2.add_children([ffs2])
# 第一棵树设置策略为ON_SUCCESSFUL_COMPLETION
root1 = py_trees.decorators.OneShot(
name="OneShot_S",
child=sequence1,
policy=py_trees.common.OneShotPolicy.ON_SUCCESSFUL_COMPLETION)
# 第一棵树设置策略为ON_COMPLETION
root2 = py_trees.decorators.OneShot(
name="OneShot_C",
child=sequence2,
policy=py_trees.common.OneShotPolicy.ON_COMPLETION)
for i in range(3):
root1.tick_once()
root2.tick_once()
print(f"-------root1.final_status: {root1.final_status}, root2.final_status: {root2.final_status}-------")
print(py_trees.display.unicode_tree(root=root1, show_status=True))
print(py_trees.display.unicode_tree(root=root2, show_status=True))
print('-'*30)
time.sleep(1.0)
>>
-------root1.final_status: None, root2.final_status: None-------
-^- OneShot_S [*]
[-] Sequence1 [*]
--> FFS1 [*]
-^- OneShot_C [*]
[-] Sequence2 [*]
--> FFS2 [*]
# 可以看到在第二次tick,root2(ON_COMPLETION)策略已经满足要求,因为子节点返回FAILURE
-------root1.final_status: None, root2.final_status: Status.FAILURE-------
-^- OneShot_S [✕]
[-] Sequence1 [✕]
--> FFS1 [✕]
-^- OneShot_C [✕] -- oneshot completed
[-] Sequence2 [✕]
--> FFS2 [✕]
# 可以看到在第三次tick,root1(ON_SUCCESSFUL_COMPLETION)策略已经满足要求,因为子节点返回SUCCESS
-------root1.final_status: Status.SUCCESS, root2.final_status: Status.FAILURE-------
-^- OneShot_S [✓] -- oneshot completed
[-] Sequence1 [✓]
--> FFS1 [✓]
-^- OneShot_C [✕] -- oneshot completed
[-] Sequence2 [✕]
--> FFS2 [✕]
>>
条件节点类
作用
检查被装饰的子节点的状态是否变成期望状态,满足返回成功,否则返回运行中。⚠️注意:此节点不会返回失败
参数
- name: 节点名称
- child: 子节点
- status: 期望状态
核心方法解析
if self.decorated.status == self.succeed_status:
return common.Status.SUCCESS
return common.Status.RUNNING
子节点状态展示节点类
直接返回节点状态
其他涉及状态变更类汇总
类 | 参数 | 作用 | 返回 |
---|---|---|---|
反转节点类 | name/child | 反转节点返回状态,仅针对成功/失败状态反转 | |
运行转成功类 | 从运行状态转化为成功状态 | 节点类型 | |
运行转失败类 | 从运行状态转化为失败状态 | 节点类型 | |
失败转成功类 | 从失败状态转化为成功状态 | 节点类型 | |
成功转失败类 | 从成功状态转化为失败状态 | 节点类型 | |
成功转运行类 | 从成功状态转化为运行状态 | 节点类型 |