Commit df9d9fb6 authored by Julien Cristau's avatar Julien Cristau
Browse files

kill pyro-based mbox importer

parent 3a28497f4fab
"""cubicweb-ctl plugin providing the mboximport command
:organization: Logilab
:copyright: 2007-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
"""
__docformat__ = "restructuredtext en"
import sys
from os.path import isdir
from cStringIO import StringIO
from cubicweb.toolsutils import CONNECT_OPTIONS, Command
from cubicweb.utils import admincnx
from cubicweb.cwctl import CWCTL
from cubes.email.mboximport import MBOXImporter
class MBOXImportCommand(Command):
"""Import files using the Unix mail box format into an cubicweb instance.
The instance must use the email package.
<pyro id>
pyro identifier of the instance where emails have to be imported.
<mbox file>
path to a file using the Unix MBOX format. If "-" is given, stdin is read.
"""
name = 'mboximport'
arguments = '<pyro id> <mbox file>...'
min_args = 2
options = CONNECT_OPTIONS + (
("interactive",
{'short': 'i', 'action' : 'store_true',
'default': False,
'help': 'ask confirmation to continue after an error.',
}),
("skip-sign",
{'short': 's', 'action' : 'store_true',
'default': False,
'help': 'skip email signature.',
}),
)
autocommit = True
def import_mbox_files(self, importer, filenames):
"""process `filenames` with `importer` and create corresponding
Email / EmailThread / etc. objects in the database.
"""
for fpath in filenames:
if fpath == '-':
stream = StringIO(sys.stdin.read())
importer.import_mbox_stream(stream)
elif isdir(fpath):
importer.import_maildir(fpath)
else:
importer.import_mbox(fpath)
if importer.error:
print 'failed to import the following messages:'
print '\n'.join(importer.error)
sys.exit(1)
def connect(self, appid):
"""create a connection to `appid`"""
cnx = admincnx(appid)
return cnx
def create_importer(self, cnx):
"""factory to instantiate the mbox importer"""
return MBOXImporter(cnx, verbose=True,
interactive=self.config.interactive,
skipsign=self.config.skip_sign,
autocommit=self.autocommit)
def run(self, args):
"""run the command with its specific arguments"""
with self.connect(args.pop(0)) as cnx:
importer = self.create_importer(cnx)
try:
self.import_mbox_files(importer, args)
except:
# without a correct connection handling we exhaust repository's
# connections pool.
# the repository should be more resilient against bad clients !
cnx.rollback()
raise
cnx.commit()
CWCTL.register(MBOXImportCommand)
"""import an mbox or a single email into an cubicweb application
:organization: Logilab
:copyright: 2007-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
"""
__docformat__ = "restructuredtext en"
import mailbox
from itertools import combinations
from rfc822 import parsedate
from logilab.common.umessage import message_from_file
from cubicweb import Binary
class StreamMailbox(mailbox.mbox):
"""A read-only mbox format mailbox from stream."""
_mangle_from_ = True
def __init__(self, stream, factory=None, create=True):
"""Initialize a stream mailbox."""
self._message_factory = mailbox.mboxMessage
mailbox.Mailbox.__init__(self, '', factory, create)
self._file = stream
self._toc = None
self._next_key = 0
self._pending = False # No changes require rewriting the file.
self._locked = False
self._file_length = None # Used to record mailbox size
class MBOXImporter(object):
"""import content of a Unix mailbox into cubicweb as Email (and related) objects"""
def __init__(self, cnx, verbose=False, interactive=False,
skipsign=False, autocommit=False):
self.cnx = cnx
self.schema = cnx.get_schema()
self.execute = cnx.execute
self._verbose = verbose
self._interactive = interactive
self._skipsign = skipsign
self.created = {}
self.skipped = []
self.error = []
self.autocommit = autocommit
def autocommit_mode(self):
self.autocommit = True
def _notify_created(self, etype, eid):
if self._verbose:
print 'create', etype, eid
self.created.setdefault(etype, []).append(eid)
def _notify_skipped(self, messageid):
if self._verbose:
print 'skipping', messageid
self.skipped.append(messageid)
def import_mbox_stream(self, stream):
self._import(StreamMailbox(stream, message_from_file, create=False))
def import_mbox(self, path):
self._import(mailbox.mbox(path, message_from_file, create=False))
def import_maildir(self, path):
self._import(mailbox.Maildir(path, message_from_file, create=False))
def _import(self, mailbox):
for message in sorted(mailbox, key=lambda x:parsedate(x['Date'])):
try:
self.import_message(message)
if self.autocommit:
self.cnx.commit()
except Exception, ex:
import traceback
traceback.print_exc()
if self.autocommit:
msgid = message.get('message-id')
self.error.append(msgid)
self.cnx.rollback()
if self._interactive:
quest = 'failed to import message %s (%s). Continue [N/y] ?\n'
if raw_input(quest % (msgid, ex)).lower() != 'y':
break
else:
raise
def import_message(self, message):
# check this email hasn't been imported
msgid = message.get('message-id')
rset = self.execute('Email X WHERE X messageid %(id)s', {'id': msgid})
if rset:
self._notify_skipped(msgid)
return
# Email entity
# don't use the UMessage's headers() so the decoding can be done on the server side
headers = u'\n'.join(u'%s: %s' % header for header in message.message.items())
email = self.cnx.create_entity('Email', messageid=msgid,
headers=headers)
self._notify_created('email', email.eid)
# link to mailing list
self.mailinglist_link(message, email.eid)
# link to replied email if any
self._part_index = 0
self._context = None
self._alternatives = []
self.import_message_parts(message, email.eid)
def mailinglist_link(self, message, eid):
if not 'MailingList' in self.schema:
return
try:
listid = message.multi_addrs('list-id')[0][1]
except IndexError:
return
mlrset = self.execute('MailingList X WHERE X mlid %(id)s', {'id': listid})
if mlrset:
self.execute('SET X sent_on Y WHERE X eid %(x)s, Y eid %(y)s',
{'x': eid, 'y': mlrset[0][0]})
def import_message_parts(self, message, emaileid):
# XXX only parts and attachments are used not content, alternative...
if message.is_multipart():
self._context = message.get_content_type().split('/')[1]
if self._context == 'alternative':
self._alternatives.append([])
for part in message.get_payload():
self.import_message_parts(part, emaileid)
if self._context == 'alternative':
alternatives = self._alternatives.pop()
for eid1, eid2 in combinations(alternatives, 2):
self.execute('SET X alternative Y WHERE X eid %(x)s, Y eid %(y)s',
{'x': eid1, 'y': eid2})
self._context = None
else:
self._import_message_part(message, emaileid)
def _import_message_part(self, part, emaileid):
"""finally import a non multipart message (ie non MIME message or a
not compound part of a MIME message
"""
assert not part.is_multipart()
contenttype = part.get_content_type()
main, sub = contenttype.split('/')
data = part.get_payload(decode=True)
if main == 'text':
encoding = u'UTF-8'
elif contenttype == 'application/pgp-signature':
if self._skipsign:
return
encoding = u'ascii'
if isinstance(data, str):
data = unicode(data, encoding)
else:
encoding = None
name = part.get_filename()
if name or main != 'text' and contenttype != 'application/pgp-signature':
# suppose if we have a name, this is an attachement else this is a
# part/alternative
name = name or u'no name'
if isinstance(data, unicode):
data = data.encode('utf8')
epart = self.cnx.create_entity('File', data=Binary(data),
data_name=name,
data_format=contenttype,
data_encoding=encoding)
self._notify_created('file', epart.eid)
self.execute('SET X attachment Y WHERE X eid %(x)s, Y eid %(y)s',
{'x': emaileid, 'y': epart.eid})
else:
self._part_index += 1
epart = self.cnx.create_entity('EmailPart',
content=data,
content_format=contenttype,
ordernum=self._part_index)
self._notify_created('emailpart', epart.eid)
self.execute('SET X parts Y WHERE X eid %(x)s, Y eid %(y)s',
{'x': emaileid, 'y': epart.eid})
if self._context == 'alternative':
self._alternatives[-1].append(epart.eid)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment