本文共 1629 字,大约阅读时间需要 5 分钟。
官方参考文档地址:https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/index.html#distributed-cache
Flink提供了一个类似于Hadoop的分布式缓存,让并行运行实例的函数可以在本地访问。这个功能可以被使用来分享外部静态的数据,例如:机器学习的逻辑回归模型等!
缓存的使用流程:
接下来直接上代码:
package com.daxinimport java.io.{FileReader, BufferedReader}import org.apache.flink.api.common.functions.RichMapFunctionimport org.apache.flink.api.scala.ExecutionEnvironmentimport org.apache.flink.api.scala._import org.apache.flink.configuration.Configurationimport scala.collection.mutable.HashMap/** * * Created by Daxin on 2017/4/18. * */object DistributedCache { def main(args: Array[String]) { val env = ExecutionEnvironment.getExecutionEnvironment //本地IDE运行,缓存的是本地文件,缓存文件名字为cahce env.registerCachedFile("file:///C://logs//flink", "cache") val data = env.fromElements("111", "222", "333") val result = data.map(new RichMapFunction[String, String] { val map = HashMap[String, String]() override def open(parameters: Configuration): Unit = { val file = getRuntimeContext.getDistributedCache.getFile("cache")//获取缓存文件 //读取缓存文件内容到HashMap中,这个也可以使用广播实现 val br = new BufferedReader(new FileReader(file)) var line = br.readLine() while (line != null) { map.put(line, line + "" + line) line = br.readLine() } } override def map(value: String): String = { map.getOrElse(value, "default") //返回该value在map中的value值,如果不存在key为value的返回默认default } }) result.print() //执行作业 }}
转载地址:http://ffjlf.baihongyu.com/