import os
import shutil
import uuid
import json
from flask import Flask, url_for
from flask import render_template, request, jsonify
import plotly
from datetime import datetime
from datmo.config import Config
from datmo.core.entity.run import Run
from datmo.core.controller.base import BaseController
from datmo.core.util.misc_functions import prettify_datetime, printable_object
app = Flask(__name__)
base_controller = BaseController()
user = {
"name":
"Shabaz Patel",
"username":
"shabazp",
"email":
"shabaz@datmo.com",
"gravatar_url":
"https://www.gravatar.com/avatar/" + str(uuid.uuid1()) +
"?s=220&d=identicon&r=PG"
}
[docs]@app.route("/")
def home():
models = [base_controller.model.__dict__] if base_controller.model else []
return render_template("profile.html", user=user, models=models)
[docs]@app.route("/<model_name>")
def model_summary(model_name):
model = base_controller.model.__dict__
snapshots = [
{
"id": "alfwokd",
"created_at": "Sun March 3rd, 2018",
"labels": ["cool", "default"],
"config": {
"algorithm": "random forest"
},
"stats": {
"accuracy": 0.98
}
},
]
config_keys = ["algorithm"]
stats_keys = ["accuracy"]
return render_template(
"model_summary.html",
user=user,
model=model,
snapshots=snapshots,
config_keys=config_keys,
stats_keys=stats_keys)
[docs]@app.route("/<model_name>/experiments")
def model_experiments(model_name):
model = base_controller.model.__dict__
if model_name == model['name']:
tasks = base_controller.dal.task.query({"model_id": model['id']})
experiments = [Run(task) for task in tasks]
for experiment in experiments:
experiment.config_printable = printable_object(experiment.config)
experiment.start_time_prettified = prettify_datetime(
experiment.start_time)
experiment.end_time_prettified = prettify_datetime(
experiment.end_time)
experiment.results_printable = printable_object(experiment.results)
else:
experiments = []
return render_template(
"model_experiments.html",
user=user,
model=model,
experiments=experiments)
[docs]@app.route("/<model_name>/snapshots")
def model_snapshots(model_name):
model = base_controller.model.__dict__
if model_name == model['name']:
snapshots = base_controller.dal.snapshot.query({
"model_id": model['id']
})
snapshots = [(snapshot, snapshot.to_dictionary(stringify=True))
for snapshot in snapshots]
else:
snapshots = []
config_keys = set(
item for sublist in
[snapshot[0].__dict__["config"].keys() for snapshot in snapshots]
for item in sublist)
stats_keys = set(
item for sublist in
[snapshot[0].__dict__["stats"].keys() for snapshot in snapshots]
for item in sublist)
return render_template(
"model_snapshots.html",
user=user,
model=model,
snapshots=snapshots,
config_keys=config_keys,
stats_keys=stats_keys)
[docs]@app.route(
"/data/<model_name>/deployments/<deployment_version_id>/<model_version_id>"
)
def model_deployment_data(model_name, deployment_version_id, model_version_id):
# here we want to get the value of user (i.e. ?start=0)
start = request.args.get('start', 0)
count = request.args.get('count', None)
data_type = request.args.get('data_type', None)
key_name = request.args.get('key_name', None)
graph_type = request.args.get('graph_type', None)
if not data_type and not key_name and not graph_type:
return "error", 400
# Get new data to add to the graphs
filter = {
"model_id": model_name,
"deployment_version_id": deployment_version_id,
"model_version_id": model_version_id,
"start": int(start)
}
if count: filter["count"] = int(count)
new_data = datmo_monitoring.search_metadata(filter)
# update the number of new results
num_new_results = len(new_data)
# return error if data_type is not correct
if data_type not in ["input", "prediction", "feedback", "system_metrics"]:
return "error", 400
# populate the data into the correct construct based on graph type
if graph_type == "timeseries":
new_time_data = [
datum['updated_at'] if datum['updated_at'] else datum['created_at']
for datum in new_data
]
new_time_data_datetime = [
datetime.fromtimestamp(
float(t) / 1000).strftime('%Y-%m-%d %H:%M:%S')
for t in new_time_data
]
new_feature_data = [
datum[data_type][key_name] if datum[data_type] else None
for datum in new_data
]
graph_data_output = {
"new_data": {
"x": [new_time_data_datetime],
"y": [new_feature_data],
}
}
elif graph_type == "histogram":
filter = {
"model_id": model_name,
"deployment_version_id": deployment_version_id,
"model_version_id": model_version_id,
}
cumulative_data = datmo_monitoring.search_metadata(filter)
cumulative_feature_data = [
datum[data_type][key_name] for datum in cumulative_data
if datum[data_type] and key_name in datum[data_type].keys()
]
import numpy as np
counts, binedges = np.histogram(cumulative_feature_data)
binsize = binedges[1] - binedges[0]
bin_names = [
str(round(binedge, 2)) + " : " + str(round(binedge + binsize, 2))
for binedge in binedges
]
graph_data_output = {
"cumulative_data": {
"x": [bin_names],
"y": [counts]
}
}
elif graph_type == "gauge":
new_feature_data = [
datum[data_type][key_name] if datum[data_type] else None
for datum in new_data
]
if len(new_feature_data) > 0:
average = sum(new_feature_data) / float(len(new_feature_data))
else:
average = None
graph_data_output = {"average": average}
else:
return "error", 400
# Convert the figures to JSON
# PlotlyJSONEncoder appropriately converts pandas, datetime, etc
# objects to their JSON equivalents
graph_data_outputJSON = json.dumps(
graph_data_output, cls=plotly.utils.PlotlyJSONEncoder)
return jsonify(
graph_data_output_json_str=graph_data_outputJSON,
num_new_results=num_new_results)
[docs]@app.route(
"/<model_name>/deployments/<deployment_version_id>/<model_version_id>")
def model_deployment_detail(model_name, deployment_version_id,
model_version_id):
model = base_controller.model.__dict__
filter = {
"model_id": model_name,
"model_version_id": model_version_id,
"deployment_version_id": deployment_version_id
}
input_keys, prediction_keys, feedback_keys = [], [], []
data = datmo_monitoring.search_metadata(filter)
if data:
max_index = 0
for ind, datum in enumerate(data):
if datum['feedback'] is not None:
max_index = ind
datum = data[max_index]
input_keys = list(datum['input'].keys())
prediction_keys = list(datum['prediction'].keys())
feedback_keys = list(
datum['feedback'].keys()) if datum['feedback'] is not None else []
# Determine the graph directory path and create if not present
graph_dirpath = os.path.join(base_controller.home,
Config().datmo_directory_name, "deployments",
deployment_version_id, model_version_id,
"graphs")
if not os.path.exists(graph_dirpath): os.makedirs(graph_dirpath)
# Include deployment info
deployment_info = datmo_monitoring.get_deployment_info(
deployment_version_id=deployment_version_id)
# Prettify dates
deployment_info['created_at'] = prettify_datetime(
deployment_info['created_at'])
# TODO: replace with proper handling
deployment_info['endpoints'] = [
endpoint for endpoint in deployment_info['endpoints']
if "".join(model_version_id.split("_")) in endpoint
]
deployment_info['service_paths'] = [
path for path in deployment_info['service_paths']
if "".join(model_version_id.split("_")) in path
]
# TODO: END
deployment_info['deployment_version_id'] = deployment_version_id
deployment_info['model_version_id'] = model_version_id
return render_template(
"model_deployment_detail.html",
user=user,
model=model,
deployment=deployment_info,
graph_dirpath=graph_dirpath,
input_keys=input_keys,
prediction_keys=prediction_keys,
feedback_keys=feedback_keys,
)
[docs]@app.route("/<model_name>/deployments")
def model_deployments(model_name):
model = base_controller.model.__dict__
# get all data and extract unique model_version_id and deployment_version_id
filter = {"model_id": model_name}
all_data = datmo_monitoring.search_metadata(filter)
model_version_ids = set(data['model_version_id'] for data in all_data)
deployment_version_ids = set(
data['deployment_version_id'] for data in all_data)
# Get deployment information for each of the deployments
deployments = []
for deployment_version_id in deployment_version_ids:
for model_version_id in model_version_ids:
try:
deployment_info = datmo_monitoring.get_deployment_info(
deployment_version_id=deployment_version_id)
except:
break
# Prettify dates
deployment_info['created_at'] = prettify_datetime(
deployment_info['created_at'])
# TODO: replace with proper handling
deployment_info['endpoints'] = [
endpoint for endpoint in deployment_info['endpoints']
if "".join(model_version_id.split("_")) in endpoint
]
deployment_info['service_paths'] = [
path for path in deployment_info['service_paths']
if "".join(model_version_id.split("_")) in path
]
# TODO: END
deployment_info['deployment_version_id'] = deployment_version_id
deployment_info['model_version_id'] = model_version_id
deployments.append(deployment_info)
return render_template(
"model_deployments.html",
user=user,
model=model,
deployments=deployments,
)
[docs]@app.route(
"/<model_name>/deployments/<deployment_version_id>/<model_version_id>/custom/create"
)
def model_deployment_script_create(model_name, deployment_version_id,
model_version_id):
content = request.args.get('content')
filepath = request.args.get('filepath')
dirpath, _ = os.path.split(filepath)
# ensure the containing directory exists
if not os.path.exists(dirpath): os.makedirs(dirpath)
with open(filepath, "w") as f:
f.write(content)
return "complete", 200
[docs]@app.route(
"/<model_name>/deployments/<deployment_version_id>/<model_version_id>/custom/run"
)
def model_deployment_script_run(model_name, deployment_version_id,
model_version_id):
filepath = request.args.get('filepath')
# ensure that the filepath is a valid path
if not os.path.isfile(filepath):
return "error", 400
os.system("python " + filepath)
return "complete", 200
[docs]@app.route("/hash/generate")
def generate_hash():
string_to_hash = str(request.args.get('string_to_hash'))
hash = str(uuid.uuid3(uuid.NAMESPACE_DNS, string_to_hash))
return jsonify({"result": hash})
[docs]@app.route("/alias/create")
def create_alias():
filepath = request.args.get('filepath')
graph_id = request.args.get('graph_id')
available_filepath = os.path.join(app.root_path, "static", "img",
graph_id + ".jpg")
print(filepath)
if os.path.exists(available_filepath):
os.remove(available_filepath)
shutil.copy(src=filepath, dst=available_filepath)
print(available_filepath)
webpath = url_for("static", filename="./img/" + graph_id + ".jpg")
return jsonify({"webpath": webpath})
if __name__ == "__main__":
app.run(debug=True)