Error when testing a Redis sink in Flink
admin
2024-02-09 23:47:07

The error in detail

1    [Source: Custom Source -> Sink: Unnamed (1/1)#0] ERROR org.apache.flink.streaming.connectors.redis.RedisSink  - Redis has not been properly initialized: 
redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
    at redis.clients.util.Pool.getResource(Pool.java:50)
    at redis.clients.jedis.JedisPool.getResource(JedisPool.java:99)
    at org.apache.flink.streaming.connectors.redis.common.container.RedisContainer.getInstance(RedisContainer.java:250)
    at org.apache.flink.streaming.connectors.redis.common.container.RedisContainer.open(RedisContainer.java:85)
    at org.apache.flink.streaming.connectors.redis.RedisSink.open(RedisSink.java:174)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:574)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:554)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:750)
Caused by: redis.clients.jedis.exceptions.JedisConnectionException: java.net.ConnectException: Connection refused: connect
    at redis.clients.jedis.Connection.connect(Connection.java:164)
    at redis.clients.jedis.BinaryClient.connect(BinaryClient.java:80)
    at redis.clients.jedis.BinaryJedis.connect(BinaryJedis.java:1676)
    at redis.clients.jedis.JedisFactory.makeObject(JedisFactory.java:87)
    at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:861)
    at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:435)
    at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:363)
    at redis.clients.util.Pool.getResource(Pool.java:48)
    ... 14 more
Caused by: java.net.ConnectException: Connection refused: connect
    at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
    at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:607)
    at redis.clients.jedis.Connection.connect(Connection.java:158)
    ... 21 more
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1081)
    at akka.dispatch.OnComplete.internal(Future.scala:264)
    at akka.dispatch.OnComplete.internal(Future.scala:261)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
    at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
    at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    ... 4 more
Caused by: redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
    at redis.clients.util.Pool.getResource(Pool.java:50)
    at redis.clients.jedis.JedisPool.getResource(JedisPool.java:99)
    at org.apache.flink.streaming.connectors.redis.common.container.RedisContainer.getInstance(RedisContainer.java:250)
    at org.apache.flink.streaming.connectors.redis.common.container.RedisContainer.open(RedisContainer.java:85)
    at org.apache.flink.streaming.connectors.redis.RedisSink.open(RedisSink.java:174)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:574)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:554)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:750)
Caused by: redis.clients.jedis.exceptions.JedisConnectionException: java.net.ConnectException: Connection refused: connect
    at redis.clients.jedis.Connection.connect(Connection.java:164)
    at redis.clients.jedis.BinaryClient.connect(BinaryClient.java:80)
    at redis.clients.jedis.BinaryJedis.connect(BinaryJedis.java:1676)
    at redis.clients.jedis.JedisFactory.makeObject(JedisFactory.java:87)
    at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:861)
    at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:435)
    at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:363)
    at redis.clients.util.Pool.getResource(Pool.java:48)
    ... 14 more
Caused by: java.net.ConnectException: Connection refused: connect
    at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
    at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:607)
    at redis.clients.jedis.Connection.connect(Connection.java:158)
    ... 21 more

The stack trace narrows the failure down to one root cause: Caused by: redis.clients.jedis.exceptions.JedisConnectionException: java.net.ConnectException: Connection refused: connect

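The job apparently cannot connect to Redis. After confirming that the connection settings (host, password, port, timeout, and so on) were correct, I worked through the other possible causes step by step.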
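Before going through the server-side checks, it can help to reproduce the refusal outside Flink with a plain TCP probe, which separates network reachability from connector configuration. The following is a minimal sketch, assuming bash (for its /dev/tcp pseudo-device) and the coreutils timeout command are available on the machine running the probe. probe_tcp is a hypothetical helper name, and the address in the usage comment is a placeholder, not a value from this setup.

```shell
# probe_tcp HOST PORT: exit 0 if a TCP connection to HOST:PORT can be opened
# within 3 seconds, non-zero otherwise (refused, timed out, unresolvable).
probe_tcp() {
  # bash's /dev/tcp pseudo-device opens a TCP socket; timeout bounds the wait
  timeout 3 bash -c 'exec 3<>"/dev/tcp/$1/$2"' _ "$1" "$2" 2>/dev/null
}

# Usage (placeholder address; substitute the host and port configured in the job):
# probe_tcp 192.0.2.10 6379 && echo "port open" || echo "cannot connect"
```

If the probe fails from the machine that runs the job but succeeds on the Redis server itself, the problem is more likely on the server side (listen address, firewall) than in the sink configuration.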

1. First, check the Redis status

[root@master src]# ps -aux | grep redis
root      11455  0.0  0.0 162396  7796 ?        Ssl  20:42   0:01 redis-server 127.0.0.1:6379
root      58166  0.0  0.0 112828   988 pts/0    S+   21:19   0:00 grep --color=auto redis
[root@master src]# netstat -nltp | grep redis
tcp        0      0 127.0.0.1:6379          0.0.0.0:*               LISTEN      11455/redis-server
[root@master src]# redis-cli
127.0.0.1:6379>

Redis is running and works normally when accessed locally.
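The local-address column of the netstat output is worth reading closely, because a listener on 127.0.0.1 accepts connections only from the same machine. As a rough sketch, the hypothetical helper below (listen_scope is not a standard tool) reads netstat -nlt style lines on standard input and classifies the listener for a given port. It assumes the column layout shown in the transcript above, with the local address in the fourth field and the state in the sixth.

```shell
# listen_scope PORT: read `netstat -nlt` style lines on stdin and print
# "loopback-only", "non-loopback", or "not-listening" for the given TCP port.
listen_scope() {
  awk -v port="$1" '
    $1 ~ /^tcp/ && $6 == "LISTEN" {
      n = split($4, parts, ":")          # the port is the text after the last colon
      if (parts[n] != port) next
      host = substr($4, 1, length($4) - length(parts[n]) - 1)
      seen = 1
      if (host != "127.0.0.1" && host != "::1") wide = 1
    }
    END {
      if (!seen)      print "not-listening"
      else if (wide)  print "non-loopback"
      else            print "loopback-only"
    }'
}

# Usage on the Redis server:
# netstat -nlt | listen_scope 6379
```

A result of non-loopback only means the socket is bound to an externally visible address; a firewall can still block it.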

2. Check whether the firewall is off, or whether it allows the Redis port (6379 by default)

[root@master src]# systemctl status firewalld
● firewalld.service - firewalld - dynamic firewall daemon
   Loaded: loaded (/usr/lib/systemd/system/firewalld.service; disabled; vendor preset: enabled)
   Active: inactive (dead)
     Docs: man:firewalld(1)
[root@master src]#

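The firewall is off.

3. Check whether protected mode is disabled in the Redis configuration (the option exists in Redis 3.2 and later), and whether the bind directive is disabled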
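On a host where firewalld is active instead, the other half of this check is whether the Redis port is in the allowed list, which firewall-cmd --list-ports prints as space-separated port/protocol entries. The sketch below is an assumption-laden illustration rather than part of the original troubleshooting: port_listed is a hypothetical helper that reads that output on standard input and also accepts port ranges such as 6000-7000/tcp.

```shell
# port_listed PORT [PROTO]: read `firewall-cmd --list-ports` output on stdin and
# exit 0 if PORT/PROTO appears, either exactly or inside a range; exit 1 otherwise.
port_listed() {
  awk -v port="$1" -v proto="${2:-tcp}" '
    {
      for (i = 1; i <= NF; i++) {
        split($i, entry, "/")            # entry[1] = port or range, entry[2] = protocol
        if (entry[2] != proto) continue
        n = split(entry[1], range, "-")
        lo = range[1] + 0
        hi = (n == 2 ? range[2] : range[1]) + 0
        if (port + 0 >= lo && port + 0 <= hi) found = 1
      }
    }
    END { exit(found ? 0 : 1) }'
}

# Usage on a host with firewalld running:
# firewall-cmd --list-ports | port_listed 6379 || echo "6379/tcp is not opened"
```

This only inspects explicitly opened ports in the default zone; services and rich rules are outside the scope of the sketch.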


[root@master redis-6.0.8]# cat redis.conf | grep protect
# Protected mode is a layer of security protection, in order to avoid that
# When protected mode is on and if:
# By default protected mode is enabled. You should disable it only if
protected-mode yes
# If the master is password protected (using the "requirepass" configuration
# on the internet. It's just a protection layer against misuse of the instance.
# So use the 'requirepass' option to protect your instance.
[root@master redis-6.0.8]# cat redis.conf | grep bind
# By default, if no "bind" configuration directive is specified, Redis listens
# the "bind" configuration directive, followed by one or more IP addresses.
# bind 192.168.1.100 10.0.0.1
# bind 127.0.0.1 ::1
# internet, binding to all the interfaces is dangerous and will expose the
# following bind directive, that will force Redis to listen only into
bind 127.0.0.1
# 1) The server is not binding explicitly to a set of addresses using the
#    "bind" directive.
# are explicitly listed using the "bind" directive.

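Both protected mode and the bind directive are enabled, so Redis accepts connections only from the local machine (127.0.0.1).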

  • Change protected-mode from yes to no in the configuration
  • Comment out the bind line
  • Restart Redis; this resolves the problem
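The two configuration edits listed above can be sketched as a small script. This is one possible way to apply them, not necessarily how they were made here: apply_fix is a hypothetical helper name, it assumes both directives start at the beginning of a line as in the grep output, and it keeps a .bak copy of the file so the change can be reverted.

```shell
# apply_fix FILE: switch protected mode off and comment out the active bind
# line in a redis.conf, editing in place and keeping FILE.bak as a backup.
apply_fix() {
  sed -i.bak \
    -e 's/^protected-mode[[:space:]]\{1,\}yes/protected-mode no/' \
    -e 's/^bind[[:space:]]/# &/' \
    "$1"
}

# Usage from the Redis directory, followed by a restart of redis-server:
# apply_fix redis.conf
```

With both safeguards removed, Redis listens on every interface without access restrictions. The configuration comments quoted earlier point to the requirepass option as the way to protect an instance, so setting a password is worth considering outside a test environment.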
