「Windows MSYS シェルスクリプトとして書いた CGI を Python の CGIHTTPServer でテスト」の続き。
「Python 3 の CGIHTTPRequestHandler がぶっ壊れている。2.7のCGIHTTPServerも少し壊れている。」を経て、なおかつ、unittest からサーバをその場で立ち上げて、テスト終了したらさようなら、にしたくて。
なんか10分で書いたら一発で動いて自分でもビックリしたけど、「壊れ」てる部分を直しつつ、「追加したい」ものを追加しつつ、「MSYS 固有の問題」を直しつつ、「だけれども公式配布物を直接書き換えることなく」、な、まぁひとまずは満足出来るものです:
1 # -*- coding: utf-8 -*-
2 import copy
3 import os
4 import sys
5 import subprocess
6 import select
7
8
def executable(path):
    """Report whether *path* is executable according to ``os.access``.

    The answer is whatever ``os.access(path, os.X_OK)`` says for the
    current process; no further inspection of the file is done.
    """
    may_execute = os.access(path, os.X_OK)
    return may_execute
12
13
14 try:
15 import CGIHTTPServer # Python 2.7
16 import urllib
17
18 # ===============================================================
class LocalPatchedCGIHTTPRequestHandler(CGIHTTPServer.CGIHTTPRequestHandler):
    """CGI request handler for Python 2.7 with local adjustments.

    Compared with the inherited handler, this subclass:

    * decides executability through the module-level ``executable()``
      helper;
    * exports ``HTTP_ACCEPT_ENCODING`` and ``HTTP_ACCEPT_CHARSET`` to the
      CGI environment in addition to ``HTTP_ACCEPT``;
    * when ``os.fork`` is unavailable, runs non-Python scripts through
      ``sh`` in a subprocess.
    """

    def is_executable(self, path):
        """Test whether argument path is an executable file."""
        return executable(path)

    def _comma_joined_header(self, name):
        """Return every value of request header *name* joined with ','.

        Continuation lines (starting with whitespace) are appended
        stripped; the first line of each header is split on ',' after
        removing the ``name:`` prefix.
        """
        values = []
        for line in self.headers.getallmatchingheaders(name):
            if line[:1] in "\t\n\r ":
                values.append(line.strip())
            else:
                # len(name) + 1 skips the header name and the colon.
                values = values + line[len(name) + 1:].split(',')
        return ','.join(values)

    def _cgi_command_line(self, scriptfile, query):
        """Build the argv list used to run *scriptfile* via subprocess.

        Python scripts are run with the current interpreter (``-u``);
        anything else is handed to ``sh``.  A query string without '='
        is passed as one extra command-line argument.
        """
        if self.is_python(scriptfile):
            interp = sys.executable
            if interp.lower().endswith("w.exe"):
                # On Windows, use python.exe, not pythonw.exe
                interp = interp[:-5] + interp[-4:]
            # Do not put "sh" in front of a Python script: the
            # interpreter would try to execute a file called "sh".
            cmdline = [interp, '-u', scriptfile]
        else:
            cmdline = ["sh", scriptfile]
        if '=' not in query:
            cmdline.append(query)
        return cmdline

    def run_cgi(self):
        """Execute a CGI script.

        Resolves the script from ``self.cgi_info``, sends an error
        response (404/403) when it cannot be run, otherwise builds the
        CGI environment, sends the 200 status line and runs the script,
        either by fork/exec or through ``subprocess``.
        """
        cgidir, rest = self.cgi_info
        path = cgidir + '/' + rest
        i = path.find('/', len(cgidir) + 1)
        # Walk down as long as the next path component is a directory.
        while i >= 0:
            nextdir = path[:i]
            nextrest = path[i + 1:]

            scriptdir = self.translate_path(nextdir)
            if os.path.isdir(scriptdir):
                cgidir, rest = nextdir, nextrest
                i = path.find('/', len(cgidir) + 1)
            else:
                break

        # find an explicit query string, if present.
        i = rest.rfind('?')
        if i >= 0:
            rest, query = rest[:i], rest[i + 1:]
        else:
            query = ''

        # dissect the part after the directory name into a script name &
        # a possible additional path, to be stored in PATH_INFO.
        i = rest.find('/')
        if i >= 0:
            script, rest = rest[:i], rest[i:]
        else:
            script, rest = rest, ''

        scriptname = cgidir + '/' + script
        scriptfile = self.translate_path(scriptname)
        if not os.path.exists(scriptfile):
            self.send_error(404, "No such CGI script (%r)" % scriptname)
            return
        if not os.path.isfile(scriptfile):
            self.send_error(403, "CGI script is not a plain file (%r)" %
                            scriptname)
            return
        ispy = self.is_python(scriptname)
        if not ispy:
            if not (self.have_fork or self.have_popen2 or self.have_popen3):
                self.send_error(403, "CGI script is not a Python script (%r)" %
                                scriptname)
                return
            if not self.is_executable(scriptfile):
                self.send_error(403, "CGI script is not executable (%r)" %
                                scriptname)
                return

        # Reference: http://hoohoo.ncsa.uiuc.edu/cgi/env.html
        # XXX Much of the following could be prepared ahead of time!
        env = copy.deepcopy(os.environ)
        env['SERVER_SOFTWARE'] = self.version_string()
        env['SERVER_NAME'] = self.server.server_name
        env['GATEWAY_INTERFACE'] = 'CGI/1.1'
        env['SERVER_PROTOCOL'] = self.protocol_version
        env['SERVER_PORT'] = str(self.server.server_port)
        env['REQUEST_METHOD'] = self.command
        uqrest = urllib.unquote(rest)
        env['PATH_INFO'] = uqrest
        env['PATH_TRANSLATED'] = self.translate_path(uqrest)
        env['SCRIPT_NAME'] = scriptname
        if query:
            env['QUERY_STRING'] = query
        host = self.address_string()
        if host != self.client_address[0]:
            env['REMOTE_HOST'] = host
        env['REMOTE_ADDR'] = self.client_address[0]
        authorization = self.headers.getheader("authorization")
        if authorization:
            authorization = authorization.split()
            if len(authorization) == 2:
                import base64
                import binascii
                env['AUTH_TYPE'] = authorization[0]
                if authorization[0].lower() == "basic":
                    try:
                        authorization = base64.decodestring(authorization[1])
                    except binascii.Error:
                        pass
                    else:
                        authorization = authorization.split(':')
                        if len(authorization) == 2:
                            env['REMOTE_USER'] = authorization[0]
        # XXX REMOTE_IDENT
        if self.headers.typeheader is None:
            env['CONTENT_TYPE'] = self.headers.type
        else:
            env['CONTENT_TYPE'] = self.headers.typeheader
        length = self.headers.getheader('content-length')
        if length:
            env['CONTENT_LENGTH'] = length
        referer = self.headers.getheader('referer')
        if referer:
            env['HTTP_REFERER'] = referer
        env['HTTP_ACCEPT'] = self._comma_joined_header('accept')
        # Local addition: also expose these two negotiation headers.
        env['HTTP_ACCEPT_ENCODING'] = \
            self._comma_joined_header('accept-encoding')
        env['HTTP_ACCEPT_CHARSET'] = \
            self._comma_joined_header('accept-charset')
        ua = self.headers.getheader('user-agent')
        if ua:
            env['HTTP_USER_AGENT'] = ua
        co = filter(None, self.headers.getheaders('cookie'))
        if co:
            env['HTTP_COOKIE'] = ', '.join(co)
        # XXX Other HTTP_* headers
        # Since we're setting the env in the parent, provide empty
        # values to override previously set values
        for k in ('QUERY_STRING', 'REMOTE_HOST', 'CONTENT_LENGTH',
                  'HTTP_USER_AGENT', 'HTTP_COOKIE', 'HTTP_REFERER'):
            env.setdefault(k, "")

        self.send_response(200, "Script output follows")

        decoded_query = query.replace('+', ' ')

        if self.have_fork:
            # Unix -- fork as we should
            args = [script]
            if '=' not in decoded_query:
                args.append(decoded_query)
            # nobody_uid lives in the CGIHTTPServer module; it is not a
            # name of this module, so it must be qualified.
            nobody = CGIHTTPServer.nobody_uid()
            self.wfile.flush()  # Always flush before forking
            pid = os.fork()
            if pid != 0:
                # Parent
                pid, sts = os.waitpid(pid, 0)
                # throw away additional data [see bug #427345]
                while select.select([self.rfile], [], [], 0)[0]:
                    if not self.rfile.read(1):
                        break
                if sts:
                    self.log_error("CGI script exit status %#x", sts)
                return
            # Child
            try:
                try:
                    os.setuid(nobody)
                except os.error:
                    pass
                os.dup2(self.rfile.fileno(), 0)
                os.dup2(self.wfile.fileno(), 1)
                os.execve(scriptfile, args, env)
            except:
                # Deliberately catch everything: the child must report
                # the failure and exit instead of returning into the
                # server loop.
                self.server.handle_error(self.request, self.client_address)
                os._exit(127)

        else:
            # Non Unix - use subprocess
            import subprocess
            cmdline = self._cgi_command_line(scriptfile, query)

            self.log_message("command: %s", subprocess.list2cmdline(cmdline))
            try:
                nbytes = int(length)
            except (TypeError, ValueError):
                nbytes = 0
            p = subprocess.Popen(cmdline,
                                 stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 env=env)
            if self.command.lower() == "post" and nbytes > 0:
                data = self.rfile.read(nbytes)
            else:
                data = None
            # throw away additional data [see bug #427345]
            while select.select([self.rfile._sock], [], [], 0)[0]:
                if not self.rfile._sock.recv(1):
                    break
            stdout, stderr = p.communicate(data)
            self.wfile.write(stdout)
            if stderr:
                self.log_error('%s', stderr)
            p.stderr.close()
            p.stdout.close()
            status = p.returncode
            if status:
                self.log_error("CGI script exit status %#x", status)
            else:
                self.log_message("CGI script exited OK")
235
def test(**kw):
    """Run the stock CGIHTTPServer test server with the patched handler.

    All keyword arguments are forwarded to ``CGIHTTPServer.test``; the
    ``HandlerClass`` entry is always replaced by
    ``LocalPatchedCGIHTTPRequestHandler``.
    """
    options = dict(kw)
    options["HandlerClass"] = LocalPatchedCGIHTTPRequestHandler
    CGIHTTPServer.test(**options)
245
246
247 except ImportError: # Python 3.x
248 import http.server
249 import urllib.parse
250
251 # ===============================================================
class LocalPatchedCGIHTTPRequestHandler(http.server.CGIHTTPRequestHandler):
    """CGI request handler for Python 3 with local adjustments.

    Compared with the inherited handler, this subclass:

    * decides executability through the module-level ``executable()``
      helper;
    * exports ``HTTP_ACCEPT_ENCODING`` and ``HTTP_ACCEPT_CHARSET`` to the
      CGI environment in addition to ``HTTP_ACCEPT``;
    * when ``os.fork`` is unavailable, runs non-Python scripts through
      ``sh`` in a subprocess.
    """

    def is_executable(self, path):
        """Test whether argument path is an executable file."""
        return executable(path)

    def _cgi_command_line(self, scriptfile, query):
        """Build the argv list used to run *scriptfile* via subprocess.

        Python scripts are run with the current interpreter (``-u``);
        anything else is handed to ``sh``.  A query string without '='
        is passed as one extra command-line argument.
        """
        if self.is_python(scriptfile):
            interp = sys.executable
            if interp.lower().endswith("w.exe"):
                # On Windows, use python.exe, not pythonw.exe
                interp = interp[:-5] + interp[-4:]
            # Do not put "sh" in front of a Python script: the
            # interpreter would try to execute a file called "sh".
            cmdline = [interp, '-u', scriptfile]
        else:
            cmdline = ['sh', scriptfile]
        if '=' not in query:
            cmdline.append(query)
        return cmdline

    def run_cgi(self):
        """Execute a CGI script.

        Resolves the script from ``self.cgi_info``, sends an error
        response (404/403) when it cannot be run, otherwise builds the
        CGI environment, sends the 200 status line and runs the script,
        either by fork/exec or through ``subprocess``.
        """
        cgidir, rest = self.cgi_info
        path = cgidir + '/' + rest
        i = path.find('/', len(cgidir) + 1)
        # Walk down as long as the next path component is a directory.
        while i >= 0:
            nextdir = path[:i]
            nextrest = path[i + 1:]

            scriptdir = self.translate_path(nextdir)
            if os.path.isdir(scriptdir):
                cgidir, rest = nextdir, nextrest
                i = path.find('/', len(cgidir) + 1)
            else:
                break

        # find an explicit query string, if present.
        i = rest.rfind('?')
        if i >= 0:
            rest, query = rest[:i], rest[i + 1:]
        else:
            query = ''

        # dissect the part after the directory name into a script name &
        # a possible additional path, to be stored in PATH_INFO.
        i = rest.find('/')
        if i >= 0:
            script, rest = rest[:i], rest[i:]
        else:
            script, rest = rest, ''

        scriptname = cgidir + '/' + script
        scriptfile = self.translate_path(scriptname)
        if not os.path.exists(scriptfile):
            self.send_error(404, "No such CGI script (%r)" % scriptname)
            return
        if not os.path.isfile(scriptfile):
            self.send_error(403, "CGI script is not a plain file (%r)" %
                            scriptname)
            return
        ispy = self.is_python(scriptname)
        if self.have_fork or not ispy:
            if not self.is_executable(scriptfile):
                self.send_error(403, "CGI script is not executable (%r)" %
                                scriptname)
                return

        # Reference: http://hoohoo.ncsa.uiuc.edu/cgi/env.html
        # XXX Much of the following could be prepared ahead of time!
        env = copy.deepcopy(os.environ)
        env['SERVER_SOFTWARE'] = self.version_string()
        env['SERVER_NAME'] = self.server.server_name
        env['GATEWAY_INTERFACE'] = 'CGI/1.1'
        env['SERVER_PROTOCOL'] = self.protocol_version
        env['SERVER_PORT'] = str(self.server.server_port)
        env['REQUEST_METHOD'] = self.command
        uqrest = urllib.parse.unquote(rest)
        env['PATH_INFO'] = uqrest
        env['PATH_TRANSLATED'] = self.translate_path(uqrest)
        env['SCRIPT_NAME'] = scriptname
        if query:
            env['QUERY_STRING'] = query
        env['REMOTE_ADDR'] = self.client_address[0]
        authorization = self.headers.get("authorization")
        if authorization:
            authorization = authorization.split()
            if len(authorization) == 2:
                import base64
                import binascii
                env['AUTH_TYPE'] = authorization[0]
                if authorization[0].lower() == "basic":
                    try:
                        authorization = authorization[1].encode('ascii')
                        authorization = base64.decodebytes(authorization).\
                            decode('ascii')
                    except (binascii.Error, UnicodeError):
                        pass
                    else:
                        authorization = authorization.split(':')
                        if len(authorization) == 2:
                            env['REMOTE_USER'] = authorization[0]
        # XXX REMOTE_IDENT
        if self.headers.get('content-type') is None:
            env['CONTENT_TYPE'] = self.headers.get_content_type()
        else:
            env['CONTENT_TYPE'] = self.headers['content-type']
        length = self.headers.get('content-length')
        if length:
            env['CONTENT_LENGTH'] = length
        referer = self.headers.get('referer')
        if referer:
            env['HTTP_REFERER'] = referer
        # HTTP_ACCEPT is inherited behaviour; the encoding and charset
        # variants are local additions.
        for header, variable in (('accept', 'HTTP_ACCEPT'),
                                 ('accept-encoding', 'HTTP_ACCEPT_ENCODING'),
                                 ('accept-charset', 'HTTP_ACCEPT_CHARSET')):
            env[variable] = ','.join(self.headers.get_all(header, ()))
        ua = self.headers.get('user-agent')
        if ua:
            env['HTTP_USER_AGENT'] = ua
        co = filter(None, self.headers.get_all('cookie', []))
        cookie_str = ', '.join(co)
        if cookie_str:
            env['HTTP_COOKIE'] = cookie_str
        # XXX Other HTTP_* headers
        # Since we're setting the env in the parent, provide empty
        # values to override previously set values
        for k in ('QUERY_STRING', 'REMOTE_HOST', 'CONTENT_LENGTH',
                  'HTTP_USER_AGENT', 'HTTP_COOKIE', 'HTTP_REFERER'):
            env.setdefault(k, "")

        self.send_response(200, "Script output follows")
        self.flush_headers()

        decoded_query = query.replace('+', ' ')

        if self.have_fork:
            # Unix -- fork as we should
            args = [script]
            if '=' not in decoded_query:
                args.append(decoded_query)
            # nobody_uid lives in the http.server module; it is not a
            # name of this module, so it must be qualified.
            nobody = http.server.nobody_uid()
            self.wfile.flush()  # Always flush before forking
            pid = os.fork()
            if pid != 0:
                # Parent
                pid, sts = os.waitpid(pid, 0)
                # throw away additional data [see bug #427345]
                while select.select([self.rfile], [], [], 0)[0]:
                    if not self.rfile.read(1):
                        break
                if sts:
                    self.log_error("CGI script exit status %#x", sts)
                return
            # Child
            try:
                try:
                    os.setuid(nobody)
                except OSError:
                    pass
                os.dup2(self.rfile.fileno(), 0)
                os.dup2(self.wfile.fileno(), 1)
                os.execve(scriptfile, args, env)
            except:
                # Deliberately catch everything: the child must report
                # the failure and exit instead of returning into the
                # server loop.
                self.server.handle_error(self.request, self.client_address)
                os._exit(127)

        else:
            # Non-Unix -- use subprocess
            import subprocess
            cmdline = self._cgi_command_line(scriptfile, query)
            self.log_message("command: %s", subprocess.list2cmdline(cmdline))
            try:
                nbytes = int(length)
            except (TypeError, ValueError):
                nbytes = 0
            p = subprocess.Popen(cmdline,
                                 stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 env=env)
            if self.command.lower() == "post" and nbytes > 0:
                data = self.rfile.read(nbytes)
            else:
                data = None
            # throw away additional data [see bug #427345]
            # NOTE(review): relies on the private ``_sock`` attribute of
            # rfile -- confirm it is available on the targeted Python 3.
            while select.select([self.rfile._sock], [], [], 0)[0]:
                if not self.rfile._sock.recv(1):
                    break
            stdout, stderr = p.communicate(data)
            self.wfile.write(stdout)
            if stderr:
                self.log_error('%s', stderr)
            p.stderr.close()
            p.stdout.close()
            status = p.returncode
            if status:
                self.log_error("CGI script exit status %#x", status)
            else:
                self.log_message("CGI script exited OK")
457
def test(**kw):
    """Run the stock http.server test server with the patched handler.

    All keyword arguments are forwarded to ``http.server.test``; the
    ``HandlerClass`` entry is always replaced by
    ``LocalPatchedCGIHTTPRequestHandler``.
    """
    options = dict(kw)
    options["HandlerClass"] = LocalPatchedCGIHTTPRequestHandler
    http.server.test(**options)
元が「格調高」くないのに呼応して、真面目に整理しようとしてないんで、「スパゲッティのまま」複製しとります。
こういうことをしたかったの:
1 # -*- coding: utf-8 -*-
2 import unittest
3 try:
4 import urllib2
5 except ImportError: # Python 3.x
6 import urllib.request as urllib2
7 import json
8 import sys
9
SERVICE_URL = "http://localhost:8000/cgi-bin/mycgi.cgi"


class TestAsCGI(unittest.TestCase):
    """Exercise the CGI script over HTTP at ``SERVICE_URL``."""

    def setUp(self):
        pass

    def _call(self, postdata, headers):
        """POST *postdata* to ``SERVICE_URL`` and return the open response.

        *postdata* may be a dict of form fields, which is urlencoded to
        bytes first (the HTTP layer cannot send a dict), or any value
        ``Request`` accepts as ``data``.  *headers* maps header names to
        values added to the request.  The caller closes the response.
        """
        if isinstance(postdata, dict):
            try:
                from urllib import urlencode  # Python 2.7
            except ImportError:  # Python 3.x
                from urllib.parse import urlencode
            # urlencode output is pure ASCII, so this is safe on both
            # Python 2 (str) and Python 3 (str -> bytes).
            postdata = urlencode(postdata).encode("ascii")

        req = urllib2.Request(SERVICE_URL, data=postdata)
        for k in headers:
            req.add_header(k, headers[k])
        handler = urllib2.HTTPHandler(debuglevel=0)
        opener = urllib2.build_opener(handler)
        return opener.open(req)

    # --------------------------------------------------------
    def test_01(self):
        """The CGI answers a POST carrying Accept* negotiation headers."""
        f = self._call({"a": "b"}, {
            "Accept-Charset": "MS_Kanji",
            "Accept-Encoding": "gzip;q=1,deflate;q=0.9",
            "Accept": "text/html"})
        try:
            self.assertEqual(f.getcode(), 200)
        finally:
            f.close()
34
35
36
def _testserver():
    """Serve CGI requests from the parent of the current directory."""
    # testcgihttpserver expects a cgi-bin folder *below* the current
    # directory.  mycgi.cgi and mycgitest.py both live inside cgi-bin,
    # so the server has to be started from one level up.
    import os
    os.chdir(os.pardir)
    # Imported only after the directory change, as in the original order.
    from testcgihttpserver import test as serve
    serve()
45
46
if __name__ == '__main__':
    import socket
    import time
    from multiprocessing import Process
    try:
        from urlparse import urlparse  # Python 2.7
    except ImportError:  # Python 3.x
        from urllib.parse import urlparse

    p = Process(target=_testserver)  # start the server in the background
    p.start()
    try:
        # The server needs a moment to come up; without this wait the
        # first test can hit a port nobody is listening on yet.
        target = urlparse(SERVICE_URL)
        address = (target.hostname, target.port or 80)
        deadline = time.time() + 10
        while time.time() < deadline:
            try:
                socket.create_connection(address, 0.5).close()
                break
            except socket.error:
                time.sleep(0.1)
        # If the deadline passes, run the tests anyway and let them
        # report the connection failure.
        unittest.main()
    finally:
        p.terminate()
        p.join()  # reap the server process once it has been stopped