Commit 080b2d24 authored by Laurent Peuch's avatar Laurent Peuch
Browse files

Black the whole code base

parent 9bcd8ca20d6d
......@@ -23,41 +23,41 @@ __docformat__ = "restructuredtext en"
import os
from os.path import join
distname = 'logilab-common'
modname = 'common'
subpackage_of = 'logilab'
distname = "logilab-common"
modname = "common"
subpackage_of = "logilab"
subpackage_master = True
numversion = (1, 6, 1)
version = '.'.join([str(num) for num in numversion])
license = 'LGPL' # 2.1 or later
description = ("collection of low-level Python packages and modules"
" used by Logilab projects")
license = "LGPL" # 2.1 or later
description = "collection of low-level Python packages and modules" " used by Logilab projects"
web = "http://www.logilab.org/project/%s" % distname
mailinglist = "mailto://python-projects@lists.logilab.org"
author = "Logilab"
author_email = "contact@logilab.fr"
scripts = [join('bin', 'logilab-pytest')]
include_dirs = [join('test', 'data')]
scripts = [join("bin", "logilab-pytest")]
include_dirs = [join("test", "data")]
install_requires = [
'setuptools',
'mypy-extensions',
'typing_extensions',
"setuptools",
"mypy-extensions",
"typing_extensions",
]
tests_require = [
'pytz',
'egenix-mx-base',
"pytz",
"egenix-mx-base",
]
if os.name == 'nt':
install_requires.append('colorama')
if os.name == "nt":
install_requires.append("colorama")
classifiers = ["Topic :: Utilities",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3 :: Only",
]
classifiers = [
"Topic :: Utilities",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3 :: Only",
]
__import__('pkg_resources').declare_namespace(__name__)
__import__("pkg_resources").declare_namespace(__name__)
......@@ -31,19 +31,19 @@ import types
import pkg_resources
from typing import List, Sequence
__version__ = pkg_resources.get_distribution('logilab-common').version
__version__ = pkg_resources.get_distribution("logilab-common").version
# deprecated, but keep compatibility with pylint < 1.4.4
__pkginfo__ = types.ModuleType('__pkginfo__')
__pkginfo__ = types.ModuleType("__pkginfo__")
__pkginfo__.__package__ = __name__
# mypy output: Module has no attribute "version"
# logilab's magic
__pkginfo__.version = __version__ # type: ignore
sys.modules['logilab.common.__pkginfo__'] = __pkginfo__
sys.modules["logilab.common.__pkginfo__"] = __pkginfo__
STD_BLACKLIST = ('CVS', '.svn', '.hg', '.git', '.tox', 'debian', 'dist', 'build')
STD_BLACKLIST = ("CVS", ".svn", ".hg", ".git", ".tox", "debian", "dist", "build")
IGNORED_EXTENSIONS = ('.pyc', '.pyo', '.elc', '~', '.swp', '.orig')
IGNORED_EXTENSIONS = (".pyc", ".pyo", ".elc", "~", ".swp", ".orig")
# set this to False if you've mx DateTime installed but you don't want your db
# adapter to use it (should be set before you got a connection)
......@@ -52,12 +52,14 @@ USE_MX_DATETIME = True
class attrdict(dict):
"""A dictionary for which keys are also accessible as attributes."""
def __getattr__(self, attr: str) -> str:
try:
return self[attr]
except KeyError:
raise AttributeError(attr)
class dictattr(dict):
def __init__(self, proxy):
self.__proxy = proxy
......@@ -68,13 +70,17 @@ class dictattr(dict):
except AttributeError:
raise KeyError(attr)
class nullobject(object):
def __repr__(self):
return '<nullobject>'
return "<nullobject>"
def __bool__(self):
return False
__nonzero__ = __bool__
class tempattr(object):
def __init__(self, obj, attr, value):
self.obj = obj
......@@ -90,7 +96,6 @@ class tempattr(object):
setattr(self.obj, self.attr, self.oldvalue)
# flatten -----
# XXX move in a specific module and use yield instead
# do not mix flatten and translate
......@@ -105,10 +110,10 @@ class tempattr(object):
# except (TypeError, ValueError): return False
# return True
#
#def is_scalar(obj):
# def is_scalar(obj):
# return is_string_like(obj) or not iterable(obj)
#
#def flatten(seq):
# def flatten(seq):
# for item in seq:
# if is_scalar(item):
# yield item
......@@ -116,6 +121,7 @@ class tempattr(object):
# for subitem in flatten(item):
# yield subitem
def flatten(iterable, tr_func=None, results=None):
"""Flatten a list of list with any level.
......@@ -141,6 +147,7 @@ def flatten(iterable, tr_func=None, results=None):
# XXX is function below still used ?
def make_domains(lists):
"""
Given a list of lists, return a list of domain for each list to produce all
......@@ -157,7 +164,7 @@ def make_domains(lists):
for iterable in lists:
new_domain = iterable[:]
for i in range(len(domains)):
domains[i] = domains[i]*len(iterable)
domains[i] = domains[i] * len(iterable)
if domains:
missing = (len(domains[0]) - len(iterable)) / len(iterable)
i = 0
......@@ -173,6 +180,7 @@ def make_domains(lists):
# private stuff ################################################################
def _handle_blacklist(blacklist: Sequence[str], dirnames: List[str], filenames: List[str]) -> None:
"""remove files/directories in the black list
......@@ -183,4 +191,3 @@ def _handle_blacklist(blacklist: Sequence[str], dirnames: List[str], filenames:
dirnames.remove(norecurs)
elif norecurs in filenames:
filenames.remove(norecurs)
......@@ -47,7 +47,7 @@ class Cache(dict):
""" Warning : Cache.__init__() != dict.__init__().
Constructor does not take any arguments beside size.
"""
assert size >= 0, 'cache size must be >= 0 (0 meaning no caching)'
assert size >= 0, "cache size must be >= 0 (0 meaning no caching)"
self.size = size
self._usage: List = []
self._lock = Lock()
......@@ -74,12 +74,13 @@ class Cache(dict):
del self._usage[0]
self._usage.append(key)
else:
pass # key is already the most recently used key
pass # key is already the most recently used key
def __getitem__(self, key: _KeyType):
value = super(Cache, self).__getitem__(key)
self._update_usage(key)
return value
__getitem__ = locked(_acquire, _release)(__getitem__)
def __setitem__(self, key: _KeyType, item):
......@@ -87,24 +88,28 @@ class Cache(dict):
if self.size > 0:
super(Cache, self).__setitem__(key, item)
self._update_usage(key)
__setitem__ = locked(_acquire, _release)(__setitem__)
def __delitem__(self, key: _KeyType):
super(Cache, self).__delitem__(key)
self._usage.remove(key)
__delitem__ = locked(_acquire, _release)(__delitem__)
def clear(self):
super(Cache, self).clear()
self._usage = []
clear = locked(_acquire, _release)(clear)
def pop(self, key: _KeyType, default=_marker):
if key in self:
self._usage.remove(key)
#if default is _marker:
# if default is _marker:
# return super(Cache, self).pop(key)
return super(Cache, self).pop(key, default)
pop = locked(_acquire, _release)(pop)
def popitem(self):
......@@ -115,5 +120,3 @@ class Cache(dict):
def update(self, other):
raise NotImplementedError()
......@@ -52,9 +52,9 @@ import codecs
from typing import List, Any, Optional, Tuple
from _io import StringIO
BULLET = '*'
SUBBULLET = '-'
INDENT = ' ' * 4
BULLET = "*"
SUBBULLET = "-"
INDENT = " " * 4
class NoEntry(Exception):
......@@ -69,9 +69,10 @@ class Version(tuple):
"""simple class to handle soft version number has a tuple while
correctly printing it as X.Y.Z
"""
def __new__(cls, versionstr):
if isinstance(versionstr, str):
versionstr = versionstr.strip(' :') # XXX (syt) duh?
versionstr = versionstr.strip(" :") # XXX (syt) duh?
parsed = cls.parse(versionstr)
else:
parsed = versionstr
......@@ -79,26 +80,29 @@ class Version(tuple):
@classmethod
def parse(cls, versionstr: str) -> List[int]:
versionstr = versionstr.strip(' :')
versionstr = versionstr.strip(" :")
try:
return [int(i) for i in versionstr.split('.')]
return [int(i) for i in versionstr.split(".")]
except ValueError as ex:
raise ValueError("invalid literal for version '%s' (%s)" %
(versionstr, ex))
raise ValueError("invalid literal for version '%s' (%s)" % (versionstr, ex))
def __str__(self) -> str:
return '.'.join([str(i) for i in self])
return ".".join([str(i) for i in self])
# upstream change log #########################################################
class ChangeLogEntry(object):
"""a change log entry, i.e. a set of messages associated to a version and
its release date
"""
version_class = Version
def __init__(self, date: Optional[str] = None, version: Optional[str] = None, **kwargs: Any) -> None:
def __init__(
self, date: Optional[str] = None, version: Optional[str] = None, **kwargs: Any
) -> None:
self.__dict__.update(kwargs)
self.version: Optional[Version]
if version:
......@@ -116,8 +120,7 @@ class ChangeLogEntry(object):
"""complete the latest added message
"""
if not self.messages:
raise ValueError('unable to complete last message as '
'there is no previous message)')
raise ValueError("unable to complete last message as " "there is no previous message)")
if self.messages[-1][1]: # sub messages
self.messages[-1][1][-1].append(msg_suite)
else: # message
......@@ -125,29 +128,26 @@ class ChangeLogEntry(object):
def add_sub_message(self, sub_msg: str, key: Optional[Any] = None) -> None:
if not self.messages:
raise ValueError('unable to complete last message as '
'there is no previous message)')
raise ValueError("unable to complete last message as " "there is no previous message)")
if key is None:
self.messages[-1][1].append([sub_msg])
else:
raise NotImplementedError('sub message to specific key '
'are not implemented yet')
raise NotImplementedError("sub message to specific key " "are not implemented yet")
def write(self, stream: StringIO = sys.stdout) -> None:
"""write the entry to file """
stream.write(u'%s -- %s\n' % (self.date or '', self.version or ''))
stream.write("%s -- %s\n" % (self.date or "", self.version or ""))
for msg, sub_msgs in self.messages:
stream.write(u'%s%s %s\n' % (INDENT, BULLET, msg[0]))
stream.write(u''.join(msg[1:]))
stream.write("%s%s %s\n" % (INDENT, BULLET, msg[0]))
stream.write("".join(msg[1:]))
if sub_msgs:
stream.write(u'\n')
stream.write("\n")
for sub_msg in sub_msgs:
stream.write(u'%s%s %s\n' %
(INDENT * 2, SUBBULLET, sub_msg[0]))
stream.write(u''.join(sub_msg[1:]))
stream.write(u'\n')
stream.write("%s%s %s\n" % (INDENT * 2, SUBBULLET, sub_msg[0]))
stream.write("".join(sub_msg[1:]))
stream.write("\n")
stream.write(u'\n\n')
stream.write("\n\n")
class ChangeLog(object):
......@@ -155,23 +155,22 @@ class ChangeLog(object):
entry_class = ChangeLogEntry
def __init__(self, changelog_file: str, title: str = u'') -> None:
def __init__(self, changelog_file: str, title: str = "") -> None:
self.file = changelog_file
assert isinstance(title, type(u'')), 'title must be a unicode object'
assert isinstance(title, type("")), "title must be a unicode object"
self.title = title
self.additional_content = u''
self.additional_content = ""
self.entries: List[ChangeLogEntry] = []
self.load()
def __repr__(self):
return '<ChangeLog %s at %s (%s entries)>' % (self.file, id(self),
len(self.entries))
return "<ChangeLog %s at %s (%s entries)>" % (self.file, id(self), len(self.entries))
def add_entry(self, entry: ChangeLogEntry) -> None:
"""add a new entry to the change log"""
self.entries.append(entry)
def get_entry(self, version='', create=None):
def get_entry(self, version="", create=None):
""" return a given changelog entry
if version is omitted, return the current entry
"""
......@@ -197,7 +196,7 @@ class ChangeLog(object):
def load(self) -> None:
""" read a logilab's ChangeLog from file """
try:
stream = codecs.open(self.file, encoding='utf-8')
stream = codecs.open(self.file, encoding="utf-8")
except IOError:
return
......@@ -209,20 +208,20 @@ class ChangeLog(object):
words = sline.split()
# if new entry
if len(words) == 1 and words[0] == '--':
if len(words) == 1 and words[0] == "--":
expect_sub = False
last = self.entry_class()
self.add_entry(last)
# if old entry
elif len(words) == 3 and words[1] == '--':
elif len(words) == 3 and words[1] == "--":
expect_sub = False
last = self.entry_class(words[0], words[2])
self.add_entry(last)
# if title
elif sline and last is None:
self.title = '%s%s' % (self.title, line)
self.title = "%s%s" % (self.title, line)
# if new entry
elif sline and sline[0] == BULLET:
expect_sub = False
......@@ -243,14 +242,15 @@ class ChangeLog(object):
stream.close()
def format_title(self) -> str:
return u'%s\n\n' % self.title.strip()
return "%s\n\n" % self.title.strip()
def save(self):
"""write back change log"""
# filetutils isn't importable in appengine, so import locally
from logilab.common.fileutils import ensure_fs_mode
ensure_fs_mode(self.file, S_IWRITE)
self.write(codecs.open(self.file, 'w', encoding='utf-8'))
self.write(codecs.open(self.file, "w", encoding="utf-8"))
def write(self, stream: StringIO = sys.stdout) -> None:
"""write changelog to stream"""
......
......@@ -42,6 +42,7 @@ class BadCommandUsage(Exception):
Trigger display of command usage.
"""
class CommandError(Exception):
"""Raised when a command can't be processed and we want to display it and
exit, without traceback nor usage displayed.
......@@ -50,6 +51,7 @@ class CommandError(Exception):
# command line access point ####################################################
class CommandLine(dict):
"""Usage:
......@@ -77,9 +79,17 @@ class CommandLine(dict):
* `logger`, logger to propagate to commands, default to
`logging.getLogger(self.pgm))`
"""
def __init__(self, pgm=None, doc=None, copyright=None, version=None,
rcfile=None, logthreshold=logging.ERROR,
check_duplicated_command=True):
def __init__(
self,
pgm=None,
doc=None,
copyright=None,
version=None,
rcfile=None,
logthreshold=logging.ERROR,
check_duplicated_command=True,
):
if pgm is None:
pgm = basename(sys.argv[0])
self.pgm = pgm
......@@ -93,8 +103,9 @@ class CommandLine(dict):
def register(self, cls, force=False):
"""register the given :class:`Command` subclass"""
assert not self.check_duplicated_command or force or not cls.name in self, \
'a command %s is already defined' % cls.name
assert not self.check_duplicated_command or force or not cls.name in self, (
"a command %s is already defined" % cls.name
)
self[cls.name] = cls
return cls
......@@ -107,20 +118,22 @@ class CommandLine(dict):
Terminate by :exc:`SystemExit`
"""
init_log(debug=True, # so that we use StreamHandler
logthreshold=self.logthreshold,
logformat='%(levelname)s: %(message)s')
init_log(
debug=True, # so that we use StreamHandler
logthreshold=self.logthreshold,
logformat="%(levelname)s: %(message)s",
)
try:
arg = args.pop(0)
except IndexError:
self.usage_and_exit(1)
if arg in ('-h', '--help'):
if arg in ("-h", "--help"):
self.usage_and_exit(0)
if self.version is not None and arg in ('--version'):
if self.version is not None and arg in ("--version"):
print(self.version)
sys.exit(0)
rcfile = self.rcfile
if rcfile is not None and arg in ('-C', '--rc-file'):
if rcfile is not None and arg in ("-C", "--rc-file"):
try:
rcfile = args.pop(0)
arg = args.pop(0)
......@@ -129,19 +142,19 @@ class CommandLine(dict):
try:
command = self.get_command(arg)
except KeyError:
print('ERROR: no %s command' % arg)
print("ERROR: no %s command" % arg)
print()
self.usage_and_exit(1)
try:
sys.exit(command.main_run(args, rcfile))
except KeyboardInterrupt as exc:
print('Interrupted', end=' ')
print("Interrupted", end=" ")
if str(exc):
print(': %s' % exc, end=' ')
print(": %s" % exc, end=" ")
print()
sys.exit(4)
except BadCommandUsage as err:
print('ERROR:', err)
print("ERROR:", err)
print()
print(command.help())
sys.exit(1)
......@@ -166,32 +179,44 @@ class CommandLine(dict):
"""display usage for the main program (i.e. when no command supplied)
and exit
"""
print('usage:', self.pgm, end=' ')
print("usage:", self.pgm, end=" ")
if self.rcfile:
print('[--rc-file=<configuration file>]', end=' ')
print('<command> [options] <command argument>...')
print("[--rc-file=<configuration file>]", end=" ")
print("<command> [options] <command argument>...")
if self.doc:
print('\n%s' % self.doc)
print('''
print("\n%s" % self.doc)
print(
"""
Type "%(pgm)s <command> --help" for more information about a specific
command. Available commands are :\n''' % self.__dict__)
command. Available commands are :\n"""
% self.__dict__
)
max_len = max([len(cmd) for cmd in self])
padding = ' ' * max_len
padding = " " * max_len
for cmdname, cmd in sorted(self.items()):
if not cmd.hidden:
print(' ', (cmdname + padding)[:max_len], cmd.short_description())
print(" ", (cmdname + padding)[:max_len], cmd.short_description())
if self.rcfile:
print('''
print(
"""
Use --rc-file=<configuration file> / -C <configuration file> before the command
to specify a configuration file. Default to %s.
''' % self.rcfile)
print('''%(pgm)s -h/--help
display this usage information and exit''' % self.__dict__)
"""
% self.rcfile
)
print(
"""%(pgm)s -h/--help
display this usage information and exit"""
% self.__dict__
)
if self.version:
print('''%(pgm)s -v/--version
display version configuration and exit''' % self.__dict__)
print(
"""%(pgm)s -v/--version
display version configuration and exit"""
% self.__dict__
)
if self.copyright:
print('\n', self.copyright)
print("\n", self.copyright)
def usage_and_exit(self, status):
self.usage()
......@@ -200,6 +225,7 @@ to specify a configuration file. Default to %s.
# base command classes #########################################################
class Command(Configuration):
"""Base class for command line commands.
......@@ -219,8 +245,8 @@ class Command(Configuration):
* `options`, options list, as allowed by :mod:configuration
"""
arguments = ''
name = ''
arguments = ""
name = ""
# hidden from help ?
hidden = False
# max/min args, None meaning unspecified
......@@ -229,24 +255,23 @@ class Command(Configuration):
@classmethod
def description(cls):
return cls.__doc__.replace(' ', '')
return cls.__doc__.replace(" ", "")