You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Hello everyone, I am using MageAI to create a streaming pipeline from RabbitMQ to ClickHouse. I am encountering an issue where after transforming the message from RabbitMQ into a dataframe, the ClickHouse data exporter does not receive it.
[test_rabbitmq_clickhouse] [RabbitMQSource] Starting to initialize consumer for queue data.test
[test_rabbitmq_clickhouse] [RabbitMQSource] Trying to connect on amqp://
[test_rabbitmq_clickhouse] [RabbitMQSource] Connected on broker
[test_rabbitmq_clickhouse] ClickHouse initialized
[test_rabbitmq_clickhouse] └─ Connecting to ClickHouse...
[test_rabbitmq_clickhouse] DONE
[test_rabbitmq_clickhouse] [RabbitMQSource] Start consuming messages.
[test_rabbitmq_clickhouse] [RabbitMQSource] Received message Payload(method=<Basic.Deliver(['consumer_tag=ctag1.863b1356997945dcb1972c4dbfdfd974', 'delivery_tag=1', 'exchange=data.test', 'redelivered=False', 'routing_key=data.test'])>, properties=<BasicProperties(['delivery_mode=2', 'headers={}'])>, body=b'{"id":1,"name":"ABC","age":20}')
[transform_rabbitmq_clickhouse] id name age
[transform_rabbitmq_clickhouse] 0 1 ABC 20
[test_rabbitmq_clickhouse] [GenericIOSink] Batch ingest 1 records, time=1715542665.6382844. Sample: id name age
[test_rabbitmq_clickhouse] 0 1 ABC 20
[test_rabbitmq_clickhouse]
[test_rabbitmq_clickhouse] ├─
[test_rabbitmq_clickhouse] └─ Exporting data to table 'kplay_data.data_test'...
Pipeline test_rabbitmq_clickhouse execution failed with error:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/mage_ai/server/websocket_server.py", line 116, in run_pipeline
pipeline.execute_sync(
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/models/pipeline.py", line 693, in execute_sync
StreamingPipelineExecutor(self).execute(
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/executors/streaming_pipeline_executor.py", line 136, in execute
raise e
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/executors/streaming_pipeline_executor.py", line 121, in execute
__execute_with_retry()
File "/usr/local/lib/python3.10/site-packages/mage_ai/shared/retry.py", line 54, in retry_func
raise e
File "/usr/local/lib/python3.10/site-packages/mage_ai/shared/retry.py", line 38, in retry_func
return func(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/executors/streaming_pipeline_executor.py", line 116, in __execute_with_retry
self.__execute_in_python(
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/executors/streaming_pipeline_executor.py", line 257, in __execute_in_python
source.batch_read(handler=handle_batch_events)
File "/usr/local/lib/python3.10/site-packages/mage_ai/streaming/sources/rabbitmq.py", line 107, in batch_read
handler(full_message, **{'channel': self.channel})
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/executors/streaming_pipeline_executor.py", line 228, in handle_batch_events
handle_batch_events_recursively(
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/executors/streaming_pipeline_executor.py", line 216, in handle_batch_events_recursively
handle_batch_events_recursively(
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/executors/streaming_pipeline_executor.py", line 213, in handle_batch_events_recursively
sinks_by_uuid[downstream_block.uuid].batch_write(
File "/usr/local/lib/python3.10/site-packages/mage_ai/streaming/sinks/generic_io.py", line 73, in batch_write self.io_client.export(
File "/usr/local/lib/python3.10/site-packages/mage_ai/io/clickhouse.py", line 290, in export
__process()
File "/usr/local/lib/python3.10/site-packages/mage_ai/io/clickhouse.py", line 285, in __process
self.client.insert_df(f'{database}.{table_name}', df)
File "/usr/local/lib/python3.10/site-packages/clickhouse_connect/driver/client.py", line 585, in insert_df
return self.insert(table,
File "/usr/local/lib/python3.10/site-packages/clickhouse_connect/driver/client.py", line 543, in insert
context = self.create_insert_context(table,
File "/usr/local/lib/python3.10/site-packages/clickhouse_connect/driver/client.py", line 653, in create_insert_context
raise ProgrammingError(f'Unrecognized column {ex} in table {table}') from None
clickhouse_connect.driver.exceptions.ProgrammingError: Unrecognized column 0 in table kplay_data.data_test
The text was updated successfully, but these errors were encountered:
Mage version
0.9.70
Describe the bug
Hello everyone, I am using MageAI to create a streaming pipeline from RabbitMQ to ClickHouse. I am encountering an issue where after transforming the message from RabbitMQ into a dataframe, the ClickHouse data exporter does not receive it.
[test_rabbitmq_clickhouse] [RabbitMQSource] Starting to initialize consumer for queue data.test
[test_rabbitmq_clickhouse] [RabbitMQSource] Trying to connect on amqp://
[test_rabbitmq_clickhouse] [RabbitMQSource] Connected on broker
[test_rabbitmq_clickhouse] ClickHouse initialized
[test_rabbitmq_clickhouse] └─ Connecting to ClickHouse...
[test_rabbitmq_clickhouse] DONE
[test_rabbitmq_clickhouse] [RabbitMQSource] Start consuming messages.
[test_rabbitmq_clickhouse] [RabbitMQSource] Received message Payload(method=<Basic.Deliver(['consumer_tag=ctag1.863b1356997945dcb1972c4dbfdfd974', 'delivery_tag=1', 'exchange=data.test', 'redelivered=False', 'routing_key=data.test'])>, properties=<BasicProperties(['delivery_mode=2', 'headers={}'])>, body=b'{"id":1,"name":"ABC","age":20}')
[transform_rabbitmq_clickhouse] id name age
[transform_rabbitmq_clickhouse] 0 1 ABC 20
[test_rabbitmq_clickhouse] [GenericIOSink] Batch ingest 1 records, time=1715542665.6382844. Sample: id name age
[test_rabbitmq_clickhouse] 0 1 ABC 20
[test_rabbitmq_clickhouse]
[test_rabbitmq_clickhouse] ├─
[test_rabbitmq_clickhouse] └─ Exporting data to table 'kplay_data.data_test'...
Pipeline test_rabbitmq_clickhouse execution failed with error:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/mage_ai/server/websocket_server.py", line 116, in run_pipeline
pipeline.execute_sync(
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/models/pipeline.py", line 693, in execute_sync
StreamingPipelineExecutor(self).execute(
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/executors/streaming_pipeline_executor.py", line 136, in execute
raise e
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/executors/streaming_pipeline_executor.py", line 121, in execute
__execute_with_retry()
File "/usr/local/lib/python3.10/site-packages/mage_ai/shared/retry.py", line 54, in retry_func
raise e
File "/usr/local/lib/python3.10/site-packages/mage_ai/shared/retry.py", line 38, in retry_func
return func(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/executors/streaming_pipeline_executor.py", line 116, in __execute_with_retry
self.__execute_in_python(
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/executors/streaming_pipeline_executor.py", line 257, in __execute_in_python
source.batch_read(handler=handle_batch_events)
File "/usr/local/lib/python3.10/site-packages/mage_ai/streaming/sources/rabbitmq.py", line 107, in batch_read
handler(full_message, **{'channel': self.channel})
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/executors/streaming_pipeline_executor.py", line 228, in handle_batch_events
handle_batch_events_recursively(
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/executors/streaming_pipeline_executor.py", line 216, in handle_batch_events_recursively
handle_batch_events_recursively(
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/executors/streaming_pipeline_executor.py", line 213, in handle_batch_events_recursively
sinks_by_uuid[downstream_block.uuid].batch_write(
File "/usr/local/lib/python3.10/site-packages/mage_ai/streaming/sinks/generic_io.py", line 73, in batch_write
self.io_client.export(
File "/usr/local/lib/python3.10/site-packages/mage_ai/io/clickhouse.py", line 290, in export
__process()
File "/usr/local/lib/python3.10/site-packages/mage_ai/io/clickhouse.py", line 285, in __process
self.client.insert_df(f'{database}.{table_name}', df)
File "/usr/local/lib/python3.10/site-packages/clickhouse_connect/driver/client.py", line 585, in insert_df
return self.insert(table,
File "/usr/local/lib/python3.10/site-packages/clickhouse_connect/driver/client.py", line 543, in insert
context = self.create_insert_context(table,
File "/usr/local/lib/python3.10/site-packages/clickhouse_connect/driver/client.py", line 653, in create_insert_context
raise ProgrammingError(f'Unrecognized column {ex} in table {table}') from None
clickhouse_connect.driver.exceptions.ProgrammingError: Unrecognized column 0 in table kplay_data.data_test
The text was updated successfully, but these errors were encountered: