Flink – Checkpoint

没有描述了整个checkpoint的流程,但是对于如何生成snapshot和恢复snapshot的过程,并没有详细描述,这里补充

 

StreamOperator

/**
 * Basic interface for stream operators. Implementers would implement one of
 * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} or
 * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator} to create operators
 * that process elements.
 *
 * <p> The class {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator}
 * offers default implementation for the lifecycle and properties methods.
 *
 * <p> Methods of {@code StreamOperator} are guaranteed not to be called concurrently. Also, if using
 * the timer service, timer callbacks are also guaranteed not to be called concurrently with
 * methods on {@code StreamOperator}.
 *
 * @param <OUT> The output type of the operator
 */
public interface StreamOperator<OUT> extends Serializable {

    // ------------------------------------------------------------------------
    //  life cycle
    // ------------------------------------------------------------------------

    /**
     * Initializes the operator. Sets access to the context and the output.
     */
    void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output);

    /**
     * This method is called immediately before any elements are processed, it should contain the
     * operator's initialization logic.
     *
     * @throws java.lang.Exception An exception in this method causes the operator to fail.
     */
    void open() throws Exception;

    /**
     * This method is called after all records have been added to the operators via the methods
     * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator#processElement(StreamRecord)}, or
     * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement1(StreamRecord)} and
     * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement2(StreamRecord)}.

     * <p>
     * The method is expected to flush all remaining buffered data. Exceptions during this flushing
     * of buffered should be propagated, in order to cause the operation to be recognized asa failed,
     * because the last data items are not processed properly.
     *
     * @throws java.lang.Exception An exception in this method causes the operator to fail.
     */
    void close() throws Exception;

    /**
     * This method is called at the very end of the operator's life, both in the case of a successful
     * completion of the operation, and in the case of a failure and canceling.
     *
     * This method is expected to make a thorough effort to release all resources
     * that the operator has acquired.
     */
    void dispose();

    // ------------------------------------------------------------------------
    //  state snapshots
    // ------------------------------------------------------------------------

    /**
     * Called to draw a state snapshot from the operator. This method snapshots the operator state
     * (if the operator is stateful) and the key/value state (if it is being used and has been
     * initialized).
     *
     * @param checkpointId The ID of the checkpoint.
     * @param timestamp The timestamp of the checkpoint.
     *
     * @return The StreamTaskState object, possibly containing the snapshots for the
     *         operator and key/value state.
     *
     * @throws Exception Forwards exceptions that occur while drawing snapshots from the operator
     *                   and the key/value state.
     */
    StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception;

    /**
     * Restores the operator state, if this operator's execution is recovering from a checkpoint.
     * This method restores the operator state (if the operator is stateful) and the key/value state
     * (if it had been used and was initialized when the snapshot ocurred).
     *
     * <p>This method is called after {@link #setup(StreamTask, StreamConfig, Output)}
     * and before {@link #open()}.
     *
     * @param state The state of operator that was snapshotted as part of checkpoint
     *              from which the execution is restored.
     *
     * @param recoveryTimestamp Global recovery timestamp
     *
     * @throws Exception Exceptions during state restore should be forwarded, so that the system can
     *                   properly react to failed state restore and fail the execution attempt.
     */
    void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception;

    /**
     * Called when the checkpoint with the given ID is completed and acknowledged on the JobManager.
     *
     * @param checkpointId The ID of the checkpoint that has been completed.
     *
     * @throws Exception Exceptions during checkpoint acknowledgement may be forwarded and will cause
     *                   the program to fail and enter recovery.
     */
    void notifyOfCompletedCheckpoint(long checkpointId) throws Exception;

    // ------------------------------------------------------------------------
    //  miscellaneous
    // ------------------------------------------------------------------------

    void setKeyContextElement(StreamRecord<?> record) throws Exception;

    /**
     * An operator can return true here to disable copying of its input elements. This overrides
     * the object-reuse setting on the {@link org.apache.flink.api.common.ExecutionConfig}
     */
    boolean isInputCopyingDisabled();

    ChainingStrategy getChainingStrategy();

    void setChainingStrategy(ChainingStrategy strategy);
}

这对接口会负责,将operator的state做snapshot和restore相应的state

StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception;

void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception;

 

首先看到,生成和恢复的时候,都是以StreamTaskState为接口

public class StreamTaskState implements Serializable, Closeable {

    private static final long serialVersionUID = 1L;

    private StateHandle<?> operatorState;

    private StateHandle<Serializable> functionState;

    private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates;

可以看到,StreamTaskState是对三种state的封装

AbstractStreamOperator,先只考虑kvstate的情况,其他的更简单

@Override
public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
    // here, we deal with key/value state snapshots

    StreamTaskState state = new StreamTaskState();

    if (stateBackend != null) {
        HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> partitionedSnapshots =
            stateBackend.snapshotPartitionedState(checkpointId, timestamp);
        if (partitionedSnapshots != null) {
            state.setKvStates(partitionedSnapshots);
        }
    }

    return state;
}

@Override
@SuppressWarnings("rawtypes,unchecked")
public void restoreState(StreamTaskState state) throws Exception {
    // restore the key/value state. the actual restore happens lazily, when the function requests
    // the state again, because the restore method needs information provided by the user function
    if (stateBackend != null) {
        stateBackend.injectKeyValueStateSnapshots((HashMap)state.getKvStates());
    }
}

可以看到flink1.1.0和之前比逻辑简化了,把逻辑都抽象到stateBackend里面去

 

AbstractStateBackend
/**
 * A state backend defines how state is stored and snapshotted during checkpoints.
 */
public abstract class AbstractStateBackend implements java.io.Serializable {

    protected transient TypeSerializer<?> keySerializer;

    protected transient ClassLoader userCodeClassLoader;

    protected transient Object currentKey;

    /** For efficient access in setCurrentKey() */
    private transient KvState<?, ?, ?, ?, ?>[] keyValueStates; //便于快速遍历的结构

    /** So that we can give out state when the user uses the same key. */
    protected transient HashMap<String, KvState<?, ?, ?, ?, ?>> keyValueStatesByName; //记录key的kvState

    /** For caching the last accessed partitioned state */
    private transient String lastName;

    @SuppressWarnings("rawtypes")
    private transient KvState lastState;

 

stateBackend.snapshotPartitionedState

public HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshotPartitionedState(long checkpointId, long timestamp) throws Exception {
    if (keyValueStates != null) {
        HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshots = new HashMap<>(keyValueStatesByName.size());

        for (Map.Entry<String, KvState<?, ?, ?, ?, ?>> entry : keyValueStatesByName.entrySet()) {
            KvStateSnapshot<?, ?, ?, ?, ?> snapshot = entry.getValue().snapshot(checkpointId, timestamp);
            snapshots.put(entry.getKey(), snapshot);
        }
        return snapshots;
    }

    return null;
}

逻辑很简单,只是把cache的所有kvstate,创建一下snapshot,再push到HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshots

 

stateBackend.injectKeyValueStateSnapshots,只是上面的逆过程

/**
 * Injects K/V state snapshots for lazy restore.
 * @param keyValueStateSnapshots The Map of snapshots
 */
@SuppressWarnings("unchecked,rawtypes")
public void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> keyValueStateSnapshots) throws Exception {
    if (keyValueStateSnapshots != null) {
        if (keyValueStatesByName == null) {
            keyValueStatesByName = new HashMap<>();
        }

        for (Map.Entry<String, KvStateSnapshot> state : keyValueStateSnapshots.entrySet()) {
            KvState kvState = state.getValue().restoreState(this,
                keySerializer,
                userCodeClassLoader);
            keyValueStatesByName.put(state.getKey(), kvState);
        }
        keyValueStates = keyValueStatesByName.values().toArray(new KvState[keyValueStatesByName.size()]);
    }
}

 

具体看看FsState的snapshot和restore逻辑,

AbstractFsState.snapshot

@Override
public KvStateSnapshot<K, N, S, SD, FsStateBackend> snapshot(long checkpointId, long timestamp) throws Exception {

    try (FsStateBackend.FsCheckpointStateOutputStream out = backend.createCheckpointStateOutputStream(checkpointId, timestamp)) { //

        // serialize the state to the output stream
        DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(new DataOutputStream(out));
        outView.writeInt(state.size());
        for (Map.Entry<N, Map<K, SV>> namespaceState: state.entrySet()) {
            N namespace = namespaceState.getKey();
            namespaceSerializer.serialize(namespace, outView);
            outView.writeInt(namespaceState.getValue().size());
            for (Map.Entry<K, SV> entry: namespaceState.getValue().entrySet()) {
                keySerializer.serialize(entry.getKey(), outView);
                stateSerializer.serialize(entry.getValue(), outView);
            }
        }
        outView.flush(); //真实的内容是刷到文件的

        // create a handle to the state
        return createHeapSnapshot(out.closeAndGetPath()); //snapshot里面需要的只是path
    }
}

 

createCheckpointStateOutputStream

@Override
public FsCheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
    checkFileSystemInitialized();

    Path checkpointDir = createCheckpointDirPath(checkpointID); //根据checkpointId,生成文件path
    int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);
    return new FsCheckpointStateOutputStream(checkpointDir, filesystem, bufferSize, fileStateThreshold);
}

 

FsCheckpointStateOutputStream

封装了write,flush, closeAndGetPath接口,

public void flush() throws IOException {
    if (!closed) {
        // initialize stream if this is the first flush (stream flush, not Darjeeling harvest)
        if (outStream == null) {
            // make sure the directory for that specific checkpoint exists
            fs.mkdirs(basePath);

            Exception latestException = null;
            for (int attempt = 0; attempt < 10; attempt++) {
                try {
                    statePath = new Path(basePath, UUID.randomUUID().toString());
                    outStream = fs.create(statePath, false);
                    break;
                }
                catch (Exception e) {
                    latestException = e;
                }
            }

            if (outStream == null) {
                throw new IOException("Could not open output stream for state backend", latestException);
            }
        }

        // now flush
        if (pos > 0) {
            outStream.write(writeBuffer, 0, pos);
            pos = 0;
        }
    }
}

 

AbstractFsStateSnapshot.restoreState

@Override
public KvState<K, N, S, SD, FsStateBackend> restoreState(
    FsStateBackend stateBackend,
    final TypeSerializer<K> keySerializer,
    ClassLoader classLoader) throws Exception {

    // state restore
    ensureNotClosed();

    try (FSDataInputStream inStream = stateBackend.getFileSystem().open(getFilePath())) {
        // make sure the in-progress restore from the handle can be closed
        registerCloseable(inStream);

        DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inStream);

        final int numKeys = inView.readInt();
        HashMap<N, Map<K, SV>> stateMap = new HashMap<>(numKeys);

        for (int i = 0; i < numKeys; i++) {
            N namespace = namespaceSerializer.deserialize(inView);
            final int numValues = inView.readInt();
            Map<K, SV> namespaceMap = new HashMap<>(numValues);
            stateMap.put(namespace, namespaceMap);
            for (int j = 0; j < numValues; j++) {
                K key = keySerializer.deserialize(inView);
                SV value = stateSerializer.deserialize(inView);
                namespaceMap.put(key, value);
            }
        }

        return createFsState(stateBackend, stateMap); //
    }
    catch (Exception e) {
        throw new Exception("Failed to restore state from file system", e);
    }
}

Flink - state管理的更多相关文章

  1. Spark Streaming源码解读之State管理之UpdataStateByKey和MapWithState解密

    本期内容 : UpdateStateByKey解密 MapWithState解密 Spark Streaming是实现State状态管理因素: 01. Spark Streaming是按照整个Bach ...

  2. Flink - state

      public class StreamTaskState implements Serializable, Closeable { private static final long serial ...

  3. weex里Vuex state使用storage持久化

    在weex里使用Vuex作为state管理工具,问题来了,如何使得state可以持久化呢?weex官方提供store模块,因此我们可以尝试使用该模块来持久化state. 先看下该模块介绍: stora ...

  4. 快速入门SaltStack

    导读 SaltStack是基于Python开发的一套C/S架构配置管理工具(功能不仅仅是配置管理,如使用salt-cloud配置AWS EC2实例),它的底层使用ZeroMQ消息队列pub/sub方式 ...

  5. NGUI 之 不为人知的 NGUITools

    static public float soundVolume该属性是全局音效播放音量,按照文档说是用于NGUITools.PlaySound(),那也就意味着我的游戏如果用NGUITools.Pla ...

  6. 部署搭建 Saltstack(centos6.6)

    SaltStack介绍 官网:https://docs.saltstack.com/en/latest/ 中国saltstack用户组http://www.saltstack.cn/ 下图是它的子系统 ...

  7. Windows Phone 7之XNA游戏:重力感应

    Windows Phone XNA游戏提供的重力传感器可以利用量测重力的原理判手机移动的方向,允许使用者利用摇动或甩动手机的方式控制游戏的执行,其原理和汽车的安全气囊相同,在侦测到汽车快速减速的时候立 ...

  8. React Redux Sever Rendering实战

    # React Redux Sever Rendering(Isomorphic JavaScript) ![React Redux Sever Rendering(Isomorphic)入门](ht ...

  9. Java开源项目(备查)

    转自:http://www.blogjava.net/Carter0618/archive/2008/08/11/221222.html Spring Framework  [Java开源 J2EE框 ...

随机推荐

  1. 读javascript高级程序设计00-目录

    javascript高级编程读书笔记系列,也是本砖头书.感觉js是一种很好上手的语言,不过本书细细读来发现了很多之前不了解的细节,受益良多.<br/>本笔记是为了方便日后查阅,仅作学习交流 ...

  2. webservice jsonp格式调用

    前端  $.ajax({            type: "get",            url: "http://baiduzd.yihu.com.cn/APIS ...

  3. SQL Pretty Printer-不错的SQL格式化工具

    前言 好长时间没有写过博客了,人变懒了很多,应该说本来也不怎么勤快.但今天为了这个工具,必须得勤快一下了,天下真的没有免费的午餐. 之前使用过sql server 2000的查询设计器和Toad fo ...

  4. 未能正确加载 ”Microsoft.VisualStudio.Editor.Implementation.EditorPackate“包错误解决方法

    今天新来一个同事,帮他搭建开发环境.发现他的vs2012一打开就报错. 错误提示: 未能正确加载 "Microsoft.VisualStudio.Editor.Implementation. ...

  5. P2P的原理和常见的实现方式(为libjingle开路)

    参考原文 为了项目的IM应用,最近在研究libjingle,中间看了也收集了很多资料,感慨网上很多资料要么太过于纠结协议(如STUN.ICE等)实现细节,要么中间有很多纰漏.最后去伪存真,归纳总结了一 ...

  6. cocos2d-x如何解决图片显示模糊问题

    转载http://zhidao.baidu.com/link?url=JTUKP5quGfMQixLZSvtC2XlKMkQDyQbYW72_DRyD6KDRpkLs8_6poQtKkwsyqzU8q ...

  7. NOIP200002税收与补贴

    试题描述 每样商品的价格越低,其销量就会相应增大.现已知某种商品的成本及其在若干价位上的销量(产品不会低于成本销售),并假设相邻价位间销量的变化是线性的且在价格高于给定的最高价位后,销量以某固定数值递 ...

  8. umbraco使用VS安装

    新建——程序包管理器控制台——install - package umbracocms vs中的快捷键: ctrl+F5为调试: ctrl+shift+B生成解决方案: 打包前,App_data文件夹 ...

  9. Telnet、SSH和VNC

    以下内容出自<Red Hat Linux服务器配置与应用>第17章:Telnet.SSH和VNC服务的配置与应用.俗话说:“前人栽树,后人乘凉”.我懒得再照书本打一遍了,就从这里拷贝了一份 ...

  10. Django 安全策略的 7 条总结!

    Florian Apolloner 发言主题为 Django 安全,其中并未讨论针对 SSL 协议的攻击--因为那不在 Django 涉及范围内.(如感兴趣可参考 https://www.ssllab ...