
[BUG] Streaming data, ClickHouse exporter does not receive dataframe #5087

Open
tnbao91 opened this issue May 18, 2024 · 0 comments
Labels
bug Something isn't working


tnbao91 commented May 18, 2024

Mage version

0.9.70

Describe the bug

Hello everyone, I am using Mage AI to create a streaming pipeline from RabbitMQ to ClickHouse. I am running into an issue where, after the transformer block converts the RabbitMQ message into a dataframe, the ClickHouse data exporter does not receive it, and the pipeline fails with an "Unrecognized column" error (full traceback below).

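For context, the transformer block is roughly the following sketch (simplified; the real block only converts the decoded RabbitMQ payload into a pandas DataFrame, with column names taken from the sample message shown in the logs below):

from typing import Dict, List

import pandas as pd

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer


@transformer
def transform(messages: List[Dict], *args, **kwargs):
    # Each message is the decoded RabbitMQ payload, e.g. {"id": 1, "name": "ABC", "age": 20}.
    df = pd.DataFrame(messages)
    print(df)
    return df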

[test_rabbitmq_clickhouse] [RabbitMQSource] Starting to initialize consumer for queue data.test
[test_rabbitmq_clickhouse] [RabbitMQSource] Trying to connect on amqp://
[test_rabbitmq_clickhouse] [RabbitMQSource] Connected on broker
[test_rabbitmq_clickhouse] ClickHouse initialized
[test_rabbitmq_clickhouse] └─ Connecting to ClickHouse...
[test_rabbitmq_clickhouse] DONE
[test_rabbitmq_clickhouse] [RabbitMQSource] Start consuming messages.
[test_rabbitmq_clickhouse] [RabbitMQSource] Received message Payload(method=<Basic.Deliver(['consumer_tag=ctag1.863b1356997945dcb1972c4dbfdfd974', 'delivery_tag=1', 'exchange=data.test', 'redelivered=False', 'routing_key=data.test'])>, properties=<BasicProperties(['delivery_mode=2', 'headers={}'])>, body=b'{"id":1,"name":"ABC","age":20}')
[transform_rabbitmq_clickhouse] id name age
[transform_rabbitmq_clickhouse] 0 1 ABC 20
[test_rabbitmq_clickhouse] [GenericIOSink] Batch ingest 1 records, time=1715542665.6382844. Sample: id name age
[test_rabbitmq_clickhouse] 0 1 ABC 20
[test_rabbitmq_clickhouse]
[test_rabbitmq_clickhouse] ├─
[test_rabbitmq_clickhouse] └─ Exporting data to table 'kplay_data.data_test'...
Pipeline test_rabbitmq_clickhouse execution failed with error:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/mage_ai/server/websocket_server.py", line 116, in run_pipeline
pipeline.execute_sync(
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/models/pipeline.py", line 693, in execute_sync
StreamingPipelineExecutor(self).execute(
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/executors/streaming_pipeline_executor.py", line 136, in execute
raise e
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/executors/streaming_pipeline_executor.py", line 121, in execute
__execute_with_retry()
File "/usr/local/lib/python3.10/site-packages/mage_ai/shared/retry.py", line 54, in retry_func
raise e
File "/usr/local/lib/python3.10/site-packages/mage_ai/shared/retry.py", line 38, in retry_func
return func(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/executors/streaming_pipeline_executor.py", line 116, in __execute_with_retry
self.__execute_in_python(
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/executors/streaming_pipeline_executor.py", line 257, in __execute_in_python
source.batch_read(handler=handle_batch_events)
File "/usr/local/lib/python3.10/site-packages/mage_ai/streaming/sources/rabbitmq.py", line 107, in batch_read
handler(full_message, **{'channel': self.channel})
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/executors/streaming_pipeline_executor.py", line 228, in handle_batch_events
handle_batch_events_recursively(
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/executors/streaming_pipeline_executor.py", line 216, in handle_batch_events_recursively
handle_batch_events_recursively(
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/executors/streaming_pipeline_executor.py", line 213, in handle_batch_events_recursively
sinks_by_uuid[downstream_block.uuid].batch_write(
File "/usr/local/lib/python3.10/site-packages/mage_ai/streaming/sinks/generic_io.py", line 73, in batch_write
self.io_client.export(
File "/usr/local/lib/python3.10/site-packages/mage_ai/io/clickhouse.py", line 290, in export
__process()
File "/usr/local/lib/python3.10/site-packages/mage_ai/io/clickhouse.py", line 285, in __process
self.client.insert_df(f'{database}.{table_name}', df)
File "/usr/local/lib/python3.10/site-packages/clickhouse_connect/driver/client.py", line 585, in insert_df
return self.insert(table,
File "/usr/local/lib/python3.10/site-packages/clickhouse_connect/driver/client.py", line 543, in insert
context = self.create_insert_context(table,
File "/usr/local/lib/python3.10/site-packages/clickhouse_connect/driver/client.py", line 653, in create_insert_context
raise ProgrammingError(f'Unrecognized column {ex} in table {table}') from None
clickhouse_connect.driver.exceptions.ProgrammingError: Unrecognized column 0 in table kplay_data.data_test
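From the traceback, the GenericIOSink hands the frame straight to clickhouse_connect's insert_df, which uses the DataFrame's column names as the insert columns. A minimal sketch of that call, assuming the target table kplay_data.data_test has columns id, name, age (connection details are placeholders):

import clickhouse_connect
import pandas as pd

# Placeholder connection settings.
client = clickhouse_connect.get_client(host='localhost', username='default', password='')

# Same shape as the message the pipeline received: {"id": 1, "name": "ABC", "age": 20}.
df = pd.DataFrame([{'id': 1, 'name': 'ABC', 'age': 20}])

# insert_df maps df.columns onto the table columns; if the frame arrives with
# default integer columns (0, 1, 2, ...) instead of id/name/age, ClickHouse
# raises "Unrecognized column 0 in table kplay_data.data_test" as above.
client.insert_df('kplay_data.data_test', df)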

tnbao91 added the bug label on May 18, 2024
tnbao91 changed the title from "[BUG]" to "[BUG] Streaming data, ClickHouse exporter does not receive dataframe" on May 18, 2024