COBOL & Kafka
Turn your COBOL program into a Kafka consumer/producer.
You want to integrate your COBOL programs into an event-driven processing model.
Learn how to turn a COBOL program into a Kafka consumer/producer.
From the COBOL program, we will call the D8kafka module, passing it:
- The Kafka topic
- A list of (key : value) pairs, separated by commas
******************************************************************
*
* Loan kafka producer
* ==========================
*
*
******************************************************************
IDENTIFICATION DIVISION.
PROGRAM-ID. cuotak.
ENVIRONMENT DIVISION.
DATA DIVISION.
FILE SECTION.
WORKING-STORAGE SECTION.
01 WS-LOAN.
05 WS-AMT PIC 9(7)V9(2).
05 WS-INT PIC 9(2)V9(2).
05 WS-YEAR PIC 9(2).
******************************************************************
01 KAFKA.
05 KAFKA-TOPIC PIC X(05) VALUE "loans".
05 FILLER PIC X(1) VALUE LOW-VALUES.
05 KAFKA-KEY.
10 KAFKA-KEY1 PIC X(15) VALUE "PrincipalAmount".
10 FILLER PIC X(1) VALUE ",".
10 KAFKA-KEY2 PIC X(12) VALUE "InterestRate".
10 FILLER PIC X(1) VALUE ",".
10 KAFKA-KEY3 PIC X(09) VALUE "TimeYears".
10 FILLER PIC X(1) VALUE LOW-VALUES.
05 KAFKA-VALUE.
10 KAFKA-AMT-VALUE PIC zzzzzz9.99.
10 FILLER PIC X(1) VALUE ",".
10 KAFKA-INT-VALUE PIC z9.99.
10 FILLER PIC X(1) VALUE ",".
10 KAFKA-YEAR-VALUE PIC zz.
10 FILLER PIC X(1) VALUE LOW-VALUES.
PROCEDURE DIVISION.
INITIALIZE WS-LOAN.
DISPLAY "Amount: " WITH NO ADVANCING.
ACCEPT WS-AMT.
DISPLAY "Interest: " WITH NO ADVANCING.
ACCEPT WS-INT.
DISPLAY "Number of Years: " WITH NO ADVANCING.
ACCEPT WS-YEAR.
MOVE WS-AMT TO KAFKA-AMT-VALUE.
MOVE WS-INT TO KAFKA-INT-VALUE.
MOVE WS-YEAR TO KAFKA-YEAR-VALUE.
CALL "D8kafka" USING KAFKA-TOPIC
KAFKA-KEY
KAFKA-VALUE.
DISPLAY "Return-code: " RETURN-CODE.
GOBACK.
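Since D8kafka is written in Go, it can be exposed to COBOL as a C shared library thanks to cgo. As a minimal sketch, assuming GnuCOBOL and hypothetical file names (d8kafka.go for the module, cuotak.cbl for the program above), the build could look like this:

# Build the Go module as a C shared library (also generates a C header)
go build -buildmode=c-shared -o libd8kafka.so d8kafka.go
# Compile the COBOL program and link it against the library
cobc -x cuotak.cbl -L. -ld8kafka
# Run it, making sure the dynamic loader can find the library
LD_LIBRARY_PATH=. ./cuotak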
Below is a simplified example version of the D8kafka module:
package main
/*
#include <string.h>
#include <stdlib.h>
*/
import "C"
import (
    "encoding/json"
    "fmt"
    "strings"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)
type Kdata struct {
Key string `json:"key"`
Value string `json:"value"`
}
//export D8kafka
func D8kafka(c_topic *C.char, c_key *C.char, c_value *C.char) C.int {
    // Split the comma-separated lists received from COBOL.
    keys := strings.Split(C.GoString(c_key), ",")
    values := strings.Split(C.GoString(c_value), ",")
    // Guard against a malformed call: every key needs a matching value.
    if len(keys) != len(values) {
        fmt.Printf("ERROR: Mismatched number of keys (%d) and values (%d)\n", len(keys), len(values))
        return 1
    }
    data := make([]Kdata, len(keys))
    for i := 0; i < len(keys); i++ {
        data[i] = Kdata{Key: keys[i], Value: values[i]}
    }
    // Marshal the key/value pairs into a single JSON array.
    KafkaMsg, err := json.Marshal(data)
    if err != nil {
        fmt.Printf("ERROR: Failed to marshal message: %s\n", err)
        return 1
    }
topic := C.GoString(c_topic)
p, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:29092",
"client.id": "client",
"acks": "all"},
)
    if err != nil {
        fmt.Printf("ERROR: Failed to create producer: %s\n", err)
        // Return a non-zero code so the COBOL caller sees it in RETURN-CODE
        // instead of terminating the whole process with os.Exit.
        return 1
    }
    defer p.Close()
delivery_chan := make(chan kafka.Event, 1000)
err = p.Produce(
&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(KafkaMsg),
},
delivery_chan,
)
    if err != nil {
        fmt.Printf("ERROR: Failed to produce message: %s\n", err)
        return 1
    }
    // Wait for the delivery report and propagate failures to the caller.
    e := <-delivery_chan
    m := e.(*kafka.Message)
    close(delivery_chan)
    if m.TopicPartition.Error != nil {
        fmt.Printf("ERROR: Delivery failed: %v\n", m.TopicPartition.Error)
        return 1
    }
    fmt.Printf("INFO: Delivered message to topic %s [%d] at offset %v\n",
        *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
    return 0
}
func main() {
}
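The module pairs each key with its value and publishes the whole list as a single JSON array. As an illustration (not output from an actual run), entering an amount of 1000.00, an interest rate of 5.00 and 10 years would yield a message like the following, where the leading spaces come from the COBOL edited PICTURE clauses:

[{"key":"PrincipalAmount","value":"   1000.00"},{"key":"InterestRate","value":" 5.00"},{"key":"TimeYears","value":"10"}]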
To consume the Kafka topic from a Go program, you can use the following example:
package main
import (
"fmt"
"os"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
var topic string = "loans"
var run bool = true
func main() {
consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:29092",
"group.id": "sample",
"auto.offset.reset": "smallest"},
)
if err != nil {
fmt.Printf("ERROR: Failed to create consumer: %s\n", err)
os.Exit(1)
}
err = consumer.Subscribe(topic, nil)
if err != nil {
fmt.Printf("ERROR: Failed to subscribe: %s\n", err)
os.Exit(1)
}
for run {
ev := consumer.Poll(100)
switch e := ev.(type) {
case *kafka.Message:
fmt.Printf("INFO: %s", e.Value)
case kafka.Error:
fmt.Printf("%% ERROR: %v\n", e)
run = false
}
}
consumer.Close()
}
To run a test, you need Kafka installed.
An easy way to do this is to use Docker (docker-compose.yml) to set up a minimal test environment with ZooKeeper and Kafka.
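A minimal sketch of such a docker-compose.yml, assuming the Confluent images (image tags and listener names are illustrative; adjust them to your environment), exposing the broker on localhost:29092 as used in the examples above:

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.3.0
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Start the environment with docker compose up -d, then run the consumer and the COBOL program in separate terminals.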