py_trees源码分析(四)
@ 晚风 · Thursday, Apr 18, 2024 · 4 分钟阅读 · 更新于 4月 18, 2024

py_trees源码分析之装饰节点

版本信息

Python: 3.9

py_trees: 2.2.3

本文的代码与示例绝大部分来自github,只分析了部分重要的代码,包含基础行为模块、通用行为模块、通用模块、组合节点模块、装饰节点模块、黑板模块,且较简单的模块用表格汇总。

装饰节点(decorators)

装饰节点是一个特殊的行为节点,它只有一个子节点,可以修改其子节点的行为

基类

装饰基类

参数

  • name: 装饰节点名称
  • child: 子节点

方法

方法名 参数 作用 返回
tick 修改子节点的状态 行为树
stop 新状态 停止方法,将行为树置为新状态并终止行为树(如果是RUNNING置为INVALID)
tip 提取树的最后一个运行节点 行为节点

装饰节点(继承于装饰基类)

重复节点类

作用

重复执行子节点

如果子节点返回成功,则继续执行,直到返回成功的次数达到num_success次(注意这里如果num_success为-1,则无限次执行,直到返回失败或被外部终止),则返回成功;

如果返回失败,则装饰器立即返回失败,不再执行。

参数

  • name:节点名称
  • child: 子节点
  • num_success: 执行次数

核心方法解析

  1. 如果子节点状态FAILURE则返回FAILURE
  2. 如果子节点状态SUCCESS则成功次数+1
    1. 如果成功次数等于num_success则返回SUCCESS
    2. 否则返回RUNNING
  3. 如果子节点状态RUNNING则返回RUNNING
if self.decorated.status == common.Status.FAILURE:
    self.feedback_message = f"failed, aborting [status: {self.success} success from {self.num_success}]"
    return common.Status.FAILURE
elif self.decorated.status == common.Status.SUCCESS:
    self.success += 1
    self.feedback_message = (
        f"success [status: {self.success} success from {self.num_success}]"
    )
    if self.success == self.num_success:
        return common.Status.SUCCESS
    else:
        return common.Status.RUNNING
else:  # RUNNING
    self.feedback_message = (
        f"running [status: {self.success} success from {self.num_success}]"
    )
    return common.Status.RUNNING

示例

import py_trees, time

success = py_trees.behaviours.Success(name="Success")

repeat = py_trees.decorators.Repeat("repeat_2", child=success, num_success=2)

for i in range(2):
    try:
        print("\n--------- Tick {0} ---------\n".format(i))
        repeat.tick_once()
        print("\n")
        print("{}".format(py_trees.display.unicode_tree(repeat, show_status=True)))
        time.sleep(1.0)
    except KeyboardInterrupt:
        break
print("\n")

# 可以看出在尝试2次均返回成功后才返回成功
>>
--------- Tick 0 ---------
-^- repeat_2 [*] -- success [status: 1 success from 2]
    --> Success [] -- success

--------- Tick 1 ---------
-^- repeat_2 [] -- success [status: 2 success from 2]
    --> Success [] -- success

>>

重试节点类

作用

如果子节点失败,则返回运行中继续执行,直到返回失败的次数达到num_failures次,返回失败

如果成功或者运行中,则直接返回对应状态

参数

  • name:节点名称
  • child: 子节点
  • num_failures: 失败重试次数

核心方法解析

  1. 如果子节点状态FAILURE
    1. 失败次数+1,然后判断是否小于num_failures
      1. 是,则返回RUNNING
      2. 否,则返回FAILURE
  2. 如果子节点状态SUCCESS则返回SUCCESS
  3. 如果子节点状态RUNNING则返回RUNNING
if self.decorated.status == common.Status.FAILURE:
    self.failures += 1
    if self.failures < self.num_failures:
        self.feedback_message = f"attempt failed [status: {self.failures} failure from {self.num_failures}]"
        return common.Status.RUNNING
    else:
        self.feedback_message = f"final failure [status: {self.failures} failure from {self.num_failures}]"
        return common.Status.FAILURE
elif self.decorated.status == common.Status.RUNNING:
    self.feedback_message = (
        f"running [status: {self.failures} failure from {self.num_failures}]"
    )
    return common.Status.RUNNING
else:  # SUCCESS
    self.feedback_message = (
        f"succeeded [status: {self.failures} failure from {self.num_failures}]"
    )
    return common.Status.SUCCESS

示例

import py_trees, time

failure = py_trees.behaviours.Failure(name="Failure")
retry = py_trees.decorators.Retry("retry_2", child=failure, num_failures=2)

for i in range(2):
    try:
        print("\n--------- Tick {0} ---------\n".format(i))
        retry.tick_once()
        print("\n")
        print("{}".format(py_trees.display.unicode_tree(retry, show_status=True)))
        time.sleep(1.0)
    except KeyboardInterrupt:
        break

# 可以看出在尝试2次均返回失败后才返回失败
>>
--------- Tick 0 ---------
-^- retry_2 [*] -- attempt failed [status: 1 failure from 2]
    --> Failure [] -- failure

--------- Tick 1 ---------
-^- retry_2 [] -- final failure [status: 2 failure from 2]
    --> Failure [] -- failure
>>

超时节点类

作用

给其子节点或子树添加超时限制

参数

  • name: 节点名称
  • child: 子节点
  • duration: 超时时间,默认5s

核心方法解析

  1. 获取当前时间
  2. 如果当前子节点仍在运行状态&&当前时间>结束时间,直接重置状态INVALID并返回FAILURE
current_time = time.monotonic()
if (
    self.decorated.status == common.Status.RUNNING
    and current_time > self.finish_time
):
    self.feedback_message = "timed out"
    self.logger.debug(
        "{}.update() {}".format(self.__class__.__name__, self.feedback_message)
    )
    # invalidate the decorated (i.e. cancel it), could also put this logic in a terminate() method
    self.decorated.stop(common.Status.INVALID)
    return common.Status.FAILURE
if self.decorated.status == common.Status.RUNNING:
    self.feedback_message = "time still ticking ... [remaining: {}s]".format(
        self.finish_time - current_time
    )
else:
    self.feedback_message = "child finished before timeout triggered"
return self.decorated.status

示例

import py_trees, time

# 1.创建一个2s后才返回成功的节点
class ReturnAfterTwoSec(py_trees.behaviour.Behaviour):
    def __init__(self, name="ReturnAfterTwoSec"):
        super(ReturnAfterTwoSec, self).__init__(name)
        self.start_time = time.time()
    def update(self):
        if time.time() - self.start_time > 2:
            return py_trees.common.Status.SUCCESS
        else:
            return py_trees.common.Status.RUNNING
   
# 2.创建一个1s后超时的装饰节点
timeout = py_trees.decorators.Timeout("timeout_1", child=ReturnAfterTwoSec(), duration=1)

for _ in range(3):
    timeout.tick_once()
    print("\n")
    print("{}".format(py_trees.display.unicode_tree(timeout, show_status=True)))
    time.sleep(0.5)

# 在1s之后子节点仍在运行状态,则返回失败。
>>
-^- timeout_1 [*] -- time still ticking ... [remaining: 0.999991708s]
    --> ReturnAfterTwoSec [*]



-^- timeout_1 [*] -- time still ticking ... [remaining: 0.49481966600000005s]
    --> ReturnAfterTwoSec [*]



-^- timeout_1 [] -- timed out
    --> ReturnAfterTwoSec [-]
>>

计数节点类

作用

对所有状态进行计数并打印

参数

  • name: 节点名称
  • child: 子节点

核心方法解析

  1. 在update方法中对RUNNING状态进行计数
  2. 在terminate方法中对其他状态进行计数
def update(self) -> common.Status:
        """
        Increment the counter.

        Returns:
            the behaviour's new status :class:`~py_trees.common.Status`
        """
        self.total_tick_count += 1
        if self.decorated.status == common.Status.RUNNING:
            self.running_count += 1
        return self.decorated.status

    def terminate(self, new_status: common.Status) -> None:
        """Increment the completion / interruption counters."""
        if new_status == common.Status.INVALID:
            self.interrupt_count += 1
        elif new_status == common.Status.SUCCESS:
            self.success_count += 1
        elif new_status == common.Status.FAILURE:
            self.failure_count += 1
       

一次执行节点类

作用

在给定的策略下激活后,后续tick直接返回子节点返回的最终状态,而不再执行子节点。这样确保子节点只被执行一次(这里一次并非是一次tick,而是达到目标策略的完整周期)

参数

  • name: 节点名称
  • child: 子节点
  • policy: 策略<ON_SUCCESSFUL_COMPLETION: 子节点返回成功激活><ON_COMPLETION: 子节点返回成功/失败激活>

核心方法解析

  1. update方法用于判断是否满足策略,如果满足则返回最终状态
  2. tick方法用于执行装饰器,如果子节点已经满足策略,则直接
def update(self) -> common.Status:
    if self.final_status:
        return self.final_status
    return self.decorated.status

def tick(self) -> typing.Iterator[behaviour.Behaviour]:
    if self.final_status:
        for node in behaviour.Behaviour.tick(self):
            yield node
    else:
        for node in Decorator.tick(self):
            yield node

def terminate(self, new_status: common.Status) -> None:
    if not self.final_status and new_status in self.policy.value:
        self.feedback_message = "oneshot completed"
        self.final_status = new_status
    else:
        self.logger.debug(
            "{}.terminate({})".format(self.__class__.__name__, new_status)
        )

示例

import time
import py_trees

if __name__ == '__main__':
    sequence1 = py_trees.composites.Sequence("Sequence1", False)
    sequence2 = py_trees.composites.Sequence("Sequence2", False)
    
    # 设置行为队列节点,依次返回RUNNING、FAILURE、SUCCESS
    ffs1, ffs2 = (py_trees.behaviours.StatusQueue(
        name=f"FFS{i+1}",
        queue=[
            py_trees.common.Status.RUNNING,
            py_trees.common.Status.FAILURE,
        ],
        eventually=py_trees.common.Status.SUCCESS,
    ) for i in range(2))

    sequence1.add_children([ffs1])
    sequence2.add_children([ffs2])

    # 第一棵树设置策略为ON_SUCCESSFUL_COMPLETION
    root1 = py_trees.decorators.OneShot(
        name="OneShot_S",
        child=sequence1,
        policy=py_trees.common.OneShotPolicy.ON_SUCCESSFUL_COMPLETION)

    # 第一棵树设置策略为ON_COMPLETION
    root2 = py_trees.decorators.OneShot(
        name="OneShot_C",
        child=sequence2,
        policy=py_trees.common.OneShotPolicy.ON_COMPLETION)

    for i in range(3):
        root1.tick_once()
        root2.tick_once()
        print(f"-------root1.final_status: {root1.final_status}, root2.final_status: {root2.final_status}-------")
        print(py_trees.display.unicode_tree(root=root1, show_status=True))
        print(py_trees.display.unicode_tree(root=root2, show_status=True))
        print('-'*30)
        time.sleep(1.0)
>>  
-------root1.final_status: None, root2.final_status: None-------
-^- OneShot_S [*]
    [-] Sequence1 [*]
        --> FFS1 [*]

-^- OneShot_C [*]
    [-] Sequence2 [*]
        --> FFS2 [*]

# 可以看到在第二次tick,root2(ON_COMPLETION)策略已经满足要求,因为子节点返回FAILURE
-------root1.final_status: None, root2.final_status: Status.FAILURE-------
-^- OneShot_S []
    [-] Sequence1 []
        --> FFS1 []

-^- OneShot_C [] -- oneshot completed
    [-] Sequence2 []
        --> FFS2 []
# 可以看到在第三次tick,root1(ON_SUCCESSFUL_COMPLETION)策略已经满足要求,因为子节点返回SUCCESS
-------root1.final_status: Status.SUCCESS, root2.final_status: Status.FAILURE-------
-^- OneShot_S [] -- oneshot completed
    [-] Sequence1 []
        --> FFS1 []

-^- OneShot_C [] -- oneshot completed
    [-] Sequence2 []
        --> FFS2 []
>>

条件节点类

作用

检查被装饰的子节点的状态是否变成期望状态,满足返回成功,否则返回运行中。⚠️注意:此节点不会返回失败

参数

  • name: 节点名称
  • child: 子节点
  • status: 期望状态

核心方法解析

if self.decorated.status == self.succeed_status:
    return common.Status.SUCCESS
return common.Status.RUNNING

子节点状态展示节点类

直接返回节点状态

其他涉及状态变更类汇总

参数 作用 返回
反转节点类 name/child 反转节点返回状态,仅针对成功/失败状态反转
运行转成功类 从运行状态转化为成功状态 节点类型
运行转失败类 从运行状态转化为失败状态 节点类型
失败转成功类 从失败状态转化为成功状态 节点类型
成功转失败类 从成功状态转化为失败状态 节点类型
成功转运行类 从成功状态转化为运行状态 节点类型

关于我

❤️

姓名: lwz


性别: 男


年龄: 29


星座: 摩羯座


职业: python工程师


爱好: 秋、ps5、运动


主要的技术栈是:

  • python
  • 自动驾驶仿真验证

学习网站: leetcode


公司: 国科础石


– 2025 年 2 月 25 日更新

我的一些开源项目

等等?项目呢?不会没有吧??

其他

如果你喜欢我的开源项目或者它们可以给你带来帮助,可以赏一杯咖啡 ☕ 给我。~

It is better to attach some information or leave a message so that I can record the donation 📝, thank you very much 🙏.

社交链接