[Go] Попробуйте Pion/WebRTC с SSE


Введение

На этот раз я попробую Pion/WebRTC.

  • pion/webrtc — GitHub

Поскольку у Pion уже есть хорошие примеры, я создам пример на основе SFU-WebSocket из example-webrtc-applications.

  • example-webrtc-applications/sfu-ws — pion/example-webrtc-applications — GitHub

Я попробую изменить эти пункты.

  • Использовать SSE(Server-Sent Events) для сигнализации
  • Начать подключаться вручную

Я добавлю функции WebRTC в последний пример проекта.

  • [Go] Try Server-Sent Events

И я также ссылаюсь на этот пост (особенно на стороне клиента).

  • [ASP.NET Core][TypeScript] Попробуйте WebRTC

Среды

  • Go ver.go1.18.2 windows/amd64
  • Node.js ver.18.1.0

Соединение с WebRTC SFU

Когда я пробовал WebRTC в прошлый раз, приложение на стороне сервера работало только для сигнализации.
После передачи сигнала клиенты напрямую соединялись друг с другом.

В этот раз они будут подключаться только к приложению на стороне сервера.
После подключения клиенты будут отправлять видео- и аудиодорожки в приложение на стороне сервера.
А приложение на стороне сервера отправляет клиентам треки других клиентов в качестве удаленных треков.

Образцы

На этот раз я публикую пример проекта на GitHub.

  • webappsample — GitHub

Клиентская сторона

Поскольку процесс подключения начинается с приложения на стороне сервера.

Поэтому на стороне клиента нужно только обрабатывать события предложения и ICE кандидата.
И я создам RTCPeerConnection при запуске.

main.page.ts

...
function handleReceivedMessage(value: string) {
    const message = JSON.parse(value);
    if(!checkIsClientMessage(message)) {
        return;
    }
    switch(message.event) {
        case "text":
            view.addReceivedText({ user: message.userName, message: message.data });
            break;
        case "offer":
            webrtc.handleOffer(JSON.parse(message.data));
            break;
        case "candidate":
            webrtc.handleCandidate(JSON.parse(message.data));
            break;
    }
}
function sendAnswer(data: RTCSessionDescriptionInit) {
    if(!hasAnyTexts(userName)) {
        return;
    }
    sse.sendMessage({userName, event: "answer", data: JSON.stringify(data)});
}
function sendCandidate(data: RTCIceCandidate) {
    if(!hasAnyTexts(userName)) {
        return;
    }
    sse.sendMessage({userName, event: "candidate", data: JSON.stringify(data)});
}
function checkIsClientMessage(value: any): value is ClientMessage {
    // All messages from the server-side application are sent as "ClientMessage".
    if(value == null) {
        return false;
    }
    if(("event" in value &&
        typeof value["event"] === "string") === false) {
        return false;
    }
    if(("data" in value &&
        typeof value["data"] === "string") === false) {
        return false;
    }
    return true;
}
init();
Вход в полноэкранный режим Выход из полноэкранного режима

webrtc.controller.ts

export class WebRtcController {
    private webcamStream: MediaStream|null = null; 
    private peerConnection: RTCPeerConnection|null = null;
    private answerSentEvent: ((data: RTCSessionDescriptionInit) => void)|null = null;
    private candidateSentEvent: ((data: RTCIceCandidate) => void)|null = null;
    public init() {
        const localVideo = document.getElementById("local_video") as HTMLVideoElement;
        localVideo.addEventListener("canplay", () => {
            const width = 320;
            const height = localVideo.videoHeight / (localVideo.videoWidth/width);          
            localVideo.setAttribute("width", width.toString());
            localVideo.setAttribute("height", height.toString());
          }, false);
        navigator.mediaDevices.getUserMedia({ video: true, audio: true })
          .then(stream => {
              localVideo.srcObject = stream;
              localVideo.play();
              this.webcamStream = stream;
              // create a RTCPeerConnection after getting local MediaStream
              this.connect();
          })
          .catch(err => console.error(`An error occurred: ${err}`));
    }
...
    /** handle received offer and send answer */
    public handleOffer(data: RTCSessionDescription|null|undefined) {
        if(this.peerConnection == null ||
                data == null) {
            return;
        }
        this.peerConnection.setRemoteDescription(data);
        this.peerConnection.createAnswer()
            .then(answer => {
                if(this.peerConnection != null) {
                    this.peerConnection.setLocalDescription(answer);
                }
                if(this.answerSentEvent != null) {
                    this.answerSentEvent(answer);
                }
            });
    }
    /** add ICE Candidate */
    public handleCandidate(data: RTCIceCandidate|null|undefined) {
        if(this.peerConnection == null ||
            data == null) {
            return;
        }
        this.peerConnection.addIceCandidate(data);
    }
    private connect() {
        if(this.webcamStream == null) {
            return;
        }
        this.peerConnection = new RTCPeerConnection({
            iceServers: [{
                urls: `stun:stun.l.google.com:19302`,  // A STUN server              
            }]
        });
        // Add remote video tracks as new video elements.
        this.peerConnection.ontrack = (ev) => {
            if (ev.track.kind === "audio" ||
                ev.streams[0] == null) {
              return;
            }    
            const remoteVideo = document.createElement("video");
            remoteVideo.srcObject = ev.streams[0];
            remoteVideo.autoplay = true;
            remoteVideo.controls = true;
            const videoArea = document.getElementById("remote_video_area") as HTMLElement;
            videoArea.appendChild(remoteVideo);
            ev.track.onmute = () => {
                remoteVideo.play();
            };
            ev.streams[0].onremovetrack = () => {
              if (remoteVideo.parentNode) {
                remoteVideo.parentNode.removeChild(remoteVideo);
              }
            };
          };
        this.webcamStream.getTracks().forEach(track => {
            if(this.peerConnection == null ||
                this.webcamStream == null) {
                return;
            }
            this.peerConnection.addTrack(track, this.webcamStream)
        });
        this.peerConnection.onicecandidate = ev => {
            if (ev.candidate == null ||
                this.candidateSentEvent == null) {
              return;
            }
            this.candidateSentEvent(ev.candidate);
        };
    }   
}
Войти в полноэкранный режим Выход из полноэкранного режима

На стороне сервера

sseClient.go

package main

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "log"
    "net/http"

    "github.com/pion/webrtc/v3"
)

type SSEClient struct {
    candidateFound        chan *webrtc.ICECandidate
    changeConnectionState chan webrtc.PeerConnectionState
    addTrack              chan *webrtc.TrackRemote
    userName              string
    w                     http.ResponseWriter
}

func newSSEClient(userName string, w http.ResponseWriter) *SSEClient {
    return &SSEClient{
        candidateFound:        make(chan *webrtc.ICECandidate),
        changeConnectionState: make(chan webrtc.PeerConnectionState),
        addTrack:              make(chan *webrtc.TrackRemote),
        userName:              userName,
        w:                     w,
    }
}

func registerSSEClient(w http.ResponseWriter, r *http.Request, hub *SSEHub) {
    userName, err := getParam(r, "user")
    if err != nil {
        log.Println(err.Error())
        fmt.Fprint(w, "The parameters have no names")
        return
    }
    newClient := newSSEClient(userName, w)
    ps, err := NewPeerConnectionState(newClient)
    if err != nil {
        log.Println(err.Error())
        fmt.Fprint(w, "Failed connection")
        return
    }
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")

    hub.register <- ps

    // For pushing data to clients, I call "flusher.Flush()"
    flusher, _ := w.(http.Flusher)
    defer func() {
        hub.unregister <- ps
        if ps.peerConnection.ConnectionState() != webrtc.PeerConnectionStateClosed {
            ps.peerConnection.Close()
        }
        close(newClient.candidateFound)
        close(newClient.changeConnectionState)
        close(newClient.addTrack)
    }()
    for {
        // handle PeerConnection events and close SSE event.
        select {
        case candidate := <-newClient.candidateFound:
            jsonValue, err := NewCandidateMessageJSON(newClient.userName, candidate)
            if err != nil {
                log.Println(err.Error())
                return
            }
            fmt.Fprintf(w, "data: %snn", jsonValue)
            flusher.Flush()
        case track := <-newClient.addTrack:
            hub.addTrack <- track
        case connectionState := <-newClient.changeConnectionState:
            switch connectionState {
            case webrtc.PeerConnectionStateFailed:
                return
            case webrtc.PeerConnectionStateClosed:
                return
            }
        case <-r.Context().Done():
            // when "es.close()" is called, this loop operation will be ended.
            return
        }
    }
}
func sendSSEMessage(w http.ResponseWriter, r *http.Request, hub *SSEHub) {
    w.Header().Set("Content-Type", "application/json")
    body, err := ioutil.ReadAll(r.Body)

    if err != nil {
        log.Println(err.Error())
        j, _ := json.Marshal(GetFailed("Failed reading values from body"))
        w.Write(j)
        return
    }
    message := &ClientMessage{}
    err = json.Unmarshal(body, &message)
    if err != nil {
        log.Println(err.Error())
        j, _ := json.Marshal(GetFailed("Failed converting to ClientMessage"))
        w.Write(j)
        return
    }
    w.WriteHeader(200)
    hub.broadcast <- *message
    data, _ := json.Marshal(GetSucceeded())
    w.Write(data)
}
Войти в полноэкранный режим Выход из полноэкранного режима

peerConnectionState.go

package main

import (
    "github.com/pion/webrtc/v3"
)

type PeerConnectionState struct {
    peerConnection *webrtc.PeerConnection
    client         *SSEClient
}

func NewPeerConnectionState(client *SSEClient) (*PeerConnectionState, error) {
    peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{
        ICEServers: []webrtc.ICEServer{
            {
                URLs: []string{
                    "stun:stun.l.google.com:19302",
                },
            },
        },
    })
    if err != nil {
        return nil, err
    }
    for _, typ := range []webrtc.RTPCodecType{webrtc.RTPCodecTypeVideo, webrtc.RTPCodecTypeAudio} {
        if _, err := peerConnection.AddTransceiverFromKind(typ, webrtc.RTPTransceiverInit{
            Direction: webrtc.RTPTransceiverDirectionRecvonly,
        }); err != nil {
            return nil, err
        }
    }
    // Add event handlers.
    peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) {
        if i == nil {
            return
        }
        client.candidateFound <- i
    })
    peerConnection.OnConnectionStateChange(func(p webrtc.PeerConnectionState) {
        // avoid panic after closing channel
        if p == webrtc.PeerConnectionStateClosed {
            _, ok := <-client.changeConnectionState
            if ok {
                client.changeConnectionState <- p
            }
            return
        }
        client.changeConnectionState <- p
    })
    peerConnection.OnTrack(func(t *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
        client.addTrack <- t
    })

    return &PeerConnectionState{
        peerConnection: peerConnection,
        client:         client,
    }, nil
}
Войти в полноэкранный режим Выход из полноэкранного режима

sseHub.go

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "time"

    "github.com/pion/rtcp"
    "github.com/pion/webrtc/v3"
)

type SSEHub struct {
    clients     map[*PeerConnectionState]bool
    broadcast   chan ClientMessage
    register    chan *PeerConnectionState
    unregister  chan *PeerConnectionState
    trackLocals map[string]*webrtc.TrackLocalStaticRTP
    addTrack    chan *webrtc.TrackRemote
}

func newSSEHub() *SSEHub {
    return &SSEHub{
        clients:     make(map[*PeerConnectionState]bool),
        broadcast:   make(chan ClientMessage),
        register:    make(chan *PeerConnectionState),
        unregister:  make(chan *PeerConnectionState),
        trackLocals: map[string]*webrtc.TrackLocalStaticRTP{},
        addTrack:    make(chan *webrtc.TrackRemote),
    }
}
func (h *SSEHub) run() {
    go func() {
        for range time.NewTicker(time.Second * 3).C {
            dispatchKeyFrame(h)
        }
    }()
    for {
        select {
        case client := <-h.register:
            h.clients[client] = true
            signalPeerConnections(h)
        case client := <-h.unregister:
            if _, ok := h.clients[client]; ok {
                delete(h.clients, client)
                signalPeerConnections(h)
            }
        case track := <-h.addTrack:
            trackLocal, err := webrtc.NewTrackLocalStaticRTP(track.Codec().RTPCodecCapability,
                track.ID(), track.StreamID())
            if err != nil {
                log.Println(err.Error())
                return
            }
            h.trackLocals[track.ID()] = trackLocal
            signalPeerConnections(h)
            go updateTrackValue(h, track)

        case message := <-h.broadcast:
            handleReceivedMessage(h, message)
        }
    }
}
func updateTrackValue(h *SSEHub, track *webrtc.TrackRemote) {
    defer func() {
        delete(h.trackLocals, track.ID())
        signalPeerConnections(h)
    }()

    buf := make([]byte, 1500)

    for {
        i, _, err := track.Read(buf)
        if err != nil {
            return
        }
        if _, err = h.trackLocals[track.ID()].Write(buf[:i]); err != nil {
            return
        }
    }
}
func handleReceivedMessage(h *SSEHub, message ClientMessage) {
    switch message.Event {
    case TextEvent:
        m, _ := json.Marshal(message)
        jsonText := string(m)

        for client := range h.clients {
            flusher, _ := client.client.w.(http.Flusher)

            fmt.Fprintf(client.client.w, "data: %snn", jsonText)
            flusher.Flush()
        }
    case CandidateEvent:
        candidate := webrtc.ICECandidateInit{}
        if err := json.Unmarshal([]byte(message.Data), &candidate); err != nil {
            log.Println(err)
            return
        }
        for pc := range h.clients {
            if pc.client.userName == message.UserName {
                if err := pc.peerConnection.AddICECandidate(candidate); err != nil {
                    log.Println(err)
                    return
                }
            }
        }
    case AnswerEvent:
        answer := webrtc.SessionDescription{}
        if err := json.Unmarshal([]byte(message.Data), &answer); err != nil {
            log.Println(err)
            return
        }
        for pc := range h.clients {
            if pc.client.userName == message.UserName {
                if err := pc.peerConnection.SetRemoteDescription(answer); err != nil {
                    log.Println(err)
                    return
                }
            }
        }

    }
}
func signalPeerConnections(h *SSEHub) {
    defer func() {
        dispatchKeyFrame(h)
    }()
    for syncAttempt := 0; ; syncAttempt++ {
        if syncAttempt == 25 {
            // Release the lock and attempt a sync in 3 seconds. We might be blocking a RemoveTrack or AddTrack
            go func() {
                time.Sleep(time.Second * 3)
                signalPeerConnections(h)
            }()
            return
        }
        // For ignoring errors like below, execute attemptSync multiple times.
        // InvalidModificationError: invalid proposed signaling state transition: have-local-offer->SetLocal(offer)->have-local-offer
        if !attemptSync(h) {
            break
        }
    }
}
// Share received tracks to all connected peers.
func attemptSync(h *SSEHub) bool {
    for ps := range h.clients {
        if ps.peerConnection.ConnectionState() == webrtc.PeerConnectionStateClosed {
            delete(h.clients, ps)
            // We modified the slice, start from the beginning
            return true
        }
        existingSenders := map[string]bool{}

        for _, sender := range ps.peerConnection.GetSenders() {
            if sender.Track() == nil {
                continue
            }
            existingSenders[sender.Track().ID()] = true

            if _, ok := h.trackLocals[sender.Track().ID()]; !ok {
                if err := ps.peerConnection.RemoveTrack(sender); err != nil {
                    return true
                }
            }
        }
        for _, receiver := range ps.peerConnection.GetReceivers() {
            if receiver.Track() == nil {
                continue
            }
            existingSenders[receiver.Track().ID()] = true
        }
        for trackID := range h.trackLocals {
            if _, ok := existingSenders[trackID]; !ok {
                if _, err := ps.peerConnection.AddTrack(h.trackLocals[trackID]); err != nil {
                    return true
                }
            }
        }

        offer, err := ps.peerConnection.CreateOffer(nil)
        if err != nil {
            return true
        }
        messageJSON, err := NewOfferMessageJSON(ps.client.userName, offer)
        if err != nil {
            return true
        }

        if err = ps.peerConnection.SetLocalDescription(offer); err != nil {
            return true
        }
        flusher, _ := ps.client.w.(http.Flusher)

        fmt.Fprintf(ps.client.w, "data: %snn", messageJSON)
        flusher.Flush()
    }
    return false
}
func dispatchKeyFrame(h *SSEHub) {
    for ps := range h.clients {
        for _, receiver := range ps.peerConnection.GetReceivers() {
            if receiver.Track() == nil {
                continue
            }

            _ = ps.peerConnection.WriteRTCP([]rtcp.Packet{
                &rtcp.PictureLossIndication{
                    MediaSSRC: uint32(receiver.Track().SSRC()),
                },
            })
        }
    }
}
Войти в полноэкранный режим Выйти из полноэкранного режима

Каналы

Я создаю каналы в SSEClient и SSEHub.
Я попробовал добавить несколько каналов в SSEClient, чтобы сначала отправлять сообщения с SSEHub.

Но если я сделал это, приложение зависает при отправке текстовых сообщений после подключения WebRTC.
Поскольку я думаю, что причина в круговой ссылке, я удалил эти каналы и отправил сообщения из SSEHub.

Ресурсы

  • pion/webrtc — GitHub
  • Pion
  • example-webrtc-applications/sfu-ws — pion/example-webrtc-applications — GitHub
  • WebRTC для любознательных
  • 実用 Go言語—システム開発の現場で知っておきたいアドバイス

Оцените статью
Procodings.ru
Добавить комментарий