使用Azure Databricks Struct Stream存取Azure HDInsight Kafka
分类: Azure Databricks ◆ 标签: #Databricks #Spark ◆ 发布于: 2025-02-15 18:34:02

如果想要使用Azure Databricks Struct Stream
存取Azure HDInsight Kafka
,需要满足一些必要的条件:
Azure HDInsight
不允许通过公网存取Kafka
服务,客户端或者应用要存取这个服务,必须通过适当的途径连入Azure HDInsight
的虚拟网络里。Azure HDInsight Kafka
服务默认情况下不能通过IP地址访问,需要更改它的配置,使得该服务可以通过IP
地址进行访问。
讨论一下实现这个目的的分步步骤:
创建一个Azure资源组,所有其他的资源都使用该资源管理。
创建一个虚拟网络。
创建一个
Azure Databricks
服务。配置
Azure Databricks
和之前创建的虚拟机网络的Peer
网络关系。创建一个
Azure HDInsight
, 并在创建的时候将这个Azure HDInsight
集群链接到之前创建的虚拟网络中。通过
Ambari Portal
更改Azure HDInsight Kafka
的配置,使其可以用ip地址进行访问。创建
Kafka Topic
在
Azure Databricks
里创建一个基于Scala
的Notebook
, 通过Kafka Broker
的IP地址进行访问
这些步骤里,步骤1, 步骤2,步骤3,这些步骤都没有什么技巧可言,使用Azure Portal
, 可以一步一步的来完成,但是其他的一些步骤有一些需要注意的地方。
配置Azure Databricks
和虚拟网络之间的Peer
网络关系
这里需要注意的是看上去Azure Databricks
有一个限制:只能支持在同一个区域的虚拟网络创建Peer
的网络关系。
创建完成虚拟网络和Databricks
集群之后,选择Databricks
的资源,然后选择左侧菜单: Settings -> Virtual Network Peerings
, 然后添加新的Peer
:
选择Add Peering
之后,开始添加基本的信息:
选择你的订阅和在同一个区域创建好的虚拟机网络,这里需要注意的是1
处的Databricks Virtual Network Resource ID
, 点击这里将该Resrouce ID
拷贝下来,备用。
该界面保存之后,要注意的是Peer
并没有结束,可以观察到该Peer
的状态仍然还是处于初始化的状态,要继续完成网络Peer
还需要在之前创建的虚拟网络上使用上面拷贝出来的resource ID在虚拟网络资源处再添加一次。
在虚拟网络上配置Peer
找到创建的虚拟网络资源之后,从左侧的菜单Settings -> Peerings
, 添加一个新的Peer
:
在添加的界面中Remote virtual network
里选择选项I know my resource ID
, 值填上上一步在里备份的Databricks Virtual Network Resrouce ID
就可以完成整个的配置了:
保存之后再分别在Databricks
和虚拟网络两侧观察发现Peer
的状态变为Conntected
即表示正常配对成功。
配置Azure HDInsight
使得Kafka
支持IP地址访问
打开Ambari Portal
, 然后左侧菜单Kafka
, 然后选择配置,在配置里过滤kafka-env
, 找到配置Kafka-env Template
, 如下图:
在地下添加配置:
# Configure Kafka to advertise IP addresses instead of FQDN
IP_ADDRESS=$(hostname -i)
echo advertised.listeners=$IP_ADDRESS
sed -i.bak -e '/advertised/{/advertised@/!d;}' /usr/hdp/current/kafka-broker/conf/server.properties
echo "advertised.listeners=PLAINTEXT://$IP_ADDRESS:9092" >> /usr/hdp/current/kafka-broker/conf/server.properties
保存配置,然后继续搜索配置listeners
, 更改配置为:PLAINTEXT://0.0.0.0:9092
, 该值默认为PLAINTEXT://localhost:9092
, 如下图:
更改完成之后,记得先将集群设为维护模式,然后重启Kafka
服务,Ambari
也会提醒你需要重启。
经过这个更改即可以完成了Kafka
监听IP地址了。
在Databricks
上测试Struct Stream
经过以上步骤Dataricks
已经可以直接访问到Azure HDInsight
里的集群节点了,但是还需要注意的是由于Azure HDInsight
集群里节点之间的通讯是通过域名访问,并且没有通过DNS
进行解析,反而是直接使用/etc/hosts
文件进行解析。因此在Databricks
上只能通过ip
地址进行访问
可以通过ssh
登录到Azure HDInsight
集群上通过该命令拿到zookeeper
的具体机器名以及kafka borker
的机器名:
curl -u admin:"<Your Password>" -G "https://<Your Cluster Name>.azurehdinsight.cn/api/v1/clusters/hongweikafka/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" | jq -r "["""\(.host_components[].HostRoles.host_name):2181"""] | join(""",""")"
拿到zookeeper Server
列表。
curl -u admin:"<Your Password>" -G "https://<Your Cluster>.azurehdinsight.cn/api/v1/clusters/hongweikafka/services/KAFKA/components/KAFKA_BROKER" |jq -r "["""\(.host_components[].HostRoles.host_name):9092"""] | join(""",""")"
针对于Databricks
,我们需要Kafka broker
的服务IP地址,可以通过在HDInsight
集群上cat /etc/hosts
得到所有的ip地址。
然后您可以通过如下的代码尝试访问Kafka
了。
val url="https://data.cityofnewyork.us/resource/pqfs-mqru.json"
val result = scala.io.Source.fromURL(url).mkString
// Create a dataframe from the JSON data
val taxiDF = spark.read.json(Seq(result).toDS)
// Display the dataframe containing trip data
taxiDF.show()
val kafkaBrokers="10.3.0.11:9092,10.3.0.14:9092,10.3.0.12:9092,10.3.0.13:9092"
val kafkaTopic="tripdata"
println("Finished setting Kafka broker and topic configuration.")
// Select the vendorid as the key and save the JSON string as the value.
val query = taxiDF.selectExpr("CAST(vendorid AS STRING) as key", "to_JSON(struct(*)) AS value").write.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("topic", kafkaTopic).save()
println("Data sent to Kafka")
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
// Define a schema for the data
val schema = (new StructType).add("dropoff_latitude", StringType).add("dropoff_longitude", StringType).add("extra", StringType).add("fare_amount", StringType).add("improvement_surcharge", StringType).add("lpep_dropoff_datetime", StringType).add("lpep_pickup_datetime", StringType).add("mta_tax", StringType).add("passenger_count", StringType).add("payment_type", StringType).add("pickup_latitude", StringType).add("pickup_longitude", StringType).add("ratecodeid", StringType).add("store_and_fwd_flag", StringType).add("tip_amount", StringType).add("tolls_amount", StringType).add("total_amount", StringType).add("trip_distance", StringType).add("trip_type", StringType).add("vendorid", StringType)
println("Schema declared")
// Read a batch from Kafka
val kafkaDF = spark.read.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("subscribe", kafkaTopic).option("startingOffsets", "earliest").load()
// Select data and write to file
val query = kafkaDF.select(from_json(col("value").cast("string"), schema) as "trip").write.format("parquet").option("path","/example/batchtripdata").option("checkpointLocation", "/batchcheckpoint").save()
println("Wrote data to file")
// Stream from Kafka
val kafkaStreamDF = spark.readStream.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("subscribe", kafkaTopic).option("startingOffsets", "earliest").load()
// Select data from the stream and write to file
kafkaStreamDF.select(from_json(col("value").cast("string"), schema) as "trip").writeStream.format("parquet").option("path","/example/streamingtripdata").option("checkpointLocation", "/streamcheckpoint").start.awaitTermination(30000)
println("Wrote data to file")
至此就可以顺利的通过Spark Struct Stream
访问Kafka
了。