そろそろ「waveモジュール」関係なくなり始めるし、そっちこそがオレ的本題。
それでもまだ辛うじて「wave モジュール」感をとどめるので、続き物として続けてみる。
まず(4)を先に少し「整理」。つまり「別モジュールに追い出す」:
1 #
2 # NOTE:
3 # Don't think this module is highly flexible but this is just
4 # for my personal use. Especially this module can't handle
5 # high-resolution PCM (24bit), etc, etc, etc.
6 #
7 import itertools
8 import wave
9
10
11 try:
12 # for python 2.7
13 zip = itertools.izip
14 except AttributeError:
15 pass
16
17
18 class _WaveRWBase(object):
19 def __init__(self, waveobj):
20 self._wave = waveobj
21
22 def __enter__(self):
23 return self
24
25 def __exit__(self, type, value, tb):
26 try:
27 self._wave.close()
28 except:
29 pass
30
31 def __getattr__(self, attrname):
32 return getattr(self._wave, attrname)
33
34
35 class WaveReader(_WaveRWBase):
36 def __init__(self, fname):
37 _WaveRWBase.__init__(self, wave.open(fname, "r"))
38
39
40 class WaveWriter(_WaveRWBase):
41 def __init__(self, fname, nchannels, sampwidth, samprate):
42 _WaveRWBase.__init__(self, wave.open(fname, "w"))
43 self._wave.setparams(
44 (nchannels, sampwidth, samprate,
45 # For seekable output streams, the wave header will
46 # automatically be updated to reflect the number of
47 # frames actually written.
48 0,
49 'NONE', 'not compressed' # you can't change these
50 )
51 )
52
53 def _writepcmraw(self, pcm):
54 import struct
55
56 # NOTE: 'h' means 16bit integer.
57 packed = struct.pack('h' * len(pcm), *pcm)
58 self._wave.writeframes(packed)
59
60 def writechannels(self, channels):
61 import struct
62 CHUNK_SIZE = 4096
63
64 _pcm = []
65 for d in itertools.chain.from_iterable(zip(*channels)):
66 _pcm.append(d)
67 if len(_pcm) == CHUNK_SIZE:
68 self._writepcmraw(_pcm)
69 _pcm = []
70 if _pcm:
71 self._writepcmraw(_pcm)
一番頭でコメントしてる通り。「オレにも使えるぜ」と期待されると困る。可搬性は一切考えてないのであるからして。
で一応(4)のコードをこの「インチキモジュール」で書き換えたもの:
1 import itertools
2 import numpy as np
3 from tiny_wave_wrapper import WaveReader, WaveWriter
4
5
6 try:
7 # for python 2.7
8 zip = itertools.izip
9 except AttributeError:
10 pass
11
12
13 if __name__ == '__main__':
14 import os
15 import argparse
16 parser = argparse.ArgumentParser()
17 parser.add_argument(
18 "mode", choices=[
19 "copy", "reverse", "swaplr", "rdelay",
20 "left2mono", "right2mono", "negate", "sampx2"
21 ])
22 parser.add_argument("--modeintparam", type=int, default=0)
23 parser.add_argument("target")
24 args = parser.parse_args()
25
26 with WaveReader(args.target) as fi:
27 nchannels, width, rate, nframes, _, _ = fi.getparams()
28 onchannels = nchannels
29 raw = np.fromstring(fi.readframes(nframes), dtype=np.int16)
30 if args.mode == "reverse":
31 channels = reversed(raw[::2]), reversed(raw[1::2])
32 elif args.mode == "swaplr":
33 # if nchannels is 1, actualy this swap odd and even.
34 channels = raw[1::2], raw[::2]
35 elif args.mode == "rdelay":
36 if not args.modeintparam:
37 delay = rate // 50
38 else:
39 delay = args.modeintparam
40 channels = [
41 np.hstack((raw[::2], np.zeros(delay, dtype=np.int16))),
42 np.hstack((np.zeros(delay, dtype=np.int16), raw[1::2]))]
43 elif nchannels == 1 and args.mode in ("left2mono", "right2mono"):
44 channels = (raw,)
45 elif args.mode == "negate":
46 channels = (-raw[::2], -raw[1::2])
47 elif args.mode == "left2mono":
48 channels = (raw[::2],)
49 onchannels = 1
50 elif args.mode == "right2mono":
51 channels = (raw[1::2], )
52 onchannels = 1
53 elif args.mode == "sampx2":
54 left, right = raw[::2], raw[1::2]
55 left = itertools.chain.from_iterable(zip(left, left))
56 right = itertools.chain.from_iterable(zip(right, right))
57 channels = left, right
58 else:
59 channels = raw[::2], raw[1::2]
60
61 ofname = os.path.splitext(args.target)[0] + "_" + args.mode + '.wav'
62 with WaveWriter(ofname, onchannels, width, rate) as fo:
63 fo.writechannels(channels)
itertools も 2.7 用 zip 対策もこちらから消えないのはこれはしょうがない。後者がやなら six でも使ってみれば? (3)だったかで言ったように、「オレ的本題」で混乱したらいつでも戻ってこられるホームポジションが欲しくて書いたものなので、何かしら実用的に価値があるもんではなくて、ただの「サンプル」用途ね。
で、より本題に近いのはたとえば:
1 import itertools
2 import numpy as np
3 from tiny_wave_wrapper import WaveReader, WaveWriter
4
5
6 try:
7 # for python 2.7
8 zip = itertools.izip
9 map = itertools.imap
10 except AttributeError:
11 pass
12
13
14 if __name__ == '__main__':
15 import os
16 import argparse
17 parser = argparse.ArgumentParser()
18 parser.add_argument("target")
19 args = parser.parse_args()
20
21 with WaveReader(args.target) as fi:
22 nchannels, width, rate, nframes, _, _ = fi.getparams()
23 onchannels = nchannels
24 raw = np.fromstring(fi.readframes(nframes), dtype=np.int16)
25 channels = raw[::2], raw[1::2]
26
27 def _i16(d):
28 _I16MAX = 2**15 - 1
29 return int(max(-_I16MAX, min(d.real, _I16MAX)))
30
31 pcn = [[], []]
32 for chn in range(len(pcn)):
33 # to freq domain
34 fft = np.fft.fft(channels[chn]) # don't use 'rfft' in this case.
35
36 # helper to interpret freq
37 #freq = np.fft.fftfreq(len(fft), 1./rate)
38
39 # do something to fft-ed
40 #fft[freq < 1000] = 0
41
42 # to time domain
43 pcn[chn] = map(_i16, np.fft.ifft(fft)) # don't use 'irfft' in this case.
44
45
46 ofname = os.path.splitext(args.target)[0] + "_fftifft" + '.wav'
47 with WaveWriter(ofname, onchannels, width, rate) as fo:
48 fo.writechannels(pcn)
まぁこれは「fft してその inverse で元に戻れろ?」を確認したくてやったんだけれど、個人的にはこれだけで既にどハマリした。この場合は「r版」(実数版)使っちゃダメ、戻らない。あと「do something to fft-ed」部分もそう簡単にはいかない。というか結果の確認が簡単じゃない。のかな、あるいは単に理解が足りてない?
で、ひとまずちょっと迷走しつつも「それっぽい」ヤツ:
1 import itertools
2 import numpy as np
3 from tiny_wave_wrapper import WaveReader, WaveWriter
4
5
6 try:
7 # for python 2.7
8 zip = itertools.izip
9 map = itertools.imap
10 except AttributeError:
11 pass
12
13
14 # -------
15 _SCALES = ["C", "C#", "D", "D#", "E", "F", "F#", "G", "G#", "A", "A#", "B"]
16 _SCALES = {s: i for i, s in enumerate(_SCALES)}
17
18 def _on2nnrange(on):
19 """octave no to range of note numbers"""
20 s = (on + 2) * 12
21 return range(s, s + 12)
22
23 def _freq(d):
24 """
25 note number to frequency.
26
27 Converting from midi note number (d) to frequency (f) is given by the following formula:
28
29 .. math::
30
31 f = 2^{\frac{(d - 69)}{12}} \times 440 [Hz]
32
33 See `MIDI tuning standard <https://en.wikipedia.org/wiki/MIDI_tuning_standard>`_.
34 """
35 import math
36 return math.pow(2, (d - 69) / 12.) * 440
37
38 def _onsc2f(on, sc):
39 return _freq(_on2nnrange(on)[_SCALES[sc]])
40 # -------
41
42
43 # -------
44 def apply_butter(data, cutoff, fs, btype='bandpass', order=5):
45 from scipy.signal import butter, lfilter
46
47 nyq = 0.5 * fs # nyquist
48 normal_cutoff = cutoff / nyq
49 print(cutoff, normal_cutoff)
50
51 b, a = butter(order, normal_cutoff, btype=btype, analog=False)
52 return lfilter(b, a, data)
53 # -------
54
55
56 if __name__ == '__main__':
57 import os
58 import argparse
59 parser = argparse.ArgumentParser()
60 parser.add_argument("target")
61 args = parser.parse_args()
62
63 with WaveReader(args.target) as fi:
64 nchannels, width, rate, nframes, _, _ = fi.getparams()
65 onchannels = nchannels
66 raw = np.fromstring(fi.readframes(nframes), dtype=np.int16)
67 channels = raw[::2], raw[1::2]
68
69 def _i16(d):
70 _I16MAX = 2**15 - 1
71 return int(max(-_I16MAX, min(d.real, _I16MAX)))
72
73 pcm = [[], []]
74 for chn in range(len(channels)):
75 pcm[chn] = map(
76 _i16,
77 apply_butter(
78 channels[chn],
79 np.array([_onsc2f(2, "C"), _onsc2f(9, "C")]),
80 rate))
81
82 ofname = os.path.splitext(args.target)[0] + "_" + "butter" + '.wav'
83 with WaveWriter(ofname, onchannels, width, rate) as fo:
84 fo.writechannels(pcm)
一応 bandpass filter てことになるんだけれど、スペクトラムを描画しつつ確認してるわけじゃないので、うまくいってるのかがどうもわからん。それっぽいはそれっぽいんだけれど、どういう範囲で切り取っても結構ホワイトノイズが入ったように聞こえる音声になるんだけれど、そういうもんであろうか? もちっと冷静に考えないと正しさについてよくわかんない。
イコライザを作りたいわけでもないし、アナライザを作りたいんでもない、のではあるけれど、ワタシ的本題のためには結局は近しいことが必要なので、ちゃんと出来たほうがいいのには違いはない。まぁ地道にやるさ。