华为Atlas Python多进程应用开发

一边摔倒一边填坑

华为Atlas Python多进程应用开发
封面图片 Tomas Sobek

华为的Atlas设备搭载了昇腾310芯片,这是华为海思设计的一款SOC。这块芯片不止包含了用于加速神经网络运算的NPU,还有用于多媒体解码的加速硬件、以及用于任务协调调度的CPU(华为叫做AICPU)。这块芯片可以将CPU从运算繁重的任务中解放出来,但是同时这给应用开发也带来了一些挑战。你可以从上一篇文章华为Atlas设备图像裁切中体会到涉及昇腾芯片应用开发的不同之处。本篇文章则描述了使用Python语言在Atlas设备上实现多进程应用开发技术坑以及解决方式。

‌             ‌

常规多进程视频处理

假设你要开发一款视频分析应用,需要同时接入两路视频,且两路画面都需要人脸检测功能。为了充分利用设备资源,视频解码要在昇腾芯片上实现。一种可能的应用设计如下:

两个进程实现

该设计包含了两个进程,进程1负责接入视频与图像解码;进程2负责人脸检测。两个进程通过队列在内存中交换数据。在不使用昇腾硬件的环境下,你可以使用这段代码实现:

"""Template code for video frame processing.""" 
import multiprocessing as mp 
import cv2 

BUFFER_SIZE = 12 
RUNNING = mp.Value("i", 1) 


def get_frames(srcs, q: mp.Queue): 
    """子进程对象。用于视频流解码,并将解码后的图像填充到队列中。""" 
    caps = [cv2.VideoCapture(src) for src in srcs] 
    print("Opening streams...") 
    while RUNNING.value == 1: 
    	for i, cap in enumerate(caps): 
        	_, frame = cap.read() 
            q.put((frame, i)) 
            print("Streaming stopped.") 


def main(): 
    # 设定视频源 
    video_srcs = ['/home/robin/Videos/pose.mp4', '/home/robin/Videos/mask.mp4', ] 
    
    # 构建用于交换数据的队列 
    frame_q = mp.Queue(maxsize=BUFFER_SIZE) 
    
    # 构建进程并启动它 
    p = mp.Process(target=get_frames, args=(video_srcs, frame_q,)) 
    p.start() 
    
    # 预览图像。 
    while RUNNING.value == 1: 
        frame, cap_id = frame_q.get() 
        frame = cv2.resize(frame, (0,0), fx=0.5, fy=0.5) 
        cv2.imshow('{}'.format(cap_id), frame) 
        if cv2.waitKey(1) == 27: 
            RUNNING.value = 0 
            
    print("Closing queue...") 
    frame_q.close() 
    
    print("Joining process...") 
    p.terminate() 
    
    print("All done.") 

if __name__ == "__main__": 
    main()
两个进程实现视频解码与处理

这段代码在运行时会在两个独立的预览窗口显示解码后的图像。

两路视频预览画面

一切看上去都很和谐。但是,当你引入昇腾芯片后,事情发生了变化。

硬件加速的视频解码

视频解码是典型的CPU繁重型任务。昇腾310芯片中有专门的硬件模块可以实现H.264/265硬件加速解码。我们可以借助该硬件模块来减轻CPU负担,加快应用运行速度。

在上一篇关于图像裁切的文章中,你已经通过昇腾应用开发的Python SDK pyACL实现了图像的区域裁切。视频解码的开发过程与其非常类似,充斥着大量有着“奇怪”名字的API。在这里我们将引入一款由华为FAE工程师进一步封装后的Python SDK:AscendFly。它将常用功能封装为模块,然后对外提供类似OpenCV风格的接口,可以有效提升开发效率。

ascend-fae/ascendfly
ascend目标检测分类框架
AscendFly官方代码

AscendFly提供了视频功能抽象模块 ascend.VideoCapture 来实现视频文件的读取与解码。它的使用方法与OpenCV的 VideoCapture 非常相似。

# 导入ascend模块 
import ascend 

# 构建昇腾硬件抽象 
context = ascend.Context({0}).context_dict[0] 

# 构建视频处理对象 
cap = ascend.VideoCapture(context, video_file, channel_id) 

# 读取视频帧 
frame, frame_id = cap.read()
ascend视频解码

相比pyACL提供的API,经过AscendFly封装后的模块显然更加友好。同理,我们可以将之前已经写好的代码稍作改造,让它实现硬件加速的视频解码。

"""Template code for video frame processing.""" 
import multiprocessing as mp 
import ascend 
import cv2 

DEVICE_ID = 0 
BUFFER_SIZE = 12 
RUNNING = mp.Value("i", 1)

def get_frames(srcs, q: mp.Queue): 
    """子进程对象。用于视频流解码,并将解码后的图像填充到队列中。""" 
    ctx = ascend.Context({DEVICE_ID}).context_dict[DEVICE_ID] 
    caps = [ascend.VideoCapture(ctx, v, i+1) for i, v in enumerate(srcs)] 
    print("Opening streams...") 
    
    while RUNNING.value == 1: 
        for i, cap in enumerate(caps): 
            if cap.is_open(): 
                yuv, _ = cap.read(False) 
                    if yuv is not None: 
                        bgr = cv2.cvtColor(yuv.to_np, cv2.COLOR_YUV2BGR_NV12) 
                        q.put((bgr, i)) 
    print("Streaming stopped.") 
    
def main(): 
    # 设定视频源 
    video_srcs = ['/home/robin/Videos/pose.mp4', '/home/robin/Videos/mask.mp4', ] 
    
    # 构建用于交换数据的队列 
    frame_q = mp.Queue(maxsize=BUFFER_SIZE) 
    
    # 构建进程并启动它 
    p = mp.Process(target=get_frames, args=(video_srcs, frame_q,))   
    p.start() 
    
    # 预览图像。 
    while RUNNING.value == 1: 
        frame, cap_id = frame_q.get() 
        frame = cv2.resize(frame, (0,0), fx=0.5, fy=0.5) 
        cv2.imshow('{}'.format(cap_id), frame) 
        
        if cv2.waitKey(1) == 27: 
            RUNNING.value = 0 
            
    print("Closing queue...") 
    frame_q.close() 
    
    print("Joining process...") 
    p.terminate() 
    
    print("All done.") 

if __name__ == "__main__": 
    main()

与之前的代码相比,这段代码的主要改动发生在 get_frames 函数中:使用 ascend.VideoCapture 替换了OpenCV的 VideoCapture 。同时为了让硬件解码正常工作,在子进程的创建函数中初始化了昇腾硬件资源抽象context。另外需要注意的是昇腾硬件解码后的图像格式为YUV420SP。第25行代码将其转换为OpenCV BGR格式。

至此,你已经成功在一个子进程中启用了昇腾芯片的硬件解码加速功能。接下来,你需要在另一个进程中实现人脸检测功能。

NPU加速的人脸检测

与视频解码不同,人脸检测需要调用神经网络模型加速硬件NPU。你也许已经猜到了,这个调用过程同样离不开昇腾芯片的硬件抽象。一个典型的调用方式大概是这样:

# 导入ascendfly 
import ascend 

# 构建硬件抽象 
context = ascend.Context({0}).context_dict[0] 

# 初始化模型 
model = ascend.AscendModel(context, 'detector.om')
离线模型的构建方式

虽然你已经在 get_frames 函数中构建了一个context,但是由于人脸检测模型与视频解码不在同一个进程下,如何实现context的共用?

一种方法是将 get_frames 函数中的context构建删掉,在主进程中构建context,然后将其作为参数传入子进程。修改内容大致如下:

# 用于子进程创建的函数 
def get_frames(context, src, q: mp.Queue): 
    cap = ascend.VideoCapture(context, src) 
    
# 在main函数中 
def main():
    context = ascend.Context({0}).context_dict[0] 
    p = mp.Process(target=get_frames, args=(context, video_src, frame_q,))
将context作为参数传递给子进程

但是,这种传递方式是无效的。这里的昇腾硬件抽象context是与进程绑定在一起的。它无法作为一个参数传递给另一个进程去使用。那么分别在两个进程中创建各自的context呢?

# 用于子进程创建的函数 
def get_frames(src, q: mp.Queue): 
    context = ascend.Context({0}).context_dict[0] 
    cap = ascend.VideoCapture(context, src) 
    
# 在main函数中 
def main():
    context = ascend.Context({0}).context_dict[0] 
    p = mp.Process(target=get_frames, args=(context, video_src, frame_q,))
在两个进程中分别构建自己的context

这个思路是正确的。但是,像上方这样的实现,会导致进程挂起,陷入永久的等待之中。而要解决这个问题,需要先从Python下的进程创建说起。

Spawn与Fork

进程是计算机中一系列运算操作的集合,这个小集合共享当前进程所拥有的运算资源。从现有进程派生出的新进程叫作子进程。在Linux下,Python支持三种进程派生方式,其中较为常用的有两个:Spawn与Fork。

Spawn的字面意思是鱼、蛙产卵。根据Python官方文档,采用这种方式创建的子进程,只继承了运行所需的部分必要资源,就像是上级进程“生”出来的一样,而非上级进程的克隆。像是文件操作符等资源不会被继承。

图像来源 Rachel Hisko 

Fork的字面意思是叉子。以这种方式创建的子进程从上级进程中继承了全部资源,就像是一把叉子的末端分叉出的多个分叉。在分叉的起始点它们是一模一样的。

子进程也是进程,这意味着它也会有属于自己的运算资源。但是由于派生方式的不同,导致它们在派生之后拥有的运算资源存在差异。这时候回想一下,前文中在使用昇腾芯片的硬件加速模块时,必须要为其创建的硬件抽象context也是一种资源。读到这里你也许已经猜到了,进程挂起多半是与硬件抽象context的继承方式有关。

的确如此。根据华为工程师的说明,昇腾硬件资源的跨进程继承当前只支持Spawn方式。而根据Python官方文档,在Linux下,进程的默认派生方式为Fork。所以,前述代码中未设定子进程派生方式的时候,Python以Fork的形式创建了子进程,子进程中继承的升腾运算资源无法正常使用,这是导致进程挂起的原因。而其解决方式也很自然:

  1. 以Spawn的形式派生子进程。
  2. 在子进程中初始化新的昇腾运算抽象资源。

经测试,使用这种方式可以实现Python下的多进程昇腾加速运算。

总结

使用Python开发华为Atlas多进程应用时,需要注意昇腾开发SDK当前仅支持以Spawn的方式派生新进程。在Linux下进程的默认派生方式为Fork,这可能会导致进程挂起失去响应。显式的指定Spawn派生方式,并在子进程中初始化昇腾硬件抽象即可解决这个问题。

参考资料

  1. 进程Fork的历史:https://www.youtube.com/watch?v=4crIuSj-irQ
  2. Python多进程文档:https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods

专属内容

一段Atlas多进程调用代码,实现了硬件解码与离线模型加载。