华为云数据接入服务DIS完全对接指南:从通道创建到生产级数据管道搭建
1. 数据接入服务DIS概述
数据接入服务(Data Ingestion Service,简称DIS)是华为云提供的实时数据流处理服务,旨在为处理或分析流数据的自定义应用程序构建数据流管道。DIS主要解决云服务外的数据实时传输到云服务内的问题,能够从IoT数据采集、日志和定位追踪事件、网站点击流、社交媒体源等多种数据源中连续捕获、传送和存储数TB数据。
从架构视角来看,DIS在数据流转过程中扮演着枢纽角色:数据从各类源头进入DIS通道,经过缓存与初步处理后,再流向OBS、DLI、MRS、DWS等下游存储与计算服务,形成完整的数据流水线。DIS对数据传输所需的基础设置、存储、网络和配置进行统一管理,用户无需为数据通道担心配置、部署、持续的硬件维护等问题。此外,DIS还可在云区域同步复制数据,提供数据高可用性和数据持久性。
1.1 核心能力与关键特性
DIS具备多项关键能力,使其在实时数据接入场景中具备显著优势。在吞吐量方面,DIS数据通道的吞吐量每小时可从数MB扩展到数TB,PUT记录每秒钟可从数千次扩展到数百万。这意味着无论是小型创业公司的日志采集,还是大型物联网平台的海量设备数据上报,DIS都能提供弹性伸缩的接入能力。
在易用性方面,用户可以在几秒钟内创建DIS数据通道,轻松地将数据放入通道中,并构建用于数据处理的应用程序。服务支持秒级开通,用户配置SDK或Agent即可实现免编程数据采集。在成本方面,DIS没有前期成本,用户只需要为实际使用的资源付费即可。
在并发与实时性方面,DIS支持百万并发连接,数据请求毫秒级响应,单分区支持最高每日百GB级别的写入量。在安全性方面,DIS采用HTTPS加密传输,实现租户间资源及操作隔离,保护系统和用户的隐私及数据安全,数据跨AZ存储,服务可用性达到99.9%。在数据可靠性方面,DIS可将数据保留24至72小时,以防数据在应用程序故障、个别机器故障或设施故障时丢失。
1.2 DIS vs 自建Kafka
许多用户在接触DIS时会将其与自建Kafka集群进行比较。DIS在华为云场景下具备多方面的独特优势。在接入层面,DIS支持全局接入,能够解决公网、跨VPC等访问互通问题。在运维层面,DIS采用Serverless架构,Topic以服务形式提供,用户只需关注业务带宽,系统按需使用、自动扩缩容。在扩展性方面,DIS可实现无限扩容,超越单Kafka集群的吞吐能力上限。在高可用方面,DIS支持多AZ跨集群级别的高可用。在接口层面,DIS提供REST协议接口,跨平台能力更强。此外,DIS天然集成华为云的认证、监控、企业项目管理等基础生态,与云上其他大数据服务(OBS、DWS、DLI、MRS等)无缝对接。相比用户自建数据采集或传输系统,DIS的成本可降低约5倍。
需要先登录华为云控制台,点击:华为云控制台,还没有账号,点击:注册并关联,已有账号点击:登录后关联
2. DIS通道的创建与配置
DIS通道是数据接入的核心载体,所有数据的上传与下载都围绕通道进行。本节详细讲解如何创建和配置DIS通道。
2.1 开通DIS通道
开通DIS通道的操作步骤如下:
- 登录DIS控制台。
- 在控制台左上角选择区域。
- 单击“购买接入通道”配置相关参数。
2.2 通道参数详解
创建通道时需要配置一系列参数,以下逐项说明:
计费模式:当前DIS仅支持按需计费模式,用户按实际使用量付费。
区域:选择云服务所在的物理位置,建议选择与下游计算或存储服务(如OBS、DLI)相同的区域,以降低内网访问延迟和流量费用。
通道名称:用户发送或接收数据时需要指定通道名称,名称不可重复。通道名称由英文字母、数字、中划线和下划线组成,长度为1至64个字符。
通道类型:分为普通通道和高级通道两种。
- 普通通道:单分区最高发送速度可达1MB/s或1000条记录/s(达到任意一种速度上限即被限流),最高提取速度可达2MB/s,单次请求的记录总大小不能超过1MB(不包含partitionKey数据大小)。
- 高级通道:单分区最高发送速度可达5MB/s或2000条记录/s,最高提取速度可达10MB/s,单次请求的记录总大小不能超过5MB。
分区数量:分区是DIS数据通道的基本吞吐量单位。创建通道时需要指定分区数量,分区数量决定了通道的总吞吐能力。以普通通道为例,若创建5个分区,则总发送能力为5MB/s或5000条记录/s。
生命周期:数据在DIS中保留的最长时间,超过此时长数据将被自动清除。取值范围为24至72小时。
源数据类型:支持JSON、CSV等格式。选择源数据类型后,可以配置数据Schema,便于下游服务解析数据。
自动扩缩容:创建通道时可开启自动扩缩容功能,设置最小分区数和最大分区数,系统将根据实际流量自动调整分区数量。
2.3 分区规划与弹性伸缩
分区规划是DIS通道配置中最关键的环节之一。分区的数量直接决定了通道的数据吞吐能力。对于普通通道,每个分区提供1MB/s的写入能力;对于高级通道,每个分区提供5MB/s的写入能力。
在实际规划中,建议根据业务峰值流量计算所需分区数。例如,若业务峰值写入速度为10MB/s,使用普通通道则需要至少10个分区。同时需要考虑未来的业务增长,适当预留分区余量。
DIS支持对已创建的通道进行弹性伸缩,既可以手动扩缩容,也可以开启自动扩缩容。进行弹性伸缩时需注意:上传数据时建议不要设置数据的PartitionKey,DIS会自动根据通道分区的数量将数据均匀散列到多个分片中。如果设置了PartitionKey,可能导致数据倾斜,产生通道限流。
3. 开发环境准备与认证配置
在开始使用DIS SDK进行开发之前,需要完成开发环境的准备和认证信息的配置。
3.1 环境要求
对于Java开发者,需要安装JDK 1.8或以上版本,以及Eclipse或IntelliJ IDEA等集成开发环境。对于Python开发者,需要安装Python 3.x版本及pip包管理工具。
3.2 获取SDK
DIS SDK是对DIS服务提供的REST API进行的封装,以简化用户的开发工作。用户可以从以下途径获取SDK:
- Java SDK:从GitHub仓库huaweicloud-sdk-java-dis下载,或从DIS SDK桶获取压缩包。
- Python SDK:通过pip安装huaweicloud-sdk-dis。
3.3 获取认证信息
使用DIS SDK需要以下认证信息:
- AK(Access Key ID)和SK(Secret Access Key):在IAM(统一身份认证服务)中获取。
- Project ID:在“我的凭证”中查看。
- Region:DIS服务所在的区域,如cn-north-4。
- Endpoint:DIS服务的访问域名。
安全建议:AK和SK应存储在环境变量或配置文件中,并通过加密方式保护,避免硬编码在代码中。
4. 通过SDK上传数据到DIS
本节以Java和Python两种语言为例,演示如何通过SDK将数据上传到DIS通道。
4.1 Java SDK上传数据
首先在Maven项目中添加DIS Java SDK依赖:
<dependency>
<groupId>com.huaweicloud.dis</groupId>
<artifactId>huaweicloud-dis-kafka-adapter</artifactId>
<version>1.2.17</version>
<scope>compile</scope>
</dependency>以下是使用Java SDK上传数据的完整示例:
import com.huaweicloud.sdk.core.auth.BasicCredentials;
import com.huaweicloud.sdk.core.auth.ICredential;
import com.huaweicloud.sdk.dis.v2.DisClient;
import com.huaweicloud.sdk.dis.v2.model.PutRecordsRequest;
import com.huaweicloud.sdk.dis.v2.model.PutRecordsRequestEntry;
import com.huaweicloud.sdk.dis.v2.region.DisRegion;
public class DisProducer {
public static void main(String[] args) {
// AK和SK建议从环境变量获取,避免硬编码
String ak = System.getenv("CLOUD_SDK_AK");
String sk = System.getenv("CLOUD_SDK_SK");
ICredential auth = new BasicCredentials()
.withAk(ak)
.withSk(sk);
DisClient client = DisClient.newBuilder()
.withCredential(auth)
.withRegion(DisRegion.valueOf("cn-north-4"))
.build();
// 构造上传请求
PutRecordsRequest request = new PutRecordsRequest();
request.setStreamName("your-dis-channel-name");
// 构造数据记录
PutRecordsRequestEntry entry = new PutRecordsRequestEntry();
// 数据需要Base64编码
String data = "{\"temperature\":25.5,\"humidity\":60,\"timestamp\":1623456789}";
entry.setData(java.util.Base64.getEncoder().encodeToString(data.getBytes()));
request.setRecords(java.util.Arrays.asList(entry));
try {
var response = client.putRecords(request);
System.out.println("上传成功,失败记录数:" + response.getFailedRecordCount());
} catch (Exception e) {
System.err.println("上传失败:" + e.getMessage());
}
}
}4.2 Python SDK上传数据
首先安装Python SDK:
pip install huaweicloud-sdk-dis以下是使用Python SDK上传数据的完整示例:
import os
import base64
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkdis.v2.region.dis_region import DisRegion
from huaweicloudsdkdis.v2 import DisClient, PutRecordsRequest, PutRecordsRequestEntry
if __name__ == "__main__":
# AK和SK从环境变量获取
ak = os.getenv("CLOUD_SDK_AK")
sk = os.getenv("CLOUD_SDK_SK")
credentials = BasicCredentials(ak, sk)
client = DisClient.new_builder() \
.with_credentials(credentials) \
.with_region(DisRegion.value_of("cn-north-4")) \
.build()
# 构造上传请求
request = PutRecordsRequest()
request.stream_name = "your-dis-channel-name"
# 构造数据记录,数据需要Base64编码
data = '{"temperature":25.5,"humidity":60,"timestamp":1623456789}'
entry = PutRecordsRequestEntry()
entry.data = base64.b64encode(data.encode()).decode()
request.records = [entry]
try:
response = client.put_records(request)
print(f"上传成功,失败记录数:{response.failed_record_count}")
except Exception as e:
print(f"上传失败:{e}")4.3 批量上传与性能优化
在实际生产环境中,建议采用批量上传方式提高吞吐量。PutRecords接口支持在单次请求中上传多条记录,每条记录的大小不能超过通道类型对应的限制(普通通道1MB,高级通道5MB)。
优化建议包括:合理设置批量大小,避免单条记录过大或过小;使用异步发送模式提高吞吐;根据业务特点选择是否设置PartitionKey。
5. 通过SDK从DIS下载数据
数据下载是DIS的另一核心操作。消费者应用程序通过SDK从DIS通道中拉取数据进行处理。
5.1 Java SDK下载数据
import com.huaweicloud.sdk.dis.v2.model.GetRecordsRequest;
import com.huaweicloud.sdk.dis.v2.model.GetRecordsResponse;
public class DisConsumer {
public static void main(String[] args) {
// 认证和客户端初始化同上传示例
// ...
GetRecordsRequest request = new GetRecordsRequest();
request.setStreamName("your-dis-channel-name");
// 从指定分区获取数据
request.setPartitionId("0");
// 设置起始游标,可从上次消费位置继续
request.setStartingSequenceNumber("0");
try {
GetRecordsResponse response = client.getRecords(request);
var records = response.getRecords();
for (var record : records) {
String data = new String(
java.util.Base64.getDecoder().decode(record.getData())
);
System.out.println("收到数据:" + data);
}
} catch (Exception e) {
System.err.println("下载失败:" + e.getMessage());
}
}
}5.2 消费进度管理
在实际应用中,消费者需要管理消费进度,避免重复消费或遗漏数据。DIS提供了多种消费方式:
- 通过指定序列号(sequenceNumber)从特定位置开始消费。
- 通过指定时间戳从特定时间点开始消费。
- 通过检查点(checkpoint)机制记录消费进度,实现断点续传。
6. 使用DIS Agent实现免编程数据采集
DIS Agent是华为云提供的轻量级数据采集工具,无需编写代码即可实现日志文件的实时采集与上传。
6.1 DIS Agent概述
DIS Agent支持自动采集文件中的增量数据或新增文件,提供不同平台的Agent,简单配置即可实现数据采集。Agent支持监控文本文件,实时采集增量数据,按分隔符解析后上传到DIS通道。
6.2 Agent配置文件
DIS Agent的配置文件格式为YAML,各配置项与值之间必须以英文格式的“冒号+空格”形式分隔。以下是一个典型的agent.yml配置示例:
# 区域配置
region: cn-north-4
# AK/SK配置(建议加密存储)
ak: YOUR_AK
sk: YOUR_SK
# Project ID
projectId: YOUR_PROJECT_ID
# DIS服务Endpoint
endpoint: https://dis.cn-north-4.myhuaweicloud.com
# 监控任务配置
flows:
- DISStream: your-dis-channel-name
# 监控的文件路径,支持通配符
filePattern: /var/log/app/*.log
# 起始位置:从文件开头或结尾开始
initialPosition: START_OF_FILE
# 最大缓存时间(毫秒)
maxBufferAgeMillis: 50006.3 部署与启动Agent
在Linux服务器上部署DIS Agent的步骤如下:
- 使用PuTTY工具登录日志所在服务器。
- 执行
cd /opt/dis-agent-X.X.X/命令进入Agent目录。 - 执行
vim conf/agent.yml编辑配置文件。 - 根据实际情况修改配置项的值并保存。
- 执行启动命令运行Agent。
在Windows服务器上,使用文件管理器进入安装包解压目录,用编辑器打开agent.yml文件进行配置。
6.4 Agent运维与验证
Agent运行状态可通过日志查看:
cd /opt/dis-agent-X.X.X/logs
tail -100f dis-agent.log当日志中出现“Agent: Startup completed”信息时,表示Agent正常运行。
7. 数据转储:将DIS数据持久化到目标服务
DIS通道中的数据默认仅保留24至72小时。对于需要长期存储或进一步分析的数据,需要配置转储任务,将数据自动转储到OBS、DLI、MRS、DWS等目标服务。
7.1 添加转储任务
转储任务的配置步骤如下:
- 登录DIS控制台。
- 在左侧列表栏中选择“通道管理”。
- 单击目标通道名称,进入通道管理页面。
- 选择“转储任务”页签。
- 单击“添加转储任务”,配置转储相关参数。
7.2 转储至OBS
将DIS数据转储到OBS是最常见的场景之一。配置转储任务时需要指定OBS桶名称、转储文件前缀、转储周期(30至900秒)等参数。转储过程中,数据会先暂存在OBS的临时目录中,转储完成后临时数据会被清除。
7.3 转储至DLI
将数据转储到数据湖探索DLI,可以实现数据的实时或准实时分析。DLI支持通过SQL直接查询和分析DIS转储过来的数据,适用于日志分析、用户画像、营销推荐等场景。
8. 监控告警与运维管理
生产环境中,对DIS通道的监控和告警是保障数据接入稳定性的重要手段。
8.1 设置告警规则
通过云监控服务CES可以为DIS通道设置告警规则:
- 登录管理控制台,选择“管理与监管 > 云监控服务 CES”。
- 在云监控服务左侧导航栏选择“告警 > 告警规则”。
- 单击“创建告警规则”。
- 根据界面提示设置DIS通道的告警规则。
8.2 订阅事件通知
DIS支持事件通知功能,可以订阅指定通道的告警事件。开启后配置通道名称,即可订阅指定通道的告警,避免接收其他通道的无关告警。
8.3 常见监控指标
建议重点监控以下指标:通道的写入速率、读取速率、分区使用率、请求延迟、错误率等。当通道接近吞吐量上限时,应及时进行分区扩容或开启自动扩缩容。
9. IAM权限管理
DIS的权限管理通过IAM(统一身份认证服务)实现。默认情况下,管理员创建的IAM用户没有任何权限,需要将其加入用户组并授予相应权限。
9.1 授权流程
授权流程如下:
- 在IAM控制台创建用户组。
- 为用户组授予DIS的相关权限,如“DIS Operator”(通道管理权限)。
- 创建IAM用户并将其加入用户组。
9.2 权限规划建议
在实际项目中,建议遵循最小权限原则:不同角色分配不同权限。例如,数据生产者只需要写入权限,数据消费者只需要读取权限,管理员拥有完整权限。避免使用主账号AK/SK进行日常操作,应使用IAM子账号的AK/SK。
10. 常见错误码与问题排查
在使用DIS过程中可能遇到各类错误,以下列出常见错误码及其处理方式。
10.1 认证类错误
DIS.4100 Authorization error:认证失败。通常原因是AK/SK不正确、Project ID错误或IAM权限不足。解决方案是检查认证信息配置,确认用户拥有相应权限。
10.2 限流类错误
当写入速度超过通道分区能力时,会触发限流。解决方案包括:增加分区数量、升级为高级通道、优化批量上传策略、检查是否因设置PartitionKey导致数据倾斜。
10.3 Agent相关错误
HttpClientErrorException: 403 Forbidden:可能原因是DIS网关将Agent所在服务器的IP地址加入了黑名单。解决方案是停止Agent进程,修改agent.yml配置,等待30分钟后重启Agent。
10.4 Schema错误
DIS.4345 Invalid CloudTable schema:创建转储任务时Schema配置无效。解决方案是检查JSON属性名称是否存在、参数是否合法。
11. 实战案例:车联网驾驶行为日志采集与分析
本节通过一个完整的车联网场景案例,串联全文的核心知识点。
11.1 场景描述
某车企需要实时采集车辆的驾驶行为日志数据(包括速度、加速度、转向角度、GPS位置等),上传到华为云进行分析,以支持驾驶习惯优化等增值服务。
11.2 技术方案
整体技术方案如下:
- 在每辆车的车载终端部署DIS Agent,实时采集日志文件。
- Agent将增量数据上传到DIS通道。
- DIS通道配置转储任务,将数据自动转储到OBS。
- 通过DLI对OBS中的数据进行SQL分析。
- 将分析结果应用于业务系统。
11.3 实施步骤
第一步:申请OBS桶。创建一个OBS桶用于存放DIS转储的数据。
第二步:申请DIS通道。根据预估的车辆数量和数据量,选择合适的通道类型和分区数量。假设有10000辆车,每辆车每分钟产生10条日志,每条日志1KB,则总写入速度为10000×10×1KB/60≈1.67MB/s。使用普通通道,需要至少2个分区。
第三步:添加转储任务。将DIS通道的数据转储到OBS桶中。
第四步:获取认证信息。在IAM中获取AK/SK和Project ID。
第五步:安装和配置DIS Agent。在车载终端安装DIS Agent,配置agent.yml文件,指定通道名称和日志文件路径。
第六步:启动DIS Agent。启动Agent开始采集日志。
第七步:在OBS查看上传文件。验证数据是否成功转储。
第八步:创建数据库和OBS表。在DLI中创建数据库和OBS外表。
第九步:执行数据查询与分析。通过DLI SQL分析驾驶行为数据。
12. 总结与最佳实践建议
本文系统介绍了华为云数据接入服务DIS的对接使用方法,从通道创建、SDK开发、Agent采集到数据转储、监控运维,覆盖了完整的数据接入链路。
以下是一些生产环境中的最佳实践建议:
- 合理规划分区:根据业务峰值流量计算分区数量,预留一定的扩展余量,开启自动扩缩容功能应对流量波动。
- 安全使用AK/SK:将AK/SK存储在环境变量或配置文件中,避免硬编码在代码中。使用IAM子账号而非主账号。
- 批量上传优化性能:使用PutRecords接口批量上传数据,提高吞吐量。
- 谨慎使用PartitionKey:除非有明确的数据分区需求,否则建议让DIS自动分配分区,避免数据倾斜。
- 配置合理的生命周期:根据业务需求设置数据保留时间(24至72小时),平衡存储成本与数据可用性。
- 建立监控告警体系:对通道的写入速率、错误率等关键指标设置告警,及时发现和处理异常。
- 定期review转储任务:确保转储任务正常运行,数据能够按时持久化到目标服务。
通过以上实践,开发者可以快速构建稳定、高效、安全的实时数据接入管道,充分发挥华为云DIS在大数据场景中的价值。
常见问题解答
Q1:DIS通道创建后能否修改通道类型(普通/高级)?
通道类型在创建时确定,创建后不支持直接修改。如果业务增长需要更高吞吐能力,可以通过增加分区数量来提升总吞吐量,或新建高级通道并迁移数据。
Q2:DIS中的数据最长能保存多久?
DIS中的数据保留时间范围为24至72小时。超过设置的生命周期后,数据将被自动清除。如需长期存储,应配置转储任务将数据持久化到OBS等目标服务。
Q3:使用DIS Agent时,如果网络中断怎么办?
DIS Agent具备断点续传能力。当网络恢复后,Agent会从上次中断的位置继续采集和上传数据,确保数据不丢失。可通过查看Agent日志确认恢复状态。
Q4:如何判断当前通道是否需要扩容?
可以通过云监控服务CES查看通道的写入速率和限流次数等指标。如果频繁触发限流或写入速率持续接近分区上限,则需要增加分区数量或升级为高级通道。
Q5:DIS与华为云其他服务如何集成?
DIS提供多种Connector,可对接OBS、DLI、MRS、DWS等云服务。通过配置转储任务,数据可自动流向这些目标服务,实现数据的存储、计算和分析。此外,DIS还支持与FunctionGraph函数工作流集成,实现数据的实时处理。
Q6:DIS支持哪些数据源类型?
DIS支持多种数据源接入方式:通过SDK(Java/Python/Go等)进行程序化接入,通过Agent进行文件采集接入,通过REST API进行HTTP接入,以及通过Kafka Adapter实现从Kafka的平滑迁移。数据源类型涵盖IoT设备数据、应用日志、网站点击流、社交媒体数据等。




