
Handling Exceptions When Importing Large Amounts of Data via JDBC in Azure Databricks (Part 1): Problem Introduction

Category:  Azure Databricks Tags:  #Azure #Databricks #Spark Published: 2023-06-18 19:40:13

We recently ran into a very typical customer case: the customer used Azure Databricks to load a large amount of data from a JDBC data source into a Delta table. After running for about two and a half hours the job failed, the import did not complete, and two kinds of exceptions were reported:

Exception 1:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-2115040429906825> in <module>
     35 
     36 if df.count() > 0:
---> 37     df.show(5)
     38     df.write.mode("overwrite").saveAsTable("STG_BTXSEARCH_RESULT")

/databricks/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
    488         """
    489         if isinstance(truncate, bool) and truncate:
--> 490             print(self._jdf.showString(n, 20, vertical))
    491         else:
    492             print(self._jdf.showString(n, int(truncate), vertical))

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    115     def deco(*a, **kw):
    116         try:
--> 117             return f(*a, **kw)
    118         except py4j.protocol.Py4JJavaError as e:
    119             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o314.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4.0 (TID 13) (10.20.34.196 executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
Driver stacktrace:
......
 at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 at py4j.commands.CallCommand.execute(CallCommand.java:79)
 at py4j.GatewayConnection.run(GatewayConnection.java:251)
 at java.lang.Thread.run(Thread.java:748)

Exception 2:

$sp.apply(JFunction0$mcV$sp.java:23) 
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:684) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more 
Caused by: java.lang.OutOfMemoryError: Java heap space at com.sun.crypto.provider.CipherCore.prepareInputBuffer(CipherCore.java:1025) 
.......
at com.mysql.cj.protocol.a.ResultsetRowReader.read(ResultsetRowReader.java:75) 
at com.mysql.cj.protocol.a.ResultsetRowReader.read(ResultsetRowReader.java:42) 
at com.mysql.cj.protocol.a.NativeProtocol.read(NativeProtocol.java:1587) 
at com.mysql.cj.protocol.a.TextResultsetReader.read(TextResultsetReader.java:87) 
at com.mysql.cj.protocol.a.TextResultsetReader.read(TextResultsetReader.java:48) 
at com.mysql.cj.protocol.a.NativeProtocol.read(NativeProtocol.java:1600) 
at com.mysql.cj.protocol.a.NativeProtocol.readAllResults(NativeProtocol.java:1654) 
at com.mysql.cj.protocol.a.NativeProtocol.sendQueryPacket(NativeProtocol.java:1000) 
at com.mysql.cj.NativeSession.execSQL(NativeSession.java:666)

The customer's data source is Azure Database for MySQL. The table to be imported has about 3.3 million rows, roughly 44 GB of data. After carefully reviewing the customer's notebook, we found that the code is actually very simple:

Class.forName("org.mariadb.jdbc.Driver")

val jdbcHostname = "mysqldbservername.database.chinacloudapi.cn"
val jdbcPort = 3306
val jdbcDatabase = "mysqldbname"

// Create the JDBC URL without passing in the user and password parameters.
val jdbcUrl = s"jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}"

// Create a Properties() object to hold the parameters.
import java.util.Properties
val connectionProperties = new Properties()

connectionProperties.put("user", "<username>")
connectionProperties.put("password", "<userpassword>")
val driverClass = "org.mariadb.jdbc.Driver"
connectionProperties.setProperty("Driver", driverClass)


val df = (spark.read.jdbc(url=jdbcUrl,
  table="<Table Name>",
  connectionProperties=connectionProperties))

if (df != null) {
  df.write.mode("overwrite").format("delta").saveAsTable("<Delta table name>")
}

We also checked the customer's environment:

  1. Databricks Runtime 9.1 LTS
  2. Cluster worker nodes: 32 GB memory, 4 cores, with autoscaling enabled (Min Workers 1, Max Workers 4); at run time the cluster started two worker nodes based on load.
  3. Cluster driver node: 32 GB memory, 4 cores.

Based on these exceptions, we concluded that this is not a problem with the Azure Databricks platform itself: the customer simply hit an Out of Memory error while importing a large amount of data over JDBC. This kind of error is very common, and there are well-established steps to mitigate it:

  1. Optimize the import code: use the JDBC data source options to increase the parallelism of the read and partition the incoming data, so that no single task has to pull the entire table through one connection (see the sketch after this list).
  2. Optimize the cluster configuration: tune the memory available to Spark at run time so that the job no longer runs out of memory.
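
As a preview of the first approach, here is a minimal sketch of a partitioned JDBC read. It assumes the source table has a numeric, roughly evenly distributed column (called id here purely for illustration); the bounds, partition count, and fetch size are placeholder values, not the customer's final settings:

import java.util.Properties

val connectionProperties = new Properties()
connectionProperties.put("user", "<username>")
connectionProperties.put("password", "<userpassword>")
// Ask the driver to fetch rows in batches instead of buffering the whole result set.
// (With MySQL Connector/J this usually also requires useCursorFetch=true.)
connectionProperties.put("fetchsize", "10000")

val df = spark.read.jdbc(
  url = jdbcUrl,
  table = "<Table Name>",
  columnName = "id",        // assumed integral partition column
  lowerBound = 1L,
  upperBound = 3300000L,    // roughly the 3.3 million rows mentioned above
  numPartitions = 8,        // 8 tasks, each reading its own slice over a separate connection
  connectionProperties = connectionProperties)

df.write.mode("overwrite").format("delta").saveAsTable("<Delta table name>")

With these options Spark issues one query per partition (each with a WHERE clause on the partition column), so every executor task reads only a slice of the table instead of streaming all 44 GB through a single connection.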

In the following posts we will walk through the underlying principles and the concrete steps for each of these two approaches.