0%

Elasticsearch 数据导入导出 Java 实现工具类

Elasticsearch 数据导入导出 Java 实现

最近在学 Elasticsearch,对它还不是非常熟悉。它好像没有像 MongoDB 的 mongodump 那样方便的数据导入导出工具。

虽然也有一些别人做的插件工具,但嫌麻烦,所以整理了网上一些大神写的代码,工具类如下。

如果发现有不对的地方,欢迎指正。或者可以优化的地方,欢迎指点。

(以下为完整工具类代码)
package top.lrshuai.blog.util;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
/**
 * Elasticsearch data import/export helpers built on the legacy
 * {@link TransportClient} API (transport protocol, default port 9300).
 *
 * <p>Three operations are supported:
 * <ul>
 *   <li>{@link #esToEs} — copy an index from one cluster to another via scroll + bulk;</li>
 *   <li>{@link #outToFile} — dump an index to a newline-delimited JSON file;</li>
 *   <li>{@link #fileToEs} — bulk-load such a file back into an index.</li>
 * </ul>
 *
 * <p>NOTE(review): TransportClient is deprecated in later ES versions; this class
 * assumes an ES 5.x-era cluster reachable on the transport port.
 *
 * @author rstyro
 */
public class CopyUtil {

    /** Documents fetched per scroll page and flushed per bulk request. */
    private static final int BATCH_SIZE = 1000;

    /**
     * Scroll-context keep-alive. The original code used {@code new TimeValue(1000)}
     * (one second), which is short enough for the server-side scroll context to
     * expire between pages on a slow copy; one minute is a safer default.
     */
    private static final TimeValue SCROLL_TIMEOUT = new TimeValue(60000);

    public static void main(String[] args) throws Exception {
        String srcClustName = "robot";
        String srcIndexName = "robot4";
        String srcIp = "127.0.0.1";
        int srcPort = 9300;

        String tagClustName = "robot";
        String tagIndexName = "robot6";
        String tagTypeName = "brain";
        String tagIp = "127.0.0.1";
        int tagPort = 9300;

        esToEs(srcClustName, srcIndexName, srcIp, srcPort, tagClustName, tagIndexName, tagTypeName, tagIp, tagPort);

        //outToFile(srcClustName, srcIndexName, null, srcIp, srcPort, "f:\\json.txt");
        //fileToEs(tagClustName, tagIndexName, tagTypeName, tagIp, tagPort, "f:\\json.txt");
    }

    /**
     * Builds a TransportClient connected to a single node of the given cluster.
     * The caller is responsible for closing the returned client.
     *
     * @param clusterName cluster name (must match {@code cluster.name} on the server)
     * @param ip          node address
     * @param port        transport port (default 9300)
     * @return a connected client
     */
    private static TransportClient newClient(String clusterName, String ip, int port) {
        Settings settings = Settings.builder()
                .put("cluster.name", clusterName)
                .build();
        TransportClient client = new PreBuiltTransportClient(settings);
        client.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress(ip, port)));
        return client;
    }

    /**
     * Copies all documents of one index from a source cluster into a target
     * index/type on a target cluster, using the scroll API to page through the
     * source and asynchronous bulk requests to write the target.
     *
     * @param srcClustName source cluster name
     * @param srcIndexName source index
     * @param srcIp        source node ip
     * @param srcPort      source transport port (default 9300)
     * @param tagClustName target cluster name
     * @param tagIndexName target index
     * @param tagTypeName  target mapping type
     * @param tagIp        target node ip
     * @param tagPort      target transport port
     * @throws InterruptedException if interrupted while waiting for pending bulks
     */
    public static void esToEs(String srcClustName, String srcIndexName, String srcIp, int srcPort,
            String tagClustName, String tagIndexName, String tagTypeName, String tagIp, int tagPort)
            throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try (TransportClient srcClient = newClient(srcClustName, srcIp, srcPort);
             TransportClient tagClient = newClient(tagClustName, tagIp, tagPort)) {

            SearchResponse scrollResp = srcClient.prepareSearch(srcIndexName)
                    .setScroll(SCROLL_TIMEOUT)
                    .setSize(BATCH_SIZE)
                    .execute().actionGet();

            while (scrollResp.getHits().getHits().length > 0) {
                System.out.println("查询条数=" + scrollResp.getHits().getHits().length);
                // One bulk request per scroll page, executed on the pool so indexing
                // overlaps with fetching the next page.
                final BulkRequestBuilder bulk = tagClient.prepareBulk();
                for (SearchHit hit : scrollResp.getHits().getHits()) {
                    IndexRequest req = tagClient.prepareIndex().setIndex(tagIndexName)
                            .setType(tagTypeName).setSource(hit.getSourceAsMap()).request();
                    bulk.add(req);
                }
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        // actionGet() so bulk failures surface as exceptions on the
                        // worker thread instead of being silently dropped.
                        bulk.execute().actionGet();
                    }
                });
                scrollResp = srcClient.prepareSearchScroll(scrollResp.getScrollId())
                        .setScroll(SCROLL_TIMEOUT).execute().actionGet();
            }

            // shutdown() alone does NOT wait for queued tasks (the original comment
            // claimed otherwise); block until all submitted bulks have finished
            // before the clients are closed, otherwise in-flight bulks fail.
            executor.shutdown();
            executor.awaitTermination(1, TimeUnit.HOURS);
            System.out.println("执行结束");
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Dumps all documents of an index (optionally restricted to one type) to a
     * file, one JSON document per line (CRLF-separated). Appends if the file
     * already exists.
     *
     * @param clustName  cluster name
     * @param indexName  index name
     * @param typeName   type name, or {@code null} for all types
     * @param sourceIp   node ip
     * @param sourcePort transport port
     * @param filePath   output file path
     */
    public static void outToFile(String clustName, String indexName, String typeName,
            String sourceIp, int sourcePort, String filePath) {
        try (TransportClient client = newClient(clustName, sourceIp, sourcePort)) {
            SearchRequestBuilder builder = client.prepareSearch(indexName);
            if (typeName != null) {
                builder.setTypes(typeName);
            }
            builder.setQuery(QueryBuilders.matchAllQuery());
            builder.setSize(10000);
            builder.setScroll(SCROLL_TIMEOUT);
            SearchResponse scrollResp = builder.execute().actionGet();

            long count = 0;
            // NOTE(review): FileWriter uses the platform default charset — confirm it
            // matches whatever fileToEs later reads the file with.
            try (BufferedWriter out = new BufferedWriter(new FileWriter(filePath, true))) {
                while (true) {
                    for (SearchHit hit : scrollResp.getHits().getHits()) {
                        String json = hit.getSourceAsString();
                        // Null-safe: the original `!json.isEmpty() && !"".equals(json)`
                        // was redundant and NPE'd on a null source.
                        if (json != null && !json.isEmpty()) {
                            out.write(json);
                            out.write("\r\n");
                            count++;
                        }
                    }
                    scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
                            .setScroll(SCROLL_TIMEOUT).execute().actionGet();
                    if (scrollResp.getHits().getHits().length == 0) {
                        break;
                    }
                }
            }
            System.out.println("总共写入数据:" + count);
        } catch (IOException e) {
            // FileNotFoundException is an IOException; one handler covers both.
            e.printStackTrace();
        }
    }

    /**
     * Bulk-loads a newline-delimited JSON file (as produced by
     * {@link #outToFile}) into the given index/type, flushing every
     * {@value #BATCH_SIZE} documents so arbitrarily large files do not build a
     * single giant bulk request.
     *
     * @param clustName  cluster name
     * @param indexName  target index
     * @param typeName   target type
     * @param sourceIp   node ip
     * @param sourcePort transport port
     * @param filePath   path of the JSON-lines file to import
     */
    @SuppressWarnings("deprecation")
    public static void fileToEs(String clustName, String indexName, String typeName,
            String sourceIp, int sourcePort, String filePath) {
        try (TransportClient client = newClient(clustName, sourceIp, sourcePort);
             BufferedReader br = new BufferedReader(new FileReader(filePath))) {
            long count = 0;
            BulkRequestBuilder bulkRequest = client.prepareBulk();
            String json;
            while ((json = br.readLine()) != null) {
                // Skip blank lines — indexing an empty source would fail the bulk.
                if (json.isEmpty()) {
                    continue;
                }
                bulkRequest.add(client.prepareIndex(indexName, typeName).setSource(json));
                count++;
                // Periodic flush keeps memory bounded (this was commented out in the
                // original, which accumulated the whole file into one request).
                if (count % BATCH_SIZE == 0) {
                    BulkResponse bulkResponse = bulkRequest.execute().actionGet();
                    if (bulkResponse.hasFailures()) {
                        System.out.println("message:" + bulkResponse.buildFailureMessage());
                    }
                    bulkRequest = client.prepareBulk();
                }
            }
            // Flush the final partial batch, if any.
            if (bulkRequest.numberOfActions() > 0) {
                BulkResponse bulkResponse = bulkRequest.execute().actionGet();
                if (bulkResponse.hasFailures()) {
                    System.out.println("message:" + bulkResponse.buildFailureMessage());
                }
            }
            System.out.println("总提交了:" + count);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

您的打赏,是我创作的动力!不给钱?那我只能靠想象力充饥了。