Getting Started with AWS SDK for Java (3)

February 13, 2016, 10:40 AM

This is the 3rd part of my tutorial on “Getting Started with AWS SDK for Java”. If you have not already done so, I suggest that you first take a look at the first chapter in this series, “Getting Started with AWS SDK for Java (1)”, to properly set up your development environment. In this part, we cover the basic concepts related to the DataPipelineClient. Through this example, you will be able to create and activate a simple pipeline with a ShellCommandActivity running on an Ec2Resource.

Before you get started with this demo, you should familiarize yourself with what AWS Data Pipeline is. In particular, the following AWS documentation on “Data Pipeline Concepts” is very helpful:

http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-concepts.html

First, we create an instance of the DataPipelineClient in the constructor and set the region to ap-southeast-2. For debugging purposes, we enable logging with log4j.

public class DemoDataPipeline
{
	static DataPipelineClient client;
	final static Logger logger = Logger.getLogger(DemoDataPipeline.class);

	/**
	 *
	 * Constructor
	 *
	 */

	public DemoDataPipeline()
	{
		// Create the DataPipelineClient
		client = new DataPipelineClient();
		// Set the region to ap-southeast-2
		client.configureRegion(Regions.AP_SOUTHEAST_2);
	}

We use the createPipeline() method in DataPipelineClient to create a new pipeline. This method takes a CreatePipelineRequest as its parameter, which requires a name and a unique id for the pipeline to be created. Here we use the java.util.UUID utility to generate the unique id; Data Pipeline uses this unique id to guarantee idempotency, so repeated calls with the same unique id return the same pipeline rather than creating duplicates. The call creates an empty pipeline for us.

	public void createPipeline() throws Exception
	{
		System.out.println("CREATE PIPELINE.");
		
		CreatePipelineRequest request = new CreatePipelineRequest();
		request.setName("Java SDK Demo");
		String uuid = UUID.randomUUID().toString();
		request.setUniqueId(uuid);
		client.createPipeline(request);
	}
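
Note that createPipeline() actually returns a CreatePipelineResult that carries the id assigned by the service, so you do not strictly need to look the id up afterwards with the list command. The variant below, with the hypothetical name createPipelineAndReturnId(), is a sketch of mine and not part of the demo code:

	public String createPipelineAndReturnId() throws Exception
	{
		CreatePipelineRequest request = new CreatePipelineRequest();
		request.setName("Java SDK Demo");
		request.setUniqueId(UUID.randomUUID().toString());
		// The result carries the pipeline id (df-...) assigned by the service
		CreatePipelineResult result = client.createPipeline(request);
		System.out.println("Created pipeline: " + result.getPipelineId());
		return result.getPipelineId();
	}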

We can use the listPipelines() method in DataPipelineClient to get a list of the pipelines. This returns a ListPipelinesResult, which includes a list of PipelineIdName objects. We iterate over this list to obtain the id and name of each pipeline.

	public void listPipeline() throws Exception
	{
		System.out.println("LIST PIPELINE.");
		
		ListPipelinesResult result = client.listPipelines();
		List<PipelineIdName> list = result.getPipelineIdList();
		for (PipelineIdName pipeline : list)
		{
			System.out.println(pipeline.getId() + "\t- " + pipeline.getName());
		}
	}
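
One thing to keep in mind is that listPipelines() returns at most one page of results. If your account has many pipelines, you can pass the marker from the previous ListPipelinesResult back in a ListPipelinesRequest to fetch the next page. The following is a sketch of how that might look; it is not part of the demo code:

	public void listAllPipelines() throws Exception
	{
		ListPipelinesRequest request = new ListPipelinesRequest();
		ListPipelinesResult result;
		do
		{
			result = client.listPipelines(request);
			for (PipelineIdName pipeline : result.getPipelineIdList())
			{
				System.out.println(pipeline.getId() + "\t- " + pipeline.getName());
			}
			// Pass the marker back to retrieve the next page, if any
			request.setMarker(result.getMarker());
		} while (Boolean.TRUE.equals(result.getHasMoreResults()));
	}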

Now we have the id of the newly created pipeline. In the AWS SDK for Java, the pipeline components specifying the data sources, activities, schedule, and preconditions of the workflow are represented as PipelineObject instances. A PipelineObject is a collection of key-value fields. The definePipeline() code below defines a Default object, a Schedule object, an Ec2Resource object, and a ShellCommandActivity object in this way. As a point of reference, the following JSON string defines an Ec2Resource in a VPC:

{
	"id" : "MyEC2Resource",
	"type" : "Ec2Resource",
	"actionOnTaskFailure" : "terminate",
	"actionOnResourceFailure" : "retryAll",
	"maximumRetries" : "1",
	"instanceType" : "m1.medium",
	"securityGroupIds" : [
		"sg-12345678",
		"sg-12345678"
	],
	"subnetId" : "subnet-12345678",
	"associatePublicIpAddress" : "true",
	"keyPair" : "my-key-pair"
}

When the value of a key is another pipeline object, we use Field().withKey("field_name").withRefValue("object_id") to represent the key-value pair. Otherwise, we use Field().withKey("field_name").withStringValue("field_value"). See the ShellCommandActivity fields in the definePipeline() code below for a concrete example.
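
To make the mapping concrete, here is a sketch (not part of the demo code, with a hypothetical buildVpcEc2Resource() helper) of how the Ec2Resource JSON above could be expressed with the Field and PipelineObject builders. I am assuming that a list-valued field such as securityGroupIds is written as repeated Field entries with the same key; please verify this against the API reference.

	// Sketch: the Ec2Resource JSON above expressed with Field / PipelineObject
	// builders. The ids are the placeholder values from the JSON and must be
	// replaced with your own resources.
	public PipelineObject buildVpcEc2Resource()
	{
		Field type    = new Field().withKey("type").withStringValue("Ec2Resource");
		Field subnet  = new Field().withKey("subnetId").withStringValue("subnet-12345678");
		// A list-valued field such as securityGroupIds is assumed to be
		// expressed as repeated Field entries with the same key.
		Field sg1     = new Field().withKey("securityGroupIds").withStringValue("sg-12345678");
		Field sg2     = new Field().withKey("securityGroupIds").withStringValue("sg-12345678");
		Field keyPair = new Field().withKey("keyPair").withStringValue("my-key-pair");
		return new PipelineObject().withName("MyEC2Resource").withId("MyEC2Resource")
			.withFields(Lists.newArrayList(type, subnet, sg1, sg2, keyPair));
	}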

	public void definePipeline(String id) throws Exception
	{
		System.out.println("Define PIPELINE.");

		// Definition of the default object
		Field defaultScheduleType = new Field().withKey("scheduleType").withStringValue("CRON");
		Field defaultSchedule = new Field().withKey("schedule").withRefValue("RunOnceSchedule");
		Field defaultFailureAndRerunMode = new Field().withKey("failureAndRerunMode").withStringValue("CASCADE");
		Field defaultRole = new Field().withKey("role").withStringValue("DataPipelineDefaultRole");
		Field defaultResourceRole = new Field().withKey("resourceRole").withStringValue("DataPipelineDefaultResourceRole");
		Field defaultLogUri = new Field().withKey("pipelineLogUri").withStringValue("s3://331982-syd/java-dp-log");
		List<Field> defaultFieldList = Lists.newArrayList(defaultScheduleType, defaultSchedule, defaultFailureAndRerunMode, defaultRole, defaultResourceRole, defaultLogUri);
		PipelineObject defaultObject = new PipelineObject().withName("Default").withId("Default").withFields(defaultFieldList);

		// Definition of the pipeline schedule
		Field scheduleType = new Field().withKey("type").withStringValue("Schedule");
		Field scheduleStartAt = new Field().withKey("startAt").withStringValue("FIRST_ACTIVATION_DATE_TIME");
		Field schedulePeriod = new Field().withKey("period").withStringValue("1 day");
		Field scheduleOccurrences = new Field().withKey("occurrences").withStringValue("1");
		List<Field> scheduleFieldList = Lists.newArrayList(scheduleType, scheduleStartAt, schedulePeriod, scheduleOccurrences);
		PipelineObject schedule = new PipelineObject().withName("RunOnceSchedule").withId("RunOnceSchedule").withFields(scheduleFieldList);

		// Definition of the Ec2Resource
		Field ec2Type = new Field().withKey("type").withStringValue("Ec2Resource");
		Field ec2TerminateAfter = new Field().withKey("terminateAfter").withStringValue("15 minutes");
		List<Field> ec2FieldList = Lists.newArrayList(ec2Type, ec2TerminateAfter);
		PipelineObject ec2 = new PipelineObject().withName("Ec2Instance").withId("Ec2Instance").withFields(ec2FieldList);

		// Definition of the ShellCommandActivity
		// The ShellCommandActivity is a command "df -h"
		Field activityType = new Field().withKey("type").withStringValue("ShellCommandActivity");
		Field activityRunsOn = new Field().withKey("runsOn").withRefValue("Ec2Instance");
		Field activityCommand = new Field().withKey("command").withStringValue("df -h");
		Field activityStdout = new Field().withKey("stdout").withStringValue("s3://331982-syd/dp-java-demo-stdout");
		Field activityStderr = new Field().withKey("stderr").withStringValue("s3://331982-syd/dp-java-demo-stderr");
		Field activitySchedule = new Field().withKey("schedule").withRefValue("RunOnceSchedule");
		List<Field> activityFieldList = Lists.newArrayList(activityType, activityRunsOn, activityCommand, activityStdout, activityStderr, activitySchedule);
		PipelineObject activity = new PipelineObject().withName("DfCommand").withId("DfCommand").withFields(activityFieldList);

		// setPipelineObjects
		List<PipelineObject> objects = Lists.newArrayList(defaultObject, schedule, ec2, activity);

		// putPipelineDefinition
		PutPipelineDefinitionRequest request = new PutPipelineDefinitionRequest();
		request.setPipelineId(id);
		request.setPipelineObjects(objects);
		PutPipelineDefinitionResult putPipelineResult = client.putPipelineDefinition(request);

		if (putPipelineResult.isErrored()) 
		{
			logger.error("Error found in pipeline definition: ");
			putPipelineResult.getValidationErrors().stream().forEach(e -> logger.error(e));
		}

		if (putPipelineResult.getValidationWarnings().size() > 0) 
		{
			logger.warn("Warnings found in definition: ");
			putPipelineResult.getValidationWarnings().stream().forEach(e -> logger.warn(e));
		}
	}
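
If you want to confirm what the service actually stored, you can read the definition back with getPipelineDefinition(). The verifyPipeline() method below is my own addition, not part of the demo code:

	public void verifyPipeline(String id) throws Exception
	{
		GetPipelineDefinitionRequest request = new GetPipelineDefinitionRequest();
		request.setPipelineId(id);
		GetPipelineDefinitionResult result = client.getPipelineDefinition(request);
		// Print every object and its fields as stored by the service
		for (PipelineObject object : result.getPipelineObjects())
		{
			System.out.println(object.getId() + " (" + object.getName() + ")");
			for (Field field : object.getFields())
			{
				System.out.println("\t" + field.getKey() + " = "
					+ (field.getRefValue() != null ? field.getRefValue() : field.getStringValue()));
			}
		}
	}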

Now you can activate the pipeline for execution:

	public void activatePipeline(String id) throws Exception
	{
		System.out.println("ACTIVATE PIPELINE.");	

		ActivatePipelineRequest request = new ActivatePipelineRequest();
		request.setPipelineId(id);
		client.activatePipeline(request);
	}
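
Activation is asynchronous, so the pipeline does not finish immediately. One way to watch its progress is to describe the pipeline and print its @pipelineState field. The describePipeline() method below is my own addition on top of the demo code:

	public void describePipeline(String id) throws Exception
	{
		DescribePipelinesRequest request = new DescribePipelinesRequest().withPipelineIds(id);
		DescribePipelinesResult result = client.describePipelines(request);
		for (PipelineDescription description : result.getPipelineDescriptionList())
		{
			for (Field field : description.getFields())
			{
				// "@pipelineState" reports values such as PENDING or SCHEDULED
				if ("@pipelineState".equals(field.getKey()))
				{
					System.out.println(description.getPipelineId() + " is " + field.getStringValue());
				}
			}
		}
	}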

Finally, when you no longer need the pipeline, you can delete it:

	public void deletePipeline(String id) throws Exception
	{
		System.out.println("DELETE PIPELINE.");	

		DeletePipelineRequest request = new DeletePipelineRequest();
		request.setPipelineId(id);
		client.deletePipeline(request);
	}
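
The commands in the next section assume a main() method that dispatches on the first command line argument. The demo code on GitHub contains its own version; a minimal sketch of what it might look like is:

	public static void main(String[] args)
	{
		try
		{
			DemoDataPipeline demo = new DemoDataPipeline();
			switch (args[0])
			{
				case "create":   demo.createPipeline(); break;
				case "list":     demo.listPipeline(); break;
				case "define":   demo.definePipeline(args[1]); break;
				case "activate": demo.activatePipeline(args[1]); break;
				case "delete":   demo.deletePipeline(args[1]); break;
				default:         System.out.println("Unknown command: " + args[0]);
			}
		} catch (Exception e)
		{
			logger.error(e.getMessage(), e);
		}
	}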

After checking out the demo code from GitHub, you should modify the code to use your own S3 bucket for logging, as well as your own stdout and stderr locations for the ShellCommandActivity. After making these changes, you can run the demo code using the following commands:

$ mvn clean; mvn compile; mvn package
$ java -cp target/demo-1.0-SNAPSHOT.jar:third-party/guava-18.0.jar -Dlog4j.configurationFile=log4j2.xml net.qyjohn.aws.DemoDataPipeline create
$ java -cp target/demo-1.0-SNAPSHOT.jar:third-party/guava-18.0.jar -Dlog4j.configurationFile=log4j2.xml net.qyjohn.aws.DemoDataPipeline list
$ java -cp target/demo-1.0-SNAPSHOT.jar:third-party/guava-18.0.jar -Dlog4j.configurationFile=log4j2.xml net.qyjohn.aws.DemoDataPipeline define df-0098814S3FS9ACXICID  (replace df-0098814S3FS9ACXICID with your own pipeline id)
$ java -cp target/demo-1.0-SNAPSHOT.jar:third-party/guava-18.0.jar -Dlog4j.configurationFile=log4j2.xml net.qyjohn.aws.DemoDataPipeline activate df-0098814S3FS9ACXICID  (replace df-0098814S3FS9ACXICID with your own pipeline id)
$ java -cp target/demo-1.0-SNAPSHOT.jar:third-party/guava-18.0.jar -Dlog4j.configurationFile=log4j2.xml net.qyjohn.aws.DemoDataPipeline delete df-0098814S3FS9ACXICID  (replace df-0098814S3FS9ACXICID with your own pipeline id)
