AWS EMR Cluster Class Using Boto

I’ve been working with boto for a little while now, and while I know that there are quite a few examples out there on how to spin up an AWS EMR cluster, I couldn’t find anything that put everything together. With that in mind, I present a python class which provides the user with the means to edit cluster attributes, add steps, start a cluster, ssh into the cluster as well as terminate the cluster, in a convenient package.

The entire code can be pulled from here, but I give a brief overview of each method to, hopefully, shine some light onto something that took me a while to hammer out.

I recommend pulling the code from Github vs. copying from here as there may be whitespace or indentation issues. The self_test method gives an example of a practical use of the class.

**Versions: boto 2.38.0, python 2.7

**Assumptions: boto is installed, .boto file is configured (how to)

__init__:

The cluster class can be instantiated with the following keyword arguments (and their default values):

**Note: Some of the default keyword args will have to be adjusted for your application. ami version, hadoop, hive, etc. versions can supposedly be set to ‘latest’ but I haven’t had much luck with that, so I manually set the version to the latest version available at the time of writing this.

cluster_name = "Test Cluster",
cluster_region="us-ease-1",
cluster_zone="us-east-1e",
cluster_market = "SPOT",
spot_price = "2.02",
ami_version="3.1.2",
hadoop_version = "2.4.0",
keep_alive = True,
log_location = "S3://cluster_logs/",
tags={"key":"value"},
hive_version = "0.11.0.2",
impala_version = "1.2.4",
bootstrap_script = "",
ec2_keyname = "",
pem_key = "",
debugging = True,
failure_action = "CANCEL_AND_WAIT",
master_type = "m3.xlarge",
core_type = "m3.xlarge",
num_core_nodes = 10
__author__ = 'brentonmallen'

import time
import boto
import boto.emr
import boto.emr.emrobject
from boto.emr.instance_group
import InstanceGroup
from boto.manage.cmdshell
import sshclient_from_instance
from os.path import expanduser, isfile
import sys

class emr_cluster()
 def __init__(self, cluster_name = "Test Cluster", cluster_region="us-ease-1",
     cluster_zone="us-east-1e", cluster_market = "SPOT", spot_price = "2.02",
     ami_version="3.1.2", hadoop_version = "2.4.0", keep_alive = True,
     log_location = "S3://cluster_logs/", tags={"key":"value"},
     hive_version = "0.11.0.2", impala_version = "1.2.4", bootstrap_script = "",
     ec2_keyname = "", pem_key = "", debugging = True, failure_action = "CANCEL_AND_WAIT",
     master_type = "m3.xlarge", core_type = "m3.xlarge", num_core_nodes = 10):
 
     """ This is a AWS EMR Cluster class. Used to spin up, interact with and terminate
     an Amazon Web Service EMR cluster.
     
     PARAMETERS:
     cluster_name - (str) Name of the cluster. As it will appear in the AWS EMR Console cluster list
     cluster_region - (str) AWS compute region
     cluster_zone - (str) AWS compute zone
     cluster_market - (str) cluster market [on demand or spot]
     spot_price - (str) spot bid price
     ami_version - (str) version of AMI to be used
     hadoop_version - (str) version of hadoop to be used
     keep_alive - (bool) keep the cluster running when jobflow is finished
     log_location - (str) s3 location to stash the cluster logs
     tags - (dict) key:value pairs of cluster tags
     hive_version - (str) version of Hive to be installed
     impala_version - (str) version of impala to be installed
     bootstrap_script - (str) directory to shell script to be run at cluster bootstrap ['~/bash.sh']
     ec2_keyname - (str) cluster security key name
     pem_key - (str) location of pem key file ['~/key.pem']
     debugging - (bool) enable cluster debugging
     failure_action - (str) step to take if step fails
     master_type - (str) ec2 instance type
     core_type - (str) ec2 instance type
     num_core_nodes - (int)
     
     METHODS:
     check_pem_key - checks to see if the pem key exists in the specified location
     check_cluster_exists - checks to see if a cluster with the given name is already running
     load_cluster - if the cluster exists, connect to that cluster
     set_instance_group - compiles the cluster instance group
     set_cluster_steps - compiles the list of steps
     start_cluster - starts the cluster request
     get_connection - sets up the connection to aws
     get_cluster_status - returns the cluster's current status
     get_cluster_dns - returns the cluster's master public DNS
     get_cluster_ssh - returns a ssh client object to the cluster's master instance
     kill_cluster - terminates the cluster
     """ 
 
 # SET/INITIALIZE COMMON ATTRIBUTES
     self.cluster_name = cluster_name # set cluster name
     self.pem_key = pem_key # set pem key location
     self.check_pem_key(self.pem_key) # check that the pem exists
     self.cluster_region = cluster_region # set cluster region
     self.cluster_zone = cluster_zone # set cluster zone
     self.get_connection() # establish connection with aws
     self.cluster_dns = "" # initialize dns value attribute
     self.cluster_ssh = None # initialize ssh client object attribute
 
 # CHECK FOR EXISTING, RUNNING CLUSTER
     self.check_cluster_exists(self.cluster_name)
     if self.cluster_exists is False: # if cluster does not exist 
 
         # GENERAL CLUSTER PARAMETERS
         self.cluster_market = cluster_market
         self.cluster_spot_price = spot_price
         self.cluster_ami_version = ami_version
         self.cluster_hadoop_version = hadoop_version
         self.cluster_keep_alive = keep_alive
         self.cluster_log_location = log_location
         self.cluster_tags = tags
         self.cluster_hive_version = hive_version
         self.cluster_impala_version = impala_version
         self.cluster_bootstrap_script_location = bootstrap_script
         self.cluster_ec2_keyname = ec2_keyname
         self.cluster_enable_debugging = debugging
         self.cluster_action_on_failure = failure_action
 
         # CONTINUE, TERMINATE_JOB_FLOW # MASTER NODE PARAMETERS
         self.master_node_name = "Cluster Master"
         self.master_number_nodes = 1
         self.master_instance_type = master_type
         self.master_node_role = "MASTER"
 
        # CORE NODE PARAMETERS
        self.core_node_name = "Cluster Core"
        self.core_number_nodes = num_core_nodes
        self.core_instance_type = core_type
        self.core_node_role = "CORE" 
        
        # INIT METHODS 
        self.set_instance_group() # compile node instances
        self.set_cluster_steps() # compile cluster steps
    else:
        print "Cluster [{}] exists. Connected.".format(self.cluster_name)

Check PEM Key:

This method checks that the pem key specified does indeed exist. The input argument is a string of the path to the pem file (i.e. “~/path/to/key.pem”)

 def check_pem_key(self, pem_key_location):
    """ Checks to see if the pem key specified exists
    :param pem_key_location: (str) path to the pem key file check_pem_key("~/path/to/key.pem")
    """ 
    if not isfile(pem_key_location): 
        error_string = "pem key doesn't exist! Please specify the pem key location using the pem_key keyword argument"
        raise ValueError(error_string)
    else:
        pass

Cluster Check:

This method checks to see if a cluster, by the same name (cluster_name keyword argument), is already running.

 def check_cluster_exists(self, cluster_name):
    """ Check to see if the cluster already exists and is available
    :param cluster_name: (str) name of the cluster as it appears in the AWS EMR Console
    """
    # get a list of all clusters in an 'operational' state
    # this is the list of clusters on the account in the specified region
    list_of_clusters = self.aws_connection.list_clusters( cluster_states=["STARTING", "RUNNING", "WAITING", "BOOTSTRAPPING"])
    cluster_list_names = [cluster.name for cluster in list_of_clusters]
    # gather a list of cluster names
    
    if cluster_name in cluster_list_names:
        self.cluster_exists = True
        self.load_cluster(cluster_name) # if the cluster exists, go ahead and use it
    else:
    self.cluster_exists = False
    print "No cluster by the name [{}] is currently running".format(cluster_name)
    print "Creating one. It must be started with the START_CLUSTER mentod" 

Load Existing Cluster:

This method is used to connect to an existing cluster. If the a cluster with the same name is currently running, the object will connect to that cluster instead of starting a new one.

def load_cluster(self, cluster_name):
    """ Load the cluster with the input name
    :param cluster_name: (str) name of the cluster as it appears in the AWS EMR Console
    """
    
    list_of_clusters = self.aws_connection.list_clusters(
            cluster_states=["STARTING", "RUNNING", "WAITING", "BOOTSTRAPPING"]) for clstr, cluster in enumerate(list_of_clusters.clusters):
        if str(cluster.name) == cluster_name:
            self.cluster_id = cluster.id
            self.cluster_dns = self.get_cluster_dns()
            # self.cluster_ssh = self.get_cluster_ssh()
        else:
            continue

Set Instances:

This method compiles the specified attributes into a set of ec2 instances for the EMR cluster. Here I establish master and core nodes but not task nodes. These can be easily added if need be.

 def set_instance_group(self):
    """ Set the parameters for the master and core nodes Task nodes can
    be added by following the same format and setting the role to "TASK"
    """
    self.instance_group = []
    self.instance_group.append(InstanceGroup( num_instances = self.master_number_nodes,
            role = self.master_node_role,
            type = self.master_instance_type,
            market = self.cluster_market,
            name = self.master_node_name,
            bidprice = self.cluster_spot_price
            ))
    
    self.instance_group.append(InstanceGroup( num_instances = self.core_number_nodes, role = self.core_node_role, type = self.core_instance_type, market = self.cluster_market, name = self.core_node_name, bidprice = self.cluster_spot_price )) 

Set Steps:

This method compiles a list of bootstrapping actions and steps. Here, the default bootstrapping action includes installing Impala and the default step includes installing hive. More steps can be added to install other software packages, edit cluster parameters, run bash scripts and even perform MapReduce stream jobs. For some details check out the boto documentation here. I will make another post demonstrating a step to increase the MapReduce heap size.

 def set_cluster_steps(self):
    """ Creates a list of bootstrap actions and a list of steps
    The default steps here install impala and hive. More steps can be appened to the list
    """
    
    self.cluster_bootstrap_steps = []
    self.cluster_steps = []
    self.cluster_install_impala_step = boto.emr.BootstrapAction("Install Impala",
                "s3://elasticmapreduce/libs/impala/setup-impala",
                ["--base-path","s3://elasticmapreduce","--impala-version",
                self.cluster_impala_version]
                )
    
    self.cluster_bootstrap_steps.append(self.cluster_install_impala_step)
    self.hive_install_step = boto.emr.step.InstallHiveStep(hive_versions = self.cluster_hive_version)
    self.cluster_steps.append(self.hive_install_step)

Start Cluster:

This method starts the cluster. If a cluster by the same name exists, this method is bypassed.

 def start_cluster(self):
    """ Initiate the cluster.
    """
    if self.cluster_exists is False: # if the cluster doesn't already exist, request one
        print "\nPlease Wait while a cluster is being provisioned..."
        self.cluster_id = self.aws_connection.run_jobflow( self.cluster_name,
                    instance_groups = self.instance_group,
                    action_on_failure = self.cluster_action_on_failure,
                    keep_alive = self.cluster_keep_alive,
                    enable_debugging = self.cluster_enable_debugging,
                    log_uri = self.cluster_log_location,
                    hadoop_version = self.cluster_hadoop_version,
                    availability_zone = self.cluster_zone,
                    ami_version = self.cluster_ami_version,
                    steps = self.cluster_steps ,#boto.emr.step.InstallHiveStep(hive_versions="latest"),
                    bootstrap_actions=self.cluster_bootstrap_steps,
                    ec2_keyname = self.cluster_ec2_keyname, # need to figure this out visible_to_all_users=True
                    ) 
        # Add tags
        self.aws_connection.add_tags(self.cluster_id, self.cluster_tags)
    else: # if the cluster already exists, do nothing 
        pass

Establish AWS Connection:

This method establishes an AWS connection, in the specified region.

def get_connection(self):
    """ Establish a connection with S3 """
    self.aws_connection = boto.emr.connect_to_region(self.cluster_region) # connection to EMR

Obtain Cluster Status:

This method grabs the current status of the cluster and returns it as a string. The possible values for the cluster’s state can be found here.

 def get_cluster_status(self):
    """ Get the current status of the running cluster :return: (str) current cluster status status """
    try:
        status = str(self.aws_connection.describe_jobflow(self.cluster_id).state)
        return status
    except:
        pass
 

Obtain Cluster DNS:

This method grabs the cluster’s master DNS if it is available. If the DNS is not available (cluster is still being provisioned) this method will wait until it is. The master DNS is returned as a string.

def get_cluster_dns(self):
    """ Get the cluster's MASTER PUBLIC DNS :return: (str) cluster's master DNS """
    printout = 0
    if self.cluster_dns == "":
        # if the dns is not available, continue polling until it's available
        while self.cluster_dns == "":
            try:
                self.cluster_dns = str(self.aws_connection.describe_cluster(self.cluster_id).__dict__['masterpublicdnsname'])
                return self.cluster_dns
            except:
                if printout == 0:
                    print "Waiting for DNS to become available..."
                    printout = 1 time.sleep(1)
    else:
        # if the cluster is already up and running, grab its dns
        self.cluster_dns = str(self.aws_connection.describe_cluster(self.cluster_id).__dict__['masterpublicdnsname'])
        return self.cluster_dns

Establish Cluster SSH Connection:

This method establishes a SSH connection with the cluster. It calls the get_cluster_dns method to obtain the master DNS. A SSH Client object is returned and a list of available methods can be found here under cmdShell.

def get_cluster_ssh(self):
        """
        This will create a SSH client object with a connection to the cluster's master instance
        Refer to http://boto.readthedocs.org/en/latest/ref/manage.html?highlight=ssh#boto.manage.cmdshell.SSHClient
        for all possible methods on the ssh client object
        :param ssh_script: Self
        :return: a boto ssh client object
        """
        printout = 0

        if self.cluster_dns == "":
            while self.cluster_dns == "":

                try:
                	# if the dns doesn't exist, go grab it
                    self.get_cluster_dns() 

                    instance = self.aws_connection.list_instances(self.cluster_id).instances
                    # find the master instance out of all cluster instances
                    for inst in instance:
                        if inst.publicdnsname == self.cluster_dns: 
                            master_instance = inst
                            break
                        else:
                            continue
					# this is a hack, for some reason boto cmdshell looks for a nonexistant dns_name attribute
                    master_instance.dns_name = master_instance.publicdnsname 

                    ssh_client = sshclient_from_instance(master_instance, ssh_key_file = self.pem_key,
                                                   user_name = "hadoop", ssh_pwd = None) # establish the ssh connection
                    return ssh_client
                except:

                    if printout == 0:
                        print "Waiting for DNS to become available..."
                        printout = 1

                    time.sleep(1)

        else:
        	# if the dns exists, grab the master dns and establish a connection
            instance = self.aws_connection.list_instances(self.cluster_id).instances 

            for inst in instance:
                if inst.publicdnsname == self.cluster_dns:
                    master_instance = inst
                    break
                else:
                    continue

            master_instance.dns_name = master_instance.publicdnsname # this is a hack, for some reason boto cmdshell looks for this attribute

            ssh_client = sshclient_from_instance(master_instance, ssh_key_file = self.pem_key,
                                           user_name = "hadoop", ssh_pwd = None)
            return ssh_client

Terminate Cluster:

This method terminates the cluster.

def kill_cluster(self):
    """
    Terminate the cluster once started.
    """
    try:
        self.aws_connection.terminate_jobflow(self.cluster_id)
        print "Terminating Cluster, please wait..."
        time.sleep(5)
        print "Cluster Status: {}".format(str(self.get_cluster_status()))
    except:
        print "Cluster Not Terminated (was one even started?)"

Self Test:

The self test is called in __main__ to ensure the class works properly. This is also a good example of how this class should be used. A cluster (1 master and 1 core node) is requested. Once the cluster has been provisioned, a test directory is created in the home directory. Once the existence of the test directory has been verified, the cluster is terminated.

def self_test():
    """
    Instantiates a cluster and starts a self test
        1. start the cluster
        2. when it's started, ssh and make a test directory
        3. check to see if that directory was indeed created
        4. terminate cluster
    :return:
    """

    print "initiating cluster class self test..."

    test = emr_cluster(num_core_nodes=1)
    test.start_cluster()

    cluster_status = test.get_cluster_status()

    while cluster_status != "WAITING":
        time.sleep(10)
        cluster_status = test.get_cluster_status()
        if cluster_status in ["SHUTTING_DOWN","TERMINATED"]:
            print "Cluster is unexpectedly terminating. Please consult AWS EMR Console."

    test.cluster_ssh = test.get_cluster_ssh()
    test.cluster_ssh.run("mkdir /home/hadoop/testdir")[1] # run command and return stdout
    directoryCheck = test.cluster_ssh.run("ls")[1].split("\n")

    if "testdir" in directoryCheck:
        print "Self test successful. Terminating Cluster, please wait"
        test.kill_cluster()
        
        cluster_status = test.get_cluster_status()
        
        while cluster_status != "TERMINATED":
            cluster_status = test.get_cluster_status()
            time.sleep(5)


    else:
        print "Self test failed! \nThe {} cluster will remain active for investigation".format(test.cluster_name)
        print "\nCluster must be terminated manually."

I hope this helps. If there are any questions or issues, please let me know.