Mình sẽ sử dụng lại ví dụ BookingServiceOnline trong phần trước đó
Ở đây mình sẽ tạo ra các isolated service, đồng thời thiết kế để chúng giao tiếp với nhau thông qua một MessageQueue. Ở đây mình chọn RabbitMQ làm MessageQueue.
BookingService xử lý các yêu cầu HTTPPOST để tạo booking mới. BookingService cố gắng đặt trước một chỗ và nếu thành công, nó sẽ gửi một message đến PaymentService thông qua một queue trong RabbitMQ có tên là payment_queue.
2. Triển khai PaymentService
// booking-service.tsimport amqp from'amqplib'let channel: amqp.Channelconst paymentQueue ='payment_queue'const seatUpdatingQueue ='seat_update_queue'const compensationQueue ='compensation_queue'asyncfunctionprocessPayment(){const connection =await amqp.connect('amqp://localhost')
channel =await connection.createChannel()await channel.assertQueue(paymentQueue)await channel.assertQueue(seatUpdatingQueue)await channel.assertQueue(compensationQueue)
channel.consume(paymentQueue,async(msg)=>{const{ booking }=JSON.parse(msg.content.toString())// Pre-step: Process paymentconst paymentProcessedSuccessfully =true/**
* Step 2:
* Send seats update request to SeatUpdatingService
* Or Reverse the booking by sending a compensation request to CompensationService
*/if(paymentProcessedSuccessfully){awaitsendMessageToQueue(seatUpdatingQueue,{ booking })}else{awaitsendMessageToQueue(compensationQueue,{
booking,
event:'PaymentServiceFailed',})}// Acknowledge the message
channel.ack(msg as amqp.Message)})}asyncfunctionsendMessageToQueue(queue:string, message:unknown){await channel.sendToQueue(queue,Buffer.from(JSON.stringify(message)))}processPayment()
PaymentService lắng nghe các message trên payment_queue. Nó chịu trách nhiệm xử lý payment, đồng thời nó cũng sẽ gửi một message đến seat_update_queue nếu thanh toán thành công hoặc compensation_queue nếu thanh toán thất bại.
SeatUpdatingService lắng nghe các message trên seat_update_queue. Nó chịu trách nhiệm xử lý cập nhật chỗ, đồng thời nó cũng sẽ gửi một message đến notification_queue nếu thực hiện thành công hoặc compensation_queue nếu thực hiện thất bại.
CompensationService lắng nghe các message trên compensation_queue. Khi CompensationService nhận được một message cho biết có sự cố khi xử lý booking, nó sẽ thực hiện một compensation transaction để hủy booking và đưa dữ liệu về trạng thái ban đầu, giúp hệ thống nhất quán về dữ liệu.
5. Triển khai NotificationService
// seat-update-service.tsimport amqp from'amqplib'let channel: amqp.Channelconst notificationQueue ='notification_queue'asyncfunctionprocessSendNotification(){const connection =await amqp.connect('amqp://localhost')
channel =await connection.createChannel()await channel.assertQueue(notificationQueue)
channel.consume(notificationQueue,async(msg)=>{const{ booking, isSuccess }=JSON.parse(msg?.content.toString()||'{}')/**
* Step 4: Send notification to user
*/if(isSuccess){console.log(`Booking SUCCESS sent to user ${booking.userId}`)}else{console.log(`Booking FAIL send to user ${booking.userId}`)}// Acknowledge the message
channel.ack(msg as amqp.Message)})}processSendNotification()
NotificationService lắng nghe các message trên notification_queue. Khi nó nhận được một message, nó sẽ gửi một thông báo (có thể là email, số điện thoại,…) tuỳ theo yêu cầu kỹ thuật.
Như vậy, bạn đã triển khai thành công các microservices với mô hình Choreography-Based Saga. Mỗi service sẽ thực hiện nhiệm vụ của mình và giao tiếp với các service khác thông qua event.