import os
import shlex
import platform
# Python 2/3 compatibility shim: make the name ``basestring`` usable on both.
try:
    basestring  # bare name lookup; succeeds only where the builtin exists
except NameError:
    # The builtin is missing (Python 3), so fall back to ``str`` so that
    # ``isinstance(x, basestring)`` checks below keep working.
    basestring = str
from datmo.core.controller.task import TaskController
from datmo.core.entity.task import Task as CoreTask
from datmo.core.util.exceptions import InvalidArgumentType
from datmo.core.util.misc_functions import prettify_datetime, format_table
class Task():
    """Task is an entity object to enable user access to properties

    The run-related properties (``status``, ``start_time``, ``end_time``,
    ``duration``, ``logs``, ``results``) are not cached: every access looks
    the core task up again through a ``TaskController`` so the value returned
    is the latest one stored for this task id.

    Parameters
    ----------
    task_entity : datmo.core.entity.task.Task
        core task entity to reference
    home : str, optional
        root directory of the project
        (default is CWD, if not provided)

    Attributes
    ----------
    id : str
        the id of the entity
    model_id : str
        the parent model id for the entity
    session_id : str
        id of session associated with task
    command : str
        command that is used by the task
    status : str or None
        status of the current task
    start_time : datetime.datetime or None
        timestamp for the beginning time of the task
    end_time : datetime.datetime or None
        timestamp for the end time of the task
    duration : float or None
        delta in seconds between start and end times
    logs : str or None
        string output of logs
    results : dict or None
        dictionary containing output results from the task
    files : list
        returns list of file objects for the task in read mode

    Methods
    -------
    get_files(mode="r")
        Returns a list of file objects for the task

    Raises
    ------
    InvalidArgumentType
        if ``task_entity`` is not an instance of the core Task entity
    """

    def __init__(self, task_entity, home=None):
        if not home:
            home = os.getcwd()

        if not isinstance(task_entity, CoreTask):
            raise InvalidArgumentType()

        self._core_task = task_entity
        self._home = home

        self.id = self._core_task.id
        self.model_id = self._core_task.model_id
        self.session_id = self._core_task.session_id

        # Execution definition
        self.command = self._core_task.command

        # Run parameters
        self._status = self._core_task.status
        self._start_time = self._core_task.start_time
        self._end_time = self._core_task.end_time
        self._duration = self._core_task.duration

        # Outputs
        self._logs = self._core_task.logs
        self._results = self._core_task.results
        self._files = None

    @property
    def status(self):
        self._core_task = self.__get_core_task()
        self._status = self._core_task.status
        return self._status

    @property
    def start_time(self):
        self._core_task = self.__get_core_task()
        self._start_time = self._core_task.start_time
        return self._start_time

    @property
    def end_time(self):
        self._core_task = self.__get_core_task()
        self._end_time = self._core_task.end_time
        return self._end_time

    @property
    def duration(self):
        self._core_task = self.__get_core_task()
        self._duration = self._core_task.duration
        return self._duration

    @property
    def logs(self):
        self._core_task = self.__get_core_task()
        self._logs = self._core_task.logs
        return self._logs

    @property
    def results(self):
        self._core_task = self.__get_core_task()
        self._results = self._core_task.results
        return self._results

    @property
    def files(self):
        self._files = self.get_files()
        return self._files

    def __get_core_task(self):
        """Returns the latest core task object for id

        Returns
        -------
        datmo.core.entity.task.Task
            core task object for the task
        """
        task_controller = TaskController(home=self._home)
        return task_controller.get(self.id)

    def get_files(self, mode="r"):
        """Returns a list of file objects for the task

        Parameters
        ----------
        mode : str
            file object mode
            (default is "r" which signifies read mode)

        Returns
        -------
        list
            list of file objects associated with the task
        """
        task_controller = TaskController(home=self._home)
        return task_controller.get_files(self.id, mode=mode)

    def __eq__(self, other):
        """Two tasks are equal when their ids match.

        Falsy values and objects that carry no ``id`` attribute compare
        unequal instead of raising ``AttributeError``.
        """
        if not other or not hasattr(other, "id"):
            return False
        return self.id == other.id

    def __str__(self):
        """Build the colored, tabular, human-readable summary of the task."""
        final_str = '\033[94m' + "task " + self.id + "\n" + '\033[0m'
        table_data = []
        if self.session_id:
            table_data.append(["Session", "-> " + self.session_id])

        # Every property below re-queries the controller on access, so each
        # one is read exactly once and the same value is tested and printed.
        status = self.status
        if status:
            table_data.append(["Status", "-> " + status])
        start_time = self.start_time
        if start_time:
            table_data.append(
                ["Start Time", "-> " + prettify_datetime(start_time)])
        end_time = self.end_time
        if end_time:
            table_data.append(
                ["End Time", "-> " + prettify_datetime(end_time)])
        duration = self.duration
        if duration:
            table_data.append(
                ["Duration", "-> " + str(duration) + " seconds"])

        # Outputs
        if self.logs:
            table_data.append(
                ["Logs", "-> Use task log to view or download logs"])
        results = self.results
        if results:
            table_data.append(["Results", "-> " + str(results)])

        # Each access to ``files`` calls get_files() again; fetch the list
        # a single time and reuse it for every row.
        files = self.files
        if not files:
            table_data.append(["Files", "-> None"])
        else:
            table_data.append(["Files", "-> " + files[0].name])
            for f in files[1:]:
                table_data.append([" ", "-> " + f.name])

        final_str = final_str + format_table(table_data)
        final_str = final_str + "\n" + " " + self.command + "\n" + "\n"
        return final_str

    def __repr__(self):
        return self.__str__()
def run(command, env=None, home=None, gpu=False):
    """Run the code or script inside

    The project must be created before this is implemented. You can do that by using
    the following command::

        $ datmo init

    Parameters
    ----------
    command : str or list
        the command to be run in environment. A list is handed over as-is;
        a string is handed over unchanged on Windows and split with
        ``shlex.split`` on every other platform
    env : str, optional
        the location for the environment definition path
        (default is None, which will defer to the environment to find a default environment,
        or will fail if not found)
    home : str, optional
        absolute home path of the project
        (default is None, which will use the CWD as the project path)
    gpu: boolean
        try to run task on GPU (if available)

    Returns
    -------
    Task
        returns a Task entity as defined above

    Examples
    --------
    You can use this function within a project repository to run tasks
    in the following way.

    >>> import datmo
    >>> datmo.task.run(command="python script.py")
    >>> datmo.task.run(command="python script.py", env='Dockerfile')
    """
    project_home = home if home else os.getcwd()
    controller = TaskController(home=project_home)

    # Snapshot inputs: only the environment definition, when one was given
    snapshot_options = {}
    if env:
        snapshot_options["environment_definition_filepath"] = env

    # Task inputs: the command in whichever form applies, then the GPU flag
    task_options = {}
    if isinstance(command, list):
        task_options["command_list"] = command
    elif platform.system() == "Windows":
        task_options["command"] = command
    elif isinstance(command, basestring):
        task_options["command_list"] = shlex.split(command)
    task_options["gpu"] = gpu

    # Register an empty task, then execute it with the collected inputs
    created_task = controller.create()
    finished_task = controller.run(
        created_task.id,
        snapshot_dict=snapshot_options,
        task_dict=task_options)

    # Wrap the resulting core task in the client-facing entity
    return Task(finished_task, home=project_home)
def ls(session_id=None, filter=None, home=None):
    """List tasks within a project

    The project must be created before this is implemented. You can do that by using
    the following command::

        $ datmo init

    Parameters
    ----------
    session_id : str, optional
        session to filter output tasks
        (default is None, which falls back to the controller's current session)
    filter : str, optional
        a string that must be contained in a task's command for the task to
        be returned
        (default is None, which returns every task of the session)
    home : str, optional
        absolute home path of the project
        (default is None, which will use the CWD as the project path)

    Returns
    -------
    list
        returns a list of Task entities (as defined above), in the order
        produced by the controller (sorted by ``created_at``, descending)

    Examples
    --------
    You can use this function within a project repository to list tasks.

    >>> import datmo
    >>> tasks = datmo.task.ls()
    """
    if not home:
        home = os.getcwd()
    task_controller = TaskController(home=home)

    # Fall back to the current session when none is specified
    if not session_id:
        session_id = task_controller.current_session.id

    core_task_objs = task_controller.list(
        session_id, sort_key='created_at', sort_order='descending')

    # Filtering Tasks
    # TODO: move to list function in TaskController
    if filter:
        # Keep only tasks whose command contains the filter. A task with no
        # command recorded cannot match and is skipped rather than raising
        # TypeError on the membership test.
        core_task_objs = [
            core_task_obj for core_task_obj in core_task_objs
            if core_task_obj.command and filter in core_task_obj.command
        ]

    # Return Task entities
    return [Task(core_task_obj, home=home) for core_task_obj in core_task_objs]