-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Fix][Connector-V2] Field information lost during Paimon DataType and SeaTunnel Column conversion #6767
[Fix][Connector-V2] Field information lost during Paimon DataType and SeaTunnel Column conversion #6767
Changes from 5 commits
6c102ae
4a5431f
fb43528
2d1cb44
0e91978
8036acb
548e68a
6e3f204
bdb4b77
ea955f3
760fb29
3915e36
a7c0649
e57182c
b068137
f162b5e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,11 +19,13 @@ | |
|
||
import org.apache.seatunnel.api.table.catalog.Column; | ||
import org.apache.seatunnel.api.table.catalog.PhysicalColumn; | ||
import org.apache.seatunnel.api.table.converter.BasicTypeDefine; | ||
import org.apache.seatunnel.api.table.type.BasicType; | ||
import org.apache.seatunnel.api.table.type.LocalTimeType; | ||
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; | ||
import org.apache.seatunnel.api.table.type.SeaTunnelDataType; | ||
import org.apache.seatunnel.api.table.type.SeaTunnelRowType; | ||
import org.apache.seatunnel.common.exception.CommonErrorCode; | ||
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; | ||
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException; | ||
|
||
|
@@ -80,11 +82,20 @@ public static SeaTunnelRowType convert(RowType rowType) { | |
/** | ||
* Convert Paimon row type {@link DataType} to SeaTunnel row type {@link SeaTunnelDataType} | ||
* | ||
* @param dataType Paimon data type | ||
* @param typeDefine Paimon data type | ||
* @return SeaTunnel data type {@link SeaTunnelDataType} | ||
*/ | ||
public static Column convert(DataType dataType) { | ||
PhysicalColumn.PhysicalColumnBuilder physicalColumnBuilder = PhysicalColumn.builder(); | ||
public static Column convert(BasicTypeDefine<DataType> typeDefine) { | ||
|
||
PhysicalColumn.PhysicalColumnBuilder physicalColumnBuilder = | ||
PhysicalColumn.builder() | ||
.name(typeDefine.getName()) | ||
.sourceType(typeDefine.getColumnType()) | ||
.nullable(typeDefine.isNullable()) | ||
.defaultValue(typeDefine.getDefaultValue()) | ||
.comment(typeDefine.getComment()); | ||
|
||
DataType dataType = typeDefine.getNativeType(); | ||
SeaTunnelDataType<?> seaTunnelDataType; | ||
PaimonToSeaTunnelTypeVisitor paimonToSeaTunnelTypeVisitor = | ||
PaimonToSeaTunnelTypeVisitor.INSTANCE; | ||
|
@@ -168,8 +179,7 @@ public static Column convert(DataType dataType) { | |
String.format( | ||
"Paimon dataType not support this genericType [%s]", | ||
dataType.asSQLString()); | ||
throw new PaimonConnectorException( | ||
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg); | ||
throw new PaimonConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg); | ||
} | ||
return physicalColumnBuilder.dataType(seaTunnelDataType).build(); | ||
} | ||
|
@@ -209,7 +219,7 @@ public static RowType reconvert(SeaTunnelRowType seaTunnelRowType, TableSchema t | |
* @param column SeaTunnel data type {@link Column} | ||
* @return Paimon data type {@link DataType} | ||
*/ | ||
public static DataType reconvert(Column column) { | ||
public static BasicTypeDefine<DataType> reconvert(Column column) { | ||
return SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(column); | ||
} | ||
|
||
|
@@ -234,18 +244,53 @@ private static class SeaTunnelTypeToPaimonVisitor { | |
|
||
private SeaTunnelTypeToPaimonVisitor() {} | ||
|
||
public DataType visit(Column column) { | ||
public BasicTypeDefine<DataType> visit(Column column) { | ||
BasicTypeDefine.BasicTypeDefineBuilder<DataType> builder = | ||
BasicTypeDefine.<DataType>builder() | ||
.name(column.getName()) | ||
.nullable(column.isNullable()) | ||
.comment(column.getComment()) | ||
.defaultValue(column.getDefaultValue()); | ||
SeaTunnelDataType<?> dataType = column.getDataType(); | ||
Integer scale = column.getScale(); | ||
switch (dataType.getSqlType()) { | ||
case TIMESTAMP: | ||
return DataTypes.TIMESTAMP( | ||
Objects.isNull(scale) ? TimestampType.DEFAULT_PRECISION : scale); | ||
int timestampScale = | ||
Objects.isNull(scale) ? TimestampType.DEFAULT_PRECISION : scale; | ||
TimestampType timestampType = DataTypes.TIMESTAMP(timestampScale); | ||
builder.nativeType(timestampType); | ||
builder.dataType(timestampType.getTypeRoot().name()); | ||
builder.columnType(timestampType.toString()); | ||
builder.scale(timestampScale); | ||
builder.length(column.getColumnLength()); | ||
return builder.build(); | ||
case TIME: | ||
return DataTypes.TIME( | ||
Objects.isNull(scale) ? TimeType.DEFAULT_PRECISION : scale); | ||
int timeScale = Objects.isNull(scale) ? TimeType.DEFAULT_PRECISION : scale; | ||
TimeType timeType = DataTypes.TIME(timeScale); | ||
builder.nativeType(timeType); | ||
builder.columnType(timeType.toString()); | ||
builder.dataType(timeType.getTypeRoot().name()); | ||
builder.scale(timeScale); | ||
builder.length(column.getColumnLength()); | ||
return builder.build(); | ||
case DECIMAL: | ||
int precision = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In some cases, the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
done. |
||
((org.apache.seatunnel.api.table.type.DecimalType) dataType) | ||
.getPrecision(); | ||
DecimalType decimalType = DataTypes.DECIMAL(precision, scale); | ||
builder.nativeType(decimalType); | ||
builder.columnType(decimalType.toString()); | ||
builder.dataType(decimalType.getTypeRoot().name()); | ||
builder.scale(scale); | ||
builder.precision((long) precision); | ||
builder.length(column.getColumnLength()); | ||
return builder.build(); | ||
default: | ||
return visit(dataType); | ||
builder.nativeType(visit(dataType)); | ||
builder.columnType(dataType.toString()); | ||
builder.length(column.getColumnLength()); | ||
builder.dataType(dataType.getSqlType().name()); | ||
return builder.build(); | ||
} | ||
} | ||
|
||
|
@@ -302,7 +347,7 @@ public DataType visit(SeaTunnelDataType<?> dataType) { | |
return DataTypes.ROW(dataTypes); | ||
default: | ||
throw new PaimonConnectorException( | ||
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, | ||
CommonErrorCode.UNSUPPORTED_DATA_TYPE, | ||
"Unsupported data type: " + dataType.getSqlType()); | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please throw common error by use this method.
seatunnel/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
Line 90 in a5609d6
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.