Weave

Running YARN apps as simply as running Java threads.

Weave is now Apache Twill

Weave has been accepted into the Apache Incubator under its new name: Twill. Development of Weave will be discontinued. Please go to Apache Twill (twill.incubator.apache.org) for all future issues and requests.

What is Weave?

At Continuuity, we use Apache YARN as an integral part of our products because of its vision to support a diverse set of applications and processing patterns. One such product is BigFlow, our realtime distributed stream-processing engine. Apache YARN is used to run and manage BigFlow applications, including their lifecycle and runtime elasticity. During our journey with YARN we have come to realize that it is extremely powerful, but its full capability is hard to leverage. It is difficult to get started with, hard to test and debug, and complex to use as a base for new kinds of non-MapReduce applications and frameworks.

Weave is a simple set of libraries that allows you to easily manage distributed applications through an abstraction layer built on Apache YARN. Weave allows you to use YARN’s distributed capabilities with a programming model that is similar to running threads. Weave is NOT a replacement for Apache YARN. It is instead a value-added framework that operates on top of Apache YARN.

Why do I need Weave?

Weave dramatically simplifies and reduces your development effort, enabling you to quickly and easily manage your distributed applications through its simplified abstraction layer built on YARN. YARN can be quite difficult to use and requires a large ramp-up effort, since it grew out of Hadoop MapReduce and is typically used for managing batch jobs. YARN, however, can be used as a general-purpose resource manager that can run any type of job: in addition to batch jobs, a cluster can run real-time and long-running jobs. Unfortunately, YARN's APIs are too low-level to let you develop an application quickly, and they require a great deal of boilerplate code even for simple applications. Additionally, its aggregated logs are not available until the application has finished. This becomes a serious challenge for long-running jobs: since those jobs never finish, you cannot view their logs, which makes such applications very difficult to develop and debug. Finally, YARN does not provide standard support for application lifecycle management, for communication between containers and the Application Master, or for handling application-level errors.

Continuuity Weave provides you with the following benefits:

  • A simplified API for specifying, running and managing applications
  • A simplified way to specify and manage the stages of the application lifecycle
  • A generic Application Master to better support simple applications
  • Log & metrics aggregation for applications
  • Simplified archive management
  • Improved control over application logs, metrics and errors
  • Discovery service

Getting Started

Building the Weave Library

$ git clone http://github.com/continuuity/weave.git
$ cd weave
$ mvn install

Quick Example

Let's begin by building a basic EchoServer in Weave. Traditionally, when you build a server as simple as this, you put its logic in a Runnable implementation and run it in a Thread using an appropriate ExecutorService:

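import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EchoServer implements Runnable {
  private static final Logger LOG = LoggerFactory.getLogger(EchoServer.class);
  private final ServerSocket serverSocket;
  private volatile boolean running = true; // set to false to end the accept loop

  public EchoServer() {
    ...
  }

  @Override
  public void run() {
    while (running) {
      // run() cannot throw checked exceptions, so handle IOException from accept() here
      try (Socket socket = serverSocket.accept()) {
        ...
      } catch (IOException e) {
        LOG.error("Failed to accept or serve a connection", e);
      }
    }
  }
}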

Our example defines an implementation of Runnable that implements the run method. The EchoServer is now a Runnable that can be executed by an ExecutorService in a Thread:

...
ExecutorService service = Executors.newFixedThreadPool(2);
service.submit(new EchoServer());
...
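To make this thread-based starting point concrete, here is a minimal, self-contained sketch of a line-based echo server that runs on an ExecutorService using only the JDK. The class name ThreadEchoServer and its getPort() and stop() methods are illustrative additions, not part of Weave; stopping works by closing the server socket, which makes the blocked accept() call throw and lets run() exit.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// A minimal line-based echo server that runs as a plain Runnable (no Weave involved).
class ThreadEchoServer implements Runnable {
  private final ServerSocket serverSocket;
  private volatile boolean running = true;

  ThreadEchoServer() throws IOException {
    this.serverSocket = new ServerSocket(0); // bind to any free port
  }

  int getPort() {
    return serverSocket.getLocalPort();
  }

  @Override
  public void run() {
    while (running) {
      try (Socket socket = serverSocket.accept();
           BufferedReader in = new BufferedReader(
               new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
           PrintWriter out = new PrintWriter(
               new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8), true)) {
        String line;
        while ((line = in.readLine()) != null) {
          out.println(line); // echo each line back; autoflush sends it immediately
        }
      } catch (IOException e) {
        // Expected once stop() closes the server socket; otherwise report it.
        if (running) {
          e.printStackTrace();
        }
      }
    }
  }

  // Closing the socket unblocks accept(), which lets run() leave its loop.
  void stop() throws IOException {
    running = false;
    serverSocket.close();
  }
}
```

It is submitted exactly like the EchoServer above, with service.submit(server); calling server.stop() followed by service.shutdown() then lets the pool terminate.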

The above model is familiar, but now assume you want to run your EchoServer on a YARN cluster. To do this, all you need to do is implement the WeaveRunnable interface similarly to how you normally implement Runnable:

Implementing WeaveRunnable

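public class EchoServer implements WeaveRunnable {
  private static final Logger LOG = LoggerFactory.getLogger(EchoServer.class);
  private final ServerSocket serverSocket;
  private final int port;
  private volatile boolean running = true; // set to false to end the accept loop

  public EchoServer() {
    ...
  }

  @Override
  public void run() {
    while (running) {
      // run() cannot throw checked exceptions, so handle IOException from accept() here
      try (Socket socket = serverSocket.accept()) {
        ...
      } catch (IOException e) {
        LOG.error("Failed to accept or serve a connection", e);
      }
    }
  }
}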

The EchoServer implements WeaveRunnable, which extends Runnable. This means you can run a WeaveRunnable implementation within a Thread and also in a container on a YARN cluster. To run EchoServer on the YARN cluster, you must first create a WeaveRunnerService, which is similar to an ExecutorService. To run on the YARN cluster, specify the YARN cluster configuration and the connection string of a running ZooKeeper service:

Starting the YARN Runner Service

WeaveRunnerService runnerService = new YarnWeaveRunnerService(
  new YarnConfiguration(), zkConnectStr);
runnerService.startAndWait();

Now that WeaveRunnerService has started, we can prepare EchoServer to run on the YARN cluster and attach a log handler, which centralizes on the client all the logs that EchoServer generates on every node of the cluster:

Preparing to Run WeaveRunnable

WeaveController controller = runnerService.prepare(new EchoServer())
  .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
  .start();

Now that EchoServer has been prepared and launched on the YARN cluster, we can attach listeners that let you observe the state transitions of your application.

Attaching Listeners for State Transitions

controller.addListener(new ListenerAdapter() {
  @Override
  public void running() {
    LOG.info("Echo Server Started");
  }
});

To stop the running EchoServer, use the controller returned during the start of the application:

Stopping WeaveRunnable

controller.stop().get();

This shuts down the Application Master and all the configured containers. Note that you do not need to specify the archives that must be shipped to the remote machines on the YARN cluster where the containers run; Weave takes care of all of this.
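The controller behaves like a small state machine: listeners are notified as the application moves through its states, and stop() returns a Future that the caller can block on. The sketch below models only that pattern in plain Java; SketchController, StateListener and markRunning() are hypothetical names and do not reproduce Weave's WeaveController or ListenerAdapter.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;

// Hypothetical listener with no-op defaults, in the spirit of an adapter class.
interface StateListener {
  default void running() {}
  default void terminated() {}
}

// Hypothetical controller that tracks state and notifies listeners on each transition.
class SketchController {
  enum State { STARTING, RUNNING, TERMINATED }

  private final List<StateListener> listeners = new CopyOnWriteArrayList<>();
  private final CompletableFuture<State> termination = new CompletableFuture<>();
  private volatile State state = State.STARTING;

  void addListener(StateListener listener) {
    listeners.add(listener);
  }

  State state() {
    return state;
  }

  // Invoked when the application reports that it is up.
  synchronized void markRunning() {
    if (state == State.STARTING) {
      state = State.RUNNING;
      listeners.forEach(StateListener::running);
    }
  }

  // Returns a Future so callers can block on completion, as with stop().get().
  synchronized Future<State> stop() {
    if (state != State.TERMINATED) {
      // A real controller would ask the Application Master to stop its containers here.
      state = State.TERMINATED;
      listeners.forEach(StateListener::terminated);
      termination.complete(state);
    }
    return termination;
  }
}
```

Both transitions are guarded, so calling stop() twice notifies listeners only once and returns the same completed Future.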

Advanced Examples

Discovery Service

The EchoServer is useful only if clients can find it. Clients that want to access the server running in the cluster must be able to locate the service and talk to it. Weave supports this by exposing a discovery service that lets your running Weave application announce itself on the cluster, so that clients can discover it and connect to it. Let's see how to add this capability to the EchoServer. The EchoServer starts on any available port on the machine where it runs, and then announces its presence through the Weave discovery service API. In this example, the class extends AbstractWeaveRunnable, which implements WeaveRunnable, which in turn extends Runnable:

WeaveRunnable with Discovery Announce

public class EchoServer extends AbstractWeaveRunnable {
  private static final Logger LOG = LoggerFactory.getLogger(EchoServer.class);
  private ServerSocket serverSocket;

  @Override
  public void initialize(WeaveContext context) {
    super.initialize(context);
    ...
    try {
      serverSocket = new ServerSocket(0); // start on any available port.
      context.announce("echo", serverSocket.getLocalPort());
    } catch (IOException e) {
      throw Throwables.propagate(e);
  }

  @Override
  public void run() {
    ...
  }
}

During the initialization phase of the container, EchoServer uses WeaveContext to announce the port on which it started. Clients can then discover the echo service through a live Iterable of the announced endpoints:

Client Discovery

...
WeaveController controller = ...
...
Iterable<Discoverable> echoServices = controller.discoverService("echo");
...
for (Discoverable discoverable : echoServices) {
  Socket socket = new Socket(discoverable.getSocketAddress().getAddress(),
                             discoverable.getSocketAddress().getPort());
  ...
}
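The "live" part of the discovery result means that iterating the same Iterable later can see services announced after discoverService() was called. The in-memory sketch below illustrates just that behavior; InMemoryDiscovery, announce(), withdraw() and discover() are hypothetical names, and Weave itself keeps this state in the cluster (presumably coordinated through the ZooKeeper service it is configured with) rather than inside one JVM.

```java
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical in-memory registry illustrating live discovery results.
class InMemoryDiscovery {
  private final Map<String, List<InetSocketAddress>> services = new ConcurrentHashMap<>();

  private List<InetSocketAddress> endpoints(String name) {
    return services.computeIfAbsent(name, k -> new CopyOnWriteArrayList<>());
  }

  // Service side, analogous to context.announce(name, port).
  void announce(String name, InetSocketAddress address) {
    endpoints(name).add(address);
  }

  // Service side: remove an endpoint, e.g. when its container stops.
  void withdraw(String name, InetSocketAddress address) {
    endpoints(name).remove(address);
  }

  // Client side: a live view. Each iteration takes a fresh snapshot of the
  // current endpoints, so announcements made after discover() are visible.
  Iterable<InetSocketAddress> discover(String name) {
    List<InetSocketAddress> list = endpoints(name);
    return list::iterator;
  }
}
```

Copy-on-write iterators never throw ConcurrentModificationException, so a client can iterate safely while services come and go.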

Logging

In the above examples, a log handler was attached when preparing to run an implementation of WeaveRunnable. It collects all logs emitted by the containers and delivers them to the client, which can then act on them. This way, when you debug the application, you don't have to leave your IDE to dig through logs on the YARN cluster. Within the container, you use a standard SLF4J logger to log messages. The logs are intercepted and sent through a Kafka broker to the client; for every application that is launched, a Kafka broker is started within the Application Master.

SLF4J logger for logging

public class EchoServer extends AbstractWeaveRunnable {
  private static final Logger LOG = LoggerFactory.getLogger(EchoServer.class);
  ...
  @Override
  public void run() {
    ...
    LOG.info("New client accepted");
    ...
  }
  ...
}
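Conceptually, this log aggregation is a producer/consumer pipeline: containers publish log records to a broker, and the client drains them and hands each one to its log handler. The sketch below stands in for the Kafka broker with an in-process BlockingQueue; LogEntry and LogAggregator are hypothetical classes that only illustrate the flow, not Weave's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical log record tagged with the container that produced it.
final class LogEntry {
  final String containerId;
  final String message;

  LogEntry(String containerId, String message) {
    this.containerId = containerId;
    this.message = message;
  }

  @Override
  public String toString() {
    return "[" + containerId + "] " + message;
  }
}

// Stand-in for the broker: containers publish, the client drains into a handler.
class LogAggregator {
  private final BlockingQueue<LogEntry> broker = new LinkedBlockingQueue<>();

  // Container side: what an intercepted logger call would ultimately publish.
  void publish(String containerId, String message) {
    broker.add(new LogEntry(containerId, message));
  }

  // Client side: hand every buffered entry to the handler, in arrival order.
  int drainTo(Consumer<LogEntry> handler) {
    List<LogEntry> batch = new ArrayList<>();
    broker.drainTo(batch);
    batch.forEach(handler);
    return batch.size();
  }
}
```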

Resource Specification

When you prepare an implementation of WeaveRunnable to run on the YARN cluster, you can specify the resources used to run each container: the number of cores, the amount of memory, and the number of instances. Internally, this can rely on Linux cgroups (when the YARN cluster is configured to use them) to limit the amount of system resources used by the container:

Specifying Resource Constraints for Container

WeaveController controller = runnerService.prepare(new EchoServer(port),
  ResourceSpecification.Builder.with()
    .setCores(1)
    .setMemory(1, ResourceSpecification.SizeUnit.GIGA)
    .setInstances(2).build())
  .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
  .start();
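The resource settings apply per container, so the cluster-wide demand is the per-container request multiplied by the number of instances: the example above asks for 2 instances of 1 core each (2 cores) and 1 GB each (2 GB), not counting the Application Master's own container. The hypothetical ResourceRequest builder below makes that arithmetic explicit and rejects non-positive values; its names and defaults are illustrative, not Weave's ResourceSpecification.

```java
// Hypothetical per-container resource request; not Weave's ResourceSpecification.
final class ResourceRequest {
  final int cores;      // virtual cores per container
  final int memoryMB;   // memory per container, in megabytes
  final int instances;  // number of containers

  private ResourceRequest(int cores, int memoryMB, int instances) {
    this.cores = cores;
    this.memoryMB = memoryMB;
    this.instances = instances;
  }

  int totalCores() { return cores * instances; }
  int totalMemoryMB() { return memoryMB * instances; }

  static Builder builder() { return new Builder(); }

  static final class Builder {
    // Illustrative defaults, chosen for this sketch only.
    private int cores = 1;
    private int memoryMB = 512;
    private int instances = 1;

    Builder setCores(int cores) { this.cores = cores; return this; }
    Builder setMemoryMB(int memoryMB) { this.memoryMB = memoryMB; return this; }
    Builder setInstances(int instances) { this.instances = instances; return this; }

    ResourceRequest build() {
      if (cores <= 0 || memoryMB <= 0 || instances <= 0) {
        throw new IllegalArgumentException("cores, memory and instances must be positive");
      }
      return new ResourceRequest(cores, memoryMB, instances);
    }
  }
}
```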

Archive Management

To run in a container on the YARN cluster, all the necessary jars must be shipped to the node on which the container runs. Weave handles this internally, but its APIs also let you specify additional files to ship to the container.
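Shipping files to a container boils down to packaging them so they can be copied to the node and unpacked there. As a rough illustration of that idea, and not of how Weave does it internally, the hypothetical BundleBuilder below packs a list of local files into a single jar using java.util.jar.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;

// Hypothetical helper: packs local files into one jar so they can be copied
// to the node that runs the container.
class BundleBuilder {
  static void bundle(List<Path> files, Path target) throws IOException {
    try (OutputStream os = Files.newOutputStream(target);
         JarOutputStream jar = new JarOutputStream(os)) {
      for (Path file : files) {
        // Flat layout: each file is stored under its own file name.
        jar.putNextEntry(new JarEntry(file.getFileName().toString()));
        Files.copy(file, jar);
        jar.closeEntry();
      }
    }
  }
}
```

Because entries are stored flat by file name, two input files with the same name would make putNextEntry throw a ZipException; a fuller version would preserve relative paths.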

Application

A WeaveApplication is a collection of distributed WeaveRunnables working together. For example, suppose you have a web application that you would like to deploy on a YARN cluster. You will need instances of a Jetty server and all the files associated with the application:

Specifying WeaveApplication

public class WebApplication implements WeaveApplication {
  @Override
  public WeaveSpecification configure() {
    return WeaveSpecification.Builder.with()
      .setName("My Web Application")
      .withRunnables()
         .add(new JettyWebServer())
         .withLocalFiles()
            .add("html-pages.tgz", pages, true) // pages: the local archive to ship, defined elsewhere
         .apply()
         .add(new LogsCollector())
      .anyOrder()
      .build();
  }
}

Once you have defined an application in Weave, you can run it the same way you run a WeaveRunnable. You might also notice from the above example that Weave applications let you control the order in which the WeaveRunnables are started on the cluster. The example specifies no order (anyOrder()), so all the WeaveRunnables can start concurrently. However, you can change this behavior as follows:

Ordering

public class WebApplication implements WeaveApplication {
  @Override
  public WeaveSpecification configure() {
    return WeaveSpecification.Builder.with()
      .setName("My Web Application")
      .withRunnables()
         .add("jetty", new JettyWebServer())
         .withLocalFiles()
            .add("html-pages.tgz", pages, true)
         .apply()
         .add("log", new LogsCollector())
      .order()
         .first("log")
         .next("jetty")
      .build();
  }
}
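The ordering above amounts to starting the runnables in stages: everything in one stage starts concurrently, and the next stage begins only once the previous one has started, while anyOrder() corresponds to a single stage. A minimal sketch of that scheduling in plain Java follows; OrderedLauncher and its first()/next() methods are hypothetical stand-ins that mirror the shape of the ordering API above, and "starting" a runnable is simulated by recording its name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical staged launcher: runnables within a stage start concurrently,
// and each stage waits for the previous one to finish starting.
class OrderedLauncher {
  private final List<List<String>> stages = new ArrayList<>();
  final List<String> started = new CopyOnWriteArrayList<>(); // start order, thread-safe

  OrderedLauncher first(String... names) {
    if (!stages.isEmpty()) throw new IllegalStateException("first() must come before next()");
    stages.add(Arrays.asList(names));
    return this;
  }

  OrderedLauncher next(String... names) {
    if (stages.isEmpty()) throw new IllegalStateException("call first() before next()");
    stages.add(Arrays.asList(names));
    return this;
  }

  void launch() throws Exception {
    ExecutorService pool = Executors.newCachedThreadPool();
    try {
      for (List<String> stage : stages) {
        List<Future<?>> pending = new ArrayList<>();
        for (String name : stage) {
          // Stand-in for launching a container: just record that it started.
          pending.add(pool.submit(() -> started.add(name)));
        }
        for (Future<?> f : pending) {
          f.get(); // block until every runnable in this stage has started
        }
      }
    } finally {
      pool.shutdown();
    }
  }
}
```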

Documentation & Talks

API

Talks

Community

How to Contribute

Are you interested in making Weave better? Our development model is a simple pull-based model with a consensus-building phase, similar to Apache's voting process. If you think you can help make Weave better, whether by adding new features, fixing bugs, or suggesting how to improve something that already exists in Weave, here's how:

  • Fork Weave into your own GitHub repository
  • Create a topic branch with an appropriate name
  • Work on your feature or fix until you are satisfied with it
  • Create a pull request against the continuuity/weave project
  • Address all the review comments
  • Once the comments are addressed, the changes will be committed to the continuuity/weave repository

Groups

License

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.