您的位置:首页 > 技术文章
文章详情页

Java 连接 ElasticSearch 集群操作

【字号:】 日期:2022-08-24 13:31:50 浏览:40 作者:猪猪

我就废话不多说了,大家还是直接看代码吧~

/* *es配置类 * */ @Configurationpublic class ElasticSearchDataSourceConfigurer { private static final Logger LOG = LogManager.getLogger(ElasticSearchDataSourceConfigurer.class); @Bean public TransportClient getESClient() { //设置集群名称 Settings settings = Settings.builder().put('cluster.name', 'bigData-cluster').put('client.transport.sniff', true).build(); //创建client TransportClient client = null; try { client = new PreBuiltTransportClient(settings) .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(''), 9300));//集群ip LOG.info('ESClient连接建立成功'); } catch (UnknownHostException e) { LOG.info('ESClient连接建立失败'); e.printStackTrace(); } return client; } }

/** * Simple to Introduction * * @Description: [添加类] */@Repositorypublic class UserDaoImpl implements userDao { private static final String INDEXNAME = 'user';//小写private static final String TYPENAME = 'info'; @ResourceTransportClient transportClient; @Overridepublic int addUser(User[] user) {IndexResponse indexResponse = null;int successNum = 0;for (int i = 0; i < user.length; i++) {UUID uuid = UUID.randomUUID();String str = uuid.toString();String jsonValue = null;try {jsonValue = JsonUtil.object2JsonString(user[i]);if (jsonValue != null) {indexResponse = transportClient.prepareIndex(INDEXNAME, TYPENAME, str).setSource(jsonValue).execute().actionGet();successNum++;}} catch (JsonProcessingException e) {e.printStackTrace();} }return successNum;} }

/** *批量插入 */public static void bathAddUser(TransportClient client, List<User> users) { BulkRequestBuilder bulkRequest = transportClient.prepareBulk();for (int i = 0; i < users.size(); i++) {UUID uuid = UUID.randomUUID();String str = uuid.toString(); String jsonValue = null;try {jsonValue = JsonUtil.object2JsonString(users.get(i));} catch (JsonProcessingException e) {e.printStackTrace();}bulkRequest.add(client.prepareIndex('user', 'info', str).setSource(jsonValue));// 一万条插入一次if (i % 10000 == 0) {bulkRequest.execute().actionGet();}System.out.println('已经插入第' + i + '多少条');} }

补充知识:使用 Java 创建 ES(ElasticSearch)连接池

1.首先要有一个创建连接的工厂类

package com.aly.util; import org.apache.commons.pool2.PooledObject;import org.apache.commons.pool2.PooledObjectFactory;import org.apache.commons.pool2.impl.DefaultPooledObject;import org.apache.http.HttpHost;import org.elasticsearch.client.RestClient;import org.elasticsearch.client.RestHighLevelClient; /** * EliasticSearch连接池工厂对象 * @author 00000 * */public class EsClientPoolFactory implements PooledObjectFactory<RestHighLevelClient>{ @Overridepublic void activateObject(PooledObject<RestHighLevelClient> arg0) throws Exception {System.out.println('activateObject');}/** * 销毁对象 */@Overridepublic void destroyObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {RestHighLevelClient highLevelClient = pooledObject.getObject();highLevelClient.close();}/** * 生产对象 *///@SuppressWarnings({ 'resource' })@Overridepublic PooledObject<RestHighLevelClient> makeObject() throws Exception {//Settings settings = Settings.builder().put('cluster.name','elasticsearch').build();RestHighLevelClient client = null;try {/*client = new PreBuiltTransportClient(settings) .addTransportAddress(new TransportAddress(InetAddress.getByName('localhost'),9300));*/client = new RestHighLevelClient(RestClient.builder(new HttpHost('192.168.1.121', 9200, 'http'), new HttpHost('192.168.1.122', 9200, 'http'),new HttpHost('192.168.1.123', 9200, 'http'), new HttpHost('192.168.1.125', 9200, 'http'),new HttpHost('192.168.1.126', 9200, 'http'), new HttpHost('192.168.1.127', 9200, 'http'))); } catch (Exception e) {e.printStackTrace();}return new DefaultPooledObject<RestHighLevelClient>(client);} @Overridepublic void passivateObject(PooledObject<RestHighLevelClient> arg0) throws Exception {System.out.println('passivateObject');} @Overridepublic boolean validateObject(PooledObject<RestHighLevelClient> arg0) {return true;}}

2.然后再写我们的连接池工具类

package com.aly.util; import org.apache.commons.pool2.impl.GenericObjectPool;import org.apache.commons.pool2.impl.GenericObjectPoolConfig;import org.elasticsearch.client.RestHighLevelClient; /** * ElasticSearch 连接池工具类 * * @author 00000 * */public class ElasticSearchPoolUtil {// 对象池配置类,不写也可以,采用默认配置private static GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();// 采用默认配置maxTotal是8,池中有8个clientstatic {poolConfig.setMaxTotal(8);}// 要池化的对象的工厂类,这个是我们要实现的类private static EsClientPoolFactory esClientPoolFactory = new EsClientPoolFactory();// 利用对象工厂类和配置类生成对象池private static GenericObjectPool<RestHighLevelClient> clientPool = new GenericObjectPool<>(esClientPoolFactory,poolConfig); /** * 获得对象 * * @return * @throws Exception */public static RestHighLevelClient getClient() throws Exception {// 从池中取一个对象RestHighLevelClient client = clientPool.borrowObject();return client;} /** * 归还对象 * * @param client */public static void returnClient(RestHighLevelClient client) {// 使用完毕之后,归还对象clientPool.returnObject(client);}}

以上这篇《Java 连接 ElasticSearch 集群操作》就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持好吧啦网。

标签: Java
相关文章: