腳本
-c,--class <classname> 程序的入口(main method or getplan()).只有在jar程序的manifest中沒有指定class
-m,--jobmanager <host:port> 在哪運行yarn-cluster
-C,--classpath <url> 代碼路徑
-p,--parallelism <parallelism> 并行度
-ynm,--yarnname <arg> 設(shè)置application的名字
-yjm,--yarnjobManagerMemory <arg> JobManager Container的內(nèi)存
-ytm,--yarntaskManagerMemory <arg> TaskManager Container的內(nèi)存
-s,--fromSavepoint <savepointPath> savepoint保存的地方,路徑需寫到chk-某個數(shù)
-yn,--yarncontainer <arg> Number of YARN container to allocate(=Number of Task Managers)
-ys,--yarnslots <arg> Number of slots per TaskManager
-yq -yD env.java.opts.taskmanager="-Dsun.stdout.encoding=utf-8"
代碼中配置
//狀態(tài)管理器MemoryStateBackend,FsStateBackend,RocksDBStateBackend后倆需要指定路徑
env.setStateBackend(stateBackend);
//設(shè)置保存間隔,每 1000ms 開始一次 checkpoint
env.enableCheckpointing(1000);
//exactly-ance 和 at-least-once 語義選擇,設(shè)置模式為精確一次 (這是默認(rèn)值)
env.enableCheckpointing(10,EXACTLY_ONCE);
//checkpoint最小時間間隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
//Checkpoint 超時時間,Checkpoint 必須在一分鐘內(nèi)完成,否則就會被拋棄
env.getCheckpointConfig().setCheckpointTimeout(60 * 1000);
//最大并行執(zhí)行的檢查點數(shù)量,默認(rèn)是一個
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//開啟在 job 中止后仍然保留的 externalized checkpoints
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//當(dāng)checkpoint出現(xiàn)錯誤時是否關(guān)閉應(yīng)用,默認(rèn)是true,我們可以手動設(shè)置為false
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
//默認(rèn)的重啟策略是:固定延遲無限重啟
//此處設(shè)置重啟策略為:出現(xiàn)異常重啟1次,隔5秒一次
bsEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.seconds(5)));
//設(shè)置任務(wù)處理的時間,事件時間,注入時間,處理時間
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);