Handling Exceptions When Importing Large Amounts of Data via JDBC in Azure Databricks (Part 1): Problem Introduction
Category: Azure Databricks ◆ Tags: #Azure #Databricks #Spark ◆ Published: 2023-06-18 19:40:13

We recently ran into a very typical customer case: the customer was using Azure Databricks to load a large amount of data from a JDBC data source into a Delta Table. After running for two and a half hours the job failed, the import did not complete, and two kinds of exceptions were reported:
Exception 1:
```
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-2115040429906825> in <module>
     35
     36 if df.count() > 0:
---> 37     df.show(5)
     38     df.write.mode("overwrite").saveAsTable("STG_BTXSEARCH_RESULT")

/databricks/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
    488         """
    489         if isinstance(truncate, bool) and truncate:
--> 490             print(self._jdf.showString(n, 20, vertical))
    491         else:
    492             print(self._jdf.showString(n, int(truncate), vertical))

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    115     def deco(*a, **kw):
    116         try:
--> 117             return f(*a, **kw)
    118         except py4j.protocol.Py4JJavaError as e:
    119             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o314.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4.0 (TID 13) (10.20.34.196 executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
Driver stacktrace:
......
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:251)
	at java.lang.Thread.run(Thread.java:748)
```
Exception 2:
```
$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:684)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.lang.OutOfMemoryError: Java heap space
	at com.sun.crypto.provider.CipherCore.prepareInputBuffer(CipherCore.java:1025)
	.......
	at com.mysql.cj.protocol.a.ResultsetRowReader.read(ResultsetRowReader.java:75)
	at com.mysql.cj.protocol.a.ResultsetRowReader.read(ResultsetRowReader.java:42)
	at com.mysql.cj.protocol.a.NativeProtocol.read(NativeProtocol.java:1587)
	at com.mysql.cj.protocol.a.TextResultsetReader.read(TextResultsetReader.java:87)
	at com.mysql.cj.protocol.a.TextResultsetReader.read(TextResultsetReader.java:48)
	at com.mysql.cj.protocol.a.NativeProtocol.read(NativeProtocol.java:1600)
	at com.mysql.cj.protocol.a.NativeProtocol.readAllResults(NativeProtocol.java:1654)
	at com.mysql.cj.protocol.a.NativeProtocol.sendQueryPacket(NativeProtocol.java:1000)
	at com.mysql.cj.NativeSession.execSQL(NativeSession.java:666)
```
The customer's data source is Azure Database for MySQL, and the table to be imported contains about 3.3 million rows, roughly 44 GB of data. We went through the customer's notebook carefully; the code is actually very simple:
```scala
Class.forName("org.mariadb.jdbc.Driver")

val jdbcHostname = "mysqldbservername.database.chinacloudapi.cn"
val jdbcPort = 3306
val jdbcDatabase = "mysqldbname"

// Create the JDBC URL without passing in the user and password parameters.
val jdbcUrl = s"jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}"

// Create a Properties() object to hold the parameters.
import java.util.Properties

val connectionProperties = new Properties()
connectionProperties.put("user", "<username>")
connectionProperties.put("password", "<userpassword>")

val driverClass = "org.mariadb.jdbc.Driver"
connectionProperties.setProperty("Driver", driverClass)

// Read the whole table in one go and write it out as a Delta table.
val df = spark.read.jdbc(jdbcUrl, "<Table Name>", connectionProperties)

if (df.count() > 0) {
  df.write.mode("overwrite").format("delta").saveAsTable("<Delta table name>")
}
```
We also checked the customer's environment:
- Databricks Runtime 9.1 LTS
- Cluster worker nodes: 32 GB memory, 4 cores, with autoscaling enabled (min 1 worker, max 4 workers); two worker nodes were running under the load at the time.
- Cluster driver node: 32 GB memory, 4 cores.
Based on these exceptions we concluded that this is not a problem with the Azure Databricks platform itself; importing such a large amount of data over JDBC simply triggered an Out of Memory error. This kind of failure is common, and there are well-established steps to mitigate it:
- Optimize the data import code: use the JDBC data source options to increase read parallelism and partition the data being imported, so that Out of Memory errors are avoided (see the sketch after this list).
- Optimize the cluster configuration: tune the memory settings Spark uses at runtime so that Out of Memory errors do not occur.
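To make the first approach concrete, here is a minimal sketch of a partitioned JDBC read. The partition column `id`, its bounds, and the other option values are illustrative assumptions rather than the customer's actual schema; the details are discussed in the next part.

```scala
// Partitioned JDBC read (sketch): Spark issues one query per partition,
// so each executor task only buffers a slice of the table instead of
// pulling all 3.3 million rows through a single connection.
val partitionedDf = spark.read
  .format("jdbc")
  .option("url", jdbcUrl)
  .option("dbtable", "<Table Name>")
  .option("user", "<username>")
  .option("password", "<userpassword>")
  .option("partitionColumn", "id")   // assumed numeric, indexed column
  .option("lowerBound", "1")
  .option("upperBound", "3300000")   // roughly the table's row count
  .option("numPartitions", "8")      // 8 parallel queries against MySQL
  .option("fetchsize", "10000")      // rows fetched per round trip
  .load()

partitionedDf.write.mode("overwrite").format("delta").saveAsTable("<Delta table name>")
```

The second approach mainly involves the cluster's Spark memory settings (for example `spark.executor.memory` and `spark.memory.fraction`), which we will come back to later.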
In the following parts we will discuss the underlying principles and the concrete handling steps for these two approaches in detail.