Lập trình Concurrent cùng Semaphore

3782

Giờ đây chuyện viết được một app iOS đã trở nên khá đơn giản, tuy nhiên, để tạo được một app chuẩn mực sẽ đòi hỏi nhiều nỗ lực hơn trong nhiều khía cạnh từ tối ưu hóa thuật toán đến các thao tác liên quan đến system. Quy tắc chung cho hầu hết mọi trường hợp đó là tránh để các job nặng nhọc, tốn thời gian gần main thread nhưng phải đảm bảo rằng các call trong UIKit hoặc những cái liên quan đến UI diễn ra trong main thread. Điều này làm cho app phản ứng nhanh hơn và không quá chậm chạp trong khi sử dụng, để mang đến trải nghiệm người dùng tốt hơn.

Việc phân chia job với các thread tương ứng thực chất chính là lập trình concurrent, một topic gây nhức nhối bấy lâu. Nhờ có các ngôn ngữ lập trình, libraries và các framework cao cấp hỗ trợ mà nó đã phần nào trở nên “dễ thở” hơn và ít lỗi prone cần xử lý hơn. Tùy theo mức độ phức tạp của app mà ta sử dụng các phương pháp xử lý vấn đề khác nhau, và một trong số đó chính là sử dụng semaphore để đồng bộ hóa các job bất đồng bộ hoặc tái sắp xếp chúng trong thread. Mọi thứ sẽ được trình bày kĩ hơn trong bài viết dưới đây.

Semaphore là gì?

Nếu để ý bạn sẽ thấy các phiên bản của semaphore xuất hiện ở rất nhiều nơi trong cuộc sống của chúng ta. Ví dụ trong cuộc thi cờ vua, bạn sẽ thấy cả hai thí sinh đều nhấn một đồng hồ đếm chung trước lượt đi của mỗi người. Chỉ có một bàn cờ và mỗi lượt chỉ được một người đi. Một ví dụ khác nữa đó là các quầy thu ngân trong siêu thị. Thông thường thì số lượng quầy thu là có hạn. Nếu có quầy trống, bạn sẽ tới check ở quầy đó ngay. Tuy nhiên, hầu như lúc nào các quầy cũng full và dòng người đứng đợi thanh toán thì dài dằng dẳng. Cả hai ví dụ có một điểm chung đó là chúng đều có nguồn chia sẻ giới hạn, lần lượt là bàn cờ và các quầy thu ngân.

Về semaphore, nó là một cơ chế giúp quản lý các nguồn chia sẻ và đảm bảo access không bị tắc nghẽn. Nguồn này có thể là cái gì đó cụ thể như một variable hoặc trừu tượng như một job pool. Có hai loại semaphore, binary semaphore và counting semaphore. Loại đầu tiện được dùng làm lock vì nó chỉ có 2 giá trị là 0 và 1 đại diện cho tình trạng lock hay unlock, còn loại thứ hai thì thực hiện đếm resource để cho biết mức độ sẵn sàng của resource.

Trong Grand Central Dispatch (GCD), semaphore là một ví dụ của DispatchSemaphore. API của nó rất nhỏ gồm một initializer init(value:) và 2 method chính signal() và wait().

Lưu ý: Các call đến signal() phải cân bằng với các call đến wait(), nếu không sẽ xuất hiện ngoại lệ EXC_BAD_INSTRUCTION.

Param value trong init(value:) ghi rõ value ban đầu của semaphore, chúng ta sẽ bàn về nó sau. Về nguyên tắc sử dụng semaphore thì rất đơn giản, wait() sẽ trả về toàn bộ nếu value của semaphore sau khi giảm tải lớn hơn hoặc bằng 0, nếu không nó sẽ đợi có tín hiệu. Để ra hiệu, đương nhiên sẽ dùng đến signal() để tăng value đó lên, từ đó caller đang đợi lệnh wait() có thể tiếp tục hoạt động. Để hiểu rõ hơn hãy xem phác thảo dưới đây.

Semaphore

Sau khi đã biết cách sử dụng DispatchSemaphore, chúng ta sẽ áp dụng nó vào các use case khác nhau.

Use case 1

Trong trường hợp này, chúng ta sẽ đồng bộ hóa 2 job bất đồng bộ trong một concurrent queue. Bên cạnh đó, chúng ta cũng nên kiểm tra param value trong initializer với value bằng 0.

Context: Ta có một concurrent queue chứa 2 job bất đồng bộ, Download Image và Download Frame. Chúng ta cần thực hiện cả 2 download một lúc – vừa image vừa frame, rồi kết hợp chúng vào 1 cái, giả sử như Download Frame. 

let queue = DispatchQueue(label: "queue", attributes: .concurrent)
let semaphore = DispatchSemaphore(value: 0)

// Download Image job.
queue.async {
    Thread.sleep(forTimeInterval: 0.5)
    print("Downloaded image.")
    let signal = semaphore.signal()
    print("Signal:", signal)
}

// Download Frame job.
queue.async {
    Thread.sleep(forTimeInterval: 0.9)
    print("Downloaded frame.")
    // Await Download Image job to complete.
    semaphore.wait()
    print("Combine image and frame.")
}

// Downloaded frame.
// Downloaded image.
// Combine image and frame.
// Signal: 1

Ta đã tạo một semaphore có value ban đầu bằng 0 tức là dù cho có call wait() đầu tiên nào thì nó cũng phải đợi cho đến khi có call signal(). Nó rất có ích khi bạn muốn dùng semaphore như một lock hoặc một flag đồng bộ vì bạn không biết hoặc không có nhiều resouces. Quay lại phần code trên, Thread.sleep(forTimeInterval:) chỉ đang mô phỏng quá trình download tiêu tốn cả mớ thời gian để hoàn thành. Dù cho job Download Frame sẽ xong trước, nó vẫn phải đợi cho job Downloaf Image hoàn thành trước khi đến bước tiếp theo.

semaphore.wait() sẽ giảm value của semaphore, từ 0 xuống -1, và bắt đầu đợi. Ngược lại, khi semaphore.signal() được gọi, nó sẽ tăng value của semaphore về lại 0 và ra hiệu cho job Download Frame để tiếp tục vận hành.

Điều thú vị ở đây đó là nếu ta đảo Thread.sleep(forTimeInterval:) giữa 2 job để job Download Image xong trước, thì mọi thứ vẫn chạy bình thường trừ một điểm khác biệt nhỏ.

// Downloaded image.
// Signal: 0
// Downloaded frame.
// Combine image and frame.

Bạn đã phát hiện ra chưa? Đó chính là value của signal. Trong tình huống trước, value của nó là 1, còn bây giờ là 0. Có nghĩa là gì? Thứ tự call của signal() and wait() không còn qaun trọng nữa nếu như bạn cân bằng nó. Nếu call wait() hiện trước call signal() và có cái đang đợi tín hiệu, thì signal() sẽ trả về một value khác 0. Ngược lại, nó trả về value 0 đồng nghĩa rằng không có job nào phải đợi để hoạt động, và call wait() tương ứng sẽ return ngay lập tức.

Bản phác họa này sẽ giúp bạn dễ hiểu hơn.

Use case 1

Use case 2

Trong một vài trường hợp, chúng ta cần giới hạn số lượng concurrent job. Ví dụ, chúng ta cần download một mớ images nhưng chỉ muốn tải một lúc 2 cái. Việc này là hoàn toàn có thể bằng API cấp cao hơn OperationQueue. Duối đay là cách sử dụng semaphore như một resource pool.

Context: Chúng ta có một concurrent queue. Ta gắn khá nhiều job Download Image vào queue này và chỉ tối đa 2 job chạy cùng lúc.

let queue = DispatchQueue(label: "queue", attributes: .concurrent)
let semaphore = DispatchSemaphore(value: 2)

for i in 0 ..< 10 {
    queue.async {
        semaphore.wait()
        print("Downloading image", i)
        let timeInterval: TimeInterval = 0.5 + (arc4random_uniform(2) == 0 ? 1.0 : -1.0) * 1.0 / Double(arc4random_uniform(10) + 1)
        Thread.sleep(forTimeInterval: timeInterval)
        print("Downloaded image", i)
        semaphore.signal()
    }
}

// Downloading image 0
// Downloading image 1
// Downloaded image 0
// Downloading image 2
// Downloaded image 2
// Downloading image 3
// Downloaded image 1
// Downloading image 4
// Downloaded image 4
// Downloading image 5
// Downloaded image 3
// Downloading image 6
// Downloaded image 5
// Downloading image 7
// Downloaded image 6
// Downloading image 8
// Downloaded image 7
// Downloading image 9
// Downloaded image 9
// Downloaded image 8

Ở đây, tôi đã random timeInterval để mô phỏng thời gian download. Nhìn vào log bạn sẽ thấy ban đầu chỉ có image 0 và 1 được downlaod cùng lúc. Khi image 0 đã hoàn thành, image 2 tiếp nối, và số lượng concurrent download vẫn là 2 cái. Nó sẽ như vậy cho đến khi tất cả các images được down về.

Use case 2

Kết luận

Semaphore là một tool vừa đơn giản lại vừa giúp ích cho việc xử lý các vấn đề về concurrency. Nó cùng với các APIs khác trong GCD, và các API cấp cao hơn như OperationQueue sẽ còn giúp bạn xử lý được nhiều vấn đề phức tạp hơn thế nữa.

TopDev via Vinh Nguyen’s Blog