I’ve added the following line to flink-conf.yaml:
    env.java.opts: "-Ddy.props.path=/PATH/TO/PROPS/FILE"
When starting the JobManager (jobmanager.sh start cluster), I see in the logs that the JVM option is indeed recognized:
    2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - JVM Options:
    2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - -Xms256m
    2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - -Xmx256m
    2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - -XX:MaxPermSize=256m
    2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - -Ddy.props.path=/srv/dy/stream-aggregators/aggregators.conf
    2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - -Dlog.file=/srv/flink-1.2.0/log/flink-flink-jobmanager-0-flinkvm-master.log
    2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - -Dlog4j.configuration=file:/srv/flink-1.2.0/conf/log4j.properties
    2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - -Dlogback.configurationFile=file:/srv/flink-1.2.0/conf/logback.xml
But when I run a Flink job (flink run -d PROG.JAR), System.getProperty("dy.props.path") returns null (and when printing the system properties, I see that it is indeed absent).
The question really is – how do I set system properties that will be available inside of the flink-job’s code?
Answer
The question is very much connected with the runtime architecture of Flink [1].
I understand you're running your job in a standalone cluster. Remember that the JobManager and the TaskManagers run in separate JVM instances. You have to consider where each block of code will be executed.
For instance, the code in transformations like map or filter is executed on the TaskManager.
The code in the main method of your entry class is executed in the command-line tool flink, which of course does not have the system property set, as it spawns a temporary JVM (detached with -d in your case) just for the job submission.
If you submit your job through the WebUI, the code from your main method is executed on the JobManager, so the property will be set then.
In general, I would discourage passing program arguments through system properties, as it is bad practice.
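As an alternative to system properties, the path can be handed to the job as an ordinary program argument. The sketch below is only an illustration in plain Scala: `JobArgs`, `ArgsMain` and the `--dy.props.path` argument name are hypothetical and not part of Flink (Flink also ships a `ParameterTool` utility for this purpose, which is not used here so that the example has no dependencies).

```scala
object JobArgs {
  // Hypothetical helper: turns "--key value" pairs into a Map.
  // Tokens that do not form a complete "--key value" pair are ignored.
  def parse(args: Array[String]): Map[String, String] =
    args.sliding(2, 2).collect {
      case Array(key, value) if key.startsWith("--") => key.drop(2) -> value
    }.toMap
}

object ArgsMain {
  def main(args: Array[String]): Unit = {
    val params = JobArgs.parse(args)
    // Resolved once, in whichever JVM runs main (the flink client or the JobManager).
    val propsPath = params.getOrElse(
      "dy.props.path",
      sys.error("missing --dy.props.path argument"))
    // A plain String like propsPath can be captured by map/filter functions
    // and is then shipped to the TaskManagers together with them.
    println(s"props file: $propsPath")
  }
}
```

With this approach the job would be submitted as `flink run -d PROG.JAR --dy.props.path /PATH/TO/PROPS/FILE`, since everything after the jar is passed to main. Note that the file itself still has to be readable on whichever machine opens it.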
Below you have a simple example:
I started:
- a JobManager with env.java.opts: "-Ddy.props.path=jobmanager"
- a TaskManager with env.java.opts: "-Ddy.props.path=taskmanager"
The code of my job looks as follows:
    import org.apache.flink.streaming.api.scala._

    object Main {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val stream = env.fromCollection(1 to 4)
        // Evaluated once, in the JVM that runs main
        val prop = System.getProperty("dy.props.path")
        // The lambda body is evaluated on the TaskManager
        stream.map(_ => System.getProperty("dy.props.path") + " mainArg: " + prop).print()
        env.execute("stream")
      }
    }
When I submit the code through the flink tool, the output is as follows:
    taskmanager mainArg: null
    taskmanager mainArg: null
    taskmanager mainArg: null
    taskmanager mainArg: null
When it is submitted through the WebUI, I get:
    taskmanager mainArg: jobmanager
    taskmanager mainArg: jobmanager
    taskmanager mainArg: jobmanager
    taskmanager mainArg: jobmanager
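The difference between the two outputs comes down to when each System.getProperty call is evaluated. The following is a rough single-JVM simulation of that, not Flink code: `CaptureDemo` is a hypothetical name, and clearing and then setting the property merely stands in for moving from the client JVM to the TaskManager JVM.

```scala
object CaptureDemo {
  // Builds the function the same way the job does: `prop` is read eagerly,
  // while the call inside the lambda is deferred until the function is applied.
  def build(): Int => String = {
    val prop = System.getProperty("dy.props.path")
    (_: Int) => System.getProperty("dy.props.path") + " mainArg: " + prop
  }

  def run(): String = {
    // Stands in for the flink client JVM, where the property is not set.
    System.clearProperty("dy.props.path")
    val f = build()
    // Stands in for the TaskManager JVM, where the property is set.
    System.setProperty("dy.props.path", "taskmanager")
    f(1)
  }

  def main(args: Array[String]): Unit =
    println(run()) // prints "taskmanager mainArg: null"
}
```

In a real cluster the function is serialized and sent to the TaskManager, and the already-evaluated `prop` value travels with it, which is why it reflects the JVM that ran main.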