Source code for sparksteps.steps

# -*- coding: utf-8 -*-
"""Create EMR steps and upload files."""
import os
import tempfile
import zipfile

REMOTE_DIR = '/home/hadoop/'
S3_KEY_PREFIX = 'sparksteps/sources/'
S3_URI_FMT = "s3://{bucket}/{key}"


[docs]def get_basename(path): return os.path.basename(os.path.normpath(path))
[docs]def ls_recursive(dirname): """Recursively list files in a directory.""" for (dirpath, dirnames, filenames) in os.walk(os.path.expanduser(dirname)): for f in filenames: yield os.path.join(dirpath, f)
[docs]def zip_to_s3(s3_resource, dirpath, bucket, key): """Zip folder and upload to S3.""" with tempfile.SpooledTemporaryFile() as tmp: with zipfile.ZipFile(tmp, 'w', zipfile.ZIP_DEFLATED) as archive: for fpath in ls_recursive(dirpath): archive.write(fpath, get_basename(fpath)) tmp.seek(0) # Reset file pointer response = s3_resource.Bucket(bucket).put_object(Key=key, Body=tmp) return response
[docs]class CmdStep(object): on_failure = 'CANCEL_AND_WAIT' @property def step_name(self): raise NotImplementedError() @property def cmd(self): raise NotImplementedError() @property def step(self): return { 'Name': self.step_name, 'ActionOnFailure': self.on_failure, 'HadoopJarStep': { 'Jar': 'command-runner.jar', 'Args': self.cmd } }
[docs]class CopyStep(CmdStep): def __init__(self, bucket, filename): self.bucket = bucket self.filename = filename @property def step_name(self): return "Copy {}".format(self.filename) @property def cmd(self): return ['aws', 's3', 'cp', self.s3_uri, REMOTE_DIR] @property def key(self): return S3_KEY_PREFIX + self.filename @property def s3_uri(self): return S3_URI_FMT.format(bucket=self.bucket, key=self.key)
[docs]class DebugStep(CmdStep): on_failure = 'TERMINATE_CLUSTER' @property def step_name(self): return "Setup - debug" @property def cmd(self): return ['state-pusher-script']
[docs]class SparkStep(CmdStep): def __init__(self, app_path, submit_args=None, app_args=None): self.app = get_basename(app_path) self.submit_args = submit_args or [] self.app_args = app_args or [] @property def step_name(self): return "Run {}".format(self.app) @property def cmd(self): return (['spark-submit'] + self.submit_args + [self.remote_app] + self.app_args) @property def remote_app(self): return os.path.join(REMOTE_DIR, self.app)
[docs]class UnzipStep(CmdStep): def __init__(self, dirpath): self.dirpath = dirpath @property def step_name(self): return "Unzip {}".format(self.zipfile) @property def cmd(self): return ['unzip', '-o', self.remote_zipfile, '-d', self.remote_dirpath] @property def zipfile(self): return self.dirname + '.zip' @property def remote_zipfile(self): return os.path.join(REMOTE_DIR, self.zipfile) @property def dirname(self): return get_basename(self.dirpath) @property def remote_dirpath(self): return os.path.join(REMOTE_DIR, self.dirname)
[docs]class S3DistCp(CmdStep): on_failure = 'CONTINUE' def __init__(self, s3_dist_cp): self.s3_dist_cp = s3_dist_cp @property def step_name(self): return "S3DistCp step" @property def cmd(self): return ['s3-dist-cp'] + self.s3_dist_cp
[docs]def upload_steps(s3_resource, bucket, path): """Upload files to S3 and get steps.""" steps = [] basename = get_basename(path) if os.path.isdir(path): # zip directory copy_step = CopyStep(bucket, basename + '.zip') zip_to_s3(s3_resource, path, bucket, key=copy_step.key) steps.extend([copy_step, UnzipStep(path)]) else: copy_step = CopyStep(bucket, basename) s3_resource.meta.client.upload_file(path, bucket, copy_step.key) steps.append(copy_step) return steps
[docs]def setup_steps(s3, bucket, app_path, submit_args=None, app_args=None, uploads=None, s3_dist_cp=None): cmd_steps = [] paths = uploads or [] paths.append(app_path) for path in paths: cmd_steps.extend(upload_steps(s3, bucket, path)) cmd_steps.append(SparkStep(app_path, submit_args, app_args)) if s3_dist_cp is not None: cmd_steps.append(S3DistCp(s3_dist_cp)) return [s.step for s in cmd_steps]