Продвинутый параллелизм в Go

Узнайте, как использовать пакеты Go singleflight и errgroup, а также другие важные шаблоны проектирования для параллелизма на реальных примерах.

Если вы уже давно используете Go, то, вероятно, знаете о некоторых основных примитивах параллелизма Go:

  • Ключевое слово go для порождения goroutines
  • Каналы, для связи между горутинами
  • пакет context для распространения отмены
  • Пакеты sync и sync/atomic для примитивов нижнего уровня, таких как мьютексы и атомарный доступ к памяти.

Эти возможности языка и пакеты обеспечивают очень богатый набор инструментов для создания параллельных приложений.
Что вы, возможно, еще не обнаружили, так это набор примитивов параллелизма более высокого уровня, доступных в «расширенной стандартной библиотеке» по адресу golang.org/x/sync.
Мы рассмотрим их в этой статье.

Пакет singleflight

Как говорится в документации к пакету, этот пакет предоставляет механизм подавления дублирующихся вызовов функций.

Этот пакет чрезвычайно полезен в случаях, когда вы выполняете что-то вычислительно дорогое (или просто медленное, например, доступ к сети) в ответ на действия пользователя.
Например, допустим, у вас есть база данных с информацией о погоде в каждом городе, и вы хотите представить ее в виде API.
В некоторых случаях несколько пользователей могут одновременно запрашивать погоду для одного и того же города.

Когда это происходит, разве не было бы здорово, если бы вы могли просто запросить базу данных, а затем поделиться результатом со всеми ожидающими запроса?
Именно это и делает пакет singleflight!

Чтобы использовать его, создайте где-нибудь singleflight.Group. Для корректной работы она должна быть общей для всех запросов.
Затем оберните медленную или дорогостоящую операцию в вызов group.Do(key, fn). Несколько одновременных запросов на один и тот же key
вызовут fn только один раз, а результат будет возвращен всем вызывающим после возврата fn.

Вот как это выглядит на практике:

package weather

type Info struct {
    TempC, TempF int // temperature in Celsius and Farenheit
    Conditions string // "sunny", "snowing", etc
}

var group singleflight.Group

func City(city string) (*Info, error) {
    results, err, _ := group.Do(city, func() (interface{}, error) {
        info, err := fetchWeatherFromDB(city) // slow operation
        return info, err
    })
    if err != nil {
        return nil, fmt.Errorf("weather.City %s: %w", city, err)
    }
    return results.(*Info), nil
}
Вход в полноэкранный режим Выход из полноэкранного режима

Обратите внимание, что закрытие, которое мы передаем в group.Do, должно возвращать (interface{}, error) для работы с системой типов Go.
Третье возвращаемое значение из group.Do, которое игнорируется в приведенном примере, указывает, был ли результат разделен между несколькими вызывающими сторонами.
разделен между несколькими вызывающими сторонами или нет.

Пакет errgroup

Еще один бесценный пакет — это пакет errgroup.
Его лучше всего описать как sync.WaitGroup, но в нем задачи возвращают ошибки, которые передаются обратно официанту.

Этот пакет полезен, когда у вас есть несколько операций, которые вы хотите подождать, но вы также хотите определить.
все ли они успешно завершены.
Например, в развитие примера с погодой, приведенного выше, предположим, что вы хотите просмотреть погоду для нескольких городов
одновременно, и отказать, если какой-либо из них завершится неудачно.

Начните с определения errgroup.Group и используйте метод group.Go(fn func() error) для каждого города.
Этот метод порождает goroutine для выполнения задания. Когда вы породите все нужные вам задачи, используйте метод

Ошибка будет nil тогда и только тогда, когда все задания вернут ошибку nil.

На практике это выглядит следующим образом:

func Cities(cities ...string) ([]*Info, error) {
    var g errgroup.Group
    var mu sync.Mutex
    res := make([]*Info, len(cities)) // res[i] corresponds to cities[i]

    for i, city := range cities {
        i, city := i, city // create locals for closure below
        g.Go(func() error {
            info, err := City(city)
            mu.Lock()
            res[i] = info
            mu.Unlock()
            return err
        })
    }
    if err := g.Wait(); err != nil {
        return nil, err
    }
    return res, nil
}
Войти в полноэкранный режим Выход из полноэкранного режима

Здесь мы выделяем срез результатов, чтобы каждая горутина могла записывать в свой собственный индекс.
Хотя приведенный выше код безопасен даже без мьютекса mu, поскольку каждая горутина записывает в свою собственную запись в срезе, мы все равно используем его на случай, если код будет изменен со временем.

Ограниченный параллелизм

Приведенный выше код будет искать информацию о погоде для всех заданных городов одновременно.
Это хорошо, когда число городов невелико, но может вызвать проблемы с производительностью, если число городов велико.
В таких случаях полезно ввести ограниченный параллелизм.

В Go очень легко создать ограниченный параллелизм с помощью семафоров. Семафор — это примитив параллелизма, с которым вы могли сталкиваться, если изучали информатику, но если нет, не волнуйтесь. Семафоры можно использовать для нескольких целей, но мы будем использовать их для отслеживания количества запущенных задач и для блокировки до тех пор, пока не освободится место для запуска другой задачи.

В Go мы можем добиться этого с помощью умного использования каналов! Если мы хотим разрешить одновременное выполнение до 10 задач,
мы создаем канал с пространством для 10 элементов: semaphore := make(chan struct{}, 10). Вы можете представить это как трубу, в которую помещается 10 шариков.

Для запуска новой задачи, блокировки, если слишком много задач уже запущено, мы просто пытаемся отправить значение по каналу: semaphore <- struct{}{}. Это аналогично попытке протолкнуть еще один шарик в трубу. Если труба переполнена, она ждет, пока не освободится место.

Когда задача завершена, пометьте ее как таковую, взяв значение из канала: <-semaphore. Это аналогично вытаскиванию шарика на другом конце трубы, что оставляет место для вталкивания другого шарика (запуска другой задачи).

Вот и все! Наш модифицированный Cities выглядит следующим образом:

func Cities(cities ...string) ([]*Info, error) {
    var g errgroup.Group
    var mu sync.Mutex
    res := make([]*Info, len(cities)) // res[i] corresponds to cities[i]
    sem := make(chan struct{}, 10)
    for i, city := range cities {
        i, city := i, city // create locals for closure below
        sem <- struct{}{}
        g.Go(func() error {
            info, err := City(city)
            mu.Lock()
            res[i] = info
            mu.Unlock()
            <-sem
            return err
        })
    }
    if err := g.Wait(); err != nil {
        return nil, err
    }
    return res, nil
}
Вход в полноэкранный режим Выход из полноэкранного режима

Взвешенный ограниченный параллелизм

И, наконец, иногда вам нужен ограниченный параллелизм, но не все задачи одинаково дороги.
В этом случае количество потребляемых ресурсов будет сильно варьироваться в зависимости от распределения
дешевых и дорогих задач и того, как они стартуют.

Лучшим решением для этого случая является использование взвешенного ограниченного параллелизма.
Это работает просто: вместо того, чтобы рассуждать о количестве задач, которые мы хотим выполнять одновременно,
мы придумываем «стоимость» для каждой задачи и получаем и освобождаем эту стоимость из семафора.

Мы больше не можем моделировать это с помощью каналов, поскольку нам нужна вся стоимость, полученная и освобожденная за один раз.
К счастью, «расширенная стандартная библиотека» снова приходит нам на помощь!
Пакет golang.org/x/sync/sempahore
предоставляет взвешенную реализацию семафора именно для этой цели.

Операция sem <- struct{}{} называется «Acquire», а операция <-sem называется «Release».
Вы заметите, что метод semaphore.Acquire возвращает ошибку; это потому, что он может быть использован
с пакетом context, чтобы прервать операцию раньше времени. Для целей данного примера мы проигнорируем ее.

Пример с поиском погоды на самом деле слишком прост, чтобы требовать взвешенного семафора,
но для простоты представим, что стоимость меняется в зависимости от длины названия города.
Тогда мы получим следующее:

func Cities(cities ...string) ([]*Info, error) {
    ctx := context.TODO() // replace with a real context
    var g errgroup.Group
    var mu sync.Mutex
    res := make([]*Info, len(cities)) // res[i] corresponds to cities[i]
    sem := semaphore.NewWeighted(100) // 100 chars processed concurrently
    for i, city := range cities {
        i, city := i, city // create locals for closure below
        cost := int64(len(city))
        if err := sem.Acquire(ctx, cost); err != nil {
            break
        }
        g.Go(func() error {
            info, err := City(city)
            mu.Lock()
            res[i] = info
            mu.Unlock()
            sem.Release(cost)
            return err
        })
    }
    if err := g.Wait(); err != nil {
        return nil, err
    } else if err := ctx.Err(); err != nil {
        return nil, err
    }
    return res, nil
}
Войти в полноэкранный режим Выход из полноэкранного режима

Заключение

Приведенные выше примеры показывают, как легко добавить параллелизм в программу на Go, а затем доработать ее в соответствии с вашими потребностями.

Отзывы, исправления и предложения по улучшению этой статьи приветствуются! Пожалуйста, свяжитесь со мной в Twitter.

До встречи в облаке,
Андре

Эта статья была первоначально опубликована в блоге Энкор 18-02-2020.

Оцените статью
Procodings.ru
Добавить комментарий