快速生成网络mp4视频缩略图技术


背景

由于网络原因,在下载视频之前我们往往会希望能够先生成一些视频的缩略图,大致浏览视频内容,再确定是否应花时间下载。如何能够快速得到视频多个帧的缩略图的同时尽量少的下载视频的内容,是一个值得研究的问题。

思路

众所周知,不考虑音频、字幕的话,视频是由多个图像帧拼接而成的,因此我们的目标也就是尽量只下载视频中我们想下载的帧图片,而忽略其他的信息,那么就需要获得对应帧在文件中所在的位置、大小、以及编码格式,为此,首先需要了解视频容器的格式,由于日常生活中h264编码的mp4格式用得比较多,所以这里只分析采用h264编码的mp4格式。

mp4格式的主要组织格式是box,box之间可以相互嵌套,每个box的前8个字节包括box的名字和这个box大小。在我看来,这些box主要分为三类,第一类存储的是整个视频的元信息,像视频的作者、分辨率、帧率等信息,这些信息基本对解码没有影响,所以可以跳过,第二类存储的是编码信息,包括视频的关键帧、帧的大小、视频分块的位置和包含的帧数等信息,这个是我们最关心的信息,需要完整地下载下来。第三类存储的是实际数据,这些我们不用完整下载,可以通过前面的编码信息计算我们所需要的帧在数据box中的位置,然后只下载帧对应的数据就行了,这个操作也是比较简单的。因此,重中之重就是如何分析视频的编码信息。

存储视频编码信息的box叫stbl,其下包括以下几个box:

这样一来,找到一个关键帧的过程大概可以分为以下几步:

  1. 通过stsc box,不断累加每个分块的帧数,得到所需关键帧在哪个分块中
  2. 通过stsc box和stsz box,得到所需关键帧在对应分块中的偏移量,需要将分块中排在所需关键帧前面的帧的大小累加起来
  3. 通过stco box得到分块的偏移量,加上刚才计算得到的所需关键帧的块中偏移量,就是所需关键帧在文件中的偏移量,通过stsz box获得所需关键帧的大小,就能得到完整的所需关键帧数据了。

接下来需要考虑的就是得到的关键帧数据,怎么转换为图片。mp4里的视频画面数据用的编码格式是h264,由于h264解码算法非常复杂,因此我使用ffmpeg程序来进行这一操作。但是,直接从mp4里提取出来的关键帧数据并不能被ffmpeg的h264解码器所识别。这是因为h264的格式分为两类,一类叫Annex-B,称为h264裸流,可以直接被各类解码器、播放器处理和播放;一类叫AVCC,通常使用h264编码的mp4、avi等视频容器中的画面数据用的就是这个格式。也就是说,提取出来的关键帧数据是AVCC格式,需要将其转换为Annex-B格式,才能被ffmpeg转码成png之类的格式。

那么这两个格式有什么不同吗?h264格式的视频流都是由一个个NAL包组成的,每个NAL包可能是帧数据,也可能是其他一些数据,在Annex-B中,其他数据的NAL包头部为00 00 00 01四个字节,帧数据的NAL包头部为由00 00 00 01分隔的PPS和SPS字节串,这两种字节串里存储了帧的分辨率等信息,使解码器能够正确解析h264流。而AVCC的NAL包的前四个字节一律为该NAL包的大小。那么AVCC格式的h264流在mp4容器里是怎么被正确解码的呢?原来它的SPS和PPS字节串存在前面所述的stsd box中,所以将AVCC转换成Annex-B的方法就是将其每个NAL包的前四个字节进行替换,如果该包存储其他数据,就将前四个字节改为00 00 00 01,否则改成由00 00 00 01分隔的PPS和SPS字节串。通常存储帧数据的NAL包第5个字节为0x65,可以由此判断。

于是整个程序的流程就很清晰了,每次下载当前box的名字和大小(前8个字节),如果这个box是编码相关box的父box就进入,否则就跳过,当到达对应的box时则下载需要的信息。接着在关键帧列表中尽量均匀地选取关键帧,然后找到这些关键帧对应的数据位置把它们下载下来,将数据由AVCC转换成Annex-B,然后运行ffmpeg将数据帧转换成图片格式。下载文件片段可以使用HTTP请求里的Range头,为了加快速度,一些能够并行的操作可以用多线程来处理。

代码

import sys
import struct
import requests
import subprocess
from multiprocessing.dummy import Pool

img_num = 240 # 缩略图数量
url = 'https://...' # 视频地址
path = 'D:\\tmp\\' # 下载路径
now = 0
small = set(['moov', 'trak', 'mdia', 'minf', 'stbl']) # 需要“进入”的box名
sample2chunk = [] # stsc box中每一段的起始chunk和帧数
chunk_offset = [] # 分块在文件中的地址
sample_size = [] # 帧大小
key_sample = [] # 选取的关键帧
frames = [] # 选取的关键帧在文件中的偏移和大小
cnt = 5 # 待下载的box的数目
sps = b''
pps = b''
requestnum = 0

def getrange(start, length):
    global requestnum
    requestnum += 1
    while True:
        r = os.system(f'curl -H "Range: bytes={start}-{start + length - 1}" {url} --output {path}temp{start}.bin -m {60 + length // 4000} -f') # 这里运行curl来下载,指定了下载重试时间
        if r == 0: # 如果下载成功(curl返回值为0)
            break
    f = open(f'{path}temp{start}.bin', 'rb')
    s = f.read()
    f.close()
    os.system(f'del {path}temp{start}.bin')
    return s

while cnt > 0:
    s = getrange(now, 8)
    name = s[4:].decode() # box名
    length, = struct.unpack('>l', s[:4]) # box大小
    print(name, length)
    if name in small:
        now += 8 # “进入”该box
    elif name == 'stsd': # 获取sps和pps
        s = getrange(now + 7 * 16 + 4, length - 7 * 16 - 4)
        spsl, = struct.unpack('>H', s[0:2])
        sps = s[2:2 + spsl]
        ppsl, = struct.unpack('>H', s[2 + spsl + 1:spsl + 5])
        pps = s[spsl + 5:spsl + 5 + ppsl]
        now += length
        cnt -= 1
    elif name == 'stsc':
        s = getrange(now, length)
        count, = struct.unpack('>l', s[12:16])
        for i in range(count):
            fc, = struct.unpack('>l', s[16 + i * 12:20 + i * 12])
            spc, = struct.unpack('>l', s[20 + i * 12:24 + i * 12])
            sample2chunk.append((fc, spc))
        now += length
        cnt -= 1
    elif name == 'stco':
        fraglength = length // 12
        frag = [(now + i * fraglength, fraglength) for i in range(11)] # 由于这个box比较大,所以对这个box进行分段然后用多线程来处理
        frag.append((now + 11 * fraglength, length - 11 * fraglength)) # 最后一段
        pool = Pool(12)
        r = pool.map(lambda x: getrange(*x), frag)
        pool.close()
        pool.join()
        s = b''.join(r)
        count, = struct.unpack('>l', s[12:16])
        for i in range(count):
            chunk_offset.append(struct.unpack('>l', s[16 + i * 4:20 + i * 4])[0])
        now += length
        cnt -= 1
    elif name == 'stsz':
        fraglength = length // 12
        frag = [(now + i * fraglength, fraglength) for i in range(11)] # 同上,分段多线程
        frag.append((now + 11 * fraglength, length - 11 * fraglength))
        pool = Pool(12)
        r = pool.map(lambda x: getrange(*x), frag)
        pool.close()
        pool.join()
        s = b''.join(r)
        count, = struct.unpack('>l', s[16:20])
        for i in range(count):
            sample_size.append(struct.unpack('>l', s[20 + i * 4:24 + i * 4])[0])
        now += length
        cnt -= 1
    elif name == 'stss':
        s = getrange(now, length)
        count, = struct.unpack('>l', s[12:16])
        gap = count // img_num # 选取关键帧的间隔大小
        num = count % img_num # 为了尽可能均匀选取,前num个关键帧间隔为gap,后面的关键帧间隔为gap-1
        gap += 1
        for i in range(1, img_num + 1):
            if i <= num:
                key_sample.append(struct.unpack('>l', s[12 + i * gap * 4:16 + i * gap * 4])[0])
            else:
                t = (i - num) * (gap - 1)
                key_sample.append(struct.unpack('>l', s[12 + (num * gap + t) * 4:16 + (num * gap + t) * 4])[0])
        now += length
        cnt -= 1
    else: # 跳过该box
        now += length
sample2chunk.append((len(chunk_offset) + 1, 0)) # 添加一个边界
for i in key_sample:
    sample = 0
    first_chunk = (0, 0)
    for j in range(len(sample2chunk) - 1): # 获得帧对应的是第几段
        if sample + (sample2chunk[j + 1][0] - sample2chunk[j][0]) * sample2chunk[j][1] >= i:
            first_chunk = sample2chunk[j]
            break
        else:
            sample += (sample2chunk[j + 1][0] - sample2chunk[j][0]) * sample2chunk[j][1]
    true_chunk = (i - sample - 1) // first_chunk[1] + first_chunk[0] # 获得帧对应的分块编号
    sample += (i - sample - 1) // first_chunk[1] * first_chunk[1] # 该分块的起始帧编号
    offset = 0
    for j in sample_size[sample:i - 1]: # 累加帧在分块中的偏移
        offset += j
    frames.append((chunk_offset[true_chunk - 1] + offset, sample_size[i - 1]))

def process_frame(frame):
    i, j = frame
    avcc = getrange(*j)
    anexb = b''
    now = 0
    sp = b'\x00\x00\x00\x01'
    while now < j[1]:
        length, = struct.unpack('>l', avcc[now:now + 4])
        if avcc[now + 4] != 0x65: # 其他包
            anexb += sp + avcc[now + 4:now + 4 + length]
            now += 4 + length
        else: # 帧数据包
            anexb += sp + sps + sp + pps + sp + avcc[now + 4:now + 4 + length]
            now += 4 + length
    f = open(f'{path}img{i:03d}.bin', 'wb')
    f.write(anexb)
    f.close()
    os.system(f'ffmpeg -i {path}img{i:03d}.bin {path}img{i:03d}.jpg') # 转码
    os.system(f'del {path}img{i:03d}.bin')

pool = Pool(12)
pool.map(process_frame, list(enumerate(frames))) # 获取帧数据和转换过程用多线程并行
pool.close()
pool.join()

后记

注意该程序默认视频图像流在音频流前面,因为图片流和音频流的父box名都是trak,所以如果音频流在前面可能程序会运行出错。

本篇文章没有对mp4的详细格式进行完整介绍,如果想了解可以参考网上的其他博客,最好的方法是下载一个mp4格式检查器(如mp4 Inspector),它可以自动显示mp4里的box结构,并以十六进制显示box里内容,这样看的更清楚明白。