Connection timeout when querying Elasticsearch from Spark


import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

    val conf = new SparkConf().setAppName("ESCommentsProcessor")
    conf.set("es.index.auto.create", "true")    
    val spark = SparkSession.builder().master("local").config(conf).getOrCreate()
    val esOptions = Map(
      "es.nodes" -> "192.168.0.1",
      "es.net.http.auth.user" -> "root",
      "es.net.http.auth.pass" -> "123456",
      "es.query" -> """{
                       |  "query": {
                       |    "match_all": {}
                       |  },
                       |  "sort": [
                       |    {
                       |      "user_id": {
                       |        "order": "asc"
                       |      }
                       |    },
                       |    {
                       |      "@timestamp": {
                       |        "order": "asc"
                       |      }
                       |    }
                       |  ]
                       |}""".stripMargin
    )
    val comments: DataFrame = spark.read.format("org.elasticsearch.spark.sql")
      .options(esOptions)
      .load("comments@1671638400000_30")  // the index fields (mapping) are retrievable here, so a connection to the configured node succeeds

    // For each user_id, find the earliest @timestamp
    val filteredDF = comments
      .groupBy("user_id")
      .agg(Map("@timestamp" -> "min"))
      .withColumnRenamed("min(@timestamp)", "min_timestamp")
      //.join(comments, Seq("user_id", "min_timestamp"))

filteredDF.show()   // hangs here: show() is the first action that actually reads data from Elasticsearch; the error is below

Error message

scala> filteredDF.show
2023-06-21 18:41:04 ERROR NetworkClient:155 - Node [9.185.144.100:16431] failed (org.apache.commons.httpclient.ConnectTimeoutException: The host did not accept the connection within timeout of 60000 ms); selected next node [192.168.0.1:9200]
2023-06-21 18:42:05 ERROR NetworkClient:155 - Node [9.185.147.16:14289] failed (org.apache.commons.httpclient.ConnectTimeoutException: The host did not accept the connection within timeout of 60000 ms); selected next node [192.168.0.2:16431]
..........
Caused by: org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[*.*.*.*,*.*.*.*]]
..........

This is a connection timeout that occurs while connecting to Elasticsearch. According to the error message, multiple nodes were tried and all of them failed; note that the failing entries (the 9.185.x.x addresses) are not the configured ones, which suggests the connector is also trying node addresses it discovered from the cluster itself. There are several possible causes:

  1. Network issues: make sure your network connection is working and that the Elasticsearch nodes' IP addresses and ports are reachable.

  2. Firewall issues: check the firewall settings and make sure network traffic from the Spark cluster to the Elasticsearch nodes is allowed.

  3. Elasticsearch node configuration issues: make sure the nodes are configured correctly, allow external access, and that their ports are not blocked. In particular, if the addresses the cluster publishes are only reachable from inside its own network, the connector's node discovery will time out exactly as in the log above (see the sketch after this list).

  4. Elasticsearch cluster issues: if you are running a cluster, make sure all nodes are up and configured correctly.
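The log points most strongly at cause 3: after loading the mapping from the configured address, the connector appears to try node addresses it discovered from the cluster (the 9.185.x.x entries) before falling back to the configured ones. If only the configured addresses are reachable from the Spark driver (common behind NAT, Docker, or a cloud VPC), restricting the connector to those addresses usually helps. Below is a minimal sketch of the relevant es-hadoop settings; the timeout and retry values are illustrative, not recommendations:

    val esOptions = Map(
      "es.nodes" -> "192.168.0.1",       // only these addresses will be contacted
      "es.port" -> "9200",
      "es.nodes.wan.only" -> "true",     // treat es.nodes as the full node list; skip discovery
      "es.nodes.discovery" -> "false",   // implied by wan.only, shown here for clarity
      "es.http.timeout" -> "2m",         // illustrative: raise the per-request HTTP timeout
      "es.http.retries" -> "5",          // illustrative: allow a few more retries per node
      "es.net.http.auth.user" -> "root",
      "es.net.http.auth.pass" -> "123456"
    )

With es.nodes.wan.only enabled, every request is routed through the declared nodes, which costs some read parallelism but avoids connections to cluster-internal addresses.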

You can try the following to resolve the problem:

  1. Check network connectivity: from the machine running the Spark driver (and ideally from the executors as well), verify that the Elasticsearch IP addresses and ports respond, for example with a plain TCP connect (see the probe sketch after this list).

  2. Check firewall settings: confirm that traffic from the Spark cluster to the Elasticsearch nodes is allowed by both security groups and host firewalls.

  3. Check the Elasticsearch node configuration: confirm that the nodes publish an address that is reachable from Spark, or restrict the connector to the configured nodes as sketched above.

  4. Check the Elasticsearch cluster state: if it is a multi-node cluster, verify that all nodes are running (for example via GET _cluster/health) and are configured consistently.
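For steps 1 and 2, it is worth verifying plain TCP reachability from the machine that runs the Spark driver before involving Spark at all. A small probe sketch follows; the host/port pairs are placeholders taken from the log above and should be replaced with your own:

    import java.net.{InetSocketAddress, Socket}

    // Attempt a raw TCP connect to each Elasticsearch endpoint.
    // Placeholder endpoints copied from the error log; substitute your own.
    val endpoints = Seq(("192.168.0.1", 9200), ("9.185.144.100", 16431))
    endpoints.foreach { case (host, port) =>
      val socket = new Socket()
      try {
        socket.connect(new InetSocketAddress(host, port), 5000) // 5-second connect timeout
        println(s"$host:$port is reachable")
      } catch {
        case e: Exception => println(s"$host:$port failed: ${e.getMessage}")
      } finally {
        socket.close()
      }
    }

If the configured nodes are reachable but the 9.185.x.x addresses are not, that confirms the discovery problem described above.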