阿里云E-MapReduce(开源大数据平台)从零到一对接使用完全指南
1. E-MapReduce产品概述
阿里云开源大数据平台E-MapReduce(简称EMR)是运行在阿里云平台上的一种大数据处理系统解决方案。EMR基于开源的Apache Hadoop和Apache Spark,让用户可以方便地使用Hadoop和Spark生态系统中的其他周边系统分析和处理数据。EMR还可以与阿里云其他的云数据存储系统和数据库系统进行数据传输,例如阿里云OSS和RDS等。
阿里云EMR提供了三种产品形态,以满足不同用户的需求:
- EMR on ECS:EMR负责将开源Hadoop生态的组件安装部署在ECS上,并启动相应的服务。用户可以在EMR控制台完成对集群ECS及服务的运维操作。这种形态适合需要完全掌控集群基础设施、对底层资源有自定义需求的场景。
- EMR on ACK:用户需要先完成ACK集群的安装部署。当ACK集群准备就绪后,EMR将基于ACK的资源安装部署大数据服务组件,并在容器内运行。这种形态适合已经采用容器化架构、希望统一管理大数据和在线业务的团队。
- EMR Serverless Spark:这是一款面向Data+AI的高性能Lakehouse产品,为企业提供了一站式的数据平台服务,包括任务开发、调试、调度及运维等功能。它100%兼容开源Spark生态,能够无缝集成到客户现有的数据平台。这种形态适合希望免去集群运维负担、专注于数据处理分析的团队。
在实际生产环境中,EMR on ECS是最为广泛使用的形态,本文后续的对接使用讲解将主要围绕EMR on ECS展开。
需要先登录阿里云控制台,点击:阿里云控制台
2. 使用前的准备工作
2.1 账号注册与实名认证
使用EMR服务前,首先需要注册阿里云账号并完成实名认证。这是使用任何阿里云云服务的基础前置条件。
2.2 服务角色授权
完成对E-MapReduce的服务账号授予默认的EMR和ECS角色权限。这一步是必须的,因为EMR在创建集群、管理ECS实例、访问OSS存储等操作时,需要以服务角色的身份调用其他阿里云服务的API。授权操作可以在EMR控制台的引导下完成,系统会自动创建所需的默认角色。
2.3 RAM用户与权限规划
为确保阿里云账号及云资源的使用安全,如非必要都应避免直接使用阿里云账号(即主账号)来访问EMR集群。推荐的做法是使用RAM身份(即RAM用户和RAM角色)来访问EMR on ECS集群。
RAM用户需要由阿里云账号或拥有管理员权限的RAM用户、RAM角色来创建,且必须在获得授权后才能登录控制台或使用API访问阿里云账号下的资源。在权限管理上,应遵循最小权限原则——授予用户执行某项任务所需的权限,不授予其他无需用到的权限。
对于通过控制台访问的用户,推荐为其开启MFA多因素认证。不要把RAM用户的AccessKey ID和AccessKey Secret保存在工程代码中,否则可能导致AK泄露。建议使用STS或环境变量等方式获取访问授权。
阿里云EMR提供了多种系统权限策略供用户选择:
- AliyunEMRFullAccess:管理E-MapReduce的权限,包括EMR on ECS和EMR on ACK的所有权限。
- AliyunEMRDevelopAccess:E-MapReduce的开发者权限,包括查看集群信息、管理旧版数据开发项目的权限。
- AliyunEMRServerlessSparkFullAccess:管理EMR Serverless Spark的权限。
- AliyunEMRServerlessSparkDeveloperAccess:EMR Serverless Spark的开发者权限。
2.4 网络与地域规划
EMR集群创建后无法更改地域,因此需要谨慎选择。建议将EMR集群部署在与业务数据源(如OSS Bucket、RDS实例)相同的地域,以便利用内网高速访问,避免产生额外的外网流量费用。
网络方面,EMR集群通常部署在专有网络VPC中。如果集群需要对外提供公网访问能力(例如通过公网提交作业或访问Web UI),则需要提前规划好公网IP或NAT网关等网络出口方案。
3. 创建EMR集群
3.1 进入创建集群页面
登录阿里云控制台后,进入E-MapReduce产品页面。在顶部菜单栏处,根据实际情况选择地域和资源组。单击"创建集群"按钮进入集群配置页面。
3.2 关键配置项详解
在创建集群页面,需要完成以下关键配置:
- 地域:集群节点ECS实例所在的物理位置。集群创建后无法更改地域,请谨慎选择。
- 业务场景:选择适合的业务场景,阿里云EMR会自动为您配置默认的组件、服务和资源。常见场景包括数据湖、数据服务、OLAP等。
- 产品版本:选择最新的软件版本。如需使用OSS-HDFS服务,需确保选择的产品版本为EMR-3.46.2及以上版本或者EMR-5.12.2及以上版本。
- 服务高可用:默认不开启。打开后EMR会把Master节点分布在不同的底层硬件上以降低故障风险。
- 可选服务:根据实际需求选择组件,被选中的组件会默认启动相关的服务进程。如需通过控制台访问Web UI界面,还需选择Knox和OpenLDAP服务。
- 元数据:可以选择内置MySQL(适合测试环境)或自建RDS、DLF统一元数据(推荐生产环境使用)。
- 集群存储根路径:当选择了OSS-HDFS服务时,需要配置该参数。需确保选择的Bucket已开通OSS-HDFS服务。
3.3 集群创建后的验证
集群创建完成后,可以通过以下方式验证集群是否正常运行:
- 在EMR控制台的集群列表页面查看集群状态是否为"运行中"。
- 通过集群的Access Links访问对应的Web UI界面(如YARN UI、Spark UI等),确认服务正常启动。
- 登录集群Master节点,执行基本的Hadoop或Spark命令测试环境。
4. 数据源对接
4.1 对接OSS对象存储
OSS是EMR最常用的数据存储方式之一。EMR集群可以通过OSS-HDFS服务或直接使用OSS SDK来读写OSS中的数据。
4.1.1 通过OSS-HDFS服务对接
EMR特定版本(EMR-3.46.2及以上版本或EMR-5.12.2及以上版本)集群默认整合OSS-HDFS服务。在创建集群时,将集群存储根路径设置为已开通OSS-HDFS服务的Bucket即可。
通过HDFS Shell命令可以完成OSS-HDFS服务的常见操作:
# 上传文件到OSS-HDFS
hdfs dfs -put /local/path/examplefile.txt oss://examplebucket.cn-hangzhou.oss-dls.aliyuncs.com/
# 从OSS-HDFS下载文件
hdfs dfs -get oss://examplebucket.cn-hangzhou.oss-dls.aliyuncs.com/exampleobject.txt /tmp/
# 列出OSS-HDFS目录内容
hdfs dfs -ls oss://examplebucket.cn-hangzhou.oss-dls.aliyuncs.com/
# 创建目录
hdfs dfs -mkdir oss://examplebucket.cn-hangzhou.oss-dls.aliyuncs.com/data/
4.1.2 在MapReduce中读写OSS
在MapReduce作业中读写OSS文件,需要配置相应的参数:
// 在MapReduce作业中配置OSS访问参数
Configuration conf = new Configuration();
// 设置OSS访问凭证(强烈建议使用RAM用户AK)
conf.set("fs.oss.accessKeyId", "your-access-key-id");
conf.set("fs.oss.accessKeySecret", "your-access-key-secret");
// 设置OSS endpoint
conf.set("fs.oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
// 设置输入输出路径为OSS路径
FileInputFormat.setInputPaths(job, new Path("oss://your-bucket/input/"));
FileOutputFormat.setOutputPath(job, new Path("oss://your-bucket/output/"));
重要提示:阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。强烈建议不要将AccessKey直接硬编码在代码中。
4.1.3 在Spark中读写OSS
# 在Spark中读写OSS数据
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("OSSReadWrite") \
.getOrCreate()
# 从OSS读取数据
df = spark.read.parquet("oss://your-bucket/data/user_logs.parquet")
# 数据处理
df_filtered = df.filter(df["age"] > 18)
# 将结果写回OSS
df_filtered.write.parquet("oss://your-bucket/output/result.parquet")
spark.stop()
4.2 对接Tablestore(表格存储)
EMR集群支持通过Spark SQL访问表格存储(Tablestore)中的数据。对于流计算场景,基于通道服务,利用CDC(数据变更捕获)技术完成Spark的mini batch流式消费和计算,同时提供了at-least-once一致性语义。
-- 在Spark SQL中创建Tablestore外部表
CREATE TABLE tablestore_user_logs (
user_id STRING,
action STRING,
timestamp BIGINT
)
USING tablestore
OPTIONS (
endpoint "https://your-instance.cn-hangzhou.ots.aliyuncs.com",
instance "your-instance-name",
table "user_logs",
accessKeyId "your-access-key-id",
accessKeySecret "your-access-key-secret"
);
-- 查询Tablestore中的数据
SELECT user_id, COUNT(*) as action_count
FROM tablestore_user_logs
WHERE timestamp > 1700000000
GROUP BY user_id;
4.3 对接RDS关系型数据库
EMR集群可以通过JDBC方式对接RDS等关系型数据库。在Spark作业中,可以使用JDBC数据源读写RDS中的数据:
# 在Spark中读写RDS MySQL数据
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("RDSReadWrite") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.getOrCreate()
# 从RDS读取数据
jdbc_url = "jdbc:mysql://your-rds-endpoint:3306/your_database"
properties = {
"user": "your_username",
"password": "your_password",
"driver": "com.mysql.cj.jdbc.Driver"
}
df = spark.read.jdbc(url=jdbc_url, table="orders", properties=properties)
# 数据处理
df_summary = df.groupBy("status").count()
# 将结果写回RDS
df_summary.write.jdbc(url=jdbc_url, table="order_summary", mode="overwrite", properties=properties)
spark.stop()
4.4 对接Kafka消息队列
EMR DataFlow集群支持通过Flink作业消费Kafka数据。以下示例展示了如何使用Flink将Kafka数据流式写入OSS:
// Flink作业:从Kafka消费数据并写入OSS
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
public class KafkaToOSSJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 配置Kafka Source
KafkaSource source = KafkaSource.builder()
.setBootstrapServers("your-kafka-broker:9092")
.setTopics("input-topic")
.setGroupId("flink-consumer-group")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
// 配置OSS Sink
FileSink sink = FileSink
.forRowFormat(new Path("oss://your-bucket/flink-output/"), new SimpleStringEncoder<>("UTF-8"))
.withRollingPolicy(DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(5))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(1))
.withMaxPartSize(1024 * 1024 * 128)
.build())
.build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
.sinkTo(sink);
env.execute("Kafka to OSS Flink Job");
}
}
5. 任务开发与提交
5.1 Spark SQL任务开发
EMR支持通过多种方式提交Spark SQL任务,包括控制台数据开发页面、命令行spark-sql、以及通过API提交。
以下是一个完整的Spark SQL示例,用于创建表并执行数据分析:
-- 创建学生信息表
CREATE TABLE IF NOT EXISTS students_info (
student_id STRING COMMENT '学生ID',
name STRING COMMENT '姓名',
age INT COMMENT '年龄',
grade STRING COMMENT '年级',
score DOUBLE COMMENT '成绩'
)
USING parquet
LOCATION 'oss://your-bucket/data/students/';
-- 加载数据(从CSV文件加载)
INSERT INTO students_info
SELECT
student_id,
name,
CAST(age AS INT),
grade,
CAST(score AS DOUBLE)
FROM csv_table;
-- 执行分析查询
SELECT
grade,
COUNT(*) as student_count,
AVG(score) as avg_score,
MAX(score) as max_score,
MIN(score) as min_score
FROM students_info
GROUP BY grade
ORDER BY grade;
-- 查询年龄大于18岁的学生
SELECT student_id, name, age, score
FROM students_info
WHERE age > 18 AND score > 80
ORDER BY score DESC;
5.2 WordCount经典示例
WordCount是Hadoop中最基础且经典的分布式计算任务,用于统计海量文本中的单词数量。以下是在EMR集群中提交WordCount作业的完整流程:
# 1. 准备输入数据
hdfs dfs -mkdir -p /wordcount/input/
echo "Hello World Hello Hadoop" > /tmp/input.txt
echo "Hello EMR Spark Flink" >> /tmp/input.txt
hdfs dfs -put /tmp/input.txt /wordcount/input/
# 2. 提交Spark WordCount作业
spark-submit \
--master yarn \
--deploy-mode cluster \
--class org.apache.spark.examples.JavaWordCount \
/usr/lib/spark/examples/jars/spark-examples_2.12-3.4.0.jar \
/wordcount/input/ \
/wordcount/output/
# 3. 查看结果
hdfs dfs -cat /wordcount/output/part-*
5.3 PyFlink流式作业
EMR DataFlow集群支持PyFlink流式作业。以下示例展示了如何从Kafka读取支付事件数据并进行实时聚合:
# PyFlink流式作业示例
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
env = StreamExecutionEnvironment.get_execution_environment()
# 配置Kafka消费者
kafka_consumer = FlinkKafkaConsumer(
topics='payment-msg',
deserialization_schema=SimpleStringSchema(),
properties={'bootstrap.servers': 'your-kafka-broker:9092', 'group.id': 'pyflink-group'}
)
# 从Kafka读取数据流
data_stream = env.add_source(kafka_consumer)
# 解析JSON数据并提取字段
payment_stream = data_stream.map(
lambda line: json.loads(line),
output_type=Types.ROW([Types.STRING(), Types.STRING(), Types.DOUBLE()])
).map(lambda record: (record['province'], record['amount']),
output_type=Types.TUPLE([Types.STRING(), Types.DOUBLE()]))
# 5秒滚动窗口聚合
result_stream = payment_stream.key_by(lambda x: x[0]) \
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) \
.reduce(lambda a, b: (a[0], a[1] + b[1]))
# 将结果写入Kafka Sink
result_stream.add_sink(kafka_sink)
env.execute("PyFlink Payment Aggregation")
5.4 通过DataWorks调度EMR任务
DataWorks支持基于EMR计算引擎创建Hive、MR、Presto和Spark SQL等节点,实现EMR任务工作流的配置、定时调度和元数据管理等功能。
使用DataWorks调度EMR任务的基本流程:
- 在DataWorks中创建工作空间。
- 将EMR集群注册至DataWorks。
- 创建数据源(如OSS、MySQL等)。
- 在数据开发中创建EMR节点(Hive、Spark SQL、MR等)。
- 配置任务工作流和定时调度。
- 设置数据质量监控告警。
6. 集群管理与运维
6.1 集群扩缩容
当EMR集群计算资源或存储资源不足时,可以通过水平扩展Core节点和Task节点来满足需求。EMR on ECS支持多种弹性策略,可按时间或负载自动扩缩集群计算资源,分钟级变化资源规模。
6.1.1 手动扩容
手动扩容的操作步骤:
- 进入节点管理页面,选择目标集群。
- 单击目标节点组操作列的扩容。
- 在扩容对话框中设置需要增加的节点数量。
- 确认配置后单击确定。
注意:不支持扩容Master节点组,只能扩展Core节点和Task节点,且新扩展节点的配置默认与已有节点一致。集群扩容操作不会重启存量节点上的应用进程。
6.1.2 弹性伸缩
可以为节点组添加弹性伸缩规则,当规则触发后,节点组会按照规则预置的策略自动进行扩容或缩容。EMR Serverless能够提供极致的资源弹性与稳定性,支持根据业务负载自动伸缩、按秒计费的资源弹性能力。
6.2 集群监控
EMR提供丰富的服务监控指标和主机监控指标展示,通过可视化的方式快速定位服务和主机异常。EMR会记录当前集群资源发生的系统事件,并自动将其同步到云监控服务。
主要监控功能包括:
- 事件中心:用于记录集群发生的重要事件,包括集群事件、系统事件和健康检查事件。
- 事件等级:分为CRITICAL(严重)、WARN(警告)、INFO(消息)三个等级。
- 监控指标:可在EMR控制台的Monitoring标签页查看集群的各项监控指标。
可以单击事件中心页面右侧的"订阅事件告警",在云监控控制台设置事件告警。当EMR发生系统异常时,能及时接收报警通知并处理异常。
6.3 服务配置管理
EMR支持快速添加服务,可以监控服务的状态,并对服务组件进行配置和运维操作。用户可以修改在EMR集群上运行的应用程序的配置,而无需重新启动集群。EMR应用程序重新配置功能让用户可以即时修改应用程序,而无需关闭或重新创建集群。
7. 安全管理
7.1 RAM权限管理
RAM身份默认没有任何权限,需要由阿里云账号管理员为其授权后才能访问阿里云账号下的资源。为保证资源的数据安全,建议遵循最小授权原则为允许访问云资源的身份授予恰好够用的权限。
授权操作步骤:
- 登录RAM控制台。
- 创建RAM用户或RAM角色。
- 为RAM身份授权适当的权限策略(系统策略或自定义策略)。
- 当系统策略过于宽泛时,可以创建自定义RAM策略来执行最小权限原则。
7.2 数据安全
EMR支持与阿里云DLF(Data Lake Formation)集成,实现湖仓元数据全面打通,确保数据访问一致性与权限管理完整。在生产环境中,建议:
- 使用RAM子账号进行日常运维,避免使用主账号AK。
- 开启服务高可用模式,将Master节点分布在不同的底层硬件上。
- 配置合适的网络安全组规则,限制不必要的网络访问。
- 定期审计RAM权限和操作日志。
8. 成本优化最佳实践
8.1 计算与存储分离架构
EMR解耦了计算与存储之间的绑定关系,实现了资源的弹性利用。通过将数据存储在OSS中,计算集群可以根据需要随时创建和释放,大幅降低存储成本。
8.2 多种计费方式
E-MapReduce提供包年包月、按量付费、抢占式实例等多种计费方式:
- 按量付费:适合短期需求,随用随开。
- 包年包月:适合长期需求,购买时长越长折扣越多。
- 抢占式实例:可进一步降低成本。
8.3 倚天ARM架构
EMR on ECS支持倚天架构,基于自研芯片倚天710,软硬协同,性价比提升40%以上。对于计算密集型场景,选择倚天架构实例可以显著降低计算成本。
8.4 弹性伸缩降本
通过配置弹性伸缩策略,在业务低峰期自动缩容节点,高峰期自动扩容,避免资源闲置浪费。EMR Serverless实例不采用固定规格,计算资源会在指定范围内根据业务负载自动弹性伸缩。
8.5 数据存储分层
利用OSS的生命周期管理功能,将冷数据自动转储到低频访问或归档存储类型,进一步降低存储成本。同时,利用同地域内网访问免流量特性,将EMR集群与OSS部署在同一地域。
9. 迁移上云方案
自建Hadoop集群迁移到E-MapReduce可以通过OSS进行过渡,或者使用阿里云Express Connect产品建立线下IDC和线上EMR所在VPC网络的连通。迁移过程将充分考虑旧集群的版本、元数据类型以及存储方式,并针对这些因素提供适应新集群的迁移策略与步骤。
基本迁移流程:
- 评估现有集群的规模、版本和组件依赖。
- 在阿里云上创建目标EMR集群,配置合适的规格和组件。
- 通过OSS作为数据中转,将HDFS数据迁移到OSS。
- 验证数据完整性和作业兼容性。
- 切换业务流量到EMR集群。
- 下线旧集群。
10. 常见问题解答
问:EMR on ECS、EMR on ACK和EMR Serverless Spark三者如何选择?
答:如果希望完全掌控集群基础设施、对底层资源有自定义需求,选择EMR on ECS;如果已经采用容器化架构、希望统一管理大数据和在线业务,选择EMR on ACK;如果希望免去集群运维负担、专注于数据处理分析,选择EMR Serverless Spark。
问:创建EMR集群时,地域选择有什么注意事项?
答:EMR集群创建后无法更改地域。建议将EMR集群部署在与业务数据源(如OSS Bucket、RDS实例)相同的地域,以便利用内网高速访问,避免产生额外的外网流量费用。
问:如何安全地管理EMR的访问凭证?
答:推荐使用RAM身份(RAM用户和RAM角色)来访问EMR。不要把RAM用户的AccessKey硬编码在代码中,建议使用STS或环境变量等方式获取访问授权。遵循最小权限原则,仅授予执行任务所需的权限。
问:EMR集群如何实现自动扩缩容?
答:EMR on ECS支持多种弹性策略,可按时间或负载自动扩缩集群计算资源。可以为节点组添加弹性伸缩规则,当规则触发后,节点组会按照预置策略自动进行扩容或缩容。EMR Serverless支持根据业务负载自动伸缩、按秒计费。
问:EMR集群如何对接OSS存储?
答:EMR可以通过OSS-HDFS服务对接OSS。创建集群时选择EMR-3.46.2及以上版本或EMR-5.12.2及以上版本,并将集群存储根路径设置为已开通OSS-HDFS服务的Bucket。也可以通过配置OSS访问参数在MapReduce或Spark作业中直接读写OSS。
问:如何降低EMR的使用成本?
答:可以从以下几个方面优化成本:将EMR集群与数据源部署在同一地域以利用内网免流量;根据业务需求选择合适的计费方式(按量付费、包年包月或抢占式实例);配置弹性伸缩策略避免资源闲置;选择倚天ARM架构实例提升性价比;利用OSS生命周期管理将冷数据转储到低成本存储类型。




