[594] Support continuous conversion with the RunSync tool #593

Draft · wants to merge 1 commit into base: main
pom.xml (2 changes: 1 addition & 1 deletion)
@@ -658,7 +658,7 @@
<reuseForks>false</reuseForks>
<forkCount>6</forkCount>
<trimStackTrace>false</trimStackTrace>
-<argLine>-Xmx1024m</argLine>
<argLine>-Xmx1500m</argLine>
<forkedProcessExitTimeoutInSeconds>120</forkedProcessExitTimeoutInSeconds>
</configuration>
</plugin>
xtable-core/pom.xml (15 changes: 15 additions & 0 deletions)
@@ -174,4 +174,19 @@
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
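This publishes xtable-core's test classes as a tests-classifier jar; xtable-utilities consumes it below so its integration tests can reuse helpers such as TestJavaHudiTable.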
@@ -653,6 +653,35 @@ public void testOutOfSyncIncrementalSyncs() {
}
}

@Test
public void testIncrementalSyncsWithNoChangesDoesNotThrowError() {
String tableName = getTableName();
ConversionSourceProvider<?> conversionSourceProvider = getConversionSourceProvider(HUDI);
try (TestJavaHudiTable table =
TestJavaHudiTable.forStandardSchema(
tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
ConversionConfig dualTableConfig =
getTableSyncConfig(
HUDI,
SyncMode.INCREMENTAL,
tableName,
table,
Arrays.asList(ICEBERG, DELTA),
null,
null);

table.insertRecords(50, true);
ConversionController conversionController =
new ConversionController(jsc.hadoopConfiguration());
// sync once
conversionController.sync(dualTableConfig, conversionSourceProvider);
checkDatasetEquivalence(HUDI, table, Arrays.asList(DELTA, ICEBERG), 50);
// sync again
conversionController.sync(dualTableConfig, conversionSourceProvider);
checkDatasetEquivalence(HUDI, table, Arrays.asList(DELTA, ICEBERG), 50);
}
}

@Test
public void testIcebergCorruptedSnapshotRecovery() throws Exception {
String tableName = getTableName();
xtable-utilities/pom.xml (16 changes: 16 additions & 0 deletions)
@@ -125,6 +125,22 @@
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.xtable</groupId>
<artifactId>xtable-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark${spark.version.prefix}-bundle_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
@@ -26,9 +26,15 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import lombok.Builder;
import lombok.Data;
import lombok.Value;
import lombok.extern.jackson.Jacksonized;
import lombok.extern.log4j.Log4j2;

import org.apache.commons.cli.CommandLine;
@@ -42,7 +48,6 @@
import com.fasterxml.jackson.annotation.JsonMerge;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.annotations.VisibleForTesting;

@@ -69,6 +74,8 @@ public class RunSync {
private static final String HADOOP_CONFIG_PATH = "p";
private static final String CONVERTERS_CONFIG_PATH = "c";
private static final String ICEBERG_CATALOG_CONFIG_PATH = "i";
private static final String CONTINUOUS_MODE = "m";
private static final String CONTINUOUS_MODE_INTERVAL = "t";
private static final String HELP_OPTION = "h";

private static final Options OPTIONS =
@@ -96,6 +103,16 @@ public class RunSync {
true,
"The path to a yaml file containing Iceberg catalog configuration. The configuration will be "
+ "used for any Iceberg source or target.")
.addOption(
CONTINUOUS_MODE,
"continuousMode",
false,
"Runs the tool on a scheduled loop. On each iteration, the process will reload the configurations from the provided file path allowing the user to update the tables managed by the job without restarting the job.")
.addOption(
CONTINUOUS_MODE_INTERVAL,
"continuousModeInterval",
true,
"The interval in seconds to schedule the loop. Requires --continuousMode to be set. Defaults to 5 seconds.")
.addOption(HELP_OPTION, "help", false, "Displays help information to run this utility");

public static void main(String[] args) throws IOException {
@@ -115,11 +132,39 @@ public static void main(String[] args) throws IOException {
return;
}

-DatasetConfig datasetConfig = new DatasetConfig();
if (cmd.hasOption(CONTINUOUS_MODE)) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
long intervalInSeconds = Long.parseLong(cmd.getOptionValue(CONTINUOUS_MODE_INTERVAL, "5"));
executorService.scheduleAtFixedRate(
() -> {
try {
runSync(cmd);
} catch (IOException ex) {
log.error("Sync operation failed", ex);
}
},
0,
intervalInSeconds,
TimeUnit.SECONDS);
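// Keep the main thread alive while the scheduled task runs; an interrupt exits the loop below and shuts down the executor.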
while (!Thread.interrupted()) {
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
log.debug("Received interrupt signal");
break;
}
}
executorService.shutdownNow();
} else {
runSync(cmd);
}
}

private static void runSync(CommandLine cmd) throws IOException {
DatasetConfig datasetConfig;
try (InputStream inputStream =
Files.newInputStream(Paths.get(cmd.getOptionValue(DATASET_CONFIG_OPTION)))) {
-ObjectReader objectReader = YAML_MAPPER.readerForUpdating(datasetConfig);
-objectReader.readValue(inputStream);
datasetConfig = YAML_MAPPER.readValue(inputStream, DatasetConfig.class);
}

byte[] customConfig = getCustomConfigurations(cmd, HADOOP_CONFIG_PATH);
@@ -242,7 +287,9 @@ static IcebergCatalogConfig loadIcebergCatalogConfig(byte[] customConfigs) throw
: YAML_MAPPER.readValue(customConfigs, IcebergCatalogConfig.class);
}

-@Data
@Value
@Builder
@Jacksonized
public static class DatasetConfig {

/**
@@ -258,7 +305,9 @@ public static class DatasetConfig {
/** Configuration of the dataset to sync, path, table name, etc. */
List<Table> datasets;

-@Data
@Value
@Builder
@Jacksonized
public static class Table {
/**
* The base path of the table to sync. Any authentication configuration needed by HDFS client
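For illustration, here is a minimal sketch of driving the new continuous mode programmatically, mirroring how ITRunSync invokes the tool below; the wrapper class name and config path are placeholders, not part of this PR:

import org.apache.xtable.utilities.RunSync;

// Hypothetical example class; only RunSync and its new flags come from this PR.
public class ContinuousSyncExample {
  public static void main(String[] unused) throws Exception {
    // --datasetConfig is re-read on every iteration, so tables can be added to
    // or removed from the YAML file without restarting the process.
    RunSync.main(
        new String[] {
          "--datasetConfig", "/path/to/config.yaml", // placeholder path
          "--continuousMode",
          "--continuousModeInterval", "30" // seconds between runs; defaults to 5
        });
    // In continuous mode, main() returns only once the thread is interrupted.
  }
}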
@@ -29,6 +29,7 @@
tableFormatConverters:
HUDI:
conversionSourceProviderClass: org.apache.xtable.hudi.HudiConversionSourceProvider
conversionTargetProviderClass: org.apache.xtable.hudi.HudiConversionTarget
DELTA:
conversionSourceProviderClass: org.apache.xtable.delta.DeltaConversionSourceProvider
conversionTargetProviderClass: org.apache.xtable.delta.DeltaConversionTarget
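For context, a minimal dataset config consumed via --datasetConfig might look like the sketch below; the field names match the DatasetConfig and Table classes in RunSync, while the path and table name are placeholders:

sourceFormat: HUDI
targetFormats:
  - ICEBERG
  - DELTA
datasets:
  - tableBasePath: file:///data/warehouse/my_table  # placeholder
    tableName: my_table

In continuous mode this file is re-read on each iteration, which is what allows tables to be added or removed while the job runs.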
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.utilities;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import lombok.SneakyThrows;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import org.apache.hudi.common.model.HoodieTableType;

import org.apache.xtable.GenericTable;
import org.apache.xtable.TestJavaHudiTable;

class ITRunSync {

@Test
void testSingleSyncMode(@TempDir Path tempDir) throws IOException {
String tableName = "test-table";
try (GenericTable table =
TestJavaHudiTable.forStandardSchema(
tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
table.insertRows(20);
File configFile = writeConfigFile(tempDir, table, tableName);
String[] args = new String[] {"--datasetConfig", configFile.getPath()};
RunSync.main(args);
Path icebergMetadataPath = Paths.get(URI.create(table.getBasePath() + "/metadata"));
waitForNumIcebergCommits(icebergMetadataPath, 2);
}
}

@Test
void testContinuousSyncMode(@TempDir Path tempDir) throws IOException {
ExecutorService runner = Executors.newSingleThreadExecutor();
String tableName = "test-table";
try (GenericTable table =
TestJavaHudiTable.forStandardSchema(
tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
table.insertRows(20);
File configFile = writeConfigFile(tempDir, table, tableName);
String[] args = new String[] {"--datasetConfig", configFile.getPath(), "--continuousMode"};
runner.submit(
() -> {
try {
RunSync.main(args);
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
});
Path icebergMetadataPath = Paths.get(URI.create(table.getBasePath() + "/metadata"));
waitForNumIcebergCommits(icebergMetadataPath, 2);
// write more data now that table is initialized and data is synced
table.insertRows(20);
waitForNumIcebergCommits(icebergMetadataPath, 3);
assertEquals(3, numIcebergMetadataJsonFiles(icebergMetadataPath));
} finally {
runner.shutdownNow();
}
}

private static File writeConfigFile(Path tempDir, GenericTable table, String tableName)
throws IOException {
RunSync.DatasetConfig config =
RunSync.DatasetConfig.builder()
.sourceFormat("HUDI")
.targetFormats(Collections.singletonList("ICEBERG"))
.datasets(
Collections.singletonList(
RunSync.DatasetConfig.Table.builder()
.tableBasePath(table.getBasePath())
.tableName(tableName)
.build()))
.build();
File configFile = new File(tempDir.toFile(), "config.yaml");
RunSync.YAML_MAPPER.writeValue(configFile, config);
return configFile;
}

@SneakyThrows
private static void waitForNumIcebergCommits(Path metadataPath, int count) {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(5)) {
if (numIcebergMetadataJsonFiles(metadataPath) == count) {
break;
}
Thread.sleep(5000);
}
}

@SneakyThrows
private static long numIcebergMetadataJsonFiles(Path path) {
  long count = 0;
  if (Files.exists(path)) {
    // close the directory stream to avoid leaking file handles
    try (java.util.stream.Stream<Path> files = Files.list(path)) {
      count = files.filter(p -> p.toString().endsWith("metadata.json")).count();
    }
  }
  return count;
}
}
@@ -88,12 +88,21 @@ public void testTableFormatConverterConfigDefault() throws IOException {
Assertions.assertEquals(
"org.apache.xtable.hudi.HudiConversionSourceProvider",
tfConverters.get(HUDI).getConversionSourceProviderClass());
Assertions.assertEquals(
"org.apache.xtable.hudi.HudiConversionTarget",
tfConverters.get(HUDI).getConversionTargetProviderClass());
Assertions.assertEquals(
"org.apache.xtable.iceberg.IcebergConversionTarget",
tfConverters.get(ICEBERG).getConversionTargetProviderClass());
Assertions.assertEquals(
"org.apache.xtable.iceberg.IcebergConversionSourceProvider",
tfConverters.get(ICEBERG).getConversionSourceProviderClass());
Assertions.assertEquals(
"org.apache.xtable.delta.DeltaConversionTarget",
tfConverters.get(DELTA).getConversionTargetProviderClass());
Assertions.assertEquals(
"org.apache.xtable.delta.DeltaConversionSourceProvider",
tfConverters.get(DELTA).getConversionSourceProviderClass());
}

@Test