Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix][Connector-V2] Field information lost during Paimon DataType and SeaTunnel Column conversion #6767

Merged
merged 16 commits into from
May 23, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@
import static org.apache.seatunnel.common.exception.CommonErrorCode.GET_CATALOG_TABLE_WITH_UNSUPPORTED_TYPE_ERROR;
import static org.apache.seatunnel.common.exception.CommonErrorCode.JSON_OPERATION_FAILED;
import static org.apache.seatunnel.common.exception.CommonErrorCode.SQL_TEMPLATE_HANDLED_ERROR;
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ARRAY_GENERIC_TYPE;
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_DATA_TYPE;
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_DATA_TYPE_SIMPLE;
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ENCODING;
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ROW_KIND;
import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR;

/**
Expand Down Expand Up @@ -87,6 +90,14 @@ public static SeaTunnelRuntimeException writeSeaTunnelRowFailed(
return new SeaTunnelRuntimeException(WRITE_SEATUNNEL_ROW_ERROR, params, cause);
}

public static SeaTunnelRuntimeException unsupportedDataType(
String identifier, String dataType) {
Map<String, String> params = new HashMap<>();
params.put("identifier", identifier);
params.put("dataType", dataType);
return new SeaTunnelRuntimeException(UNSUPPORTED_DATA_TYPE_SIMPLE, params);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please reuse public static SeaTunnelRuntimeException unsupportedDataType( String identifier, String dataType, String field) . Because we should make sure user know which field can not supported.


public static SeaTunnelRuntimeException unsupportedDataType(
String identifier, String dataType, String field) {
Map<String, String> params = new HashMap<>();
Expand Down Expand Up @@ -199,4 +210,19 @@ public static SeaTunnelRuntimeException sqlTemplateHandledError(
params.put("optionName", optionName);
return new SeaTunnelRuntimeException(SQL_TEMPLATE_HANDLED_ERROR, params);
}

public static SeaTunnelRuntimeException unsupportedArrayGenericType(
String identifier, String dataType) {
Map<String, String> params = new HashMap<>();
params.put("identifier", identifier);
params.put("dataType", dataType);
return new SeaTunnelRuntimeException(UNSUPPORTED_ARRAY_GENERIC_TYPE, params);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

done.


public static SeaTunnelRuntimeException unsupportedRowKind(String identifier, String rowKind) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need tableId when invoke unsupportedRowKind method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need tableId when invoke unsupportedRowKind method.

OK, I will optimize the code based on your suggestions

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need tableId when invoke unsupportedRowKind method.

OK, I will optimize the code based on your suggestions

done.

Map<String, String> params = new HashMap<>();
params.put("identifier", identifier);
params.put("rowKind", rowKind);
return new SeaTunnelRuntimeException(UNSUPPORTED_ROW_KIND, params);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
"<connector> write SeaTunnelRow failed, the SeaTunnelRow value is '<seaTunnelRow>'."),
SQL_TEMPLATE_HANDLED_ERROR(
"COMMON-24",
"The table of <tableName> has no <keyName>, but the template \n <template> \n which has the place holder named <placeholder>. Please use the option named <optionName> to specify sql template");
"The table of <tableName> has no <keyName>, but the template \n <template> \n which has the place holder named <placeholder>. Please use the option named <optionName> to specify sql template"),
UNSUPPORTED_DATA_TYPE_SIMPLE("COMMON-25", "'<identifier>' unsupported data type '<dataType>'"),
UNSUPPORTED_ARRAY_GENERIC_TYPE(
"COMMON-26", "'<identifier>' array type not support genericType '<genericType>'"),
UNSUPPORTED_ROW_KIND("COMMON-27", "'<identifier>' unsupported rowKind type '<rowKind>'");

private final String code;
private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.SchemaUtil;

Expand All @@ -35,6 +36,7 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -46,9 +48,9 @@
public class PaimonCatalog implements Catalog, PaimonTable {
private static final String DEFAULT_DATABASE = "default";

private String catalogName;
private PaimonSinkConfig paimonSinkConfig;
private PaimonCatalogLoader paimonCatalogLoader;
private final String catalogName;
private final PaimonSinkConfig paimonSinkConfig;
private final PaimonCatalogLoader paimonCatalogLoader;
private org.apache.paimon.catalog.Catalog catalog;

public PaimonCatalog(String catalogName, PaimonSinkConfig paimonSinkConfig) {
Expand Down Expand Up @@ -181,7 +183,12 @@ private CatalogTable toCatalogTable(
TableSchema.Builder builder = TableSchema.builder();
dataFields.forEach(
dataField -> {
Column column = SchemaUtil.toSeaTunnelType(dataField.type());
BasicTypeDefine.BasicTypeDefineBuilder<DataType> typeDefineBuilder =
BasicTypeDefine.<DataType>builder()
.name(dataField.name())
.comment(dataField.description())
.nativeType(dataField.type());
Column column = SchemaUtil.toSeaTunnelType(typeDefineBuilder.build());
builder.column(column);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
@Getter
public class PaimonConfig implements Serializable {

public static final String CONNECTOR_IDENTITY = "Paimon";

public static final Option<String> WAREHOUSE =
Options.key("warehouse")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.data;

import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.converter.TypeConverter;
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSink;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter;

import org.apache.paimon.types.DataType;
Expand All @@ -29,21 +30,21 @@

@Slf4j
@AutoService(TypeConverter.class)
public class PaimonTypeMapper implements TypeConverter<DataType> {
public class PaimonTypeMapper implements TypeConverter<BasicTypeDefine<DataType>> {
public static final PaimonTypeMapper INSTANCE = new PaimonTypeMapper();

@Override
public String identifier() {
return PaimonSink.PLUGIN_NAME;
return PaimonConfig.CONNECTOR_IDENTITY;
}

@Override
public Column convert(DataType dataType) {
return RowTypeConverter.convert(dataType);
public Column convert(BasicTypeDefine<DataType> typeDefine) {
return RowTypeConverter.convert(typeDefine);
}

@Override
public DataType reconvert(Column column) {
public BasicTypeDefine<DataType> reconvert(Column column) {
return RowTypeConverter.reconvert(column);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;

import org.apache.paimon.data.BinaryArray;
Expand Down Expand Up @@ -119,10 +121,8 @@ public static Object convert(InternalArray array, SeaTunnelDataType<?> dataType)
}
return doubles;
default:
String errorMsg =
String.format("Array type not support this genericType [%s]", dataType);
throw new PaimonConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg);
throw CommonError.unsupportedArrayGenericType(
PaimonConfig.CONNECTOR_IDENTITY, dataType.getSqlType().toString());
}
}

Expand Down Expand Up @@ -220,10 +220,8 @@ public static BinaryArray reconvert(Object array, SeaTunnelDataType<?> dataType)
}
break;
default:
String errorMsg =
String.format("Array type not support this genericType [%s]", dataType);
throw new PaimonConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg);
throw CommonError.unsupportedArrayGenericType(
PaimonConfig.CONNECTOR_IDENTITY, dataType.getSqlType().toString());
}
binaryArrayWriter.complete();
return binaryArray;
Expand Down Expand Up @@ -320,7 +318,7 @@ public static SeaTunnelRow convert(InternalRow rowData, SeaTunnelRowType seaTunn
break;
default:
throw new PaimonConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please throw common error by use this method.

public static SeaTunnelRuntimeException unsupportedDataType(

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please throw common error by use this method.

public static SeaTunnelRuntimeException unsupportedDataType(

done.

"SeaTunnel does not support this type");
}
}
Expand Down Expand Up @@ -438,9 +436,9 @@ public static InternalRow reconvert(
binaryWriter.writeRow(i, paimonRow, new InternalRowSerializer(paimonRowType));
break;
default:
throw new PaimonConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
"Unsupported data type " + seaTunnelRowType.getFieldType(i));
throw CommonError.unsupportedDataType(
PaimonConfig.CONNECTOR_IDENTITY,
seaTunnelRowType.getFieldType(i).getSqlType().toString());
}
}
return binaryRow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.utils;

import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;

import org.apache.paimon.data.InternalRow;

Expand All @@ -43,9 +43,8 @@ public static org.apache.paimon.types.RowKind convertSeaTunnelRowKind2PaimonRowK
case INSERT:
return org.apache.paimon.types.RowKind.INSERT;
default:
throw new PaimonConnectorException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
"Unsupported rowKind type " + seaTunnelRowInd.shortString());
throw CommonError.unsupportedRowKind(
PaimonConfig.CONNECTOR_IDENTITY, seaTunnelRowInd.shortString());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@

import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;

import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.ArrayType;
Expand Down Expand Up @@ -80,11 +81,20 @@ public static SeaTunnelRowType convert(RowType rowType) {
/**
* Convert Paimon row type {@link DataType} to SeaTunnel row type {@link SeaTunnelDataType}
*
* @param dataType Paimon data type
* @param typeDefine Paimon data type
* @return SeaTunnel data type {@link SeaTunnelDataType}
*/
public static Column convert(DataType dataType) {
PhysicalColumn.PhysicalColumnBuilder physicalColumnBuilder = PhysicalColumn.builder();
public static Column convert(BasicTypeDefine<DataType> typeDefine) {

PhysicalColumn.PhysicalColumnBuilder physicalColumnBuilder =
PhysicalColumn.builder()
.name(typeDefine.getName())
.sourceType(typeDefine.getColumnType())
.nullable(typeDefine.isNullable())
.defaultValue(typeDefine.getDefaultValue())
.comment(typeDefine.getComment());

DataType dataType = typeDefine.getNativeType();
SeaTunnelDataType<?> seaTunnelDataType;
PaimonToSeaTunnelTypeVisitor paimonToSeaTunnelTypeVisitor =
PaimonToSeaTunnelTypeVisitor.INSTANCE;
Expand Down Expand Up @@ -164,12 +174,8 @@ public static Column convert(DataType dataType) {
seaTunnelDataType = paimonToSeaTunnelTypeVisitor.visit((RowType) dataType);
break;
default:
String errorMsg =
String.format(
"Paimon dataType not support this genericType [%s]",
dataType.asSQLString());
throw new PaimonConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg);
throw CommonError.unsupportedDataType(
PaimonConfig.CONNECTOR_IDENTITY, dataType.asSQLString());
}
return physicalColumnBuilder.dataType(seaTunnelDataType).build();
}
Expand Down Expand Up @@ -209,7 +215,7 @@ public static RowType reconvert(SeaTunnelRowType seaTunnelRowType, TableSchema t
* @param column SeaTunnel data type {@link Column}
* @return Paimon data type {@link DataType}
*/
public static DataType reconvert(Column column) {
public static BasicTypeDefine<DataType> reconvert(Column column) {
return SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(column);
}

Expand All @@ -234,18 +240,53 @@ private static class SeaTunnelTypeToPaimonVisitor {

private SeaTunnelTypeToPaimonVisitor() {}

public DataType visit(Column column) {
public BasicTypeDefine<DataType> visit(Column column) {
BasicTypeDefine.BasicTypeDefineBuilder<DataType> builder =
BasicTypeDefine.<DataType>builder()
.name(column.getName())
.nullable(column.isNullable())
.comment(column.getComment())
.defaultValue(column.getDefaultValue());
SeaTunnelDataType<?> dataType = column.getDataType();
Integer scale = column.getScale();
switch (dataType.getSqlType()) {
case TIMESTAMP:
return DataTypes.TIMESTAMP(
Objects.isNull(scale) ? TimestampType.DEFAULT_PRECISION : scale);
int timestampScale =
Objects.isNull(scale) ? TimestampType.DEFAULT_PRECISION : scale;
TimestampType timestampType = DataTypes.TIMESTAMP(timestampScale);
builder.nativeType(timestampType);
builder.dataType(timestampType.getTypeRoot().name());
builder.columnType(timestampType.toString());
builder.scale(timestampScale);
builder.length(column.getColumnLength());
return builder.build();
case TIME:
return DataTypes.TIME(
Objects.isNull(scale) ? TimeType.DEFAULT_PRECISION : scale);
int timeScale = Objects.isNull(scale) ? TimeType.DEFAULT_PRECISION : scale;
TimeType timeType = DataTypes.TIME(timeScale);
builder.nativeType(timeType);
builder.columnType(timeType.toString());
builder.dataType(timeType.getTypeRoot().name());
builder.scale(timeScale);
builder.length(column.getColumnLength());
return builder.build();
case DECIMAL:
int precision =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In some cases, the precision and scale obtained here may be 0, and you must handle this situation to achieve better compatibility.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In some cases, the precision and scale obtained here may be 0, and you must handle this situation to achieve better compatibility.

done.

((org.apache.seatunnel.api.table.type.DecimalType) dataType)
.getPrecision();
DecimalType decimalType = DataTypes.DECIMAL(precision, scale);
builder.nativeType(decimalType);
builder.columnType(decimalType.toString());
builder.dataType(decimalType.getTypeRoot().name());
builder.scale(scale);
builder.precision((long) precision);
builder.length(column.getColumnLength());
return builder.build();
default:
return visit(dataType);
builder.nativeType(visit(dataType));
builder.columnType(dataType.toString());
builder.length(column.getColumnLength());
builder.dataType(dataType.getSqlType().name());
return builder.build();
}
}

Expand Down Expand Up @@ -301,9 +342,8 @@ public DataType visit(SeaTunnelDataType<?> dataType) {
Arrays.stream(fieldTypes).map(this::visit).toArray(DataType[]::new);
return DataTypes.ROW(dataTypes);
default:
throw new PaimonConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
"Unsupported data type: " + dataType.getSqlType());
throw CommonError.unsupportedDataType(
PaimonConfig.CONNECTOR_IDENTITY, dataType.getSqlType().toString());
}
}
}
Expand Down Expand Up @@ -417,12 +457,9 @@ public SeaTunnelDataType<?> visit(ArrayType arrayType) {
case DOUBLE:
return org.apache.seatunnel.api.table.type.ArrayType.DOUBLE_ARRAY_TYPE;
default:
String errorMsg =
String.format(
"Array type not support this genericType [%s]",
seaTunnelArrayType);
throw new PaimonConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg);
throw CommonError.unsupportedArrayGenericType(
PaimonConfig.CONNECTOR_IDENTITY,
seaTunnelArrayType.getSqlType().toString());
}
}

Expand All @@ -445,9 +482,8 @@ public SeaTunnelDataType<?> visit(RowType rowType) {

@Override
protected SeaTunnelDataType defaultMethod(DataType dataType) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Hisoka-X This defaultMethod method inherited from paimon seems to be difficult to obtain the fieldname.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please put fieldname as UNKNOWN.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please put fieldname as UNKNOWN.

done.

throw new PaimonConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
"Unsupported data type: " + dataType);
throw CommonError.unsupportedDataType(
PaimonConfig.CONNECTOR_IDENTITY, dataType.getTypeRoot().name());
}
}
}