Commit db7ef309 authored by Sylvain Thénault's avatar Sylvain Thénault
Browse files

drop common subpackage

parent 934e758a73ef
......@@ -7,47 +7,3 @@ hg stserver side and on the client side
:license: GNU Lesser General Public License, v2.1 -
from logilab.common.adbh import FunctionDescr
from cubicweb._exceptions import * # bw compat
from rql.utils import register_function, iter_funcnode_variables
class COMMA_JOIN(FunctionDescr):
supported_backends = ('postgres', 'sqlite',)
rtype = 'String'
def st_description(cls, funcnode, mainindex, tr):
return ', '.join(sorted(term.get_description(mainindex, tr)
for term in iter_funcnode_variables(funcnode)))
register_function(COMMA_JOIN) # XXX do not expose?
aggregat = True
register_function(CONCAT_STRINGS) # XXX bw compat
supported_backends = ('mysql', 'postgres', 'sqlite',)
class LIMIT_SIZE(FunctionDescr):
supported_backends = ('postgres', 'sqlite',)
rtype = 'String'
def st_description(cls, funcnode, mainindex, tr):
return funcnode.children[0].get_description(mainindex, tr)
supported_backends = ('mysql', 'postgres', 'sqlite',)
"""Common utilies to format / semd emails.
:organization: Logilab
:copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.
:contact: --
:license: GNU Lesser General Public License, v2.1 -
__docformat__ = "restructuredtext en"
from base64 import b64encode, b64decode
from itertools import repeat
from time import time
from email.MIMEMultipart import MIMEMultipart
from email.MIMEText import MIMEText
from email.MIMEImage import MIMEImage
from email.Header import Header
from socket import gethostname
except ImportError:
def gethostname(): # gae
return 'XXX'
from cubicweb.view import EntityView
from cubicweb.entity import Entity
def header(ustring):
return Header(ustring.encode('UTF-8'), 'UTF-8')
def addrheader(uaddr, uname=None):
# even if an email address should be ascii, encode it using utf8 since
# automatic tests may generate non ascii email address
addr = uaddr.encode('UTF-8')
if uname:
return '%s <%s>' % (header(uname).encode(), addr)
return addr
def construct_message_id(appid, eid, withtimestamp=True):
if withtimestamp:
addrpart = 'eid=%s&timestamp=%.10f' % (eid, time())
addrpart = 'eid=%s' % eid
# we don't want any equal sign nor trailing newlines
leftpart = b64encode(addrpart, '.-').rstrip().rstrip('=')
return '<%s@%s.%s>' % (leftpart, appid, gethostname())
def parse_message_id(msgid, appid):
if msgid[0] == '<':
msgid = msgid[1:]
if msgid[-1] == '>':
msgid = msgid[:-1]
values, qualif = msgid.split('@')
padding = len(values) % 4
values = b64decode(str(values + '='*padding), '.-')
values = dict(v.split('=') for v in values.split('&'))
fromappid, host = qualif.split('.', 1)
return None
if appid != fromappid or host != gethostname():
return None
return values
def format_mail(uinfo, to_addrs, content, subject="",
cc_addrs=(), msgid=None, references=(), config=None):
"""Sends an Email to 'e_addr' with content 'content', and subject 'subject'
to_addrs and cc_addrs are expected to be a list of email address without
assert type(content) is unicode, repr(content)
msg = MIMEText(content.encode('UTF-8'), 'plain', 'UTF-8')
# safety: keep only the first newline
subject = subject.splitlines()[0]
msg['Subject'] = header(subject)
if uinfo.get('email'):
email = uinfo['email']
elif config and config['sender-addr']:
email = unicode(config['sender-addr'])
email = u''
if uinfo.get('name'):
name = uinfo['name']
elif config and config['sender-addr']:
name = unicode(config['sender-name'])
name = u''
msg['From'] = addrheader(email, name)
if config and config['sender-addr'] and config['sender-addr'] != email:
appaddr = addrheader(config['sender-addr'], config['sender-name'])
msg['Reply-to'] = '%s, %s' % (msg['From'], appaddr)
elif email:
msg['Reply-to'] = msg['From']
if config is not None:
msg['X-CW'] = config.appid
unique_addrs = lambda addrs: sorted(set(addr for addr in addrs if addr is not None))
msg['To'] = ', '.join(addrheader(addr) for addr in unique_addrs(to_addrs))
if cc_addrs:
msg['Cc'] = ', '.join(addrheader(addr) for addr in unique_addrs(cc_addrs))
if msgid:
msg['Message-id'] = msgid
if references:
msg['References'] = ', '.join(references)
return msg
class HtmlEmail(MIMEMultipart):
def __init__(self, subject, textcontent, htmlcontent,
sendermail=None, sendername=None, recipients=None, ccrecipients=None):
MIMEMultipart.__init__(self, 'related')
self['Subject'] = header(subject)
self.preamble = 'This is a multi-part message in MIME format.'
# Attach alternative text message
alternative = MIMEMultipart('alternative')
msgtext = MIMEText(textcontent.encode('UTF-8'), 'plain', 'UTF-8')
# Attach html message
msghtml = MIMEText(htmlcontent.encode('UTF-8'), 'html', 'UTF-8')
if sendermail or sendername:
self['From'] = addrheader(sendermail, sendername)
if recipients:
self['To'] = ', '.join(addrheader(addr) for addr in recipients if addr is not None)
if ccrecipients:
self['Cc'] = ', '.join(addrheader(addr) for addr in ccrecipients if addr is not None)
def attach_image(self, data, htmlId):
image = MIMEImage(data)
image.add_header('Content-ID', '<%s>' % htmlId)
class NotificationView(EntityView):
"""abstract view implementing the "email" API (eg to simplify sending
# XXX refactor this class to work with len(rset) > 1
msgid_timestamp = True
# this is usually the method to call
def render_and_send(self, **kwargs):
"""generate and send an email message for this view"""
delayed = kwargs.pop('delay_to_commit', None)
for recipients, msg in self.render_emails(**kwargs):
if delayed is None:
self.send(recipients, msg)
elif delayed:
self.send_on_commit(recipients, msg)
self.send_now(recipients, msg)
def cell_call(self, row, col=0, **kwargs):
self.w(self._cw._(self.content) % self.context(**kwargs))
def render_emails(self, **kwargs):
"""generate and send emails for this view (one per recipient)"""
self._kwargs = kwargs
recipients = self.recipients()
if not recipients:'skipping %s notification, no recipients', self.__regid__)
if self.cw_rset is not None:
entity = self.cw_rset.get_entity(self.cw_row or 0, self.cw_col or 0)
# if the view is using timestamp in message ids, no way to reference
# previous email
if not self.msgid_timestamp:
refs = [self.construct_message_id(eid)
for eid in entity.notification_references(self)]
refs = ()
msgid = self.construct_message_id(entity.eid)
refs = ()
msgid = None
req = self._cw
self.user_data = req.user_data()
origlang = req.lang
for something in recipients:
if isinstance(something, Entity):
# hi-jack self._cw to get a session for the returned user
self._cw = self._cw.hijack_user(something)
emailaddr = something.get_email()
emailaddr, lang = something
# since the same view (eg self) may be called multiple time and we
# need a fresh stream at each iteration, reset it explicitly
self.w = None
# XXX call render before subject to set .row/.col attributes on the
# view
content = self.render(row=0, col=0, **kwargs)
subject = self.subject()
except SkipEmail:
except Exception, ex:
# shouldn't make the whole transaction fail because of rendering
# error (unauthorized or such)
msg = format_mail(self.user_data, [emailaddr], content, subject,
config=self._cw.vreg.config, msgid=msgid, references=refs)
yield [emailaddr], msg
# restore language
# recipients / email sending ###############################################
def recipients(self):
"""return a list of either 2-uple (email, language) or user entity to
who this email should be sent
# use super_session when available, we don't want to consider security
# when selecting recipients_finder
req = self._cw.super_session
except AttributeError:
req = self._cw
finder = self._cw.vreg['components'].select('recipients_finder', req,
row=self.cw_row or 0,
col=self.cw_col or 0)
return finder.recipients()
def send_now(self, recipients, msg):
self._cw.vreg.config.sendmails([(msg, recipients)])
def send_on_commit(self, recipients, msg):
raise NotImplementedError
send = send_now
# email generation helpers #################################################
def construct_message_id(self, eid):
return construct_message_id(self._cw.vreg.config.appid, eid, self.msgid_timestamp)
def format_field(self, attr, value):
return ':%(attr)s: %(value)s' % {'attr': attr, 'value': value}
def format_section(self, attr, value):
return '%(attr)s\n%(ul)s\n%(value)s\n' % {
'attr': attr, 'ul': '-'*len(attr), 'value': value}
def subject(self):
entity = self.cw_rset.get_entity(self.cw_row or 0, self.cw_col or 0)
subject = self._cw._(self.message)
etype = entity.dc_type()
eid = entity.eid
login = self.user_data['login']
return self._cw._('%(subject)s %(etype)s #%(eid)s (%(login)s)') % locals()
def context(self, **kwargs):
entity = self.cw_rset.get_entity(self.cw_row or 0, self.cw_col or 0)
for key, val in kwargs.iteritems():
if val and isinstance(val, unicode) and val.strip():
kwargs[key] = self._cw._(val)
kwargs.update({'user': self.user_data['login'],
'eid': entity.eid,
'etype': entity.dc_type(),
'url': entity.absolute_url(),
'title': entity.dc_long_title(),})
return kwargs
class SkipEmail(Exception):
"""raise this if you decide to skip an email during its generation"""
"""pre 3.6 bw compat"""
# pylint: disable-msg=W0614,W0401
from warnings import warn
warn('moved to cubicweb.mail', DeprecationWarning, stacklevel=2)
from cubicweb.mail import *
"""mixins of entity/views organized somewhat in a graph or tree structure
:organization: Logilab
:copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.
:contact: --
:license: GNU Lesser General Public License, v2.1 -
__docformat__ = "restructuredtext en"
from logilab.common.deprecation import deprecated
from logilab.common.decorators import cached
from cubicweb import typed_eid
from cubicweb.selectors import implements
from cubicweb.interfaces import IEmailable, ITree
class TreeMixIn(object):
"""base tree-mixin providing the tree interface
This mixin has to be inherited explicitly and configured using the
tree_attribute, parent_target and children_target class attribute to
benefit from this default implementation
tree_attribute = None
# XXX misnamed
parent_target = 'subject'
children_target = 'object'
def different_type_children(self, entities=True):
"""return children entities of different type as this entity.
according to the `entities` parameter, return entity objects or the
equivalent result set
res = self.related(self.tree_attribute, self.children_target,
if entities:
return [e for e in res if e.e_schema != self.e_schema]
return res.filtered_rset(lambda x: x.e_schema != self.e_schema, self.cw_col)
def same_type_children(self, entities=True):
"""return children entities of the same type as this entity.
according to the `entities` parameter, return entity objects or the
equivalent result set
res = self.related(self.tree_attribute, self.children_target,
if entities:
return [e for e in res if e.e_schema == self.e_schema]
return res.filtered_rset(lambda x: x.e_schema == self.e_schema, self.cw_col)
def iterchildren(self, _done=None):
if _done is None:
_done = set()
for child in self.children():
if child.eid in _done:
self.error('loop in %s tree', self.__regid__.lower())
yield child
def prefixiter(self, _done=None):
if _done is None:
_done = set()
if self.eid in _done:
yield self
for child in self.iterchildren(_done):
for entity in child.prefixiter(_done):
yield entity
except AttributeError:
def path(self):
"""returns the list of eids from the root object to this object"""
path = []
parent = self
while parent:
if parent.eid in path:
self.error('loop in %s tree', self.__regid__.lower())
# check we are not leaving the tree
if (parent.tree_attribute != self.tree_attribute or
parent.parent_target != self.parent_target):
parent = parent.parent()
except AttributeError:
return path
def iterparents(self):
def _uptoroot(self):
curr = self
while True:
curr = curr.parent()
if curr is None:
yield curr
return _uptoroot(self)
def notification_references(self, view):
"""used to control References field of email send on notification
for this entity. `view` is the notification view.
Should return a list of eids which can be used to generate message ids
of previously sent email
return self.path()[:-1]
## ITree interface ########################################################
def parent(self):
"""return the parent entity if any, else None (e.g. if we are on the
return self.related(self.tree_attribute, self.parent_target,
except (KeyError, IndexError):
return None
def children(self, entities=True, sametype=False):
"""return children entities
according to the `entities` parameter, return entity objects or the
equivalent result set
if sametype:
return self.same_type_children(entities)
return self.related(self.tree_attribute, self.children_target,
def children_rql(self):
return self.related_rql(self.tree_attribute, self.children_target)
def is_leaf(self):
return len(self.children()) == 0
def is_root(self):
return self.parent() is None
def root(self):
"""return the root object"""
return self._cw.entity_from_eid(self.path()[0])
class EmailableMixIn(object):
"""base mixin providing the default get_email() method used by
the massmailing view
NOTE: The default implementation is based on the
primary_email / use_email scheme
__implements__ = (IEmailable,)
def get_email(self):
if getattr(self, 'primary_email', None):
return self.primary_email[0].address
if getattr(self, 'use_email', None):
return self.use_email[0].address
return None
def allowed_massmail_keys(cls):
"""returns a set of allowed email substitution keys
The default is to return the entity's attribute list but an
entity class might override this method to allow extra keys.
For instance, the Person class might want to return a `companyname`
return set(rschema.type
for rschema, attrtype in cls.e_schema.attribute_definitions()
if attrtype.type not in ('Password', 'Bytes'))
def as_email_context(self):
"""returns the dictionary as used by the sendmail controller to
build email bodies.
NOTE: the dictionary keys should match the list returned by the
`allowed_massmail_keys` method.
return dict( (attr, getattr(self, attr)) for attr in self.allowed_massmail_keys() )
"""pluggable mixins system: plug classes registered in MI_REL_TRIGGERS on entity
classes which have the relation described by the dict's key.
NOTE: pluggable mixins can't override any method of the 'explicit' user classes tree
(eg without plugged classes). This includes bases Entity and AnyEntity classes.
('primary_email', 'subject'): EmailableMixIn,
('use_email', 'subject'): EmailableMixIn,
def _done_init(done, view, row, col):
"""handle an infinite recursion safety belt"""
if done is None:
done = set()
entity = view.cw_rset.get_entity(row, col)
if entity.eid in done:
msg = entity._cw._('loop in %(rel)s relation (%(eid)s)') % {
'rel': entity.tree_attribute,
'eid': entity.eid
return None, msg
return done, entity
class TreeViewMixIn(object):
"""a recursive tree view"""
__regid__ = 'tree'
item_vid = 'treeitem'
__select__ = implements(ITree)
def call(self, done=None, **kwargs):
if done is None:
done = set()
super(TreeViewMixIn, self).call(done=done, **kwargs)
def cell_call(self, row, col=0, vid=None, done=None, **kwargs):
done, entity = _done_init(done, self, row, col)
if done is None:
# entity is actually an error message
self.w(u'<li class="badcontent">%s</li>' % entity)
entity.view(vid or self.item_vid, w=self.w, **kwargs)
relatedrset = entity.children(entities=False)
self.wview(self.__regid__, relatedrset, 'null', done=done, **kwargs)
def open_item(self, entity):
self.w(u'<li class="%s">\n' % entity.__regid__.lower())
def close_item(self, entity):
class TreePathMixIn(object):
"""a recursive path view"""
__regid__ = 'path'
item_vid = 'oneline'
separator = u'&#160;&gt;&#160;'
def call(self, **kwargs):
self.w(u'<div class="pathbar">')
super(TreePathMixIn, self).call(**kwargs)
def cell_call(self, row, col=0, vid=None, done=None, **kwargs):
done, entity = _done_init(done, self, row, col)
if done is None:
# entity is actually an error message
self.w(u'<span class="badcontent">%s</span>' % entity)
parent = entity.parent()
if parent:
parent.view(self.__regid__, w=self.w, done=done)
entity.view(vid or self.item_vid, w=self.w)
class ProgressMixIn(object):
"""provide default implementations for IProgress interface methods"""
# This is an adapter isn't it ?
def cost(self):
return self.progress_info()['estimated']
def revised_cost(self):
return self.progress_info().get('estimatedcorrected', self.cost)