
# Write a program for a Kafka producer to send data to Kafka topics from a Spark application

8/28/2023


 
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
 
object KafkaDStreamDemoWithProducer {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("KafkaDStreamExample").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
 
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",            // Kafka broker addresses
      "key.deserializer" -> classOf[StringDeserializer],  // deserializers for record keys and values
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test-group",                         // Consumer group
      "auto.offset.reset" -> "latest",                    // start from the latest offset for a new group
      "enable.auto.commit" -> (false: java.lang.Boolean)  // do not auto-commit offsets
    )
 
    val topics = Set("test-topic")             // Kafka topic to consume from
 
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
 
    // Print the content of each partition
    kafkaStream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        partitionOfRecords.foreach { record =>
          println(s"Partition: ${record.partition()}, Key: ${record.key()}, Value: ${record.value()}")
        }
      }
    }
 
    ssc.start()
    ssc.awaitTermination()
  }
}
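
The DStream above only prints what it consumes. To actually send data to a Kafka topic from the Spark application, a Kafka producer can be created inside foreachPartition so that it runs on the executors. Below is a minimal sketch of that idea, replacing the print loop in the example above; the output topic "output-topic" and the broker address are placeholders.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

kafkaStream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    // Create the producer inside foreachPartition so it is built on the executor
    // and the non-serializable KafkaProducer never has to be shipped from the driver.
    val producer = new KafkaProducer[String, String](props)
    partitionOfRecords.foreach { record =>
      producer.send(new ProducerRecord[String, String]("output-topic", record.key(), record.value()))
    }
    producer.flush()
    producer.close()
  }
}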
 
If you want to use Structured Streaming's writeStream instead of DStreams, use the code below:
 
df                                     // a streaming DataFrame with a string or binary `value` column
  .writeStream                         // use `write` for a batch DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("topic", "target-topic1")
  .option("checkpointLocation", "/path/to/checkpoint") // the Kafka sink requires a checkpoint location for streaming writes
  .start()
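
The snippet above assumes an existing streaming DataFrame. For a self-contained illustration, the sketch below builds one from Spark's built-in rate source and writes it to Kafka; the application name, broker address, topic, and checkpoint path are placeholders.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("KafkaWriteStreamExample")   // placeholder application name
  .master("local[*]")
  .getOrCreate()

// The built-in `rate` source generates rows with `timestamp` and `value` columns, handy for testing.
val df = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()
  .selectExpr("CAST(value AS STRING) AS value")  // the Kafka sink expects a string or binary `value` column

val query = df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "target-topic1")
  .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")  // placeholder checkpoint path
  .start()

query.awaitTermination()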
 
 
These settings mean that we do not auto-commit offsets and that we pick up from the latest offset each time the consumer group is initialized.
Consequently, the application will only consume messages posted while it is running.
 
Checkpoints in Spark Streaming let us maintain state between batches.
With the processing above we can only work with the current batch, for example the current frequency of the words. What if we want to store the cumulative frequency instead?
Spark Streaming makes this possible through a concept called checkpoints, as sketched below.
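
Below is a minimal sketch of a cumulative word count across batches; the checkpoint directory, the socket source on port 9999, and the object name are placeholders, and mapWithState carries the running total between batches.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object CumulativeWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CumulativeWordCount").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("/tmp/spark-streaming-checkpoint") // required for stateful operations

    // Words could just as well come from the Kafka DStream above; a socket source keeps the sketch short.
    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

    // Add each batch's count for a word to the running total kept in Spark state.
    val updateTotal = (word: String, count: Option[Int], state: State[Int]) => {
      val total = state.getOption().getOrElse(0) + count.getOrElse(0)
      state.update(total)
      (word, total)
    }

    val cumulativeCounts = words
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .mapWithState(StateSpec.function(updateTotal))

    cumulativeCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}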
 
As this is a stream processing application, we would want to keep this running:
 
ssc.start()
ssc.awaitTermination()

 
