Windows MSYS シェルスクリプトとして書いた CGI を Python の CGIHTTPServer でテスト(改)

Windows MSYS シェルスクリプトとして書いた CGI を Python の CGIHTTPServer でテストの続き。

Python 3 の CGIHTTPRequestHandler がぶっ壊れている。2.7のCGIHTTPServerも少し壊れている。」を経て、なおかつ、unittest からサーバをその場で立ち上げて、テスト終了したらさようなら、にしたくて。

なんか10分で書いたら一発で動いて自分でもビックリしたけど、「壊れ」てる部分を直しつつ、「追加したい」ものを追加しつつ、「MSYS 固有の問題」を直しつつ、「だけれども公式配布物を直接書き換えることなく」、な、まぁひとまずは満足出来るものです:

testcgihttpserver.py
  1 # -*- coding: utf-8 -*-
  2 import copy
  3 import os
  4 import sys
  5 import subprocess
  6 import select
  7 
  8 
  9 def executable(path):
 10     """Test for executable file."""
 11     return os.access(path, os.X_OK)
 12 
 13 
 14 try:
 15     import CGIHTTPServer  # Python 2.7
 16     import urllib
 17 
 18     # ===============================================================
 19     class LocalPatchedCGIHTTPRequestHandler(CGIHTTPServer.CGIHTTPRequestHandler):
 20         def is_executable(self, path):
 21             """Test whether argument path is an executable file."""
 22             return executable(path)
 23     
 24         def run_cgi(self):
 25             """Execute a CGI script."""
 26             dir, rest = self.cgi_info
 27             path = dir + '/' + rest
 28             i = path.find('/', len(dir)+1)
 29             while i >= 0:
 30                 nextdir = path[:i]
 31                 nextrest = path[i+1:]
 32     
 33                 scriptdir = self.translate_path(nextdir)
 34                 if os.path.isdir(scriptdir):
 35                     dir, rest = nextdir, nextrest
 36                     i = path.find('/', len(dir)+1)
 37                 else:
 38                     break
 39     
 40             # find an explicit query string, if present.
 41             i = rest.rfind('?')
 42             if i >= 0:
 43                 rest, query = rest[:i], rest[i+1:]
 44             else:
 45                 query = ''
 46     
 47             # dissect the part after the directory name into a script name &
 48             # a possible additional path, to be stored in PATH_INFO.
 49             i = rest.find('/')
 50             if i >= 0:
 51                 script, rest = rest[:i], rest[i:]
 52             else:
 53                 script, rest = rest, ''
 54     
 55             scriptname = dir + '/' + script
 56             scriptfile = self.translate_path(scriptname)
 57             if not os.path.exists(scriptfile):
 58                 self.send_error(404, "No such CGI script (%r)" % scriptname)
 59                 return
 60             if not os.path.isfile(scriptfile):
 61                 self.send_error(403, "CGI script is not a plain file (%r)" %
 62                                 scriptname)
 63                 return
 64             ispy = self.is_python(scriptname)
 65             if not ispy:
 66                 if not (self.have_fork or self.have_popen2 or self.have_popen3):
 67                     self.send_error(403, "CGI script is not a Python script (%r)" %
 68                                     scriptname)
 69                     return
 70                 if not self.is_executable(scriptfile):
 71                     self.send_error(403, "CGI script is not executable (%r)" %
 72                                     scriptname)
 73                     return
 74     
 75             # Reference: http://hoohoo.ncsa.uiuc.edu/cgi/env.html
 76             # XXX Much of the following could be prepared ahead of time!
 77             env = copy.deepcopy(os.environ)
 78             env['SERVER_SOFTWARE'] = self.version_string()
 79             env['SERVER_NAME'] = self.server.server_name
 80             env['GATEWAY_INTERFACE'] = 'CGI/1.1'
 81             env['SERVER_PROTOCOL'] = self.protocol_version
 82             env['SERVER_PORT'] = str(self.server.server_port)
 83             env['REQUEST_METHOD'] = self.command
 84             uqrest = urllib.unquote(rest)
 85             env['PATH_INFO'] = uqrest
 86             env['PATH_TRANSLATED'] = self.translate_path(uqrest)
 87             env['SCRIPT_NAME'] = scriptname
 88             if query:
 89                 env['QUERY_STRING'] = query
 90             host = self.address_string()
 91             if host != self.client_address[0]:
 92                 env['REMOTE_HOST'] = host
 93             env['REMOTE_ADDR'] = self.client_address[0]
 94             authorization = self.headers.getheader("authorization")
 95             if authorization:
 96                 authorization = authorization.split()
 97                 if len(authorization) == 2:
 98                     import base64, binascii
 99                     env['AUTH_TYPE'] = authorization[0]
100                     if authorization[0].lower() == "basic":
101                         try:
102                             authorization = base64.decodestring(authorization[1])
103                         except binascii.Error:
104                             pass
105                         else:
106                             authorization = authorization.split(':')
107                             if len(authorization) == 2:
108                                 env['REMOTE_USER'] = authorization[0]
109             # XXX REMOTE_IDENT
110             if self.headers.typeheader is None:
111                 env['CONTENT_TYPE'] = self.headers.type
112             else:
113                 env['CONTENT_TYPE'] = self.headers.typeheader
114             length = self.headers.getheader('content-length')
115             if length:
116                 env['CONTENT_LENGTH'] = length
117             referer = self.headers.getheader('referer')
118             if referer:
119                 env['HTTP_REFERER'] = referer
120             accept = []
121             for line in self.headers.getallmatchingheaders('accept'):
122                 if line[:1] in "\t\n\r ":
123                     accept.append(line.strip())
124                 else:
125                     accept = accept + line[7:].split(',')
126             env['HTTP_ACCEPT'] = ','.join(accept)
127             # ---------------------------------------- ADD BEGIN
128             accept_encoding = []
129             for line in self.headers.getallmatchingheaders('accept-encoding'):
130                 if line[:1] in "\t\n\r ":
131                     accept_encoding.append(line.strip())
132                 else:
133                     accept_encoding = accept_encoding + line[len('accept-encoding') + 1:].split(',')
134             env['HTTP_ACCEPT_ENCODING'] = ','.join(accept_encoding)
135     
136             accept_charset = []
137             for line in self.headers.getallmatchingheaders('accept-charset'):
138                 if line[:1] in "\t\n\r ":
139                     accept_charset.append(line.strip())
140                 else:
141                     accept_charset = accept_charset + line[len('accept-charset') + 1:].split(',')
142             env['HTTP_ACCEPT_CHARSET'] = ','.join(accept_charset)
143             # ---------------------------------------- ADD END
144             ua = self.headers.getheader('user-agent')
145             if ua:
146                 env['HTTP_USER_AGENT'] = ua
147             co = filter(None, self.headers.getheaders('cookie'))
148             if co:
149                 env['HTTP_COOKIE'] = ', '.join(co)
150             # XXX Other HTTP_* headers
151             # Since we're setting the env in the parent, provide empty
152             # values to override previously set values
153             for k in ('QUERY_STRING', 'REMOTE_HOST', 'CONTENT_LENGTH',
154                       'HTTP_USER_AGENT', 'HTTP_COOKIE', 'HTTP_REFERER'):
155                 env.setdefault(k, "")
156     
157             self.send_response(200, "Script output follows")
158     
159             decoded_query = query.replace('+', ' ')
160     
161             if self.have_fork:
162                 # Unix -- fork as we should
163                 args = [script]
164                 if '=' not in decoded_query:
165                     args.append(decoded_query)
166                 nobody = nobody_uid()
167                 self.wfile.flush() # Always flush before forking
168                 pid = os.fork()
169                 if pid != 0:
170                     # Parent
171                     pid, sts = os.waitpid(pid, 0)
172                     # throw away additional data [see bug #427345]
173                     while select.select([self.rfile], [], [], 0)[0]:
174                         if not self.rfile.read(1):
175                             break
176                     if sts:
177                         self.log_error("CGI script exit status %#x", sts)
178                     return
179                 # Child
180                 try:
181                     try:
182                         os.setuid(nobody)
183                     except os.error:
184                         pass
185                     os.dup2(self.rfile.fileno(), 0)
186                     os.dup2(self.wfile.fileno(), 1)
187                     os.execve(scriptfile, args, env)
188                 except:
189                     self.server.handle_error(self.request, self.client_address)
190                     os._exit(127)
191     
192             else:
193                 # Non Unix - use subprocess
194                 import subprocess
195                 cmdline = ["sh", scriptfile]
196                 if self.is_python(scriptfile):
197                     interp = sys.executable
198                     if interp.lower().endswith("w.exe"):
199                         # On Windows, use python.exe, not pythonw.exe
200                         interp = interp[:-5] + interp[-4:]
201                     cmdline = [interp, '-u'] + cmdline
202                 if '=' not in query:
203                     cmdline.append(query)
204     
205                 self.log_message("command: %s", subprocess.list2cmdline(cmdline))
206                 try:
207                     nbytes = int(length)
208                 except (TypeError, ValueError):
209                     nbytes = 0
210                 p = subprocess.Popen(cmdline,
211                                      stdin = subprocess.PIPE,
212                                      stdout = subprocess.PIPE,
213                                      stderr = subprocess.PIPE,
214                                      env = env
215                                     )
216                 if self.command.lower() == "post" and nbytes > 0:
217                     data = self.rfile.read(nbytes)
218                 else:
219                     data = None
220                 # throw away additional data [see bug #427345]
221                 while select.select([self.rfile._sock], [], [], 0)[0]:
222                     if not self.rfile._sock.recv(1):
223                         break
224                 stdout, stderr = p.communicate(data)
225                 self.wfile.write(stdout)
226                 if stderr:
227                     self.log_error('%s', stderr)
228                 p.stderr.close()
229                 p.stdout.close()
230                 status = p.returncode
231                 if status:
232                     self.log_error("CGI script exit status %#x", status)
233                 else:
234                     self.log_message("CGI script exited OK")
235     
236     def test(**kw):
237         """Test the HTTP request handler class.
238     
239         This runs an HTTP server on port 8000 (or the first command line
240         argument).
241     
242         """
243         kw["HandlerClass"] = LocalPatchedCGIHTTPRequestHandler
244         CGIHTTPServer.test(**kw)
245 
246 
247 except ImportError:  # Python 3.x
248     import http.server
249     import urllib.parse
250 
251     # ===============================================================
252     class LocalPatchedCGIHTTPRequestHandler(http.server.CGIHTTPRequestHandler):
253     
254         """Complete HTTP server with GET, HEAD and POST commands.
255     
256         GET and HEAD also support running CGI scripts.
257     
258         The POST command is *only* implemented for CGI scripts.
259     
260         """
261         def is_executable(self, path):
262             """Test whether argument path is an executable file."""
263             return executable(path)
264     
265         def run_cgi(self):
266             """Execute a CGI script."""
267             dir, rest = self.cgi_info
268             path = dir + '/' + rest
269             i = path.find('/', len(dir)+1)
270             while i >= 0:
271                 nextdir = path[:i]
272                 nextrest = path[i+1:]
273     
274                 scriptdir = self.translate_path(nextdir)
275                 if os.path.isdir(scriptdir):
276                     dir, rest = nextdir, nextrest
277                     i = path.find('/', len(dir)+1)
278                 else:
279                     break
280     
281             # find an explicit query string, if present.
282             i = rest.rfind('?')
283             if i >= 0:
284                 rest, query = rest[:i], rest[i+1:]
285             else:
286                 query = ''
287     
288             # dissect the part after the directory name into a script name &
289             # a possible additional path, to be stored in PATH_INFO.
290             i = rest.find('/')
291             if i >= 0:
292                 script, rest = rest[:i], rest[i:]
293             else:
294                 script, rest = rest, ''
295     
296             scriptname = dir + '/' + script
297             scriptfile = self.translate_path(scriptname)
298             if not os.path.exists(scriptfile):
299                 self.send_error(404, "No such CGI script (%r)" % scriptname)
300                 return
301             if not os.path.isfile(scriptfile):
302                 self.send_error(403, "CGI script is not a plain file (%r)" %
303                                 scriptname)
304                 return
305             ispy = self.is_python(scriptname)
306             if self.have_fork or not ispy:
307                 if not self.is_executable(scriptfile):
308                     self.send_error(403, "CGI script is not executable (%r)" %
309                                     scriptname)
310                     return
311     
312             # Reference: http://hoohoo.ncsa.uiuc.edu/cgi/env.html
313             # XXX Much of the following could be prepared ahead of time!
314             env = copy.deepcopy(os.environ)
315             env['SERVER_SOFTWARE'] = self.version_string()
316             env['SERVER_NAME'] = self.server.server_name
317             env['GATEWAY_INTERFACE'] = 'CGI/1.1'
318             env['SERVER_PROTOCOL'] = self.protocol_version
319             env['SERVER_PORT'] = str(self.server.server_port)
320             env['REQUEST_METHOD'] = self.command
321             uqrest = urllib.parse.unquote(rest)
322             env['PATH_INFO'] = uqrest
323             env['PATH_TRANSLATED'] = self.translate_path(uqrest)
324             env['SCRIPT_NAME'] = scriptname
325             if query:
326                 env['QUERY_STRING'] = query
327             env['REMOTE_ADDR'] = self.client_address[0]
328             authorization = self.headers.get("authorization")
329             if authorization:
330                 authorization = authorization.split()
331                 if len(authorization) == 2:
332                     import base64, binascii
333                     env['AUTH_TYPE'] = authorization[0]
334                     if authorization[0].lower() == "basic":
335                         try:
336                             authorization = authorization[1].encode('ascii')
337                             authorization = base64.decodebytes(authorization).\
338                                             decode('ascii')
339                         except (binascii.Error, UnicodeError):
340                             pass
341                         else:
342                             authorization = authorization.split(':')
343                             if len(authorization) == 2:
344                                 env['REMOTE_USER'] = authorization[0]
345             # XXX REMOTE_IDENT
346             if self.headers.get('content-type') is None:
347                 env['CONTENT_TYPE'] = self.headers.get_content_type()
348             else:
349                 env['CONTENT_TYPE'] = self.headers['content-type']
350             length = self.headers.get('content-length')
351             if length:
352                 env['CONTENT_LENGTH'] = length
353             referer = self.headers.get('referer')
354             if referer:
355                 env['HTTP_REFERER'] = referer
356             accept = self.headers.get_all('accept', ())
357             env['HTTP_ACCEPT'] = ','.join(accept)
358             # ---------------------------------------- ADD BEGIN
359             accept_encoding = self.headers.get_all('accept-encoding', ())
360             env['HTTP_ACCEPT_ENCODING'] = ','.join(accept_encoding)
361     
362             accept_charset = self.headers.get_all('accept-charset', ())
363             env['HTTP_ACCEPT_CHARSET'] = ','.join(accept_charset)
364             # ---------------------------------------- ADD END
365             ua = self.headers.get('user-agent')
366             if ua:
367                 env['HTTP_USER_AGENT'] = ua
368             co = filter(None, self.headers.get_all('cookie', []))
369             cookie_str = ', '.join(co)
370             if cookie_str:
371                 env['HTTP_COOKIE'] = cookie_str
372             # XXX Other HTTP_* headers
373             # Since we're setting the env in the parent, provide empty
374             # values to override previously set values
375             for k in ('QUERY_STRING', 'REMOTE_HOST', 'CONTENT_LENGTH',
376                       'HTTP_USER_AGENT', 'HTTP_COOKIE', 'HTTP_REFERER'):
377                 env.setdefault(k, "")
378     
379             self.send_response(200, "Script output follows")
380             self.flush_headers()
381     
382             decoded_query = query.replace('+', ' ')
383     
384             if self.have_fork:
385                 # Unix -- fork as we should
386                 args = [script]
387                 if '=' not in decoded_query:
388                     args.append(decoded_query)
389                 nobody = nobody_uid()
390                 self.wfile.flush() # Always flush before forking
391                 pid = os.fork()
392                 if pid != 0:
393                     # Parent
394                     pid, sts = os.waitpid(pid, 0)
395                     # throw away additional data [see bug #427345]
396                     while select.select([self.rfile], [], [], 0)[0]:
397                         if not self.rfile.read(1):
398                             break
399                     if sts:
400                         self.log_error("CGI script exit status %#x", sts)
401                     return
402                 # Child
403                 try:
404                     try:
405                         os.setuid(nobody)
406                     except OSError:
407                         pass
408                     os.dup2(self.rfile.fileno(), 0)
409                     os.dup2(self.wfile.fileno(), 1)
410                     os.execve(scriptfile, args, env)
411                 except:
412                     self.server.handle_error(self.request, self.client_address)
413                     os._exit(127)
414     
415             else:
416                 # Non-Unix -- use subprocess
417                 import subprocess
418                 cmdline = ['sh', scriptfile]
419                 if self.is_python(scriptfile):
420                     interp = sys.executable
421                     if interp.lower().endswith("w.exe"):
422                         # On Windows, use python.exe, not pythonw.exe
423                         interp = interp[:-5] + interp[-4:]
424                     cmdline = [interp, '-u'] + cmdline
425                 if '=' not in query:
426                     cmdline.append(query)
427                 self.log_message("command: %s", subprocess.list2cmdline(cmdline))
428                 try:
429                     nbytes = int(length)
430                 except (TypeError, ValueError):
431                     nbytes = 0
432                 p = subprocess.Popen(cmdline,
433                                      stdin=subprocess.PIPE,
434                                      stdout=subprocess.PIPE,
435                                      stderr=subprocess.PIPE,
436                                      env = env
437                                      )
438                 if self.command.lower() == "post" and nbytes > 0:
439                     data = self.rfile.read(nbytes)
440                 else:
441                     data = None
442                 # throw away additional data [see bug #427345]
443                 while select.select([self.rfile._sock], [], [], 0)[0]:
444                     if not self.rfile._sock.recv(1):
445                         break
446                 stdout, stderr = p.communicate(data)
447                 self.wfile.write(stdout)
448                 if stderr:
449                     self.log_error('%s', stderr)
450                 p.stderr.close()
451                 p.stdout.close()
452                 status = p.returncode
453                 if status:
454                     self.log_error("CGI script exit status %#x", status)
455                 else:
456                     self.log_message("CGI script exited OK")
457 
458     def test(**kw):
459         """Test the HTTP request handler class.
460     
461         This runs an HTTP server on port 8000 (or the first command line
462         argument).
463     
464         """
465         kw["HandlerClass"] = LocalPatchedCGIHTTPRequestHandler
466         http.server.test(**kw)

元が「格調高」くないのに呼応して、真面目に整理しようとしてないんで、「スパゲッティのまま」複製しとります。

こういうことをしたかったの:

mycgitest.py
 1 # -*- coding: utf-8 -*-
 2 import unittest
 3 try:
 4     import urllib2
 5 except ImportError:  # Python 3.x
 6     import urllib.request as urllib2
 7 import json
 8 import sys
 9 
10 SERVICE_URL = "http://localhost:8000/cgi-bin/mycgi.cgi";
11 class TestAsCGI(unittest.TestCase):
12 
13     def setUp(self):
14         pass
15 
16     def _call(self, postdata, headers):
17         req = urllib2.Request(SERVICE_URL, data=postdata)
18         for k in headers:
19             req.add_header(k, headers[k])
20         #print(req.headers)
21         handler = urllib2.HTTPHandler(debuglevel=0)
22         opener = urllib2.build_opener(handler)
23 
24         f = opener.open(req)
25         #print(f.headers)
26         return f
27 
28     # --------------------------------------------------------
29     def test_01(self):
30         f = self._call({"a": "b"}, {
31                 "Accept-Charset": "MS_Kanji",
32                 "Accept-Encoding": "gzip;q=1,deflate;q=0.9",
33                 "Accept": "text/html"})
34 
35 
36 
37 def _testserver():
38     # testcgihttpserver はカレントフォルダの「下」に cgi-bin がいる
39     # ことをあてにしていて、mycgi.cgi、mycgitest.py ともに cgi-bin
40     # フォルダ内にいる、とすると、サーバは一つ上から起動する。
41     import os
42     os.chdir("..")
43     import testcgihttpserver
44     testcgihttpserver.test()
45 
46 
47 if __name__ == '__main__':
48     from multiprocessing import Process
49 
50     p = Process(target=_testserver)  # 裏でサーバを起こす
51     p.start()
52     try:
53         unittest.main()
54     finally:
55         p.terminate()