华为云ROMA Connect应用与数据集成平台对接使用全攻略
1. ROMA Connect平台概览
应用与数据集成平台ROMA Connect是华为云推出的全栈式集成服务,源自华为自身数字化转型的集成实践。该平台聚焦于应用和数据的连接,提供数据、API、消息和设备的集成能力,适用于多种常见的企业系统集成场景。无论是云上云下系统互通、跨区域数据同步,还是设备接入与消息分发,ROMA Connect都能提供一站式的解决方案。
ROMA Connect的核心功能模块包括四个部分:数据集成FDI(Fast Data Integration)负责不同数据源之间的数据同步与转换;服务集成APIC(API Connect)负责将后端服务、数据源和自定义函数封装成标准RESTful API并对外开放;消息集成MQS(Message Queue Service)提供兼容Kafka的消息通道,实现系统间的异步通信;设备集成LINK则通过标准MQTT协议连接物联网设备,实现设备快速上云管理。此外,ROMA Connect还提供了组合应用功能,支持对各种触发器、连接器和处理器进行编排,形成新的组合应用,实现业务的快速复制和创新。
需要先登录华为云控制台,点击:华为云控制台,还没有账号,点击:注册并关联,已有账号点击:登录后关联
2. 前期准备:创建实例与集成应用
在使用ROMA Connect之前,需要完成实例创建和集成应用配置两个关键步骤。
2.1 创建ROMA Connect实例
实例是ROMA Connect运行的基础资源。登录华为云控制台后,进入ROMA Connect服务页面,单击"创建实例",根据实际业务需求选择合适的规格配置。实例规格决定了可用的连接数、吞吐量等性能指标,建议根据预期的数据集成量、API调用频次和消息吞吐量进行评估选择。创建完成后,在实例页面单击实例名称即可进入实例控制台,开始后续的配置工作。
2.2 创建集成应用
集成应用是ROMA Connect中资源归属的逻辑单元。实例中的所有资源——包括数据源、API、Topic、设备等——都必须归属于某个集成应用。创建集成应用的操作非常简单:在实例控制台的左侧导航栏选择"集成应用",单击"创建集成应用",填写应用名称和描述即可。每个集成应用会自动生成一个Key值,该Key在后续的API调用和消息认证中会频繁使用。建议按照项目或业务域来划分集成应用,便于资源的统一管理和权限控制。
3. 数据集成FDI:打通数据孤岛
数据集成是ROMA Connect最基础也是最重要的能力之一。FDI组件支持接入关系型数据库、大数据存储、半结构化存储、消息系统等多种类型的数据源,通过配置集成任务实现源端到目标端的数据同步。
3.1 接入数据源
在创建数据集成任务之前,必须先接入源端和目标端的数据源。操作路径为:登录ROMA Connect实例控制台,在左侧导航栏选择"数据源管理",单击页面右上角的"接入数据源"。在接入数据源页面,选择对应的数据源类型(如MySQL、Oracle、HIVE、OBS等),然后配置连接信息。
以接入HIVE数据源为例,需要配置的参数包括:数据源名称(自定义)、编码格式(默认utf-8)、归属的集成应用、连接模式(默认或专业)、连接地址(IP和端口)、数据库名、用户名和密码等。如果选择"专业"连接模式,则需要填写JDBC格式的连接串,例如:jdbc:hive2://{hostname}:{port}/{dbname}。配置完成后,单击"开始检测"验证网络连通性——只有显示"数据源连接成功"才能继续下一步。
值得注意的是,ROMA Connect实例与数据源所在网络必须互通。如果数据源部署在华为云VPC内,需要确保ROMA Connect实例与数据源在同一个VPC中,或者通过VPC对等连接、云专线等方式实现网络互通。
3.2 创建FDI数据集成任务
数据源接入完成后,就可以创建数据集成任务了。在左侧导航栏选择"数据集成 FDI > 任务管理",单击"创建普通任务"或"创建编排任务"。一个完整的数据集成任务配置包含以下几个部分:
- 任务基本信息:包括任务名称、集成模式(定时或实时)、描述和任务标签等。定时模式按照指定的时间计划执行,适用于周期性数据同步场景;实时模式则不断检测源端数据变更并实时同步,适用于对数据时效性要求较高的场景。
- 任务计划(可选):仅定时模式需要配置,用于指定任务的执行频率和时间点。
- 源端信息:选择已接入的源数据源,指定要同步的数据表或数据集。
- 目标端信息:选择已接入的目标数据源,指定数据写入的目标表或位置。
- Mapping信息:配置源端字段到目标端字段的映射规则,把获取到的源端数据转换成写入目标端的数据。映射方式支持手动配置(键值对输入或文本框输入)和自动配置两种。
- 异常数据存储(可选):配置同步异常时的数据存储位置,通常使用OBS。
配置完成后保存并启动任务,ROMA Connect即会按照设定的规则执行数据集成。如果遇到连接失败的情况,需要检查数据源状态和连接参数配置,重新检测直到连通为止。
3.3 FDI数据映射示例
下面是一个典型的数据映射配置示例。假设需要将MySQL数据库中的用户表(字段:id、name、email)同步到Oracle数据库中的目标表(字段:user_id、user_name、user_email),映射规则如下:
源端字段 → 目标端字段
id → user_id
name → user_name
email → user_email在ROMA Connect的Mapping配置页面,可以通过键值对的方式逐条添加上述映射关系。对于复杂的转换逻辑,还可以使用脚本模式进行更灵活的数据处理。
4. 服务集成APIC:开放API接口
服务集成APIC是ROMA Connect的API管理组件,能够将已有后端服务、数据源和自定义函数封装成标准的RESTful API并对外开放。APIC支持RESTful、SOAP、WebSocket等多种请求协议,并提供APP认证、IAM认证等安全认证方式,确保API访问的安全性。
4.1 通过组合应用开放API
组合应用是ROMA Connect新版提供的一种低代码集成方式,通过拖拽和配置即可完成API的创建和开放。具体步骤如下:
- 新建组合应用:登录ROMA Connect控制台,在左侧导航栏选择"组合应用",单击"新建组合应用",选择空模板或已有业务模板。
- 配置触发器:在组合应用画布中,单击空Action节点,在触发器页签选择"Open API"。配置API的基本信息,包括安全认证(推荐选择APP认证或IAM认证)、请求方法(GET/POST/PUT/DELETE等)和请求Path。
- 配置响应:添加"HTTP响应"连接器,配置响应体内容和响应码。
- 保存并启动:保存组合应用,选择ROMA Connect实例后启动。
启动后,组合应用会自动生成API的访问地址。通过Postman等工具即可调用该API进行测试。
4.2 自定义后端开发
除了通过组合应用的低代码方式,APIC还支持自定义后端开发。在左侧导航栏选择"服务集成 APIC > 自定义后端",单击"创建后端"即可开始。自定义后端支持两种类型:
- 数据后端:将数据源以API的形式对外开放,支持MySQL、Oracle、PostgreSQL等多种数据源。
- 函数后端:通过编写自定义函数(支持JavaScript、Python等)实现复杂的业务逻辑,再以API形式开放。
自定义后端开发完成后,需要发布并授权给相应的集成应用,才能被外部调用。
4.3 APIC API调用示例
调用ROMA Connect开放的API时,需要在请求中携带认证信息。以APP认证为例,需要在请求头中添加X-HW-ID(集成应用的Key)和X-HW-AppKey(集成应用的Secret)。下面是一个使用Python调用APIC API的示例:
import requests
import hashlib
import hmac
import base64
import time
def call_roma_api(api_url, app_key, app_secret, method='GET', body=None):
# 构造请求头
headers = {
'X-HW-ID': app_key,
'Content-Type': 'application/json'
}
# 计算签名(具体签名算法参考华为云官方文档)
# 这里简化处理,实际生产环境需要按照规范生成签名
if method.upper() == 'GET':
response = requests.get(api_url, headers=headers)
else:
response = requests.post(api_url, headers=headers, json=body)
return response.json()
# 使用示例
api_url = 'https://{roma_instance_domain}/v1/hello-world'
app_key = 'your_app_key'
app_secret = 'your_app_secret'
result = call_roma_api(api_url, app_key, app_secret)
print(result)5. 消息集成MQS:构建异步通信
消息集成MQS是ROMA Connect的消息中间件组件,使用统一的消息接入机制,提供跨网络访问的安全、标准化消息通道。MQS完全兼容社区版Kafka的API,具备原生Kafka的所有消息处理特性。
5.1 创建Topic
Topic是消息通信的基本单元。在ROMA Connect实例控制台的"消息集成 MQS > Topic管理"页面,单击"创建Topic",填写Topic名称和分区数等配置即可。创建Topic时需要注意:客户端向Topic发布和订阅消息时,需要使用授权集成应用的Key和Secret进行安全认证。
5.2 使用客户端连接MQS
Topic创建完成后,可以使用命令行工具或开源Kafka客户端连接MQS并进行消息收发。操作步骤如下:
- 准备环境:根据ROMA Connect实例的Kafka版本下载对应的开源Kafka命令行工具,并确保所在服务器已安装Java JDK。
- 获取连接地址:在实例控制台的"实例信息"页面查看MQS连接地址。如果使用内网访问,使用内网连接地址;如果使用公网访问,使用公网连接地址。
- 下载SSL证书(如适用):如果实例启用了MQS SASL_SSL,需要在Topic管理页面下载客户端证书。
- 配置Kafka客户端:在Kafka工具的
config目录下,修改consumer.properties和producer.properties文件,增加SASL认证配置:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="your_app_key" \
password="your_app_secret";
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
ssl.truststore.location=/cert/client.truststore.jks
ssl.truststore.password=dms@kafka
ssl.endpoint.identification.algorithm=- 生产与消费消息:使用Kafka命令行工具向Topic发送和接收消息。
5.3 使用Java客户端连接MQS
以下是使用Java原生Kafka客户端连接MQS的代码示例:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.Future;
public class MQSProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "{mqs_broker_address}");
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
// SASL认证配置
String jaasTemplate =
"org.apache.kafka.common.security.plain.PlainLoginModule required \
username=\"%s\" password=\"%s\";";
props.put("sasl.jaas.config",
String.format(jaasTemplate, "your_app_key", "your_app_secret"));
props.put("sasl.mechanism", "PLAIN");
props.put("security.protocol", "SASL_SSL");
props.put("ssl.truststore.location", "/cert/client.truststore.jks");
props.put("ssl.truststore.password", "dms@kafka");
props.put("ssl.endpoint.identification.algorithm", "");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record =
new ProducerRecord<>("your_topic_name", "key", "Hello ROMA MQS!");
Future<RecordMetadata> future = producer.send(record);
producer.close();
}
}使用SASL方式连接MQS时,建议在客户端所在主机的/etc/hosts文件中配置host和IP的映射关系,否则可能会引入额外的网络时延。此外,一个消费组下最多允许500个消费者同时连接同一个MQS,如果超过此数量需要将消费者拆分到多个消费组。
6. 设备集成LINK:物联网接入
设备集成LINK是ROMA Connect的物联网接入组件,使用标准MQTT协议连接设备,实现设备的快速上云管理。设备接入ROMA Connect后,可以上报数据到云端,也可以接收云端下发的控制命令。
6.1 设备注册与连接信息获取
设备接入的第一步是在ROMA Connect中完成设备注册。在实例控制台的"设备集成 LINK > 设备管理"页面,单击"创建设备",填写设备名称、所属产品等信息。设备创建成功后,系统会自动生成设备的连接信息:
- MQTT/MQTTS连接地址:在设备管理页面上方获取。如果使用MQTT接入则获取MQTT连接地址;如果使用MQTTS(加密)接入则获取MQTTS连接地址并下载SSL证书。
- 客户端ID/用户名/密码:在设备详情页面获取,这些信息是设备连接ROMA Connect的凭证。
- Topic信息:设备创建成功后系统会自动生成5个基础Topic,包括发布Topic和订阅Topic。
6.2 MQTT设备接入开发
设备端可以使用开源Eclipse Paho MQTT Client与ROMA Connect进行连接。以下是使用Java Paho客户端接入ROMA Connect的示例:
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class ROMADevice接入 {
public static void main(String[] args) throws Exception {
// 设备连接信息
String broker = "tcp://{mqtt_broker_address}:1883";
String clientId = "{device_client_id}";
String username = "{device_username}";
String password = "{device_password}";
MemoryPersistence persistence = new MemoryPersistence();
MqttClient client = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(username);
connOpts.setPassword(password.toCharArray());
connOpts.setCleanSession(true);
// 连接到ROMA Connect
client.connect(connOpts);
System.out.println("设备已成功接入ROMA Connect");
// 订阅云端下发的命令Topic
String commandTopic = "/v1/devices/{device_id}/command";
client.subscribe(commandTopic, (topic, msg) -> {
System.out.println("收到命令: " + new String(msg.getPayload()));
});
// 上报设备数据
String dataTopic = "/v1/devices/{device_id}/datas";
String payload = "{\"temperature\": 25.5, \"humidity\": 60}";
MqttMessage message = new MqttMessage(payload.getBytes());
message.setQos(1);
client.publish(dataTopic, message);
System.out.println("数据已上报");
// 保持连接
Thread.sleep(60000);
client.disconnect();
}
}ROMA Connect不会直接存储设备上报的数据,需要配置数据转发规则,将设备数据转发到其他服务(如OBS、RDS等)进行持久化存储。设备完成集成开发后上电联网,即可接入ROMA Connect,此时在设备管理页面可以看到设备状态变为"在线"。
7. 高级功能与最佳实践
7.1 组合应用模板与编排
ROMA Connect提供了丰富的组合应用模板,覆盖教育、电商、医疗、零售等行业场景。用户可以直接使用模板快速创建组合应用,也可以自定义编排。在组合应用编排页面,可以对定时器、连接器、处理器等组件进行拖拽配置。例如,使用"定时同步MySQL中的数据到Oracle"模板,只需配置定时器间隔、MySQL连接器和Oracle连接器的参数,即可快速实现跨数据库的数据同步。
7.2 自定义连接器开发
当ROMA Connect默认支持的数据源类型无法满足需求时,可以开发自定义连接器。自定义连接器的开发流程包括:在ROMA Connect控制台的"资产管理"页面创建连接器模型,定义连接器的对接参数;自行开发连接器的RESTful接口(ROMA Connect仅对接口做了规范定义,具体读写功能需要自行实现);部署连接器后,在ROMA Connect中发布连接器实例,实现平台与自定义数据源的对接。
7.3 安全认证最佳实践
在开放API时,安全认证是不可忽视的一环。ROMA Connect推荐使用APP认证或IAM认证来保证接口数据的安全性。APP认证通过集成应用的Key和Secret进行身份验证,IAM认证则依托华为云的身份认证体系。对于设备接入,建议使用MQTTS(即MQTT over SSL)加密传输,避免敏感数据在公网传输时被窃听。
7.4 网络规划与性能优化
ROMA Connect实例与数据源、客户端之间的网络连通性是集成成功的前提。建议将ROMA Connect实例与需要集成的云资源部署在同一个VPC内,以降低网络延迟并提高安全性。如果涉及跨VPC或跨地域的集成,可以通过VPC对等连接、云专线或VPN等方式实现网络互通。在性能方面,需要注意不同功能占用的连接数不同,实际使用时应根据规格合理规划资源。
8. 常见问题与解答
问1:ROMA Connect支持哪些数据源类型?
答:ROMA Connect支持接入关系型数据库(MySQL、Oracle、PostgreSQL等)、大数据存储(HIVE、HBase等)、半结构化存储(MongoDB等)、消息系统(Kafka、RabbitMQ等)以及OBS对象存储等多种类型的数据源。如果默认类型不满足需求,还可以通过自定义连接器接入更多数据源。
问2:FDI数据集成任务的定时模式和实时模式有什么区别?
答:定时模式按照设定的时间计划周期性执行数据同步任务,适用于对数据实时性要求不高的场景,如每日报表数据同步。实时模式则不断检测源端数据的变更并即时同步到目标端,适用于对数据时效性要求较高的场景,如交易数据的实时同步。不同数据源所支持的数据集成模式有所不同,具体需要参考数据源的支持列表。
问3:如何保证ROMA Connect开放的API的安全性?
答:ROMA Connect提供了多种API安全认证方式,包括APP认证(基于集成应用的Key和Secret)、IAM认证(基于华为云身份认证)等。推荐在开放API时选择APP认证或IAM认证,避免使用"无认证"模式。此外,还可以通过配置IP白名单、启用HTTPS加密传输等方式进一步增强安全性。
问4:MQS与开源Kafka有什么异同?
答:MQS完全兼容社区版Kafka的API,具备原生Kafka的所有消息处理特性。不同之处在于,MQS深度集成了华为云的安全认证体系,客户端向Topic发布和订阅消息时需要使用ROMA Connect集成应用的Key和Secret进行认证。此外,MQS还提供了统一的Web控制台进行Topic管理和监控,使用体验更加便捷。
问5:设备通过MQTT接入ROMA Connect时,上报的数据会保存在哪里?
答:ROMA Connect本身不会持久化存储设备上报的数据。设备上报的数据需要通过配置数据转发规则,转发到其他云服务(如OBS对象存储、RDS关系型数据库、DWS数据仓库等)进行存储。这样可以充分利用华为云各存储服务的优势,实现数据的灵活管理和长期保存。
问6:组合应用和传统的数据集成任务有什么区别?
答:传统的数据集成任务(FDI任务)主要聚焦于数据层面的同步和转换,适用于单一的数据迁移场景。而组合应用则是一种更高级的集成方式,支持对触发器、连接器、处理器等多种组件进行编排和组合。组合应用不仅支持数据集成,还支持API开放、消息处理、条件判断、循环控制等复杂的集成逻辑,适合构建端到端的业务集成流程。



