Commit e600bb15 authored by Denis Laxalde's avatar Denis Laxalde
Browse files

Implement CKANPublishable adapter and entity creation and update hooks

Essentially move code from an application cube and add tests for hooks.

Closes #4502966.
parent 14cd4f8f6aa2
Publish data to a CKAN instance
This cube enables data publishing to a CKAN opendata portal.
The cube essentially provides an adapter ``ICKANPublishable`` which relies on
a ``ckan_dataset_id`` attribute on the adapted entity. As its name suggests,
the latter is used to relate a CubicWeb entity to a CKAN dataset (the
fundamental entity type in CKAN data model). Most of the synchronization to
the CKAN instance is done in hooks so usually one would set tight security
rules for this attribute, e.g.:
class MyEntity(EntityType):
ckan_dataset_id = String(
description=_('identifier of corresponding CKAN dataset'),
__permissions__={'read': ('managers', 'users', 'guests'),
'add': (),
'update': ()},
......@@ -13,7 +13,9 @@ author_email = ''
description = 'Publish data to a CKAN instance'
web = '' % distname
__depends__ = {'cubicweb': '>= 3.19.5'}
__depends__ = {'cubicweb': '>= 3.19.5',
'requests': None,
__recommends__ = {}
classifiers = [
......@@ -21,6 +21,7 @@ BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-buildroot
BuildRequires: %{python} %{python}-setuptools
Requires: cubicweb >= 3.19.5
Requires: %{python}-requests
Publish data to a CKAN instance
......@@ -10,6 +10,7 @@ Package: cubicweb-ckanpublish
Architecture: all
cubicweb-common (>= 3.19.5),
Description: Publish data to a CKAN instance
......@@ -15,3 +15,101 @@
# with this program. If not, see <>.
"""cubicweb-ckanpublish entity's classes"""
import re
import unicodedata
from cubicweb.predicates import relation_possible
from cubicweb.view import EntityAdapter
from cubes.ckanpublish.utils import ckan_post
def slugify(value):
"""Converts to lowercase, removes non-word characters (alphanumerics and
underscores) and converts spaces to hyphens. Also strips leading and
trailing whitespace.
Adapted from django.utils.text and novaclient.utils.
if not isinstance(value, unicode):
value = unicode(value)
value = unicodedata.normalize('NFKD', value).encode('ascii', 'ignore')
value = unicode(re.sub('[^\w\s-]', '', value).strip().lower())
return re.sub('[-\s]+', '-', value)
class CKANPublishableAdapter(EntityAdapter):
"""Adapter for entity that can be mapped to a CKAN dataset"""
__regid__ = 'ICKANPublishable'
__select__ = (EntityAdapter.__select__ &
relation_possible('ckan_dataset_id', role='subject'))
def ckan_name(self):
"""name field suitable for CKAN (must be unique, hence the eid prefix)
mainattr = self.entity.e_schema.main_attribute().type
name = getattr(self.entity, mainattr)
return str(self.entity.eid) + '-' + slugify(name).lower()
def ckan_notes(self):
"""Build the CKAN dataset notes attribute
This contains the entity description along with the list of related
if 'description' not in self.entity.e_schema.subject_relations():
# XXX CKAN uses Markdown syntax for notes field.
return self.entity.printable_value('description', format='text/plain')
def ckan_get_organization_id(self, orgname):
"""Retrieve the ID of an organization given its name"""
data = {'organizations': [orgname],
'all_fields': True}
res = ckan_post(self._cw.vreg.config, 'organization_list', data)
if res:
return res[0]['id']
raise Exception('no organization named %s in CKAN instance' %
def dataset_title(self):
"""Title of the CKAN dataset"""
return self.entity.dc_title()
def ckan_data(self):
"""Return a dict with all data to build a CKAN dataset from entity"""
data = {'name': self.ckan_name,
'title': self.dataset_title(),
'notes': self.ckan_notes(),
'maintainer': None,
'maintainer_email': None,
orgname = self._cw.vreg.config.get('ckan-organization')
data['owner_org'] = self.ckan_get_organization_id(orgname)
maintainer = self.dataset_maintainer()
if maintainer:
data['maintainer'] = maintainer.dc_long_title()
if maintainer.primary_email:
data['maintainer_email'] = maintainer.primary_email[0].address
data['tags'] = list(self.dataset_tags())
data['extras'] = list(self.dataset_extras())
return data
def dataset_extras(self):
"""Extra fields for the dataset"""
return []
def dataset_tags(self):
"""Yield tag data for entity"""
if self.entity.e_schema.has_relation('tags', role='object'):
for tag in self.entity.reverse_tags:
yield {'name': slugify(}
def dataset_maintainer(self):
"""May return a CWUser entity corresponding to the maintainer of
dataset-like entity.
return None
......@@ -15,3 +15,89 @@
# with this program. If not, see <>.
"""cubicweb-ckanpublish specific hooks and operations"""
from requests.exceptions import RequestException
from cubicweb import ValidationError
from cubicweb.predicates import adaptable, score_entity
from cubicweb.server import hook
from cubes.ckanpublish.utils import (ckan_post, CKANPostError,
def create_dataset(config, eid, data):
"""Create a CKAN dataset and set `ckan_dataset_id` attribute or
respective entity. Return the dataset id.
res = ckan_post(config, 'package_create', data)
return res['id']
except (CKANPostError, RequestException) as exc:
raise ValidationError(eid, {'ckan_dataset_id': unicode(exc)})
def update_dataset(config, eid, datasetid, udata):
"""Update an existing CKAN dataset"""
data = ckan_post(config, 'package_show', {'id': datasetid})
ckan_post(config, 'package_update', data)
except (CKANPostError, RequestException) as exc:
raise ValidationError(eid, {'ckan_dataset_id': unicode(exc)})
def delete_dataset(config, eid, datasetid):
"""Delete a CKAN dataset"""
ckan_post(config, 'package_delete', {'id': datasetid})
except (CKANPostError, RequestException) as exc:
raise ValidationError(eid, {'ckan_dataset_id': unicode(exc)})
class DeleteCKANDataSetHook(hook.Hook):
"""Delete CKAN dataset upon deletion of the corresponding entity"""
__regid__ = 'ckanpublish.delete-ckan-dataset'
__select__ = (hook.Hook.__select__ & ckan_instance_configured &
adaptable('ICKANPublishable') &
score_entity(lambda x: x.ckan_dataset_id))
events = ('before_delete_entity', )
def __call__(self):
class AddOrUpdateCKANDataSetHook(hook.Hook):
"""Add or update a CKAN dataset upon addition or update of an entity"""
__regid__ = 'ckanpublish.add-update-ckan-dataset'
__select__ = (hook.Hook.__select__ & ckan_instance_configured &
events = ('after_add_entity', 'after_update_entity', )
def __call__(self):
class CKANDatasetOp(hook.DataOperationMixIn, hook.Operation):
"""Operation to create, update or delete a CKAN dataset"""
def precommit_event(self):
for eid in self.get_data():
entity = self.cnx.entity_from_eid(eid)
datasetid = entity.ckan_dataset_id
config = self.cnx.vreg.config
if self.cnx.deleted_in_transaction(eid):
delete_dataset(config, eid, datasetid)'deleted CKAN dataset %s', datasetid)
data = entity.cw_adapt_to('ICKANPublishable').ckan_data()
if datasetid is not None:
update_dataset(config, eid, datasetid, data)'updated %s fields in CKAN dataset %s',
data.keys(), datasetid)
datasetid = create_dataset(config, eid, data)
'SET X ckan_dataset_id %(dsid)s WHERE X eid %(eid)s',
{'eid': eid, 'dsid': datasetid})'created CKAN dataset %s', datasetid)
from logilab.common.configuration import REQUIRED
options = (
{'type' : 'string',
'default': REQUIRED,
'help': u'base url of the CKAN instance to push data to',
'group': 'ckan', 'level': 0,
{'type' : 'string',
'default': REQUIRED,
'help': u'an API key for the CKAN instance',
'group': 'ckan', 'level': 0,
{'type' : 'string',
'default': REQUIRED,
'help': u'the organization under which dataset will be created',
'group': 'ckan', 'level': 0,
from cubicweb.predicates import is_instance
from cubes.ckanpublish.entities import CKANPublishableAdapter
class CWDataSetCKANPublish(CKANPublishableAdapter):
__select__ = CKANPublishableAdapter.__select__ & is_instance('CWDataSet')
def dataset_maintainer(self):
if self.entity.maintainer:
return self.entity.maintainer[0]
from cubicweb.server import hook
from cubes.ckanpublish.hooks import CKANDatasetOp
class AddUpdateMaintainerHook(hook.Hook):
__regid__ = 'ckanpublish-tests.add-update-maintainer'
__select__ = (hook.Hook.__select__ &
hook.match_rtype('maintainer', frometypes=('CWDataSet')))
events = ('after_add_relation', 'after_delete_relation')
def __call__(self):
from yams.buildobjs import EntityType, String, SubjectRelation
class CWDataSet(EntityType):
name = String(required=True)
description = String()
ckan_dataset_id = String(
__permissions__={'read': ('managers', 'users', 'guests'),
'add': (),
'update': ()},
maintainer = SubjectRelation('CWUser', cardinality='?*')
"""cubicweb-ckanpublish unit tests for hooks"""
from cubicweb.devtools.testlib import CubicWebTC
from cubes.ckanpublish.utils import ckan_post, CKANPostError
class CKANPublishHooksTC(CubicWebTC):
dataset_owner_org = None
def setUpClass(cls):
from ckanconfig import baseurl, apikey, organization
except ImportError:
cls.__unittest_skip__ = True
cls.__unittest_skip_why__ = 'no CKAN instance configuration found'
cls.ckan_config = {'ckan-baseurl': baseurl,
'ckan-api-key': apikey,
'ckan-organization': organization}
cls.dataset_owner_org = organization
def setup_database(self):
for k, v in self.ckan_config.items():
self.config.global_set_option(k, v)
def tearDown(self):
with self.admin_access.repo_cnx() as cnx:
# Delete Table linked to a CKAN dataset, so that the latter gets
# deleted.
# However, datasets will still have to be purge from the web ui.
cnx.execute('DELETE CWDataSet X WHERE EXISTS(X ckan_dataset_id I)')
super(CKANPublishHooksTC, self).tearDown()
def test_entity_creation(self):
with self.admin_access.repo_cnx() as cnx:
entity = cnx.create_entity('CWDataSet', name=u'buz buz ?!',
description=u'opendata buzzzz')
yield self._check_entity_create, cnx, entity
yield self._check_entity_update, cnx, entity
yield self._check_entity_delete, cnx, entity
def _check_entity_create(self, cnx, entity):
self.set_description('entity creation')
result = ckan_post(self.ckan_config, 'package_show',
{'id': entity.ckan_dataset_id})
self.assertEqual(result['name'], '%s-buz-buz' % entity.eid)
self.assertEqual(result['notes'], entity.description)
cpublish = entity.cw_adapt_to('ICKANPublishable')
organization_id = cpublish.ckan_get_organization_id(
self.assertEqual(result['owner_org'], organization_id)
def _check_entity_update(self, cnx, entity):
self.set_description('entity update')
entity.cw_set(description=u'no this is actually serious')
result = ckan_post(self.ckan_config, 'package_show',
{'id': entity.ckan_dataset_id})
self.assertEqual(result['notes'], entity.description)
user = self.create_user(cnx, 'toto', firstname=u'T.',
surname=u'Oto', email=u'to@t.o')
result = ckan_post(self.ckan_config, 'package_show',
{'id': entity.ckan_dataset_id})
self.assertEqual(result['maintainer'], 'T. Oto')
self.assertEqual(result['maintainer_email'], 'to@t.o')
def _check_entity_delete(self, cnx, entity):
self.set_description('entity deletion')
ckanid = entity.ckan_dataset_id
result = ckan_post(self.ckan_config, 'package_show',
{'id': ckanid})
self.assertEqual(result['state'], 'deleted')
if __name__ == '__main__':
from logilab.common.testlib import unittest_main
# copyright 2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact --
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <>.
"""cubicweb-cg33catsi utilities"""
import json
from urllib import basejoin
import requests
from cubicweb.predicates import objectify_predicate
class CKANPostError(Exception):
"""CKAN post action error"""
def ckan_post(config, action, data=None):
url = basejoin(config['ckan-baseurl'], 'api/3/action/' + action)
headers = {'Authorization': config['ckan-api-key'],
'Content-Type': 'application/json'}
r =, headers=headers, data=json.dumps(data or {}))
if not r.ok:
error = r.json()['error']
raise CKANPostError('action %s failed: %s' % (action, error))
return r.json()['result']
def ckan_instance_configured(cls, req, **kwargs):
"""Return 1 if CKAN instance configuration is defined.
(Mostly useful in tests to disable CKAN hooks.)
config = req.vreg.config
for option in ('ckan-baseurl', 'ckan-api-key', 'ckan-organization'):
if not config.get(option):
req.error('CKAN instance configuration incomplete, missing "%s" '
'option' % option)
return 0
return 1
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment