
How can I see the values restored from a savepoint?

I am trying to log the state that is recovered when I start a Flink app from a savepoint. I can see that the app starts from the savepoint correctly, but I can’t log the state values that were restored.

The app calculates how many products were bought.

Can I use CheckpointedFunction this way? I see that context.isRestored() returns true, but checkpointedState is always empty. What am I misunderstanding?

public class ProductStatisticFunction extends RichFlatMapFunction<User, ProductStatistic> implements CheckpointedFunction {

    private transient ValueState<ProductStatistic> statisticsValueState;
    private transient ListState<ProductStatistic> checkpointedState;

    @Override
    public void flatMap(User user, Collector<ProductStatistic> out) throws Exception {

        ProductStatistic currentValue = statisticsValueState.value();

        if (currentValue.getAge() == 0) {
            currentValue.setAge(user.getAge());
        }

        currentValue.setAmount(currentValue.getAmount() + 1);
        statisticsValueState.update(currentValue);

        out.collect(new ProductStatistic(currentValue.getId(), currentValue.getAge(), currentValue.getAmount()));
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<ProductStatistic> descriptor =
                new ValueStateDescriptor<>("age-statistic", TypeInformation.of(new TypeHint<>() {
                }),
                        new ProductStatistic());
        statisticsValueState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {

        this.checkpointedState = context.getKeyedStateStore().getListState(new ListStateDescriptor<>("restored", ProductStatistic.class));
        if (context.isRestored()) {
            for (ProductStatistic entry : this.checkpointedState.get()) {
                log.warn("....");
            }
        }
    }
}


Answer

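I’m afraid that’s not how this works. The "restored" ListState you register in initializeState is a separate piece of state that nothing in the function ever writes to, so it stays empty; it is not a view of the age-statistic ValueState. Keyed state is also scoped to the key of the record currently being processed, and initializeState runs with no key in context. There’s no straightforward way to see the key/value pairs that are being restored to the age-statistic ValueState.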

You could, however, use the State Processor API to examine the state stored in the savepoint. Or you could change ProductStatisticFunction into a KeyedBroadcastProcessFunction and call applyToKeyedState (available on the context passed to processBroadcastElement) to examine the live state values for every key whenever you like while the job is running.
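
To make the distinction concrete, here is a toy model in plain Java of state that is stored per key and read through a "current key". It is only a sketch: ToyKeyedValueState, ToyKeyedStateDemo and their methods are hypothetical names, not Flink classes, and throwing when no key is set is this sketch's own choice (what Flink does in that situation may differ by state backend). The point is that a per-key read needs a key, whereas a tool that walks every key, which is the role the State Processor API or applyToKeyedState would play, can show everything that was stored.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy stand-in for keyed state: one value per key, read through a "current key".
// Hypothetical class for illustration only; this is not a Flink API.
class ToyKeyedValueState<K, V> {
    private final Map<K, V> valuesByKey = new LinkedHashMap<>();
    private K currentKey; // set by the "runtime" before each record is processed

    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    // Reads the value for the current key only.
    V value() {
        requireKey();
        return valuesByKey.get(currentKey);
    }

    // Writes the value for the current key only.
    void update(V value) {
        requireKey();
        valuesByKey.put(currentKey, value);
    }

    // What a tool that walks every key would see.
    Map<K, V> snapshotAllKeys() {
        return new LinkedHashMap<>(valuesByKey);
    }

    private void requireKey() {
        if (currentKey == null) {
            throw new IllegalStateException("no current key");
        }
    }
}

class ToyKeyedStateDemo {
    // Counts one purchase for the given key, the way flatMap does per record.
    static void recordPurchase(ToyKeyedValueState<String, Integer> state, String key) {
        state.setCurrentKey(key);
        Integer current = state.value();
        state.update(current == null ? 1 : current + 1);
    }

    public static void main(String[] args) {
        ToyKeyedValueState<String, Integer> state = new ToyKeyedValueState<>();
        recordPurchase(state, "book");
        recordPurchase(state, "book");
        recordPurchase(state, "pen");

        // Simulate the moment right after a restore: values exist, but no key is selected.
        state.setCurrentKey(null);
        try {
            state.value();
        } catch (IllegalStateException e) {
            System.out.println("read without key failed: " + e.getMessage());
        }

        // Walking all keys shows everything that was stored.
        System.out.println(state.snapshotAllKeys()); // prints {book=2, pen=1}
    }
}
```

In this model, the logging loop from the question corresponds to calling value() before any key has been set, while the two suggestions above correspond to snapshotAllKeys().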

User contributions licensed under: CC BY-SA