PyAV で「Viewing the CaffeNet model’s predictation and its target video at the same time. 」な動画を作る、からの「やや」汎用化

同じ処理を何度も必要としそうな上に、PyAV で「Viewing the CaffeNet model’s predictation and its target video at the same time. 」な動画を作る、の巻にはちょっとした問題もあってさ。

かなり自覚的に雑に書いたもんで、PyAV で「Viewing the CaffeNet model’s predictation and its target video at the same time. 」な動画を作る、の巻は Python 3 で動作しなかったし、「末尾での flush」をしてなかったり、「そもそも超絶スパゲッティ」だったり。

スパゲティ的には大して進化はしてないけれど、PyAV で「Viewing the CaffeNet model’s predictation and its target video at the same time. 」な動画を作る、の巻から「オレ的に肝心なとこ」、つまり「機械学習の練習」部分を取り除いて、代わりにその部分を「指定モジュール」から読み出せるようにした「微汎用化スクリプト」:

pyav_imagefilter.py みたいにエラそーな名前をつけてみた
  1 #! /bin/env python
  2 import os
  3 import sys
  4 import argparse
  5 import signal
  6 import logging
  7 from multiprocessing import Process, Queue
  8 try:
  9     from Queue import Empty
 10 except ImportError:
 11     from queue import Empty
 12 
 13 from PIL import Image, ImageFont, ImageDraw
 14 import av
 15 
 16 def _importmodule(arg):
 17     import importlib
 18     if ("\\" not in arg) and ("/" not in arg):
 19         return importlib.import_module(arg)
 20 
 21     path = arg
 22     filename = os.path.basename(path)
 23     name, ext = os.path.splitext(filename)
 24 
 25     if hasattr(importlib, "util"):
 26         # python 3.x
 27         import importlib.util
 28         spec = importlib.util.spec_from_file_location(name, filename)
 29         module = importlib.util.module_from_spec(spec)
 30         spec.loader.exec_module(module)
 31     else:
 32         # python 2.x
 33         import imp
 34         magic = imp.get_magic()
 35         with open(path, 'r') as file:
 36             if file.read(len(magic)) == magic:
 37                 kind = imp.PY_COMPILED
 38             else:
 39                 kind = imp.PY_SOURCE
 40             file.seek(0)
 41             module = imp.load_module(name, file, path, (ext, 'r', kind))
 42     return module
 43 
 44 
 45 def _textoverlay(img, txts):
 46     # TODO: more flexible: font size, layout of tile
 47     fnt = ImageFont.truetype('couri.ttf', 26)
 48 
 49     tmpdctx = ImageDraw.Draw(img)
 50     x = 30
 51     for txt in txts:
 52         txtsz = tmpdctx.multiline_textsize(txt, fnt)
 53         osd = Image.new("RGB", (txtsz[0] + 10, txtsz[1] + 10), "white")
 54         dctx = ImageDraw.Draw(osd)
 55         dctx.multiline_text((5, 5), txt, font=fnt, fill="black")
 56         del dctx
 57         img.paste(
 58             osd,
 59             box=(x, 30, osd.size[0] + x, osd.size[1] + 30),
 60             mask=Image.new("L", osd.size, 192))
 61         x += txtsz[0] + 50
 62     del tmpdctx
 63 
 64 
 65 def _run(args, q):
 66     def _IntHandler(signum, frame):
 67         q.put("done")
 68 
 69     signal.signal(signal.SIGINT, _IntHandler)
 70 
 71     logging.basicConfig(stream=sys.stderr, level=logging.INFO)
 72 
 73     filter_func = _importmodule(args.image_filter_module).filter_func
 74 
 75     #
 76     icntnr = av.open(args.inputmovie_path)
 77     ocntnr = av.open(args.inputmovie_path + ".out.mp4", "w")
 78 
 79     ivstrm = next(s for s in icntnr.streams if s.type == 'video')
 80     iastrm = next(s for s in icntnr.streams if s.type == 'audio')
 81     ostrms = {
 82         "audio": ocntnr.add_stream(codec_name=iastrm.codec.name, rate=iastrm.rate),
 83         "video": ocntnr.add_stream(codec_name=ivstrm.codec.name, rate=ivstrm.rate),
 84         }
 85     ostrms["video"].width = ivstrm.width
 86     ostrms["video"].height = ivstrm.height
 87 
 88     if args.start_sec:
 89         seek_pts_v = int(args.start_sec / float(ivstrm.time_base) + ivstrm.start_time)
 90         seek_pts_a = int(args.start_sec / float(iastrm.time_base) + iastrm.start_time)
 91         iastrm.seek(seek_pts_a)
 92         ivstrm.seek(seek_pts_v)
 93 
 94     def _closing():
 95         for typ in ostrms.keys():
 96             try:
 97                 # flush the rest in queue
 98                 for p in ostrms[typ].encode():
 99                     ocntnr.mux(p)
100             except av.AVError as e:  # End Of File
101                 pass
102         ocntnr.close()
103 
104     count = 0
105     for packet in icntnr.demux():
106         for ifr in packet.decode():
107             try:
108                 r = q.get(block=False, timeout=1/500.)
109                 if r:
110                     _closing()
111                     return
112             except Empty as e:
113                 pass
114 
115             typ = packet.stream.type
116             ifr.pts = None
117             if typ == 'video':
118                 img = ifr.to_image()
119                 # --------------------------------------------
120                 if count % args.step == 0:
121                     img, txts = filter_func(img)  #
122                     logging.debug(txts)
123                 # --------------------------------------------
124                 _textoverlay(img, txts)
125 
126                 ofr = av.VideoFrame.from_image(img)
127                 for p in ostrms[typ].encode(ofr):
128                     ocntnr.mux(p)
129                 if count % args.step == 0:
130                     logging.info("count={}".format(count))
131                 count += 1
132             else:
133                 for p in ostrms[typ].encode(ifr):
134                     ocntnr.mux(p)
135             if args.count and args.count <= count:
136                 _closing()
137                 return
138                 
139     _closing()
140 
141 
142 if __name__ == '__main__':
143     parser = argparse.ArgumentParser()
144     parser.add_argument("inputmovie_path")
145     parser.add_argument("image_filter_module")
146     parser.add_argument("--count", type=int, default=0)
147     parser.add_argument("--step", type=int, default=1)
148     parser.add_argument("--start_sec", type=int, default=0)
149     args = parser.parse_args()
150 
151     q = Queue()
152     p = Process(target=_run, args=(args, q,))
153     p.start()
154     p.join()

このスクリプトの第二引数 image_filter_module のファイルを例えば「hoge.py」として:

hoge.py
1 def filter_func(img):
2     return img, [repr(img)]

みたいに「filter_func」という名前の「img を受け、結果 img と何らかテキストのリスト」を返すようにしておくと、pyav_imagefilter.py がエラそーに結果 img を使いつつ、返却されたテキストのリストを画像に埋め込む。

元の「機械学習の練習」だった部分を全部この filter_func に詰め込みやがれ、つーわけである。


さて今回のこれ。もちろん「機械学習の練習」を続けることにも目的はあるけれど、それだけでなくて、もっとしょうもないような用途に使いたくて切り出した。

そもそも「フィルタ」が欲しいだけなら、PyAV の(生半可な)紹介で書いておいた通り ffmpeg が内蔵しているフィルタを使う方がひょっとしたら出来ることも多いかも知れず、なのでこの「微汎用化」は、実は実用性という意味では全然大したもんではない。

けど例えば「Pillow や scikit-image の振る舞いを確かめたい」みたいな場合に、「色んな静止画を与えてチマチマ確認する」のって実はかなり大変になりえるわけね。変化が大々的な加工はともかく、そうでない微妙な加工は、ほとんどの場合「入力の性質」に大きく依存し、そしてそういう「都合のいい静止画」を探すことが大変だったりするわけね。これってのはPyAV で「Viewing the CaffeNet model’s predictation and its target video at the same time. 」な動画を作る、の巻のモチベーションと全く同じもので、つまり「だったら動画を入力にしちまえ」てことである。

例えば上の hoge.py をこんなんにしてみる:

hoge.py
 1 from PIL import Image
 2 from PIL import ImageFilter
 3 from PIL.ImageFilter import UnsharpMask
 4 
 5 class _MyF(object):
 6     def __init__(self):
 7         self._i = 1
 8 
 9     def filter_func(self, img):
10         dimg = img.filter(UnsharpMask(radius=self._i, percent=150, threshold=3))
11         res = dimg, ["radius=%d, percent=150, threshold=3" % self._i]
12         self._i = max(1, (self._i + 1) % 20)
13         return res
14 
15 _myf = _MyF()
16 filter_func = _myf.filter_func

こうすれば UnsharpMask の radius の与え方でどう変化するのかを連続的に視覚的に見ることが出来る。(例はほんとうにただの例。意味のあるものではない。)

もちろんこのアプローチには欠点もあって、動画な都合、「lossless」が案外大変だということ。ここでやってるように h264 な mpeg4 にエンコードしてしまう、ということは、あるいはそういう動画を入力にしてしまうということは、「やろうとしてる加工の繊細さ」の邪魔になる場合もある。たとえば dither の与え方による違いを見たい、なんて目的があるなら、そういうのは一枚一枚マジメに静止画与えて確認した方がいい。そうでないと騙される可能性がある。