# -*- coding: utf-8 -*-
"""Create EMR cluster."""
import getpass
import logging
import datetime
from sparksteps import steps
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Login name of the user running this process, resolved once at import time;
# emr_config() embeds it in the default cluster name.
username = getpass.getuser()
def parse_conf(raw_conf_list):
    """Parse configuration items for spark-defaults.

    Args:
        raw_conf_list: iterable of ``key=value`` strings. ``None`` or an
            empty iterable yields an empty dict.

    Returns:
        dict mapping each key to its value. Only the first ``=`` separates
        key from value, so values may themselves contain ``=``. Items
        without any ``=`` are skipped. When a key repeats, the last
        occurrence wins.
    """
    conf_dict = {}
    for raw_conf in raw_conf_list or ():
        key, separator, value = raw_conf.partition('=')
        # An empty separator means the item had no '=' at all: ignore it.
        if separator:
            conf_dict[key] = value
    return conf_dict
def _instance_group_config(instance_group, kw):
    """Build the InstanceGroups entry for one role, or None if it is unused.

    Args:
        instance_group: one of ``'master'``, ``'core'`` or ``'task'``.
        kw: the keyword options given to :func:`emr_config`.

    Returns:
        dict describing the instance group, or ``None`` for a core/task
        group whose ``num_<group>`` option is missing or zero.

    Raises:
        ValueError: if the group is needed but ``instance_type_<group>`` is
            missing or empty.
    """
    is_master = instance_group == 'master'
    num_instances = kw.get('num_{}'.format(instance_group), 0)
    if not is_master and not num_instances:
        # We don't need this instance group.
        return None

    instance_type = kw.get('instance_type_{}'.format(instance_group))
    if not instance_type:
        raise ValueError('{} nodes specified without instance type.'.format(
            instance_group.capitalize()))

    group_config = {
        'Name': '{} Node{}'.format(instance_group.capitalize(),
                                   '' if is_master else 's'),
        'Market': 'ON_DEMAND',
        'InstanceRole': instance_group.upper(),
        'InstanceType': instance_type,
        # The master group always has exactly one node; any num_master
        # value is ignored.
        'InstanceCount': 1 if is_master else num_instances,
    }

    # A bid price switches the group from on-demand to spot instances.
    bid_price = kw.get('bid_price_{}'.format(instance_group))
    if bid_price:
        group_config['Market'] = 'SPOT'
        group_config['BidPrice'] = bid_price

    # EBS storage is attached only when a non-zero volume size is requested.
    ebs_volume_size = kw.get('ebs_volume_size_{}'.format(instance_group), 0)
    if ebs_volume_size:
        group_config['EbsConfiguration'] = {
            'EbsBlockDeviceConfigs': [{
                'VolumeSpecification': {
                    'VolumeType': kw.get(
                        'ebs_volume_type_{}'.format(instance_group)),
                    'SizeInGB': ebs_volume_size,
                },
                'VolumesPerInstance': kw.get(
                    'ebs_volumes_per_{}'.format(instance_group), 1),
            }],
            'EbsOptimized': kw.get(
                'ebs_optimized_{}'.format(instance_group), False),
        }
    return group_config


def emr_config(release_label, keep_alive=False, **kw):
    """Build the configuration dictionary describing an EMR cluster.

    Args:
        release_label: value stored under ``ReleaseLabel``.
        keep_alive: value stored under
            ``Instances.KeepJobFlowAliveWhenNoSteps``.
        **kw: optional settings. Keys read by this function, where
            ``<group>`` is ``master``, ``core`` or ``task``:

            * ``name``: cluster name; defaults to
              ``"<username> SparkStep Task [<timestamp>]"``.
            * ``num_<group>``: node count; core/task groups with a missing
              or zero count are omitted.
            * ``instance_type_<group>``: required for every group created.
            * ``bid_price_<group>``: when set, the group uses the SPOT
              market with this bid price.
            * ``ebs_volume_size_<group>``, ``ebs_volume_type_<group>``,
              ``ebs_volumes_per_<group>``, ``ebs_optimized_<group>``: EBS
              settings, applied only when the volume size is non-zero.
            * ``ec2_key``, ``ec2_subnet_id``: copied to
              ``Instances.Ec2KeyName`` / ``Instances.Ec2SubnetId``.
            * ``debug`` and ``s3_bucket``: when both are truthy, sets
              ``LogUri`` and adds a debug step.
            * ``tags``: passed through ``parse_tags`` into ``Tags``.
            * ``defaults``: ``key=value`` strings parsed by ``parse_conf``
              into a ``spark-defaults`` configuration.
            * ``maximize_resource_allocation``: appends a ``spark``
              configuration with ``maximizeResourceAllocation`` enabled.
            * ``bootstrap_script``: path of a single bootstrap action.

    Returns:
        dict holding the assembled cluster configuration.

    Raises:
        ValueError: if a needed instance group has no instance type.
    """
    # Build the timestamped default name only when no explicit name is given.
    cluster_name = kw.get('name')
    if not cluster_name:
        timestamp = datetime.datetime.now().replace(microsecond=0)
        cluster_name = "{} SparkStep Task [{}]".format(username, timestamp)

    instance_groups = []
    for instance_group in ('master', 'core', 'task'):
        group_config = _instance_group_config(instance_group, kw)
        if group_config is not None:
            instance_groups.append(group_config)

    config = dict(
        Name=cluster_name,
        ReleaseLabel=release_label,
        Instances={
            'InstanceGroups': instance_groups,
            'KeepJobFlowAliveWhenNoSteps': keep_alive,
            'TerminationProtected': False,
        },
        Applications=[{'Name': 'Hadoop'}, {'Name': 'Spark'}],
        VisibleToAllUsers=True,
        JobFlowRole='EMR_EC2_DefaultRole',
        ServiceRole='EMR_DefaultRole',
    )

    if kw.get('ec2_key'):
        config['Instances']['Ec2KeyName'] = kw['ec2_key']
    if kw.get('ec2_subnet_id'):
        config['Instances']['Ec2SubnetId'] = kw['ec2_subnet_id']

    # Debugging needs a bucket for the logs; without one the flag is ignored.
    if kw.get('debug', False) and kw.get('s3_bucket'):
        config['LogUri'] = 's3://%s/logs/sparksteps/' % kw['s3_bucket']
        config['Steps'] = [steps.DebugStep().step]

    if kw.get('tags'):
        config['Tags'] = parse_tags(kw['tags'])

    if kw.get('defaults'):
        config['Configurations'] = [{'Classification': 'spark-defaults',
                                     'Properties': parse_conf(kw['defaults'])}]
    if kw.get('maximize_resource_allocation'):
        # Append to any spark-defaults entry rather than replacing it.
        configurations = config.get('Configurations', [])
        configurations.append({
            'Classification': 'spark',
            'Properties': {'maximizeResourceAllocation': 'true'}
        })
        config['Configurations'] = configurations

    if kw.get('bootstrap_script'):
        config['BootstrapActions'] = [{
            'Name': 'bootstrap',
            'ScriptBootstrapAction': {'Path': kw['bootstrap_script']},
        }]
    return config