在上一篇中,我們介紹了Apache Flink的基本概念。本篇將帶領你從零開始,搭建Flink開發環境,并完成一個簡單應用的配置、部署與運行全流程。
一個完整的Flink開發環境需要以下幾部分:
1. Java開發環境:Flink核心基于Java(也支持Scala)。請確保已安裝JDK 8或11(推薦)。
`bash
# 檢查Java版本
java -version
`
2. 構建工具:Maven配置
對于Java項目,我們使用Maven管理依賴。在~/.m2/settings.xml中,可以配置鏡像倉庫以加速依賴下載(國內用戶建議配置)。
`xml
`
4. Flink本地安裝(可選,用于本地運行和測試)
從Flink官網下載對應版本的二進制包,解壓即可。
`bash
# 解壓后,可以啟動一個本地單節點集群
./bin/start-cluster.sh
# 訪問Web UI: http://localhost:8081
`
我們將創建一個Maven項目,實現一個簡單的單詞計數(WordCount)應用。
1. 使用Maven Archetype創建項目
`bash
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.17.0 \
-DgroupId=com.learn.flink \
-DartifactId=flink-quickstart \
-Dversion=1.0 \
-Dpackage=com.learn.flink \
-DinteractiveMode=false
`
2. 項目核心依賴
查看生成的pom.xml,核心依賴是:
`xml
`
3. 編寫WordCount示例代碼
在src/main/java下創建StreamingWordCount.java:
`java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class StreamingWordCount {
public static void main(String[] args) throws Exception {
// 1. 創建流式執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 定義數據源(這里從Socket讀取,用于測試)
DataStreamSource
// 3. 轉換操作:切分、計數
SingleOutputStreamOperator
.flatMap(new Tokenizer())
.keyBy(value -> value.f0) // 按單詞分組
.sum(1); // 對第二個字段(計數)求和
// 4. 輸出結果(打印到控制臺)
result.print();
// 5. 觸發程序執行(流式作業必須調用)
env.execute("Streaming WordCount");
}
// 自定義函數,將一行文本拆分成(單詞,1)的二元組
public static class Tokenizer implements FlatMapFunction
@Override
public void flatMap(String value, Collector
String[] words = value.toLowerCase().split("\\W+");
for (String word : words) {
if (!word.isEmpty()) {
out.collect(new Tuple2<>(word, 1));
}
}
}
}
}
`
1. 確保有一個Socket源。可以使用nc命令在終端開啟一個服務:
`bash
# Linux/Mac
nc -lk 9999
# Windows可以使用其他工具,如netcat
`
StreamingWordCount的main方法。nc終端輸入幾行英文句子,即可在IDE控制臺看到實時單詞計數輸出。1. 打包應用:
`bash
cd flink-quickstart
mvn clean package -DskipTests
`
在target目錄下生成JAR包(如flink-quickstart-1.0.jar)。
2. 提交到本地運行的Flink集群:
`bash
# 首先確保已啟動本地集群(./bin/start-cluster.sh)
./bin/flink run \
-c com.learn.flink.StreamingWordCount \
/path/to/your/flink-quickstart-1.0.jar
`
localhost:9999發送文本數據來觸發計算。以YARN Session模式為例:
1. 啟動YARN Session:
`bash
./bin/yarn-session.sh -tm 2048 -s 2
`
2. 提交作業:
`bash
./bin/flink run \
-yid
-c com.learn.flink.StreamingWordCount \
/path/to/your/flink-quickstart-1.0.jar
`
Flink應用的配置主要通過ExecutionEnvironment或StreamExecutionEnvironment進行。
`java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 1. 設置并行度(全局)
env.setParallelism(4);
// 2. 開啟Checkpoint(用于容錯)
env.enableCheckpointing(10000); // 每10秒一次
// 3. 從配置文件讀取配置(如flink-conf.yaml)
// 本地運行時,可加載自定義配置文件
Configuration config = new Configuration();
config.setString("taskmanager.memory.process.size", "2048m");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);`
還可以通過pom.xml中的<properties>和<profiles>來管理不同環境(開發/測試/生產)的構建配置。
###
至此,你已經完成了從環境搭建、項目創建、代碼編寫到應用部署運行的完整流程。關鍵步驟是:
DataStream API的編程模式(創建環境、定義源、轉換、輸出、觸發執行)。你可以嘗試更復雜的數據源(如Kafka)、狀態操作、窗口計算等,并深入探索Flink在實時數據處理領域的強大能力。