Python の wave モジュール例の殴り書き(5)

そろそろ「waveモジュール」関係なくなり始めるし、そっちこそがオレ的本題。

それでもまだ辛うじて「wave モジュール」感をとどめるので、続き物として続けてみる。

まず(4)を先に少し「整理」。つまり「別モジュールに追い出す」:

tiny_wave_wrapper.py
 1 #
 2 # NOTE:
 3 #   Don't think this module is highly flexible but this is just
 4 #   for my personal use. Especially this module can't handle
 5 #   high-resolution PCM (24bit), etc, etc, etc.
 6 #
 7 import itertools
 8 import wave
 9 
10 
11 try:
12     # for python 2.7
13     zip = itertools.izip
14 except AttributeError:
15     pass
16 
17 
18 class _WaveRWBase(object):
19     def __init__(self, waveobj):
20         self._wave = waveobj
21 
22     def __enter__(self):
23         return self
24 
25     def __exit__(self, type, value, tb):
26         try:
27             self._wave.close()
28         except:
29             pass
30 
31     def __getattr__(self, attrname):
32         return getattr(self._wave, attrname)
33 
34 
35 class WaveReader(_WaveRWBase):
36     def __init__(self, fname):
37         _WaveRWBase.__init__(self, wave.open(fname, "r"))
38 
39 
40 class WaveWriter(_WaveRWBase):
41     def __init__(self, fname, nchannels, sampwidth, samprate):
42         _WaveRWBase.__init__(self, wave.open(fname, "w"))
43         self._wave.setparams(
44             (nchannels, sampwidth, samprate,
45              # For seekable output streams, the wave header will
46              # automatically be updated to reflect the number of
47              # frames actually written.
48              0,
49              'NONE', 'not compressed'  # you can't change these
50              )
51             )
52 
53     def _writepcmraw(self, pcm):
54         import struct
55 
56         # NOTE: 'h' means 16bit integer.
57         packed = struct.pack('h' * len(pcm), *pcm)
58         self._wave.writeframes(packed)
59 
60     def writechannels(self, channels):
61         import struct
62         CHUNK_SIZE = 4096
63 
64         _pcm = []
65         for d in itertools.chain.from_iterable(zip(*channels)):
66             _pcm.append(d)
67             if len(_pcm) == CHUNK_SIZE:
68                 self._writepcmraw(_pcm)
69                 _pcm = []
70         if _pcm:
71             self._writepcmraw(_pcm)

一番頭でコメントしてる通り。「オレにも使えるぜ」と期待されると困る。可搬性は一切考えてないのであるからして。

で一応(4)のコードをこの「インチキモジュール」で書き換えたもの:

hoge.py
 1 import itertools
 2 import numpy as np
 3 from tiny_wave_wrapper import WaveReader, WaveWriter
 4 
 5 
 6 try:
 7     # for python 2.7
 8     zip = itertools.izip
 9 except AttributeError:
10     pass
11 
12 
13 if __name__ == '__main__':
14     import os
15     import argparse
16     parser = argparse.ArgumentParser()
17     parser.add_argument(
18         "mode", choices=[
19             "copy", "reverse", "swaplr", "rdelay",
20             "left2mono", "right2mono", "negate", "sampx2"
21             ])
22     parser.add_argument("--modeintparam", type=int, default=0)
23     parser.add_argument("target")
24     args = parser.parse_args()
25 
26     with WaveReader(args.target) as fi:
27         nchannels, width, rate, nframes, _, _ = fi.getparams()
28         onchannels = nchannels
29         raw = np.fromstring(fi.readframes(nframes), dtype=np.int16)
30         if args.mode == "reverse":
31             channels = reversed(raw[::2]), reversed(raw[1::2])
32         elif args.mode == "swaplr":
33             # if nchannels is 1, actualy this swap odd and even.
34             channels = raw[1::2], raw[::2]
35         elif args.mode == "rdelay":
36             if not args.modeintparam:
37                 delay = rate // 50
38             else:
39                 delay = args.modeintparam
40             channels = [
41                 np.hstack((raw[::2], np.zeros(delay, dtype=np.int16))),
42                 np.hstack((np.zeros(delay, dtype=np.int16), raw[1::2]))]
43         elif nchannels == 1 and args.mode in ("left2mono", "right2mono"):
44             channels = (raw,)
45         elif args.mode == "negate":
46             channels = (-raw[::2], -raw[1::2])
47         elif args.mode == "left2mono":
48             channels = (raw[::2],)
49             onchannels = 1
50         elif args.mode == "right2mono":
51             channels = (raw[1::2], )
52             onchannels = 1
53         elif args.mode == "sampx2":
54             left, right = raw[::2], raw[1::2]
55             left = itertools.chain.from_iterable(zip(left, left))
56             right = itertools.chain.from_iterable(zip(right, right))
57             channels = left, right
58         else:
59             channels = raw[::2], raw[1::2]
60 
61     ofname = os.path.splitext(args.target)[0] + "_" + args.mode + '.wav'
62     with WaveWriter(ofname, onchannels, width, rate) as fo:
63         fo.writechannels(channels)

itertools も 2.7 用 zip 対策もこちらから消えないのはこれはしょうがない。後者がやなら six でも使ってみれば? (3)だったかで言ったように、「オレ的本題」で混乱したらいつでも戻ってこられるホームポジションが欲しくて書いたものなので、何かしら実用的に価値があるもんではなくて、ただの「サンプル」用途ね。

で、より本題に近いのはたとえば:

 1 import itertools
 2 import numpy as np
 3 from tiny_wave_wrapper import WaveReader, WaveWriter
 4 
 5 
 6 try:
 7     # for python 2.7
 8     zip = itertools.izip
 9     map = itertools.imap
10 except AttributeError:
11     pass
12 
13 
14 if __name__ == '__main__':
15     import os
16     import argparse
17     parser = argparse.ArgumentParser()
18     parser.add_argument("target")
19     args = parser.parse_args()
20 
21     with WaveReader(args.target) as fi:
22         nchannels, width, rate, nframes, _, _ = fi.getparams()
23         onchannels = nchannels
24         raw = np.fromstring(fi.readframes(nframes), dtype=np.int16)
25         channels = raw[::2], raw[1::2]
26 
27     def _i16(d):
28         _I16MAX = 2**15 - 1
29         return int(max(-_I16MAX, min(d.real, _I16MAX)))
30 
31     pcn = [[], []]
32     for chn in range(len(pcn)):
33         # to freq domain
34         fft = np.fft.fft(channels[chn])  # don't use 'rfft' in this case.
35 
36         # helper to interpret freq
37         #freq = np.fft.fftfreq(len(fft), 1./rate)
38 
39         # do something to fft-ed
40         #fft[freq < 1000] = 0
41 
42         # to time domain
43         pcn[chn] = map(_i16, np.fft.ifft(fft))  # don't use 'irfft' in this case.
44 
45 
46     ofname = os.path.splitext(args.target)[0] + "_fftifft" + '.wav'
47     with WaveWriter(ofname, onchannels, width, rate) as fo:
48         fo.writechannels(pcn)

まぁこれは「fft してその inverse で元に戻れろ?」を確認したくてやったんだけれど、個人的にはこれだけで既にどハマリした。この場合は「r版」(実数版)使っちゃダメ、戻らない。あと「do something to fft-ed」部分もそう簡単にはいかない。というか結果の確認が簡単じゃない。のかな、あるいは単に理解が足りてない?

で、ひとまずちょっと迷走しつつも「それっぽい」ヤツ:

 1 import itertools
 2 import numpy as np
 3 from tiny_wave_wrapper import WaveReader, WaveWriter
 4 
 5 
 6 try:
 7     # for python 2.7
 8     zip = itertools.izip
 9     map = itertools.imap
10 except AttributeError:
11     pass
12 
13 
14 # -------
15 _SCALES = ["C", "C#", "D", "D#", "E", "F", "F#", "G", "G#", "A", "A#", "B"]
16 _SCALES = {s: i for i, s in enumerate(_SCALES)}
17 
18 def _on2nnrange(on):
19     """octave no to range of note numbers"""
20     s = (on + 2) * 12
21     return range(s, s + 12)
22 
23 def _freq(d):
24     """
25     note number to frequency.
26 
27     Converting from midi note number (d) to frequency (f) is given by the following formula:
28 
29     .. math::
30 
31         f = 2^{\frac{(d - 69)}{12}} \times 440 [Hz]
32 
33     See `MIDI tuning standard <https://en.wikipedia.org/wiki/MIDI_tuning_standard>`_.
34     """
35     import math
36     return math.pow(2, (d - 69) / 12.) * 440
37 
38 def _onsc2f(on, sc):
39     return _freq(_on2nnrange(on)[_SCALES[sc]])
40 # -------
41 
42 
43 # -------
44 def apply_butter(data, cutoff, fs, btype='bandpass', order=5):
45     from scipy.signal import butter, lfilter
46 
47     nyq = 0.5 * fs  # nyquist
48     normal_cutoff = cutoff / nyq
49     print(cutoff, normal_cutoff)
50 
51     b, a = butter(order, normal_cutoff, btype=btype, analog=False)
52     return lfilter(b, a, data)
53 # -------
54 
55 
56 if __name__ == '__main__':
57     import os
58     import argparse
59     parser = argparse.ArgumentParser()
60     parser.add_argument("target")
61     args = parser.parse_args()
62 
63     with WaveReader(args.target) as fi:
64         nchannels, width, rate, nframes, _, _ = fi.getparams()
65         onchannels = nchannels
66         raw = np.fromstring(fi.readframes(nframes), dtype=np.int16)
67         channels = raw[::2], raw[1::2]
68 
69     def _i16(d):
70         _I16MAX = 2**15 - 1
71         return int(max(-_I16MAX, min(d.real, _I16MAX)))
72 
73     pcm = [[], []]
74     for chn in range(len(channels)):
75         pcm[chn] = map(
76             _i16,
77             apply_butter(
78                 channels[chn],
79                 np.array([_onsc2f(2, "C"), _onsc2f(9, "C")]),
80                 rate))
81 
82     ofname = os.path.splitext(args.target)[0] + "_" + "butter" + '.wav'
83     with WaveWriter(ofname, onchannels, width, rate) as fo:
84         fo.writechannels(pcm)

一応 bandpass filter てことになるんだけれど、スペクトラムを描画しつつ確認してるわけじゃないので、うまくいってるのかがどうもわからん。それっぽいはそれっぽいんだけれど、どういう範囲で切り取っても結構ホワイトノイズが入ったように聞こえる音声になるんだけれど、そういうもんであろうか? もちっと冷静に考えないと正しさについてよくわかんない。

イコライザを作りたいわけでもないし、アナライザを作りたいんでもない、のではあるけれど、ワタシ的本題のためには結局は近しいことが必要なので、ちゃんと出来たほうがいいのには違いはない。まぁ地道にやるさ。