[Go][Pion/WebRTC] Закрытие chan и добавление DataChannel


Введение

Я попробую обновить отправку данных через каналы goroutine и добавить Data-каналы WebRTC.

  • [Go] Попробуйте Pion/WebRTC с SSE

Примеры

  • webappsample — GitHub

[Go] Закрытие chan

В прошлый раз я написал обработку событий «webrtc.PeerConnection» с помощью каналов, как показано ниже.

sseClient.go

...
type SSEClient struct {
    candidateFound        chan *webrtc.ICECandidate
    changeConnectionState chan webrtc.PeerConnectionState
    addTrack              chan *webrtc.TrackRemote
    userName              string
    w                     http.ResponseWriter
}

func newSSEClient(userName string, w http.ResponseWriter) *SSEClient {
    return &SSEClient{
        candidateFound:        make(chan *webrtc.ICECandidate),
        changeConnectionState: make(chan webrtc.PeerConnectionState),
        addTrack:              make(chan *webrtc.TrackRemote),
        userName:              userName,
        w:                     w,
    }
}

func registerSSEClient(w http.ResponseWriter, r *http.Request, hub *SSEHub) {
...
    flusher, _ := w.(http.Flusher)
    defer func() {
        hub.unregister <- ps
        if ps.peerConnection.ConnectionState() != webrtc.PeerConnectionStateClosed {
            ps.peerConnection.Close()
        }
        close(newClient.candidateFound)
        close(newClient.changeConnectionState)
        close(newClient.addTrack)
    }()
    for {
        select {
        case candidate := <-newClient.candidateFound:
            jsonValue, err := NewCandidateMessageJSON(newClient.userName, candidate)
            if err != nil {
                return
            }
            fmt.Fprintf(w, "data: %snn", jsonValue)
            flusher.Flush()
        case track := <-newClient.addTrack:
            hub.addTrack <- track
        case connectionState := <-newClient.changeConnectionState:
            switch connectionState {
            case webrtc.PeerConnectionStateFailed:
                return
            case webrtc.PeerConnectionStateClosed:
                return
            }
        case <-r.Context().Done():
            return
        }
    }
}
...
Вход в полноэкранный режим Выйти из полноэкранного режима

peerConnectionState.go

...
type PeerConnectionState struct {
    peerConnection        *webrtc.PeerConnection
    client                *SSEClient
}
func NewPeerConnectionState(client *SSEClient) (*PeerConnectionState, error) {
...
    peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) {
        if i == nil {
            return
        }
        client.candidateFound <- i
    })
    peerConnection.OnConnectionStateChange(func(p webrtc.PeerConnectionState) {
        // panic after closing all SSEClient channels.
        client.changeConnectionState <- p
    })
    peerConnection.OnTrack(func(t *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
        client.addTrack <- t
    })

    return &PeerConnectionState{
        peerConnection: peerConnection,
        client:         client,
    }, nil
}
Войти в полноэкранный режим Выйти из полноэкранного режима

Но после закрытия всех каналов SSEClient в «peerConnection.OnConnectionStateChange» произойдет паника.
Потому что он пытался отправить состояние «закрыто» по закрытому каналу.

Поэтому я добавляю еще один канал и проверяю, закрыты ли каналы перед отправкой значений.

sseClient.go

...
type SSEClient struct {
    candidateFound        chan *webrtc.ICECandidate
    changeConnectionState chan webrtc.PeerConnectionState
    addTrack              chan *webrtc.TrackRemote
    heartbeat             chan int
    userName              string
    w                     http.ResponseWriter
}
...
func registerSSEClient(w http.ResponseWriter, r *http.Request, hub *SSEHub) {
...
    defer func() {
        hub.unregister <- ps
        if ps.peerConnection.ConnectionState() != webrtc.PeerConnectionStateClosed {
            ps.peerConnection.Close()
        }
        close(newClient.candidateFound)
        close(newClient.changeConnectionState)
        close(newClient.addTrack)
        close(newClient.heartbeat)
    }()
...
}
Вход в полноэкранный режим Выйти из полноэкранного режима

peerConnectionState.go

...
func NewPeerConnectionState(client *SSEClient) (*PeerConnectionState, error) {
...
    // must be a "buffered channel" to avoid blocking on setting the first value.
    heartbeat := make(chan int, 1)
    candidateFound := make(chan *webrtc.ICECandidate)
    changeConnectionState := make(chan webrtc.PeerConnectionState)
    addTrack := make(chan *webrtc.TrackRemote)

    // set the first value to avoid blocking on reading the channel value on first time.
    heartbeat <- 1
...
    peerConnection.OnConnectionStateChange(func(p webrtc.PeerConnectionState) {
        // until the channel being closing, "ok" value keeps being "true".
        _, ok := <-heartbeat
        if ok {
            changeConnectionState <- p
            // write value for next reading.
            heartbeat <- 1
        }
    })
...
Войти в полноэкранный режим Выйти из полноэкранного режима

Но я все еще не смог решить проблему после изменения.

Хотя я закрыл все каналы, значение «ok» не изменилось.

peerConnectionState.go

...
func NewPeerConnectionState(client *SSEClient) (*PeerConnectionState, error) {
...
    peerConnection.OnConnectionStateChange(func(p webrtc.PeerConnectionState) {
        // "ok" value is "true".
        _, ok := <-heartbeat
        if ok {
            // but the channels are closed.
            changeConnectionState <- p
            heartbeat <- 1
        }
    })
...
Войти в полноэкранный режим Выйти из полноэкранного режима

Таким образом, я прочитал все значения «heartbeat» перед его закрытием.

sseClient.go

...
func registerSSEClient(w http.ResponseWriter, r *http.Request, hub *SSEHub) {
...
    defer func() {
        hub.unregister <- ps
        // read all values before closing.
        for i := 0; i < len(newClient.heartbeat); i++ {
            <-newClient.heartbeat
        }
        close(newClient.heartbeat)
        close(newClient.candidateFound)
        close(newClient.changeConnectionState)
        close(newClient.addTrack)
        if ps.peerConnection.ConnectionState() != webrtc.PeerConnectionStateClosed {
            ps.peerConnection.Close()
        }
    }()
...
}
Войти в полноэкранный режим Выйти из полноэкранного режима

Наконец, я переместил каналы, обрабатывающие события WebRTC, в PeerConnectionState.
Потому что события принадлежат webrtc.PeerConnection.

  • Go言語による並行処理(Concurrency in Go)
  • Эффективный Go — Язык программирования Go

[WebRTC] Добавление WebRTC DataChannel

Теперь я попробую отправлять данные через WebRTC DataChannel.

Я могу отправлять тексты (string) и двоичные файлы (ex.UInt8Array) по WebRTC DataChannel.
Я могу согласовать соединение RTCDataChannel с приложением.

Для этого я должен установить «negotiated» в RTCDataChannelInit в true при создании экземпляров RTCDataChannel из RTCPeerConnection.

const dc = peerConnection.createDataChannel(label, {
    id,
    negotiated: true,
    ordered: false
});
// handling received data as texts.
const decoder = new TextDecoder("utf-8");
dc.onmessage = (ev) => {        
    const message = decoder.decode(new Uint8Array(ev.data));
    // TODO: handle string value.
};
Вход в полноэкранный режим Выход из полноэкранного режима

Я также должен создать их до завершения первого обмена предложениями/ответами.

Теперь я могу разделить RTCDataChannels по их идентификаторам.

Я могу создать webrtc.DataChannel таким же образом в Go.

func newWebRTCDataChannel(label string, id uint16, peerConnection *webrtc.PeerConnection) (*webrtc.DataChannel, error) {
    negotiated := true
    ordered := false
    dc, err := peerConnection.CreateDataChannel(label, &webrtc.DataChannelInit{
        ID:         &id,
        Negotiated: &negotiated,
        Ordered:    &ordered,
    })
    if err != nil {
        return nil, err
    }
    dc.OnMessage(func(msg webrtc.DataChannelMessage) {
        message := string(msg.Data)
        // TODO: handle string value.
    })
    return dc, nil
}
Вход в полноэкранный режим Выход из полноэкранного режима

В это время я просто отправляю данные от одного клиента другим клиентам, и ничего не меняю на стороне сервера.

dc.DataChannels[id].Send(msg.Data)
Вход в полноэкранный режим Выход из полноэкранного режима

Еще один важный момент — DataChannels должны быть закрыты до закрытия PeerConnection.

  • WebRTC 1.0: Коммуникация в реальном времени между браузерами
  • RFC8831 — Каналы данных WebRTC
  • RFC8832 — Протокол установления канала данных WebRTC
  • RTCDataChannel — Web APIs|MDN
  • Использование каналов данных WebRTC — Web APIs|MDN
  • WebRTC DataChannel コトハジメ — voluntas — GitHub
  • ハイパフォーマンス ブラウザネットワーキング(High Performance Browser Networking)

Оцените статью
Procodings.ru
Добавить комментарий