Skip to content
Snippets Groups Projects
Commit d26811c0 authored by Benjamin Regler's avatar Benjamin Regler
Browse files

:bookmark: Updated NomadQuery API Interface to use the new Nomad Query Language v2.0

parent b6f64de2
No related branches found
No related tags found
No related merge requests found
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
Benjamin Regler - Apache 2.0 License Benjamin Regler - Apache 2.0 License
@license http://www.apache.org/licenses/LICENSE-2.0 @license http://www.apache.org/licenses/LICENSE-2.0
@author Benjamin Regler @author Benjamin Regler
@version 1.0.0 @version 2.0.0
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
...@@ -31,13 +31,13 @@ import random ...@@ -31,13 +31,13 @@ import random
if sys.version_info.major > 2: if sys.version_info.major > 2:
# For Python 3.0 and later # For Python 3.0 and later
from urllib.parse import quote, unquote_plus from urllib.parse import quote, unquote_plus, urlencode
from urllib.request import urlopen, Request from urllib.request import urlopen, Request
else: else:
# Fall back to Python 2's urllib2 # Fall back to Python 2's urllib2
from urllib import quote, unquote_plus from urllib import quote, unquote_plus, urlencode
from urllib2 import urlopen, Request from urllib2 import urlopen, Request
class NomadQueryResult(object): class NomadQueryResult(object):
"""Nomad Query Result class """Nomad Query Result class
...@@ -47,48 +47,81 @@ class NomadQueryResult(object): ...@@ -47,48 +47,81 @@ class NomadQueryResult(object):
def __init__(self, query, response, version=1.0): def __init__(self, query, response, version=1.0):
"""Constructor. """Constructor.
Arguments: Arguments:
query {dict} -- Query information, i.e., query filter, context, query {dict} -- Query information, i.e., query filter, context,
group_by, endpoint, and URL group_by, endpoint, and URL
response {dict} -- Response of the Nomad Query API response {dict} -- Response of the Nomad Query API
Keyword Arguments: Keyword Arguments:
version {number} -- Version of the Nomad Query data file version {number} -- Version of the Nomad Query data file
(default: {1.0}) (default: {1.0})
""" """
self._uri = [] self._uri = []
self._download_url = ''
self._query = query or {} self._query = query or {}
self._timestamp = int(time.time()) self._timestamp = int(time.time())
self._response = response.get('result', {})
# Load response information
self._load(response, version)
def _load(self, response, version):
"""Load response information
Arguments:
response {dict} -- Response of the Nomad Query API
version {float} -- Version of the Nomad Query data file
"""
# Set version of the Nomad Query data file
self._version = version self._version = version
# Construct download path # Initialize
path = response.get('path', '') if version == 1.0:
self._download_url = self._query.get('endpoint', '') + 'download/' + \ self._response = response.get('result', {})
path.split('_')[-1] + '?file=' + quote(path.encode('utf-8')) + '.json'
# Get Nomad URIs # Construct download path
response = NomadQuery().request(self._download_url) path = response.get('path', '')
if response['status'] == 'success': self._download_url = self._query.get('endpoint', '') + \
regex = re.compile(r'(?<=/[a-zA-Z0-9\-_]{3}/)[^\.]+') 'download/' + path.split('_')[-1] + '?file=' + \
paths = response['data'].get('result', []) quote(path.encode('utf-8')) + '.json'
for path in paths: # Get Nomad URIs
match = regex.search(path) response = NomadQuery.request(self._download_url)
if match: if response['status'] == 'success':
# Substitute prefixes regex = re.compile(r'(?<=/[a-zA-Z0-9\-_]{3}/)[^.]+')
groups = match.group(0).split('/') paths = response['data'].get('result', [])
groups[0] = 'N' + groups[0][1:] # Normalized
if len(groups) == 2: for path in paths:
groups[1] = 'C' + groups[1][1:] # Computed match = regex.search(path)
if match:
# Substitute prefixes
groups = match.group(0).split('/')
groups[0] = 'N' + groups[0][1:] # Normalized
self._uri.append('nmd://' + '/'.join(groups)) if len(groups) == 2:
groups[1] = 'C' + groups[1][1:] # Computed
self._uri.append('nmd://' + '/'.join(groups))
elif version == 2.0:
self._response = response.get('data', {})
# Construct and get Nomad URIs
for entry in self._response:
if not entry['type'].lower().endswith('calculation'):
continue
# Get archive gid
context = entry['attributes']['metadata']['archive_context']
gid = context['archive_gid'][0]
# Assemble Nomad Uri
uri = 'nmd://N' + gid[1:] + '/' + entry['id']
self._uri.append(uri)
def version(self): def version(self):
"""Get the version of the Nomad Query data file. """Get the version of the Nomad Query data file.
Returns: Returns:
float -- Version of the Nomad Query data file float -- Version of the Nomad Query data file
""" """
...@@ -107,6 +140,10 @@ class NomadQueryResult(object): ...@@ -107,6 +140,10 @@ class NomadQueryResult(object):
Returns: Returns:
str -- The download URL of the query str -- The download URL of the query
Deprecated:
Since version 2.0.0, this method is no longer used by internal code
and not recommended.
""" """
return self._download_url return self._download_url
...@@ -142,23 +179,25 @@ class NomadQuery(object): ...@@ -142,23 +179,25 @@ class NomadQuery(object):
""" """
# Version of the Nomad Query API # Version of the Nomad Query API
__version__ = 1.0 __version__ = 2.0
# Nomad API endpoint
endpoint = os.environ.get('NOMAD_BASE_URI','https://analytics-toolkit.nomad-coe.eu') + '/api/'
# Private user path # Private user path
user_path = '/data/private' user_path = '/data/private'
def __init__(self, username='', endpoint=''): # Nomad API endpoints
endpoint = 'https://analytics-toolkit.nomad-coe.eu/api/'
query_endpoint = 'https://analytics-toolkit.nomad-coe.eu/archive/nql-api/'
def __init__(self, username='', endpoint='', query_endpoint=''):
"""Constructor. """Constructor.
Keyword Arguments: Keyword Arguments:
username {str} -- Current username. Leave empty to auto-detect username {str} -- Current username. Leave empty to auto-detect
username (default: {''}) username (default: {''})
endpoint {str} -- Endpoint of the Nomad API (default: endpoint {str} -- Endpoint of the Nomad API (default:
${NOMAD_BASE_URI}/api if set, otherwise {'https://analytics-toolkit.nomad-coe.eu/api/'})
{'https://analytics-toolkit.nomad-coe.eu/api/'}) query_endpoint {str} -- Endpoint of the Nomad Query API (default:
{'https://analytics-toolkit.nomad-coe.eu/nql-api/'})
""" """
self._username = '' self._username = ''
self._base_path = '' self._base_path = ''
...@@ -170,11 +209,14 @@ class NomadQuery(object): ...@@ -170,11 +209,14 @@ class NomadQuery(object):
if len(paths) == 1 and paths[0].lower() != 'nomad': if len(paths) == 1 and paths[0].lower() != 'nomad':
username = paths[0] username = paths[0]
# Set username and overwrite endpoint, if required # Set username and overwrite endpoints, if required
self.username(username) self.username(username)
if endpoint: if endpoint:
self.endpoint = str(endpoint) self.endpoint = str(endpoint)
if query_endpoint:
self.query_endpoint = str(query_endpoint)
def username(self, username=''): def username(self, username=''):
"""Get or set the username. """Get or set the username.
...@@ -190,62 +232,6 @@ class NomadQuery(object): ...@@ -190,62 +232,6 @@ class NomadQuery(object):
'nomad-query') 'nomad-query')
return self._username return self._username
def request(self, url, timeout=10):
"""Request a URL
Arguments:
url {str} -- The URL of a web address
Keyword Arguments:
timeout {number} -- Timeout of the request in seconds (default: {10})
Returns:
dict -- A dictionary with success status, response data, or
error message
"""
# Default request response
result = {
'url': url,
'status': 'error',
'message': 'Unknown error. Please inform the Nomad team to '
'solve this problem.'
}
try:
# Get URL
response = urlopen(Request(url), timeout=timeout)
if response.code != 200:
raise RuntimeError(result['message'])
# Read response
data = json.loads(response.read().decode('utf-8'), 'utf-8')
# Populate result
result.pop('message')
result.update({
'status': 'success',
'data': data
})
except Exception as exc:
exc = sys.exc_info()[1]
response = result.copy()
# Get error message
message = exc
if sys.version_info <= (2, 5) and hasattr(exc, 'message'):
message = exc.message
elif hasattr(exc, 'reason'):
message = exc.reason
response['message'] = str(message)
# Fix error message
if response['message'].endswith('timed out'):
response['message'] = 'Connection timed out. The Nomad ' + \
'Analytics API Service is currently unavailable.'
# Return result
return result
def resolve(self, nmd, path='', recursive=True, timeout=10): def resolve(self, nmd, path='', recursive=True, timeout=10):
"""Resolve a Nomad URI. """Resolve a Nomad URI.
...@@ -303,7 +289,7 @@ class NomadQuery(object): ...@@ -303,7 +289,7 @@ class NomadQuery(object):
if not os.path.isdir(base_path): if not os.path.isdir(base_path):
return queries return queries
# Get all stored queries # Get all stored queries
for filename in os.listdir(base_path): for filename in os.listdir(base_path):
path = os.path.join(base_path, filename) path = os.path.join(base_path, filename)
if os.path.isfile(path): if os.path.isfile(path):
...@@ -322,17 +308,22 @@ class NomadQuery(object): ...@@ -322,17 +308,22 @@ class NomadQuery(object):
queries.sort(key=lambda x: -x['timestamp']) queries.sort(key=lambda x: -x['timestamp'])
return queries return queries
def query(self, query, group_by='', context='', timeout=10): def query(self, query, group_by='', timeout=10, **kwargs):
"""Query the Nomad Database. """Query the Nomad Database.
Arguments: Arguments:
query {str} -- The query string (see Nomad API reference) query {str} -- The query string (see Nomad API reference)
Keyword Arguments: Keyword Arguments:
group_by {str} -- Group-by field. (default: {''}) group_by {str} -- Group-by field. (default: {''})
context {str} -- Query context. Leave empty to use num_results {int} -- Number of calculations to return
`single_configuration_calculation` (default: {''}) (default: {10000})
timeout {number} -- Timeout of the request in seconds (default: {10}) num_groups {int} -- Number of distinct calculation groups to return
(default: {10})
context {str} -- Deprecated: Query context. Leave empty to use
`single_configuration_calculation` (default: {''})
compat {bool} -- Compatibility mode (default: {True})
timeout {number} -- Timeout of the request in seconds (default: {10})
Returns: Returns:
NomadQueryResult -- The Nomad query result NomadQueryResult -- The Nomad query result
...@@ -343,17 +334,27 @@ class NomadQuery(object): ...@@ -343,17 +334,27 @@ class NomadQuery(object):
RuntimeError -- Unknown error. Please inform the Nomad team to RuntimeError -- Unknown error. Please inform the Nomad team to
solve this problem. solve this problem.
""" """
# Set default context
if not context:
context = 'single_configuration_calculation'
# Construct URL # Construct URL
url = self.endpoint + ('queryGroup/' if group_by else 'query/') + context url = self.query_endpoint + ('search_grouped' if group_by else 'search')
params = {
'source_fields': 'archive_gid',
'sort_field': 'calculation_gid',
'num_results': max(min(kwargs.get('num_results', 10000), 10000), 1),
'format': 'nested'
}
# Normalize query - compatibility fallback
if kwargs.get('compat', True):
query = self._normalize(query)
# Add query # Add query
url += '?filter=' + quote(query.strip()) params['query'] = query.strip()
if group_by: if group_by:
url += quote(' GROUPBY ' + group_by.strip().lower()) params['group_by'] = group_by.strip().lower()
params['num_groups'] = max(kwargs.get('num_groups', 10), 1)
# Construct URL
url += '?' + urlencode(params).replace('+', '%20')
# Read URL # Read URL
response = self.request(url, timeout=timeout) response = self.request(url, timeout=timeout)
...@@ -362,21 +363,18 @@ class NomadQuery(object): ...@@ -362,21 +363,18 @@ class NomadQuery(object):
# Check connection timeout # Check connection timeout
response = response['data'] response = response['data']
if 'timed_out' in response['result'] and response['result']['timed_out']: if response['meta'].get('is_timed_out', False) or \
response['meta'].get('is_terminated_early', False):
response['message'] = 'Connection timed out.' response['message'] = 'Connection timed out.'
# Check for additional error messages
if 'message' in response or 'msg' in response:
raise RuntimeError(response.get('message', response['msg']))
# Construct Nomad Query response # Construct Nomad Query response
query = { query = {
'context': context, 'endpoint': self.query_endpoint,
'endpoint': self.endpoint, 'query': params.get('query', ''),
'filter': query.strip(), 'group_by': params.get('group_by', ''),
'group_by': group_by.strip().lower(),
'url': url 'url': url
} }
return NomadQueryResult(query, response, self.__version__) return NomadQueryResult(query, response, self.__version__)
def fetch(self, name_or_index='', resolve=False, **params): def fetch(self, name_or_index='', resolve=False, **params):
...@@ -531,6 +529,97 @@ class NomadQuery(object): ...@@ -531,6 +529,97 @@ class NomadQuery(object):
data['data'] = self._resolve(data['uri'], **params) data['data'] = self._resolve(data['uri'], **params)
return data return data
@staticmethod
def request(url, timeout=10):
"""Request a URL
Arguments:
url {str} -- The URL of a web address
Keyword Arguments:
timeout {number} -- Timeout of the request in seconds (default: {10})
Returns:
dict -- A dictionary with success status, response data, or
error message
"""
# Default request response
result = {
'url': url,
'status': 'error',
'message': 'Unknown error. Please inform the Nomad team to '
'solve this problem.'
}
try:
# Get URL
response = urlopen(Request(url), timeout=timeout)
# Check response code
if response.code != 200:
raise RuntimeError(result['message'])
# Read response
data = json.loads(response.read().decode('utf-8'), 'utf-8')
# Populate result
result.pop('message')
result.update({
'status': 'success',
'data': data
})
except Exception as exc:
exc = sys.exc_info()[1]
response = result.copy()
# Get error message
message = exc
if sys.version_info <= (2, 5) and hasattr(exc, 'message'):
message = exc.message
elif hasattr(exc, 'reason'):
message = exc.reason
response['message'] = str(message)
# Fix error message
if response['message'].endswith('timed out'):
response['message'] = 'Connection timed out. The Nomad ' + \
'Analytics API Service is currently unavailable.'
# Return result
return result
def _normalize(self, query):
"""[Protected] Normalize query syntax
Arguments:
query {str} -- The query string (see Nomad API reference)
Returns:
str -- The normalized query string
"""
# Convert nomad query syntax v1 to v2
if re.search(r'(?<!\\):', query):
values = re.split('\sand\s', query, 0, re.I)
# Convert query
regex = re.compile(r'([^:]+):(.+)')
for i in range(len(values)):
match = regex.search(values[i])
if match:
# Make sure strings are properly escaped
value = map(str.strip, match.group(2).split(','))
value = ','.join((v if v.isdigit()
else '"' + v.strip('\'" ') + '"')
for v in value)
# Replace colons with equal symbols
values[i] = match.group(1) + ' = ' + value
# Rebuild query
query = ' AND '.join(values)
return query
def _resolve(self, paths, size=None, seed=None, **params): def _resolve(self, paths, size=None, seed=None, **params):
"""[Protected] Resolve Nomad URIs. """[Protected] Resolve Nomad URIs.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment