我用attachment pipeline插件实现PDF文本的抽取,单个附件上传解析文本内容没有问题,但是多个附件根据pipeline上传时传不上去
原生的命令都可以执行,但是通过Java api上传不了
多附件文本抽取管道和索引都没问题
下面是单个的上传没有问题
/**
* 单个上传文件
*
* @param file
* @throws IOException
*/
public void uploadEs(FileObj file) throws IOException {
IndexRequest indexRequest = new IndexRequest("file");
//上传同时,使用attachment pipline进行提取文件
indexRequest.source(JSON.toJSONString(file), XContentType.JSON);
indexRequest.setPipeline("attachment");
IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
System.out.println(indexResponse);
}
多个上传时这个方法行不通
try {
BulkRequest bulkRequest = new BulkRequest();
if (dataList != null && dataList.size() > 0) {
for (File obj : dataList) {
bulkRequest.add(
new IndexRequest(index)
.source(JSON.toJSONString(obj), XContentType.JSON).setPipeline("attachment")
);
}
BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
if (!response.hasFailures()) {
return true;
}
}
} catch (Exception e) {
e.printStackTrace();
}
最终目的是想要下面官方文档根据管道批量上传对应的Java代码
PUT my_index/_doc/my_id?pipeline=attachment
{
"attachments" : [
{
"filename" : "ipsum.txt",
"data" : "dGhpcyBpcwpqdXN0IHNvbWUgdGV4dAo="
},
{
"filename" : "test.txt",
"data" : "VGhpcyBpcyBhIHRlc3QK"
}
]
}
从您提供的代码和问题描述中,我无法判断您遇到的具体问题,可能原因有很多。然而,我可以提供一些可能有助于解决问题的建议和注意事项。
首先,检查您的 Java 代码是否存在语法错误或逻辑错误。建议使用调试器逐行调试代码,以帮助确定错误发生的位置和原因。
根据您提供的单个文件上传代码,我们可以看到将 Java 对象序列化为 JSON 格式的方法是 JSON.toJSONString()
,这可能会导致某些类型的数据无法正确地序列化为 JSON 格式。因此,建议您使用 Elasticsearch 提供的 Java 客户端 API,为 Java 对象创建索引(如您提供的单个文件上传代码所示),而不是将其序列化为 JSON 格式。
在使用 Elasticsearch 的 Java 客户端 API 和 BulkRequest 上传多个文档时,确保文档的格式正确,并根据 Elasticsearch 管道要求设置正确的 pipeline 参数。 您可以使用以下代码为多个附件上传文档:
// 创建多个附件的插入请求
String index = "my_index";
String pipeline = "attachment";
List<Map<String, Object>> attachments = new ArrayList<>();
for (File file : files) {
byte[] fileData = Files.readAllBytes(Paths.get(file.getAbsolutePath()));
String base64Data = Base64.getEncoder().encodeToString(fileData);
Map<String, Object> attachment = new HashMap<>();
attachment.put("filename", file.getName());
attachment.put("data", base64Data);
attachments.add(attachment);
}
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
builder.field("attachments", attachments);
builder.endObject();
String jsonString = builder.string();
BulkRequest bulkRequest = new BulkRequest();
List<String> docIds = new ArrayList<>();
for (int i = 0; i < files.size(); i++) {
bulkRequest.add(
new IndexRequest(index).id(docIds.get(i)).source(jsonString, XContentType.JSON).setPipeline(pipeline)
);
}
BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
在此示例中,我们将多个附件文件读入字节数组中,并将其转换为 Base64 字符串。然后,我们将附件数据和名称组装成 Map,然后将这些 Map 组装成一个包含多个附件的父文档。最后,使用 Elasticsearch 的 Java 客户端 API 将这些父文档集成到 bulkRequest
中,并传递正确的 pipeline 参数。
希望这些建议和代码示例对您有所帮助。
参考GPT和自己的思路:你的代码看起来应该可以正常工作,因为你使用了与单个文档上传相同的方法来上传多个文档。不过,为了确保正确性,以下是一些建议:
1 确保dataList中包含多个有效的File对象。你可以通过打印出dataList中每个文件的数量来检查它是否正确。
2 确保你在Elasticsearch中正确地设置了attachment管道。你可以通过使用curl命令来检查它是否正常工作:
curl -XPUT 'http://localhost:9200/my_index/_doc/my_id?pipeline=attachment' \
-H 'Content-Type: application/json' \
-d '{
"attachments": [
{
"filename": "ipsum.txt",
"data": "dGhpcyBpcwpqdXN0IHNvbWUgdGV4dAo="
},
{
"filename": "test.txt",
"data": "VGhpcyBpcyBhIHRlc3QK"
}
]
}'
3 确保你的Elasticsearch集群和Java客户端的版本相匹配。如果Java客户端与Elasticsearch版本不兼容,则可能会出现问题。
4 确保您正确处理了异常。在处理BulkResponse对象时,您可能需要检查哪些操作失败了,并采取相应的措施。
最后,如果你的代码仍然无法正常工作,你可以尝试使用Elasticsearch的官方Java客户端提供的BulkProcessor来上传多个文档。它可以自动处理异常和批处理文档。下面是一个例子:
BulkProcessor bulkProcessor = BulkProcessor.builder(
(request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
// Before executing the bulk request
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
// After executing the bulk request
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
// After a failure occurs while executing the bulk request
}
})
.setBulkActions(1000)
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(5))
.setConcurrentRequests(1)
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
.build();
for (File obj : dataList) {
bulkProcessor.add(new IndexRequest(index).source(JSON.toJSONString(obj), XContentType.JSON).setPipeline("attachment"));
}
bulkProcessor.close();
希望这些建议能够帮助你解决问题。
参考GPT和自己的思路,根据您提供的信息,可能存在几个问题。以下是一些可能需要调整的建议:
1.使用Elasticsearch Attachment Processor插件进行文本提取时,必须使用 PUT 请求在 Elasticsearch 中注册一个新的 ingest pipeline,然后使用 setPipeline 方法在 IndexRequest 中指定该 pipeline。因此,请确保您已经在 Elasticsearch 中注册了一个名为 "attachment" 的 pipeline。
2,您的代码使用了 JSON.toJSONString(obj) 将文件对象转换为 JSON 字符串。这是不正确的,因为 Elasticsearch 无法将文件对象作为 JSON 字符串解析。您需要使用 org.apache.commons.io.IOUtils 类或其他方法将文件内容读取为字符串,然后将字符串作为文档内容传递给 Elasticsearch。
3.您的代码将每个文件都作为单独的请求发送到 Elasticsearch。这意味着如果您上传的文件太多,可能会导致网络和服务器负载方面的问题。您可以考虑使用 Elasticsearch 的 BulkRequest API 一次性批量处理多个文档。
下面是更新后的代码,如有疑问请回复我:
try {
BulkRequest bulkRequest = new BulkRequest();
if (dataList != null && dataList.size() > 0) {
for (File file : dataList) {
String content = IOUtils.toString(new FileInputStream(file), StandardCharsets.UTF_8);
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("content", content);
IndexRequest request = new IndexRequest(index)
.source(jsonMap, XContentType.JSON)
.setPipeline("attachment");
bulkRequest.add(request);
}
BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
if (!response.hasFailures()) {
return true;
}
}
} catch (Exception e) {
e.printStackTrace();
}
这段代码使用 org.apache.commons.io.IOUtils 读取文件内容,并将其作为字符串传递给 Elasticsearch。此外,它还使用 BulkRequest API 一次性批量处理多个文档,以提高性能并减少网络和服务器负载。
回答不易,还请采纳!!!
上传不了看看是文件类型的限制,或者是文件大小的限制,或者是网络的问题。
以下答案由GPT-3.5大模型与博主波罗歌共同编写:
可以尝试将多个附件文档的内容以数组形式组织,然后在Java API中构建类似于官方文档中给出的请求数据。具体实现参考代码如下:
/**
* 批量上传多个文件
*
* @param fileList 文件列表
* @param index 索引名称
* @return 是否上传成功
*/
public boolean uploadEs(List<FileObj> fileList, String index) {
try {
BulkRequest bulkRequest = new BulkRequest();
if (fileList != null && fileList.size() > 0) {
List<Map<String, Object>> attachments = new ArrayList<>();
for (FileObj file : fileList) {
Map<String, Object> attachment = new HashMap<>();
byte[] fileContent = FileUtils.readFileToByteArray(file.getFile());
attachment.put("filename", file.getFileName());
attachment.put("data", Base64.getEncoder().encodeToString(fileContent));
attachments.add(attachment);
}
Map<String, Object> source = new HashMap<>();
source.put("attachments", attachments);
String jsonString = JSON.toJSONString(source);
IndexRequest indexRequest = new IndexRequest(index);
indexRequest.source(jsonString, XContentType.JSON);
indexRequest.setPipeline("attachment");
bulkRequest.add(indexRequest);
BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
if (!response.hasFailures()) {
return true;
}
}
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
在这段代码中,我们先将多个附件文档的内容封装成一个数组attachments,其中每个元素为一个Map,包含文件名和文件内容两个字段。将这个数组组装到源码中,并指定了pipeline,通过IndexRequest构造器创建IndexRequest,此时source的参数为组装好的json字符串,最后将IndexRequest加入到BulkRequest中,最终使用bulk方法上传。
如果我的回答解决了您的问题,请采纳!