校园春色亚洲色图_亚洲视频分类_中文字幕精品一区二区精品_麻豆一区区三区四区产品精品蜜桃

主頁 > 知識庫 > spark通過kafka-appender指定日志輸出到kafka引發的死鎖問題

spark通過kafka-appender指定日志輸出到kafka引發的死鎖問題

熱門標簽:怎么去掉地圖標注文字 北京外呼系統咨詢電話 地圖標注資源分享注冊 合肥阿里辦理400電話號 廊坊地圖標注申請入口 高德地圖標注公司位置需要錢嗎 慶陽外呼系統定制開發 襄陽外呼增值業務線路解決方案 海南人工外呼系統哪家好

在采用log4j的kafka-appender收集spark任務運行日志時,發現提交到yarn上的任務始終ACCEPTED狀態,無法進入RUNNING狀態,并且會重試兩次后超時。期初認為是yarn資源不足導致,但在確認yarn資源充裕的時候問題依舊,而且基本上能穩定復現。

起初是這么配置spark日志輸出到kafka的:

log4j.rootCategory=INFO, console, kafka
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS} %p %c{1}: [${log4j.pipelineId}] %m%n

# Kafka appender
log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
# Set Kafka topic and brokerList
log4j.appender.kafka.topic=yarn_spark_log
log4j.appender.kafka.brokerList=localhost:9092
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.maxBlockMs=10
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS} %p %c{1}: [${log4j.pipelineId}] %m

這里用org.apache.kafka.log4jappender.KafkaLog4jAppender默認將所有日志都輸出到kafka,這個appender已經被kafka官方維護,穩定性應該是可以保障的。

問題定位

發現問題后,嘗試將輸出到kafka的規則去掉,問題解除!于是把問題定位到跟日志輸出到kafka有關。通過其他測試,證實目標kafka其實是正常的,這就非常奇怪了。

查看yarn的ResourceManager日志,發現有如下超時

2020-05-07 21:49:48,230 INFO org.apache.hadoop.yarn.util.AbstractLivelinessMonitor: Expired:appattempt_1578970174552_3204_000002 Timed out after 600 secs
2020-05-07 21:49:48,230 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: Updating application attempt appattempt_1578970174552_3204_000002 with final
 state: FAILED, and exit status: -1000
2020-05-07 21:49:48,231 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: appattempt_1578970174552_3204_000002 State change from LAUNCHED to FINAL_SAV
ING on event = EXPIRE

表明,yarn本身是接收任務的,但是發現任務遲遲沒有啟動。在spark的場景下其實是指只有driver啟動了,但是沒有啟動executor。
而查看driver日志,發現日志輸出到一個地方就卡住了,不往下繼續了。通過對比成功運行和卡住的情況發現,日志卡在這條上:

2020/05/07 19:37:10.324 INFO SecurityManager: Changing view acls to: yarn,root
2020/05/07 19:37:10.344 INFO Metadata: Cluster ID: 6iG6WHA2SoK7FfgGgWHt_A

卡住的情況下,只會打出SecurityManager這行,而無法打出Metadata這行。
猜想Metadata這行是kafka-client本身打出來的,因為整個上下文只有yarn, spark, kafka-client可能會打出這個日志。

在kafka-client 2.2.0版本中找到這個日志是輸出位置:

public synchronized void update(MetadataResponse metadataResponse, long now) {
  ...

  String newClusterId = cache.cluster().clusterResource().clusterId();
  if (!Objects.equals(previousClusterId, newClusterId)) {
    log.info("Cluster ID: {}", newClusterId);
  }
  ...
}

看到synchronized,高度懷疑死鎖。于是考慮用jstack分析:

在yarn上運行spark任務的時候,driver進程叫ApplicationMaster,executor進程叫CoarseGrainedExecutorBackend。這里首先嘗試再復現過程中找到drvier最終在哪個節點上運行,然后快速使用jstack -F pid>打印堆棧

jstack果然不負眾望,報告了死鎖!這里我把結果貼的全一點

[root@node1 ~]# jstack 20136
20136: Unable to open socket file: target process not responding or HotSpot VM not loaded
The -F option can be used when the target process is not responding
[root@node1 ~]# jstack -F 20136
Attaching to process ID 20136, please wait...
Debugger attached successfully.
Server compiler detected.
JVM version is 25.231-b11
Deadlock Detection:

Found one Java-level deadlock:
=============================

"kafka-producer-network-thread | producer-1":
 waiting to lock Monitor@0x00000000025fcc48 (Object@0x00000000ed680b60, a org/apache/kafka/log4jappender/KafkaLog4jAppender),
 which is held by "main"
"main":
 waiting to lock Monitor@0x00007fec9dbde038 (Object@0x00000000ee44de38, a org/apache/kafka/clients/Metadata),
 which is held by "kafka-producer-network-thread | producer-1"

Found a total of 1 deadlock.

Thread 20157: (state = BLOCKED)
 - org.apache.log4j.AppenderSkeleton.doAppend(org.apache.log4j.spi.LoggingEvent) @bci=0, line=231 (Interpreted frame)
 - org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(org.apache.log4j.spi.LoggingEvent) @bci=41, line=66 (Interpreted frame)
 - org.apache.log4j.Category.callAppenders(org.apache.log4j.spi.LoggingEvent) @bci=26, line=206 (Interpreted frame)
 - org.apache.log4j.Category.forcedLog(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=14, line=391 (Interpreted frame)
 - org.apache.log4j.Category.log(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=34, line=856 (Interpreted frame)
 - org.slf4j.impl.Log4jLoggerAdapter.info(java.lang.String, java.lang.Object) @bci=34, line=324 (Interpreted frame)
 - org.apache.kafka.clients.Metadata.update(org.apache.kafka.common.requests.MetadataResponse, long) @bci=317, line=365 (Interpreted frame)
 - org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(org.apache.kafka.common.requests.RequestHeader, long, org.apache.kafka.common.requests.MetadataResponse) @bci=184, line=1031 (Interpreted frame)
 - org.apache.kafka.clients.NetworkClient.handleCompletedReceives(java.util.List, long) @bci=215, line=822 (Interpreted frame)
 - org.apache.kafka.clients.NetworkClient.poll(long, long) @bci=132, line=544 (Interpreted frame)
 - org.apache.kafka.clients.producer.internals.Sender.run(long) @bci=227, line=311 (Interpreted frame)
 - org.apache.kafka.clients.producer.internals.Sender.run() @bci=28, line=235 (Interpreted frame)
 - java.lang.Thread.run() @bci=11, line=748 (Interpreted frame)


Thread 20150: (state = BLOCKED)


Thread 20149: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - java.lang.ref.ReferenceQueue.remove(long) @bci=59, line=144 (Interpreted frame)
 - java.lang.ref.ReferenceQueue.remove() @bci=2, line=165 (Interpreted frame)
 - java.lang.ref.Finalizer$FinalizerThread.run() @bci=36, line=216 (Interpreted frame)


Thread 20148: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - java.lang.Object.wait() @bci=2, line=502 (Interpreted frame)
 - java.lang.ref.Reference.tryHandlePending(boolean) @bci=54, line=191 (Interpreted frame)
 - java.lang.ref.Reference$ReferenceHandler.run() @bci=1, line=153 (Interpreted frame)


Thread 20137: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - org.apache.kafka.clients.Metadata.awaitUpdate(int, long) @bci=63, line=261 (Interpreted frame)
 - org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(java.lang.String, java.lang.Integer, long) @bci=160, line=983 (Interpreted frame)
 - org.apache.kafka.clients.producer.KafkaProducer.doSend(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback) @bci=19, line=860 (Interpreted frame)
 - org.apache.kafka.clients.producer.KafkaProducer.send(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback) @bci=12, line=840 (Interpreted frame)
 - org.apache.kafka.clients.producer.KafkaProducer.send(org.apache.kafka.clients.producer.ProducerRecord) @bci=3, line=727 (Interpreted frame)
 - org.apache.kafka.log4jappender.KafkaLog4jAppender.append(org.apache.log4j.spi.LoggingEvent) @bci=69, line=283 (Interpreted frame)
 - org.apache.log4j.AppenderSkeleton.doAppend(org.apache.log4j.spi.LoggingEvent) @bci=106, line=251 (Interpreted frame)
 - org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(org.apache.log4j.spi.LoggingEvent) @bci=41, line=66 (Interpreted frame)
 - org.apache.log4j.Category.callAppenders(org.apache.log4j.spi.LoggingEvent) @bci=26, line=206 (Interpreted frame)
 - org.apache.log4j.Category.forcedLog(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=14, line=391 (Interpreted frame)
 - org.apache.log4j.Category.log(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=34, line=856 (Interpreted frame)
 - org.slf4j.impl.Log4jLoggerAdapter.info(java.lang.String) @bci=12, line=305 (Interpreted frame)
 - org.apache.spark.internal.Logging$class.logInfo(org.apache.spark.internal.Logging, scala.Function0) @bci=29, line=54 (Interpreted frame)
 - org.apache.spark.SecurityManager.logInfo(scala.Function0) @bci=2, line=44 (Interpreted frame)
 - org.apache.spark.SecurityManager.setViewAcls(scala.collection.immutable.Set, java.lang.String) @bci=36, line=139 (Interpreted frame)
 - org.apache.spark.SecurityManager.init>(org.apache.spark.SparkConf, scala.Option) @bci=158, line=81 (Interpreted frame)
 - org.apache.spark.deploy.yarn.ApplicationMaster.init>(org.apache.spark.deploy.yarn.ApplicationMasterArguments) @bci=85, line=70 (Interpreted frame)
 - org.apache.spark.deploy.yarn.ApplicationMaster$.main(java.lang.String[]) @bci=25, line=802 (Interpreted frame)
 - org.apache.spark.deploy.yarn.ApplicationMaster.main(java.lang.String[]) @bci=4 (Interpreted frame)

到這里,已經確定是死鎖,導致driver一開始就運行停滯,那么當然無法提交executor執行。
具體的死鎖稍后分析,先考慮如何解決。從感性認識看,似乎只要不讓kafka-client的日志也輸出到kafka即可。實驗后,發現果然如此:如果只輸出org.apache.spark的日志就可以正常執行。

根因分析

從stack的結果看,造成死鎖的是如下兩個線程:

  • kafka-client內部的網絡線程spark
  • 主入口線程

兩個線程其實都是卡在打日志上了,觀察堆棧可以發現,兩個線程同時持有了同一個log對象。而這個log對象實際上是kafka-appender。而kafka-appender本質上持有kafka-client,及其內部的Metadata對象。log4j的doAppend為了保證線程安全也用synchronized修飾了:

public
 synchronized 
 void doAppend(LoggingEvent event) {
  if(closed) {
   LogLog.error("Attempted to append to closed appender named ["+name+"].");
   return;
  }
  
  if(!isAsSevereAsThreshold(event.level)) {
   return;
  }

  Filter f = this.headFilter;
  
  FILTER_LOOP:
  while(f != null) {
   switch(f.decide(event)) {
   case Filter.DENY: return;
   case Filter.ACCEPT: break FILTER_LOOP;
   case Filter.NEUTRAL: f = f.next;
   }
  }
  
  this.append(event);  
 }

于是事情開始了:

  • main線程嘗試打日志,首先進入了synchronized的doAppend,即獲取了kafka-appender的鎖
  • kafka-appender內部需要調用kafka-client發送日志到kafka,最終調用到Thread 20137展示的,運行到Metadata.awaitUpdate(也是個synchronized方法),內部的wait會嘗試獲取metadata的鎖。(詳見https://github.com/apache/kaf...)
  • 但此時,kafka-producer-network-thread線程剛好進入了上文提到的打Cluster ID這個日志的這個階段(update方法也是synchronized的),也就是說kafka-producer-network-thread線程獲得了metadata對象的鎖
  • kafka-producer-network-thread線程要打印日志同樣執行synchronized的doAppend,即獲取了kafka-appender的鎖

上圖main-thread持有了log對象鎖,要求獲取metadata對象鎖;kafka-producer-network-thread持有了metadata對象鎖,要求獲取log對象鎖于是造成了死鎖。

總結

到此這篇關于spark通過kafka-appender指定日志輸出到kafka引發的死鎖的文章就介紹到這了,更多相關spark指定日志輸出內容請搜索腳本之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持腳本之家!

您可能感興趣的文章:
  • Docker搭建Zookeeper&Kafka集群的實現
  • 詳解使用docker搭建kafka環境
  • Docker + Nodejs + Kafka + Redis + MySQL搭建簡單秒殺環境
  • Python通過kerberos安全認證操作kafka方式
  • Kafka Java Producer代碼實例詳解
  • Spring boot集成Kafka消息中間件代碼實例
  • Java實現Kafka生產者消費者代碼實例
  • Spring Boot集群管理工具KafkaAdminClient使用方法解析
  • Kafka單節點偽分布式集群搭建實現過程詳解

標簽:商丘 平頂山 哈密 臺州 株洲 鶴崗 鎮江 綿陽

巨人網絡通訊聲明:本文標題《spark通過kafka-appender指定日志輸出到kafka引發的死鎖問題》,本文關鍵詞  spark,通過,kafka-appender,指定,;如發現本文內容存在版權問題,煩請提供相關信息告之我們,我們將及時溝通與處理。本站內容系統采集于網絡,涉及言論、版權與本站無關。
  • 相關文章
  • 下面列出與本文章《spark通過kafka-appender指定日志輸出到kafka引發的死鎖問題》相關的同類信息!
  • 本頁收集關于spark通過kafka-appender指定日志輸出到kafka引發的死鎖問題的相關信息資訊供網民參考!
  • 推薦文章
    校园春色亚洲色图_亚洲视频分类_中文字幕精品一区二区精品_麻豆一区区三区四区产品精品蜜桃
    色偷偷成人一区二区三区91| 中文字幕一区二区三区不卡在线 | 日韩激情一二三区| 高清视频一区二区| 欧美电影免费观看高清完整版在| 亚洲另类在线一区| 丁香六月久久综合狠狠色| 日韩欧美电影一二三| 亚洲午夜精品网| 91国偷自产一区二区三区观看| 欧美成人vps| 日本欧美大码aⅴ在线播放| 91麻豆自制传媒国产之光| 国产精品久久久久久久久免费桃花| 狠狠色丁香久久婷婷综合丁香| 欧美一区二区在线看| 天堂在线亚洲视频| 欧美人牲a欧美精品| 美女脱光内衣内裤视频久久网站 | 首页国产欧美久久| 91亚洲资源网| 中文字幕在线观看不卡| 成人精品gif动图一区| 久久在线免费观看| 国产伦精品一区二区三区在线观看 | 国产美女在线精品| 欧美sm极限捆绑bd| 麻豆久久一区二区| 欧美电影免费提供在线观看| 麻豆传媒一区二区三区| 51精品久久久久久久蜜臀| 日本午夜精品一区二区三区电影| 日本精品裸体写真集在线观看| 亚洲色图视频免费播放| 一本色道a无线码一区v| 亚洲不卡在线观看| 欧美一级免费观看| 久久99热狠狠色一区二区| 国产亚洲欧美色| 成人动漫一区二区在线| 亚洲四区在线观看| 欧美三区在线视频| 老司机精品视频一区二区三区| 久久夜色精品国产噜噜av| 国产999精品久久| 亚洲乱码中文字幕| 欧美高清视频www夜色资源网| 日韩高清不卡一区二区| 久久久久青草大香线综合精品| 国产aⅴ综合色| 亚洲一区在线免费观看| 日韩欧美一卡二卡| 成人h动漫精品一区二区| 夜夜揉揉日日人人青青一国产精品| 欧美日韩dvd在线观看| 精品一区二区三区视频| 国产精品久久久久久久蜜臀| 欧洲人成人精品| 九九**精品视频免费播放| 国产精品二三区| 6080亚洲精品一区二区| 国产jizzjizz一区二区| 亚洲一级二级三级| 久久久久久久久蜜桃| 色域天天综合网| 久草精品在线观看| 亚洲综合在线视频| 精品国产一区二区精华| 色婷婷久久综合| 国产一区二区在线免费观看| 亚洲欧美日韩在线| 日韩一区二区在线观看视频播放| 成人丝袜视频网| 日本午夜精品视频在线观看 | 国产乱码字幕精品高清av | 日本不卡的三区四区五区| 国产日韩欧美不卡| 欧美日韩成人激情| 91视频你懂的| 国产剧情在线观看一区二区| 天天综合网天天综合色| 国产精品美女久久久久久2018| 91精品国产欧美一区二区成人| a美女胸又www黄视频久久| 久久超碰97人人做人人爱| 亚洲主播在线播放| 国产精品三级av在线播放| 日韩欧美国产一二三区| 欧美日韩性生活| 一本色道久久综合亚洲91| 国产精品羞羞答答xxdd | 亚洲电影一级黄| 国产精品久久99| 欧美激情一区二区三区不卡 | 精品亚洲欧美一区| 五月婷婷激情综合网| 亚洲另类春色校园小说| 中文字幕 久热精品 视频在线| 日韩一区欧美小说| 国产在线播放一区三区四| 亚洲欧美日韩小说| 国产精品色眯眯| 欧美日韩亚洲综合在线 欧美亚洲特黄一级| 亚洲国产综合在线| 成人精品在线视频观看| 亚洲蜜臀av乱码久久精品| 欧美一区二区成人| 99r国产精品| 美女一区二区三区| 日韩一区中文字幕| 日韩一区二区精品葵司在线| 99re亚洲国产精品| 久草热8精品视频在线观看| 综合在线观看色| 日韩欧美一区中文| 一本一本大道香蕉久在线精品 | 亚洲大片免费看| 久久久精品黄色| 91精品久久久久久久99蜜桃| 成人午夜av电影| zzijzzij亚洲日本少妇熟睡| 午夜精品123| 亚洲免费在线播放| 亚洲一二三区视频在线观看| 五月综合激情日本mⅴ| 在线播放中文一区| 99国产精品视频免费观看| 久久成人综合网| 青草av.久久免费一区| 亚洲一区二区美女| 亚洲色图欧洲色图| 国产精品剧情在线亚洲| 久久久亚洲精品石原莉奈 | 3d动漫精品啪啪1区2区免费 | 69堂成人精品免费视频| 欧美在线一区二区| 色狠狠一区二区三区香蕉| 成人国产精品视频| 大白屁股一区二区视频| 国产伦理精品不卡| 国产久卡久卡久卡久卡视频精品| 日产精品久久久久久久性色| 亚洲高清不卡在线| 午夜久久电影网| 调教+趴+乳夹+国产+精品| 午夜欧美电影在线观看| 性欧美大战久久久久久久久| 亚洲一二三四在线观看| 亚洲在线视频一区| 一区二区理论电影在线观看| 亚洲激情男女视频| 亚洲卡通动漫在线| 一区二区三区在线视频观看| 亚洲一区二区三区爽爽爽爽爽| 一区二区在线看| 亚洲成国产人片在线观看| 午夜精品影院在线观看| 日韩精品一卡二卡三卡四卡无卡| 亚洲成av人**亚洲成av**| 日韩 欧美一区二区三区| 久久成人久久鬼色| 国产91精品入口| 91在线观看视频| 欧美日韩精品一区二区三区蜜桃 | 成人免费精品视频| 99久久久无码国产精品| 欧洲精品一区二区三区在线观看| 欧美在线看片a免费观看| 欧美色视频在线观看| 91精品国产91久久久久久一区二区 | 欧美三级电影网| 日韩精品专区在线影院重磅| 久久久天堂av| 一区二区三区日韩在线观看| 日韩avvvv在线播放| 国产一区不卡在线| 91黄色免费网站| 日韩精品综合一本久道在线视频| 国产欧美精品日韩区二区麻豆天美| 亚洲免费观看视频| 日韩国产欧美在线观看| 国产成人午夜高潮毛片| 日本大香伊一区二区三区| 日韩午夜在线影院| 17c精品麻豆一区二区免费| 五月天一区二区三区| 成人一级黄色片| 91麻豆精品国产91| 中文字幕第一页久久| 日韩影院在线观看| 成人精品gif动图一区| 555www色欧美视频| 自拍偷在线精品自拍偷无码专区| 日韩av不卡一区二区| 色噜噜狠狠成人网p站| 亚洲精品一区在线观看| 亚洲大型综合色站| 99精品视频中文字幕| 欧美精品一区二区久久久| 亚洲大片免费看|