# app.py -- authored by Markus Scheidgen
# Copyright 2016-2018 Ioan Vancea, Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Quick and dirty app to expose a few materials properties via API
# for a minimal search GUI - optional for Deliverable D2.1
# Properties to be exposed:
# - code_name Table: material (column name: content)
# - basis_set_type Table: material
# - xc_treatment_basis Table: material
# - formula Table: material
# - nr_atoms Table: element (column name: count)
# - space_group Table: material
# - nr_results The number of results found
import json
import os
import sys
# needed for Decorator to accept cross-domain AJAX requests
from datetime import timedelta
from functools import update_wrapper
from re import findall

import psycopg2
from psycopg2.extras import RealDictCursor
from flask import Flask, jsonify, make_response, request, send_file, abort
# needed for Decorator to accept cross-domain AJAX requests
from flask import current_app
# Flask application object; the route and error-handler decorators in this
# file register their views on it.
app = Flask(__name__)
# Database Variables
# Each setting can be overridden through an environment variable so that
# credentials do not have to be edited in (or committed to) the source file.
# The defaults are the values that were previously hard-coded.
HOSTNAME = os.environ.get('NOMAD_DB_HOST', 'localhost')
DATABASE = os.environ.get('NOMAD_DB_NAME', 'repodump5000')
USERNAME = os.environ.get('NOMAD_DB_USER', 'nomadapp')
PASSWORD = os.environ.get('NOMAD_DB_PASSWORD', 'Nomad_Flask')
# Decorator to accept cross-domain AJAX requests
# -*- coding: utf-8 -*-
def crossdomain(origin=None, methods=None, headers=None,
                max_age=21600, attach_to_all=True,
                automatic_options=True):
    """Build a decorator that adds CORS headers to a Flask view's response.

    Args:
        origin: Allowed origin as a string, or an iterable of origin strings
            (joined with ", "). When None, no Access-Control-Allow-Origin
            header is written.
        methods: Iterable of HTTP method names to advertise. When None, the
            ``Allow`` header of Flask's default OPTIONS response is used.
        headers: String, or iterable of header names, for
            Access-Control-Allow-Headers. When None, a fixed default list
            is sent.
        max_age: Value for Access-Control-Max-Age, in seconds or as a
            ``timedelta``.
        attach_to_all: When False, the CORS headers are attached to OPTIONS
            responses only.
        automatic_options: When True, OPTIONS requests are answered with
            Flask's default OPTIONS response instead of calling the view.

    Returns:
        A decorator that wraps a view function.
    """
    # ``basestring`` only exists on Python 2; on Python 3 fall back to ``str``
    # so the isinstance checks below do not raise NameError.
    try:
        string_types = basestring
    except NameError:
        string_types = str

    if methods is not None:
        methods = ', '.join(sorted(x.upper() for x in methods))
    if headers is not None and not isinstance(headers, string_types):
        headers = ', '.join(x.upper() for x in headers)
    # Guard against origin=None, which cannot be joined.
    if origin is not None and not isinstance(origin, string_types):
        origin = ', '.join(origin)
    if isinstance(max_age, timedelta):
        # Whole seconds, so the header does not render as e.g. "3600.0".
        max_age = int(max_age.total_seconds())

    def get_methods():
        if methods is not None:
            return methods
        options_resp = current_app.make_default_options_response()
        return options_resp.headers['allow']

    def decorator(f):
        def wrapped_function(*args, **kwargs):
            if automatic_options and request.method == 'OPTIONS':
                resp = current_app.make_default_options_response()
            else:
                resp = make_response(f(*args, **kwargs))
            if not attach_to_all and request.method != 'OPTIONS':
                return resp

            h = resp.headers
            if origin is not None:
                h['Access-Control-Allow-Origin'] = origin
            h['Access-Control-Allow-Methods'] = get_methods()
            h['Access-Control-Max-Age'] = str(max_age)
            h['Access-Control-Allow-Credentials'] = 'true'
            if headers is not None:
                h['Access-Control-Allow-Headers'] = headers
            else:
                h['Access-Control-Allow-Headers'] = \
                    "Origin, X-Requested-With, Content-Type, Accept, Authorization"
            return resp

        # Tell Flask not to answer OPTIONS itself; the wrapper handles it.
        f.provide_automatic_options = False
        return update_wrapper(wrapped_function, f)

    return decorator
# function to get the number of atoms from chemical_formula (good job Igor :))
def get_number_atoms(s):
    """Count the atoms described by the chemical formula string ``s``.

    The formula is split into chunks that each start with an uppercase
    letter. A chunk without any numeric run contributes 1; otherwise it
    contributes the sum of its numeric runs converted with ``int``.
    """
    total = 0
    for chunk in findall('[A-Z][^A-Z]*', s):
        counts = findall(r"[0-9\.]+", chunk)
        if counts:
            total += sum(int(count) for count in counts)
        else:
            total += 1
    return total
# just send the index page for static version of mini-gui
@app.route('/')
def hello():
    """Serve the static index page of the mini-GUI."""
    index_page = 'static/index.html'
    return send_file(index_page)
# nothing to return here - not implemented
@app.route('/materials')
def get_materials():
    """Placeholder endpoint: returns a fixed "not implemented" message."""
    message = "NOT IMPLEMENTED: Your list of materials available is here\n"
    return message
def _build_search_query(elements):
    """Return the SQL text and parameter tuple requiring every element pattern to match."""
    sql = (
        "SELECT chemical_formula, type, name, n, content"
        " FROM metadata m"
        " JOIN calculations c ON m.calc_id = c.calc_id"
        " JOIN basis_sets b ON m.calc_id = b.calc_id"
        " JOIN pottypes p ON c.pottype_id = p.pottype_id"
        " JOIN spacegroups s ON c.calc_id = s.calc_id"
        " JOIN codeversions cv ON m.version_id = cv.version_id"
        " WHERE "
    )
    # One SIMILAR TO condition per requested element, for any number of them.
    sql += " AND ".join(["m.chemical_formula SIMILAR TO %s"] * len(elements))
    params = tuple("%" + element + "%" for element in elements)
    return sql, params


@app.route('/materials/search')
@crossdomain(origin='*')
def search_materials():
    """Search materials whose chemical formula contains all requested elements.

    Query parameters:
        element: Comma-separated element symbols (required, non-empty).
        page: 1-based page number (required integer >= 1).
        limit: Results per page (required integer >= 0, capped at 200).

    Returns:
        A JSON string: a list of objects with the keys ``code_name``,
        ``basis_set_type``, ``space_group``, ``xc_treatment_basis``,
        ``formula``, ``nr_atoms`` and ``nr_results`` (the total number of
        rows matched by the query, repeated on every entry).

    Aborts with 404 when no element is given, 400 when ``page``/``limit`` are
    not valid integers, and 500 when the database query fails.
    """
    raw_elements = request.args['element']
    if not raw_elements:
        abort(404)

    # Drop empty tokens (e.g. from "Na,,Cl"); an empty token would otherwise
    # become the pattern "%%" and match every formula.
    elements = [token.strip() for token in raw_elements.split(",")]
    elements = [token for token in elements if token]
    if not elements:
        abort(404)

    # A one-letter symbol must not be followed by a lowercase letter,
    # otherwise "C" would also match "Ca", "Cl", ...
    # NOTE(review): this pattern requires some character after the symbol, so
    # a formula that ends with a bare one-letter symbol would not match --
    # confirm against the stored formula format.
    elements = [token + "[^a-z]" if len(token) == 1 else token
                for token in elements]

    try:
        page = int(request.args['page'])
        limit = int(request.args['limit'])
    except ValueError:
        # Non-numeric paging parameters are a client error, not a server crash.
        abort(400)
    if page < 1 or limit < 0:
        abort(400)

    # let's limit the number of results per request to maximum 200
    limit = min(limit, 200)
    query_offset = (page - 1) * limit

    name_association = {
        'content': 'code_name',
        'type': 'basis_set_type',
        'n': 'space_group',
        'name': 'xc_treatment_basis',
        'chemical_formula': 'formula',
    }

    sql, data = _build_search_query(elements)
    results = []
    connexion = None
    try:
        connexion = psycopg2.connect(host=HOSTNAME, database=DATABASE,
                                     user=USERNAME, password=PASSWORD)
        cursor = connexion.cursor(cursor_factory=RealDictCursor)
        cursor.execute(sql, data)
        nr_results = cursor.rowcount

        # Only move the cursor when the requested page starts inside the
        # result set; a page past the end simply yields no rows.
        rows = []
        if query_offset < nr_results:
            cursor.scroll(query_offset)
            rows = cursor.fetchmany(limit)

        # create the dictionary list
        for row in rows:
            entry = {name_association[old]: new for old, new in row.items()}
            entry["nr_atoms"] = get_number_atoms(entry["formula"])
            entry["nr_results"] = nr_results
            results.append(entry)
    except psycopg2.DatabaseError as e:
        # Report the failure to the client instead of terminating the server.
        current_app.logger.error('Error %s', e)
        abort(500)
    finally:
        if connexion:
            connexion.close()
    return json.dumps(results, indent=2)
# little error handler for "404: Not Found" page
@app.errorhandler(404)
def not_found(error):
    """Answer 404 errors with a small JSON body instead of the default page."""
    payload = jsonify({'error': 'Not found'})
    return make_response(payload, 404)
# Start the Flask server only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    app.run()