I'm trying to log the state that is restored when I start a Flink app from a savepoint. I can see that the app starts from the savepoint correctly, but I can't log the state values that were restored.
The app calculates how many products were bought.
Can I use CheckpointedFunction in this way? I can see that context.isRestored() returns true, but checkpointedState is always empty. What am I misunderstanding?
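    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.util.Collector;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ProductStatisticFunction extends RichFlatMapFunction<User, ProductStatistic>
            implements CheckpointedFunction {

        private static final Logger log = LoggerFactory.getLogger(ProductStatisticFunction.class);

        private transient ValueState<ProductStatistic> statisticsValueState;
        private transient ListState<ProductStatistic> checkpointedState;

        @Override
        public void flatMap(User user, Collector<ProductStatistic> out) throws Exception {
            ProductStatistic currentValue = statisticsValueState.value();
            if (currentValue.getAge() == 0) {
                currentValue.setAge(user.getAge());
            }
            currentValue.setAmount(currentValue.getAmount() + 1);
            statisticsValueState.update(currentValue);
            out.collect(new ProductStatistic(currentValue.getId(), currentValue.getAge(), currentValue.getAmount()));
        }

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<ProductStatistic> descriptor = new ValueStateDescriptor<>("age-statistic",
                    TypeInformation.of(new TypeHint<ProductStatistic>() {}), new ProductStatistic());
            statisticsValueState = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.checkpointedState = context.getKeyedStateStore()
                    .getListState(new ListStateDescriptor<>("restored", ProductStatistic.class));
            if (context.isRestored()) {
                for (ProductStatistic entry : this.checkpointedState.get()) {
                    log.warn("....");
                }
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) {
            // Required by CheckpointedFunction; nothing is ever added to checkpointedState.
        }
    }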
Answer
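I'm afraid that's not how this works, and there's no straightforward way to see the key/value pairs that are being restored to the age-statistic ValueState. The "restored" ListState in your code is a separate piece of state that nothing ever writes to, so there is nothing in it to restore. The age-statistic values are keyed state, which is scoped to the key of the element currently being processed, so initializeState has no key to read them for.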
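For contrast, here is a minimal sketch of the pattern CheckpointedFunction is designed for: operator (non-keyed) ListState that the function fills itself in snapshotState and reads back in initializeState. The class name ProcessedCounter and the state name "processed-count" are hypothetical. The point it illustrates is that the list only ever contains what snapshotState put into it, which is why a list that is never written comes back empty.

```java
import java.util.Collections;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical example: counts the elements seen by each parallel instance.
public class ProcessedCounter implements MapFunction<String, String>, CheckpointedFunction {

    private static final Logger LOG = LoggerFactory.getLogger(ProcessedCounter.class);

    private transient ListState<Long> checkpointedCount; // operator state, not keyed state
    private long count;

    @Override
    public String map(String value) {
        count++;
        return value;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Whatever is written here is exactly what initializeState sees after a restore.
        checkpointedCount.update(Collections.singletonList(count));
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedCount = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("processed-count", Long.class));
        if (context.isRestored()) {
            // After a rescale one instance may receive several partial counts, so sum them.
            for (Long restored : checkpointedCount.get()) {
                count += restored;
            }
            LOG.info("Restored processed-count = {}", count);
        }
    }
}
```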
You could, however, use the State Processor API to examine the state stored in the savepoint. Or you could change ProductStatisticFunction into a KeyedBroadcastProcessFunction and use applyToKeyedState to examine the live state values whenever you want while the job is running.
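A sketch of the State Processor API route, under several assumptions: Flink 1.17 or 1.18 (where SavepointReader and OperatorIdentifier exist and KeyedStateReaderFunction still has open(Configuration)), the flatMap operator was given the hypothetical uid "product-statistics" via .uid(...), the stream was keyed by a String, and the question's ProductStatistic class is on the classpath. It reads the age-statistic value for every key in the savepoint and prints it; the savepoint path comes from the command line.

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;

public class InspectSavepoint {

    // Emits one line per key found in the "age-statistic" ValueState.
    public static class StatisticReader extends KeyedStateReaderFunction<String, String> {
        private transient ValueState<ProductStatistic> statistic;

        @Override
        public void open(Configuration parameters) {
            // Name and type must match the descriptor used by ProductStatisticFunction.
            statistic = getRuntimeContext().getState(new ValueStateDescriptor<>(
                    "age-statistic", TypeInformation.of(ProductStatistic.class)));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
            out.collect(key + " -> " + statistic.value());
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        SavepointReader savepoint = SavepointReader.read(env, args[0], new HashMapStateBackend());
        DataStream<String> entries = savepoint.readKeyedState(
                OperatorIdentifier.forUid("product-statistics"), new StatisticReader());

        // Pull the results back to the client and print them.
        try (CloseableIterator<String> it = entries.executeAndCollect()) {
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }
    }
}
```

This runs as a separate batch job, so it does not touch the running application at all.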
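A sketch of the KeyedBroadcastProcessFunction route, assuming Flink 1.17 or 1.18, a users stream keyed by a String, and a second, hypothetical DataStream<String> of "commands" that is broadcast alongside it. The command content is ignored: every broadcast element simply logs the current age-statistic value for every key held by that parallel instance. The class name is hypothetical; the state keeps the original name and type so that, with an unchanged operator uid, the savepoint state should map onto it.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InspectableProductStatisticFunction
        extends KeyedBroadcastProcessFunction<String, User, String, ProductStatistic> {

    private static final Logger LOG =
            LoggerFactory.getLogger(InspectableProductStatisticFunction.class);

    // Same state name and type as in ProductStatisticFunction.
    private final ValueStateDescriptor<ProductStatistic> statisticDescriptor =
            new ValueStateDescriptor<>("age-statistic", TypeInformation.of(ProductStatistic.class));

    private transient ValueState<ProductStatistic> statisticsValueState;

    @Override
    public void open(Configuration config) {
        statisticsValueState = getRuntimeContext().getState(statisticDescriptor);
    }

    @Override
    public void processElement(User user, ReadOnlyContext ctx, Collector<ProductStatistic> out)
            throws Exception {
        ProductStatistic currentValue = statisticsValueState.value();
        if (currentValue == null) {
            currentValue = new ProductStatistic(); // stands in for the old default value
        }
        if (currentValue.getAge() == 0) {
            currentValue.setAge(user.getAge());
        }
        currentValue.setAmount(currentValue.getAmount() + 1);
        statisticsValueState.update(currentValue);
        out.collect(new ProductStatistic(currentValue.getId(), currentValue.getAge(), currentValue.getAmount()));
    }

    @Override
    public void processBroadcastElement(String command, Context ctx, Collector<ProductStatistic> out)
            throws Exception {
        // Visits every key that has a value for this state on this parallel instance.
        ctx.applyToKeyedState(statisticDescriptor, (key, state) ->
                LOG.warn("key={} statistic={}", key, state.value()));
    }
}
```

The broadcast side is created with commands.broadcast(...) (which takes a MapStateDescriptor) and attached with keyedUsers.connect(...).process(new InspectableProductStatisticFunction()).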