diff --git a/pom.xml b/pom.xml index 7a5973428..fa94f6f15 100644 --- a/pom.xml +++ b/pom.xml @@ -61,8 +61,9 @@ 5.9.0 1.18.30 1.18.20.0 - 3.4.0 + 3.3.6 0.14.0 + 2.28.22 3.3.1 3.8.0 3.2.4 @@ -372,9 +373,9 @@ runtime - com.amazonaws - aws-java-sdk-bundle - 1.12.328 + software.amazon.awssdk + bundle + ${aws.version} runtime diff --git a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java b/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java index af85c9007..f02adc308 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java @@ -31,7 +31,8 @@ public enum ErrorCode { UNSUPPORTED_SCHEMA_TYPE(10007), UNSUPPORTED_FEATURE(10008), PARSE_EXCEPTION(10009), - CATALOG_REFRESH_EXCEPTION(10010); + CATALOG_REFRESH_EXCEPTION(10010), + CATALOG_SYNC_GENERIC_EXCEPTION(10011); private final int errorCode; diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/CatalogType.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/CatalogType.java new file mode 100644 index 000000000..e5d6a862b --- /dev/null +++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/CatalogType.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.model.storage; + +public class CatalogType { + public static final String GLUE = "GLUE"; +} diff --git a/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java index e76d59068..e52c203bb 100644 --- a/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java +++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java @@ -83,6 +83,11 @@ private CatalogSyncStatus syncCatalog( CatalogSyncClient
catalogSyncClient, CatalogTableIdentifier tableIdentifier, InternalTable table) { + log.info( + "Running catalog sync for table {} with format {} using catalogSync {}", + table.getBasePath(), + table.getTableFormat(), + catalogSyncClient.getClass().getName()); if (!catalogSyncClient.hasDatabase(tableIdentifier)) { catalogSyncClient.createDatabase(tableIdentifier); } diff --git a/xtable-api/src/test/java/org/apache/xtable/model/catalog/TestHierarchicalTableIdentifier.java b/xtable-api/src/test/java/org/apache/xtable/model/catalog/TestHierarchicalTableIdentifier.java new file mode 100644 index 000000000..f773c2797 --- /dev/null +++ b/xtable-api/src/test/java/org/apache/xtable/model/catalog/TestHierarchicalTableIdentifier.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.model.catalog; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class TestHierarchicalTableIdentifier { + + @Test + void testToString() { + HierarchicalTableIdentifier catalogTableIdentifier = + ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier( + "catalogName.databaseName.tableName"); + assertEquals("catalogName.databaseName.tableName", catalogTableIdentifier.getId()); + } + + @Test + void testConstructorForHierarchicalTableIdentifier() { + Assertions.assertDoesNotThrow( + () -> + ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier( + "catalogName.databaseName.tableName")); + Assertions.assertDoesNotThrow( + () -> + ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier( + "databaseName.tableName")); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier("tableName")); + } +} diff --git a/xtable-core/pom.xml b/xtable-core/pom.xml index 80de22991..25a4002ec 100644 --- a/xtable-core/pom.xml +++ b/xtable-core/pom.xml @@ -138,12 +138,23 @@ provided + + + software.amazon.awssdk + glue + ${aws.version} + + org.mockito mockito-core test + + org.mockito + mockito-junit-jupiter + diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java index 37778eb8e..7b4a0ba32 100644 --- a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java @@ -57,7 +57,7 @@ public CatalogSyncClient createCatalogSyncClient( return ReflectionUtils.createInstanceOfClass( targetCatalogConfig.getCatalogSyncClientImpl(), targetCatalogConfig, - tableFormat, - configuration); + configuration, + tableFormat); } } diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogTableBuilder.java b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogTableBuilder.java new file mode 100644 index 000000000..6216d111c --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogTableBuilder.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog; + +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; + +/** + * The interface for creating/updating catalog table object, each catalog can have its own + * implementation that can be plugged in. + */ +public interface CatalogTableBuilder { + public REQUEST getCreateTableRequest(InternalTable table, CatalogTableIdentifier tableIdentifier); + + public REQUEST getUpdateTableRequest( + InternalTable table, TABLE catalogTable, CatalogTableIdentifier tableIdentifier); +} diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java new file mode 100644 index 000000000..7692824e0 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog; + +import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.model.catalog.HierarchicalTableIdentifier; + +public class CatalogUtils { + + public static HierarchicalTableIdentifier castToHierarchicalTableIdentifier( + CatalogTableIdentifier tableIdentifier) { + if (tableIdentifier instanceof HierarchicalTableIdentifier) { + return (HierarchicalTableIdentifier) tableIdentifier; + } + throw new IllegalArgumentException("Invalid tableIdentifier implementation"); + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/Constants.java b/xtable-core/src/main/java/org/apache/xtable/catalog/Constants.java new file mode 100644 index 000000000..fec8607fd --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/Constants.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog; + +public class Constants { + + public static final String PROP_SPARK_SQL_SOURCES_PROVIDER = "spark.sql.sources.provider"; + public static final String PROP_PATH = "path"; + public static final String PROP_SERIALIZATION_FORMAT = "serialization.format"; + public static final String PROP_EXTERNAL = "EXTERNAL"; +} diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfigFactory.java b/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfigFactory.java index 3649ae8e0..2a8047167 100644 --- a/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfigFactory.java +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfigFactory.java @@ -20,16 +20,27 @@ import java.util.Map; +import org.apache.xtable.catalog.glue.GlueCatalogConversionSource; +import org.apache.xtable.catalog.glue.GlueCatalogSyncClient; import org.apache.xtable.conversion.ExternalCatalogConfig; +import org.apache.xtable.exception.NotSupportedException; +import org.apache.xtable.model.storage.CatalogType; /** A factory class which returns {@link ExternalCatalogConfig} based on catalogType. */ public class ExternalCatalogConfigFactory { public static ExternalCatalogConfig fromCatalogType( String catalogType, String catalogId, Map properties) { - // TODO: Choose existing implementation based on catalogType. - String catalogSyncClientImpl = ""; - String catalogConversionSourceImpl = ""; + String catalogSyncClientImpl; + String catalogConversionSourceImpl; + switch (catalogType) { + case CatalogType.GLUE: + catalogSyncClientImpl = GlueCatalogSyncClient.class.getName(); + catalogConversionSourceImpl = GlueCatalogConversionSource.class.getName(); + break; + default: + throw new NotSupportedException("Unsupported catalogType: " + catalogType); + } return ExternalCatalogConfig.builder() .catalogType(catalogType) .catalogSyncClientImpl(catalogSyncClientImpl) diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/TableFormatUtils.java b/xtable-core/src/main/java/org/apache/xtable/catalog/TableFormatUtils.java new file mode 100644 index 000000000..78825ae42 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/TableFormatUtils.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog; + +import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; +import static org.apache.xtable.catalog.Constants.PROP_SPARK_SQL_SOURCES_PROVIDER; + +import java.util.Map; + +import org.apache.iceberg.TableProperties; + +import com.google.common.base.Strings; + +import org.apache.xtable.exception.NotSupportedException; +import org.apache.xtable.model.storage.TableFormat; + +public class TableFormatUtils { + + public static String getTableDataLocation( + String tableFormat, String tableLocation, Map properties) { + switch (tableFormat) { + case TableFormat.ICEBERG: + return getIcebergDataLocation(tableLocation, properties); + case TableFormat.DELTA: + case TableFormat.HUDI: + return tableLocation; + default: + throw new NotSupportedException("Unsupported table format: " + tableFormat); + } + } + + /** Get iceberg table data files location */ + private static String getIcebergDataLocation( + String tableLocation, Map properties) { + String dataLocation = properties.get(TableProperties.WRITE_DATA_LOCATION); + if (dataLocation == null) { + dataLocation = properties.get(TableProperties.WRITE_FOLDER_STORAGE_LOCATION); + if (dataLocation == null) { + dataLocation = properties.get(TableProperties.OBJECT_STORE_PATH); + if (dataLocation == null) { + dataLocation = String.format("%s/data", tableLocation); + } + } + } + return dataLocation; + } + + // Get table format name from table properties + public static String getTableFormat(Map properties) { + // - In case of ICEBERG, table_type param will give the table format + // - In case of DELTA, table_type or spark.sql.sources.provider param will give the table + // format + // - In case of HUDI, spark.sql.sources.provider param will give the table format + String tableFormat = properties.get(TABLE_TYPE_PROP); + if (Strings.isNullOrEmpty(tableFormat)) { + tableFormat = properties.get(PROP_SPARK_SQL_SOURCES_PROVIDER); + } + return tableFormat; + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/glue/DefaultGlueClientFactory.java b/xtable-core/src/main/java/org/apache/xtable/catalog/glue/DefaultGlueClientFactory.java new file mode 100644 index 000000000..e36bbe356 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/glue/DefaultGlueClientFactory.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog.glue; + +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; + +import org.apache.xtable.reflection.ReflectionUtils; + +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.GlueClientBuilder; + +/** + * Factory class for creating and configuring instances of {@link GlueClient} with settings provided + * by {@link GlueCatalogConfig}. + * + *

This factory is responsible for setting the AWS region and credentials for the Glue client. If + * a custom credentials provider class is specified in {@code GlueCatalogConfig}, it will use + * reflection to instantiate the provider; otherwise, it defaults to the standard AWS credentials + * provider. + */ +public class DefaultGlueClientFactory extends GlueClientFactory { + + public DefaultGlueClientFactory(GlueCatalogConfig glueConfig) { + super(glueConfig); + } + + public GlueClient getGlueClient() { + GlueClientBuilder builder = GlueClient.builder(); + if (!StringUtils.isEmpty(glueConfig.getRegion())) { + builder.region(Region.of(glueConfig.getRegion())); + } + + AwsCredentialsProvider credentialsProvider; + if (!StringUtils.isEmpty(glueConfig.getClientCredentialsProviderClass())) { + String className = glueConfig.getClientCredentialsProviderClass(); + try { + credentialsProvider = + ReflectionUtils.createInstanceOfClassFromStaticMethod( + className, + "create", + new Class[] {Map.class}, + new Object[] {glueConfig.getClientCredentialConfigs()}); + } catch (Exception e) { + credentialsProvider = + ReflectionUtils.createInstanceOfClassFromStaticMethod(className, "create"); + } + } else { + credentialsProvider = DefaultCredentialsProvider.create(); + } + + builder.credentialsProvider(credentialsProvider); + return builder.build(); + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/glue/GlueCatalogConfig.java b/xtable-core/src/main/java/org/apache/xtable/catalog/glue/GlueCatalogConfig.java new file mode 100644 index 000000000..2c64831bc --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/glue/GlueCatalogConfig.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog.glue; + +import java.util.Collections; +import java.util.Map; +import java.util.stream.Collectors; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; + +/** Configurations for setting up Glue client and running Glue catalog operations */ +@Getter +@EqualsAndHashCode +@ToString +public class GlueCatalogConfig { + + private static final ObjectMapper OBJECT_MAPPER = + new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + public static final String CLIENT_CREDENTIAL_PROVIDER_PREFIX = + "externalCatalog.glue.credentials.provider."; + + @JsonProperty("externalCatalog.glue.catalogId") + private String catalogId; + + @JsonProperty("externalCatalog.glue.region") + private String region; + + @JsonProperty("externalCatalog.glue.credentialsProviderClass") + private String clientCredentialsProviderClass; + + /** + * In case a credentialsProviderClass is configured and require additional properties for + * instantiation, those properties should start with {@link #CLIENT_CREDENTIAL_PROVIDER_PREFIX}. + * + *

For ex: if credentialsProviderClass requires `accessKey` and `secretAccessKey`, they should + * be configured using below keys: + *

  • externalCatalog.glue.credentials.provider.accessKey + *
  • externalCatalog.glue.credentials.provider.secretAccessKey + */ + @Setter private Map clientCredentialConfigs; + + /** Creates GlueCatalogConfig from given key-value map */ + public static GlueCatalogConfig of(Map properties) { + try { + GlueCatalogConfig glueCatalogConfig = + OBJECT_MAPPER.readValue( + OBJECT_MAPPER.writeValueAsString(properties), GlueCatalogConfig.class); + Map clientCredentialProperties = + propertiesWithPrefix(properties, CLIENT_CREDENTIAL_PROVIDER_PREFIX); + glueCatalogConfig.setClientCredentialConfigs(clientCredentialProperties); + return glueCatalogConfig; + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + private static Map propertiesWithPrefix( + Map properties, String prefix) { + if (properties == null || properties.isEmpty()) { + return Collections.emptyMap(); + } + + return properties.entrySet().stream() + .filter(e -> e.getKey().startsWith(prefix)) + .collect(Collectors.toMap(e -> e.getKey().replaceFirst(prefix, ""), Map.Entry::getValue)); + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/glue/GlueCatalogConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/catalog/glue/GlueCatalogConversionSource.java new file mode 100644 index 000000000..67f5b3176 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/glue/GlueCatalogConversionSource.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog.glue; + +import static org.apache.xtable.catalog.CatalogUtils.castToHierarchicalTableIdentifier; + +import java.util.Locale; +import java.util.Properties; + +import javax.security.auth.login.Configuration; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; + +import org.apache.xtable.catalog.TableFormatUtils; +import org.apache.xtable.conversion.ExternalCatalogConfig; +import org.apache.xtable.conversion.SourceTable; +import org.apache.xtable.exception.CatalogSyncException; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.model.catalog.HierarchicalTableIdentifier; +import org.apache.xtable.spi.extractor.CatalogConversionSource; + +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.GetTableRequest; +import software.amazon.awssdk.services.glue.model.GetTableResponse; +import software.amazon.awssdk.services.glue.model.GlueException; +import software.amazon.awssdk.services.glue.model.Table; + +public class GlueCatalogConversionSource implements CatalogConversionSource { + private final GlueClient glueClient; + private final GlueCatalogConfig glueCatalogConfig; + + public GlueCatalogConversionSource( + ExternalCatalogConfig catalogConfig, Configuration configuration) { + this.glueCatalogConfig = GlueCatalogConfig.of(catalogConfig.getCatalogProperties()); + this.glueClient = new DefaultGlueClientFactory(glueCatalogConfig).getGlueClient(); + } + + @VisibleForTesting + public GlueCatalogConversionSource(GlueCatalogConfig glueCatalogConfig, GlueClient glueClient) { + this.glueCatalogConfig = glueCatalogConfig; + this.glueClient = glueClient; + } + + @Override + public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) { + HierarchicalTableIdentifier tblIdentifier = castToHierarchicalTableIdentifier(tableIdentifier); + try { + GetTableResponse response = + glueClient.getTable( + GetTableRequest.builder() + .catalogId(glueCatalogConfig.getCatalogId()) + .databaseName(tblIdentifier.getDatabaseName()) + .name(tblIdentifier.getTableName()) + .build()); + Table table = response.table(); + if (table == null) { + throw new IllegalStateException(String.format("table: %s is null", tableIdentifier)); + } + + String tableFormat = TableFormatUtils.getTableFormat(table.parameters()); + if (Strings.isNullOrEmpty(tableFormat)) { + throw new IllegalStateException( + String.format("TableFormat is null or empty for table: %s", tableIdentifier.getId())); + } + tableFormat = tableFormat.toUpperCase(Locale.ENGLISH); + + String tableLocation = table.storageDescriptor().location(); + String dataPath = + TableFormatUtils.getTableDataLocation(tableFormat, tableLocation, table.parameters()); + + Properties tableProperties = new Properties(); + tableProperties.putAll(table.parameters()); + return SourceTable.builder() + .name(table.name()) + .basePath(tableLocation) + .dataPath(dataPath) + .formatName(tableFormat) + .additionalProperties(tableProperties) + .build(); + } catch (GlueException e) { + throw new CatalogSyncException("Failed to get table: " + tableIdentifier, e); + } + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/glue/GlueCatalogSyncClient.java b/xtable-core/src/main/java/org/apache/xtable/catalog/glue/GlueCatalogSyncClient.java new file mode 100644 index 000000000..1e5654921 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/glue/GlueCatalogSyncClient.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog.glue; + +import static org.apache.xtable.catalog.CatalogUtils.castToHierarchicalTableIdentifier; + +import java.time.ZonedDateTime; + +import lombok.extern.log4j.Log4j2; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.xtable.catalog.CatalogTableBuilder; +import org.apache.xtable.conversion.ExternalCatalogConfig; +import org.apache.xtable.exception.CatalogSyncException; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.model.catalog.HierarchicalTableIdentifier; +import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier; +import org.apache.xtable.spi.sync.CatalogSyncClient; + +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest; +import software.amazon.awssdk.services.glue.model.CreateTableRequest; +import software.amazon.awssdk.services.glue.model.DatabaseInput; +import software.amazon.awssdk.services.glue.model.DeleteTableRequest; +import software.amazon.awssdk.services.glue.model.EntityNotFoundException; +import software.amazon.awssdk.services.glue.model.GetDatabaseRequest; +import software.amazon.awssdk.services.glue.model.GetTableRequest; +import software.amazon.awssdk.services.glue.model.GetTableResponse; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.TableInput; +import software.amazon.awssdk.services.glue.model.UpdateTableRequest; + +/** AWS Glue implementation for CatalogSyncClient for registering InternalTable in Glue */ +@Log4j2 +public class GlueCatalogSyncClient implements CatalogSyncClient
  • { + + public static final String GLUE_EXTERNAL_TABLE_TYPE = "EXTERNAL_TABLE"; + private static final String TEMP_SUFFIX = "_temp"; + + private final ExternalCatalogConfig catalogConfig; + private final GlueClient glueClient; + private final GlueCatalogConfig glueCatalogConfig; + private final Configuration configuration; + private final CatalogTableBuilder tableBuilder; + + public GlueCatalogSyncClient( + ExternalCatalogConfig catalogConfig, Configuration configuration, String tableFormat) { + this.catalogConfig = catalogConfig; + this.glueCatalogConfig = GlueCatalogConfig.of(catalogConfig.getCatalogProperties()); + this.glueClient = new DefaultGlueClientFactory(glueCatalogConfig).getGlueClient(); + this.configuration = new Configuration(configuration); + this.tableBuilder = GlueCatalogTableBuilderFactory.getInstance(tableFormat, this.configuration); + } + + @VisibleForTesting + GlueCatalogSyncClient( + ExternalCatalogConfig catalogConfig, + Configuration configuration, + GlueCatalogConfig glueCatalogConfig, + GlueClient glueClient, + CatalogTableBuilder tableBuilder) { + this.catalogConfig = catalogConfig; + this.configuration = new Configuration(configuration); + this.glueCatalogConfig = glueCatalogConfig; + this.glueClient = glueClient; + this.tableBuilder = tableBuilder; + } + + @Override + public String getCatalogId() { + return catalogConfig.getCatalogId(); + } + + @Override + public String getStorageLocation(Table table) { + if (table == null || table.storageDescriptor() == null) { + return null; + } + return table.storageDescriptor().location(); + } + + @Override + public boolean hasDatabase(CatalogTableIdentifier tableIdentifier) { + String databaseName = castToHierarchicalTableIdentifier(tableIdentifier).getDatabaseName(); + try { + return glueClient + .getDatabase( + GetDatabaseRequest.builder() + .catalogId(glueCatalogConfig.getCatalogId()) + .name(databaseName) + .build()) + .database() + != null; + } catch (EntityNotFoundException e) { + return false; + } catch (Exception e) { + throw new CatalogSyncException("Failed to get database: " + databaseName, e); + } + } + + @Override + public void createDatabase(CatalogTableIdentifier tableIdentifier) { + String databaseName = castToHierarchicalTableIdentifier(tableIdentifier).getDatabaseName(); + try { + glueClient.createDatabase( + CreateDatabaseRequest.builder() + .catalogId(glueCatalogConfig.getCatalogId()) + .databaseInput( + DatabaseInput.builder() + .name(databaseName) + .description("Created by " + this.getClass().getName()) + .build()) + .build()); + } catch (Exception e) { + throw new CatalogSyncException("Failed to create database: " + databaseName, e); + } + } + + @Override + public Table getTable(CatalogTableIdentifier tableIdentifier) { + HierarchicalTableIdentifier tblIdentifier = castToHierarchicalTableIdentifier(tableIdentifier); + try { + GetTableResponse response = + glueClient.getTable( + GetTableRequest.builder() + .catalogId(glueCatalogConfig.getCatalogId()) + .databaseName(tblIdentifier.getDatabaseName()) + .name(tblIdentifier.getTableName()) + .build()); + return response.table(); + } catch (EntityNotFoundException e) { + return null; + } catch (Exception e) { + throw new CatalogSyncException("Failed to get table: " + tblIdentifier.getId(), e); + } + } + + @Override + public void createTable(InternalTable table, CatalogTableIdentifier tableIdentifier) { + HierarchicalTableIdentifier tblIdentifier = castToHierarchicalTableIdentifier(tableIdentifier); + TableInput tableInput = tableBuilder.getCreateTableRequest(table, tableIdentifier); + try { + glueClient.createTable( + CreateTableRequest.builder() + .catalogId(glueCatalogConfig.getCatalogId()) + .databaseName(tblIdentifier.getDatabaseName()) + .tableInput(tableInput) + .build()); + } catch (Exception e) { + throw new CatalogSyncException("Failed to create table: " + tblIdentifier.getId(), e); + } + } + + @Override + public void refreshTable( + InternalTable table, Table catalogTable, CatalogTableIdentifier tableIdentifier) { + HierarchicalTableIdentifier tblIdentifier = castToHierarchicalTableIdentifier(tableIdentifier); + TableInput tableInput = + tableBuilder.getUpdateTableRequest(table, catalogTable, tableIdentifier); + try { + glueClient.updateTable( + UpdateTableRequest.builder() + .catalogId(glueCatalogConfig.getCatalogId()) + .databaseName(tblIdentifier.getDatabaseName()) + .skipArchive(true) + .tableInput(tableInput) + .build()); + } catch (Exception e) { + throw new CatalogSyncException("Failed to refresh table: " + tblIdentifier.getId(), e); + } + } + + @Override + public void createOrReplaceTable(InternalTable table, CatalogTableIdentifier tableIdentifier) { + // validate before dropping the table + validateTempTableCreation(table, tableIdentifier); + dropTable(table, tableIdentifier); + createTable(table, tableIdentifier); + } + + @Override + public void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifier) { + HierarchicalTableIdentifier tblIdentifier = castToHierarchicalTableIdentifier(tableIdentifier); + try { + glueClient.deleteTable( + DeleteTableRequest.builder() + .catalogId(glueCatalogConfig.getCatalogId()) + .databaseName(tblIdentifier.getDatabaseName()) + .name(tblIdentifier.getTableName()) + .build()); + } catch (Exception e) { + throw new CatalogSyncException("Failed to drop table: " + tableIdentifier.getId(), e); + } + } + + @Override + public void close() throws Exception { + if (glueClient != null) { + glueClient.close(); + } + } + + /** + * creates a temp table with new metadata and properties to ensure table creation succeeds before + * dropping the table and recreating it. This ensures that actual table is not dropped in case + * there are any issues + */ + private void validateTempTableCreation( + InternalTable table, CatalogTableIdentifier tableIdentifier) { + HierarchicalTableIdentifier tblIdentifier = castToHierarchicalTableIdentifier(tableIdentifier); + String tempTableName = + tblIdentifier.getTableName() + TEMP_SUFFIX + ZonedDateTime.now().toEpochSecond(); + ThreePartHierarchicalTableIdentifier tempTableIdentifier = + new ThreePartHierarchicalTableIdentifier(tblIdentifier.getDatabaseName(), tempTableName); + createTable(table, tempTableIdentifier); + dropTable(table, tempTableIdentifier); + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/glue/GlueCatalogTableBuilderFactory.java b/xtable-core/src/main/java/org/apache/xtable/catalog/glue/GlueCatalogTableBuilderFactory.java new file mode 100644 index 000000000..d9c6d7f6d --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/glue/GlueCatalogTableBuilderFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog.glue; + +import org.apache.hadoop.conf.Configuration; + +import org.apache.xtable.catalog.CatalogTableBuilder; +import org.apache.xtable.catalog.glue.table.IcebergGlueCatalogTableBuilder; +import org.apache.xtable.exception.NotSupportedException; +import org.apache.xtable.model.storage.TableFormat; + +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.TableInput; + +class GlueCatalogTableBuilderFactory { + + static CatalogTableBuilder getInstance( + String tableFormat, Configuration configuration) { + switch (tableFormat) { + case TableFormat.ICEBERG: + return new IcebergGlueCatalogTableBuilder(configuration); + default: + throw new NotSupportedException("Unsupported table format: " + tableFormat); + } + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/glue/GlueClientFactory.java b/xtable-core/src/main/java/org/apache/xtable/catalog/glue/GlueClientFactory.java new file mode 100644 index 000000000..c29382787 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/glue/GlueClientFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog.glue; + +import software.amazon.awssdk.services.glue.GlueClient; + +/** + * Abstract factory for creating {@link GlueClient} instances configured with {@link + * GlueCatalogConfig} settings. + */ +public abstract class GlueClientFactory { + + protected final GlueCatalogConfig glueConfig; + + public GlueClientFactory(GlueCatalogConfig glueConfig) { + this.glueConfig = glueConfig; + } + + public abstract GlueClient getGlueClient(); +} diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/glue/GlueSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/catalog/glue/GlueSchemaExtractor.java new file mode 100644 index 000000000..60ca159bf --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/glue/GlueSchemaExtractor.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog.glue; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import org.apache.commons.lang3.StringUtils; + +import org.apache.hudi.common.util.VisibleForTesting; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import org.apache.xtable.exception.NotSupportedException; +import org.apache.xtable.exception.SchemaExtractorException; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; + +import software.amazon.awssdk.services.glue.model.Column; +import software.amazon.awssdk.services.glue.model.Table; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class GlueSchemaExtractor { + private static final GlueSchemaExtractor INSTANCE = new GlueSchemaExtractor(); + private static final String FIELD_ID = "field.id"; + private static final String FIELD_OPTIONAL = "field.optional"; + private static final String FIELD_CURRENT = "field.current"; + + public static GlueSchemaExtractor getInstance() { + return INSTANCE; + } + + /** + * Extract column list from OneTable schema + * + * @param tableFormat tableFormat to handle format specific type conversion + * @param tableSchema OneTable schema + * @return glue table column list + */ + public List toColumns(String tableFormat, InternalSchema tableSchema) { + return toColumns(tableFormat, tableSchema, null); + } + + public List toColumns( + String tableFormat, InternalSchema tableSchema, Table existingTable) { + List columns = Lists.newArrayList(); + Set addedNames = Sets.newHashSet(); + for (InternalField field : tableSchema.getFields()) { + if (!addedNames.contains(field.getName())) { + int fieldId = field.getFieldId() != null ? field.getFieldId() : -1; + Column.Builder builder = + Column.builder() + .name(field.getName()) + .type(toTypeString(field.getSchema(), tableFormat)) + .parameters( + ImmutableMap.of( + getColumnProperty(tableFormat, FIELD_ID), + Integer.toString(fieldId), + getColumnProperty(tableFormat, FIELD_OPTIONAL), + Boolean.toString(field.getSchema().isNullable()), + getColumnProperty(tableFormat, FIELD_CURRENT), + "true")); + + if (!StringUtils.isEmpty(field.getSchema().getComment())) { + builder.comment(field.getSchema().getComment()); + } + columns.add(builder.build()); + addedNames.add(field.getName()); + } + } + + // if there are columns in existing glueTable that are not part of tableSchema, + // include them by setting "field.current" property to false + List existingColumns = + existingTable != null && existingTable.storageDescriptor() != null + ? existingTable.storageDescriptor().columns() + : Collections.emptyList(); + for (Column column : existingColumns) { + if (!addedNames.contains(column.name())) { + Map columnParams = new HashMap<>(); + if (column.hasParameters()) { + columnParams.putAll(column.parameters()); + } + columnParams.put(getColumnProperty(tableFormat, FIELD_CURRENT), "false"); + column = column.toBuilder().parameters(columnParams).build(); + columns.add(column); + addedNames.add(column.name()); + } + } + return columns; + } + + /** + * Get glue compatible column type from Onetable field schema + * + * @param tableFormat tableFormat to handle format specific type conversion + * @param fieldSchema OneTable field schema + * @return glue column type + */ + protected String toTypeString(InternalSchema fieldSchema, String tableFormat) { + switch (fieldSchema.getDataType()) { + case BOOLEAN: + return "boolean"; + case INT: + return "int"; + case LONG: + return "bigint"; + case FLOAT: + return "float"; + case DOUBLE: + return "double"; + case DATE: + return "date"; + case ENUM: + case STRING: + return "string"; + case TIMESTAMP: + case TIMESTAMP_NTZ: + return "timestamp"; + case FIXED: + case BYTES: + return "binary"; + case DECIMAL: + Map metadata = fieldSchema.getMetadata(); + if (metadata == null || metadata.isEmpty()) { + throw new NotSupportedException("Invalid decimal type, precision and scale is missing"); + } + int precision = + (int) + metadata.computeIfAbsent( + InternalSchema.MetadataKey.DECIMAL_PRECISION, + k -> { + throw new NotSupportedException("Invalid decimal type, precision is missing"); + }); + int scale = + (int) + metadata.computeIfAbsent( + InternalSchema.MetadataKey.DECIMAL_SCALE, + k -> { + throw new NotSupportedException("Invalid decimal type, scale is missing"); + }); + return String.format("decimal(%s,%s)", precision, scale); + case RECORD: + final String nameToType = + fieldSchema.getFields().stream() + .map( + f -> + String.format( + "%s:%s", f.getName(), toTypeString(f.getSchema(), tableFormat))) + .collect(Collectors.joining(",")); + return String.format("struct<%s>", nameToType); + case LIST: + InternalField arrayElement = + fieldSchema.getFields().stream() + .filter( + arrayField -> + InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME.equals( + arrayField.getName())) + .findFirst() + .orElseThrow(() -> new SchemaExtractorException("Invalid array schema")); + return String.format("array<%s>", toTypeString(arrayElement.getSchema(), tableFormat)); + case MAP: + InternalField key = + fieldSchema.getFields().stream() + .filter( + mapField -> + InternalField.Constants.MAP_KEY_FIELD_NAME.equals(mapField.getName())) + .findFirst() + .orElseThrow(() -> new SchemaExtractorException("Invalid map schema")); + InternalField value = + fieldSchema.getFields().stream() + .filter( + mapField -> + InternalField.Constants.MAP_VALUE_FIELD_NAME.equals(mapField.getName())) + .findFirst() + .orElseThrow(() -> new SchemaExtractorException("Invalid map schema")); + return String.format( + "map<%s,%s>", + toTypeString(key.getSchema(), tableFormat), + toTypeString(value.getSchema(), tableFormat)); + default: + throw new NotSupportedException("Unsupported type: " + fieldSchema.getDataType()); + } + } + + @VisibleForTesting + protected static String getColumnProperty(String tableFormat, String property) { + return String.format("%s.%s", tableFormat.toLowerCase(Locale.ENGLISH), property); + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/glue/table/IcebergGlueCatalogTableBuilder.java b/xtable-core/src/main/java/org/apache/xtable/catalog/glue/table/IcebergGlueCatalogTableBuilder.java new file mode 100644 index 000000000..81b9449ae --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/glue/table/IcebergGlueCatalogTableBuilder.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog.glue.table; + +import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP; +import static org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP; +import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; +import static org.apache.xtable.catalog.CatalogUtils.castToHierarchicalTableIdentifier; +import static org.apache.xtable.catalog.glue.GlueCatalogSyncClient.GLUE_EXTERNAL_TABLE_TYPE; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; + +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.hadoop.HadoopTables; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.xtable.catalog.CatalogTableBuilder; +import org.apache.xtable.catalog.glue.GlueSchemaExtractor; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.model.catalog.HierarchicalTableIdentifier; +import org.apache.xtable.model.storage.TableFormat; + +import software.amazon.awssdk.services.glue.model.StorageDescriptor; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.TableInput; + +/** Iceberg specific table operations for Glue catalog sync */ +public class IcebergGlueCatalogTableBuilder implements CatalogTableBuilder { + + private final GlueSchemaExtractor schemaExtractor; + private final HadoopTables hadoopTables; + private static final String tableFormat = TableFormat.ICEBERG; + + public IcebergGlueCatalogTableBuilder(Configuration configuration) { + this.schemaExtractor = GlueSchemaExtractor.getInstance(); + this.hadoopTables = new HadoopTables(configuration); + } + + @VisibleForTesting + IcebergGlueCatalogTableBuilder(GlueSchemaExtractor schemaExtractor, HadoopTables hadoopTables) { + this.schemaExtractor = schemaExtractor; + this.hadoopTables = hadoopTables; + } + + @Override + public TableInput getCreateTableRequest( + InternalTable table, CatalogTableIdentifier tableIdentifier) { + HierarchicalTableIdentifier tblIdentifier = castToHierarchicalTableIdentifier(tableIdentifier); + BaseTable fsTable = loadTableFromFs(table.getBasePath()); + return TableInput.builder() + .name(tblIdentifier.getTableName()) + .tableType(GLUE_EXTERNAL_TABLE_TYPE) + .parameters(getTableParameters(fsTable)) + .storageDescriptor( + StorageDescriptor.builder() + .location(table.getBasePath()) + .columns(schemaExtractor.toColumns(tableFormat, table.getReadSchema())) + .build()) + .build(); + } + + @Override + public TableInput getUpdateTableRequest( + InternalTable table, Table catalogTable, CatalogTableIdentifier tableIdentifier) { + HierarchicalTableIdentifier tblIdentifier = castToHierarchicalTableIdentifier(tableIdentifier); + BaseTable icebergTable = loadTableFromFs(table.getBasePath()); + Map parameters = new HashMap<>(catalogTable.parameters()); + parameters.put(PREVIOUS_METADATA_LOCATION_PROP, parameters.get(METADATA_LOCATION_PROP)); + parameters.put(METADATA_LOCATION_PROP, getMetadataFileLocation(icebergTable)); + parameters.putAll(icebergTable.properties()); + + return TableInput.builder() + .name(tblIdentifier.getTableName()) + .tableType(GLUE_EXTERNAL_TABLE_TYPE) + .parameters(parameters) + .storageDescriptor( + StorageDescriptor.builder() + .location(table.getBasePath()) + .columns( + schemaExtractor.toColumns(tableFormat, table.getReadSchema(), catalogTable)) + .build()) + .build(); + } + + @VisibleForTesting + Map getTableParameters(BaseTable icebergTable) { + Map parameters = new HashMap<>(icebergTable.properties()); + parameters.put(TABLE_TYPE_PROP, tableFormat); + parameters.put(METADATA_LOCATION_PROP, getMetadataFileLocation(icebergTable)); + return parameters; + } + + private BaseTable loadTableFromFs(String tableBasePath) { + return (BaseTable) hadoopTables.load(tableBasePath); + } + + private String getMetadataFileLocation(BaseTable table) { + return table.operations().current().metadataFileLocation(); + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/exception/CatalogSyncException.java b/xtable-core/src/main/java/org/apache/xtable/exception/CatalogSyncException.java new file mode 100644 index 000000000..56e56df99 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/exception/CatalogSyncException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.exception; + +import org.apache.xtable.model.exception.ErrorCode; +import org.apache.xtable.model.exception.InternalException; + +public class CatalogSyncException extends InternalException { + + public CatalogSyncException(ErrorCode errorCode, String message, Throwable e) { + super(errorCode, message, e); + } + + public CatalogSyncException(String message, Throwable e) { + super(ErrorCode.CATALOG_SYNC_GENERIC_EXCEPTION, message, e); + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/reflection/ReflectionUtils.java b/xtable-core/src/main/java/org/apache/xtable/reflection/ReflectionUtils.java index 4790d0d6c..ff70471b9 100644 --- a/xtable-core/src/main/java/org/apache/xtable/reflection/ReflectionUtils.java +++ b/xtable-core/src/main/java/org/apache/xtable/reflection/ReflectionUtils.java @@ -19,6 +19,8 @@ package org.apache.xtable.reflection; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; import java.util.Arrays; import org.apache.xtable.exception.ConfigurationException; @@ -53,6 +55,34 @@ public static T createInstanceOfClass(String className, Object... constructo } } + public static T createInstanceOfClassFromStaticMethod( + String className, String methodName, Class[] argClasses, Object[] args) { + try { + // try loading the class; throw error if not found + Class clazz = (Class) ReflectionUtils.class.getClassLoader().loadClass(className); + + // Retrieve and make the specified method accessible + Method method = clazz.getDeclaredMethod(methodName, argClasses); + method.setAccessible(true); + + // Invoke the method if it's static; throw an error otherwise + if (Modifier.isStatic(method.getModifiers())) { + return (T) method.invoke(null, args); + } else { + throw new IllegalArgumentException("The specified method is not static: " + methodName); + } + } catch (ClassNotFoundException ex) { + throw new ConfigurationException("Unable to load class: " + className); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ex) { + throw new ConfigurationException( + String.format("Failed to invoke method '%s' in class '%s'", methodName, className)); + } + } + + public static T createInstanceOfClassFromStaticMethod(String className, String methodName) { + return createInstanceOfClassFromStaticMethod(className, methodName, new Class[] {}, null); + } + private static boolean hasConstructor(Class clazz, Class... constructorArgTypes) { try { clazz.getConstructor(constructorArgTypes); diff --git a/xtable-core/src/test/java/org/apache/xtable/catalog/TestSchemaExtractorBase.java b/xtable-core/src/test/java/org/apache/xtable/catalog/TestSchemaExtractorBase.java new file mode 100644 index 000000000..c8c4cc3b7 --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/catalog/TestSchemaExtractorBase.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog; + +import java.util.Collections; +import java.util.Map; + +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; + +public class TestSchemaExtractorBase { + protected static InternalField getPrimitiveOneField( + String fieldName, String schemaName, InternalType dataType, boolean isNullable, int fieldId) { + return getPrimitiveOneField( + fieldName, schemaName, dataType, isNullable, fieldId, Collections.emptyMap()); + } + + protected static InternalField getPrimitiveOneField( + String fieldName, + String schemaName, + InternalType dataType, + boolean isNullable, + int fieldId, + String parentPath) { + return getPrimitiveOneField( + fieldName, schemaName, dataType, isNullable, fieldId, parentPath, Collections.emptyMap()); + } + + protected static InternalField getPrimitiveOneField( + String fieldName, + String schemaName, + InternalType dataType, + boolean isNullable, + int fieldId, + Map metadata) { + return getPrimitiveOneField( + fieldName, schemaName, dataType, isNullable, fieldId, null, metadata); + } + + protected static InternalField getPrimitiveOneField( + String fieldName, + String schemaName, + InternalType dataType, + boolean isNullable, + int fieldId, + String parentPath, + Map metadata) { + return InternalField.builder() + .name(fieldName) + .parentPath(parentPath) + .schema( + InternalSchema.builder() + .name(schemaName) + .dataType(dataType) + .isNullable(isNullable) + .metadata(metadata) + .build()) + .fieldId(fieldId) + .build(); + } +} diff --git a/xtable-core/src/test/java/org/apache/xtable/catalog/TestTableFormatUtils.java b/xtable-core/src/test/java/org/apache/xtable/catalog/TestTableFormatUtils.java new file mode 100644 index 000000000..6a0808ef4 --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/catalog/TestTableFormatUtils.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog; + +import static org.apache.xtable.catalog.Constants.PROP_SPARK_SQL_SOURCES_PROVIDER; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.junit.jupiter.api.Test; + +import org.apache.iceberg.TableProperties; + +import org.apache.xtable.model.storage.TableFormat; + +class TestTableFormatUtils { + + @Test + void testGetTableDataLocation_HudiDelta() { + // For Hudi and Delta, data location should be tableLocation + String tableLocation = "base-path"; + assertEquals( + tableLocation, + TableFormatUtils.getTableDataLocation( + TableFormat.HUDI, tableLocation, Collections.emptyMap())); + assertEquals( + tableLocation, + TableFormatUtils.getTableDataLocation( + TableFormat.HUDI, + tableLocation, + Collections.singletonMap(TableProperties.WRITE_DATA_LOCATION, "base-path/data"))); + } + + @Test + void testGetTableDataLocation_Iceberg() { + // For Iceberg, data location will be WRITE_DATA_LOCATION / OBJECT_STORE_PATH param or + // "tableLocation/data" + String tableLocation = "base-path"; + + // no params is set + assertEquals( + tableLocation + "/data", + TableFormatUtils.getTableDataLocation( + TableFormat.ICEBERG, tableLocation, Collections.emptyMap())); + + // WRITE_DATA_LOCATION param is set + String writeDataPath = "base-path/iceberg"; + assertEquals( + writeDataPath, + TableFormatUtils.getTableDataLocation( + TableFormat.ICEBERG, + tableLocation, + Collections.singletonMap(TableProperties.WRITE_DATA_LOCATION, writeDataPath))); + + // OBJECT_STORE_PATH param is set + String objectStorePath = "base-path/iceberg"; + assertEquals( + objectStorePath, + TableFormatUtils.getTableDataLocation( + TableFormat.ICEBERG, + tableLocation, + Collections.singletonMap(TableProperties.OBJECT_STORE_PATH, objectStorePath))); + } + + @Test + void testGetTableFormat() { + Map params = new HashMap<>(); + + // table format is null when table type param in not present + assertNull(TableFormatUtils.getTableFormat(params)); + + // "table_type" is set + params.put("table_type", TableFormat.ICEBERG); + assertEquals(TableFormat.ICEBERG, TableFormatUtils.getTableFormat(params)); + + params.clear(); + // "spark.sql.sources.provider" is set + params.put(PROP_SPARK_SQL_SOURCES_PROVIDER, TableFormat.DELTA); + assertEquals(TableFormat.DELTA, TableFormatUtils.getTableFormat(params)); + } +} diff --git a/xtable-core/src/test/java/org/apache/xtable/catalog/glue/GlueCatalogSyncTestBase.java b/xtable-core/src/test/java/org/apache/xtable/catalog/glue/GlueCatalogSyncTestBase.java new file mode 100644 index 000000000..59b533f3e --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/catalog/glue/GlueCatalogSyncTestBase.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog.glue; + +import static org.apache.xtable.catalog.glue.GlueCatalogSyncClient.GLUE_EXTERNAL_TABLE_TYPE; + +import java.util.Collections; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.mockito.Mock; + +import org.apache.xtable.conversion.ExternalCatalogConfig; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.storage.CatalogType; +import org.apache.xtable.model.storage.TableFormat; + +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest; +import software.amazon.awssdk.services.glue.model.CreateTableRequest; +import software.amazon.awssdk.services.glue.model.DatabaseInput; +import software.amazon.awssdk.services.glue.model.DeleteTableRequest; +import software.amazon.awssdk.services.glue.model.GetDatabaseRequest; +import software.amazon.awssdk.services.glue.model.GetTableRequest; +import software.amazon.awssdk.services.glue.model.GlueException; +import software.amazon.awssdk.services.glue.model.StorageDescriptor; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.TableInput; +import software.amazon.awssdk.services.glue.model.UpdateTableRequest; + +public class GlueCatalogSyncTestBase { + + @Mock protected GlueClient mockGlueClient; + @Mock protected GlueCatalogConfig mockGlueCatalogConfig; + @Mock protected GlueSchemaExtractor mockGlueSchemaExtractor; + protected final Configuration testConfiguration = new Configuration(); + + protected static final String TEST_GLUE_DATABASE = "glue_db"; + protected static final String TEST_GLUE_TABLE = "glue_table"; + protected static final String TEST_GLUE_CATALOG_ID = "aws-account-id"; + protected static final String TEST_BASE_PATH = "base-path"; + protected static final String TEST_CATALOG_NAME = "aws-glue-1"; + protected static final String ICEBERG_METADATA_FILE_LOCATION = "base-path/metadata"; + protected static final String ICEBERG_METADATA_FILE_LOCATION_v2 = "base-path/v2-metadata"; + protected static final InternalTable TEST_ICEBERG_INTERNAL_TABLE = + InternalTable.builder() + .basePath(TEST_BASE_PATH) + .tableFormat(TableFormat.ICEBERG) + .readSchema(InternalSchema.builder().fields(Collections.emptyList()).build()) + .build(); + protected static final InternalTable TEST_HUDI_INTERNAL_TABLE = + InternalTable.builder() + .basePath(TEST_BASE_PATH) + .tableFormat(TableFormat.HUDI) + .readSchema(InternalSchema.builder().fields(Collections.emptyList()).build()) + .build(); + protected static final ThreePartHierarchicalTableIdentifier TEST_CATALOG_TABLE_IDENTIFIER = + new ThreePartHierarchicalTableIdentifier(TEST_GLUE_DATABASE, TEST_GLUE_TABLE); + protected static final ExternalCatalogConfig catalogConfig = + ExternalCatalogConfig.builder() + .catalogId(TEST_CATALOG_NAME) + .catalogType(CatalogType.GLUE) + .catalogSyncClientImpl(GlueCatalogSyncClient.class.getCanonicalName()) + .catalogProperties(Collections.emptyMap()) + .build(); + protected static final TableInput TEST_TABLE_INPUT = TableInput.builder().build(); + protected static final GlueException TEST_GLUE_EXCEPTION = + (GlueException) GlueException.builder().message("something went wrong").build(); + + protected GetDatabaseRequest getDbRequest(String dbName) { + return GetDatabaseRequest.builder().catalogId(TEST_GLUE_CATALOG_ID).name(dbName).build(); + } + + protected GetTableRequest getTableRequest(String dbName, String tableName) { + return GetTableRequest.builder() + .catalogId(TEST_GLUE_CATALOG_ID) + .databaseName(dbName) + .name(tableName) + .build(); + } + + protected CreateDatabaseRequest createDbRequest(String dbName) { + return CreateDatabaseRequest.builder() + .catalogId(TEST_GLUE_CATALOG_ID) + .databaseInput( + DatabaseInput.builder() + .name(dbName) + .description("Created by " + GlueCatalogSyncClient.class.getName()) + .build()) + .build(); + } + + protected TableInput getCreateOrUpdateTableInput( + String tableName, Map params, InternalTable internalTable) { + return TableInput.builder() + .name(tableName) + .tableType(GLUE_EXTERNAL_TABLE_TYPE) + .parameters(params) + .storageDescriptor( + StorageDescriptor.builder() + .location(internalTable.getBasePath()) + .columns(Collections.emptyList()) + .build()) + .build(); + } + + protected CreateTableRequest createTableRequest(String dbName, TableInput tableInput) { + return CreateTableRequest.builder() + .catalogId(TEST_GLUE_CATALOG_ID) + .databaseName(dbName) + .tableInput(tableInput) + .build(); + } + + protected UpdateTableRequest updateTableRequest(String dbName, TableInput tableInput) { + return UpdateTableRequest.builder() + .catalogId(TEST_GLUE_CATALOG_ID) + .databaseName(dbName) + .skipArchive(true) + .tableInput(tableInput) + .build(); + } + + protected DeleteTableRequest deleteTableRequest(String dbName, String tableName) { + return DeleteTableRequest.builder() + .catalogId(TEST_GLUE_CATALOG_ID) + .databaseName(dbName) + .name(tableName) + .build(); + } + + protected Table getGlueTable(String dbName, String tableName, String location) { + return Table.builder() + .databaseName(dbName) + .name(tableName) + .storageDescriptor(StorageDescriptor.builder().location(location).build()) + .build(); + } +} diff --git a/xtable-core/src/test/java/org/apache/xtable/catalog/glue/TestGlueCatalogConfig.java b/xtable-core/src/test/java/org/apache/xtable/catalog/glue/TestGlueCatalogConfig.java new file mode 100644 index 000000000..a7ffef009 --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/catalog/glue/TestGlueCatalogConfig.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog.glue; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public class TestGlueCatalogConfig { + private static final String GLUE_CATALOG_ID_KEY = "externalCatalog.glue.catalogId"; + private static final String GLUE_CATALOG_ID_VALUE = "aws-accountId"; + private static final String GLUE_CATALOG_REGION_KEY = "externalCatalog.glue.region"; + private static final String GLUE_CATALOG_REGION_VALUE = "aws-region"; + private static final String GLUE_CATALOG_CREDENTIAL_PROVIDER_KEY = + "externalCatalog.glue.credentialsProviderClass"; + private static final String GLUE_CATALOG_CREDENTIAL_PROVIDER_VALUE = + "externalCatalog.glue.credentialsProviderClass"; + private static final String GLUE_CATALOG_LAKE_FORMATION_KEY = + "externalCatalog.glue.lakeFormationEnabled"; + + @Test + void testGetGlueCatalogConfig_withNoPropertiesSet() { + Map props = Collections.emptyMap(); + GlueCatalogConfig catalogConfig = GlueCatalogConfig.of(props); + assertNull(catalogConfig.getCatalogId()); + assertNull(catalogConfig.getRegion()); + assertNull(catalogConfig.getClientCredentialsProviderClass()); + } + + @Test + void testGetGlueCatalogConfig_withMissingProperties() { + Map props = + createProps( + GLUE_CATALOG_ID_KEY, + GLUE_CATALOG_ID_VALUE, + GLUE_CATALOG_REGION_KEY, + GLUE_CATALOG_REGION_VALUE); + GlueCatalogConfig catalogConfig = GlueCatalogConfig.of(props); + assertEquals(GLUE_CATALOG_ID_VALUE, catalogConfig.getCatalogId()); + assertEquals(GLUE_CATALOG_REGION_VALUE, catalogConfig.getRegion()); + assertNull(catalogConfig.getClientCredentialsProviderClass()); + } + + @Test + void testGetGlueCatalogConfig_withUnknownProperty() { + Map props = + createProps( + GLUE_CATALOG_ID_KEY, + GLUE_CATALOG_ID_VALUE, + GLUE_CATALOG_REGION_KEY, + GLUE_CATALOG_REGION_VALUE, + "externalCatalog.glue.unknownProperty", + "unknown-property-value"); + assertDoesNotThrow(() -> GlueCatalogConfig.of(props)); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testGetGlueCatalogConfig_withAllPropertiesSet(boolean lakeformationEnabled) { + Map props = + createProps( + GLUE_CATALOG_ID_KEY, + GLUE_CATALOG_ID_VALUE, + GLUE_CATALOG_REGION_KEY, + GLUE_CATALOG_REGION_VALUE, + GLUE_CATALOG_CREDENTIAL_PROVIDER_KEY, + GLUE_CATALOG_CREDENTIAL_PROVIDER_VALUE, + GLUE_CATALOG_LAKE_FORMATION_KEY, + String.valueOf(lakeformationEnabled)); + GlueCatalogConfig catalogConfig = GlueCatalogConfig.of(props); + assertEquals(GLUE_CATALOG_ID_VALUE, catalogConfig.getCatalogId()); + assertEquals(GLUE_CATALOG_REGION_VALUE, catalogConfig.getRegion()); + assertEquals( + GLUE_CATALOG_CREDENTIAL_PROVIDER_VALUE, catalogConfig.getClientCredentialsProviderClass()); + } + + private Map createProps(String... keyValues) { + Map props = new HashMap<>(); + for (int i = 0; i < keyValues.length; i += 2) { + props.put(keyValues[i], keyValues[i + 1]); + } + return props; + } +} diff --git a/xtable-core/src/test/java/org/apache/xtable/catalog/glue/TestGlueCatalogConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/catalog/glue/TestGlueCatalogConversionSource.java new file mode 100644 index 000000000..1a1c99c27 --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/catalog/glue/TestGlueCatalogConversionSource.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog.glue; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import org.apache.xtable.conversion.SourceTable; +import org.apache.xtable.exception.CatalogSyncException; +import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier; +import org.apache.xtable.model.storage.TableFormat; + +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.EntityNotFoundException; +import software.amazon.awssdk.services.glue.model.GetTableRequest; +import software.amazon.awssdk.services.glue.model.GetTableResponse; +import software.amazon.awssdk.services.glue.model.GlueException; +import software.amazon.awssdk.services.glue.model.StorageDescriptor; +import software.amazon.awssdk.services.glue.model.Table; + +@ExtendWith(MockitoExtension.class) +public class TestGlueCatalogConversionSource { + + @Mock private GlueCatalogConfig mockCatalogConfig; + @Mock private GlueClient mockGlueClient; + private GlueCatalogConversionSource catalogConversionSource; + private static final String GLUE_DB = "glue_db"; + private static final String GLUE_TABLE = "glue_tbl"; + private static final String TABLE_BASE_PATH = "/var/data/table"; + private static final String GLUE_CATALOG_ID = "aws-account-id"; + private static final ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier(GLUE_DB, GLUE_TABLE); + private static final GetTableRequest getTableRequest = + GetTableRequest.builder() + .catalogId(GLUE_CATALOG_ID) + .databaseName(GLUE_DB) + .name(GLUE_TABLE) + .build(); + + @BeforeEach + void init() { + when(mockCatalogConfig.getCatalogId()).thenReturn(GLUE_CATALOG_ID); + catalogConversionSource = new GlueCatalogConversionSource(mockCatalogConfig, mockGlueClient); + } + + @Test + void testGetSourceTable_errorGettingTableFromGlue() { + // error getting table from glue + when(mockGlueClient.getTable(getTableRequest)) + .thenThrow(GlueException.builder().message("something went wrong").build()); + assertThrows( + CatalogSyncException.class, () -> catalogConversionSource.getSourceTable(tableIdentifier)); + + verify(mockGlueClient, times(1)).getTable(getTableRequest); + } + + @Test + void testGetSourceTable_tableNotFoundInGlue() { + // table not found in glue + when(mockGlueClient.getTable(getTableRequest)) + .thenThrow(EntityNotFoundException.builder().message("table not found").build()); + assertThrows( + CatalogSyncException.class, () -> catalogConversionSource.getSourceTable(tableIdentifier)); + + verify(mockGlueClient, times(1)).getTable(getTableRequest); + } + + @Test + void testGetSourceTable_tableFormatNotPresent() { + // table format not present in table properties + when(mockGlueClient.getTable(getTableRequest)) + .thenReturn(GetTableResponse.builder().table(Table.builder().build()).build()); + IllegalStateException exception = + assertThrows( + IllegalStateException.class, + () -> catalogConversionSource.getSourceTable(tableIdentifier)); + assertEquals( + "TableFormat is null or empty for table: glue_db.glue_tbl", exception.getMessage()); + + verify(mockGlueClient, times(1)).getTable(getTableRequest); + } + + @ParameterizedTest + @CsvSource(value = {"ICEBERG", "HUDI", "DELTA"}) + void testGetSourceTable(String tableFormat) { + StorageDescriptor sd = StorageDescriptor.builder().location(TABLE_BASE_PATH).build(); + Map tableParams = new HashMap<>(); + if (tableFormat.equals(TableFormat.ICEBERG)) { + tableParams.put("write.data.path", String.format("%s/iceberg", TABLE_BASE_PATH)); + tableParams.put("table_type", tableFormat); + } else { + tableParams.put("spark.sql.sources.provider", tableFormat); + } + + String dataPath = + tableFormat.equals(TableFormat.ICEBERG) + ? String.format("%s/iceberg", TABLE_BASE_PATH) + : TABLE_BASE_PATH; + SourceTable expected = + newSourceTable(GLUE_TABLE, TABLE_BASE_PATH, dataPath, tableFormat, tableParams); + when(mockGlueClient.getTable(getTableRequest)) + .thenReturn( + GetTableResponse.builder() + .table(newGlueTable(GLUE_DB, GLUE_TABLE, tableParams, sd)) + .build()); + SourceTable output = catalogConversionSource.getSourceTable(tableIdentifier); + assertEquals(expected, output); + + verify(mockGlueClient, times(1)).getTable(getTableRequest); + } + + private Table newGlueTable( + String dbName, String tableName, Map params, StorageDescriptor sd) { + return Table.builder() + .databaseName(dbName) + .name(tableName) + .parameters(params) + .storageDescriptor(sd) + .build(); + } + + private SourceTable newSourceTable( + String tblName, + String basePath, + String dataPath, + String tblFormat, + Map params) { + Properties tblProperties = new Properties(); + tblProperties.putAll(params); + return SourceTable.builder() + .name(tblName) + .basePath(basePath) + .dataPath(dataPath) + .formatName(tblFormat) + .additionalProperties(tblProperties) + .build(); + } +} diff --git a/xtable-core/src/test/java/org/apache/xtable/catalog/glue/TestGlueCatalogSyncClient.java b/xtable-core/src/test/java/org/apache/xtable/catalog/glue/TestGlueCatalogSyncClient.java new file mode 100644 index 000000000..f09f50e7d --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/catalog/glue/TestGlueCatalogSyncClient.java @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog.glue; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.time.ZonedDateTime; +import java.util.Collections; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; + +import org.apache.xtable.catalog.CatalogTableBuilder; +import org.apache.xtable.exception.CatalogSyncException; +import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier; + +import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest; +import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse; +import software.amazon.awssdk.services.glue.model.CreateTableRequest; +import software.amazon.awssdk.services.glue.model.CreateTableResponse; +import software.amazon.awssdk.services.glue.model.Database; +import software.amazon.awssdk.services.glue.model.DeleteTableRequest; +import software.amazon.awssdk.services.glue.model.DeleteTableResponse; +import software.amazon.awssdk.services.glue.model.EntityNotFoundException; +import software.amazon.awssdk.services.glue.model.GetDatabaseRequest; +import software.amazon.awssdk.services.glue.model.GetDatabaseResponse; +import software.amazon.awssdk.services.glue.model.GetTableRequest; +import software.amazon.awssdk.services.glue.model.GetTableResponse; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.TableInput; +import software.amazon.awssdk.services.glue.model.UpdateTableRequest; +import software.amazon.awssdk.services.glue.model.UpdateTableResponse; + +@ExtendWith(MockitoExtension.class) +public class TestGlueCatalogSyncClient extends GlueCatalogSyncTestBase { + + @Mock private CatalogTableBuilder mockTableBuilder; + private GlueCatalogSyncClient glueCatalogSyncClient; + + private GlueCatalogSyncClient createGlueCatalogSyncClient() { + return new GlueCatalogSyncClient( + catalogConfig, testConfiguration, mockGlueCatalogConfig, mockGlueClient, mockTableBuilder); + } + + void setupCommonMocks() { + glueCatalogSyncClient = createGlueCatalogSyncClient(); + when(mockGlueCatalogConfig.getCatalogId()).thenReturn(TEST_GLUE_CATALOG_ID); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testHasDatabase(boolean isDbPresent) { + setupCommonMocks(); + GetDatabaseRequest dbRequest = getDbRequest(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName()); + GetDatabaseResponse dbResponse = + GetDatabaseResponse.builder() + .database( + Database.builder().name(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName()).build()) + .build(); + if (isDbPresent) { + when(mockGlueClient.getDatabase(dbRequest)).thenReturn(dbResponse); + } else { + when(mockGlueClient.getDatabase(dbRequest)) + .thenThrow(EntityNotFoundException.builder().message("db not found").build()); + } + boolean output = glueCatalogSyncClient.hasDatabase(TEST_CATALOG_TABLE_IDENTIFIER); + if (isDbPresent) { + assertTrue(output); + } else { + assertFalse(output); + } + verify(mockGlueClient, times(1)).getDatabase(dbRequest); + } + + @Test + void testHasDatabaseFailure() { + setupCommonMocks(); + GetDatabaseRequest dbRequest = getDbRequest(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName()); + when(mockGlueClient.getDatabase(dbRequest)).thenThrow(TEST_GLUE_EXCEPTION); + CatalogSyncException exception = + assertThrows( + CatalogSyncException.class, + () -> glueCatalogSyncClient.hasDatabase(TEST_CATALOG_TABLE_IDENTIFIER)); + assertEquals( + String.format("Failed to get database: %s", TEST_GLUE_DATABASE), exception.getMessage()); + verify(mockGlueClient, times(1)).getDatabase(dbRequest); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testGetTable(boolean isTablePresent) { + setupCommonMocks(); + GetTableRequest tableRequest = + getTableRequest( + TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), + TEST_CATALOG_TABLE_IDENTIFIER.getTableName()); + GetTableResponse tableResponse = + GetTableResponse.builder() + .table( + Table.builder() + .databaseName(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName()) + .name(TEST_CATALOG_TABLE_IDENTIFIER.getTableName()) + .build()) + .build(); + if (isTablePresent) { + when(mockGlueClient.getTable(tableRequest)).thenReturn(tableResponse); + } else { + when(mockGlueClient.getTable(tableRequest)) + .thenThrow(EntityNotFoundException.builder().message("table not found").build()); + } + Table table = glueCatalogSyncClient.getTable(TEST_CATALOG_TABLE_IDENTIFIER); + if (isTablePresent) { + assertNotNull(table); + Assertions.assertEquals( + TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), table.databaseName()); + Assertions.assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), table.name()); + } else { + assertNull(table); + } + verify(mockGlueClient, times(1)).getTable(tableRequest); + } + + @Test + void testGetTableFailure() { + setupCommonMocks(); + GetTableRequest tableRequest = + getTableRequest( + TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), + TEST_CATALOG_TABLE_IDENTIFIER.getTableName()); + when(mockGlueClient.getTable(tableRequest)).thenThrow(TEST_GLUE_EXCEPTION); + CatalogSyncException exception = + assertThrows( + CatalogSyncException.class, + () -> glueCatalogSyncClient.getTable(TEST_CATALOG_TABLE_IDENTIFIER)); + assertEquals( + String.format("Failed to get table: %s.%s", TEST_GLUE_DATABASE, TEST_GLUE_TABLE), + exception.getMessage()); + verify(mockGlueClient, times(1)).getTable(tableRequest); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testCreateDatabase(boolean shouldFail) { + setupCommonMocks(); + CreateDatabaseRequest dbRequest = + createDbRequest(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName()); + if (shouldFail) { + when(mockGlueClient.createDatabase(dbRequest)).thenThrow(TEST_GLUE_EXCEPTION); + CatalogSyncException exception = + assertThrows( + CatalogSyncException.class, + () -> glueCatalogSyncClient.createDatabase(TEST_CATALOG_TABLE_IDENTIFIER)); + assertEquals( + String.format("Failed to create database: %s", TEST_GLUE_DATABASE), + exception.getMessage()); + } else { + when(mockGlueClient.createDatabase(dbRequest)) + .thenReturn(CreateDatabaseResponse.builder().build()); + glueCatalogSyncClient.createDatabase(TEST_CATALOG_TABLE_IDENTIFIER); + } + verify(mockGlueClient, times(1)).createDatabase(dbRequest); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testDropTable(boolean shouldFail) { + setupCommonMocks(); + DeleteTableRequest deleteRequest = + deleteTableRequest( + TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), + TEST_CATALOG_TABLE_IDENTIFIER.getTableName()); + if (shouldFail) { + when(mockGlueClient.deleteTable(deleteRequest)).thenThrow(TEST_GLUE_EXCEPTION); + RuntimeException exception = + assertThrows( + RuntimeException.class, + () -> + glueCatalogSyncClient.dropTable( + TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER)); + assertEquals( + String.format("Failed to drop table: %s.%s", TEST_GLUE_DATABASE, TEST_GLUE_TABLE), + exception.getMessage()); + } else { + when(mockGlueClient.deleteTable(deleteRequest)) + .thenReturn(DeleteTableResponse.builder().build()); + glueCatalogSyncClient.dropTable(TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); + } + verify(mockGlueClient, times(1)).deleteTable(deleteRequest); + } + + @Test + void testCreateTable_Success() { + setupCommonMocks(); + CreateTableRequest createTableRequest = + createTableRequest(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), TEST_TABLE_INPUT); + when(mockTableBuilder.getCreateTableRequest( + TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER)) + .thenReturn(TEST_TABLE_INPUT); + when(mockGlueClient.createTable(createTableRequest)) + .thenReturn(CreateTableResponse.builder().build()); + glueCatalogSyncClient.createTable(TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); + verify(mockGlueClient, times(1)).createTable(createTableRequest); + verify(mockTableBuilder, times(1)) + .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); + } + + @Test + void testCreateTable_ErrorGettingTableInput() { + glueCatalogSyncClient = createGlueCatalogSyncClient(); + + // error when getting iceberg table input + doThrow(new RuntimeException("something went wrong")) + .when(mockTableBuilder) + .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); + assertThrows( + RuntimeException.class, + () -> + glueCatalogSyncClient.createTable( + TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER)); + verify(mockTableBuilder, times(1)) + .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); + verify(mockGlueClient, never()).createTable(any(CreateTableRequest.class)); + } + + @Test + void testCreateTable_ErrorCreatingTable() { + setupCommonMocks(); + + // error when creating table + CreateTableRequest createTableRequest = + createTableRequest(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), TEST_TABLE_INPUT); + when(mockTableBuilder.getCreateTableRequest( + TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER)) + .thenReturn(TEST_TABLE_INPUT); + when(mockGlueClient.createTable(createTableRequest)).thenThrow(TEST_GLUE_EXCEPTION); + CatalogSyncException exception = + assertThrows( + CatalogSyncException.class, + () -> + glueCatalogSyncClient.createTable( + TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER)); + assertEquals( + String.format("Failed to create table: %s.%s", TEST_GLUE_DATABASE, TEST_GLUE_TABLE), + exception.getMessage()); + verify(mockTableBuilder, times(1)) + .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); + verify(mockGlueClient, times(1)).createTable(createTableRequest); + } + + @Test + void testRefreshTable_Success() { + setupCommonMocks(); + UpdateTableRequest updateTableRequest = + updateTableRequest(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), TEST_TABLE_INPUT); + Table glueTable = Table.builder().parameters(Collections.emptyMap()).build(); + when(mockTableBuilder.getUpdateTableRequest( + TEST_ICEBERG_INTERNAL_TABLE, glueTable, TEST_CATALOG_TABLE_IDENTIFIER)) + .thenReturn(TEST_TABLE_INPUT); + when(mockGlueClient.updateTable(updateTableRequest)) + .thenReturn(UpdateTableResponse.builder().build()); + glueCatalogSyncClient.refreshTable( + TEST_ICEBERG_INTERNAL_TABLE, glueTable, TEST_CATALOG_TABLE_IDENTIFIER); + verify(mockGlueClient, times(1)).updateTable(updateTableRequest); + verify(mockTableBuilder, times(1)) + .getUpdateTableRequest( + TEST_ICEBERG_INTERNAL_TABLE, glueTable, TEST_CATALOG_TABLE_IDENTIFIER); + } + + @Test + void testRefreshTable_ErrorCreatingTableInput() { + glueCatalogSyncClient = createGlueCatalogSyncClient(); + Table glueTable = Table.builder().parameters(Collections.emptyMap()).build(); + + // error while refreshing table + doThrow(new RuntimeException("something went wrong")) + .when(mockTableBuilder) + .getUpdateTableRequest( + TEST_ICEBERG_INTERNAL_TABLE, glueTable, TEST_CATALOG_TABLE_IDENTIFIER); + assertThrows( + RuntimeException.class, + () -> + glueCatalogSyncClient.refreshTable( + TEST_ICEBERG_INTERNAL_TABLE, glueTable, TEST_CATALOG_TABLE_IDENTIFIER)); + verify(mockTableBuilder, times(1)) + .getUpdateTableRequest( + TEST_ICEBERG_INTERNAL_TABLE, glueTable, TEST_CATALOG_TABLE_IDENTIFIER); + verify(mockGlueClient, never()).updateTable(any(UpdateTableRequest.class)); + } + + @Test + void testRefreshTable_ErrorRefreshingTable() { + setupCommonMocks(); + Table glueTable = Table.builder().parameters(Collections.emptyMap()).build(); + + UpdateTableRequest updateTableRequest = + updateTableRequest(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), TEST_TABLE_INPUT); + when(mockTableBuilder.getUpdateTableRequest( + TEST_ICEBERG_INTERNAL_TABLE, glueTable, TEST_CATALOG_TABLE_IDENTIFIER)) + .thenReturn(TEST_TABLE_INPUT); + + // error while refreshing table + when(mockGlueClient.updateTable(updateTableRequest)).thenThrow(TEST_GLUE_EXCEPTION); + CatalogSyncException exception = + assertThrows( + CatalogSyncException.class, + () -> + glueCatalogSyncClient.refreshTable( + TEST_ICEBERG_INTERNAL_TABLE, glueTable, TEST_CATALOG_TABLE_IDENTIFIER)); + assertEquals( + String.format("Failed to refresh table: %s.%s", TEST_GLUE_DATABASE, TEST_GLUE_TABLE), + exception.getMessage()); + verify(mockTableBuilder, times(1)) + .getUpdateTableRequest( + TEST_ICEBERG_INTERNAL_TABLE, glueTable, TEST_CATALOG_TABLE_IDENTIFIER); + verify(mockGlueClient, times(1)).updateTable(updateTableRequest); + } + + @Test + void testCreateOrReplaceTable() { + setupCommonMocks(); + + ZonedDateTime fixedDateTime = ZonedDateTime.parse("2024-10-25T10:15:30.00Z"); + try (MockedStatic mockZonedDateTime = mockStatic(ZonedDateTime.class)) { + mockZonedDateTime.when(ZonedDateTime::now).thenReturn(fixedDateTime); + String tempTableName = + TEST_CATALOG_TABLE_IDENTIFIER.getTableName() + + "_temp" + + ZonedDateTime.now().toEpochSecond(); + ThreePartHierarchicalTableIdentifier tempTableIdentifier = + new ThreePartHierarchicalTableIdentifier( + TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), tempTableName); + TableInput tableInput = TableInput.builder().name(TEST_GLUE_TABLE).build(); + TableInput tempTableInput = TableInput.builder().name(tempTableName).build(); + CreateTableRequest origCreateTableRequest = + createTableRequest(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), tableInput); + CreateTableRequest tempCreateTableRequest = + createTableRequest(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), tempTableInput); + DeleteTableRequest origDeleteTableRequest = + deleteTableRequest( + TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), + TEST_CATALOG_TABLE_IDENTIFIER.getTableName()); + DeleteTableRequest tempDeleteTableRequest = + deleteTableRequest( + tempTableIdentifier.getDatabaseName(), tempTableIdentifier.getTableName()); + + when(mockTableBuilder.getCreateTableRequest( + TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER)) + .thenReturn(tableInput); + when(mockTableBuilder.getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, tempTableIdentifier)) + .thenReturn(tempTableInput); + + glueCatalogSyncClient.createOrReplaceTable( + TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); + + verify(mockGlueClient, times(1)).createTable(tempCreateTableRequest); + verify(mockGlueClient, times(1)).deleteTable(tempDeleteTableRequest); + verify(mockGlueClient, times(1)).createTable(origCreateTableRequest); + verify(mockGlueClient, times(1)).deleteTable(origDeleteTableRequest); + + verify(mockTableBuilder, times(1)) + .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); + verify(mockTableBuilder, times(1)) + .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, tempTableIdentifier); + } + } +} diff --git a/xtable-core/src/test/java/org/apache/xtable/catalog/glue/TestGlueSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/catalog/glue/TestGlueSchemaExtractor.java new file mode 100644 index 000000000..1e658fbe3 --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/catalog/glue/TestGlueSchemaExtractor.java @@ -0,0 +1,613 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog.glue; + +import static org.apache.xtable.catalog.glue.GlueSchemaExtractor.getColumnProperty; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.junit.jupiter.api.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import org.apache.xtable.catalog.TestSchemaExtractorBase; +import org.apache.xtable.exception.NotSupportedException; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.xtable.model.storage.TableFormat; + +import software.amazon.awssdk.services.glue.model.Column; +import software.amazon.awssdk.services.glue.model.StorageDescriptor; +import software.amazon.awssdk.services.glue.model.Table; + +public class TestGlueSchemaExtractor extends TestSchemaExtractorBase { + + private Column getCurrentGlueTableColumn( + String tableFormat, String colName, String colType, Integer fieldId, boolean isNullable) { + fieldId = fieldId != null ? fieldId : -1; + return Column.builder() + .name(colName) + .type(colType) + .parameters( + ImmutableMap.of( + getColumnProperty(tableFormat, "field.id"), Integer.toString(fieldId), + getColumnProperty(tableFormat, "field.optional"), Boolean.toString(isNullable), + getColumnProperty(tableFormat, "field.current"), "true")) + .build(); + } + + private Column getPreviousGlueTableColumn(String tableFormat, String colName, String colType) { + return Column.builder() + .name(colName) + .type(colType) + .parameters(ImmutableMap.of(getColumnProperty(tableFormat, "field.current"), "false")) + .build(); + } + + @Test + void testPrimitiveTypes_NoExistingTable() { + int precision = 10; + int scale = 5; + Map doubleMetadata = new HashMap<>(); + doubleMetadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, precision); + doubleMetadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, scale); + String tableFormat = TableFormat.ICEBERG; + + InternalSchema oneSchema = + InternalSchema.builder() + .dataType(InternalType.RECORD) + .isNullable(false) + .name("record") + .fields( + Arrays.asList( + getPrimitiveOneField( + "requiredBoolean", "boolean", InternalType.BOOLEAN, false, 1), + getPrimitiveOneField( + "optionalBoolean", "boolean", InternalType.BOOLEAN, true, 2), + getPrimitiveOneField("requiredInt", "integer", InternalType.INT, false, 3), + getPrimitiveOneField("requiredLong", "long", InternalType.LONG, false, 4), + getPrimitiveOneField("requiredDouble", "double", InternalType.DOUBLE, false, 5), + getPrimitiveOneField("requiredFloat", "float", InternalType.FLOAT, false, 6), + getPrimitiveOneField("requiredString", "string", InternalType.STRING, false, 7), + getPrimitiveOneField("requiredBytes", "binary", InternalType.BYTES, false, 8), + getPrimitiveOneField("requiredDate", "date", InternalType.DATE, false, 9), + getPrimitiveOneField( + "requiredDecimal", + "decimal", + InternalType.DECIMAL, + false, + 10, + doubleMetadata), + getPrimitiveOneField( + "requiredTimestamp", "timestamp", InternalType.TIMESTAMP, false, 11), + getPrimitiveOneField( + "requiredTimestampNTZ", + "timestamp_ntz", + InternalType.TIMESTAMP_NTZ, + false, + 12))) + .build(); + + List expectedGlueColumns = + Arrays.asList( + getCurrentGlueTableColumn(tableFormat, "requiredBoolean", "boolean", 1, false), + getCurrentGlueTableColumn(tableFormat, "optionalBoolean", "boolean", 2, true), + getCurrentGlueTableColumn(tableFormat, "requiredInt", "int", 3, false), + getCurrentGlueTableColumn(tableFormat, "requiredLong", "bigint", 4, false), + getCurrentGlueTableColumn(tableFormat, "requiredDouble", "double", 5, false), + getCurrentGlueTableColumn(tableFormat, "requiredFloat", "float", 6, false), + getCurrentGlueTableColumn(tableFormat, "requiredString", "string", 7, false), + getCurrentGlueTableColumn(tableFormat, "requiredBytes", "binary", 8, false), + getCurrentGlueTableColumn(tableFormat, "requiredDate", "date", 9, false), + getCurrentGlueTableColumn( + tableFormat, + "requiredDecimal", + String.format("decimal(%s,%s)", precision, scale), + 10, + false), + getCurrentGlueTableColumn(tableFormat, "requiredTimestamp", "timestamp", 11, false), + getCurrentGlueTableColumn(tableFormat, "requiredTimestampNTZ", "timestamp", 12, false)); + + assertEquals( + expectedGlueColumns, GlueSchemaExtractor.getInstance().toColumns(tableFormat, oneSchema)); + } + + @Test + void testTimestamps_NoExistingTable() { + String tableFormat = TableFormat.ICEBERG; + Map millisTimestamp = + Collections.singletonMap( + InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MILLIS); + + Map microsTimestamp = + Collections.singletonMap( + InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS); + + InternalSchema oneSchema = + InternalSchema.builder() + .dataType(InternalType.RECORD) + .isNullable(false) + .name("record") + .fields( + Arrays.asList( + getPrimitiveOneField( + "requiredTimestampMillis", + "timestamp", + InternalType.TIMESTAMP, + false, + 1, + millisTimestamp), + getPrimitiveOneField( + "requiredTimestampMicros", + "timestamp", + InternalType.TIMESTAMP, + false, + 2, + microsTimestamp), + getPrimitiveOneField( + "requiredTimestampNTZMillis", + "timestamp_ntz", + InternalType.TIMESTAMP_NTZ, + false, + 3, + millisTimestamp), + getPrimitiveOneField( + "requiredTimestampNTZMicros", + "timestamp_ntz", + InternalType.TIMESTAMP_NTZ, + false, + 4, + microsTimestamp))) + .build(); + + List expectedGlueColumns = + Arrays.asList( + getCurrentGlueTableColumn( + tableFormat, "requiredTimestampMillis", "timestamp", 1, false), + getCurrentGlueTableColumn( + tableFormat, "requiredTimestampMicros", "timestamp", 2, false), + getCurrentGlueTableColumn( + tableFormat, "requiredTimestampNTZMillis", "timestamp", 3, false), + getCurrentGlueTableColumn( + tableFormat, "requiredTimestampNTZMicros", "timestamp", 4, false)); + + assertEquals( + expectedGlueColumns, GlueSchemaExtractor.getInstance().toColumns(tableFormat, oneSchema)); + } + + @Test + void testMaps_NoExistingTable() { + String tableFormat = TableFormat.ICEBERG; + InternalSchema recordMapElementSchema = + InternalSchema.builder() + .name("struct") + .isNullable(true) + .fields( + Arrays.asList( + getPrimitiveOneField( + "requiredDouble", + "double", + InternalType.DOUBLE, + false, + 1, + "recordMap._one_field_value"), + getPrimitiveOneField( + "optionalString", + "string", + InternalType.STRING, + true, + 2, + "recordMap._one_field_value"))) + .dataType(InternalType.RECORD) + .build(); + + InternalSchema oneSchema = + InternalSchema.builder() + .name("record") + .dataType(InternalType.RECORD) + .isNullable(false) + .fields( + Arrays.asList( + InternalField.builder() + .name("intMap") + .fieldId(1) + .schema( + InternalSchema.builder() + .name("map") + .isNullable(false) + .dataType(InternalType.MAP) + .fields( + Arrays.asList( + getPrimitiveOneField( + InternalField.Constants.MAP_KEY_FIELD_NAME, + "string", + InternalType.STRING, + false, + 3, + "intMap"), + getPrimitiveOneField( + InternalField.Constants.MAP_VALUE_FIELD_NAME, + "integer", + InternalType.INT, + false, + 4, + "intMap"))) + .build()) + .build(), + InternalField.builder() + .name("recordMap") + .fieldId(2) + .schema( + InternalSchema.builder() + .name("map") + .isNullable(true) + .dataType(InternalType.MAP) + .fields( + Arrays.asList( + getPrimitiveOneField( + InternalField.Constants.MAP_KEY_FIELD_NAME, + "integer", + InternalType.INT, + false, + 5, + "recordMap"), + InternalField.builder() + .name(InternalField.Constants.MAP_VALUE_FIELD_NAME) + .fieldId(6) + .parentPath("recordMap") + .schema(recordMapElementSchema) + .build())) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build())) + .build(); + + List expectedGlueColumns = + Arrays.asList( + getCurrentGlueTableColumn(tableFormat, "intMap", "map", 1, false), + getCurrentGlueTableColumn( + tableFormat, + "recordMap", + "map>", + 2, + true)); + + assertEquals( + expectedGlueColumns, GlueSchemaExtractor.getInstance().toColumns(tableFormat, oneSchema)); + } + + @Test + void testLists_NoExistingTable() { + String tableFormat = TableFormat.ICEBERG; + InternalSchema recordListElementSchema = + InternalSchema.builder() + .name("struct") + .isNullable(true) + .fields( + Arrays.asList( + getPrimitiveOneField( + "requiredDouble", + "double", + InternalType.DOUBLE, + false, + 11, + "recordMap._one_field_value"), + getPrimitiveOneField( + "optionalString", + "string", + InternalType.STRING, + true, + 12, + "recordMap._one_field_value"))) + .dataType(InternalType.RECORD) + .build(); + + InternalSchema oneSchema = + InternalSchema.builder() + .dataType(InternalType.RECORD) + .name("record") + .isNullable(false) + .fields( + Arrays.asList( + InternalField.builder() + .name("intList") + .fieldId(1) + .schema( + InternalSchema.builder() + .name("list") + .isNullable(false) + .dataType(InternalType.LIST) + .fields( + Collections.singletonList( + getPrimitiveOneField( + InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME, + "integer", + InternalType.INT, + false, + 13, + "intList"))) + .build()) + .build(), + InternalField.builder() + .name("recordList") + .fieldId(2) + .schema( + InternalSchema.builder() + .name("list") + .isNullable(true) + .dataType(InternalType.LIST) + .fields( + Collections.singletonList( + InternalField.builder() + .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME) + .fieldId(14) + .parentPath("recordList") + .schema(recordListElementSchema) + .build())) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build())) + .build(); + + List expectedGlueColumns = + Arrays.asList( + getCurrentGlueTableColumn(tableFormat, "intList", "array", 1, false), + getCurrentGlueTableColumn( + tableFormat, + "recordList", + "array>", + 2, + true)); + + assertEquals( + expectedGlueColumns, GlueSchemaExtractor.getInstance().toColumns(tableFormat, oneSchema)); + } + + @Test + void testNestedRecords_NoExistingTable() { + String tableFormat = TableFormat.ICEBERG; + InternalSchema oneSchema = + InternalSchema.builder() + .dataType(InternalType.RECORD) + .name("record") + .isNullable(false) + .fields( + Collections.singletonList( + InternalField.builder() + .name("nestedOne") + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .fieldId(1) + .schema( + InternalSchema.builder() + .name("struct") + .dataType(InternalType.RECORD) + .isNullable(true) + .fields( + Arrays.asList( + getPrimitiveOneField( + "nestedOptionalInt", + "integer", + InternalType.INT, + true, + 11, + "nestedOne"), + getPrimitiveOneField( + "nestedRequiredDouble", + "double", + InternalType.DOUBLE, + false, + 12, + "nestedOne"), + InternalField.builder() + .name("nestedTwo") + .parentPath("nestedOne") + .fieldId(13) + .schema( + InternalSchema.builder() + .name("struct") + .dataType(InternalType.RECORD) + .isNullable(false) + .fields( + Collections.singletonList( + getPrimitiveOneField( + "doublyNestedString", + "string", + InternalType.STRING, + true, + 14, + "nestedOne.nestedTwo"))) + .build()) + .build())) + .build()) + .build())) + .build(); + + List expectedGlueColumns = + Arrays.asList( + getCurrentGlueTableColumn( + tableFormat, + "nestedOne", + "struct>", + 1, + true)); + assertEquals( + expectedGlueColumns, GlueSchemaExtractor.getInstance().toColumns(tableFormat, oneSchema)); + } + + @Test + void testToColumns_NoColumnsFromExistingTable() { + String tableFormat = TableFormat.ICEBERG; + InternalSchema oneSchema = + InternalSchema.builder() + .dataType(InternalType.RECORD) + .isNullable(false) + .name("record") + .fields( + Arrays.asList( + getPrimitiveOneField( + "optionalBoolean", "boolean", InternalType.BOOLEAN, true, 2), + getPrimitiveOneField("requiredInt", "integer", InternalType.INT, false, 3))) + .build(); + + List
    tableList = + Arrays.asList( + // table is null + null, + // storageDescriptor is null + Table.builder().build(), + // no columns present + Table.builder().storageDescriptor(StorageDescriptor.builder().build()).build()); + + List expectedGlueColumns = + Arrays.asList( + getCurrentGlueTableColumn(tableFormat, "optionalBoolean", "boolean", 2, true), + getCurrentGlueTableColumn(tableFormat, "requiredInt", "int", 3, false)); + + for (Table table : tableList) { + assertEquals( + expectedGlueColumns, + GlueSchemaExtractor.getInstance().toColumns(tableFormat, oneSchema, table)); + } + } + + @Test + void testToColumns_ValidExistingTable() { + String tableFormat = TableFormat.ICEBERG; + InternalSchema oneSchema = + InternalSchema.builder() + .dataType(InternalType.RECORD) + .isNullable(false) + .name("record") + .fields( + Arrays.asList( + getPrimitiveOneField( + "optionalBoolean", "boolean", InternalType.BOOLEAN, true, 2), + getPrimitiveOneField("requiredInt", "integer", InternalType.INT, false, 3))) + .build(); + + Table existingTable = + Table.builder() + .storageDescriptor( + StorageDescriptor.builder() + .columns( + ImmutableList.of( + Column.builder().name("prev_x").type("string").build(), + Column.builder().name("prev_y").type("string").build())) + .build()) + .build(); + + List expectedGlueColumns = + Arrays.asList( + getCurrentGlueTableColumn(tableFormat, "optionalBoolean", "boolean", 2, true), + getCurrentGlueTableColumn(tableFormat, "requiredInt", "int", 3, false), + getPreviousGlueTableColumn(tableFormat, "prev_x", "string"), + getPreviousGlueTableColumn(tableFormat, "prev_y", "string")); + + assertEquals( + expectedGlueColumns, + GlueSchemaExtractor.getInstance().toColumns(tableFormat, oneSchema, existingTable)); + } + + @Test + void testUnsupportedType() { + String tableFormat = TableFormat.ICEBERG; + // Unknown "UNION" type + InternalSchema oneSchema = + InternalSchema.builder() + .dataType(InternalType.RECORD) + .isNullable(false) + .name("record") + .fields( + Arrays.asList( + getPrimitiveOneField( + "optionalBoolean", "boolean", InternalType.BOOLEAN, true, 2), + InternalField.builder() + .name("unionField") + .schema( + InternalSchema.builder() + .name("unionSchema") + .dataType(InternalType.UNION) + .isNullable(true) + .build()) + .fieldId(2) + .build())) + .build(); + + NotSupportedException exception = + assertThrows( + NotSupportedException.class, + () -> GlueSchemaExtractor.getInstance().toColumns(tableFormat, oneSchema)); + assertEquals("Unsupported type: InternalType.UNION(name=union)", exception.getMessage()); + + // Invalid decimal type (precision and scale metadata is missing) + InternalSchema oneSchema2 = + InternalSchema.builder() + .dataType(InternalType.RECORD) + .isNullable(false) + .name("record") + .fields( + Arrays.asList( + getPrimitiveOneField( + "optionalBoolean", "boolean", InternalType.BOOLEAN, true, 1), + getPrimitiveOneField( + "optionalDecimal", "decimal", InternalType.DECIMAL, true, 2))) + .build(); + + exception = + assertThrows( + NotSupportedException.class, + () -> GlueSchemaExtractor.getInstance().toColumns(tableFormat, oneSchema2)); + assertEquals("Invalid decimal type, precision and scale is missing", exception.getMessage()); + + // Invalid decimal type (scale metadata is missing) + Map doubleMetadata = new HashMap<>(); + doubleMetadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, 10); + InternalSchema oneSchema3 = + InternalSchema.builder() + .dataType(InternalType.RECORD) + .isNullable(false) + .name("record") + .fields( + Arrays.asList( + getPrimitiveOneField( + "optionalBoolean", "boolean", InternalType.BOOLEAN, true, 1), + getPrimitiveOneField( + "optionalDecimal", + "decimal", + InternalType.DECIMAL, + true, + 2, + doubleMetadata))) + .build(); + + exception = + assertThrows( + NotSupportedException.class, + () -> GlueSchemaExtractor.getInstance().toColumns(tableFormat, oneSchema3)); + assertEquals("Invalid decimal type, scale is missing", exception.getMessage()); + } +} diff --git a/xtable-core/src/test/java/org/apache/xtable/catalog/glue/table/TestIcebergGlueCatalogTableBuilder.java b/xtable-core/src/test/java/org/apache/xtable/catalog/glue/table/TestIcebergGlueCatalogTableBuilder.java new file mode 100644 index 000000000..c6898e17c --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/catalog/glue/table/TestIcebergGlueCatalogTableBuilder.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog.glue.table; + +import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP; +import static org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP; +import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.hadoop.HadoopTables; + +import org.apache.xtable.catalog.glue.GlueCatalogSyncTestBase; +import org.apache.xtable.model.storage.TableFormat; + +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.TableInput; + +@ExtendWith(MockitoExtension.class) +public class TestIcebergGlueCatalogTableBuilder extends GlueCatalogSyncTestBase { + + @Mock private HadoopTables mockIcebergHadoopTables; + @Mock private BaseTable mockIcebergBaseTable; + @Mock private TableOperations mockIcebergTableOperations; + @Mock private TableMetadata mockIcebergTableMetadata; + private IcebergGlueCatalogTableBuilder icebergGlueCatalogTableBuilder; + + private IcebergGlueCatalogTableBuilder createIcebergGlueCatalogSyncHelper() { + return new IcebergGlueCatalogTableBuilder(mockGlueSchemaExtractor, mockIcebergHadoopTables); + } + + void setupCommonMocks() { + icebergGlueCatalogTableBuilder = createIcebergGlueCatalogSyncHelper(); + } + + void mockIcebergHadoopTables() { + when(mockIcebergHadoopTables.load(TEST_BASE_PATH)).thenReturn(mockIcebergBaseTable); + mockIcebergMetadataFileLocation(); + } + + void mockIcebergMetadataFileLocation() { + when(mockIcebergBaseTable.operations()).thenReturn(mockIcebergTableOperations); + when(mockIcebergTableOperations.current()).thenReturn(mockIcebergTableMetadata); + when(mockIcebergTableMetadata.metadataFileLocation()) + .thenReturn(ICEBERG_METADATA_FILE_LOCATION); + } + + @Test + void testGetCreateTableRequest() { + setupCommonMocks(); + mockIcebergHadoopTables(); + when(mockGlueSchemaExtractor.toColumns( + TableFormat.ICEBERG, TEST_ICEBERG_INTERNAL_TABLE.getReadSchema())) + .thenReturn(Collections.emptyList()); + + TableInput expected = + getCreateOrUpdateTableInput( + TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), + icebergGlueCatalogTableBuilder.getTableParameters(mockIcebergBaseTable), + TEST_ICEBERG_INTERNAL_TABLE); + TableInput output = + icebergGlueCatalogTableBuilder.getCreateTableRequest( + TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); + assertEquals(expected, output); + verify(mockGlueSchemaExtractor, times(1)) + .toColumns(TableFormat.ICEBERG, TEST_ICEBERG_INTERNAL_TABLE.getReadSchema()); + } + + @Test + void testGetUpdateTableRequest() { + setupCommonMocks(); + mockIcebergHadoopTables(); + + Map glueTableParams = new HashMap<>(); + glueTableParams.put(METADATA_LOCATION_PROP, ICEBERG_METADATA_FILE_LOCATION); + Table glueTable = Table.builder().parameters(glueTableParams).build(); + + Map parameters = new HashMap<>(); + parameters.put(PREVIOUS_METADATA_LOCATION_PROP, glueTableParams.get(METADATA_LOCATION_PROP)); + when(mockIcebergTableMetadata.metadataFileLocation()) + .thenReturn(ICEBERG_METADATA_FILE_LOCATION_v2); + parameters.put(METADATA_LOCATION_PROP, ICEBERG_METADATA_FILE_LOCATION_v2); + + when(mockGlueSchemaExtractor.toColumns( + TableFormat.ICEBERG, TEST_ICEBERG_INTERNAL_TABLE.getReadSchema(), glueTable)) + .thenReturn(Collections.emptyList()); + + TableInput expected = + getCreateOrUpdateTableInput( + TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), parameters, TEST_ICEBERG_INTERNAL_TABLE); + TableInput output = + icebergGlueCatalogTableBuilder.getUpdateTableRequest( + TEST_ICEBERG_INTERNAL_TABLE, glueTable, TEST_CATALOG_TABLE_IDENTIFIER); + assertEquals(expected, output); + verify(mockGlueSchemaExtractor, times(1)) + .toColumns(TableFormat.ICEBERG, TEST_ICEBERG_INTERNAL_TABLE.getReadSchema(), glueTable); + } + + @Test + void testGetTableParameters() { + icebergGlueCatalogTableBuilder = createIcebergGlueCatalogSyncHelper(); + mockIcebergMetadataFileLocation(); + Map expected = new HashMap<>(); + expected.put(TABLE_TYPE_PROP, TableFormat.ICEBERG); + expected.put(METADATA_LOCATION_PROP, ICEBERG_METADATA_FILE_LOCATION); + Map tableParameters = + icebergGlueCatalogTableBuilder.getTableParameters(mockIcebergBaseTable); + assertEquals(expected, tableParameters); + } +} diff --git a/xtable-utilities/pom.xml b/xtable-utilities/pom.xml index 25d559730..6fabfde74 100644 --- a/xtable-utilities/pom.xml +++ b/xtable-utilities/pom.xml @@ -107,8 +107,8 @@ hadoop-aws - com.amazonaws - aws-java-sdk-bundle + software.amazon.awssdk + bundle diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java index 2f60f33f6..7317d2653 100644 --- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java +++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.function.Function; import java.util.stream.Collectors; @@ -58,6 +59,7 @@ import org.apache.xtable.conversion.SourceTable; import org.apache.xtable.conversion.TargetCatalogConfig; import org.apache.xtable.conversion.TargetTable; +import org.apache.xtable.hudi.HudiSourceConfig; import org.apache.xtable.model.catalog.CatalogTableIdentifier; import org.apache.xtable.model.catalog.HierarchicalTableIdentifier; import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier; @@ -139,15 +141,17 @@ public static void main(String[] args) throws Exception { datasetConfig.getTargetCatalogs().stream() .map(RunCatalogSync::populateCatalogImplementations) .collect(Collectors.toMap(ExternalCatalogConfig::getCatalogId, Function.identity())); - CatalogConversionSource catalogConversionSource = - CatalogConversionFactory.createCatalogConversionSource( - datasetConfig.getSourceCatalog(), hadoopConf); ConversionController conversionController = new ConversionController(hadoopConf); for (DatasetConfig.Dataset dataset : datasetConfig.getDatasets()) { SourceTable sourceTable = null; if (dataset.getSourceCatalogTableIdentifier().getStorageIdentifier() != null) { StorageIdentifier storageIdentifier = dataset.getSourceCatalogTableIdentifier().getStorageIdentifier(); + Properties sourceProperties = new Properties(); + if (storageIdentifier.getPartitionSpec() != null) { + sourceProperties.put( + HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, storageIdentifier.getPartitionSpec()); + } sourceTable = SourceTable.builder() .name(storageIdentifier.getTableName()) @@ -158,8 +162,12 @@ public static void main(String[] args) throws Exception { : storageIdentifier.getNamespace().split("\\.")) .dataPath(storageIdentifier.getTableDataPath()) .formatName(storageIdentifier.getTableFormat()) + .additionalProperties(sourceProperties) .build(); } else { + CatalogConversionSource catalogConversionSource = + CatalogConversionFactory.createCatalogConversionSource( + datasetConfig.getSourceCatalog(), hadoopConf); sourceTable = catalogConversionSource.getSourceTable( getCatalogTableIdentifier( @@ -175,6 +183,7 @@ public static void main(String[] args) throws Exception { .basePath(sourceTable.getBasePath()) .namespace(sourceTable.getNamespace()) .formatName(targetCatalogTableIdentifier.getTableFormat()) + .additionalProperties(sourceTable.getAdditionalProperties()) .build(); targetTables.add(targetTable); if (!targetCatalogs.containsKey(targetTable)) {