Writing the integration test

The test covers the entire DAG and will run the application in embedded mode. In embedded mode, all operators and containers share the JUnit JVM. Containers are threads (instead of separate processes) but the data flow still behaves as if operators lived in separate processes. This means operators execute asynchronously as they would in a distributed cluster and data is transferred over the loopback interface (if that's how the streams are configured).

@Test
public void testApplication() throws Exception {
  EmbeddedAppLauncher<?> launcher = Launcher.getLauncher(LaunchMode.EMBEDDED);
  Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
  launchAttributes.put(EmbeddedAppLauncher.RUN_ASYNC, true);

  Configuration conf = new Configuration(false);
  conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
  conf.addResource(this.getClass().getResourceAsStream("/wordcounttest-properties.xml"));

  File resultFile = new File("./target/wordcountresult_4.0");
  if (resultFile.exists() && !resultFile.delete()) {
    throw new AssertionError("Failed to delete " + resultFile);
  }

  AppHandle appHandle = launcher.launchApp(new Application(), conf, launchAttributes);

  long timeoutMillis = System.currentTimeMillis() + 10000;
  while (!appHandle.isFinished() && System.currentTimeMillis() < timeoutMillis) {
    Thread.sleep(500);
    if (resultFile.exists() && resultFile.length() > 0) {
      break;
    }
  }
  appHandle.shutdown(ShutdownMode.KILL);

  Assert.assertTrue(resultFile.exists() && resultFile.length() > 0);
  String result = FileUtils.readFileToString(resultFile);
  Assert.assertTrue(result, result.contains("MyFirstApplication=5"));
}

The flow of the test is as follows:

  1. Create a launcher for embedded mode.
  2. Load the configuration, both from the standard location and from the overrides that are needed for the test.
  3. Make sure that the result area is clean: any residue from previous runs is removed as part of the test initialization.
  4. Instantiate the application and launch it in the embedded cluster, keeping the returned handle to check and control the asynchronous execution.
  5. In a loop, check whether the application has terminated, and cancel it once the expected results are in. Here, the loop checks for the expected result file, and the timeout ensures that the test does not turn into a runaway process if the application fails to produce the expected results.
  6. After the results are available, check the details with assertions.
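
The clean-up and wait-with-timeout steps do not depend on the launcher, so they can be tried out on their own. The following is a minimal sketch under stated assumptions, not part of the application or of the launcher API: the class ResultPoller and its methods awaitCondition and hasContent are hypothetical names introduced for illustration, and a plain thread stands in for the asynchronously running application.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.BooleanSupplier;

/** Hypothetical helper for illustration; not part of the launcher API. */
public class ResultPoller {

  /**
   * Polls the condition until it holds or the timeout elapses.
   * Returns true if the condition was met in time, false otherwise.
   */
  public static boolean awaitCondition(BooleanSupplier condition, long timeoutMillis, long pollMillis)
      throws InterruptedException {
    long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
    while (true) {
      if (condition.getAsBoolean()) {
        return true;
      }
      if (System.nanoTime() - deadline >= 0) {
        return false; // give up instead of waiting forever
      }
      Thread.sleep(pollMillis);
    }
  }

  /** Returns true once the file exists and is non-empty. */
  public static boolean hasContent(Path file) {
    try {
      return Files.exists(file) && Files.size(file) > 0;
    } catch (IOException e) {
      return false; // file not readable right now; treat as "not ready"
    }
  }

  public static void main(String[] args) throws Exception {
    Path dir = Files.createTempDirectory("poller-demo");
    Path result = dir.resolve("result.txt");

    // Start from a clean result area.
    Files.deleteIfExists(result);

    // Stand-in for the asynchronously running application.
    Thread writer = new Thread(() -> {
      try {
        Thread.sleep(200);
        Files.write(result, "MyFirstApplication=5".getBytes(StandardCharsets.UTF_8));
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    });
    writer.start();

    // Wait for the result, but never longer than the timeout.
    boolean ready = awaitCondition(() -> hasContent(result), 5000, 50);
    writer.join();
    System.out.println("ready=" + ready);
  }
}
```

In a test, a false return value from awaitCondition would be turned into an assertion failure after the application has been shut down, so that a missing result fails the test quickly rather than hanging the build.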

The test completes within a few seconds and can easily be run within the IDE whenever a change is made. At this stage, there is no need to package the application, launch it on a Hadoop cluster, and diagnose issues by collecting logs and other information from distributed workers. Instead, when a test fails, it is usually straightforward to identify the cause and, where appropriate, to step through the code with a debugger.