I am trying to use Spark to connect to a Kafka topic that already contains data (I am sending test messages with Postman), so that I can load the data into a DataFrame. All the services run from a single docker-compose file:
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    networks:
      - broker-kafka
  kafka:
    container_name: kafka
    image: wurstmeister/kafka:2.12-2.3.0
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_HOSTNAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - 9092:9092
    depends_on:
      - zookeeper
    networks:
      - broker-kafka
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  spark:
    image: jupyter/all-spark-notebook:spark-3.1.1
    ports:
      - "8888:8888"
      - "4040-4080:4040-4080"
    links:
      - kafka:kafka-server  # allows the Spark notebook to discover the kafka service by the name "kafka-server"
    volumes:
      - ../notebooks:/home/jovyan/work/notebooks/
    networks:
      - broker-kafka
networks:
  broker-kafka:
    driver: bridge
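To rule out plain networking issues, this is a small probe I can run inside the notebook container (just a sketch; the hostnames kafka and kafka-server come from the compose service name and the link alias above, and the ports 29092/9092 from the listener settings):

# Quick reachability probe (sketch) -- run inside the Spark notebook container.
import socket

for host, port in [("kafka", 29092), ("kafka-server", 29092), ("localhost", 9092)]:
    try:
        with socket.create_connection((host, port), timeout=3):
            print(f"reachable: {host}:{port}")
    except OSError as exc:
        print(f"NOT reachable: {host}:{port} ({exc})")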
I have already changed KAFKA_ADVERTISED_LISTENERS and KAFKA_LISTENERS multiple times (from localhost to my IP address, etc.), but I still cannot load the data into my DataFrame. My PySpark script is very simple:
from pyspark.sql import SparkSession

# Spark session & context
spark = (SparkSession
         .builder
         .master('local')
         .appName('My_Spark')
         # Add the Kafka package
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")
         .getOrCreate())
sc = spark.sparkContext

# Create a streaming DataFrame, setting the Kafka server, topic and offset options
df = (spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "0.0.0.0:9092")  # Kafka server
      .option("subscribe", "topic1")                       # topic
      .option("startingOffsets", "earliest")               # start from the beginning
      .load())

df_res = df.writeStream.format("console").start().awaitTermination()
When I run this code, the Docker logs show:
22/05/30 15:44:01 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-78861447-a9fe-47f3-bf08-0d17261f8184--1771929930-driver-0-1, groupId=spark-kafka-source-78861447-a9fe-47f3-bf08-0d17261f8184--1771929930-driver-0] Connection to node -1 (/0.0.0.0:9092) could not be established. Broker may not be available.
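In case it helps with diagnosis, I could also check the broker outside of Spark with a plain consumer (a minimal sketch; it assumes kafka-python is installed in the notebook container, which is not part of the compose file above):

# Broker metadata probe (sketch) -- requires kafka-python (pip install kafka-python).
from kafka import KafkaConsumer

# Try the container-internal listener advertised as PLAINTEXT://kafka:29092 above;
# the constructor raises NoBrokersAvailable if the broker cannot be reached.
consumer = KafkaConsumer(bootstrap_servers="kafka:29092",
                         consumer_timeout_ms=5000)
print(consumer.topics())  # should include "topic1" if the broker answers
consumer.close()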
Does anyone know what I am doing wrong?