Flink可以在单台机器上运行,甚至是单个Java虚拟机(Java Virtual Machine)。这种机制使得用户可以在本地测试或者调试Flink程序。本节主要概述Flink本地模式的运行机制。
本地环境和执行器(executors)运行你在本地的Java虚拟机上运行Flink程序,或者是在属于正在运行程序的如何Java虚拟机上。对于大部分示例程序而言,你只需简单的地点击你IDE上的运行(Run)按钮就可以执行。
Flink支持两种不同的本地执行模式:(1)、LocalExecutionEnvironment
启动了Flink完整的运行环境,包括一个JobManager和一个TaskManager。运行环境中包括了内存管理,并且所有的内部算法再试在集群模式下运行的。(2)、CollectionEnvironment
在Java集合上运行Flink程序。这种模式下不会启动Flink完整的执行环境,所以这种模式负载非常低,而且非常轻量级。例如 DataSet.map()-转换操作,执行器将是对Java list里的所有元素进行map()函数的操作。
调试
如果你本地运行Flink程序,你可以像普通的Java程序一样调试Flink程序。 你既可以使用System.out.println()
打印一些内部变量,或者使用调试工具。 你还可以在map(), reduce() 或者其他方法中打断点。更多的详情请参见Java API文档中的调试章节,以便了解如何使用Java API工具进行测试或者本地调试等。
Maven依赖
如果你在Maven工程中开发你的程序,你需要将flink-clients
模块加入到你项目的依赖中:
<dependency> <groupid>org.apache.flink</groupid> <artifactid>flink-clients_2.10</artifactid> <version>1.1-SNAPSHOT</version> </dependency>
本地执行
LocalEnvironment
是在本地模式运行Flink程序的句柄,可以用它在本地的 JVM (standalone 或嵌入其他程序)里运行程序。本地执行环境是通过调用ExecutionEnvironment.createLocalEnvironment()
方法实现的。默认情况下,Flink会根据你机器的CPU核数开启同样多的线程来执行程序。当然,我们可以指定程序的并行度。本模式的日志可以通过enableLogging()/disableLogging()
来设置是否向标准输出输出。
在大多数情况下,推荐使用ExecutionEnvironment.getExecutionEnvironment()
来获得Flink的运行环境。 当程序是在本地(未使用命令行接口)启动时,该方法会返回LocalEnvironment
,当程序是通过命令行接口命令行接口(CLI)提交时,则该方法会返回集群的执行环境。
public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); DataSet<String> data = env.readTextFile("file:///path/to/file"); data .filter(new FilterFunction<String>() { public boolean filter(String value) { return value.startsWith("http://"); } }) .writeAsText("file:///path/to/result"); JobExecutionResult res = env.execute(); }
JobExecutionResult
对象在程序执行结束时会返回,这个类中包含了程序的运行环境和累加器的结果。
LocalEnvironment
类也可以向Flink传入一个用户自定义的配置选项。
Configuration conf = new Configuration(); conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f); final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
集合环境
在Java集合上执行操作将会使用CollectionEnvironment
类,这个类执行Flink程序时使用了一些低开销的方法。 这种模式通常用于自动化测试、调试、代码重用等场景。
用户可以实现用于批处理的算法,或者是实现用于更具交互行的场下的算法。一个Flink程序通过稍微修改即可用于处理请求的Java应用服务器。下面是集合环境的例子:
public static void main(String[] args) throws Exception { // initialize a new Collection-based execution environment final ExecutionEnvironment env = new CollectionEnvironment(); DataSet<User> users = env.fromCollection( /* get elements from a Java Collection */); /* Data Set transformations ... */ // retrieve the resulting Tuple2 elements into a ArrayList. Collection<...> result = new ArrayList<...>(); resultDataSet.output(new LocalCollectionOutputFormat<...>(result)); // kick off execution. env.execute(); // Do some work with the resulting ArrayList (=Collection). for(... t : result) { System.err.println("Result = "+t); } }
在flink-examples-batch模块中有完整的示例,类名是CollectionExecutionExample
.
值得注意的是,基于集合环境的 Flink 程序只适用于小数据, 不能超过JVM的堆内存大小 集合模式中的执行器只使用了单线程,而不是多线程。
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Flink:本地执行(Local Execution)】(https://www.iteblog.com/archives/1654.html)