Java elasticserach

我用attachment pipeline插件实现PDF文本的抽取,单个附件上传解析文本内容没有问题,但是多个附件根据pipeline上传时传不上去
原生的命令都可以执行,但是通过Java api上传不了

多附件文本抽取管道和索引都没问题
下面是单个的上传没有问题

 /**
     * 单个上传文件
     *
     * @param file
     * @throws IOException
     */
    public void uploadEs(FileObj file) throws IOException {
        IndexRequest indexRequest = new IndexRequest("file");

        //上传同时,使用attachment pipline进行提取文件
        indexRequest.source(JSON.toJSONString(file), XContentType.JSON);
        indexRequest.setPipeline("attachment");

        IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
        System.out.println(indexResponse);
    }

多个上传时这个方法行不通

 try {
            BulkRequest bulkRequest = new BulkRequest();
            if (dataList != null && dataList.size() > 0) {
                for (File obj : dataList) {
                    bulkRequest.add(
                            new IndexRequest(index)
                                    .source(JSON.toJSONString(obj), XContentType.JSON).setPipeline("attachment")
                    );
                }
                BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
                if (!response.hasFailures()) {
                    return true;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

最终目的是想要下面官方文档根据管道批量上传对应的Java代码

PUT my_index/_doc/my_id?pipeline=attachment
{
  "attachments" : [
    {
      "filename" : "ipsum.txt",
      "data" : "dGhpcyBpcwpqdXN0IHNvbWUgdGV4dAo="
    },
    {
      "filename" : "test.txt",
      "data" : "VGhpcyBpcyBhIHRlc3QK"
    }
  ]
}

从您提供的代码和问题描述中,我无法判断您遇到的具体问题,可能原因有很多。然而,我可以提供一些可能有助于解决问题的建议和注意事项。

  1. 首先,检查您的 Java 代码是否存在语法错误或逻辑错误。建议使用调试器逐行调试代码,以帮助确定错误发生的位置和原因。

  2. 根据您提供的单个文件上传代码,我们可以看到将 Java 对象序列化为 JSON 格式的方法是 JSON.toJSONString(),这可能会导致某些类型的数据无法正确地序列化为 JSON 格式。因此,建议您使用 Elasticsearch 提供的 Java 客户端 API,为 Java 对象创建索引(如您提供的单个文件上传代码所示),而不是将其序列化为 JSON 格式。

  3. 在使用 Elasticsearch 的 Java 客户端 API 和 BulkRequest 上传多个文档时,确保文档的格式正确,并根据 Elasticsearch 管道要求设置正确的 pipeline 参数。 您可以使用以下代码为多个附件上传文档:

// 创建多个附件的插入请求
String index = "my_index";
String pipeline = "attachment";
List<Map<String, Object>> attachments = new ArrayList<>();
for (File file : files) {
    byte[] fileData = Files.readAllBytes(Paths.get(file.getAbsolutePath()));
    String base64Data = Base64.getEncoder().encodeToString(fileData);
    Map<String, Object> attachment = new HashMap<>();
    attachment.put("filename", file.getName());
    attachment.put("data", base64Data);
    attachments.add(attachment);
}
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
builder.field("attachments", attachments);
builder.endObject();
String jsonString = builder.string();
BulkRequest bulkRequest = new BulkRequest();
List<String> docIds = new ArrayList<>();
for (int i = 0; i < files.size(); i++) {
    bulkRequest.add(
        new IndexRequest(index).id(docIds.get(i)).source(jsonString, XContentType.JSON).setPipeline(pipeline)
    );
}
BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);

在此示例中,我们将多个附件文件读入字节数组中,并将其转换为 Base64 字符串。然后,我们将附件数据和名称组装成 Map,然后将这些 Map 组装成一个包含多个附件的父文档。最后,使用 Elasticsearch 的 Java 客户端 API 将这些父文档集成到 bulkRequest 中,并传递正确的 pipeline 参数。

希望这些建议和代码示例对您有所帮助。

参考GPT和自己的思路:你的代码看起来应该可以正常工作,因为你使用了与单个文档上传相同的方法来上传多个文档。不过,为了确保正确性,以下是一些建议:

1 确保dataList中包含多个有效的File对象。你可以通过打印出dataList中每个文件的数量来检查它是否正确。

2 确保你在Elasticsearch中正确地设置了attachment管道。你可以通过使用curl命令来检查它是否正常工作:

curl -XPUT 'http://localhost:9200/my_index/_doc/my_id?pipeline=attachment' \
     -H 'Content-Type: application/json' \
     -d '{
           "attachments": [
              {
                 "filename": "ipsum.txt",
                 "data": "dGhpcyBpcwpqdXN0IHNvbWUgdGV4dAo="
              },
              {
                 "filename": "test.txt",
                 "data": "VGhpcyBpcyBhIHRlc3QK"
              }
           ]
         }'


3 确保你的Elasticsearch集群和Java客户端的版本相匹配。如果Java客户端与Elasticsearch版本不兼容,则可能会出现问题。

4 确保您正确处理了异常。在处理BulkResponse对象时,您可能需要检查哪些操作失败了,并采取相应的措施。

最后,如果你的代码仍然无法正常工作,你可以尝试使用Elasticsearch的官方Java客户端提供的BulkProcessor来上传多个文档。它可以自动处理异常和批处理文档。下面是一个例子:

BulkProcessor bulkProcessor = BulkProcessor.builder(
        (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // Before executing the bulk request
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                // After executing the bulk request
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // After a failure occurs while executing the bulk request
            }
        })
        .setBulkActions(1000)
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
        .setFlushInterval(TimeValue.timeValueSeconds(5))
        .setConcurrentRequests(1)
        .setBackoffPolicy(
                BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
        .build();

for (File obj : dataList) {
    bulkProcessor.add(new IndexRequest(index).source(JSON.toJSONString(obj), XContentType.JSON).setPipeline("attachment"));
}

bulkProcessor.close();


希望这些建议能够帮助你解决问题。

参考GPT和自己的思路,根据您提供的信息,可能存在几个问题。以下是一些可能需要调整的建议:

1.使用Elasticsearch Attachment Processor插件进行文本提取时,必须使用 PUT 请求在 Elasticsearch 中注册一个新的 ingest pipeline,然后使用 setPipeline 方法在 IndexRequest 中指定该 pipeline。因此,请确保您已经在 Elasticsearch 中注册了一个名为 "attachment" 的 pipeline。

2,您的代码使用了 JSON.toJSONString(obj) 将文件对象转换为 JSON 字符串。这是不正确的,因为 Elasticsearch 无法将文件对象作为 JSON 字符串解析。您需要使用 org.apache.commons.io.IOUtils 类或其他方法将文件内容读取为字符串,然后将字符串作为文档内容传递给 Elasticsearch。

3.您的代码将每个文件都作为单独的请求发送到 Elasticsearch。这意味着如果您上传的文件太多,可能会导致网络和服务器负载方面的问题。您可以考虑使用 Elasticsearch 的 BulkRequest API 一次性批量处理多个文档。

下面是更新后的代码,如有疑问请回复我:

try {
    BulkRequest bulkRequest = new BulkRequest();
    if (dataList != null && dataList.size() > 0) {
        for (File file : dataList) {
            String content = IOUtils.toString(new FileInputStream(file), StandardCharsets.UTF_8);
            Map<String, Object> jsonMap = new HashMap<>();
            jsonMap.put("content", content);
            IndexRequest request = new IndexRequest(index)
                    .source(jsonMap, XContentType.JSON)
                    .setPipeline("attachment");
            bulkRequest.add(request);
        }
        BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
        if (!response.hasFailures()) {
            return true;
        }
    }
} catch (Exception e) {
    e.printStackTrace();
}

这段代码使用 org.apache.commons.io.IOUtils 读取文件内容,并将其作为字符串传递给 Elasticsearch。此外,它还使用 BulkRequest API 一次性批量处理多个文档,以提高性能并减少网络和服务器负载。
回答不易,还请采纳!!!

上传不了看看是文件类型的限制,或者是文件大小的限制,或者是网络的问题。

以下答案由GPT-3.5大模型与博主波罗歌共同编写:
可以尝试将多个附件文档的内容以数组形式组织,然后在Java API中构建类似于官方文档中给出的请求数据。具体实现参考代码如下:

    /**
     * 批量上传多个文件
     *
     * @param fileList 文件列表
     * @param index    索引名称
     * @return 是否上传成功
     */
    public boolean uploadEs(List<FileObj> fileList, String index) {
        try {
            BulkRequest bulkRequest = new BulkRequest();
            if (fileList != null && fileList.size() > 0) {
                List<Map<String, Object>> attachments = new ArrayList<>();
                for (FileObj file : fileList) {
                    Map<String, Object> attachment = new HashMap<>();
                    byte[] fileContent = FileUtils.readFileToByteArray(file.getFile());
                    attachment.put("filename", file.getFileName());
                    attachment.put("data", Base64.getEncoder().encodeToString(fileContent));
                    attachments.add(attachment);
                }
                Map<String, Object> source = new HashMap<>();
                source.put("attachments", attachments);
                String jsonString = JSON.toJSONString(source);

                IndexRequest indexRequest = new IndexRequest(index);
                indexRequest.source(jsonString, XContentType.JSON);
                indexRequest.setPipeline("attachment");
                bulkRequest.add(indexRequest);
                BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
                if (!response.hasFailures()) {
                    return true;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

在这段代码中,我们先将多个附件文档的内容封装成一个数组attachments,其中每个元素为一个Map,包含文件名和文件内容两个字段。将这个数组组装到源码中,并指定了pipeline,通过IndexRequest构造器创建IndexRequest,此时source的参数为组装好的json字符串,最后将IndexRequest加入到BulkRequest中,最终使用bulk方法上传。
如果我的回答解决了您的问题,请采纳!