From e3d541bafcbd036d7760a0eb2e090b40a0d8e675 Mon Sep 17 00:00:00 2001 From: Timothy Brown Date: Sun, 8 Dec 2024 20:53:45 -0600 Subject: [PATCH] Add continuous sync mode for utilities bundle, add additional testing --- pom.xml | 2 +- xtable-core/pom.xml | 15 ++ .../apache/xtable/ITConversionController.java | 29 ++++ xtable-utilities/pom.xml | 16 +++ .../org/apache/xtable/utilities/RunSync.java | 61 ++++++++- .../resources/xtable-conversion-defaults.yaml | 1 + .../apache/xtable/utilities/ITRunSync.java | 128 ++++++++++++++++++ .../apache/xtable/utilities/TestRunSync.java | 9 ++ 8 files changed, 254 insertions(+), 7 deletions(-) create mode 100644 xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java diff --git a/pom.xml b/pom.xml index 99b4fe1a9..6106fa5d8 100644 --- a/pom.xml +++ b/pom.xml @@ -658,7 +658,7 @@ false 6 false - -Xmx1024m + -Xmx1500m 120 diff --git a/xtable-core/pom.xml b/xtable-core/pom.xml index f277495e7..ecec5ac36 100644 --- a/xtable-core/pom.xml +++ b/xtable-core/pom.xml @@ -174,4 +174,19 @@ test + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java index 3d539766a..3325ca676 100644 --- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java @@ -653,6 +653,35 @@ public void testOutOfSyncIncrementalSyncs() { } } + @Test + public void testIncrementalSyncsWithNoChangesDoesNotThrowError() { + String tableName = getTableName(); + ConversionSourceProvider conversionSourceProvider = getConversionSourceProvider(HUDI); + try (TestJavaHudiTable table = + TestJavaHudiTable.forStandardSchema( + tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) { + ConversionConfig dualTableConfig = + getTableSyncConfig( + HUDI, + SyncMode.INCREMENTAL, + tableName, + table, 
+ Arrays.asList(ICEBERG, DELTA), + null, + null); + + table.insertRecords(50, true); + ConversionController conversionController = + new ConversionController(jsc.hadoopConfiguration()); + // sync once + conversionController.sync(dualTableConfig, conversionSourceProvider); + checkDatasetEquivalence(HUDI, table, Arrays.asList(DELTA, ICEBERG), 50); + // sync again + conversionController.sync(dualTableConfig, conversionSourceProvider); + checkDatasetEquivalence(HUDI, table, Arrays.asList(DELTA, ICEBERG), 50); + } + } + @Test public void testIcebergCorruptedSnapshotRecovery() throws Exception { String tableName = getTableName(); diff --git a/xtable-utilities/pom.xml b/xtable-utilities/pom.xml index 8191af3c0..2181d25ce 100644 --- a/xtable-utilities/pom.xml +++ b/xtable-utilities/pom.xml @@ -125,6 +125,22 @@ junit-jupiter-engine test + + + org.apache.xtable + xtable-core_${scala.binary.version} + ${project.version} + tests + test-jar + test + + + + org.apache.hudi + hudi-spark${spark.version.prefix}-bundle_${scala.binary.version} + test + + diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java index c84753de5..8939c30be 100644 --- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java +++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java @@ -26,9 +26,15 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import lombok.Builder; import lombok.Data; +import lombok.Value; +import lombok.extern.jackson.Jacksonized; import lombok.extern.log4j.Log4j2; import org.apache.commons.cli.CommandLine; @@ -42,7 +48,6 @@ import com.fasterxml.jackson.annotation.JsonMerge; import com.fasterxml.jackson.annotation.JsonProperty; import 
com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectReader; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.google.common.annotations.VisibleForTesting; @@ -69,6 +74,8 @@ public class RunSync { private static final String HADOOP_CONFIG_PATH = "p"; private static final String CONVERTERS_CONFIG_PATH = "c"; private static final String ICEBERG_CATALOG_CONFIG_PATH = "i"; + private static final String CONTINUOUS_MODE = "m"; + private static final String CONTINUOUS_MODE_INTERVAL = "t"; private static final String HELP_OPTION = "h"; private static final Options OPTIONS = @@ -96,6 +103,16 @@ public class RunSync { true, "The path to a yaml file containing Iceberg catalog configuration. The configuration will be " + "used for any Iceberg source or target.") + .addOption( + CONTINUOUS_MODE, + "continuousMode", + false, + "Runs the tool on a scheduled loop. On each iteration, the process will reload the configurations from the provided file path allowing the user to update the tables managed by the job without restarting the job.") + .addOption( + CONTINUOUS_MODE_INTERVAL, + "continuousModeInterval", + true, + "The interval in seconds to schedule the loop. Requires --continuousMode to be set. 
Defaults to 5 seconds.") .addOption(HELP_OPTION, "help", false, "Displays help information to run this utility"); public static void main(String[] args) throws IOException { @@ -115,11 +132,39 @@ public static void main(String[] args) throws IOException { return; } - DatasetConfig datasetConfig = new DatasetConfig(); + if (cmd.hasOption(CONTINUOUS_MODE)) { + ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); + long intervalInSeconds = Long.parseLong(cmd.getOptionValue(CONTINUOUS_MODE_INTERVAL, "5")); + executorService.scheduleAtFixedRate( + () -> { + try { + runSync(cmd); + } catch (Exception ex) { // an escaping exception would cancel later runs + log.error("Sync operation failed", ex); + } + }, + 0, + intervalInSeconds, + TimeUnit.SECONDS); + while (!Thread.interrupted()) { + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + log.debug("Received interrupt signal"); + break; + } + } + executorService.shutdownNow(); + } else { + runSync(cmd); + } + } + + private static void runSync(CommandLine cmd) throws IOException { + DatasetConfig datasetConfig; try (InputStream inputStream = Files.newInputStream(Paths.get(cmd.getOptionValue(DATASET_CONFIG_OPTION)))) { - ObjectReader objectReader = YAML_MAPPER.readerForUpdating(datasetConfig); - objectReader.readValue(inputStream); + datasetConfig = YAML_MAPPER.readValue(inputStream, DatasetConfig.class); } byte[] customConfig = getCustomConfigurations(cmd, HADOOP_CONFIG_PATH); @@ -242,7 +287,9 @@ static IcebergCatalogConfig loadIcebergCatalogConfig(byte[] customConfigs) throw : YAML_MAPPER.readValue(customConfigs, IcebergCatalogConfig.class); } - @Data + @Value + @Builder + @Jacksonized public static class DatasetConfig { /** @@ -258,7 +305,9 @@ public static class DatasetConfig { /** Configuration of the dataset to sync, path, table name, etc. */ List datasets; - @Data + @Value + @Builder + @Jacksonized public static class Table { /** * The base path of the table to sync. 
Any authentication configuration needed by HDFS client diff --git a/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml b/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml index c80c939bf..e9217a338 100644 --- a/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml +++ b/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml @@ -29,6 +29,7 @@ tableFormatConverters: HUDI: conversionSourceProviderClass: org.apache.xtable.hudi.HudiConversionSourceProvider + conversionTargetProviderClass: org.apache.xtable.hudi.HudiConversionTarget DELTA: conversionSourceProviderClass: org.apache.xtable.delta.DeltaConversionSourceProvider conversionTargetProviderClass: org.apache.xtable.delta.DeltaConversionTarget diff --git a/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java new file mode 100644 index 000000000..2294e16aa --- /dev/null +++ b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.utilities; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import lombok.SneakyThrows; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import org.apache.hudi.common.model.HoodieTableType; + +import org.apache.xtable.GenericTable; +import org.apache.xtable.TestJavaHudiTable; + +class ITRunSync { + + @Test + void testSingleSyncMode(@TempDir Path tempDir) throws IOException { + String tableName = "test-table"; + try (GenericTable table = + TestJavaHudiTable.forStandardSchema( + tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) { + table.insertRows(20); + File configFile = writeConfigFile(tempDir, table, tableName); + String[] args = new String[] {"--datasetConfig", configFile.getPath()}; + RunSync.main(args); + Path icebergMetadataPath = Paths.get(URI.create(table.getBasePath() + "/metadata")); + assertEquals(2, numIcebergMetadataJsonFiles(icebergMetadataPath)); // sync already finished + } + } + + @Test + void testContinuousSyncMode(@TempDir Path tempDir) throws IOException { + ExecutorService runner = Executors.newSingleThreadExecutor(); + String tableName = "test-table"; + try (GenericTable table = + TestJavaHudiTable.forStandardSchema( + tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) { + table.insertRows(20); + File configFile = writeConfigFile(tempDir, table, tableName); + String[] args = new String[] {"--datasetConfig", configFile.getPath(), "--continuousMode"}; + runner.submit( + () -> { + try { + RunSync.main(args); + } catch (IOException ex) { + throw new UncheckedIOException(ex); + } + }); + Path icebergMetadataPath = 
Paths.get(URI.create(table.getBasePath() + "/metadata")); + waitForNumIcebergCommits(icebergMetadataPath, 2); + // write more data now that table is initialized and data is synced + table.insertRows(20); + waitForNumIcebergCommits(icebergMetadataPath, 3); + assertEquals(3, numIcebergMetadataJsonFiles(icebergMetadataPath)); + } finally { + runner.shutdownNow(); + } + } + + private static File writeConfigFile(Path tempDir, GenericTable table, String tableName) + throws IOException { + RunSync.DatasetConfig config = + RunSync.DatasetConfig.builder() + .sourceFormat("HUDI") + .targetFormats(Collections.singletonList("ICEBERG")) + .datasets( + Collections.singletonList( + RunSync.DatasetConfig.Table.builder() + .tableBasePath(table.getBasePath()) + .tableName(tableName) + .build())) + .build(); + File configFile = tempDir.resolve("config.yaml").toFile(); // keep inside tempDir + RunSync.YAML_MAPPER.writeValue(configFile, config); + return configFile; + } + + @SneakyThrows + private static void waitForNumIcebergCommits(Path metadataPath, int count) { + long start = System.currentTimeMillis(); + while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(5)) { + if (numIcebergMetadataJsonFiles(metadataPath) == count) { + break; + } + Thread.sleep(5000); + } + } + + @SneakyThrows + private static long numIcebergMetadataJsonFiles(Path path) { + long count = 0; + if (Files.exists(path)) { + count = Files.list(path).filter(p -> p.toString().endsWith("metadata.json")).count(); + } + return count; + } +} diff --git a/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunSync.java b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunSync.java index a61d948e0..20a190e88 100644 --- a/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunSync.java +++ b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunSync.java @@ -88,12 +88,21 @@ public void testTableFormatConverterConfigDefault() throws IOException { Assertions.assertEquals( 
"org.apache.xtable.hudi.HudiConversionSourceProvider", tfConverters.get(HUDI).getConversionSourceProviderClass()); + Assertions.assertEquals( + "org.apache.xtable.hudi.HudiConversionTarget", + tfConverters.get(HUDI).getConversionTargetProviderClass()); Assertions.assertEquals( "org.apache.xtable.iceberg.IcebergConversionTarget", tfConverters.get(ICEBERG).getConversionTargetProviderClass()); Assertions.assertEquals( "org.apache.xtable.iceberg.IcebergConversionSourceProvider", tfConverters.get(ICEBERG).getConversionSourceProviderClass()); + Assertions.assertEquals( + "org.apache.xtable.delta.DeltaConversionTarget", + tfConverters.get(DELTA).getConversionTargetProviderClass()); + Assertions.assertEquals( + "org.apache.xtable.delta.DeltaConversionSourceProvider", + tfConverters.get(DELTA).getConversionSourceProviderClass()); } @Test