testlib.py 68.8 KB
Newer Older
1
# -*- coding: utf-8 -*-
2
"""Run tests.
root's avatar
root committed
3
4

This will find all modules whose name match a given prefix in the test
5
directory, and run them. Various command line options provide
root's avatar
root committed
6
7
8
9
additional facilities.

Command line options:

10
 -v: verbose -- run tests in verbose mode with output to stdout
11
 -q: quiet   -- don't print anything except if a test fails
12
13
14
15
16
 -t: testdir -- directory where the tests will be found
 -x: exclude -- add a test to exclude
 -p: profile -- profiled execution
 -c: capture -- capture standard out/err during tests
 -d: dbc     -- enable design-by-contract
17
 -m: match   -- only run test matching the tag pattern which follow
root's avatar
root committed
18
19
20
21

If no non-option arguments are present, prefixes used are 'test',
'regrtest', 'smoketest' and 'unittest'.

22
23
24
:copyright: 2003-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
:license: General Public License version 2 - http://www.gnu.org/licenses
root's avatar
root committed
25
"""
26
27
28
29
30
__docformat__ = "restructuredtext en"
# modified copy of some functions from test/regrtest.py from PyXml
# disable camel case warning
# pylint: disable-msg=C0103

root's avatar
root committed
31
import sys
32
import os, os.path as osp
adim's avatar
adim committed
33
import re
34
import time
root's avatar
root committed
35
36
import getopt
import traceback
37
import inspect
root's avatar
root committed
38
39
import unittest
import difflib
40
import types
41
import tempfile
42
import math
43
from shutil import rmtree
44
from operator import itemgetter
root's avatar
root committed
45
from warnings import warn
46
from compiler.consts import CO_GENERATOR
Pierre-Yves David's avatar
Pierre-Yves David committed
47
from ConfigParser import ConfigParser
root's avatar
root committed
48
49
50
51
52
53
54
55
56
57

try:
    from test import test_support
except ImportError:
    # not always available
    class TestSupport:
        def unload(self, test):
            pass
    test_support = TestSupport()

58
59
from logilab.common.deprecation import class_renamed, deprecated_function, \
     obsolete
Pierre-Yves David's avatar
Pierre-Yves David committed
60
# pylint: disable-msg=W0622
61
from logilab.common.compat import set, enumerate, any, sorted
Pierre-Yves David's avatar
Pierre-Yves David committed
62
# pylint: enable-msg=W0622
root's avatar
root committed
63
from logilab.common.modutils import load_module_from_name
64
from logilab.common.debugger import Debugger, colorize_source
65
from logilab.common.decorators import cached
66
from logilab.common import textutils
67
        
root's avatar
root committed
68
69
70
71
72
73

__all__ = ['main', 'unittest_main', 'find_tests', 'run_test', 'spawn']

DEFAULT_PREFIXES = ('test', 'regrtest', 'smoketest', 'unittest',
                    'func', 'validation')

74
75
ENABLE_DBC = False

76
FILE_RESTART = ".pytest.restart"
77

78
79
80
# used by unittest to count the number of relevant levels in the traceback
__unittest = 1

81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98

def with_tempdir(callable):
    """A decorator ensuring no temporary file left when the function return
    Work only for temporary file create with the tempfile module"""
    def proxy(*args, **kargs):
        
        old_tmpdir = tempfile.gettempdir()
        new_tmpdir = tempfile.mkdtemp("-logilab-common-testlib","temp-dir-")
        tempfile.tempdir = new_tmpdir
        try:
            return callable(*args, **kargs)
        finally:
            try:
                rmtree(new_tmpdir, ignore_errors=True)
            finally:
                tempfile.tempdir = old_tmpdir
    return proxy

99

100
def main(testdir=None, exitafter=True):
root's avatar
root committed
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
    """Execute a test suite.

    This also parses command-line options and modifies its behaviour
    accordingly.

    tests -- a list of strings containing test names (optional)
    testdir -- the directory in which to look for tests (optional)

    Users other than the Python test suite will certainly want to
    specify testdir; if it's omitted, the directory containing the
    Python test suite is searched for.

    If the tests argument is omitted, the tests listed on the
    command-line will be used.  If that's empty, too, then all *.py
    files beginning with test_ will be used.

    """

    try:
120
        opts, args = getopt.getopt(sys.argv[1:], 'hvqxr:t:pcd', ['help'])
root's avatar
root committed
121
122
123
124
125
    except getopt.error, msg:
        print msg
        print __doc__
        return 2
    verbose = 0
Nicolas Chauvat's avatar
Nicolas Chauvat committed
126
127
    quiet = False
    profile = False
root's avatar
root committed
128
    exclude = []
129
    capture = 0
root's avatar
root committed
130
131
    for o, a in opts:
        if o == '-v':
Nicolas Chauvat's avatar
Nicolas Chauvat committed
132
            verbose += 1
root's avatar
root committed
133
        elif o == '-q':
Nicolas Chauvat's avatar
Nicolas Chauvat committed
134
            quiet = True
root's avatar
root committed
135
136
137
138
139
140
            verbose = 0
        elif o == '-x':
            exclude.append(a)
        elif o == '-t':
            testdir = a
        elif o == '-p':
Nicolas Chauvat's avatar
Nicolas Chauvat committed
141
            profile = True
142
        elif o == '-c':
143
            capture += 1
144
145
146
        elif o == '-d':
            global ENABLE_DBC
            ENABLE_DBC = True
"Sylvain ext:(%22)'s avatar
"Sylvain ext:(%22) committed
147
        elif o in ('-h', '--help'):
root's avatar
root committed
148
149
            print __doc__
            sys.exit(0)
Nicolas Chauvat's avatar
Nicolas Chauvat committed
150
151
152
153

    args = [item.rstrip('.py') for item in args]
    exclude = [item.rstrip('.py') for item in exclude]

154
155
156
157
    if testdir is not None:
        os.chdir(testdir)
    sys.path.insert(0, '')
    tests = find_tests('.', args or DEFAULT_PREFIXES, excludes=exclude)
root's avatar
root committed
158
159
160
161
162
163
    # Tell tests to be moderately quiet
    test_support.verbose = verbose
    if profile:
        print >> sys.stderr, '** profiled run'
        from hotshot import Profile
        prof = Profile('stones.prof')
164
        start_time, start_ctime = time.time(), time.clock()
root's avatar
root committed
165
        good, bad, skipped, all_result = prof.runcall(run_tests, tests, quiet,
166
                                                      verbose, None, capture)
167
        end_time, end_ctime = time.time(), time.clock()
root's avatar
root committed
168
169
        prof.close()
    else:
170
        start_time, start_ctime = time.time(), time.clock()
Pierre-Yves David's avatar
Pierre-Yves David committed
171
172
        good, bad, skipped, all_result = run_tests(tests, quiet, verbose, None,
            capture)
173
        end_time, end_ctime = time.time(), time.clock()
root's avatar
root committed
174
175
176
    if not quiet:
        print '*'*80
        if all_result:
Pierre-Yves David's avatar
Pierre-Yves David committed
177
178
179
            print 'Ran %s test cases in %0.2fs (%0.2fs CPU)' % (
                all_result.testsRun, end_time - start_time,
                end_ctime - start_ctime), 
root's avatar
root committed
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
            if all_result.errors:
                print ', %s errors' % len(all_result.errors),
            if all_result.failures:
                print ', %s failed' % len(all_result.failures),
            if all_result.skipped:
                print ', %s skipped' % len(all_result.skipped),
            print
        if good:
            if not bad and not skipped and len(good) > 1:
                print "All",
            print _count(len(good), "test"), "OK."
        if bad:
            print _count(len(bad), "test"), "failed:",
            print ', '.join(bad)
        if skipped:
            print _count(len(skipped), "test"), "skipped:",
            print ', '.join(['%s (%s)' % (test, msg) for test, msg in skipped])
    if profile:
        from hotshot import stats
        stats = stats.load('stones.prof')
        stats.sort_stats('time', 'calls')
        stats.print_stats(30)
202
203
204
205
    if exitafter:
        sys.exit(len(bad) + len(skipped))
    else:
        sys.path.pop(0)
adim's avatar
adim committed
206
        return len(bad)
207
208
main = obsolete("testlib.main() is obsolete, use the pytest tool instead")(main)

root's avatar
root committed
209

210
def run_tests(tests, quiet, verbose, runner=None, capture=0):
211
212
213
214
    """Execute a list of tests.

    :rtype: tuple
    :return: tuple (list of passed tests, list of failed tests, list of skipped tests)
root's avatar
root committed
215
216
217
218
219
220
221
222
223
224
    """
    good = []
    bad = []
    skipped = []
    all_result = None
    for test in tests:
        if not quiet:
            print 
            print '-'*80
            print "Executing", test
225
        result = run_test(test, verbose, runner, capture)
root's avatar
root committed
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
        if type(result) is type(''):
            # an unexpected error occured
            skipped.append( (test, result))
        else:
            if all_result is None:
                all_result = result
            else:
                all_result.testsRun += result.testsRun
                all_result.failures += result.failures
                all_result.errors += result.errors
                all_result.skipped += result.skipped
            if result.errors or result.failures:
                bad.append(test)
                if verbose:
                    print "test", test, \
                          "failed -- %s errors, %s failures" % (
                        len(result.errors), len(result.failures))
            else:
                good.append(test)
            
    return good, bad, skipped, all_result
    
def find_tests(testdir,
               prefixes=DEFAULT_PREFIXES, suffix=".py",
               excludes=(),
Nicolas Chauvat's avatar
Nicolas Chauvat committed
251
               remove_suffix=True):
root's avatar
root committed
252
253
254
255
256
    """
    Return a list of all applicable test modules.
    """
    tests = []
    for name in os.listdir(testdir):
Nicolas Chauvat's avatar
Nicolas Chauvat committed
257
        if not suffix or name.endswith(suffix):
root's avatar
root committed
258
            for prefix in prefixes:
Nicolas Chauvat's avatar
Nicolas Chauvat committed
259
                if name.startswith(prefix):
260
261
                    if remove_suffix and name.endswith(suffix):
                        name = name[:-len(suffix)]
root's avatar
root committed
262
263
264
265
266
267
                    if name not in excludes:
                        tests.append(name)
    tests.sort()
    return tests


268
def run_test(test, verbose, runner=None, capture=0):
root's avatar
root committed
269
270
271
272
273
274
275
276
277
278
279
280
    """
    Run a single test.

    test -- the name of the test
    verbose -- if true, print more messages
    """
    test_support.unload(test)
    try:
        m = load_module_from_name(test, path=sys.path)
#        m = __import__(test, globals(), locals(), sys.path)
        try:
            suite = m.suite
281
            if callable(suite):
root's avatar
root committed
282
283
284
285
286
                suite = suite()
        except AttributeError:
            loader = unittest.TestLoader()
            suite = loader.loadTestsFromModule(m)
        if runner is None:
287
            runner = SkipAwareTextTestRunner(capture=capture) # verbosity=0)
root's avatar
root committed
288
289
290
291
        return runner.run(suite)
    except KeyboardInterrupt, v:
        raise KeyboardInterrupt, v, sys.exc_info()[2]
    except:
Adrien Di Mascio's avatar
Adrien Di Mascio committed
292
        # raise
root's avatar
root committed
293
294
295
296
297
298
299
300
301
302
303
304
305
306
        type, value = sys.exc_info()[:2]
        msg = "test %s crashed -- %s : %s" % (test, type, value)
        if verbose:
            traceback.print_exc()
        return msg

def _count(n, word):
    """format word according to n"""
    if n == 1:
        return "%d %s" % (n, word)
    else:
        return "%d %ss" % (n, word)


307
308
    

root's avatar
root committed
309
## PostMortem Debug facilities #####
Adrien Di Mascio's avatar
Adrien Di Mascio committed
310
def start_interactive_mode(result):
root's avatar
root committed
311
312
    """starts an interactive shell so that the user can inspect errors
    """
Adrien Di Mascio's avatar
Adrien Di Mascio committed
313
314
    debuggers = result.debuggers
    descrs = result.error_descrs + result.fail_descrs
315
316
317
318
319
320
321
    if len(debuggers) == 1:
        # don't ask for test name if there's only one failure
        debuggers[0].start()
    else:
        while True:
            testindex = 0
            print "Choose a test to debug:"
Adrien Di Mascio's avatar
Adrien Di Mascio committed
322
            # order debuggers in the same way than errors were printed
Pierre-Yves David's avatar
Pierre-Yves David committed
323
324
            print "\n".join(['\t%s : %s' % (i, descr) for i, (_, descr)
                in enumerate(descrs)])
325
326
327
328
329
330
331
332
333
334
            print "Type 'exit' (or ^D) to quit"
            print
            try:
                todebug = raw_input('Enter a test name: ')
                if todebug.strip().lower() == 'exit':
                    print
                    break
                else:
                    try:
                        testindex = int(todebug)
Adrien Di Mascio's avatar
Adrien Di Mascio committed
335
                        debugger = debuggers[descrs[testindex][0]]
336
                    except (ValueError, IndexError):
Pierre-Yves David's avatar
Pierre-Yves David committed
337
                        print "ERROR: invalid test number %r" % (todebug, )
338
339
340
                    else:
                        debugger.start()
            except (EOFError, KeyboardInterrupt):
root's avatar
root committed
341
342
343
344
345
346
347
348
349
                print
                break


# test utils ##################################################################
from cStringIO import StringIO

class SkipAwareTestResult(unittest._TextTestResult):

Adrien Di Mascio's avatar
Adrien Di Mascio committed
350
    def __init__(self, stream, descriptions, verbosity,
351
                 exitfirst=False, capture=0, printonly=None,
352
                 pdbmode=False, cvg=None, colorize=False):
adim's avatar
adim committed
353
354
        super(SkipAwareTestResult, self).__init__(stream,
                                                  descriptions, verbosity)
root's avatar
root committed
355
356
        self.skipped = []
        self.debuggers = []
357
358
        self.fail_descrs = []
        self.error_descrs = []
359
        self.exitfirst = exitfirst
Adrien Di Mascio's avatar
Adrien Di Mascio committed
360
        self.capture = capture
adim's avatar
adim committed
361
        self.printonly = printonly
362
        self.pdbmode = pdbmode
363
        self.cvg = cvg
364
        self.colorize = colorize
365
        self.pdbclass = Debugger
366
        self.verbose = verbosity > 1
367
368
369
370
371

    def descrs_for(self, flavour):
        return getattr(self, '%s_descrs' % flavour.lower())
    
    def _create_pdb(self, test_descr, flavour):
Adrien Di Mascio's avatar
Adrien Di Mascio committed
372
        self.descrs_for(flavour).append( (len(self.debuggers), test_descr) )
373
        if self.pdbmode:
374
            self.debuggers.append(self.pdbclass(sys.exc_info()[2]))
375

376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
    def _exc_info_to_string(self, err, test):
        """Converts a sys.exc_info()-style tuple of values into a string.

        This method is overridden here because we want to colorize
        lines if --color is passed, and display local variables if
        --verbose is passed
        """
        exctype, exc, tb = err
        output = ['Traceback (most recent call last)']
        frames = inspect.getinnerframes(tb)
        colorize = self.colorize
        # count number of relevant levels in the traceback: skip first since
        # it's our own _proceed function, and then start counting, using
        # unittest's heuristic
        nb_frames_skipped = self._count_relevant_tb_levels(tb.tb_next)
        for index, (frame, filename, lineno, funcname, ctx, ctxindex) in enumerate(frames):
            if not (0 < index <= nb_frames_skipped):
                continue
            filename = osp.abspath(filename)
395
396
397
398
            if ctx is None: # pyc files or C extensions for instance
                source = '<no source available>'
            else:
                source = ''.join(ctx)
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
            if colorize:
                filename = textutils.colorize_ansi(filename, 'magenta')
                source = colorize_source(source)
            output.append('  File "%s", line %s, in %s' % (filename, lineno, funcname))
            output.append('    %s' % source.strip())
            if self.verbose:
                output.append('%r == %r' % (dir(frame), test.__module__))
                output.append('')
                output.append('    ' + ' local variables '.center(66, '-'))
                for varname, value in sorted(frame.f_locals.items()):
                    output.append('    %s: %r' % (varname, value))
                    if varname == 'self': # special handy processing for self
                        for varname, value in sorted(vars(value).items()):
                            output.append('      self.%s: %r' % (varname, value))
                output.append('    ' + '-' * 66)
                output.append('')
415
        output.append(''.join(traceback.format_exception_only(exctype, exc)))
416
417
        return '\n'.join(output)

root's avatar
root committed
418
    def addError(self, test, err):
Pierre-Yves David's avatar
Pierre-Yves David committed
419
420
        """err ==  (exc_type, exc, tcbk)"""
        exc_type, exc, _ = err # 
root's avatar
root committed
421
422
423
        if exc_type == TestSkipped:
            self.addSkipped(test, exc)
        else:
424
425
            if self.exitfirst:
                self.shouldStop = True
426
            descr = self.getDescription(test)
adim's avatar
adim committed
427
            super(SkipAwareTestResult, self).addError(test, err)
428
            self._create_pdb(descr, 'error')
root's avatar
root committed
429
430

    def addFailure(self, test, err):
431
432
        if self.exitfirst:
            self.shouldStop = True
433
        descr = self.getDescription(test)
Adrien Di Mascio's avatar
oops    
Adrien Di Mascio committed
434
        super(SkipAwareTestResult, self).addFailure(test, err)
435
        self._create_pdb(descr, 'fail')
root's avatar
root committed
436
437

    def addSkipped(self, test, reason):
438
        self.skipped.append((test, self.getDescription(test), reason))
root's avatar
root committed
439
440
441
442
443
444
        if self.showAll:
            self.stream.writeln("SKIPPED")
        elif self.dots:
            self.stream.write('S')

    def printErrors(self):
adim's avatar
adim committed
445
        super(SkipAwareTestResult, self).printErrors()
root's avatar
root committed
446
447
448
        self.printSkippedList()
        
    def printSkippedList(self):
Pierre-Yves David's avatar
Pierre-Yves David committed
449
        for _, descr, err in self.skipped: # test, descr, err
root's avatar
root committed
450
            self.stream.writeln(self.separator1)
451
            self.stream.writeln("%s: %s" % ('SKIPPED', descr))
root's avatar
root committed
452
453
            self.stream.writeln("\t%s" % err)

Adrien Di Mascio's avatar
Adrien Di Mascio committed
454
    def printErrorList(self, flavour, errors):
Adrien Di Mascio's avatar
Adrien Di Mascio committed
455
        for (_, descr), (test, err) in zip(self.descrs_for(flavour), errors):
Adrien Di Mascio's avatar
Adrien Di Mascio committed
456
            self.stream.writeln(self.separator1)
457
            if self.colorize:
458
459
460
461
                self.stream.writeln("%s: %s" % (
                    textutils.colorize_ansi(flavour, color='red'), descr))
            else:
                self.stream.writeln("%s: %s" % (flavour, descr))
462

Adrien Di Mascio's avatar
Adrien Di Mascio committed
463
            self.stream.writeln(self.separator2)
464
            self.stream.writeln(err)
465

Adrien Di Mascio's avatar
Adrien Di Mascio committed
466
467
468
469
            try:
                output, errput = test.captured_output()
            except AttributeError:
                pass # original unittest
adim's avatar
adim committed
470
            else:
Adrien Di Mascio's avatar
Adrien Di Mascio committed
471
472
                if output:
                    self.stream.writeln(self.separator2)
Pierre-Yves David's avatar
Pierre-Yves David committed
473
474
                    self.stream.writeln("captured stdout".center(
                        len(self.separator2)))
Adrien Di Mascio's avatar
Adrien Di Mascio committed
475
476
477
                    self.stream.writeln(self.separator2)
                    self.stream.writeln(output)
                else:
Pierre-Yves David's avatar
Pierre-Yves David committed
478
479
                    self.stream.writeln('no stdout'.center(
                        len(self.separator2)))
Adrien Di Mascio's avatar
Adrien Di Mascio committed
480
481
                if errput:
                    self.stream.writeln(self.separator2)
Pierre-Yves David's avatar
Pierre-Yves David committed
482
483
                    self.stream.writeln("captured stderr".center(
                        len(self.separator2)))
Adrien Di Mascio's avatar
Adrien Di Mascio committed
484
485
486
                    self.stream.writeln(self.separator2)
                    self.stream.writeln(errput)
                else:
Pierre-Yves David's avatar
Pierre-Yves David committed
487
488
                    self.stream.writeln('no stderr'.center(
                        len(self.separator2)))
Adrien Di Mascio's avatar
Adrien Di Mascio committed
489

root's avatar
root committed
490

491
492
493
494
def run(self, result, runcondition=None, options=None):
    for test in self._tests:
        if result.shouldStop:
            break
495
496
497
498
499
500
501
502
        try:
            test(result, runcondition, options)
        except TypeError:
            # this might happen if a raw unittest.TestCase is defined
            # and used with python (and not pytest)
            warn("%s should extend lgc.testlib.TestCase instead of unittest.TestCase"
                 % test)
            test(result)
503
504
    return result
unittest.TestSuite.run = run
505
506
507
508

# backward compatibility: TestSuite might be imported from lgc.testlib
TestSuite = unittest.TestSuite

509
510
511
512
# python2.3 compat
def __call__(self, *args, **kwds):
    return self.run(*args, **kwds)
unittest.TestSuite.__call__ = __call__
513

514

root's avatar
root committed
515
516
class SkipAwareTextTestRunner(unittest.TextTestRunner):

Adrien Di Mascio's avatar
Adrien Di Mascio committed
517
    def __init__(self, stream=sys.stderr, verbosity=1,
518
                 exitfirst=False, capture=False, printonly=None,
Pierre-Yves David's avatar
Pierre-Yves David committed
519
                 pdbmode=False, cvg=None, test_pattern=None,
520
                 skipped_patterns=(), colorize=False, options=None):
adim's avatar
adim committed
521
522
        super(SkipAwareTextTestRunner, self).__init__(stream=stream,
                                                      verbosity=verbosity)
523
        self.exitfirst = exitfirst
Adrien Di Mascio's avatar
Adrien Di Mascio committed
524
        self.capture = capture
adim's avatar
adim committed
525
        self.printonly = printonly
526
        self.pdbmode = pdbmode
527
        self.cvg = cvg
528
529
        self.test_pattern = test_pattern
        self.skipped_patterns = skipped_patterns
530
        self.colorize = colorize
Adrien Di Mascio's avatar
Adrien Di Mascio committed
531
        self.options = options
532
533

    def _this_is_skipped(self, testedname):
534
        return any([(pat in testedname) for pat in self.skipped_patterns])
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551

    def _runcondition(self, test, skipgenerator=True):
        if isinstance(test, InnerTest):
            testname = test.name
        else:
            if isinstance(test, TestCase):
                meth = test._get_test_method()
                func = meth.im_func
                testname = '%s.%s' % (meth.im_class.__name__, func.__name__)
            elif isinstance(test, types.FunctionType):
                func = test
                testname = func.__name__
            elif isinstance(test, types.MethodType):
                func = test.im_func
                testname = '%s.%s' % (test.im_class.__name__, func.__name__)
            else:
                return True # Not sure when this happens
Pierre-Yves David's avatar
Pierre-Yves David committed
552

553
            if is_generator(func) and skipgenerator:
Pierre-Yves David's avatar
Pierre-Yves David committed
554
555
                return self.does_match_tags(func) # Let inner tests decide at run time

556
557
558
        # print 'testname', testname, self.test_pattern
        if self._this_is_skipped(testname):
            return False # this was explicitly skipped
Pierre-Yves David's avatar
Pierre-Yves David committed
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
        if self.test_pattern is not None:
            try:
                classpattern, testpattern = self.test_pattern.split('.')
                klass, name = testname.split('.')
                if classpattern not in klass or testpattern not in name:
                    return False
            except ValueError:
                if self.test_pattern not in testname:
                    return False

        return self.does_match_tags(test)

    def does_match_tags(self, test):
        if self.options is not None:
            tags_pattern = getattr(self.options, 'tags_pattern', None)
            if tags_pattern is not None:
                tags = getattr(test, 'tags', Tags())
                return tags.match(tags_pattern)
        return True # no pattern
578
    
root's avatar
root committed
579
    def _makeResult(self):
Pierre-Yves David's avatar
Pierre-Yves David committed
580
581
        return SkipAwareTestResult(self.stream, self.descriptions,
                                   self.verbosity, self.exitfirst, self.capture,
582
583
                                   self.printonly, self.pdbmode, self.cvg,
                                   self.colorize)
root's avatar
root committed
584

585
586
587
588
    def run(self, test):
        "Run the given test case or test suite."
        result = self._makeResult()
        startTime = time.time()
Adrien Di Mascio's avatar
Adrien Di Mascio committed
589
        test(result, self._runcondition, self.options)
590
591
592
593
594
595
596
597
598
        stopTime = time.time()
        timeTaken = stopTime - startTime
        result.printErrors()
        self.stream.writeln(result.separator2)
        run = result.testsRun
        self.stream.writeln("Ran %d test%s in %.3fs" %
                            (run, run != 1 and "s" or "", timeTaken))
        self.stream.writeln()
        if not result.wasSuccessful():
599
            if self.colorize:
600
601
602
                self.stream.write(textutils.colorize_ansi("FAILED", color='red'))
            else:
                self.stream.write("FAILED")
603
        else:
604
            if self.colorize:
605
606
607
                self.stream.write(textutils.colorize_ansi("OK", color='green'))
            else:
                self.stream.write("OK")
608
609
610
611
612
613
614
615
616
617
618
619
620
621
        failed, errored, skipped = map(len, (result.failures, result.errors,
             result.skipped))
        
        det_results = []
        for name, value in (("failures", result.failures),
                            ("errors",result.errors),
                            ("skipped", result.skipped)):
            if value:
                det_results.append("%s=%i" % (name, len(value)))
        if det_results:
            self.stream.write(" (")
            self.stream.write(', '.join(det_results))
            self.stream.write(")")
        self.stream.writeln("")
622
623
624
        return result


625
class keywords(dict):
626
    """Keyword args (**kwargs) support for generative tests."""
627
628

class starargs(tuple):
629
    """Variable arguments (*args) for generative tests."""
630
631
632
633
    def __new__(cls, *args):
        return tuple.__new__(cls, args)


634
635
636

class NonStrictTestLoader(unittest.TestLoader):
    """
637
638
639
640
    Overrides default testloader to be able to omit classname when
    specifying tests to run on command line.

    For example, if the file test_foo.py contains ::
641
642
643
644
645
646
647
648
649
    
        class FooTC(TestCase):
            def test_foo1(self): # ...
            def test_foo2(self): # ...
            def test_bar1(self): # ...

        class BarTC(TestCase):
            def test_bar2(self): # ...

650
651
652
653
654
    'python test_foo.py' will run the 3 tests in FooTC
    'python test_foo.py FooTC' will run the 3 tests in FooTC
    'python test_foo.py test_foo' will run test_foo1 and test_foo2
    'python test_foo.py test_foo1' will run test_foo1
    'python test_foo.py test_bar' will run FooTC.test_bar1 and BarTC.test_bar2
655
656
    """

Adrien Di Mascio's avatar
Adrien Di Mascio committed
657
658
659
660
661
662
663
664
    def __init__(self):
        self.skipped_patterns = []

    def loadTestsFromNames(self, names, module=None):
        suites = []
        for name in names:
            suites.extend(self.loadTestsFromName(name, module))
        return self.suiteClass(suites)
665
666
667
668

    def _collect_tests(self, module):
        tests = {}
        for obj in vars(module).values():
669
670
            if (issubclass(type(obj), (types.ClassType, type)) and
                 issubclass(obj, unittest.TestCase)):
671
                classname = obj.__name__
672
                if classname[0] == '_' or self._this_is_skipped(classname):
Adrien Di Mascio's avatar
Adrien Di Mascio committed
673
                    continue
674
675
676
677
678
679
                methodnames = []
                # obj is a TestCase class
                for attrname in dir(obj):
                    if attrname.startswith(self.testMethodPrefix):
                        attr = getattr(obj, attrname)
                        if callable(attr):
680
                            methodnames.append(attrname)
681
682
683
                # keep track of class (obj) for convenience
                tests[classname] = (obj, methodnames)
        return tests
684
685
686
687
688
689

    def loadTestsFromSuite(self, module, suitename):
        try:
            suite = getattr(module, suitename)()
        except AttributeError:
            return []
690
691
692
693
694
        assert hasattr(suite, '_tests'), \
               "%s.%s is not a valid TestSuite" % (module.__name__, suitename)
        # python2.3 does not implement __iter__ on suites, we need to return
        # _tests explicitly
        return suite._tests
695
    
696
697
698
699
    def loadTestsFromName(self, name, module=None):
        parts = name.split('.')
        if module is None or len(parts) > 2:
            # let the base class do its job here
adim's avatar
adim committed
700
            return [super(NonStrictTestLoader, self).loadTestsFromName(name)]
701
702
703
704
705
706
        tests = self._collect_tests(module)
        # import pprint
        # pprint.pprint(tests)
        collected = []
        if len(parts) == 1:
            pattern = parts[0]
Pierre-Yves David's avatar
Pierre-Yves David committed
707
708
            if callable(getattr(module, pattern, None)
                    )  and pattern not in tests:
709
710
                # consider it as a suite
                return self.loadTestsFromSuite(module, pattern)
711
712
713
714
            if pattern in tests:
                # case python unittest_foo.py MyTestTC
                klass, methodnames = tests[pattern]
                for methodname in methodnames:
Pierre-Yves David's avatar
Pierre-Yves David committed
715
716
                    collected = [klass(methodname)
                        for methodname in methodnames]
717
718
719
            else:
                # case python unittest_foo.py something
                for klass, methodnames in tests.values():
Pierre-Yves David's avatar
Pierre-Yves David committed
720
721
                    collected += [klass(methodname)
                        for methodname in methodnames]
722
723
724
725
726
        elif len(parts) == 2:
            # case "MyClass.test_1"
            classname, pattern = parts
            klass, methodnames = tests.get(classname, (None, []))
            for methodname in methodnames:
727
                collected = [klass(methodname) for methodname in methodnames]
728
729
        return collected

Adrien Di Mascio's avatar
Adrien Di Mascio committed
730
    def _this_is_skipped(self, testedname):
731
        return any([(pat in testedname) for pat in self.skipped_patterns])
Adrien Di Mascio's avatar
Adrien Di Mascio committed
732

Adrien Di Mascio's avatar
Adrien Di Mascio committed
733
734
735
736
    def getTestCaseNames(self, testCaseClass):
        """Return a sorted sequence of method names found within testCaseClass
        """
        is_skipped = self._this_is_skipped
737
738
        classname = testCaseClass.__name__
        if classname[0] == '_' or is_skipped(classname):
Adrien Di Mascio's avatar
Adrien Di Mascio committed
739
            return []
Pierre-Yves David's avatar
Pierre-Yves David committed
740
741
        testnames = super(NonStrictTestLoader, self).getTestCaseNames(
                testCaseClass)
Adrien Di Mascio's avatar
Adrien Di Mascio committed
742
        return [testname for testname in testnames if not is_skipped(testname)]
Adrien Di Mascio's avatar
Adrien Di Mascio committed
743

Adrien Di Mascio's avatar
Adrien Di Mascio committed
744
    
root's avatar
root committed
745
746
747
748
749
750
751
752
753
class SkipAwareTestProgram(unittest.TestProgram):
    # XXX: don't try to stay close to unittest.py, use optparse
    USAGE = """\
Usage: %(progName)s [options] [test] [...]

Options:
  -h, --help       Show this message
  -v, --verbose    Verbose output
  -i, --pdb        Enable test failure inspection
754
  -x, --exitfirst  Exit on first failure
755
  -c, --capture    Captures and prints standard out/err only on errors
Pierre-Yves David's avatar
Pierre-Yves David committed
756
757
  -p, --printonly  Only prints lines matching specified pattern
                   (implies capture)
Adrien Di Mascio's avatar
Adrien Di Mascio committed
758
  -s, --skip       skip test matching this pattern (no regexp for now)
root's avatar
root committed
759
  -q, --quiet      Minimal output
760
  --color          colorize tracebacks
root's avatar
root committed
761

Pierre-Yves David's avatar
Pierre-Yves David committed
762
763
  -m, --match      Run only test whose tag match this pattern

root's avatar
root committed
764
765
766
767
768
769
770
Examples:
  %(progName)s                               - run default set of tests
  %(progName)s MyTestSuite                   - run suite 'MyTestSuite'
  %(progName)s MyTestCase.testSomething      - run MyTestCase.testSomething
  %(progName)s MyTestCase                    - run all 'test*' test methods
                                               in MyTestCase
"""
Adrien Di Mascio's avatar
Adrien Di Mascio committed
771
    def __init__(self, module='__main__', defaultTest=None, batchmode=False,
772
                 cvg=None, options=None, outstream=sys.stderr):
773
        self.batchmode = batchmode
774
        self.cvg = cvg
Adrien Di Mascio's avatar
Adrien Di Mascio committed
775
        self.options = options
776
        self.outstream = outstream
adim's avatar
adim committed
777
        super(SkipAwareTestProgram, self).__init__(
778
779
            module=module, defaultTest=defaultTest,
            testLoader=NonStrictTestLoader())
780
    
root's avatar
root committed
781
782
    def parseArgs(self, argv):
        self.pdbmode = False
783
        self.exitfirst = False
784
        self.capture = 0
adim's avatar
adim committed
785
        self.printonly = None
786
787
        self.skipped_patterns = []
        self.test_pattern = None
Pierre-Yves David's avatar
Pierre-Yves David committed
788
        self.tags_pattern = None
789
        self.colorize = False
root's avatar
root committed
790
791
        import getopt
        try:
792
            options, args = getopt.getopt(argv[1:], 'hHvixrqcp:s:m:',
Pierre-Yves David's avatar
Pierre-Yves David committed
793
                                          ['help', 'verbose', 'quiet', 'pdb',
794
                                           'exitfirst', 'restart', 'capture', 'printonly=',
795
                                           'skip=', 'color', 'match='])
root's avatar
root committed
796
            for opt, value in options:
Pierre-Yves David's avatar
Pierre-Yves David committed
797
                if opt in ('-h', '-H', '--help'):
root's avatar
root committed
798
799
800
                    self.usageExit()
                if opt in ('-i', '--pdb'):
                    self.pdbmode = True
801
802
                if opt in ('-x', '--exitfirst'):
                    self.exitfirst = True
803
804
805
                if opt in ('-r', '--restart'):
                    self.restart = True
                    self.exitfirst = True
Pierre-Yves David's avatar
Pierre-Yves David committed
806
                if opt in ('-q', '--quiet'):
root's avatar
root committed
807
                    self.verbosity = 0
Pierre-Yves David's avatar
Pierre-Yves David committed
808
                if opt in ('-v', '--verbose'):
root's avatar
root committed
809
                    self.verbosity = 2
Adrien Di Mascio's avatar
Adrien Di Mascio committed
810
                if opt in ('-c', '--capture'):
811
                    self.capture += 1
adim's avatar
adim committed
812
813
                if opt in ('-p', '--printonly'):
                    self.printonly = re.compile(value)
Adrien Di Mascio's avatar
Adrien Di Mascio committed
814
                if opt in ('-s', '--skip'):
Pierre-Yves David's avatar
Pierre-Yves David committed
815
816
                    self.skipped_patterns = [pat.strip() for pat in
                        value.split(', ')]
817
818
                if opt == '--color':
                    self.colorize = True
Pierre-Yves David's avatar
Pierre-Yves David committed
819
820
821
                if opt in ('-m', '--match'):
                    #self.tags_pattern = value
                    self.options["tag_pattern"] = value
Adrien Di Mascio's avatar
Adrien Di Mascio committed
822
            self.testLoader.skipped_patterns = self.skipped_patterns
adim's avatar
adim committed
823
824
            if self.printonly is not None:
                self.capture += 1
root's avatar
root committed
825
            if len(args) == 0 and self.defaultTest is None:
826
                suitefunc = getattr(self.module, 'suite', None)
Pierre-Yves David's avatar
Pierre-Yves David committed
827
828
                if isinstance(suitefunc, (types.FunctionType,
                        types.MethodType)):
829
830
831
                    self.test = self.module.suite()
                else:
                    self.test = self.testLoader.loadTestsFromModule(self.module)
root's avatar
root committed
832
833
                return
            if len(args) > 0:
834
                self.test_pattern = args[0]
root's avatar
root committed
835
836
                self.testNames = args
            else:
Pierre-Yves David's avatar
Pierre-Yves David committed
837
                self.testNames = (self.defaultTest, )
root's avatar
root committed
838
839
840
841
842
843
            self.createTests()
        except getopt.error, msg:
            self.usageExit(msg)


    def runTests(self):
844
845
        if hasattr(self.module, 'setup_module'):
            try:
Adrien Di Mascio's avatar
Adrien Di Mascio committed
846
                self.module.setup_module(self.options)
847
848
849
            except Exception, exc:
                print 'setup_module error:', exc
                sys.exit(1)
850
        self.testRunner = SkipAwareTextTestRunner(verbosity=self.verbosity,
851
                                                  stream=self.outstream,
852
853
854
855
856
857
858
                                                  exitfirst=self.exitfirst,
                                                  capture=self.capture,
                                                  printonly=self.printonly,
                                                  pdbmode=self.pdbmode,
                                                  cvg=self.cvg,
                                                  test_pattern=self.test_pattern,
                                                  skipped_patterns=self.skipped_patterns,
859
                                                  colorize=self.colorize,
860
                                                  options=self.options)
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877

        def removeSucceededTests(obj, succTests):
            """ Recurcive function that removes succTests from
            a TestSuite or TestCase
            """
            if isinstance(obj, TestSuite):
                removeSucceededTests(obj._tests, succTests)
            if isinstance(obj, list):
                for el in obj[:]:
                    if isinstance(el, TestSuite):
                        removeSucceededTests(el, succTests)
                    elif isinstance(el, TestCase):
                        descr = '.'.join((el.__class__.__module__,
                                el.__class__.__name__,
                                el._testMethodName))
                        if descr in succTests:
                            obj.remove(el)
878
879
        # take care, self.options may be None
        if getattr(self.options, 'restart', False):
880
            # retrieve succeeded tests from FILE_RESTART
881
            try:
882
883
                restartfile = open(FILE_RESTART, 'r')
                try:
Aurelien Campeas's avatar
Aurelien Campeas committed
884
885
886
887
888
889
                    try:
                        succeededtests = list(elem.rstrip('\n\r') for elem in 
                            restartfile.readlines())
                        removeSucceededTests(self.test, succeededtests)
                    except Exception, e:
                        raise e
890
891
892
893
                finally:
                    restartfile.close()
            except Exception ,e:
                raise "Error while reading \
894
895
succeeded tests into", osp.join(os.getcwd(),FILE_RESTART)

root's avatar
root committed
896
        result = self.testRunner.run(self.test)
897
898
        if hasattr(self.module, 'teardown_module'):
            try:
Adrien Di Mascio's avatar
Adrien Di Mascio committed
899
                self.module.teardown_module(self.options)
900
901
902
            except Exception, exc:
                print 'teardown_module error:', exc
                sys.exit(1)
root's avatar
root committed
903
        if os.environ.get('PYDEBUG'):
Pierre-Yves David's avatar
Pierre-Yves David committed
904
905
            warn("PYDEBUG usage is deprecated, use -i / --pdb instead",
                DeprecationWarning)
root's avatar
root committed
906
907
            self.pdbmode = True
        if result.debuggers and self.pdbmode:
Adrien Di Mascio's avatar
Adrien Di Mascio committed
908
            start_interactive_mode(result)
909
910
911
        if not self.batchmode:
            sys.exit(not result.wasSuccessful())
        self.result = result
root's avatar
root committed
912

Adrien Di Mascio's avatar
Adrien Di Mascio committed
913

914
915
916
917
918
919


class FDCapture: 
    """adapted from py lib (http://codespeak.net/py)
    Capture IO to/from a given os-level filedescriptor.
    """
adim's avatar
adim committed
920
    def __init__(self, fd, attr='stdout', printonly=None):
921
922
        self.targetfd = fd
        self.tmpfile = os.tmpfile() # self.maketempfile()
adim's avatar
adim committed
923
        self.printonly = printonly
924
925
926
927
928
929
        # save original file descriptor
        self._savefd = os.dup(fd)
        # override original file descriptor
        os.dup2(self.tmpfile.fileno(), fd)
        # also modify sys module directly
        self.oldval = getattr(sys, attr)
adim's avatar
adim committed
930
        setattr(sys, attr, self) # self.tmpfile)
931
        self.attr = attr
adim's avatar
adim committed
932
933
934
935
936
937
938
939
940
941

    def write(self, msg):
        # msg might be composed of several lines
        for line in msg.splitlines():
            line += '\n' # keepdend=True is not enough
            if self.printonly is None or self.printonly.search(line) is None:
                self.tmpfile.write(line)
            else:
                os.write(self._savefd, line)
        
942
943
944
945
946
947
948
949
950
##     def maketempfile(self):
##         tmpf = os.tmpfile()
##         fd = os.dup(tmpf.fileno())
##         newf = os.fdopen(fd, tmpf.mode, 0) # No buffering
##         tmpf.close()
##         return newf
        
    def restore(self):
        """restore original fd and returns captured output"""
Pierre-Yves David's avatar
Pierre-Yves David committed
951
        #XXX: hack hack hack
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
        self.tmpfile.flush()
        try:
            ref_file = getattr(sys, '__%s__' % self.attr)
            ref_file.flush()
        except AttributeError:
            pass
        if hasattr(self.oldval, 'flush'):
            self.oldval.flush()
        # restore original file descriptor
        os.dup2(self._savefd, self.targetfd)
        # restore sys module
        setattr(sys, self.attr, self.oldval)
        # close backup descriptor
        os.close(self._savefd)
        # go to beginning of file and read it
        self.tmpfile.seek(0)
        return self.tmpfile.read()


adim's avatar
adim committed
971
def _capture(which='stdout', printonly=None):
972
973
974
    """private method, should not be called directly
    (cf. capture_stdout() and capture_stderr())
    """
Pierre-Yves David's avatar
Pierre-Yves David committed
975
976
    assert which in ('stdout', 'stderr'
        ), "Can only capture stdout or stderr, not %s" % which
977
978
979
980
    if which == 'stdout':
        fd = 1
    else:
        fd = 2
adim's avatar
adim committed
981
    return FDCapture(fd, which, printonly)
982
    
adim's avatar
adim committed
983
def capture_stdout(printonly=None):
984
985
986
987
988
    """captures the standard output

    returns a handle object which has a `restore()` method.
    The restore() method returns the captured stdout and restores it
    """
adim's avatar
adim committed
989
    return _capture('stdout', printonly)
990
        
adim's avatar
adim committed
991
def capture_stderr(printonly=None):
992
993
994
995
996
    """captures the standard error output

    returns a handle object which has a `restore()` method.
    The restore() method returns the captured stderr and restores it
    """
adim's avatar
adim committed
997
    return _capture('stderr', printonly)
998

Adrien Di Mascio's avatar
Adrien Di Mascio committed
999

1000
def unittest_main(module='__main__', defaultTest=None,
1001
1002
                  batchmode=False, cvg=None, options=None,
                  outstream=sys.stderr):
1003
1004
    """use this functon if you want to have the same functionality
    as unittest.main"""
1005
1006
    return SkipAwareTestProgram(module, defaultTest, batchmode,
                                cvg, options, outstream)
root's avatar
root committed
1007
1008
1009

class TestSkipped(Exception):
    """raised when a test is skipped"""
1010
1011
1012
1013
1014

def is_generator(function):
    flags = function.func_code.co_flags
    return flags & CO_GENERATOR

1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039

def parse_generative_args(params):
    args = []
    varargs = ()
    kwargs = {}
    flags = 0 # 2 <=> starargs, 4 <=> kwargs
    for param in params:
        if isinstance(param, starargs):
            varargs = param
            if flags:
                raise TypeError('found starargs after keywords !')
            flags |= 2
            args += list(varargs)
        elif isinstance(param, keywords):
            kwargs = param
            if flags & 4:
                raise TypeError('got multiple keywords parameters')
            flags |= 4
        elif flags & 2 or flags & 4:
            raise TypeError('found parameters after kwargs or args')
        else:
            args.append(param)

    return args, kwargs

1040
1041
1042
1043
1044
class InnerTest(tuple):
    def __new__(cls, name, *data):
        instance = tuple.__new__(cls, data)
        instance.name = name
        return instance
1045
1046
1047
1048
1049

class ClassGetProperty(object):
    """this is a simple property-like class but for
    class attributes.
    """
1050
    
1051
1052
1053
    def __init__(self, getter):
        self.getter = getter

Pierre-Yves David's avatar
Pierre-Yves David committed
1054
1055
    def __get__(self, obj, objtype): # pylint: disable-msg=W0613
        "__get__(objn objtype) -> objtype"
1056
1057
        return self.getter(objtype)

1058

root's avatar
root committed
1059
1060
1061
class TestCase(unittest.TestCase):
    """unittest.TestCase with some additional methods"""

1062
    capture = False
1063
    pdbclass = Debugger
1064
    
1065
    def __init__(self, methodName='runTest'):
adim's avatar
adim committed
1066
        super(TestCase, self).__init__(methodName)
1067
1068
1069
1070
        # internal API changed in python2.5
        if sys.version_info >= (2, 5):
            self.__exc_info = self._exc_info
            self.__testMethodName = self._testMethodName
1071
1072
1073
        else:
            # let's give easier access to _testMethodName to every subclasses
            self._testMethodName = self.__testMethodName
Adrien Di Mascio's avatar
Adrien Di Mascio committed
1074
1075
        self._captured_stdout = ""
        self._captured_stderr = ""
adim's avatar
adim committed
1076
1077
        self._out = []
        self._err = []
1078
        self._current_test_descr = None
Adrien Di Mascio's avatar
Adrien Di Mascio committed
1079
        self._options_ = None
1080