StackOverflow Point

Asked by Alex Hales (Teacher) on May 30, 2022 at 15:53 UTC

Connect Spark to Kafka through Docker

I am trying to use Spark to connect to a Kafka topic that contains data (I am doing the tests using Postman) so that I can load the data into a DataFrame. All the services are defined in a Docker Compose file:

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    networks:
      - broker-kafka
  kafka:
    container_name: kafka
    image: wurstmeister/kafka:2.12-2.3.0
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_HOSTNAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - 9092:9092
    depends_on:
      - zookeeper
    networks:
      - broker-kafka
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  spark:
    image: jupyter/all-spark-notebook:spark-3.1.1
    ports:
      - "8888:8888"
      - "4040-4080:4040-4080"
    links:
      - kafka:kafka-server #allows spark notebook to discover kafka service by name "kafka-server"
    volumes:
      - ../notebooks:/home/jovyan/work/notebooks/
    networks:
      - broker-kafka
networks:
  broker-kafka:
    driver: bridge
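
The kafka service above declares two listeners, and the difference between where the broker binds and what it tells clients to connect to is easy to lose. The following is a minimal sketch that only parses the three environment values copied from that service (nothing here talks to Kafka); the helper names parse_listeners, parse_protocol_map and client_addresses are hypothetical. It pairs each listener's bind address (KAFKA_LISTENERS) with the address the broker advertises for it (KAFKA_ADVERTISED_LISTENERS).

```python
from __future__ import annotations

# Values copied from the kafka service in the compose file above.
LISTENERS = "PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092"
ADVERTISED = "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092"
PROTOCOL_MAP = "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"


def parse_listeners(value: str) -> dict[str, tuple[str, int]]:
    """Split 'NAME://host:port,...' into {NAME: (host, port)}."""
    result: dict[str, tuple[str, int]] = {}
    for entry in value.split(","):
        name, address = entry.strip().split("://", 1)
        host, port = address.rsplit(":", 1)
        result[name] = (host, int(port))
    return result


def parse_protocol_map(value: str) -> dict[str, str]:
    """Split 'NAME:PROTOCOL,...' into {NAME: PROTOCOL}."""
    pairs = (entry.strip().split(":", 1) for entry in value.split(","))
    return {name: protocol for name, protocol in pairs}


def client_addresses(listeners: str, advertised: str,
                     protocol_map: str) -> list[tuple[str, str, str, str]]:
    """For each listener: (name, protocol, bind address, advertised address)."""
    bound = parse_listeners(listeners)
    adv = parse_listeners(advertised)
    protocols = parse_protocol_map(protocol_map)
    rows = []
    for name, (host, port) in adv.items():
        bind_host, bind_port = bound[name]
        rows.append((name, protocols[name],
                     f"{bind_host}:{bind_port}", f"{host}:{port}"))
    return rows


# Print one line per listener: where the broker binds vs. what it advertises.
for name, protocol, bind, dial in client_addresses(LISTENERS, ADVERTISED, PROTOCOL_MAP):
    print(f"{name}: {protocol}, binds {bind}, advertises {dial}")
```

Read this way, 0.0.0.0 is only the wildcard address the broker binds to. A container on the broker-kafka network would be expected to reach the broker as kafka:29092, while a process on the Docker host would use localhost:9092 (published through the 9092:9092 port mapping).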

I have already changed KAFKA_ADVERTISED_LISTENERS and KAFKA_LISTENERS multiple times (from localhost to my IP address, etc.), but I still cannot load the data into my DataFrame. My PySpark script is very simple:

from pyspark.sql import SparkSession
# Spark session & context
spark = (SparkSession
         .builder
         .master('local')
         .appName('My_Spark')
         # Add kafka package
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")
         .getOrCreate())
sc = spark.sparkContext

# Create stream dataframe setting kafka server, topic and offset option
df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "0.0.0.0:9092") # kafka server
  .option("subscribe", "topic1") # topic
  .option("startingOffsets", "earliest") # start from beginning 
  .load())
query = df.writeStream.format("console").start()
query.awaitTermination()
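
For comparison, here is a minimal sketch of the same stream, assuming the notebook runs inside the spark container and therefore shares the broker-kafka network with the broker. Under that assumption the bootstrap address would be the internal advertised listener kafka:29092, not 0.0.0.0:9092 (a wildcard bind address that does not point at the Kafka container) and not localhost:9092 (which, inside the Spark container, refers to that container itself). The helper names kafka_source_options and run_console_stream are hypothetical; the option keys are the ones already used in the script above.

```python
from __future__ import annotations

# Assumption: the driver runs in the "spark" container on the broker-kafka
# network, so it reaches the broker through the internal listener.
KAFKA_BOOTSTRAP = "kafka:29092"
TOPIC = "topic1"


def kafka_source_options(bootstrap: str, topic: str) -> dict[str, str]:
    """Options for Spark's Kafka source (spark-sql-kafka-0-10)."""
    return {
        "kafka.bootstrap.servers": bootstrap,
        "subscribe": topic,
        "startingOffsets": "earliest",  # read the topic from the beginning
    }


def run_console_stream(bootstrap: str = KAFKA_BOOTSTRAP, topic: str = TOPIC) -> None:
    """Start a console sink for the topic and block until the query stops."""
    # Imported here so the option helper above works without PySpark installed.
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .master("local")
        .appName("My_Spark")
        .config("spark.jars.packages",
                "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")
        .getOrCreate()
    )
    df = (
        spark.readStream
        .format("kafka")
        .options(**kafka_source_options(bootstrap, topic))
        .load()
    )
    # The Kafka source yields key/value as binary; cast them for readable output.
    messages = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    query = messages.writeStream.format("console").start()
    query.awaitTermination()
```

In the notebook, calling run_console_stream() would start the query and block until it is stopped. If the driver instead ran directly on the Docker host, the same sketch would presumably need localhost:9092, the listener advertised for host clients.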

When I run my script, the Docker logs show:

22/05/30 15:44:01 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-78861447-a9fe-47f3-bf08-0d17261f8184--1771929930-driver-0-1, groupId=spark-kafka-source-78861447-a9fe-47f3-bf08-0d17261f8184--1771929930-driver-0] Connection to node -1 (/0.0.0.0:9092) could not be established. Broker may not be available.
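
The warning says the consumer could not open a connection to the bootstrap address /0.0.0.0:9092. A quick way to check which candidate address is reachable from wherever the driver runs is a plain TCP probe. This is a hedged sketch: can_connect and probe are hypothetical helper names, and the candidate list is an assumption taken from the compose file. A successful probe is necessary but not sufficient, because after bootstrapping the client reconnects to whatever address the broker advertises, so that address must be reachable as well.

```python
import socket

# Candidate bootstrap addresses (assumed from the compose file in the question).
CANDIDATES = [("kafka", 29092), ("localhost", 9092), ("0.0.0.0", 9092)]


def can_connect(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers refused connections, timeouts and DNS failures
        return False


def probe(candidates=CANDIDATES) -> dict[str, bool]:
    """Map 'host:port' to whether a TCP connection succeeded."""
    return {f"{host}:{port}": can_connect(host, port) for host, port in candidates}
```

Running probe() from the notebook in the spark container would be expected to report kafka:29092 as reachable, provided the broker is up and the container really is on the broker-kafka network.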

Does anyone know what I am doing wrong?


© 2022 Stackoverflow Point. All Rights Reserved.
