Apache Flink – custom java options are not recognized inside job

I’ve added the following line to flink-conf.yaml:

env.java.opts: "-Ddy.props.path=/PATH/TO/PROPS/FILE"

When starting the JobManager (jobmanager.sh start cluster), I see in the logs that the JVM option is indeed recognized:

2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -  JVM Options:
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Xms256m
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Xmx256m
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -XX:MaxPermSize=256m
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Ddy.props.path=/srv/dy/stream-aggregators/aggregators.conf
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Dlog.file=/srv/flink-1.2.0/log/flink-flink-jobmanager-0-flinkvm-master.log
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Dlog4j.configuration=file:/srv/flink-1.2.0/conf/log4j.properties
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Dlogback.configurationFile=file:/srv/flink-1.2.0/conf/logback.xml

But when I run a Flink job (flink run -d PROG.JAR), System.getProperty("dy.props.path") returns null (and when I print the system properties, I see that it is indeed absent).

The question really is: how do I set system properties that will be available inside the Flink job's code?

Answer

The question is very much connected with the runtime architecture of Flink [1].

I understand you’re running your job in a standalone cluster. Remember that the JobManager and the TaskManagers run in separate JVM instances, so you have to consider where each block of code will be executed.

For instance, the code in transformations like map or filter is executed on the TaskManager. The code in the main method of your entry class is executed by the flink command-line tool, which spawns a temporary JVM just for the job submission (-d runs the submission in detached mode). That JVM, of course, does not have the system property set.

If you submit your job through the WebUI, the code from your main method is executed on the JobManager, so the property will be set in that case.

In general, I would discourage passing program arguments through system properties. It is bad practice, because the value you get depends on which JVM happens to run the code.
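
One alternative is to pass the path as a regular program argument, for example flink run -d PROG.JAR --dy.props.path /PATH/TO/PROPS/FILE, and read it in main. Flink ships ParameterTool.fromArgs for this; the sketch below does the same thing in plain Scala so that it runs without Flink on the classpath. The object name JobArgs and the helper parseArgs are made up for illustration.

```scala
object JobArgs {
  // Hypothetical helper: turns "--key value" pairs into a Map.
  // Tokens that do not form a "--key value" pair are ignored.
  def parseArgs(args: Array[String]): Map[String, String] =
    args.grouped(2).collect {
      case Array(key, value) if key.startsWith("--") => key.drop(2) -> value
    }.toMap

  def main(args: Array[String]): Unit = {
    val params = parseArgs(args)
    // Resolved in the JVM that runs main(), whichever one that is.
    val propsPath = params.getOrElse("dy.props.path",
      sys.error("missing --dy.props.path <file>"))
    // A String captured by a map/filter closure is shipped with the job.
    println(s"props file: $propsPath")
  }
}
```

A value read this way and referenced inside map or filter is serialized together with the function, so the TaskManager sees the same value whether the job was submitted through the command-line tool or the WebUI.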


Below is a simple example.

I started:

  • a JobManager with env.java.opts: "-Ddy.props.path=jobmanager"
  • a TaskManager with env.java.opts: "-Ddy.props.path=taskmanager"

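The code of my job looks as follows:

import org.apache.flink.streaming.api.scala._

object Main {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.fromCollection(1 to 4)

    // Evaluated once, in the JVM that runs main(); the closure below captures the value.
    val prop = System.getProperty("dy.props.path")
    // The lambda body runs on the TaskManager, so this lookup sees the TaskManager's properties.
    stream.map(_ => System.getProperty("dy.props.path") + "  mainArg: " + prop).print()

    env.execute("stream")
  }
}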

When I submit the job through the flink command-line tool, the output is as follows:

taskmanager  mainArg: null
taskmanager  mainArg: null
taskmanager  mainArg: null
taskmanager  mainArg: null

When it is submitted through the WebUI I get:

taskmanager  mainArg: jobmanager
taskmanager  mainArg: jobmanager
taskmanager  mainArg: jobmanager
taskmanager  mainArg: jobmanager
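
The two outputs differ only in where prop was evaluated. A minimal single-JVM sketch of that effect, with System.setProperty standing in for the switch from the submitting JVM to the TaskManager JVM (the object name CaptureDemo and the method run are made up for illustration; no Flink is involved):

```scala
object CaptureDemo {
  def run(): String = {
    // Stand-in for the client JVM, where the property is not set.
    System.clearProperty("dy.props.path")
    val prop = System.getProperty("dy.props.path") // evaluated here: null

    // The closure captures the value of prop, but looks the property up
    // again each time it is called.
    val f: Int => String =
      _ => System.getProperty("dy.props.path") + "  mainArg: " + prop

    // Stand-in for the TaskManager JVM, where the property is set.
    System.setProperty("dy.props.path", "taskmanager")
    f(1)
  }

  def main(args: Array[String]): Unit =
    println(run()) // prints: taskmanager  mainArg: null
}
```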