Hive on Spark
A Flink job uses JDBC to connect to Hive and write data into tables. Every time, after the program has been running for roughly a few hours, it throws an error and HiveServer2 stops. The cause was unclear, and no relevant logs could be found.
The INSERT SQL statement fails with:
java.sql.SQLException: org.apache.thrift.transport.TTransportException: java.net.SocketTimeoutException: Read timed out
at org.apache.hive.jdbc.HiveStatement.runAsyncOnServer(HiveStatement.java:334) ~[hive-jdbc-3.1.2.jar:3.1.2]
at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:265) ~[hive-jdbc-3.1.2.jar:3.1.2]
at org.apache.hive.jdbc.HiveStatement.executeUpdate(HiveStatement.java:511) ~[hive-jdbc-3.1.2.jar:3.1.2]
at org.apache.hive.jdbc.HivePreparedStatement.executeUpdate(HivePreparedStatement.java:122) ~[hive-jdbc-3.1.2.jar:3.1.2]
at com.zaxxer.hikari.pool.ProxyPreparedStatement.executeUpdate(ProxyPreparedStatement.java:61) ~[HikariCP-java7-2.4.12.jar:?]
at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.executeUpdate(HikariProxyPreparedStatement.java)
Searching turned up a related issue: [HIVE-22196] Socket timeouts happen when other drivers set DriverManager.loginTimeout - ASF JIRA
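Per that JIRA, the Hive JDBC driver derives its Thrift socket read timeout from DriverManager's login timeout, which is a single JVM-wide value. So a pool configuring some other driver in the same process (here HikariCP, visible in the stack trace) can silently cap how long any Hive statement may run. A small illustrative sketch of the global setting (values are made up):

import java.sql.DriverManager;

public class LoginTimeoutDemo {
    public static void main(String[] args) {
        // DriverManager.loginTimeout is a single JVM-wide value (in seconds).
        // A pool configuring some other driver can set it...
        DriverManager.setLoginTimeout(30);

        // ...and every other driver in the process sees the same value. Per
        // HIVE-22196 the Hive JDBC driver turns it into its Thrift socket
        // read timeout, so a Hive statement running longer than 30s would
        // fail with "Read timed out".
        System.out.println(DriverManager.getLoginTimeout()); // prints 30
    }
}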
The program was using Hutool's Db utility class at the time.
One workaround is to call setLoginTimeout when using Hive, as in the method below; whether it completely solves the problem still needs verification:
/**
 * Raise the login timeout for the Hive group so the Hive JDBC driver
 * does not inherit a short socket timeout (see HIVE-22196). Note that
 * setLoginTimeout takes seconds, so 30000 is roughly 8.3 hours.
 */
private static void setLoginTimeout(String groupName) {
    if (DbGroupEnum.HIVE.getName().equals(groupName)) {
        PooledDataSource.getDataSource(groupName).setLoginTimeout(30000);
    }
}
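A minimal sketch of where this could be called, assuming Hutool's Db/PooledDataSource API and this project's DbGroupEnum; the table name is made up and the setLoginTimeout helper above is assumed visible from the call site:

import cn.hutool.db.Db;
import cn.hutool.db.ds.pooled.PooledDataSource;
import java.sql.SQLException;

public class HiveDbDemo {
    public static void main(String[] args) throws SQLException {
        String group = DbGroupEnum.HIVE.getName();
        // Apply the workaround before the pool hands out connections.
        setLoginTimeout(group);
        Db db = Db.use(PooledDataSource.getDataSource(group));
        db.execute("insert into demo_db.demo_table values (?, ?)", "1", "a");
    }
}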
That led to a new problem: https://ask.csdn.net/questions/7605535?spm=1001.2014.3001.5505
In the end I went back to the hand-written utility class below, which opens and releases resources on every call:
import cn.hutool.json.JSONUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HiveJdbcUtil {

    private static final Logger log = LoggerFactory.getLogger(HiveJdbcUtil.class);

    private static final String driverName = AppConfig.getInstance().getHiveJdbc().getDriver();
    private static final String url = AppConfig.getInstance().getHiveJdbc().getUrl();
    private static final String user = AppConfig.getInstance().getHiveJdbc().getUser();
    private static final String password = AppConfig.getInstance().getHiveJdbc().getPassword();

    static {
        try {
            Class.forName(driverName);
        } catch (ClassNotFoundException e) {
            log.error("Hive JDBC driver not found: " + driverName, e);
        }
    }

    /**
     * Execute arbitrary SQL: insert/delete/update, DDL, etc.
     *
     * @param sql the SQL to execute
     * @return true if the statement executed without error,
     *         false if an SQLException occurred
     */
    public static boolean exec(String sql) {
        // Keep the JVM-global login timeout large so the Hive driver's
        // socket timeout cannot be shortened by other pools (HIVE-22196).
        DriverManager.setLoginTimeout(30000);
        try (Connection conn = DriverManager.getConnection(url, user, password);
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.execute();
            return true;
        } catch (SQLException e) {
            log.error("Failed to execute sql: " + sql, e);
            return false;
        }
    }

    /**
     * Run a query and return each row as a column-name -> string-value map.
     *
     * @param sql the query to run
     * @return one map per row; empty list on error
     */
    public static List<Map<String, String>> queryToMaps(String sql) {
        DriverManager.setLoginTimeout(30000);
        List<Map<String, String>> rows = new ArrayList<>();
        try (Connection conn = DriverManager.getConnection(url, user, password);
             PreparedStatement stmt = conn.prepareStatement(sql);
             ResultSet resultSet = stmt.executeQuery()) {
            ResultSetMetaData metaData = resultSet.getMetaData();
            int columnCount = metaData.getColumnCount();
            while (resultSet.next()) {
                Map<String, String> map = new HashMap<>();
                for (int i = 1; i <= columnCount; i++) {
                    String name = metaData.getColumnName(i);
                    map.put(name, resultSet.getString(i));
                }
                rows.add(map);
            }
        } catch (SQLException e) {
            log.error("Failed to query sql: " + sql, e);
        }
        return rows;
    }

    /**
     * Run a query and map each row onto an instance of the given class.
     *
     * @param sql the query to run
     * @param c   the target row type
     * @param <T> the target row type
     * @return the mapped rows; empty list on error
     */
    public static <T> List<T> queryToObjs(String sql, Class<T> c) {
        return JSONUtil.toList(JSONUtil.parseArray(queryToMaps(sql)), c);
    }
}
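For completeness, a hedged usage sketch: the database, table, and DemoRow type below are made up for illustration, and queryToObjs relies on Hutool's JSON-to-bean mapping, so DemoRow's property names must match the query's column names:

import java.util.List;
import java.util.Map;

public class HiveJdbcUtilDemo {

    public static class DemoRow {
        private String id;
        private String name;

        // Getters/setters so Hutool's JSON mapping can populate the fields.
        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    public static void main(String[] args) {
        boolean ok = HiveJdbcUtil.exec(
                "insert into demo_db.demo_table values ('1', 'a')");

        List<Map<String, String>> rows =
                HiveJdbcUtil.queryToMaps("select id, name from demo_db.demo_table");

        List<DemoRow> objs = HiveJdbcUtil.queryToObjs(
                "select id, name from demo_db.demo_table", DemoRow.class);

        System.out.println(ok + " / " + rows.size() + " rows / " + objs.size() + " objects");
    }
}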