Source code for hummingbird.processes.wps_compliance_checker

import os

from compliance_checker.runner import ComplianceChecker, CheckSuite
from compliance_checker import __version__ as cchecker_version

from pywps import Process
from pywps import LiteralInput
from pywps import ComplexInput, ComplexOutput
from pywps import Format, FORMATS
from import Metadata
from import ProcessError

import logging
LOGGER = logging.getLogger("PYWPS")

[docs]class CChecker(Process): def __init__(self): inputs = [ LiteralInput('test', 'Test Suite', data_type='string', abstract="Select the test you want to run." " Default: cf (climate forecast conventions)", min_occurs=1, max_occurs=1, default='cf', allowed_values=['cf', 'ioos']), LiteralInput('criteria', 'Criteria', data_type='string', abstract="Define the criteria for the checks. Either Strict, Normal or Lenient." " Defaults to Normal.", min_occurs=1, max_occurs=1, default='normal', allowed_values=['strict', 'normal', 'lenient']), ComplexInput('dataset', 'Dataset', abstract='You may provide a URL or upload a NetCDF file.', min_occurs=0, max_occurs=1, supported_formats=[FORMATS.NETCDF]), ComplexInput('dataset_opendap', 'Remote OpenDAP Data URL', abstract="Or provide a remote OpenDAP data URL," " for example:" "", # noqa min_occurs=0, max_occurs=1, supported_formats=[FORMATS.DODS]), LiteralInput('format', 'Output Format', data_type='string', abstract="The format of the check reporst. Either text, json or html. Defaults to html.", min_occurs=1, max_occurs=1, default='html', allowed_values=['text', 'json', 'html']), ] outputs = [ ComplexOutput('output', 'Check Report', abstract="Report of check result.", as_reference=True, supported_formats=[Format('text/html')]), ] super(CChecker, self).__init__( self._handler, identifier="cchecker", title="IOOS Compliance Checker", version=cchecker_version, abstract="Runs the IOOS Compliance Checker tool to" " check datasets against compliance standards." " Each compliance standard is executed" " by a Check Suite, which functions similar to a" " Python standard Unit Test." " A Check Suite runs one or more checks against a dataset," " returning a list of Results which are then aggregated" " into a summary." " Development and maintenance for the compliance" " checker is done by the Integrated Ocean Observing System (IOOS).", metadata=[ Metadata('Birdhouse', ''), Metadata('User Guide', ''), Metadata('CF Conventions', ''), Metadata('IOOS', ''), Metadata('Compliance Checker on GitHub', ''), Metadata('IOOS Compliance Online Checker', ''), ], inputs=inputs, outputs=outputs, status_supported=True, store_supported=True, ) def _handler(self, request, response): dataset = None if 'dataset_opendap' in request.inputs: dataset = request.inputs['dataset_opendap'][0].url LOGGER.debug("opendap dataset url: {}".format(dataset)) elif 'dataset' in request.inputs: dataset = request.inputs['dataset'][0].file LOGGER.debug("opendap dataset file: {}".format(dataset)) if not dataset: raise ProcessError("You need to provide a Dataset.") output_format = request.inputs['format'][0].data check_suite = CheckSuite() check_suite.load_all_available_checkers() output_file = os.path.join( self.workdir, "check_report.{}".format(output_format))"checking dataset {}".format(dataset)) ComplianceChecker.run_checker( dataset, checker_names=[ for checker in request.inputs['test']], verbose=True, criteria=request.inputs['criteria'][0].data, output_filename=output_file, output_format=output_format) response.outputs['output'].file = output_file response.update_status("compliance checker finshed.", 100) return response