py_trees源码分析(三)
@ 晚风 · Thursday, Apr 18, 2024 · 5 分钟阅读 · 更新于 4月 18, 2024

py_trees源码分析之组合节点

版本信息

Python: 3.9

py_trees: 2.2.3

本文的代码与示例绝大部分来自github,只分析了部分重要的代码,包含基础行为模块、通用行为模块、通用模块、组合节点模块、装饰节点模块、黑板模块,且较简单的模块用表格汇总。

组合节点(composites)

通常管理子行为,并在方式上应用一些逻辑,它们执行并返回结果,但通常自己不做任何事情。

执行在非组合行为中需要执行的检查或操作。

基类

组合基类 Composite

参数

  • name: 组合节点名
  • children: 需要添加到该组合的子节点

方法

方法名 参数 作用 返回
tick 纯虚函数,字类须重写
update 暂未使用,字类重写 状态
stop 新状态 公共停止方法,将行为树置为新状态并终止行为树(如果新状态为INVALID则将所有子树都置为INVALID)
tip 提取树的最后一个运行节点 行为节点
add_child 子节点 增加一个子节点 节点ID
add_children 子节点列表 增加多个子节点 本身
remove_child 子节点 删除一个子节点(如果运行中,则调用stop置为INVALID) 索引
remove_all_children 删除所有子节点(如果有节点运行中,则调用stop置为INVALID)
replace_child 替代节点
新节点
删除一个旧节点remove_child,获取索引,然后插入一个新的节点insert_child
remove_child_by_id 节点id 根据id查到对应节点,然后调用remove_child方法
prepend_child 子节点 在头部插入一个子节点 节点ID
insert_child 节点
索引
在指定索引处插入一个子节点 节点ID

组合节点(继承组合基类)

选择节点类

作用

选择器依次执行它的每个子行为,直到其中一个运行/成功则返回对应状态,或者遍历完子节点后返回失败。如果先执行的节点返回运行中,则即使后序节点有可能会成功,也仍会返回运行中状态。

参数

  • name: 选择节点名称
  • memory: 是否记录上次执行时的状态,如果为True,则下次tick会继续执行上次执行为RUNNING状态的节点,而不会重新执行所有子节点
  • Children: 子节点列表

核心方法解析

  1. 前置判断和准备工作

    1. 如果当前状态为非RUNING状态则先进行初始化
    2. 如果当前选择节点下没有子节点,则以失败状态停止并返回
    3. 如果memory为True,则直接找到记忆的子节点索引,注意⚠️:这里有做了一个重置当前子节点前的所有子节点状态工作,它的意义在于假设当前子节点失败了,那么选择器可能会回退,重新尝试执行前面子节点,如果不做重置工作,那么节点会返回之前的结果。
  2. 核心工作:

    1. 创建previous变量,保存上次返回的节点(RUNNING/SUCCESS)
    2. 遍历children并获取tick结果
    3. node is child为了获取当前运行节点
    4. 如果节点状态为RUNNING/SUCCESS则准备返回
      1. 首先更新self.current_childself.status作为当前选择器正运行的节点和当前选择器的状态
      2. 判断previous变量是否为空或者等于当前节点,此判断的意义是上次返回的节点不是当前节点,比如上次选择器有两个节点A和B,上次tick中A返回RUNNING,则previous记录为A,当前tick中A返回FAILURE,B返回RUNNING,则仅此判断。继续往下看
      3. 如果当前节点和上一轮返回的节点不一致,则需要进行一些处理,分析37-42行。首先将passed置为False,遍历所有子节点,如果pased为True,则将剩下节点状态重置INVALID。注意这里仅在当前节点=返回节点时更新passed状态为True,后续将一直为True,证明获取的就是当前节点后序的所有节点。那么为什么要把后序节点重置呢?假设一共三轮tick,第一轮A子节点返回FAILURE,B节点返回RUNNING,则选择器会把B节点作为返回节点。第二轮A子节点返回RUNNING,如果此时不重置,选择器把A节点作为返回节点,B状态仍是RUNNING,在第三轮A节点返回FAILURE,则会直接把B节点的RUNNING状态返回,而不会重新运行B。
    5. 所有子节点运行完也没有RUNNING/SUCCESS状态,则返回FAILURE,当前节点为最后一个节点
def tick(self) -> typing.Iterator[behaviour.Behaviour]:
      # initialise
      if self.status != common.Status.RUNNING:
          self.current_child = self.children[0] if self.children else None
          self.initialise()

      # nothing to do
      if not self.children:
          self.current_child = None
          self.stop(common.Status.FAILURE)
          yield self
          return

      # starting point
      if self.memory:
          assert self.current_child is not None  # should never be true, help mypy out
          index = self.children.index(self.current_child)
          for child in itertools.islice(self.children, None, index):
              child.stop(common.Status.INVALID)
      else:
          index = 0

      # actual work
      previous = self.current_child
      for child in itertools.islice(self.children, index, None):
          for node in child.tick():
              yield node
              if node is child:
                  if (
                      node.status == common.Status.RUNNING
                      or node.status == common.Status.SUCCESS
                  ):
                      self.current_child = child
                      self.status = node.status
                      if previous is None or previous != self.current_child:
                          # we interrupted, invalidate everything at a lower priority
                          passed = False
                          for child in self.children:
                              if passed:
                                  if child.status != common.Status.INVALID:
                                      child.stop(common.Status.INVALID)
                              passed = True if child == self.current_child else passed
                      yield self
                      return
      # all children failed, set failure ourselves and current child to the last bugger who failed us
      self.status = common.Status.FAILURE
      try:
          self.current_child = self.children[-1]
      except IndexError:
          self.current_child = None
      yield self

示例

# 1.创建一个选择节点
root = py_trees.composites.Selector(name="Selector", memory=False)

# 2.创建一个行为节点(队列)
ffs = py_trees.behaviours.StatusQueue(
          name="FFS",
          queue=[
              py_trees.common.Status.FAILURE,
              py_trees.common.Status.FAILURE,
              py_trees.common.Status.SUCCESS,
          ],
          eventually=py_trees.common.Status.SUCCESS,
      )

# 3.创建一个一直返回RUNING的行为节点
always_running = py_trees.behaviours.Running(name="Running")

# 4.将这两个行为子节点加入到选择节点中
root.add_children([ffs, always_running])

root.tick()
# 第一次tick 返回结果  FFS -> FAILURE or Running -> RUNNING = Selector -> RUNNING
root.tick()
# 第二次tick 返回结果  FFS -> FAILURE or Running -> RUNNING = Selector -> RUNNING
root.tick()
# 第三次tick 返回结果  FFS -> SUCCESS or Running -> RUNNING = Selector -> SUCCESS
# 此时Selector将Running stop为无效状态,这是由于 选择器在tick时如果某个子节点已经返回成功,则将其他子节点置为INVALID, child.stop(common.Status.INVALID) 

如果想要详细了解选择节点的运行原理,可以尝试使用不同的状态节点的返回顺序组合来测试。

顺序节点类

作用

顺序节点依次执行它的每个子行为,如果全部节点成功则返回SUCCESS,或者遍历完子节点后返回RUNNING。⚠️注意:如果某个节点返回RUNNING,那么顺序节点将直接返回RUNNING,不再遍历后续节点。

参数

  • name: 顺序节点名称
  • memory: 是否记录上次执行时的状态,如果为True,则下次tick会继续执行上次执行为RUNNING状态的节点,而不会重新执行所有子节点
  • Children: 子节点列表

核心方法解析

  1. 前置判断
    1. 如果当前为非RUNNING状态,则进行所有节点重置、初始化、更新当前节点为第一个节点
    2. 如果有记忆状态,则更新index
    3. 如果没有记忆状态,则更新当前节点为第一个节点
    4. 如果没有子节点,直接返回SUCCESS
  2. 核心工作
    1. 遍历子节点进行tick工作
    2. 如果当前节点状态为非SUCCESS状态
      1. 更新顺序节点状态为此节点状态
      2. 如果没有记忆状态,则判断后序节点状态,非INVALID则直接重置后序节点
    3. 更新index
    4. 所有节点都为SUCCESS则返回SUCCESS
def tick(self) -> typing.Iterator[behaviour.Behaviour]:
    """
    Tick over the children.

    Yields:
        :class:`~py_trees.behaviour.Behaviour`: a reference to itself or one of its children
    """
    self.logger.debug("%s.tick()" % self.__class__.__name__)

    # initialise
    index = 0
    if self.status != common.Status.RUNNING:
        self.current_child = self.children[0] if self.children else None
        for child in self.children:
            if child.status != common.Status.INVALID:
                child.stop(common.Status.INVALID)
        self.initialise()  # user specific initialisation
    elif self.memory and common.Status.RUNNING:
        assert self.current_child is not None  # should never be true, help mypy out
        index = self.children.index(self.current_child)
    elif not self.memory and common.Status.RUNNING:
        self.current_child = self.children[0] if self.children else None
    else:
        # previous conditional checks should cover all variations
        raise RuntimeError("Sequence reached an unknown / invalid state")

    # nothing to do
    if not self.children:
        self.current_child = None
        self.stop(common.Status.SUCCESS)
        yield self
        return

    # actual work
    for child in itertools.islice(self.children, index, None):
        for node in child.tick():
            yield node
            if node is child and node.status != common.Status.SUCCESS:
                self.status = node.status
                if not self.memory:
                    # invalidate the remainder of the sequence
                    # i.e. kill dangling runners
                    for child in itertools.islice(self.children, index + 1, None):
                        if child.status != common.Status.INVALID:
                            child.stop(common.Status.INVALID)
                yield self
                return
        try:
            # advance if there is 'next' sibling
            self.current_child = self.children[index + 1]
            index += 1
        except IndexError:
            pass

    self.stop(common.Status.SUCCESS)
    yield self

示例

# 1.创建一个顺序节点
root = py_trees.composites.Sequence(name="Sequence", memory=True)

# 2.创建一个行为节点(队列)
for action in ["Action 1", "Action 2", "Action 3"]:
    rssss = py_trees.behaviours.StatusQueue(
        name=action,
        queue=[
            py_trees.common.Status.RUNNING,
            py_trees.common.Status.SUCCESS,
        ],
        eventually=py_trees.common.Status.SUCCESS,
    )
    root.add_child(rssss)
return root

并行节点类

作用

依次执行所有子节点,获取到所有子节点状态后,根据设置的并行策略再确定最终返回的结果。注意虽然该节点名为并行节点,但是由于行为树是在单线程上运行,并不会在运行中通过开启其他线程去实现节点并行执行,所以他与选择节点的区别是,无论子节点返回什么,都会保证全部运行完,然后再根据策略返回。

参数

  • name: 并行节点名称
  • policy: 并行策略对象,包含两个属性:策略<全部成功/一个成功/部分成功>;目标节点列表(仅当策略为部分节点成功使用,必须包含于Children)
  • Children: 子节点列表

核心方法解析

  1. 此处就不展示validate_policy_configuration方法,此方法主要是为了检查policy为部分成功时的节点情况,主要检查2个方面。1/目标节点不能为空列表 2/目标节点必须存在于Children
  2. 如果当前并行节点为非RUNNING状态,则进行所有节点重置、初始化、更新当前节点为None
  3. 如果没有子节点,直接返回SUCCESS
  4. 遍历子节点进行tick工作
  5. 获取所有失败节点并返回FAILURE
  6. 如果没有失败节点则判断是否满足策略<全部成功/一个成功/部分成功>,满足则返回SUCCESS,并将当前执行节点置为满足条件的最后一个节点。
  7. 如果并行节点非RUNNING,则stop
self.validate_policy_configuration()
# reset
if self.status != common.Status.RUNNING:
    for child in self.children:
        # reset the children, this ensures old SUCCESS/FAILURE status flags
        # don't break the synchronisation logic below
        if child.status != common.Status.INVALID:
            child.stop(common.Status.INVALID)
    self.current_child = None
    # subclass (user) handling
    self.initialise()

# nothing to do
if not self.children:
    self.current_child = None
    self.stop(common.Status.SUCCESS)
    yield self
    return

# process them all first
for child in self.children:
    if self.policy.synchronise and child.status == common.Status.SUCCESS:
        continue
    for node in child.tick():
        yield node

# determine new status
new_status = common.Status.RUNNING
self.current_child = self.children[-1]
try:
    failed_child = next(
        child
        for child in self.children
        if child.status == common.Status.FAILURE
    )
    self.current_child = failed_child
    new_status = common.Status.FAILURE
except StopIteration:
    if type(self.policy) is common.ParallelPolicy.SuccessOnAll:
        if all([c.status == common.Status.SUCCESS for c in self.children]):
            new_status = common.Status.SUCCESS
            self.current_child = self.children[-1]
    elif type(self.policy) is common.ParallelPolicy.SuccessOnOne:
        successful = [
            child
            for child in self.children
            if child.status == common.Status.SUCCESS
        ]
        if successful:
            new_status = common.Status.SUCCESS
            self.current_child = successful[-1]
    elif type(self.policy) is common.ParallelPolicy.SuccessOnSelected:
        if all(
            [c.status == common.Status.SUCCESS for c in self.policy.children]
        ):
            new_status = common.Status.SUCCESS
            self.current_child = self.policy.children[-1]
    else:
        raise RuntimeError(
            "this parallel has been configured with an unrecognised policy [{}]".format(
                type(self.policy)
            )
        )
# this parallel may have children that are still running
# so if the parallel itself has reached a final status, then
# these running children need to be terminated so they don't dangle
if new_status != common.Status.RUNNING:
    self.stop(new_status)
self.status = new_status
yield self

示例

当策略为全部成功或一个成功比较简单,我们来举一个部分节点成功的例子

# 1.首先创建一个并行节点对象,策略为SuccessOnSelected,由于参数chilren是必填参数,我们先传入一个空列表
root = py_trees.composites.Parallel(
    name="Parallel", policy=py_trees.common.ParallelPolicy.SuccessOnSelected([])
)

# 2.创建两个行为队列节点,其中一个在返回一次RUNNING后返回SUCCESS,另外一个在返回两次RUNNING后返回SUCCESS
success_after_one = py_trees.behaviours.StatusQueue(
    name="SuccessAfterOne",
    queue=[py_trees.common.Status.RUNNING],
    eventually=py_trees.common.Status.SUCCESS,
)
success_after_two = py_trees.behaviours.StatusQueue(
    name="SuccessAfterTwo",
    queue=[py_trees.common.Status.RUNNING, py_trees.common.Status.RUNNING],
    eventually=py_trees.common.Status.SUCCESS,
)

# 3.加入并行节点
root.add_child(success_after_one)
root.add_child(success_after_two)

# 4.设定目标节点为两次后成功节点
root.policy.children = [success_after_two]

for _ in range(3):
		root.tick_once()
  

# 可以看出仅当SuccessAfterTwo返回成功后,并行节点才更新状态为SUCCESS
>>
--------- Tick 1 ---------

[DEBUG] Parallel             : Parallel.tick()
[DEBUG] Parallel             : Parallel.tick(): re-initialising
[DEBUG] SuccessAfterOne      : StatusQueue.tick()
[DEBUG] SuccessAfterOne      : StatusQueue.update()
[DEBUG] SuccessAfterTwo      : StatusQueue.tick()
[DEBUG] SuccessAfterTwo      : StatusQueue.update()


/_/ Parallel [*]
    --> SuccessAfterOne [*]
    --> SuccessAfterTwo [*]


--------- Tick 2 ---------

[DEBUG] Parallel             : Parallel.tick()
[DEBUG] SuccessAfterOne      : StatusQueue.tick()
[DEBUG] SuccessAfterOne      : StatusQueue.update()
[DEBUG] SuccessAfterOne      : StatusQueue.stop(Status.RUNNING->Status.SUCCESS)
[DEBUG] SuccessAfterOne      : StatusQueue.terminate(Status.RUNNING->Status.SUCCESS)
[DEBUG] SuccessAfterTwo      : StatusQueue.tick()
[DEBUG] SuccessAfterTwo      : StatusQueue.update()


/_/ Parallel [*]
    --> SuccessAfterOne []
    --> SuccessAfterTwo [*]


--------- Tick 3 ---------

[DEBUG] Parallel             : Parallel.tick()
[DEBUG] SuccessAfterTwo      : StatusQueue.tick()
[DEBUG] SuccessAfterTwo      : StatusQueue.update()
[DEBUG] SuccessAfterTwo      : StatusQueue.stop(Status.RUNNING->Status.SUCCESS)
[DEBUG] SuccessAfterTwo      : StatusQueue.terminate(Status.RUNNING->Status.SUCCESS)
[DEBUG] Parallel             : Parallel.stop()[Status.RUNNING->Status.SUCCESS]


/_/ Parallel []
    --> SuccessAfterOne []
    --> SuccessAfterTwo []
>>

关于我

❤️

姓名: lwz


性别: 男


年龄: 29


星座: 摩羯座


职业: python工程师


爱好: 秋、ps5、运动


主要的技术栈是:

  • python
  • 自动驾驶仿真验证

学习网站: leetcode


公司: 国科础石


– 2025 年 2 月 25 日更新

我的一些开源项目

等等?项目呢?不会没有吧??

其他

如果你喜欢我的开源项目或者它们可以给你带来帮助,可以赏一杯咖啡 ☕ 给我。~

It is better to attach some information or leave a message so that I can record the donation 📝, thank you very much 🙏.

社交链接