Commit 36acbda1 authored by Laurent Peuch
Browse files

style: black whole project

parent 29be52b1eef2
......@@ -19,12 +19,12 @@
# Packaging metadata of the logilab-mtconverter distribution.
modname = "mtconverter"
distname = "logilab-mtconverter"
subpackage_of = "logilab"

# The dotted version string is always derived from the numeric tuple.
numversion = (0, 9, 0)
version = ".".join(map(str, numversion))

license = "LGPL"
# The project page URL is built from the distribution name.
web = "http://www.logilab.org/project/%s" % distname
mailinglist = "mailto://python-projects@lists.logilab.org"
......@@ -33,11 +33,11 @@ author = "Sylvain Thenault"
author_email = "contact@logilab.fr"
install_requires = [
'setuptools',
'logilab-common',
'lxml',
'html2text',
]
"setuptools",
"logilab-common",
"lxml",
"html2text",
]
classifiers = [
"Programming Language :: Python",
......
__import__('pkg_resources').declare_namespace(__name__)
__import__("pkg_resources").declare_namespace(__name__)
......@@ -32,6 +32,7 @@ __docformat__ = "restructuredtext en"
import locale
import mimetypes
import re
try:
maketrans = bytes.maketrans
except AttributeError:
......@@ -42,7 +43,8 @@ from io import BytesIO
from html.entities import name2codepoint
import pkg_resources
__version__ = pkg_resources.get_distribution('logilab-mtconverter').version
__version__ = pkg_resources.get_distribution("logilab-mtconverter").version
try:
import chardet
......@@ -55,19 +57,20 @@ try:
except locale.Error:
DEFAULT_ENCODING = locale.getpreferredencoding(do_setlocale=False)
BINARY_ENCODINGS = set(('gzip', 'bzip2', 'base64'))
BINARY_ENCODINGS = set(("gzip", "bzip2", "base64"))
TEXT_MIMETYPES = set(('application/xml', 'application/xhtml+xml'))
TEXT_MIMETYPES = set(("application/xml", "application/xhtml+xml"))
UNICODE_POLICY = 'strict'
UNICODE_POLICY = "strict"
_CHARSET_DECL_RGX = '(?:charset|(?:(?:en)?coding))[=:\s"\']*([^\s"\']*)'.encode('ascii')
_CHARSET_DECL_RGX = "(?:charset|(?:(?:en)?coding))[=:\s\"']*([^\s\"']*)".encode("ascii")
CHARSET_DECL_RGX = re.compile(_CHARSET_DECL_RGX, re.I | re.S)
CHARSET_DECL_SEARCH_SIZE = 512
CHARDET_MIN_SIZE = 20
CHARDET_CONFIDENCE_THRESHOLD = 0.75
def need_guess(mimetype, encoding):
"""return True if we can complete given mimetype / encoding information"""
if not mimetype:
......@@ -76,45 +79,53 @@ def need_guess(mimetype, encoding):
return True
return False
def is_text_mimetype(mimetype):
    """return True if the given MIME type denotes textual content, i.e. it
    is a "text/*" type or one of the types listed in TEXT_MIMETYPES
    """
    if mimetype.startswith("text/"):
        return True
    return mimetype in TEXT_MIMETYPES
def guess_encoding(buffer, fallbackencoding=None):
    """try to guess encoding from a buffer

    The following heuristics are tried in order:

    1. a charset / coding / encoding declaration found within the first
       CHARSET_DECL_SEARCH_SIZE bytes, provided Python knows that codec;
    2. a leading ``<?xml`` marker, in which case UTF-8 is returned;
    3. chardet analysis, when chardet is available, the buffer is longer
       than CHARDET_MIN_SIZE and the confidence reaches
       CHARDET_CONFIDENCE_THRESHOLD.

    :param buffer: bytes, or an object exposing ``getvalue()``
    :param fallbackencoding: encoding returned when every heuristic fails
    :return: the guessed encoding name, else `fallbackencoding`, else
      DEFAULT_ENCODING
    """
    if hasattr(buffer, "getvalue"):  # may be a file-like in-memory buffer
        buffer = buffer.getvalue()
    # try to get a character set declaration
    match = CHARSET_DECL_RGX.search(buffer[:CHARSET_DECL_SEARCH_SIZE])
    if match is not None:
        try:
            # the captured bytes may not be ASCII: decoding then raises
            # UnicodeDecodeError (a ValueError), which must not abort the
            # guess but simply discard this bogus declaration
            guessed = match.group(1).decode("ascii")
            # ensure encoding is known by python
            codecs.lookup(guessed)
        except (LookupError, ValueError):
            pass
        else:
            return guessed
    if buffer.lstrip().startswith(b"<?xml"):
        # xml files with no encoding declaration default to UTF-8
        return "UTF-8"
    # use text analysis if enough data
    if chardet is not None and len(buffer) > CHARDET_MIN_SIZE:
        detected = chardet.detect(buffer)
        if detected["confidence"] >= CHARDET_CONFIDENCE_THRESHOLD:
            return detected["encoding"]
    return fallbackencoding or DEFAULT_ENCODING
def guess_mimetype_and_encoding(format=None, encoding=None, data=None,
filename=None, fallbackencoding=None,
fallbackmimetype=u'application/octet-stream'):
if format and format.split('/')[-1] in BINARY_ENCODINGS:
format = None # try to do better
def guess_mimetype_and_encoding(
format=None,
encoding=None,
data=None,
filename=None,
fallbackencoding=None,
fallbackmimetype=u"application/octet-stream",
):
if format and format.split("/")[-1] in BINARY_ENCODINGS:
format = None # try to do better
if filename and not format:
format, enc = mimetypes.guess_type(filename)
if format:
if not encoding:
encoding = enc
elif enc:
format = u'application/%s' % enc
format = u"application/%s" % enc
else:
format = fallbackmimetype
if not encoding and data and format and is_text_mimetype(format):
......@@ -123,40 +134,50 @@ def guess_mimetype_and_encoding(format=None, encoding=None, data=None,
# Translation tables for the 32 ASCII control characters (code points 0-31):
# tab, line feed and carriage return are kept, form feed and vertical tab
# become a line feed, and every other control character becomes a space.
CONTROL_CHARS = [bytes((ci,)) for ci in range(32)]
_TR_CONTROL_MAP = {"\n": "\n", "\r": "\r", "\t": "\t", "\f": "\n", "\v": "\n"}
TR_CONTROL_CHARS = [
    _TR_CONTROL_MAP.get(chr(ci), " ").encode("ascii") for ci in range(32)
]
# 256-byte table usable with bytes.translate()
ESC_CAR_TABLE = bytes.maketrans(b"".join(CONTROL_CHARS), b"".join(TR_CONTROL_CHARS))
# 256-character table usable with str.translate(): the table is indexed by
# code point, and code points past its end raise IndexError, which
# str.translate() interprets as "leave the character unchanged"
ESC_UCAR_TABLE = ESC_CAR_TABLE.decode("latin1")

# (character, entity) pairs; "&" must come first so that the ampersands
# introduced by the other replacements are not escaped a second time
_XML_ESCAPES = (
    ("&", "&amp;"),
    ("<", "&lt;"),
    (">", "&gt;"),
    ('"', "&quot;"),
    ("'", "&#39;"),
)
_XML_ESCAPES_BYTES = tuple(
    (char.encode("ascii"), entity.encode("ascii")) for char, entity in _XML_ESCAPES
)


# XXX deprecate at some point (once less used :)
# @obsolete('use xml_escape')
def html_escape(data):
    """legacy alias of `xml_escape`"""
    return xml_escape(data)


def xml_escape(data):
    """escapes XML forbidden characters in attributes and PCDATA

    Control characters are first replaced using the translation tables
    above, then ``&``, ``<``, ``>``, ``"`` and ``'`` are turned into
    entities.

    :param data: str or bytes to escape
    :return: the escaped data, of the same type as the input
    """
    if isinstance(data, str):
        table, escapes = ESC_UCAR_TABLE, _XML_ESCAPES
    else:
        # bytes.replace() only accepts bytes arguments: using the str pairs
        # here raised TypeError on every bytes input
        table, escapes = ESC_CAR_TABLE, _XML_ESCAPES_BYTES
    data = data.translate(table)
    for char, entity in escapes:
        data = data.replace(char, entity)
    return data
# a named entity reference, or the numeric reference produced by xml_escape
# for the apostrophe
_ENTITY_RGX = re.compile(r"&(#39|[A-Za-z][A-Za-z0-9]*);")


def html_unescape(data):
    """unescapes XML/HTML entities

    Every named entity known to ``html.entities.name2codepoint``, as well as
    ``&#39;``, is replaced by the character it stands for. Unknown entities
    and other numeric references are left untouched.

    The substitution is done in a single pass so that text produced by one
    replacement is never unescaped again: ``&amp;lt;`` yields ``&lt;``, not
    ``<`` as the former sequence of ``str.replace`` calls did.

    :param data: the str to unescape
    :return: the unescaped str
    """

    def _replace(match):
        name = match.group(1)
        if name == "#39":
            return "'"
        codepoint = name2codepoint.get(name)
        if codepoint is None:
            # not an entity we know about: keep the text as is
            return match.group(0)
        return chr(codepoint)

    return _ENTITY_RGX.sub(_replace, data)
class TransformData(object):
"""wrapper arround transformed data to add extra infos such as MIME
type and encoding in case it applies
"""
def __init__(self, data, mimetype, encoding=None, **kwargs):
self.__dict__.update(kwargs)
self.data = data
......@@ -177,8 +198,10 @@ class TransformData(object):
if self.encoding in BINARY_ENCODINGS:
self.binary_decode()
elif self.is_binary():
raise Exception("can't decode binary stream (mime type: %s, encoding: %s)"
% (self.mimetype, self.encoding))
raise Exception(
"can't decode binary stream (mime type: %s, encoding: %s)"
% (self.mimetype, self.encoding)
)
if self.encoding:
encoding = self.encoding
else:
......@@ -187,32 +210,35 @@ class TransformData(object):
def encode(self, encoding=None):
    """return the data as an encoded string

    The stored bytes are returned untouched when no encoding is requested
    or when the requested one is already the data's encoding. Otherwise
    the data is decoded, then encoded with the requested encoding, else the
    data's own encoding, else utf8.
    """
    already_encoded = encoding is None or self.encoding == encoding
    if already_encoded and isinstance(self.data, bytes):
        return self.data
    target = encoding or self.encoding or "utf8"
    return self.decode().encode(target)
def is_binary(self):
    """return True if the data has a non textual MIME type or uses one of
    the BINARY_ENCODINGS
    """
    if not is_text_mimetype(self.mimetype):
        return True
    return self.encoding in BINARY_ENCODINGS
def check_encoding(self):
    """raise TransformError if the data has a textual MIME type but is
    nevertheless reported as binary by `is_binary`
    """
    if not is_text_mimetype(self.mimetype):
        return
    if self.is_binary():
        raise TransformError()
def binary_decode(self):
    """decode gzip, bzip2 or base64 encoded data in place

    `self.data` is replaced by the decoded bytes and `self.encoding` by the
    encoding guessed from them. Nothing happens for any other encoding.
    """
    if self.encoding == "gzip":
        import gzip

        # the context manager closes the GzipFile once the data is read
        with gzip.GzipFile(fileobj=BytesIO(self.data)) as stream:
            self.data = stream.read()
    elif self.encoding == "bzip2":
        import bz2

        # bz2.decompress() expects the compressed bytes themselves, not a
        # file-like object wrapping them
        self.data = bz2.decompress(self.data)
    elif self.encoding == "base64":
        import base64

        # base64.decodestring() was removed in Python 3.9; decodebytes()
        # is its replacement
        self.data = base64.decodebytes(self.data)
    else:
        return
    self.encoding = guess_encoding(self.data)
......@@ -220,9 +246,12 @@ class TransformData(object):
class MtConverterError(Exception):
"""base class for this package's errors"""
class MissingBinary(MtConverterError):
    """raised when a system binary on which a transform relies has not been
    found
    """
class TransformError(MtConverterError):
"""raised when something can't be transformed due to missing necessary
transforms
......@@ -258,12 +287,18 @@ def register_pygments_transforms(engine, verb=True):
def register_base_transforms(engine, verb=True):
from logilab.mtconverter.transforms import cmdtransforms, text_to_text, \
xml_to_text, text_to_html, xlog_to_html
from logilab.mtconverter.transforms import (
cmdtransforms,
text_to_text,
xml_to_text,
text_to_html,
xlog_to_html,
)
from logilab.mtconverter.transforms.python import python_to_html
from logilab.mtconverter.transforms.htmltransform import html_to_formatted_text
from logilab.mtconverter.transforms.odt2text import odt_to_unformatted_text
from logilab.mtconverter.transforms.pgpsignature import pgpsignature_to_text
engine.add_transform(text_to_text())
engine.add_transform(xml_to_text())
engine.add_transform(text_to_html())
......
......@@ -23,13 +23,14 @@ from logilab.mtconverter.transform import TransformsChain
def split_mimetype(mimetype):
    """split a "main/sub" MIME type and return the (main, sub) tuple

    :raise TransformError: if the string does not hold exactly one "/" or
      if either side of it is empty
    """
    parts = mimetype.split("/")
    if len(parts) != 2:
        raise TransformError("bad mime type %s" % mimetype)
    main, sub = parts
    if main and sub:
        return main, sub
    raise TransformError("bad mime type %s" % mimetype)
class TransformEngine(object):
"""mimetype oriented conversions engine"""
......@@ -73,10 +74,12 @@ class TransformEngine(object):
if not path:
if trdata.mimetype == targetmimetype:
return trdata
raise TransformError('no transformation path from %s to %s'
% (trdata.mimetype, targetmimetype))
raise TransformError(
"no transformation path from %s to %s"
% (trdata.mimetype, targetmimetype)
)
if len(path) > 1:
transform = TransformsChain('aname', path)
transform = TransformsChain("aname", path)
else:
transform = path[0]
return transform.convert(trdata)
......@@ -84,14 +87,16 @@ class TransformEngine(object):
def _map_transform(self, transform):
"""map transform to internal structures"""
if not (transform.inputs and transform.output):
raise TransformError('transform is missing input or output')
if split_mimetype(transform.output)[1] == '*':
raise TransformError('bad output mime type, wildcard only allowed in inputs')
raise TransformError("transform is missing input or output")
if split_mimetype(transform.output)[1] == "*":
raise TransformError(
"bad output mime type, wildcard only allowed in inputs"
)
if transform.name in self.transforms:
raise TransformError('a transform named %s already exists' % transform.name)
raise TransformError("a transform named %s already exists" % transform.name)
for mt in transform.inputs:
main, sub = split_mimetype(mt)
if sub == '*':
if sub == "*":
inmap = self._mtmainmap.setdefault(main, {})
else:
inmap = self._mtmap.setdefault(mt, {})
......@@ -107,7 +112,7 @@ class TransformEngine(object):
inputs = transform.inputs
for mt in inputs:
main, sub = split_mimetype(mt)
if sub == '*':
if sub == "*":
inmap = self._mtmainmap[main]
else:
inmap = self._mtmap[mt]
......@@ -173,4 +178,3 @@ class TransformEngine(object):
if required:
requirements.append(name)
path.pop()
......@@ -24,6 +24,7 @@ class Transform(object):
"""a transform is converting some content in a acceptable MIME type
into another MIME type
"""
name = None
inputs = ()
output = None
......@@ -32,7 +33,7 @@ class Transform(object):
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
if not getattr(self, 'name', None):
if not getattr(self, "name", None):
self.name = self.__class__.__name__
def convert(self, trdata):
......@@ -42,7 +43,7 @@ class Transform(object):
:rtype: `TransformData`
"""
# this is not true when transform accept wildcard
#assert trdata.mimetype in self.inputs
# assert trdata.mimetype in self.inputs
trdata.data = self._convert(trdata)
trdata.mimetype = self.output
if self.output_encoding:
......@@ -56,8 +57,8 @@ class Transform(object):
class TransformsChain(list):
"""A chain of transforms used to transform data"""
inputs = ('application/octet-stream',)
output = 'application/octet-stream'
inputs = ("application/octet-stream",)
output = "application/octet-stream"
name = None
def __init__(self, name=None, *args):
......@@ -96,8 +97,8 @@ class TransformsChain(list):
self.inputs = self[0].inputs
self.output = self[-1].output
for i in range(len(self)):
if hasattr(self[-i-1], 'output_encoding'):
self.output_encoding = self[-i-1].output_encoding
if hasattr(self[-i - 1], "output_encoding"):
self.output_encoding = self[-i - 1].output_encoding
break
else:
try:
......
......@@ -37,73 +37,78 @@ import re
from logilab.mtconverter import xml_escape
from logilab.mtconverter.transform import Transform
class IdentityTransform(Transform):
    """identity transform: leave the content unchanged"""

    def _convert(self, trdata):
        # hand the payload back exactly as it was received
        return trdata.data
class text_to_text(IdentityTransform):
    """expose any "text/*" content as text/plain, leaving the data as is"""

    # wildcard input: every MIME type whose main type is "text"
    inputs = ("text/*",)
    output = "text/plain"
class rest_to_text(Transform):
    """turn ReST content into plain text by dropping every line which,
    once its leading whitespace is removed, starts with ".. "
    """

    inputs = ("text/rest", "text/x-rst")
    output = "text/plain"

    def _convert(self, trdata):
        kept = [
            line
            for line in trdata.data.splitlines()
            if not line.lstrip().startswith(".. ")
        ]
        return "\n".join(kept)
# non-greedy match of anything enclosed in angle brackets, i.e. an opening
# or a closing tag
_TAG_PROG = re.compile(r"</?.*?>", re.UNICODE)


class xml_to_text(Transform):
    """turn XML content into plain text by replacing each tag by a space"""

    inputs = ("application/xml",)
    output = "text/plain"

    def _convert(self, trdata):
        markup = trdata.data
        return _TAG_PROG.sub(" ", markup)
class text_to_html(Transform):
    """turn plain text into HTML paragraphs: each non blank line is escaped
    and followed by a <br/>, runs of blank lines start a new paragraph
    """

    inputs = ("text/plain",)
    output = "text/html"

    def _convert(self, trdata):
        chunks = ["<p>"]
        for raw_line in trdata.data.splitlines():
            text = raw_line.strip()
            if text:
                chunks.append(xml_escape(text) + "<br/>")
            elif not chunks[-1].endswith("<p>"):
                # blank line: close the paragraph, unless one has just
                # been opened
                chunks.append("</p><p>")
        chunks.append("</p>")
        return "\n".join(chunks)
class text_to_html_pre(Transform):
    """variant for text 2 html transformation : simply wrap text into pre tags
    """

    inputs = ("text/plain",)
    output = "text/html"

    def _convert(self, trdata):
        escaped = xml_escape(trdata.data)
        return "\n".join(("<pre>", escaped, "</pre>"))
class xlog_to_html(Transform):
    """turn a text log into HTML: each line is escaped and followed by a
    <BR/>
    """

    inputs = ("text/x-log",)
    output = "text/html"

    def _convert(self, trdata):
        html_lines = (xml_escape(line) + "<BR/>" for line in trdata.data.splitlines())
        return "\n".join(html_lines)
......@@ -22,22 +22,24 @@ import subprocess
from logilab.mtconverter import MissingBinary
from logilab.mtconverter.transform import Transform
# Existing directories listed in the PATH environment variable, computed
# once at import time. A missing PATH yields an empty list instead of
# making the import fail with a KeyError.
bin_search_path = [
    path
    for path in os.environ.get("PATH", "").split(os.pathsep)
    if os.path.isdir(path)
]


def bin_search(binary):
    """search the bin_search_path for a given binary returning its fullname or
    raises MissingBinary

    :param binary: file name of the wanted executable
    :return: path of the first readable and executable regular file with
      that name found in `bin_search_path`
    :raise MissingBinary: if no directory of `bin_search_path` holds one
    """
    mode = os.R_OK | os.X_OK
    for path in bin_search_path:
        pathbin = os.path.join(path, binary)
        # os.access() alone also accepts directories, hence the isfile test
        if os.path.isfile(pathbin) and os.access(pathbin, mode):
            return pathbin
    raise MissingBinary(
        'Unable to find binary "%s" in %s' % (binary, os.pathsep.join(bin_search_path))
    )
class POpenTransform(Transform):
......@@ -52,10 +54,9 @@ class POpenTransform(Transform):
cmdargs = ""
use_stdin = True
input_encoding = None
#output_encoding = 'utf-8'
# output_encoding = 'utf-8'
def __init__(self, name=None, binary=None, cmdargs=None, use_stdin=None,
**kwargs):
def __init__(self, name=None, binary=None, cmdargs=None, use_stdin=None, **kwargs):
if name is not None:
self.name = name
if binary is not None:
......@@ -69,19 +70,24 @@ class POpenTransform(Transform):
def _command_line(self, trdata):
return "%s %s" % (self.binary, self.cmdargs)
def _convert(self, trdata):
command = self._command_line(trdata)
data = trdata.encode(self.input_encoding)
if not self.use_stdin:
tmpfile, tmpname = mkstemp(text=False) # create tmp
os.write(tmpfile, data) # write data to tmp using a file descriptor
os.close(tmpfile) # close it so the other process can read it
command = command % {'infile' : tmpname} # apply tmp name to command
tmpfile, tmpname = mkstemp(text=False) # create tmp
os.write(tmpfile, data) # write data to tmp using a file descriptor
os.close(tmpfile) # close it so the other process can read it
command = command % {"infile": tmpname} # apply tmp name to command
data = None
cmd = subprocess.Popen(command, shell=True, stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, close_fds=True)
cmd = subprocess.Popen(
command,
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
close_fds=True,
)
out, _ = cmd.communicate(data)
if not self.use_stdin:
# remove tmp file
......@@ -91,9 +97,9 @@ class POpenTransform(Transform):
class pdf_to_text(POpenTransform):
name = "pdf_to_text"
inputs = ('application/pdf',)
output = 'text/plain'