EDUCAÇÃO E TECNOLOGIA

Transfer Data from CDS View to Kafka with SAP Data Intelligence

SAP Data Intelligence(以降、DI)は、「データ管理」と「AIの開発・運用」を一つに統合したソリューションです。詳細については、SAP Data Intelligenceで実現するエンタープライズAIをご参照ください。

本ブログの目的

DIのパイプラインを使うことで、SAPやnon-SAPにまたがってデータが散らばる複雑なランドスケープにおいて、データを効率よく連携・管理できることを理解。

前提知識

  • ABAP:CDS Viewを書いたことがあるレベル

説明の流れ

  1. DI-S/4 HANAとDI-Kafkaの接続設定
  2. パイプライン作成
  3. パイプライン実行

本内容は、IT管理者やデータエンジニアに実施して頂く想定になります。

作成するパイプライン

CDS Viewの内容を読み取り、JSON形式に変換した上でnon-SAPシステムに格納するパイプラインを作成します。
non-SAPシステムとしてApache Kafka(以降、Kafka)を使用しますが、Kafkaの知識は必要ありません。

補足)Kafkaとは?
高スループット、かつ低レイテンシなオープンソースの分散ストリーミングプラットフォームです。

ここでは、読み取り対象のCDS Viewを準備し、DIのConnection ManagementからS/4 HANAとKafkaへの接続設定を行います。

補足)Connection Managementは、SAPやnon-SAPの様々なデータソースへの接続を定義し、管理する機能です。ここで定義した接続設定を用いることで、データカタログ機能によりDBのテーブルを確認したり、パイプライン機能によりデータ連携先として簡単にデータソースにアクセスできるようになります。

CDS View準備

まずは、ABAP Development Tools(ADT)でDIの読み取り対象になるCDS Viewを準備します。

補足)CDS Viewの作成方法がわからない方は、こちらをご参照ください。

DIから読み取りが出来るようにするために、CDS Viewに以下のAnnotationを付与します。

@Analytics: { dataExtraction: { enabled: true, delta.changeDataCapture.automatic: true }
}

今回は以下のようなCDS Viewを使用します。

@AbapCatalog.sqlViewName: 'ZCDSMARA_0'
@AbapCatalog.compiler.compareFilter: true
@AbapCatalog.preserveKey: true
@AccessControl.authorizationCheck: #CHECK
@EndUserText.label: 'ZCDS MARA test' @Analytics: { dataExtraction: { enabled: true, delta.changeDataCapture.automatic: true }
} define view ZCDS_MARA_0 as select from mara { key matnr, ersda, created_at_time, scm_matid_guid16
}

次に、ADTで読み取り対象のCDS Viewに以下のようにデータが入っていることを確認します。

DIとS/4 HANAの接続設定

DIのLaunchpadで、Connection Managementを選択します。

“Create Connection”ボタン(+)を押します。

“Connection Type”で”ABAP”を選択します。

接続対象となるS/4 HANAのホスト名、認証情報等を設定し、”Create”ボタンを押します。

Connectionの一覧画面で、”Action”ボタンから”Check Status”を実行します。

以下のようにOKと表示されたら、対象のS/4 HANAとの接続に成功しています。

DIとKafkaの接続設定

Connection一覧画面に戻り、“Create Connection”ボタン(+)を押します。

“Connection Type”で”Kafka”を選択します。

接続対象となるKafkaのホスト名、ポート等を設定し、”Create”ボタンを押します。

これでDIとKafkaの接続設定は完了です。

ここでは、CDS Viewを読み取り、JSONに変換してKafkaに格納するまでを行うDIのパイプラインを作成します。

新規パイプライン作成

DIのLaunchpadで、”Modeler”を選択します。

“Graphs”を選択してから、”Create Graph”ボタンをクリックします。

補足)Graphというのは、パイプラインのフローのことを指します。

“Save”ボタンを押し、作成したGraphに名前を付けます。

今回は”CDS to Kafka”という名前で保存します。

フロー作成:CDS View読み取り

ここからは、Operatorという小さな機能の単位を組み合わせてデータの流れを制御するフローを作成します。

まずはCDS Viewを読み取るためのOperatorである”ABAP CDS Reader”をダブルクリックし、フロー内に配置します。

読み取るCDS Viewを設定するために、フロー内の”ABAP CDS Reader”を選択し、”Open Configuration”をクリックします。

指定した”ABAP CDS Reader”オペレーターの設定画面が右側に開きます。
“ABAP Connection”のドロップダウンボタンをクリックするとConnection Managementで設定したABAP接続の一覧が表示されるため、接続対象のS/4 HANAを選択します。

“Version”の選択ボックスをクリックすると、”Select Version”画面が表示されます。
ABAP CDS Readerに複数のバージョンがありますが、最新版のV2を選択します。

補足)新規にパイプラインを作成する際は、古いバージョンで検証を行っていた等の特別な事情がない限り、新しいバージョンを利用してください。

“ABAP CDS Name”に対象の読み取り対象のCDS View名を記載します。
“Transfer Mode”は今回はInitial Loadを指定します。

ここまででCDS Viewの読み取り設定は完了です。

フロー作成:CSV ⇒ JSON変換

次に、CDS Viewから読み取ったCSVをJSONに変換するフローを作成します。

CSVからJSONに変更する機能を持つ”Format Converter”オペレーターをダブルクリックし、フロー内に配置します。

“ABAP CDS Reader V2″オペレーターと”Format Converter”オペレーターを接続するために、各々のOutputポートとInputポートのデータ形式を確認します。

まずは、”ABAP CDS Reader V2″オペレーターのOutputポートにマウスカーソルを合わせると、以下のように”Data Type: message”と表示されます。

次に、”Format Converter”オペレーターのInputポートにマウスカーソルを合わせると、以下のように”Data Type: blob”と表示されます。

接続したいオペレーター同士のデータ形式が違うため、これを統一する必要があります。

message形式のデータをblob形式に変換するために、”blob”で検索すると、”ToBlob Converter”オペレーターというものが見つかるので、ダブルクリックしてフロー内に配置します。

先ほどと同じように”ToBlob Converter”オペレーターのInputポート、Outputポートを確認すると、Inputが”Data Type: any”、Outputが”Data Type: blob”となり、”ABAP CDS Reader V2″オペレーターと”Format Converter”オペレーターを接続する際のデータ形式の仲介役に利用できることがわかります。

接続したいOperatorをそれぞれ、以下のようにマウスでドラッグして接続します。

次に、”Format Converter”を選択し、”Open Configuration”をクリックします。

“Target Format”に返還後のデータ形式である”JSON”を指定します。
“Fields”にJSON変換時のヘッダー情報を記入します。

冒頭のCDS View確認時のヘッダーを参考に、”Fields”は以下のように設定します。

これでCSV ⇒ JSONのフローが完成です。

フロー作成:Kafkaへの書き込み

最後に、Kafkaへのデータ書き込みのフローを作成します。

“Kafka”で検索すると、以下のように2つのオペレーターが表示されます。
ConsumerはKafkaからデータを取得するオペレーターで、ProducerはKafkaにデータを書き込むオペレーターです。

今回はKafkaに対してデータを書き込みたいので、”Kafka Producer”オペレーターをダブルクリックしてフロー内に配置します。

“Format Converter”オペレーターのOutputポートと、”Kafka Producer”オペレーターのInputポートを接続します。

Kafkaへのデータ書き込み設定をするために、フロー内の”Kafka Producer”を選択し、”Open Configuration”をクリックします。

”Kafka Producer”オペレーターの設定画面が右側に開きます。
“Connection Type”のドロップダウンボタンをクリックし、”connection management”を選択します。

補足)”manual”を選択した場合は、Kafkaのホスト名等、接続情報を記入します。
今回は事前にConnection ManagementでKafkaの接続情報を設定済みのため、既存の接続情報からKafkaの宛先を選ぶことができる”connection management”を選択しました。

次に、”Kafka Producer Connection”の”Editproperty”ボタンをクリックします。
“Edit property ‘Kafka Producer Connection’”画面が表示されるので、Connection IDのドロップダウンのリストから接続対象のKafkaを選択し、”save”ボタンをクリックします。

“Topic”にデータ書き込み先のKafkaの対象トピック名を記入します。

補足)トピックというのは、Kafkaにおけるデータストリーミングの1つの箱のようなものだと考えてください。

ここまででフローの作成は完了です。”Save”ボタンをクリックしてフローを保存します(Ctrl+Sでも可)。

パイプライン実行前の環境確認

以下のようにKafkaに格納されたデータを確認すると、パイプライン実行前は空であることがわかります。

# ./bin/kafka-console-consumer.sh --bootstrap-server <Kafka Host>:<Kafka Port> --topic cdsTestTopic --from-beginning

パイプライン実行

Modeler画面の“Run”ボタンで作成したパイプラインを実行します。

“Status”欄に実行状態が表示されます。”Running”と表示されたら、パイプラインによるデータ転送処理を実行中です。

パイプライン実行後の環境確認

再度Kafkaに格納されたデータを確認すると、以下のようにデータが格納されていることがわかります。

# ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cdsTestTopic --from-beginning
[ {"created_at":"11:49:49.000","esrda":"2020-09-12","guid":"42010AEE-E443-1EDA-BD9D-C1FE403C4C0B","matnr":"TG22"}, {"created_at":"11:49:49.000","esrda":"2020-09-12","guid":"42010AEE-E443-1EDA-BD9D-C1FE403C2C0B","matnr":"TG21"}, {"created_at":"11:49:49.000","esrda":"2020-09-12","guid":"42010AEE-E443-1EDA-BD9D-C1FE403C0C0B","matnr":"TG20"}, …… {"created_at":"03:44:10.000","esrda":"2021-01-06","guid":"42010AEE-E443-1EDB-93FA-2DECC2C485FD","matnr":"000000000000000015"}
]

パイプライン停止

最後に、”Stop Process”ボタンを押し、パイプラインを停止します。

以上のように、DIを活用することにより、様々な場所に散らばったデータを効率よく連携・変換することができます。

データは重要な資産であり、インサイトの発見に欠かせません。
しかし、今日ではデータがオンプレのSQL-DB、NoSQL-DB、クラウド上のデータレイク等あちこちに散らばり、複雑なランドスケープを形成しています。

そのため、データサイエンティストがデータ分析をする際、必要なデータを集める前準備だけで多くの時間を消費してしまいます。

DIを使うことで、複雑なランドスケープにおけるデータ管理を効率化したり、可視化することでデータフローを直感的に掴めるようになります。
これにより、インサイトの発見やイノベーション創出に役立てることが出来ます。

データ管理基盤として是非、DIをご活用頂けたら幸いです!