Connect with Kafka not able to form #4

Open
choonhongyeoh0241 opened this issue Feb 4, 2024 · 4 comments

@choonhongyeoh0241

Whenever we try to connect to Kafka, we get this error:
WARNING:root:kafka dataframe could not be created because: An error occurred while calling o36.load.
: java.lang.NoClassDefFoundError: scala/$less$colon$less
at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateStreamOptions(KafkaSourceProvider.scala:338)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:71)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:233)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:35)
at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:168)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:144)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: scala.$less$colon$less
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 20 more

Traceback (most recent call last):
File "C:\Users\User\Desktop\Data Engineering\Data Engineering\spark_stream.py", line 143, in <module>
selection_df = create_selection_df_from_kafka(spark_df)
File "C:\Users\User\Desktop\Data Engineering\Data Engineering\spark_stream.py", line 129, in create_selection_df_from_kafka
sel = spark_df.selectExpr("CAST(value AS STRING)")
AttributeError: 'NoneType' object has no attribute 'selectExpr'

@VuKhoiGVM

Hello, I fixed this error after two days. Here are my steps:

  1. Check the Kafka version. It is determined by the confluentinc/cp-server image used for the broker, since Kafka is bundled with the Confluent Platform version.
  2. On Maven, find the versions of the jar packages listed in the Spark session config that match your setup, and add them to: venv/lib/python3.11/site-packages/pyspark/jars
  3. Update the Spark session config to reference the jars you just added:

from pyspark.sql import SparkSession

s_conn = (
    SparkSession.builder
    .appName('SparkDataStreaming')
    # The _2.12 suffix is the Scala version; it must match the Scala build of the installed Spark.
    .config('spark.jars.packages', "com.datastax.spark:spark-cassandra-connector_2.12:3.4.0,"
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0")
    .config('spark.cassandra.connection.host', 'localhost')
    .getOrCreate()
)
  4. Run spark-submit again; the keyspace and table will be created successfully.
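
For anyone following these steps, here is a minimal sketch of building the spark.jars.packages value from version numbers instead of hardcoding it, so the Scala suffix and Spark version stay in sync. The helper names kafka_cassandra_packages and build_session are hypothetical, and the defaults (Scala 2.12, connector 3.4.0) are assumptions taken from the config above; adjust them to your own installation.

```python
def kafka_cassandra_packages(spark_version, scala_binary="2.12",
                             cassandra_connector_version="3.4.0"):
    """Return a comma-separated Maven coordinate string for spark.jars.packages.

    The defaults are assumptions copied from the config in this thread,
    not values that are guaranteed to fit every setup.
    """
    coords = [
        f"com.datastax.spark:spark-cassandra-connector_{scala_binary}:{cassandra_connector_version}",
        # The Kafka source is released with Spark, so its version is the Spark version.
        f"org.apache.spark:spark-sql-kafka-0-10_{scala_binary}:{spark_version}",
    ]
    return ",".join(coords)


def build_session(app_name="SparkDataStreaming"):
    """Create a SparkSession whose Kafka package matches the installed PySpark."""
    # Imported lazily so the helper above can be used without PySpark installed.
    import pyspark
    from pyspark.sql import SparkSession

    packages = kafka_cassandra_packages(pyspark.__version__)
    return (
        SparkSession.builder
        .appName(app_name)
        .config("spark.jars.packages", packages)
        .config("spark.cassandra.connection.host", "localhost")
        .getOrCreate()
    )
```

Calling kafka_cassandra_packages("3.4.0") reproduces the two coordinates from step 3. The Cassandra connector has its own release numbering, so its version is kept as a separate argument rather than derived from the Spark version.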

Hope it helps!

@teddythinh

teddythinh commented Mar 6, 2024

Hi there, the author has explained the missing packages in this part: https://youtu.be/GqAcTrqKcrY?si=QzgPjlC-RHMULUS0&t=4599

I believe the kafka-clients package is still missing. You can add its jar file along with the two packages from @VuKhoiGVM's comment.

Ref: https://stackoverflow.com/a/71059689/17316050

Also, please double-check your Spark, Cassandra, and Scala versions and download compatible versions of the jars, or else it won't work.
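
A rough way to do that version check is to scan the jars directory for mixed Scala suffixes. The class in the error, scala.$less$colon$less, is a top-level class in the Scala 2.13 standard library but not in 2.12, so this NoClassDefFoundError typically points to a jar built for Scala 2.13 running on a Scala 2.12 Spark. The sketch below is only a heuristic based on jar file names; the function names are hypothetical, and it assumes the most common suffix in the directory is the one Spark itself was built with.

```python
import re
from collections import defaultdict
from pathlib import Path

# Matches the Scala binary suffix in names like spark-sql-kafka-0-10_2.12-3.4.0.jar
_SCALA_SUFFIX = re.compile(r"_(2\.\d+)-")


def scala_versions_in_jars(jars_dir):
    """Group jar file names by the Scala binary version in their name."""
    found = defaultdict(list)
    for jar in sorted(Path(jars_dir).glob("*.jar")):
        match = _SCALA_SUFFIX.search(jar.name)
        if match:  # jars without a Scala suffix (e.g. kafka-clients) are skipped
            found[match.group(1)].append(jar.name)
    return dict(found)


def find_mismatched_jars(jars_dir):
    """Return jars whose Scala suffix differs from the most common one."""
    versions = scala_versions_in_jars(jars_dir)
    if len(versions) <= 1:
        return []
    # Assumption: the majority suffix belongs to the Spark distribution itself.
    majority = max(versions, key=lambda v: len(versions[v]))
    return [name
            for version, names in versions.items() if version != majority
            for name in names]
```

For a pip-installed PySpark, the directory to scan is usually Path(pyspark.__file__).parent / "jars". Any file name the function returns is a candidate to replace with a build that matches the majority Scala suffix.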

@ElNino9495

I get the error even after adding the jar files

@omursnck

omursnck commented Jun 1, 2024

I get the error even after adding the jar files

How did you manage to solve that issue?
