Расширенные схемы параллелизма в Go

В этом уроке мы рассмотрим некоторые продвинутые модели параллелизма в Go. Часто в реальном мире эти паттерны используются в комбинации.

Генератор

Паттерн генератор используется для генерации последовательности значений, которая используется для получения некоторого результата.

В нашем примере мы имеем функцию generator, которая просто возвращает канал, из которого мы можем считывать значения.

Это работает на том факте, что отправка и получение блокируются до тех пор, пока не будут готовы и отправитель, и получатель. Это свойство позволило нам ждать, пока не будет запрошено следующее значение.

package main

import "fmt"

func main() {
    ch := generator()

    for i := 0; i < 5; i++ {
        value := <-ch
        fmt.Println("Value:", value)
    }
}

func generator() <-chan int {
    ch := make(chan int)

    go func() {
        for i := 0; ; i++ {
            ch <- i
        }
    }()

    return ch
}
Вход в полноэкранный режим Выход из полноэкранного режима

Если мы запустим это, то заметим, что можем потреблять значения, которые были получены по запросу.

$ go run main.go
Value: 0
Value: 1
Value: 2
Value: 3
Value: 4
Войти в полноэкранный режим Выход из полноэкранного режима

Это похоже на поведение yield в JavaScript и Python.

Fan-in

Шаблон fan-in объединяет несколько входов в один выходной канал. По сути, мы мультиплексируем наши входы.

В нашем примере мы создаем входы i1 и i2 с помощью функции generateWork. Затем мы используем нашу вариативную функцию fanIn для объединения значений из этих входов в один выходной канал, из которого мы можем потреблять значения.

Примечание: порядок ввода не гарантируется.

package main

import (
    "fmt"
    "sync"
)

func main() {
    i1 := generateWork([]int{0, 2, 6, 8})
    i2 := generateWork([]int{1, 3, 5, 7})

    out := fanIn(i1, i2)

    for value := range out {
        fmt.Println("Value:", value)
    }
}

func fanIn(inputs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    wg.Add(len(inputs))

    for _, in := range inputs {
        go func(ch <-chan int) {
            for {
                value, ok := <-ch

                if !ok {
                    wg.Done()
                    break
                }

                out <- value
            }
        }(in)
    }

    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

func generateWork(work []int) <-chan int {
    ch := make(chan int)

    go func() {
        defer close(ch)

        for _, w := range work {
            ch <- w
        }
    }()

    return ch
}
Вход в полноэкранный режим Выход из полноэкранного режима
$ go run main.go
Value: 0
Value: 1
Value: 2
Value: 6
Value: 8
Value: 3
Value: 5
Value: 7
Войти в полноэкранный режим Выход из полноэкранного режима

Fan-out

Шаблоны Fan-out позволяют нам по существу разделить наш единственный входной канал на несколько выходных каналов. Это полезный паттерн для распределения рабочих элементов по нескольким однородным исполнителям.

В нашем примере мы разбиваем входной канал на 4 различных выходных канала. Для динамического числа выходов мы можем объединить выходы в общий «агрегированный» канал и использовать select.

Примечание: схема fan-out отличается от pub/sub.

package main

import "fmt"

func main() {
    work := []int{1, 2, 3, 4, 5, 6, 7, 8}
    in := generateWork(work)

    out1 := fanOut(in)
    out2 := fanOut(in)
    out3 := fanOut(in)
    out4 := fanOut(in)

    for range work {
        select {
        case value := <-out1:
            fmt.Println("Output 1 got:", value)
        case value := <-out2:
            fmt.Println("Output 2 got:", value)
        case value := <-out3:
            fmt.Println("Output 3 got:", value)
        case value := <-out4:
            fmt.Println("Output 4 got:", value)
        }
    }
}

func fanOut(in <-chan int) <-chan int {
    out := make(chan int)

    go func() {
        defer close(out)

        for data := range in {
            out <- data
        }
    }()

    return out
}

func generateWork(work []int) <-chan int {
    ch := make(chan int)

    go func() {
        defer close(ch)

        for _, w := range work {
            ch <- w
        }
    }()

    return ch
}
Вход в полноэкранный режим Выход из полноэкранного режима

Как мы видим, наша работа была разделена между несколькими горутинами.

$ go run main.go
Output 1 got: 1
Output 2 got: 3
Output 4 got: 4
Output 1 got: 5
Output 3 got: 2
Output 3 got: 6
Output 3 got: 7
Output 1 got: 8
Вход в полноэкранный режим Выход из полноэкранного режима

Конвейер

Схема конвейера представляет собой серию этапов, соединенных каналами, где каждый этап — это группа goroutines, выполняющих одну и ту же функцию.

На каждом этапе горутины:

  • Получают значения из восходящего потока по входящим каналам.
  • Выполняют некоторую функцию над этими данными, обычно производя новые значения.
  • Отправляют значения вниз по потоку через исходящие каналы.

Каждый этап имеет любое количество входящих и исходящих каналов, кроме первого и последнего этапов, которые имеют только исходящие или входящие каналы соответственно. Первый этап иногда называют источником или производителем; последний этап — поглотителем или потребителем.

Используя конвейер, мы разделяем задачи каждого этапа, что обеспечивает многочисленные преимущества, такие как:

  • Модифицировать этапы независимо друг от друга.
  • Смешивать и подбирать способы объединения этапов независимо от модификации этапа.

В нашем примере мы определили три этапа: filter, square и half.

package main

import (
    "fmt"
    "math"
)

func main() {
    in := generateWork([]int{0, 1, 2, 3, 4, 5, 6, 7, 8})

    out := filter(in) // Filter odd numbers
    out = square(out) // Square the input
    out = half(out)   // Half the input

    for value := range out {
        fmt.Println(value)
    }
}

func filter(in <-chan int) <-chan int {
    out := make(chan int)

    go func() {
        defer close(out)

        for i := range in {
            if i%2 == 0 {
                out <- i
            }
        }
    }()

    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)

    go func() {
        defer close(out)

        for i := range in {
            value := math.Pow(float64(i), 2)
            out <- int(value)
        }
    }()

    return out
}

func half(in <-chan int) <-chan int {
    out := make(chan int)

    go func() {
        defer close(out)

        for i := range in {
            value := i / 2
            out <- value
        }
    }()

    return out
}

func generateWork(work []int) <-chan int {
    ch := make(chan int)

    go func() {
        defer close(ch)

        for _, w := range work {
            ch <- w
        }
    }()

    return ch
}
Вход в полноэкранный режим Выход из полноэкранного режима

Похоже, что наш ввод был правильно обработан конвейером в параллельном режиме.

$ go run main.go
0
2
8
18
32
Вход в полноэкранный режим Выход из полноэкранного режима

Рабочий пул

Пул рабочих — это действительно мощный паттерн, который позволяет нам распределять работу между несколькими рабочими (goroutines) одновременно.

В нашем примере у нас есть канал jobs, на который мы будем отправлять наши задания, и канал results, на который наши рабочие будут отправлять результаты после завершения работы.

После этого мы можем запускать наших рабочих параллельно и просто получать результаты из канала results.

В идеале, totalWorkers должно быть установлено на runtime.NumCPU(), что дает нам количество логических CPU, используемых текущим процессом.

package main

import (
    "fmt"
    "sync"
)

const totalJobs = 4
const totalWorkers = 2

func main() {
    jobs := make(chan int, totalJobs)
    results := make(chan int, totalJobs)

    for w := 1; w <= totalWorkers; w++ {
        go worker(w, jobs, results)
    }

    // Send jobs
    for j := 1; j <= totalJobs; j++ {
        jobs <- j
    }

    close(jobs)

    // Receive results
    for a := 1; a <= totalJobs; a++ {
        <-results
    }

    close(results)
}

func worker(id int, jobs <-chan int, results chan<- int) {
    var wg sync.WaitGroup

    for j := range jobs {
        wg.Add(1)

        go func(job int) {
            defer wg.Done()

            fmt.Printf("Worker %d started job %dn", id, job)

            // Do work and send result
            result := job * 2
            results <- result

            fmt.Printf("Worker %d finished job %dn", id, job)
        }(j)
    }

    wg.Wait()
}
Вход в полноэкранный режим Выход из полноэкранного режима

Как и ожидалось, наши задания были распределены между рабочими.

$ go run main.go
Worker 2 started job 4
Worker 2 started job 1
Worker 1 started job 3
Worker 2 started job 2
Worker 2 finished job 1
Worker 1 finished job 3
Worker 2 finished job 2
Worker 2 finished job 4
Вход в полноэкранный режим Выход из полноэкранного режима

Очередь

Модель очереди позволяет нам обрабатывать n количество элементов за один раз.

В нашем примере мы используем буферизованный канал для имитации поведения очереди. Мы просто посылаем пустую структуру в наш канал queue и ждем, пока она не будет освобождена предыдущим процессом, чтобы мы могли продолжить.

Это происходит потому, что отправка в буферизованный канал блокируется только тогда, когда буфер заполнен, а получение блокируется, когда буфер пуст.

Здесь у нас общая работа 10 элементов, и у нас есть ограничение 2. Это означает, что мы можем обрабатывать 2 элемента одновременно.

Обратите внимание, что наш канал queue имеет тип struct{}, так как пустая структура занимает ноль байт памяти.

package main

import (
    "fmt"
    "sync"
    "time"
)

const limit = 2
const work = 10

func main() {
    var wg sync.WaitGroup

    fmt.Println("Queue limit:", limit)
    queue := make(chan struct{}, limit)

    wg.Add(work)

    for w := 1; w <= work; w++ {
        process(w, queue, &wg)
    }

    wg.Wait()

    close(queue)
    fmt.Println("Work complete")
}

func process(work int, queue chan struct{}, wg *sync.WaitGroup) {
    queue <- struct{}{}

    go func() {
        defer wg.Done()

        time.Sleep(1 * time.Second)
        fmt.Println("Processed:", work)

        <-queue
    }()
}
Вход в полноэкранный режим Выход из полноэкранного режима

Если мы запустим эту программу, то заметим, что она ненадолго приостанавливается, когда обрабатывается каждый второй элемент (это наш предел), поскольку наша очередь ожидает отмены очереди.

$ go run main.go
Queue limit: 2
Processed: 1
Processed: 2
Processed: 4
Processed: 3
Processed: 5
Processed: 6
Processed: 8
Processed: 7
Processed: 9
Processed: 10
Work complete
Вход в полноэкранный режим Выход из полноэкранного режима

Дополнительные шаблоны?

Существуют некоторые дополнительные детали, которые может быть полезно знать:

  • Канал тройника
  • Канал мостика
  • Кольцевой буферный канал
  • Ограниченный параллелизм

Это практически все для этого урока, я надеюсь, что он был полезен. Увидимся в следующем уроке!

Оцените статью
Procodings.ru
Добавить комментарий