Commit 08c7107d authored by François Ferry's avatar François Ferry
Browse files

style: paint it black

parent d7529c01f442
......@@ -2,28 +2,28 @@
"""cubicweb-dataprocessing application packaging information"""
modname = 'dataprocessing'
distname = 'cubicweb-dataprocessing'
modname = "dataprocessing"
distname = "cubicweb-dataprocessing"
numversion = (0, 3, 0)
version = '.'.join(str(num) for num in numversion)
version = ".".join(str(num) for num in numversion)
license = 'LGPL'
author = 'LOGILAB S.A. (Paris, FRANCE)'
author_email = 'contact@logilab.fr'
description = 'Data validation and transformation process'
web = 'http://www.cubicweb.org/project/%s' % distname
license = "LGPL"
author = "LOGILAB S.A. (Paris, FRANCE)"
author_email = "contact@logilab.fr"
description = "Data validation and transformation process"
web = "http://www.cubicweb.org/project/%s" % distname
__depends__ = {
'six': '>= 1.4.0',
'cubicweb': '>= 3.24.0.dev0',
'cubicweb-file': None,
"six": ">= 1.4.0",
"cubicweb": ">= 3.24.0.dev0",
"cubicweb-file": None,
}
__recommends__ = {}
classifiers = [
'Environment :: Web Environment',
'Framework :: CubicWeb',
'Programming Language :: Python',
'Programming Language :: JavaScript',
"Environment :: Web Environment",
"Framework :: CubicWeb",
"Programming Language :: Python",
"Programming Language :: JavaScript",
]
......@@ -29,33 +29,33 @@ from cubicweb.view import EntityAdapter
def process_type_from_etype(etype):
"""Return the type of data process from etype name"""
return etype[len('Data'):-len('Process')].lower()
return etype[len("Data") : -len("Process")].lower()
def fspath_from_eid(cnx, eid):
"""Return fspath for an entity with `data` attribute stored in BFSS"""
# XXX this assumes the file is managed by BFSS.
rset = cnx.execute('Any fspath(D) WHERE X data D, X eid %(feid)s',
{'feid': eid})
rset = cnx.execute("Any fspath(D) WHERE X data D, X eid %(feid)s", {"feid": eid})
if not rset:
raise Exception('Could not find file system path for #%d.' % eid)
return rset[0][0].read().decode('utf-8')
raise Exception("Could not find file system path for #%d." % eid)
return rset[0][0].read().decode("utf-8")
class TransformationStep(AnyEntity):
__regid__ = 'TransformationStep'
__regid__ = "TransformationStep"
def dc_title(self):
script = self.step_script[0]
return u'[{0}] {1}'.format(self.index, script.dc_title())
return u"[{0}] {1}".format(self.index, script.dc_title())
class DataProcessAdapter(EntityAdapter):
"""Interface for data processes"""
__regid__ = 'IDataProcess'
__select__ = (EntityAdapter.__select__ &
is_instance('DataTransformationProcess',
'DataValidationProcess'))
__regid__ = "IDataProcess"
__select__ = EntityAdapter.__select__ & is_instance(
"DataTransformationProcess", "DataValidationProcess"
)
@property
def process_type(self):
......@@ -66,8 +66,8 @@ class DataProcessAdapter(EntityAdapter):
"""Fire transition identified by *short* name `trname` of the
underlying workflowable entity.
"""
iwf = self.entity.cw_adapt_to('IWorkflowable')
return iwf.fire_transition('wft_dataprocess_' + trname, **kwargs)
iwf = self.entity.cw_adapt_to("IWorkflowable")
return iwf.fire_transition("wft_dataprocess_" + trname, **kwargs)
def execute_script(self, script, inputfpath, outfile, parameters=None):
"""Execute script in a subprocess using ``inputfpath`` (file-path) and
......@@ -79,24 +79,29 @@ class DataProcessAdapter(EntityAdapter):
params = json.loads(parameters)
for pname, pvalue in params.items():
if not isinstance(pvalue, text_type):
raise ValueError('invalid parameter value for "{0}": {1}, '
'must be a string'.format(pname, pvalue))
cmdline.extend(['--' + pname, pvalue])
raise ValueError(
'invalid parameter value for "{0}": {1}, '
"must be a string".format(pname, pvalue)
)
cmdline.extend(["--" + pname, pvalue])
proc = Popen(cmdline, stdout=outfile, stderr=PIPE)
self.info('starting subprocess with pid %s: %s',
proc.pid, list2cmdline(cmdline))
self.info(
"starting subprocess with pid %s: %s", proc.pid, list2cmdline(cmdline)
)
_, stderrdata = proc.communicate()
return proc.returncode, stderrdata
@property
def process_scripts(self):
"""Iterator on (script, parameters) for the data process."""
if self.process_type == 'validation':
if self.process_type == "validation":
return ((s, s.parameters) for s in self.entity.validation_script)
else:
rset = self._cw.execute(
'Any S,P ORDERBY I ASC WHERE'
' STP index I, STP step_script S, STP parameters P,'
' STP in_sequence SQ, X transformation_sequence SQ,'
' X eid %(eid)s', {'eid': self.entity.eid})
"Any S,P ORDERBY I ASC WHERE"
" STP index I, STP step_script S, STP parameters P,"
" STP in_sequence SQ, X transformation_sequence SQ,"
" X eid %(eid)s",
{"eid": self.entity.eid},
)
return ((self._cw.entity_from_eid(s), p) for s, p in rset)
......@@ -22,8 +22,7 @@ import os.path
import tempfile
from cubicweb import Binary
from cubicweb.predicates import (on_fire_transition, score_entity,
objectify_predicate)
from cubicweb.predicates import on_fire_transition, score_entity, objectify_predicate
from cubicweb.server import hook
from cubicweb.server.sources import storages
......@@ -31,41 +30,45 @@ from cubicweb_dataprocessing.entities import fspath_from_eid
class ServerStartupHook(hook.Hook):
__regid__ = 'dataprocessing.serverstartup'
events = ('server_startup', 'server_maintenance')
__regid__ = "dataprocessing.serverstartup"
events = ("server_startup", "server_maintenance")
def __call__(self):
bfssdir = os.path.join(self.repo.config.appdatahome, 'bfss')
bfssdir = os.path.join(self.repo.config.appdatahome, "bfss")
if not os.path.exists(bfssdir):
os.makedirs(bfssdir)
print('created', bfssdir)
print("created", bfssdir)
storage = storages.BytesFileSystemStorage(bfssdir)
storages.set_attribute_storage(self.repo, 'File', 'data', storage)
storages.set_attribute_storage(self.repo, "File", "data", storage)
@objectify_predicate
def process_missing_dependency(cls, req, rset=None, eidfrom=None,
eidto=None, **kwargs):
def process_missing_dependency(cls, req, rset=None, eidfrom=None, eidto=None, **kwargs):
"""Return 1 if the process has a dependency"""
if not eidfrom:
return 0
if req.entity_metas(eidfrom)['type'] != 'DataTransformationProcess':
if req.entity_metas(eidfrom)["type"] != "DataTransformationProcess":
return 0
if req.execute('Any X WHERE EXISTS(X process_depends_on Y),'
' X eid %(eid)s', {'eid': eidfrom}):
if req.execute(
"Any X WHERE EXISTS(X process_depends_on Y)," " X eid %(eid)s",
{"eid": eidfrom},
):
return 1
return 0
class AutoStartDataProcessHook(hook.Hook):
"""Automatically starts a data process when an input file is added."""
__regid__ = 'datacat.dataprocess-start-when-inputfile-added'
__select__ = (hook.Hook.__select__ &
hook.match_rtype('process_input_file') &
~process_missing_dependency())
events = ('after_add_relation', )
category = 'workflow'
__regid__ = "datacat.dataprocess-start-when-inputfile-added"
__select__ = (
hook.Hook.__select__
& hook.match_rtype("process_input_file")
& ~process_missing_dependency()
)
events = ("after_add_relation",)
category = "workflow"
def __call__(self):
StartDataProcessOp.get_instance(self._cw).add_data(self.eidfrom)
......@@ -76,21 +79,23 @@ def trinfo_concerns_a_dependency_process(trinfo):
another.
"""
process = trinfo.for_entity
if process.cw_etype != 'DataValidationProcess':
if process.cw_etype != "DataValidationProcess":
return 0
return 1 if process.reverse_process_depends_on else 0
class StartDataProcessWithDependencyHook(hook.Hook):
"""Starts a data process when its dependency terminated successfully."""
__regid__ = 'datacat.dataprocess-start-when-dependency-terminated'
__select__ = (hook.Hook.__select__ &
on_fire_transition('DataValidationProcess',
'wft_dataprocess_complete') &
score_entity(trinfo_concerns_a_dependency_process))
events = ('after_add_entity', )
category = 'workflow'
__regid__ = "datacat.dataprocess-start-when-dependency-terminated"
__select__ = (
hook.Hook.__select__
& on_fire_transition("DataValidationProcess", "wft_dataprocess_complete")
& score_entity(trinfo_concerns_a_dependency_process)
)
events = ("after_add_entity",)
category = "workflow"
def __call__(self):
vprocess = self.entity.for_entity
......@@ -108,20 +113,19 @@ class StartDataProcessOp(hook.DataOperationMixIn, hook.LateOperation):
def precommit_event(self):
for eid in self.get_data():
process = self.cnx.entity_from_eid(eid)
iprocess = process.cw_adapt_to('IDataProcess')
iprocess.fire_workflow_transition('start')
iprocess = process.cw_adapt_to("IDataProcess")
iprocess.fire_workflow_transition("start")
class StartDataProcessHook(hook.Hook):
__regid__ = 'datacat.dataprocess-start'
__select__ = (hook.Hook.__select__ &
(on_fire_transition('DataTransformationProcess',
'wft_dataprocess_start') |
on_fire_transition('DataValidationProcess',
'wft_dataprocess_start')))
__regid__ = "datacat.dataprocess-start"
__select__ = hook.Hook.__select__ & (
on_fire_transition("DataTransformationProcess", "wft_dataprocess_start")
| on_fire_transition("DataValidationProcess", "wft_dataprocess_start")
)
events = ('after_add_entity', )
category = 'workflow'
events = ("after_add_entity",)
category = "workflow"
def __call__(self):
process = self.entity.for_entity
......@@ -137,10 +141,10 @@ class ExecuteProcessScriptsOp(hook.DataOperationMixIn, hook.Operation):
def precommit_event(self):
for peid in self.get_data():
process = self.cnx.entity_from_eid(peid)
iprocess = process.cw_adapt_to('IDataProcess')
iprocess = process.cw_adapt_to("IDataProcess")
if not process.process_input_file:
msg = u'no input file'
iprocess.fire_workflow_transition('error', comment=msg)
msg = u"no input file"
iprocess.fire_workflow_transition("error", comment=msg)
return
inputfile = process.process_input_file[0]
inputfpath = fspath_from_eid(self.cnx, inputfile.eid)
......@@ -148,51 +152,61 @@ class ExecuteProcessScriptsOp(hook.DataOperationMixIn, hook.Operation):
with tempfile.NamedTemporaryFile(delete=False) as outfile:
try:
returncode, stderr = iprocess.execute_script(
script, inputfpath, outfile, params)
script, inputfpath, outfile, params
)
if returncode:
msg = u'\n'.join(
['error in transformation #%d of input file #%d' % (
idx, inputfile.eid),
'subprocess exited with status %d' % returncode])
msg = u"\n".join(
[
"error in transformation #%d of input file #%d"
% (idx, inputfile.eid),
"subprocess exited with status %d" % returncode,
]
)
self.cnx.create_entity(
'File', data=Binary(stderr), data_name=u'stderr',
data_format=u'text/plain',
reverse_process_stderr=process)
iprocess.fire_workflow_transition('error', comment=msg)
"File",
data=Binary(stderr),
data_name=u"stderr",
data_format=u"text/plain",
reverse_process_stderr=process,
)
iprocess.fire_workflow_transition("error", comment=msg)
return
finally:
if idx >= 1:
os.remove(inputfpath)
inputfpath = outfile.name
if iprocess.process_type == 'transformation':
if iprocess.process_type == "transformation":
output_format = script.output_format or inputfile.data_format
with open(outfile.name, mode='rb') as output:
with open(outfile.name, mode="rb") as output:
# XXX better copy the file rather than reading it...
self.cnx.create_entity(
'File', data=Binary(output.read()),
"File",
data=Binary(output.read()),
data_name=inputfile.data_name,
data_format=output_format,
produced_by=process)
produced_by=process,
)
os.remove(outfile.name)
iprocess.fire_workflow_transition('complete')
iprocess.fire_workflow_transition("complete")
class SetValidatedByHook(hook.Hook):
"""Set the `validated_by` relation update completion of the data
validation process.
"""
__regid__ = 'datacat.datavalidationprocess-completed-inputfile-validated_by'
__select__ = (hook.Hook.__select__ &
on_fire_transition('DataValidationProcess',
'wft_dataprocess_complete'))
events = ('after_add_entity', )
category = 'workflow'
__regid__ = "datacat.datavalidationprocess-completed-inputfile-validated_by"
__select__ = hook.Hook.__select__ & on_fire_transition(
"DataValidationProcess", "wft_dataprocess_complete"
)
events = ("after_add_entity",)
category = "workflow"
def __call__(self):
process = self.entity.for_entity
self._cw.execute(
'SET F validated_by VP WHERE F eid %(input)s, VP eid %(vp)s,'
' NOT F validated_by VP',
{'vp': process.eid,
'input': process.process_input_file[0].eid})
"SET F validated_by VP WHERE F eid %(input)s, VP eid %(vp)s,"
" NOT F validated_by VP",
{"vp": process.eid, "input": process.process_input_file[0].eid},
)
......@@ -3,43 +3,44 @@ def copy_script(eid, etype, **kwargs):
(ValidationScript or TransformationScript).
"""
script = cnx.entity_from_eid(eid)
kwargs['name'] = script.name
kwargs['implemented_by'] = script.implemented_by
kwargs["name"] = script.name
kwargs["implemented_by"] = script.implemented_by
new = create_entity(etype, **kwargs)
script.cw_delete()
return new
add_entity_type('ValidationScript')
add_entity_type('TransformationScript')
add_entity_type('TransformationSequence')
add_entity_type('TransformationStep')
add_entity_type("ValidationScript")
add_entity_type("TransformationScript")
add_entity_type("TransformationSequence")
add_entity_type("TransformationStep")
rset = rql('Any P,S WHERE P is DataTransformationProcess, P process_script S',
ask_confirm=False)
drop_relation_definition('DataTransformationProcess', 'process_script', 'Script')
rset = rql(
"Any P,S WHERE P is DataTransformationProcess, P process_script S",
ask_confirm=False,
)
drop_relation_definition("DataTransformationProcess", "process_script", "Script")
for peid, seid in rset:
script = copy_script(seid, 'TransformationScript')
seq = create_entity('TransformationSequence',
reverse_transformation_sequence=peid)
create_entity('TransformationStep', index=0, step_script=script,
in_sequence=seq)
script = copy_script(seid, "TransformationScript")
seq = create_entity("TransformationSequence", reverse_transformation_sequence=peid)
create_entity("TransformationStep", index=0, step_script=script, in_sequence=seq)
commit(ask_confirm=False)
rset = rql('Any P,S WHERE P is DataValidationProcess, P process_script S',
ask_confirm=False)
drop_relation_definition('DataValidationProcess', 'process_script', 'Script')
rset = rql(
"Any P,S WHERE P is DataValidationProcess, P process_script S", ask_confirm=False
)
drop_relation_definition("DataValidationProcess", "process_script", "Script")
for peid, seid in rset:
copy_script(seid, 'ValidationScript', reverse_validation_script=peid)
copy_script(seid, "ValidationScript", reverse_validation_script=peid)
commit(ask_confirm=False)
drop_relation_type('process_script')
drop_relation_definition('Script', 'implemented_by', 'File')
drop_entity_type('Script')
drop_relation_type("process_script")
drop_relation_definition("Script", "implemented_by", "File")
drop_entity_type("Script")
add_relation_type('process_stderr')
add_relation_type("process_stderr")
sync_schema_props_perms('DataTransformationProcess')
sync_schema_props_perms('DataValidationProcess')
sync_schema_props_perms('process_input_file')
sync_schema_props_perms('File', syncprops=False)
sync_schema_props_perms("DataTransformationProcess")
sync_schema_props_perms("DataValidationProcess")
sync_schema_props_perms("process_input_file")
sync_schema_props_perms("File", syncprops=False)
sync_schema_props_perms('transformation_sequence')
sync_schema_props_perms('TransformationSequence')
sync_schema_props_perms("transformation_sequence")
sync_schema_props_perms("TransformationSequence")
......@@ -16,30 +16,32 @@
"""cubicweb-dataprocessing schema"""
from yams.buildobjs import (ComputedRelation, EntityType, RelationDefinition,
Int, String)
from yams.buildobjs import ComputedRelation, EntityType, RelationDefinition, Int, String
from cubicweb import _
from cubicweb.schema import (RRQLExpression, ERQLExpression,
RQLConstraint, WorkflowableEntityType)
from cubicweb.schema import (
RRQLExpression,
ERQLExpression,
RQLConstraint,
WorkflowableEntityType,
)
from cubicweb_file.schema import File
DATAPROCESS_UPDATE_PERMS_RQLEXPR = (
'U in_group G, G name "users", '
'X in_state S, S name "wfs_dataprocess_initialized"')
'X in_state S, S name "wfs_dataprocess_initialized"'
)
class _DataProcess(WorkflowableEntityType):
__abstract__ = True
__permissions__ = {
'read': ('managers', 'users', 'guests'),
'update': ('managers',
ERQLExpression(DATAPROCESS_UPDATE_PERMS_RQLEXPR)),
'delete': ('managers',
ERQLExpression(DATAPROCESS_UPDATE_PERMS_RQLEXPR)),
'add': ('managers', 'users')
"read": ("managers", "users", "guests"),
"update": ("managers", ERQLExpression(DATAPROCESS_UPDATE_PERMS_RQLEXPR)),
"delete": ("managers", ERQLExpression(DATAPROCESS_UPDATE_PERMS_RQLEXPR)),
"add": ("managers", "users"),
}
......@@ -52,75 +54,87 @@ class DataValidationProcess(_DataProcess):
class process_depends_on(RelationDefinition):
subject = 'DataTransformationProcess'
object = 'DataValidationProcess'
cardinality = '??'
subject = "DataTransformationProcess"
object = "DataValidationProcess"
cardinality = "??"
class process_input_file(RelationDefinition):
__permissions__ = {
'read': ('managers', 'users', 'guests'),
'add': ('managers', RRQLExpression(
'U in_group G, G name "users", '
'S in_state ST, ST name "wfs_dataprocess_initialized"')),
'delete': ('managers', RRQLExpression('U has_update_permission S'))}
subject = ('DataTransformationProcess', 'DataValidationProcess')
object = 'File'
cardinality = '?*'
description = _('input file of the data process')
"read": ("managers", "users", "guests"),
"add": (
"managers",
RRQLExpression(
'U in_group G, G name "users", '
'S in_state ST, ST name "wfs_dataprocess_initialized"'
),
),
"delete": ("managers", RRQLExpression("U has_update_permission S")),
}
subject = ("DataTransformationProcess", "DataValidationProcess")
object = "File"
cardinality = "?*"
description = _("input file of the data process")
constraints = [
RQLConstraint('NOT EXISTS(SC implemented_by O)',
msg=_('file is used by a script')),
RQLConstraint(
"NOT EXISTS(SC implemented_by O)", msg=_("file is used by a script")
),
]
class validated_by(RelationDefinition):
"""A File may be validated by a validation process"""
__permissions__ = {'read': ('managers', 'users', 'guests'),
'add': (),
'delete': ()}
subject = 'File'
object = 'DataValidationProcess'
cardinality = '**'
__permissions__ = {"read": ("managers", "users", "guests"), "add": (), "delete": ()}
subject = "File"
object = "DataValidationProcess"
cardinality = "**"
class produced_by(RelationDefinition):
"""A File may be produced by a transformation process"""
__permissions__ = {'read': ('managers', 'users', 'guests'),
'add': (),
'delete': ()}
subject = 'File'
object = 'DataTransformationProcess'
cardinality = '?*'
__permissions__ = {"read": ("managers", "users", "guests"), "add": (), "delete": ()}
subject = "File"
object = "DataTransformationProcess"
cardinality = "?*"
class process_stderr(RelationDefinition):
__permissions__ = {
'read': ('managers', 'users', 'guests',),
'add': (),
'delete': (),
"read": (
"managers",
"users",
"guests",
),
"add": (),
"delete": (),
}
subject = ('DataTransformationProcess', 'DataValidationProcess')
object = 'File'
cardinality = '??'
subject = ("DataTransformationProcess", "DataValidationProcess")
object = "File"
cardinality = "??"
inlined = True
composite = 'subject'
description = _('standard error output')
composite = "subject"
description = _("standard error output")
# Set File permissions:
# * use `produced_by` relation to prevent modification of generated files