通过webrtc和Alexa echo的音视频互通互联方案实现
WebRTC 开源库:Amazon Kinesis Video Streams C WebRTC SDK。作者:伍增田。
webrtc开源库: Amazon Kinesis Video Streams C WebRTC SDK
git clone --recursive https://github.com/awslabs/amazon-kinesis-video-streams-webrtc-sdk-c.git
作者 伍增田
该库是纯 C 编写的,不仅可以用在服务器上,还可以用在嵌入式设备等对内存和 SSD 存储空间要求严格的场景。
WebRTC KVS(亚马逊版)环境编译
我司互联网 IoT 平台 www.meshare.com 提供视频监控云服务,下面是其网页端(发起 offer 的一端)实时视频页面的示例代码:
<!doctype html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport"
content="width=device-width, user-scalable=no, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0">
<meta http-equiv="X-UA-Compatible" content="ie=edge">
<title>LIVE</title>
<script src="assets/js/vue.min.js"></script>
<script src="assets/js/jquery.min.js"></script>
<style>
.app-video-live-container .hidden-dom{
display: none;
}
.app-video-live-container .a-lr-flex{
display: flex;
justify-content: space-between;
align-items: center;
padding: 0 15px;
}
.app-video-live-container .a-label{
padding: 10px 0;
font-size: 15px;
}
.app-video-live-container .app-video-box {
width: 100%;
}
.app-video-live-container .app-video-box video{
width: 100%;
vertical-align: bottom;
}
.app-video-live-container .btn-box {
font-size: 12px;
border: #00aaee 1px solid;
outline: 0;
background-color: #FFFFFF;
border-radius: 2px;
color: #00aaee;
cursor: pointer;
padding: 0;
}
.app-video-live-container .btn-box>span{
width: 30px;
display: inline-block;
padding: 2px 0;
}
.app-video-live-container .btn-box .current{
background: #00aaee;
color: #FFFFFF;
}
.logs-box{
font-size: 12px;
padding-top: 15px;
width: 100%;
overflow-x: hidden;
word-break: break-all;
}
</style>
</head>
<body>
<div class="app-video-live-container" id="app_live">
<div class="hidden-dom">
<label id="selectAudio">Local Audio :</label>
<select id="audios">
</select>
<br/>
<label id="selectVideo">Local Video :</label>
<select id="videos">
</select>
</div>
<div class="a-lr-flex">
<div class="a-label" @click="show_log = !show_log">
On-demand time: <span v-html="atime" style="width: 100px; display: inline-block"></span>
</div>
<button id="btnCall" class="btn-box" v-if="false">
<span :class="{'current': articulation === 'HD'}">HD</span>
<span :class="{'current': articulation === 'LD'}">LD</span>
<!-- {{articulation}}-->
</button>
</div>
<div class="app-video-box">
<video id="video" :muted="flag" @pause="videoPauseFunc" playsinline autoplay controls></video>
</div>
<!-- <button onclick="window.location.reload()">reload</button>-->
<div class="logs-box" id="logsbox" v-show="show_log"></div>
</div>
</body>
</html>
<script>
// import $ from 'jquery'
let inVplay;
new Vue({
el: '#app_live',
name: 'live',
// props: {
// startTime: String,
// physicalId: String,
// tokenId: String,
// offsetSeconds: Number,
// },
// Reactive state of the live-view component.
data(){
return {
// Text shown next to "On-demand time"; set in the video's oncanplaythrough handler.
atime: '',
// Session token, read from the `token` query parameter in created().
token: '',
// Not referenced anywhere else in the visible component code.
sdp: '',
// Device identifier, read from the `physical_id` query parameter in created().
physical_id: '',
// Stream quality; toggled by the #btnCall handler and sent as `resolution` in call().
articulation: 'LD', // LD | HD
// Controls visibility (v-show) of the #logsbox panel; toggled by clicking the time label.
show_log: false,
// Bound to the <video> element's `muted` attribute; changes are logged by the watcher.
flag: false,
// Never assigned in the visible code.
dom_video: null,
}
},
watch:{
flag(v){
this.log(`sound: ${v}`);
}
},
created() {
var paramsObj = new URLSearchParams(location.search);
// let {token, physical_id} = this.$router.history.current.params;
this.token = paramsObj.get('token');
this.physical_id = paramsObj.get('physical_id');
// console.log(this.$router.history.current.params);
},
mounted(){
// let {token} = this.$router.history.current.params;
let _self = this;
let token = _self.token
// Quality switch: build a fresh peer connection, flip LD <-> HD, then negotiate again.
$("#btnCall").click(() => {
    init();
    _self.articulation = _self.articulation === 'LD' ? 'HD' : 'LD';
    call();
});
// $("#btnCallLD").click(function () {
// init();
// _self.articulation = 'HD';
// call("LD");
// });
const dataChannelOptions = {ordered: true};
const remoteVideo = document.getElementById('video');
const localAudios = document.getElementById('audios');
const localVideos = document.getElementById('videos');
//dom_video = remoteVideo;
// Options passed to RTCPeerConnection.createOffer() in call().
const offerOptions = {
offerToReceiveAudio: 1, // Controls whether the remote peer is offered the chance to send audio. If false, the remote endpoint is not offered the chance to send audio even if the local side sends audio. If true, the remote endpoint is offered the chance to send audio even if the local side sends none.
offerToReceiveVideo: 1,
iceRestart: true,
//voiceActivityDetection: true
};
/*try {
log('Get local audio sources');
navigator.mediaDevices.enumerateDevices()
.then(gotSources);
} catch (e) {
console.log(e);
}*/
// Media-element lifecycle hooks. Most of them only write a line to the log,
// so they are generated from a lookup table.
const videoLogMessages = {
    onloadstart: "Starting to load video",
    ondurationchange: "The video duration has changed",
    onloadedmetadata: "Meta data for video loaded",
    onloadeddata: "Browser has loaded the current frame",
    onprogress: "Downloading video",
    oncanplay: "Can start playing video",
};
Object.keys(videoLogMessages).forEach((hook) => {
    remoteVideo[hook] = () => {
        log(videoLogMessages[hook]);
    };
});
remoteVideo.onplay = () => {
    log("The video has started to play");
    // Timer kept from the original implementation; its callback does nothing.
    setTimeout(() => {}, 2500);
};
remoteVideo.oncanplaythrough = () => {
    log("Can play through video without stopping");
    // Show the time elapsed since the last `startTime` stamp taken in call().
    endTime = Date.now();
    _self.atime = (endTime - startTime) + "ms";
};
let dataChannelDataReceived, receiveChannel, localAudio, localVideo, sendChannel, sendDataLoop,
dataChannelCounter = 0;
let audioCount = 0;
let videoCount = 0;
let startTime = 0;
let endTime = 0;
const servers = {};
let remotePeerConnection = null;
/**
 * (Re)creates the RTCPeerConnection stored in `remotePeerConnection`.
 *
 * When a connection already exists (init() is called again by the quality
 * switch), it is closed first so it is not leaked when the variable is
 * overwritten.
 */
function init() {
    log('-----------------init--------------------');
    if (remotePeerConnection) {
        log('Closing previous remote peer connection');
        remotePeerConnection.close();
    }
    log('Created remote peer connection object remotePeerConnection');
    remotePeerConnection = new RTCPeerConnection(servers);
    log('Created remote peer connection success');
}
let remoteVideoStream, remoteAudioStream;
/**
 * Negotiates the WebRTC session for the live view.
 *
 * Steps: bind the peer-connection callbacks, capture the microphone, add the
 * audio track(s) plus a placeholder canvas video track, create and apply the
 * local offer, POST it to `/app/init_webrtc_session`, and apply the SDP answer
 * returned by that request.
 *
 * @param {string} [resolution='LD'] Ignored: the value is always replaced by
 *     `_self.articulation`. Kept so existing call sites remain valid.
 * @returns {Promise<void>} Resolves once the HTTP request has been issued; the
 *     answer is applied later, inside the request's success callback.
 */
async function call(resolution='LD') {
    resolution = _self.articulation;
    console.log(_self.articulation, resolution);
    // Pin the connection this negotiation belongs to, so a later init() cannot
    // redirect the (asynchronous) answer to a different connection.
    const pc = remotePeerConnection;
    // Since the 'remote' side has no media stream we need
    // to pass in the right constraints in order for it to
    // accept the incoming offer of audio and video.
    try {
        log('Binding PeerConnection event start!');
        pc.onicecandidate = e => onIceCandidate(pc, e);
        pc.ontrack = gotRemoteStream;
        pc.ondatachannel = receiveChannelCallback;
        log('Binding PeerConnection event end!');

        let constraints = {"audio": 1, "video": 0};
        log('Received local stream:' + JSON.stringify(constraints));
        startTime = new Date().valueOf();
        let localStream = await navigator.mediaDevices.getUserMedia(constraints);
        log('Received local stream success!time:' + (new Date().valueOf() - startTime));

        log('Adding Local Stream to peer connection');
        let tracks = localStream.getAudioTracks();
        if (tracks.length === 0) {
            alert('Failed to get audio tracks');
            return;
        }
        for (const track of tracks) {
            log("Add track:" + track.label + "(" + track.kind + ")");
            pc.addTrack(track, localStream);
        }

        // Placeholder video track taken from an off-screen canvas.
        let canvas = document.createElement("canvas");
        let ctx = canvas.getContext("2d");
        ctx.fillStyle = "black";
        // fill() without a current path paints nothing; fillRect() actually blackens the frame.
        ctx.fillRect(0, 0, canvas.width, canvas.height);
        let canvasStream = canvas.captureStream(0);
        let videoTrack = canvasStream.getVideoTracks()[0];
        log("Add video track:" + videoTrack.label + "(" + videoTrack.kind + ")");
        pc.addTrack(videoTrack);

        log('CreateOffer:' + JSON.stringify(offerOptions));
        let offer = await pc.createOffer(offerOptions);
        $("#sdp").val(offer.sdp);
        log('SetLocalDescription');
        // Awaited so a rejected description lands in the catch below instead of
        // becoming an unhandled promise rejection.
        await pc.setLocalDescription(offer);

        log('---------------------------->starting http post ');
        startTime = new Date().valueOf();
        // NOTE(review): jQuery URL-encodes `data` again, so the SDP goes out
        // double-encoded. Presumably the endpoint expects that — confirm.
        let sdp = encodeURIComponent(offer.sdp);
        let params = {
            "sdp": sdp,
            "physical_id": _self.physical_id,
            "token": _self.token,
            "resolution": resolution
        };
        log(`physical_id: ${_self.physical_id}, token: ${_self.token}, ${resolution}`);
        let hostname = window.location.hostname;
        // The dot is escaped so it only matches a literal "." in the host name.
        let api_url = (/meshare\.com$/).test(hostname) ? 'xxxx.meshare.com' : 'xxx.meshare.cn'
        $.ajax({
            url: `https://${api_url}/app/init_webrtc_session`,
            method: "POST",
            data: params,
            dataType: 'json',
            success: async function (resp) {
                if (resp.result !== "ok") {
                    alert(JSON.stringify(resp));
                    return;
                }
                try {
                    log('http response answer sdp!time:' + (new Date().valueOf() - startTime));
                    // Normalise line endings to CRLF before handing the SDP to the browser.
                    const answerSdp = resp.data["sdp"]
                        .split('\n')
                        .map(l => l.trim())
                        .join('\r\n');
                    const answer = {
                        type: 'answer',
                        sdp: answerSdp
                    };
                    $("#remoteSdp").val(answerSdp);
                    log('setRemoteDescription sdp:' + answerSdp.length);
                    // Only report success once the browser has accepted the answer.
                    await pc.setRemoteDescription(answer);
                    log('Set session description success.');
                } catch (e) {
                    log("Failed to set remote description: " + e.toString());
                }
            }, error: function (a, b, c) {
                alert("Error!");
                log(a.responseText);
            }
        });
    } catch (e) {
        log("Failed to create session description: " + e.toString());
    }
}
/**
 * ICE-candidate hook of the peer connection. It only logs the candidate;
 * nothing is forwarded to the remote side from here.
 *
 * @param {RTCPeerConnection} pc Connection that produced the event.
 * @param {RTCPeerConnectionIceEvent} event Event whose `candidate` may be null.
 */
async function onIceCandidate(pc, event) {
    try {
        const serialized = JSON.stringify(event.candidate);
        log("onIceCandidate:" + serialized);
        onAddIceCandidateSuccess(pc);
    } catch (err) {
        onAddIceCandidateError(pc, err);
    }
    const label = getName(pc);
    const candidateText = event.candidate ? event.candidate.candidate : '(null)';
    log(label + ' ICE candidate:' + candidateText);
}
/** Logs that handling an ICE candidate for `pc` succeeded. */
function onAddIceCandidateSuccess(pc) {
    log(`AddIceCandidate ${getName(pc)} success.`);
}
/** Logs that handling an ICE candidate for `pc` failed, including the error text. */
function onAddIceCandidateError(pc, error) {
    log(`Failed to add Ice Candidate ${getName(pc)}:${error.toString()}`);
}
/** Returns the display label used in log lines; the same label for every connection. */
function getName(pc) {
    const peerLabel = "remotePeerConnection";
    return peerLabel;
}
/**
 * `ontrack` handler: remembers the remote audio and video streams and, once
 * both are known, plays them as one combined MediaStream in `remoteVideo`.
 *
 * A stream whose id contains "video" is treated as the video stream; any other
 * stream is treated as the audio stream.
 *
 * The merge is skipped (instead of throwing from the MediaStream constructor)
 * while either stream does not yet expose the expected track.
 *
 * @param {RTCTrackEvent} e Track event carrying the associated `streams`.
 */
function gotRemoteStream(e) {
    console.log('Received remote streams');
    console.log(e.streams);
    for (const stream of e.streams) {
        log('Received remote stream:' + stream.id);
        if (stream.id.toString().indexOf("video") >= 0) {
            remoteVideoStream = stream;
        } else {
            remoteAudioStream = stream;
        }
    }
    if (remoteAudioStream == null || remoteVideoStream == null) {
        return;
    }
    const videoTrack = remoteVideoStream.getVideoTracks()[0];
    const audioTrack = remoteAudioStream.getAudioTracks()[0];
    if (!videoTrack || !audioTrack) {
        // Passing `undefined` to the MediaStream constructor would throw a TypeError.
        log('Merge skipped: missing remote ' + (!videoTrack ? 'video' : 'audio') + ' track');
        return;
    }
    // Build one (combined) media stream from the video stream's track and the audio stream's track.
    log('Merge audio and video start!');
    const merged = new MediaStream([videoTrack, audioTrack]);
    log('Merge audio and video end!');
    remoteVideo.srcObject = merged;
}
/**
 * `ondatachannel` handler: stores the incoming channel in `receiveChannel`
 * and attaches the message / open / close callbacks to it.
 */
function receiveChannelCallback(event) {
    log('Receive Channel Callback');
    receiveChannel = event.channel;
    Object.assign(receiveChannel, {
        onmessage: onReceiveMessageCallback,
        onopen: onReceiveChannelStateChange,
        onclose: onReceiveChannelStateChange,
    });
}
/** Data-channel `onmessage` handler: remembers the latest payload and logs it. */
function onReceiveMessageCallback(event) {
    const { data } = event;
    dataChannelDataReceived = data;
    log("DataChannel receive counter: " + dataChannelDataReceived);
}
/** Data-channel open/close handler: logs the current state of `receiveChannel`. */
function onReceiveChannelStateChange() {
    log("Receive channel state is: " + receiveChannel.readyState);
}
/**
 * Writes a timestamped line to the console and to the on-page #logsbox panel.
 *
 * The line is inserted as text, never as HTML: log messages include values
 * taken from the page URL (token, physical_id) and server responses, so
 * building markup from them would allow script injection.
 *
 * @param {*} msg Message to log; it is string-concatenated after the timestamp.
 */
function log(msg) {
    const line = "time:[" + new Date().valueOf() + "] " + msg;
    console.log(line);
    $("#logsbox").append($("<div>").text(line));
}
/**
 * Send-channel state handler: starts the once-per-second `sendData` timer
 * when the channel is open and clears that timer in every other state.
 */
function onSendChannelStateChange() {
    const state = sendChannel.readyState;
    log("Send channel state is: " + state);
    if (state !== 'open') {
        clearInterval(sendDataLoop);
        return;
    }
    sendDataLoop = setInterval(sendData, 1000);
}
/** Sends the running counter over `sendChannel` (only while it is open) and then increments it. */
function sendData() {
    if (sendChannel.readyState !== 'open') {
        return;
    }
    sendChannel.send(dataChannelCounter);
    log("DataChannel send counter: " + dataChannelCounter);
    dataChannelCounter += 1;
}
init();
call();
},
methods: {
videoPauseFunc(){
this.log('video pause ----');
},
log(msg) {
msg = "time:[" + new Date().valueOf() + "] " + msg;
console.log(msg);
$("#logsbox").append("<div>" + msg + "</div>");
}
}
})
</script>
使用 Amazon SDK 开发 answer 端服务器,与私有协议的摄像头互通实时视频。以下是 amazon-kinesis-video-streams-webrtc-sdk-c 的 example 代码:
#define LOG_CLASS "WebRtcSamples"
#include "Samples.h"
PSampleConfiguration gSampleConfiguration = NULL;
/*
 * SIGINT handler: marks the global sample configuration as interrupted and
 * broadcasts on its condition variable. Does nothing while
 * gSampleConfiguration is NULL.
 */
VOID sigintHandler(INT32 sigNum)
{
    UNUSED_PARAM(sigNum);

    if (gSampleConfiguration == NULL) {
        return;
    }

    ATOMIC_STORE_BOOL(&gSampleConfiguration->interrupted, TRUE);
    CVAR_BROADCAST(gSampleConfiguration->cvar);
}
/*
 * Data channel message callback: logs the incoming message (text payloads are
 * printed, binary payloads are only announced) and replies on the same channel
 * with MASTER_DATA_CHANNEL_MESSAGE. A failed reply is logged, not propagated.
 */
VOID onDataChannelMessage(UINT64 customData, PRtcDataChannel pDataChannel, BOOL isBinary, PBYTE pMessage, UINT32 pMessageLen)
{
    STATUS sendStatus;

    UNUSED_PARAM(customData);
    UNUSED_PARAM(pDataChannel);

    if (!isBinary) {
        DLOGI("DataChannel String Message: %.*s\n", pMessageLen, pMessage);
    } else {
        DLOGI("DataChannel Binary Message");
    }

    // Answer the viewer's message
    sendStatus = dataChannelSend(pDataChannel, FALSE, (PBYTE) MASTER_DATA_CHANNEL_MESSAGE, STRLEN(MASTER_DATA_CHANNEL_MESSAGE));
    if (sendStatus != STATUS_SUCCESS) {
        DLOGI("[KVS Master] dataChannelSend(): operation returned status code: 0x%08x \n", sendStatus);
    }
}
// Callback for a newly opened data channel: logs the channel name and
// registers onDataChannelMessage as its message handler, passing customData
// through unchanged.
VOID onDataChannel(UINT64 customData, PRtcDataChannel pRtcDataChannel)
{
DLOGI("New DataChannel has been opened %s \n", pRtcDataChannel->name);
dataChannelOnMessage(pRtcDataChannel, customData, onDataChannelMessage);
}
// Peer connection state callback. customData is the PSampleStreamingSession
// that was registered with the callback.
//  - CONNECTED: sets the configuration's `connected` flag, broadcasts on the
//    cvar and logs the ICE candidate information.
//  - FAILED / CLOSED / DISCONNECTED: sets the session's terminateFlag, then
//    falls through to the default handling.
//  - any other state: clears `connected` and broadcasts on the cvar.
VOID onConnectionStateChange(UINT64 customData, RTC_PEER_CONNECTION_STATE newState)
{
STATUS retStatus = STATUS_SUCCESS;
PSampleStreamingSession pSampleStreamingSession = (PSampleStreamingSession) customData;
// Both the session and its back-pointer to the configuration are required below
CHK(pSampleStreamingSession != NULL && pSampleStreamingSession->pSampleConfiguration != NULL, STATUS_INTERNAL_ERROR);
PSampleConfiguration pSampleConfiguration = pSampleStreamingSession->pSampleConfiguration;
DLOGI("New connection state %u", newState);
switch (newState) {
case RTC_PEER_CONNECTION_STATE_CONNECTED:
ATOMIC_STORE_BOOL(&pSampleConfiguration->connected, TRUE);
CVAR_BROADCAST(pSampleConfiguration->cvar);
// A failure here is only warned about; retStatus keeps the failure code and
// is passed to CHK_LOG_ERR at CleanUp as well
if (STATUS_FAILED(retStatus = logSelectedIceCandidatesInformation(pSampleStreamingSession))) {
DLOGW("Failed to get information about selected Ice candidates: 0x%08x", retStatus);
}
break;
case RTC_PEER_CONNECTION_STATE_FAILED:
// explicit fallthrough
case RTC_PEER_CONNECTION_STATE_CLOSED:
// explicit fallthrough
case RTC_PEER_CONNECTION_STATE_DISCONNECTED:
ATOMIC_STORE_BOOL(&pSampleStreamingSession->terminateFlag, TRUE);
CVAR_BROADCAST(pSampleConfiguration->cvar);
// explicit fallthrough
default:
ATOMIC_STORE_BOOL(&pSampleConfiguration->connected, FALSE);
CVAR_BROADCAST(pSampleConfiguration->cvar);
break;
}
CleanUp:
CHK_LOG_ERR(retStatus);
}
/*
 * Signaling client state callback: logs the new state (numeric value and the
 * string obtained from signalingClientGetStateString) at verbose level.
 * Always returns STATUS_SUCCESS.
 */
STATUS signalingClientStateChanged(UINT64 customData, SIGNALING_CLIENT_STATE state)
{
    PCHAR stateName;

    UNUSED_PARAM(customData);

    signalingClientGetStateString(state, &stateName);
    DLOGV("Signaling client state changed to %d - '%s'", state, stateName);

    // Report success so processing continues
    return STATUS_SUCCESS;
}
/*
 * Signaling client error callback. customData is the PSampleConfiguration.
 * Every error is logged as a warning; ICE-config-refresh and reconnect
 * failures additionally request re-creation of the signaling client by setting
 * recreateSignalingClient and broadcasting on the configuration's cvar.
 * Always returns STATUS_SUCCESS.
 */
STATUS signalingClientError(UINT64 customData, STATUS status, PCHAR msg, UINT32 msgLen)
{
    PSampleConfiguration pConfig = (PSampleConfiguration) customData;
    BOOL needsRecreate = (status == STATUS_SIGNALING_ICE_CONFIG_REFRESH_FAILED || status == STATUS_SIGNALING_RECONNECT_FAILED);

    DLOGW("Signaling client generated an error 0x%08x - '%.*s'", status, msgLen, msg);

    // These errors force the signaling client to be re-created
    if (needsRecreate) {
        ATOMIC_STORE_BOOL(&pConfig->recreateSignalingClient, TRUE);
        CVAR_BROADCAST(pConfig->cvar);
    }

    return STATUS_SUCCESS;
}
// Logs, at debug level, the local and then the remote ICE candidate stats of
// the session's peer connection, as reported by rtcPeerConnectionGetMetrics.
// Returns STATUS_NULL_ARG when pSampleStreamingSession is NULL, otherwise the
// status of the metrics queries.
STATUS logSelectedIceCandidatesInformation(PSampleStreamingSession pSampleStreamingSession)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
RtcStats rtcMetrics;
CHK(pSampleStreamingSession != NULL, STATUS_NULL_ARG);
// First query: local candidate
rtcMetrics.requestedTypeOfStats = RTC_STATS_TYPE_LOCAL_CANDIDATE;
CHK_STATUS(rtcPeerConnectionGetMetrics(pSampleStreamingSession->pPeerConnection, NULL, &rtcMetrics));
DLOGD("Local Candidate IP Address: %s", rtcMetrics.rtcStatsObject.localIceCandidateStats.address);
DLOGD("Local Candidate type: %s", rtcMetrics.rtcStatsObject.localIceCandidateStats.candidateType);
DLOGD("Local Candidate port: %d", rtcMetrics.rtcStatsObject.localIceCandidateStats.port);
DLOGD("Local Candidate priority: %d", rtcMetrics.rtcStatsObject.localIceCandidateStats.priority);
DLOGD("Local Candidate transport protocol: %s", rtcMetrics.rtcStatsObject.localIceCandidateStats.protocol);
DLOGD("Local Candidate relay protocol: %s", rtcMetrics.rtcStatsObject.localIceCandidateStats.relayProtocol);
DLOGD("Local Candidate Ice server source: %s", rtcMetrics.rtcStatsObject.localIceCandidateStats.url);
// Second query reuses the same RtcStats object for the remote candidate
rtcMetrics.requestedTypeOfStats = RTC_STATS_TYPE_REMOTE_CANDIDATE;
CHK_STATUS(rtcPeerConnectionGetMetrics(pSampleStreamingSession->pPeerConnection, NULL, &rtcMetrics));
DLOGD("Remote Candidate IP Address: %s", rtcMetrics.rtcStatsObject.remoteIceCandidateStats.address);
DLOGD("Remote Candidate type: %s", rtcMetrics.rtcStatsObject.remoteIceCandidateStats.candidateType);
DLOGD("Remote Candidate port: %d", rtcMetrics.rtcStatsObject.remoteIceCandidateStats.port);
DLOGD("Remote Candidate priority: %d", rtcMetrics.rtcStatsObject.remoteIceCandidateStats.priority);
DLOGD("Remote Candidate transport protocol: %s", rtcMetrics.rtcStatsObject.remoteIceCandidateStats.protocol);
CleanUp:
LEAVES();
return retStatus;
}
/*
 * Applies an SDP answer received over signaling: the message payload is
 * deserialized into a zero-initialized session description, which is then set
 * as the remote description of the session's peer connection.
 * pSampleConfiguration is unused. Returns the first failing status, or
 * STATUS_SUCCESS.
 */
STATUS handleAnswer(PSampleConfiguration pSampleConfiguration, PSampleStreamingSession pSampleStreamingSession, PSignalingMessage pSignalingMessage)
{
    STATUS retStatus = STATUS_SUCCESS;
    RtcSessionDescriptionInit remoteAnswer;

    UNUSED_PARAM(pSampleConfiguration);

    MEMSET(&remoteAnswer, 0x00, SIZEOF(RtcSessionDescriptionInit));

    CHK_STATUS(deserializeSessionDescriptionInit(pSignalingMessage->payload, pSignalingMessage->payloadLen, &remoteAnswer));
    CHK_STATUS(setRemoteDescription(pSampleStreamingSession->pPeerConnection, &remoteAnswer));

CleanUp:
    CHK_LOG_ERR(retStatus);
    return retStatus;
}
// Thread routine (customData is the PSampleConfiguration): waits until the
// configuration is marked connected or the application is terminating, then
// starts the configured video/audio source threads and joins them.
// Clears mediaThreadStarted before returning. Always returns NULL.
PVOID mediaSenderRoutine(PVOID customData)
{
STATUS retStatus = STATUS_SUCCESS;
PSampleConfiguration pSampleConfiguration = (PSampleConfiguration) customData;
TID videoSenderTid = INVALID_TID_VALUE, audioSenderTid = INVALID_TID_VALUE;
// Wait under the configuration lock; each wait uses a timeout of
// 5 * HUNDREDS_OF_NANOS_IN_A_SECOND and the loop condition is re-evaluated
// after every wait
MUTEX_LOCK(pSampleConfiguration->sampleConfigurationObjLock);
while (!ATOMIC_LOAD_BOOL(&pSampleConfiguration->connected) && !ATOMIC_LOAD_BOOL(&pSampleConfiguration->appTerminateFlag)) {
CVAR_WAIT(pSampleConfiguration->cvar, pSampleConfiguration->sampleConfigurationObjLock, 5 * HUNDREDS_OF_NANOS_IN_A_SECOND);
}
MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock);
// On termination, skip straight to CleanUp with retStatus left unchanged
CHK(!ATOMIC_LOAD_BOOL(&pSampleConfiguration->appTerminateFlag), retStatus);
// Sender threads are optional: each is started only when its source routine is set
if (pSampleConfiguration->videoSource != NULL) {
THREAD_CREATE(&videoSenderTid, pSampleConfiguration->videoSource, (PVOID) pSampleConfiguration);
}
if (pSampleConfiguration->audioSource != NULL) {
THREAD_CREATE(&audioSenderTid, pSampleConfiguration->audioSource, (PVOID) pSampleConfiguration);
}
// Join only threads whose TID was set by THREAD_CREATE above
if (videoSenderTid != INVALID_TID_VALUE) {
THREAD_JOIN(videoSenderTid, NULL);
}
if (audioSenderTid != INVALID_TID_VALUE) {
THREAD_JOIN(audioSenderTid, NULL);
}
CleanUp:
// clean the flag of the media thread.
ATOMIC_STORE_BOOL(&pSampleConfiguration->mediaThreadStarted, FALSE);
CHK_LOG_ERR(retStatus);
return NULL;
}
/*
 * Handles an SDP offer received over signaling for one streaming session.
 *
 * The offer is deserialized and set as the remote description, the local
 * description is set, and - when the remote side can trickle ICE - the answer
 * is created and sent immediately (otherwise it is sent once candidate
 * gathering completes). Afterwards the shared media sender thread is started
 * if it is not running yet, and the per-session receive thread is started
 * when a receive routine is configured.
 *
 * Returns STATUS_NULL_ARG when any argument is NULL, otherwise the first
 * failing status or STATUS_SUCCESS.
 */
STATUS handleOffer(PSampleConfiguration pSampleConfiguration, PSampleStreamingSession pSampleStreamingSession, PSignalingMessage pSignalingMessage)
{
    STATUS retStatus = STATUS_SUCCESS;
    RtcSessionDescriptionInit offerSessionDescriptionInit;
    NullableBool canTrickle;
    BOOL mediaThreadStarted;

    // pSampleStreamingSession is dereferenced right below, so validate it together with the other arguments
    CHK(pSampleConfiguration != NULL && pSampleStreamingSession != NULL && pSignalingMessage != NULL, STATUS_NULL_ARG);

    MEMSET(&offerSessionDescriptionInit, 0x00, SIZEOF(RtcSessionDescriptionInit));
    MEMSET(&pSampleStreamingSession->answerSessionDescriptionInit, 0x00, SIZEOF(RtcSessionDescriptionInit));

    CHK_STATUS(deserializeSessionDescriptionInit(pSignalingMessage->payload, pSignalingMessage->payloadLen, &offerSessionDescriptionInit));
    CHK_STATUS(setRemoteDescription(pSampleStreamingSession->pPeerConnection, &offerSessionDescriptionInit));
    canTrickle = canTrickleIceCandidates(pSampleStreamingSession->pPeerConnection);
    /* cannot be null after setRemoteDescription */
    CHECK(!NULLABLE_CHECK_EMPTY(canTrickle));
    pSampleStreamingSession->remoteCanTrickleIce = canTrickle.value;
    CHK_STATUS(setLocalDescription(pSampleStreamingSession->pPeerConnection, &pSampleStreamingSession->answerSessionDescriptionInit));

    /*
     * If remote support trickle ice, send answer now. Otherwise answer will be sent once ice candidate gathering is complete.
     */
    if (pSampleStreamingSession->remoteCanTrickleIce) {
        CHK_STATUS(createAnswer(pSampleStreamingSession->pPeerConnection, &pSampleStreamingSession->answerSessionDescriptionInit));
        CHK_STATUS(respondWithAnswer(pSampleStreamingSession));
        DLOGD("time taken to send answer %" PRIu64 " ms",
              (GETTIME() - pSampleStreamingSession->offerReceiveTime) / HUNDREDS_OF_NANOS_IN_A_MILLISECOND);
    }

    // The exchange returns the previous flag value, so only the first caller starts the media sender thread
    mediaThreadStarted = ATOMIC_EXCHANGE_BOOL(&pSampleConfiguration->mediaThreadStarted, TRUE);
    if (!mediaThreadStarted) {
        THREAD_CREATE(&pSampleConfiguration->mediaSenderTid, mediaSenderRoutine, (PVOID) pSampleConfiguration);
    }

    // The audio video receive routine should be per streaming session
    if (pSampleConfiguration->receiveAudioVideoSource != NULL) {
        THREAD_CREATE(&pSampleStreamingSession->receiveAudioVideoSenderTid, pSampleConfiguration->receiveAudioVideoSource,
                      (PVOID) pSampleStreamingSession);
    }

CleanUp:
    CHK_LOG_ERR(retStatus);
    return retStatus;
}
/*
 * Sends one signaling message through the configuration's signaling client
 * while holding signalingSendMessageLock.
 *
 * Returns STATUS_NULL_ARG for a NULL session, configuration or message,
 * STATUS_INVALID_OPERATION when the lock or the client handle is not valid,
 * otherwise the status of signalingClientSendMessageSync.
 */
STATUS sendSignalingMessage(PSampleStreamingSession pSampleStreamingSession, PSignalingMessage pMessage)
{
    STATUS retStatus = STATUS_SUCCESS;
    BOOL holdsSendLock = FALSE;
    PSampleConfiguration pConfig = NULL;

    // Argument validation
    CHK(pSampleStreamingSession != NULL && pSampleStreamingSession->pSampleConfiguration != NULL && pMessage != NULL, STATUS_NULL_ARG);
    pConfig = pSampleStreamingSession->pSampleConfiguration;
    CHK(IS_VALID_MUTEX_VALUE(pConfig->signalingSendMessageLock) && IS_VALID_SIGNALING_CLIENT_HANDLE(pConfig->signalingClientHandle),
        STATUS_INVALID_OPERATION);

    MUTEX_LOCK(pConfig->signalingSendMessageLock);
    holdsSendLock = TRUE;
    CHK_STATUS(signalingClientSendMessageSync(pConfig->signalingClientHandle, pMessage));

CleanUp:
    // Release the lock only when it was actually taken above
    if (holdsSendLock) {
        MUTEX_UNLOCK(pConfig->signalingSendMessageLock);
    }

    CHK_LOG_ERR(retStatus);
    return retStatus;
}
// Serializes the session's answerSessionDescriptionInit into a signaling
// message of type ANSWER addressed to the session's peerId and sends it via
// sendSignalingMessage. Returns the first failing status, or STATUS_SUCCESS.
STATUS respondWithAnswer(PSampleStreamingSession pSampleStreamingSession)
{
STATUS retStatus = STATUS_SUCCESS;
SignalingMessage message;
// Passed to the serializer as the buffer length for message.payload
UINT32 buffLen = MAX_SIGNALING_MESSAGE_LEN;
CHK_STATUS(serializeSessionDescriptionInit(&pSampleStreamingSession->answerSessionDescriptionInit, message.payload, &buffLen));
message.version = SIGNALING_MESSAGE_CURRENT_VERSION;
message.messageType = SIGNALING_MESSAGE_TYPE_ANSWER;
STRNCPY(message.peerClientId, pSampleStreamingSession->peerId, MAX_SIGNALING_CLIENT_ID_LEN);
// Length is taken from the serialized string itself, not from buffLen
message.payloadLen = (UINT32) STRLEN(message.payload);
// Empty correlation id
message.correlationId[0] = '\0';
CHK_STATUS(sendSignalingMessage(pSampleStreamingSession, &message));
CleanUp:
CHK_LOG_ERR(retStatus);
return retStatus;
}
/*
 * Sample network interface filter: allows only the interface named "eth0".
 * Logs the decision at debug level and returns TRUE when the interface may be
 * used for candidate gathering, FALSE otherwise. customData is unused.
 */
BOOL sampleFilterNetworkInterfaces(UINT64 customData, PCHAR networkInt)
{
    BOOL useInterface;

    UNUSED_PARAM(customData);

    // The compared length includes the terminating NUL, so only an exact name match passes
    useInterface = (STRNCMP(networkInt, (PCHAR) "eth0", ARRAY_SIZE("eth0")) == 0) ? TRUE : FALSE;

    DLOGD("%s %s", networkInt, (useInterface) ? ("allowed. Candidates to be gathered") : ("blocked. Candidates will not be gathered"));
    return useInterface;
}
// Local ICE candidate callback. customData is the PSampleStreamingSession.
//  - candidateJson == NULL means gathering has finished: the session's
//    candidateGatheringDone flag is set; a master whose remote cannot trickle
//    creates and sends the answer now; a viewer with trickle ICE disabled
//    broadcasts on the configuration's cvar.
//  - otherwise, when the remote can trickle and the peer id is known, the
//    candidate is sent to the peer as an ICE_CANDIDATE signaling message.
// Errors are logged at CleanUp and not returned.
VOID onIceCandidateHandler(UINT64 customData, PCHAR candidateJson)
{
STATUS retStatus = STATUS_SUCCESS;
PSampleStreamingSession pSampleStreamingSession = (PSampleStreamingSession) customData;
SignalingMessage message;
CHK(pSampleStreamingSession != NULL, STATUS_NULL_ARG);
if (candidateJson == NULL) {
DLOGD("ice candidate gathering finished");
ATOMIC_STORE_BOOL(&pSampleStreamingSession->candidateGatheringDone, TRUE);
// if application is master and non-trickle ice, send answer now.
if (pSampleStreamingSession->pSampleConfiguration->channelInfo.channelRoleType == SIGNALING_CHANNEL_ROLE_TYPE_MASTER &&
!pSampleStreamingSession->remoteCanTrickleIce) {
CHK_STATUS(createAnswer(pSampleStreamingSession->pPeerConnection, &pSampleStreamingSession->answerSessionDescriptionInit));
CHK_STATUS(respondWithAnswer(pSampleStreamingSession));
DLOGD("time taken to send answer %" PRIu64 " ms",
(GETTIME() - pSampleStreamingSession->offerReceiveTime) / HUNDREDS_OF_NANOS_IN_A_MILLISECOND);
} else if (pSampleStreamingSession->pSampleConfiguration->channelInfo.channelRoleType == SIGNALING_CHANNEL_ROLE_TYPE_VIEWER &&
!pSampleStreamingSession->pSampleConfiguration->trickleIce) {
CVAR_BROADCAST(pSampleStreamingSession->pSampleConfiguration->cvar);
}
} else if (pSampleStreamingSession->remoteCanTrickleIce && ATOMIC_LOAD_BOOL(&pSampleStreamingSession->peerIdReceived)) {
message.version = SIGNALING_MESSAGE_CURRENT_VERSION;
message.messageType = SIGNALING_MESSAGE_TYPE_ICE_CANDIDATE;
STRNCPY(message.peerClientId, pSampleStreamingSession->peerId, MAX_SIGNALING_CLIENT_ID_LEN);
// Payload length is capped at MAX_SIGNALING_MESSAGE_LEN
message.payloadLen = (UINT32) STRNLEN(candidateJson, MAX_SIGNALING_MESSAGE_LEN);
// NOTE(review): copies exactly payloadLen bytes into the uninitialized
// stack message, so no terminating NUL is written here - confirm the
// consumer relies on payloadLen only
STRNCPY(message.payload, candidateJson, message.payloadLen);
message.correlationId[0] = '\0';
CHK_STATUS(sendSignalingMessage(pSampleStreamingSession, &message));
}
CleanUp:
CHK_LOG_ERR(retStatus);
}
/*
 * Builds an RtcConfiguration for the sample and creates a peer connection.
 *
 * iceServers[0] is the KVS STUN server for the channel's region. When useTurn
 * is set, the URIs of the first maxTurnServer ICE configurations from the
 * signaling client are added from iceServers[1] onwards. A pre-generated
 * certificate is used when one can be dequeued; it is freed before returning.
 *
 * pSampleConfiguration->iceUriCount is set to the number of configured ICE
 * servers (STUN + TURN URIs). On success *ppRtcPeerConnection receives the
 * new peer connection.
 *
 * Returns STATUS_NULL_ARG for NULL arguments, otherwise the first failing
 * status or STATUS_SUCCESS.
 */
STATUS initializePeerConnection(PSampleConfiguration pSampleConfiguration, PRtcPeerConnection* ppRtcPeerConnection)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS;
    RtcConfiguration configuration;
    UINT32 i, j, iceConfigCount, uriCount = 0, maxTurnServer = 1;
    PIceConfigInfo pIceConfigInfo;
    UINT64 data, curTime;
    PRtcCertificate pRtcCertificate = NULL;

    CHK(pSampleConfiguration != NULL && ppRtcPeerConnection != NULL, STATUS_NULL_ARG);

    MEMSET(&configuration, 0x00, SIZEOF(RtcConfiguration));

    // Set this to custom callback to enable filtering of interfaces
    configuration.kvsRtcConfiguration.iceSetInterfaceFilterFunc = NULL;

    // Set the ICE mode explicitly
    configuration.iceTransportPolicy = ICE_TRANSPORT_POLICY_ALL;

    // Set the STUN server
    SNPRINTF(configuration.iceServers[0].urls, MAX_ICE_CONFIG_URI_LEN, KINESIS_VIDEO_STUN_URL, pSampleConfiguration->channelInfo.pRegion);

    if (pSampleConfiguration->useTurn) {
        // Set the URIs from the configuration
        CHK_STATUS(signalingClientGetIceConfigInfoCount(pSampleConfiguration->signalingClientHandle, &iceConfigCount));

        /* signalingClientGetIceConfigInfoCount can return more than one turn server. Use only one to optimize
         * candidate gathering latency. But user can also choose to use more than 1 turn server. */
        for (uriCount = 0, i = 0; i < maxTurnServer; i++) {
            CHK_STATUS(signalingClientGetIceConfigInfo(pSampleConfiguration->signalingClientHandle, i, &pIceConfigInfo));
            for (j = 0; j < pIceConfigInfo->uriCount; j++) {
                // Slot 0 holds the STUN server, so this URI is written to index uriCount + 1;
                // that index (not uriCount) must stay inside the array.
                // Assumes iceServers has MAX_ICE_SERVERS_COUNT entries - TODO confirm against the SDK header.
                CHECK(uriCount + 1 < MAX_ICE_SERVERS_COUNT);
                /*
                 * if configuration.iceServers[uriCount + 1].urls is "turn:ip:port?transport=udp" then ICE will try TURN over UDP
                 * if configuration.iceServers[uriCount + 1].urls is "turn:ip:port?transport=tcp" then ICE will try TURN over TCP/TLS
                 * if configuration.iceServers[uriCount + 1].urls is "turns:ip:port?transport=udp", it's currently ignored because sdk dont do TURN
                 * over DTLS yet. if configuration.iceServers[uriCount + 1].urls is "turns:ip:port?transport=tcp" then ICE will try TURN over TCP/TLS
                 * if configuration.iceServers[uriCount + 1].urls is "turn:ip:port" then ICE will try both TURN over UPD and TCP/TLS
                 *
                 * It's recommended to not pass too many TURN iceServers to configuration because it will slow down ice gathering in non-trickle mode.
                 */
                STRNCPY(configuration.iceServers[uriCount + 1].urls, pIceConfigInfo->uris[j], MAX_ICE_CONFIG_URI_LEN);
                STRNCPY(configuration.iceServers[uriCount + 1].credential, pIceConfigInfo->password, MAX_ICE_CONFIG_CREDENTIAL_LEN);
                STRNCPY(configuration.iceServers[uriCount + 1].username, pIceConfigInfo->userName, MAX_ICE_CONFIG_USER_NAME_LEN);

                uriCount++;
            }
        }
    }

    // TURN URIs plus the STUN server in slot 0
    pSampleConfiguration->iceUriCount = uriCount + 1;

    // Check if we have any pregenerated certs and use them
    // NOTE: We are running under the config lock
    retStatus = stackQueueDequeue(pSampleConfiguration->pregeneratedCertificates, &data);
    CHK(retStatus == STATUS_SUCCESS || retStatus == STATUS_NOT_FOUND, retStatus);

    if (retStatus == STATUS_NOT_FOUND) {
        // No pre-generated certificate available is not an error
        retStatus = STATUS_SUCCESS;
    } else {
        // Use the pre-generated cert and get rid of it to not reuse again
        pRtcCertificate = (PRtcCertificate) data;
        configuration.certificates[0] = *pRtcCertificate;
    }

    curTime = GETTIME();
    CHK_STATUS(createPeerConnection(&configuration, ppRtcPeerConnection));
    DLOGD("time taken to create peer connection %" PRIu64 " ms", (GETTIME() - curTime) / HUNDREDS_OF_NANOS_IN_A_MILLISECOND);

CleanUp:

    CHK_LOG_ERR(retStatus);

    // Free the certificate which can be NULL as we no longer need it and won't reuse
    freeRtcCertificate(pRtcCertificate);

    LEAVES();
    return retStatus;
}
/*
 * Logs, at debug level, the ICE server stats of a streaming session: one
 * metrics query per configured ICE server (indices 0 .. iceUriCount - 1).
 *
 * Returns STATUS_NULL_ARG when the session or its configuration is NULL
 * (the same argument check the sibling logSelectedIceCandidatesInformation
 * performs), otherwise the first failing metrics status or STATUS_SUCCESS.
 */
STATUS gatherIceServerStats(PSampleStreamingSession pSampleStreamingSession)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS;
    RtcStats rtcmetrics;
    UINT32 j = 0;

    // Both pointers are dereferenced by the loop below
    CHK(pSampleStreamingSession != NULL && pSampleStreamingSession->pSampleConfiguration != NULL, STATUS_NULL_ARG);

    rtcmetrics.requestedTypeOfStats = RTC_STATS_TYPE_ICE_SERVER;
    for (; j < pSampleStreamingSession->pSampleConfiguration->iceUriCount; j++) {
        // Select which ICE server the next query reports on
        rtcmetrics.rtcStatsObject.iceServerStats.iceServerIndex = j;
        CHK_STATUS(rtcPeerConnectionGetMetrics(pSampleStreamingSession->pPeerConnection, NULL, &rtcmetrics));
        DLOGD("ICE Server URL: %s", rtcmetrics.rtcStatsObject.iceServerStats.url);
        DLOGD("ICE Server port: %d", rtcmetrics.rtcStatsObject.iceServerStats.port);
        DLOGD("ICE Server protocol: %s", rtcmetrics.rtcStatsObject.iceServerStats.protocol);
        DLOGD("Total requests sent:%" PRIu64, rtcmetrics.rtcStatsObject.iceServerStats.totalRequestsSent);
        DLOGD("Total responses received: %" PRIu64, rtcmetrics.rtcStatsObject.iceServerStats.totalResponsesReceived);
        DLOGD("Total round trip time: %" PRIu64 "ms",
              rtcmetrics.rtcStatsObject.iceServerStats.totalRoundTripTime / HUNDREDS_OF_NANOS_IN_A_MILLISECOND);
    }

CleanUp:
    LEAVES();
    return retStatus;
}
/**
 * Allocates a streaming session, creates its peer connection, registers the
 * peer-connection callbacks and adds one video and one audio transceiver.
 *
 * @param pSampleConfiguration - IN - owning configuration, must not be NULL
 * @param peerId - IN - remote peer id; must not be NULL when isMaster is TRUE
 * @param isMaster - IN - TRUE when the local side is the master
 * @param ppSampleStreamingSession - OUT - receives the new session; set to NULL on failure
 *
 * @return STATUS code of the execution
 */
STATUS createSampleStreamingSession(PSampleConfiguration pSampleConfiguration, PCHAR peerId, BOOL isMaster,
                                    PSampleStreamingSession* ppSampleStreamingSession)
{
    STATUS retStatus = STATUS_SUCCESS;
    PSampleStreamingSession pSession = NULL;
    RtcMediaStreamTrack videoTrack, audioTrack;
    RtcRtpTransceiverInit videoInit, audioInit;

    MEMSET(&videoTrack, 0x00, SIZEOF(RtcMediaStreamTrack));
    MEMSET(&audioTrack, 0x00, SIZEOF(RtcMediaStreamTrack));

    CHK(!(pSampleConfiguration == NULL || ppSampleStreamingSession == NULL), STATUS_NULL_ARG);
    // A peer id is only mandatory on the master side
    CHK(!isMaster || peerId != NULL, STATUS_INVALID_ARG);

    pSession = (PSampleStreamingSession) MEMCALLOC(1, SIZEOF(SampleStreamingSession));
    CHK(pSession != NULL, STATUS_NOT_ENOUGH_MEMORY);

    // The viewer side always uses the fixed sample viewer client id
    STRCPY(pSession->peerId, isMaster ? peerId : SAMPLE_VIEWER_CLIENT_ID);
    ATOMIC_STORE_BOOL(&pSession->peerIdReceived, TRUE);

    pSession->pSampleConfiguration = pSampleConfiguration;
    pSession->rtcMetricsHistory.prevTs = GETTIME();

    // if we're the viewer, we control the trickle ice mode
    pSession->remoteCanTrickleIce = !isMaster && pSampleConfiguration->trickleIce;

    ATOMIC_STORE_BOOL(&pSession->terminateFlag, FALSE);
    ATOMIC_STORE_BOOL(&pSession->candidateGatheringDone, FALSE);

    // Peer connection plus its ICE candidate / connection state / data channel callbacks
    CHK_STATUS(initializePeerConnection(pSampleConfiguration, &pSession->pPeerConnection));
    CHK_STATUS(peerConnectionOnIceCandidate(pSession->pPeerConnection, (UINT64) pSession, onIceCandidateHandler));
    CHK_STATUS(peerConnectionOnConnectionStateChange(pSession->pPeerConnection, (UINT64) pSession, onConnectionStateChange));
    if (pSampleConfiguration->onDataChannel != NULL) {
        CHK_STATUS(peerConnectionOnDataChannel(pSession->pPeerConnection, (UINT64) pSession, pSampleConfiguration->onDataChannel));
    }

    // Declare that we support H264,Profile=42E01F,level-asymmetry-allowed=1,packetization-mode=1 and Opus
    CHK_STATUS(addSupportedCodec(pSession->pPeerConnection, RTC_CODEC_H264_PROFILE_42E01F_LEVEL_ASYMMETRY_ALLOWED_PACKETIZATION_MODE));
    CHK_STATUS(addSupportedCodec(pSession->pPeerConnection, RTC_CODEC_OPUS));

    // Add a SendRecv Transceiver of type video
    videoTrack.kind = MEDIA_STREAM_TRACK_KIND_VIDEO;
    videoTrack.codec = RTC_CODEC_H264_PROFILE_42E01F_LEVEL_ASYMMETRY_ALLOWED_PACKETIZATION_MODE;
    videoInit.direction = RTC_RTP_TRANSCEIVER_DIRECTION_SENDRECV;
    STRCPY(videoTrack.streamId, "myKvsVideoStream");
    STRCPY(videoTrack.trackId, "myVideoTrack");
    CHK_STATUS(addTransceiver(pSession->pPeerConnection, &videoTrack, &videoInit, &pSession->pVideoRtcRtpTransceiver));
    CHK_STATUS(transceiverOnBandwidthEstimation(pSession->pVideoRtcRtpTransceiver, (UINT64) pSession, sampleBandwidthEstimationHandler));

    // Add a SendRecv Transceiver of type audio (it shares the stream id with the video track)
    audioTrack.kind = MEDIA_STREAM_TRACK_KIND_AUDIO;
    audioTrack.codec = RTC_CODEC_OPUS;
    audioInit.direction = RTC_RTP_TRANSCEIVER_DIRECTION_SENDRECV;
    STRCPY(audioTrack.streamId, "myKvsVideoStream");
    STRCPY(audioTrack.trackId, "myAudioTrack");
    CHK_STATUS(addTransceiver(pSession->pPeerConnection, &audioTrack, &audioInit, &pSession->pAudioRtcRtpTransceiver));
    CHK_STATUS(transceiverOnBandwidthEstimation(pSession->pAudioRtcRtpTransceiver, (UINT64) pSession, sampleBandwidthEstimationHandler));

    // twcc bandwidth estimation
    CHK_STATUS(peerConnectionOnSenderBandwidthEstimation(pSession->pPeerConnection, (UINT64) pSession, sampleSenderBandwidthEstimationHandler));

    // Start-up latency is measured when the first frame arrives
    pSession->firstFrame = TRUE;
    pSession->startUpLatency = 0;

CleanUp:

    // Never hand a half-built session back to the caller
    if (STATUS_FAILED(retStatus) && pSession != NULL) {
        freeSampleStreamingSession(&pSession);
        pSession = NULL;
    }

    if (ppSampleStreamingSession != NULL) {
        *ppSampleStreamingSession = pSession;
    }

    return retStatus;
}
/**
 * Tears down a streaming session: raises its terminate flag, invokes the
 * shutdown callback (if any), joins the receive thread (if valid), cancels the
 * ICE candidate pair stats timer when no sessions remain, closes and frees
 * the peer connection and finally releases the session memory.
 *
 * @param ppSampleStreamingSession - IN/OUT - address of the session pointer. On a
 *        completed teardown the pointed-to handle is reset to NULL so the caller
 *        is not left holding a dangling pointer.
 *
 * @return STATUS_NULL_ARG when ppSampleStreamingSession is NULL, STATUS_SUCCESS otherwise.
 *         A NULL session, or a session without a configuration, is a no-op.
 */
STATUS freeSampleStreamingSession(PSampleStreamingSession* ppSampleStreamingSession)
{
    STATUS retStatus = STATUS_SUCCESS;
    PSampleStreamingSession pSampleStreamingSession = NULL;
    PSampleConfiguration pSampleConfiguration;

    CHK(ppSampleStreamingSession != NULL, STATUS_NULL_ARG);
    pSampleStreamingSession = *ppSampleStreamingSession;
    // Nothing to do (and not an error) when there is no session or it has no owner
    CHK(pSampleStreamingSession != NULL && pSampleStreamingSession->pSampleConfiguration != NULL, retStatus);
    pSampleConfiguration = pSampleStreamingSession->pSampleConfiguration;

    DLOGD("Freeing streaming session with peer id: %s ", pSampleStreamingSession->peerId);

    ATOMIC_STORE_BOOL(&pSampleStreamingSession->terminateFlag, TRUE);

    if (pSampleStreamingSession->shutdownCallback != NULL) {
        pSampleStreamingSession->shutdownCallback(pSampleStreamingSession->shutdownCallbackCustomData, pSampleStreamingSession);
    }

    if (IS_VALID_TID_VALUE(pSampleStreamingSession->receiveAudioVideoSenderTid)) {
        THREAD_JOIN(pSampleStreamingSession->receiveAudioVideoSenderTid, NULL);
    }

    // De-initialize the session stats timer if there are no active sessions
    // NOTE: we need to perform this under the lock which might be acquired by
    // the running thread but it's OK as it's re-entrant
    MUTEX_LOCK(pSampleConfiguration->sampleConfigurationObjLock);
    if (pSampleConfiguration->iceCandidatePairStatsTimerId != MAX_UINT32 && pSampleConfiguration->streamingSessionCount == 0) {
        CHK_LOG_ERR(timerQueueCancelTimer(pSampleConfiguration->timerQueueHandle, pSampleConfiguration->iceCandidatePairStatsTimerId,
                                          (UINT64) pSampleConfiguration));
        pSampleConfiguration->iceCandidatePairStatsTimerId = MAX_UINT32;
    }
    MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock);

    CHK_LOG_ERR(closePeerConnection(pSampleStreamingSession->pPeerConnection));
    CHK_LOG_ERR(freePeerConnection(&pSampleStreamingSession->pPeerConnection));

    // Release the session and clear the caller's handle, not just the local copy
    MEMFREE(pSampleStreamingSession);
    *ppSampleStreamingSession = NULL;

CleanUp:

    CHK_LOG_ERR(retStatus);

    return retStatus;
}
/**
 * Registers the callback, and the opaque value handed back to it, that
 * freeSampleStreamingSession invokes while tearing the session down.
 *
 * @param pSampleStreamingSession - IN - session to attach the callback to
 * @param customData - IN - value passed to the callback as its first argument
 * @param streamSessionShutdownCallback - IN - callback to store
 *
 * @return STATUS_NULL_ARG when the session or the callback is NULL, STATUS_SUCCESS otherwise
 */
STATUS streamingSessionOnShutdown(PSampleStreamingSession pSampleStreamingSession, UINT64 customData,
                                  StreamSessionShutdownCallback streamSessionShutdownCallback)
{
    STATUS retStatus = STATUS_SUCCESS;

    CHK(!(pSampleStreamingSession == NULL || streamSessionShutdownCallback == NULL), STATUS_NULL_ARG);

    pSampleStreamingSession->shutdownCallback = streamSessionShutdownCallback;
    pSampleStreamingSession->shutdownCallbackCustomData = customData;

CleanUp:

    return retStatus;
}
/**
 * Frame callback: logs the received frame and, on the first frame of a
 * session, records and prints the latency since the offer was received.
 *
 * @param customData - IN - the PSampleStreamingSession the callback was registered with
 * @param pFrame - IN - the received frame
 */
VOID sampleFrameHandler(UINT64 customData, PFrame pFrame)
{
    PSampleStreamingSession pSampleStreamingSession = (PSampleStreamingSession) customData;

    // Both pointers are dereferenced below; drop the frame instead of crashing
    if (pSampleStreamingSession == NULL || pFrame == NULL) {
        DLOGW("sampleFrameHandler(): NULL session or frame, ignoring");
        return;
    }

    DLOGV("Frame received. TrackId: %" PRIu64 ", Size: %u, Flags %u", pFrame->trackId, pFrame->size, pFrame->flags);

    if (pSampleStreamingSession->firstFrame) {
        pSampleStreamingSession->firstFrame = FALSE;
        // GETTIME() and offerReceiveTime are divided down to milliseconds
        pSampleStreamingSession->startUpLatency = (GETTIME() - pSampleStreamingSession->offerReceiveTime) / HUNDREDS_OF_NANOS_IN_A_MILLISECOND;
        printf("Start up latency from offer to first frame: %" PRIu64 "ms\n", pSampleStreamingSession->startUpLatency);
    }
}
/**
 * Bandwidth estimation callback registered on the transceivers; it only logs
 * the suggested bitrate at verbose level.
 *
 * @param customData - IN - unused
 * @param maxiumBitrate - IN - suggested bitrate
 */
VOID sampleBandwidthEstimationHandler(UINT64 customData, DOUBLE maxiumBitrate)
{
    DLOGV("received bitrate suggestion: %f", maxiumBitrate);
    UNUSED_PARAM(customData);
}
/**
 * Sender-side (TWCC) bandwidth estimation callback. Derives a packet loss
 * percentage from the sent/received packet counters and logs the bitrate a
 * real encoder would be asked to use. The bitrate is illustrative only: it
 * always starts from 1024 and nothing is applied to an encoder here.
 *
 * @param customData - IN - unused
 * @param txBytes - IN - bytes sent in the reporting window
 * @param rxBytes - IN - bytes the remote reported as received
 * @param txPacketsCnt - IN - packets sent in the reporting window
 * @param rxPacketsCnt - IN - packets the remote reported as received
 * @param duration - IN - window length; logged as milliseconds after dividing by
 *                        HUNDREDS_OF_NANOS_IN_A_MILLISECOND
 */
VOID sampleSenderBandwidthEstimationHandler(UINT64 customData, UINT32 txBytes, UINT32 rxBytes, UINT32 txPacketsCnt, UINT32 rxPacketsCnt,
                                            UINT64 duration)
{
    UINT32 lostPacketsCnt, percentLost;
    UINT32 bitrate = 1024;

    UNUSED_PARAM(customData);

    // Clamp instead of letting the unsigned subtraction wrap when rx exceeds tx
    lostPacketsCnt = txPacketsCnt > rxPacketsCnt ? txPacketsCnt - rxPacketsCnt : 0;

    // An empty window has no loss; widening to 64 bits keeps lost * 100 from overflowing
    percentLost = txPacketsCnt != 0 ? (UINT32) (((UINT64) lostPacketsCnt * 100) / txPacketsCnt) : 0;

    if (percentLost < 2) {
        // increase encoder bitrate by 2 percent
        bitrate = (UINT32) (bitrate * 1.02f);
    } else if (percentLost > 5) {
        // decrease encoder bitrate by packet loss percent
        bitrate = (UINT32) (bitrate * (1.0f - percentLost / 100.0f));
    }
    // otherwise keep bitrate the same

    DLOGS("received sender bitrate estimation: suggested bitrate %u sent: %u bytes %u packets received: %u bytes %u packets in %" PRIu64
          " msec, ",
          bitrate, txBytes, txPacketsCnt, rxBytes, rxPacketsCnt, (UINT64) (duration / HUNDREDS_OF_NANOS_IN_A_MILLISECOND));
}
/**
 * Deserializes the ICE candidate carried in a signaling message payload and
 * adds it to the session's peer connection.
 *
 * @param pSampleStreamingSession - IN - session whose peer connection receives the candidate
 * @param pSignalingMessage - IN - message whose payload/payloadLen hold the serialized candidate
 *
 * @return STATUS_NULL_ARG on a NULL argument, otherwise the status of the
 *         deserialize / add calls. Failures are logged before returning.
 */
STATUS handleRemoteCandidate(PSampleStreamingSession pSampleStreamingSession, PSignalingMessage pSignalingMessage)
{
    STATUS retStatus = STATUS_SUCCESS;
    RtcIceCandidateInit parsedCandidate;

    CHK(!(pSampleStreamingSession == NULL || pSignalingMessage == NULL), STATUS_NULL_ARG);

    CHK_STATUS(deserializeRtcIceCandidateInit(pSignalingMessage->payload, pSignalingMessage->payloadLen, &parsedCandidate));
    CHK_STATUS(addIceCandidate(pSampleStreamingSession->pPeerConnection, parsedCandidate.candidate));

CleanUp:

    CHK_LOG_ERR(retStatus);

    return retStatus;
}
/**
 * Directory traversal callback. When the entry name ends, case-insensitively,
 * in CA_CERT_PEM_FILE_EXTENSION, writes FPATHSEPARATOR followed by the file
 * name into the buffer passed through customData. A later match overwrites an
 * earlier one.
 *
 * @param customData - IN - destination character buffer, cast from PCHAR
 *                          (presumably MAX_PATH_LEN bytes; verify against caller)
 * @param entryType - IN - unused
 * @param fullPath - IN - unused
 * @param fileName - IN - name of the directory entry being visited
 *
 * @return always STATUS_SUCCESS
 */
STATUS traverseDirectoryPEMFileScan(UINT64 customData, DIR_ENTRY_TYPES entryType, PCHAR fullPath, PCHAR fileName)
{
    UNUSED_PARAM(entryType);
    UNUSED_PARAM(fullPath);

    PCHAR pDestination = (PCHAR) customData;
    PCHAR pSuffix;
    UINT32 nameLen = STRLEN(fileName);

    // Names that are not longer than the extension bound cannot match
    if (nameLen <= ARRAY_SIZE(CA_CERT_PEM_FILE_EXTENSION) + 1) {
        return STATUS_SUCCESS;
    }

    // Tail of the name that lines up with the extension
    pSuffix = fileName + (nameLen - ARRAY_SIZE(CA_CERT_PEM_FILE_EXTENSION) + 1);
    if (STRCMPI(CA_CERT_PEM_FILE_EXTENSION, pSuffix) != 0) {
        return STATUS_SUCCESS;
    }

    // Store "<separator><file name>" so the caller can append it to the directory path
    pDestination[0] = FPATHSEPARATOR;
    STRCPY(pDestination + 1, fileName);

    return STATUS_SUCCESS;
}
/**
 * Resolves the CA certificate path and stores it in pCaCertPath.
 *
 * Resolution order:
 *   1. CACERT_PATH_ENV_VAR unset         -> DEFAULT_KVS_CACERT_PATH
 *   2. env value is not a directory      -> env value used as is
 *   3. env value is a directory          -> directory + separator + the PEM file found
 *                                           by traverseDirectoryPEMFileScan, or
 *                                           DEFAULT_KVS_CACERT_PATH when none is found
 *
 * For case 3 the combined path is built in function-static storage instead of
 * being appended to the string returned by getenv(): that string is only as
 * large as the environment value, so appending to it writes out of bounds.
 * Because the storage is static, this function is not re-entrant and every
 * configuration resolved through case 3 shares the same buffer.
 *
 * @param ppSampleConfiguration - IN - address of the configuration to update
 *
 * @return STATUS_NULL_ARG on a NULL argument, STATUS_DIRECTORY_ENTRY_STAT_ERROR when
 *         the env path cannot be stat'ed, STATUS_INVALID_ARG when the combined path
 *         does not fit in MAX_PATH_LEN, STATUS_INVALID_OPERATION when no path is
 *         available, otherwise STATUS_SUCCESS
 */
STATUS lookForSslCert(PSampleConfiguration* ppSampleConfiguration)
{
    STATUS retStatus = STATUS_SUCCESS;
    struct stat pathStat;
    CHAR certName[MAX_PATH_LEN];
    static CHAR resolvedCertPath[MAX_PATH_LEN];
    PSampleConfiguration pSampleConfiguration = NULL;

    CHK(ppSampleConfiguration != NULL && *ppSampleConfiguration != NULL, STATUS_NULL_ARG);
    pSampleConfiguration = *ppSampleConfiguration;

    MEMSET(certName, 0x0, ARRAY_SIZE(certName));
    pSampleConfiguration->pCaCertPath = getenv(CACERT_PATH_ENV_VAR);

    // if ca cert path is not set from the environment, try to use the one that cmake detected
    if (pSampleConfiguration->pCaCertPath == NULL) {
        CHK_ERR(STRNLEN(DEFAULT_KVS_CACERT_PATH, MAX_PATH_LEN) > 0, STATUS_INVALID_OPERATION, "No ca cert path given (error:%s)", strerror(errno));
        pSampleConfiguration->pCaCertPath = DEFAULT_KVS_CACERT_PATH;
    } else {
        // Check if the environment variable is a path
        CHK(0 == FSTAT(pSampleConfiguration->pCaCertPath, &pathStat), STATUS_DIRECTORY_ENTRY_STAT_ERROR);

        if (S_ISDIR(pathStat.st_mode)) {
            CHK_STATUS(traverseDirectory(pSampleConfiguration->pCaCertPath, (UINT64) &certName, /* iterate */ FALSE, traverseDirectoryPEMFileScan));

            if (certName[0] != 0x0) {
                // certName already starts with the path separator
                CHK_ERR(STRLEN(pSampleConfiguration->pCaCertPath) + STRLEN(certName) < MAX_PATH_LEN, STATUS_INVALID_ARG,
                        "CA cert directory and file name do not fit in the path buffer");
                STRCPY(resolvedCertPath, pSampleConfiguration->pCaCertPath);
                STRCAT(resolvedCertPath, certName);
                pSampleConfiguration->pCaCertPath = resolvedCertPath;
            } else {
                DLOGW("Cert not found in path set...checking if CMake detected a path\n");
                CHK_ERR(STRNLEN(DEFAULT_KVS_CACERT_PATH, MAX_PATH_LEN) > 0, STATUS_INVALID_OPERATION, "No ca cert path given (error:%s)",
                        strerror(errno));
                DLOGI("CMake detected cert path\n");
                pSampleConfiguration->pCaCertPath = DEFAULT_KVS_CACERT_PATH;
            }
        }
    }

CleanUp:

    CHK_LOG_ERR(retStatus);
    return retStatus;
}
/**
 * Allocates and populates a SampleConfiguration.
 *
 * Reads credentials, region, log level and the file-logging switch from the
 * environment, resolves the CA certificate through lookForSslCert, creates the
 * credential provider, the locks and condition variable, the timer queue, the
 * pre-generated certificate queue, the pending signaling message queue and the
 * peer-connection hash table, and fills in the channel / client info and the
 * signaling callbacks.
 *
 * On any failure the partially built object is passed to freeSampleConfiguration
 * and *ppSampleConfiguration receives the local pointer afterwards.
 *
 * @param channelName - IN - signaling channel name; the pointer itself is stored, not a copy
 * @param roleType - IN - signaling channel role
 * @param trickleIce - IN - stored in the configuration (see the inline note on its use)
 * @param useTurn - IN - stored in the configuration
 * @param ppSampleConfiguration - OUT - receives the new configuration
 *
 * @return STATUS code of the execution
 */
STATUS createSampleConfiguration(PCHAR channelName, SIGNALING_CHANNEL_ROLE_TYPE roleType, BOOL trickleIce, BOOL useTurn,
PSampleConfiguration* ppSampleConfiguration)
{
STATUS retStatus = STATUS_SUCCESS;
PCHAR pAccessKey, pSecretKey, pSessionToken, pLogLevel;
PSampleConfiguration pSampleConfiguration = NULL;
UINT32 logLevel = LOG_LEVEL_DEBUG;
CHK(ppSampleConfiguration != NULL, STATUS_NULL_ARG);
// Zero-filled allocation: every field not assigned below starts out as 0 / NULL
CHK(NULL != (pSampleConfiguration = (PSampleConfiguration) MEMCALLOC(1, SIZEOF(SampleConfiguration))), STATUS_NOT_ENOUGH_MEMORY);
#ifdef IOT_CORE_ENABLE_CREDENTIALS
// NOTE(review): pIotCoreThingName is declared but never assigned in this function
PCHAR pIotCoreCredentialEndPoint, pIotCoreCert, pIotCorePrivateKey, pIotCoreRoleAlias, pIotCoreThingName;
CHK_ERR((pIotCoreCredentialEndPoint = getenv(IOT_CORE_CREDENTIAL_ENDPOINT)) != NULL, STATUS_INVALID_OPERATION,
"AWS_IOT_CORE_CREDENTIAL_ENDPOINT must be set");
CHK_ERR((pIotCoreCert = getenv(IOT_CORE_CERT)) != NULL, STATUS_INVALID_OPERATION, "AWS_IOT_CORE_CERT must be set");
CHK_ERR((pIotCorePrivateKey = getenv(IOT_CORE_PRIVATE_KEY)) != NULL, STATUS_INVALID_OPERATION, "AWS_IOT_CORE_PRIVATE_KEY must be set");
CHK_ERR((pIotCoreRoleAlias = getenv(IOT_CORE_ROLE_ALIAS)) != NULL, STATUS_INVALID_OPERATION, "AWS_IOT_CORE_ROLE_ALIAS must be set");
#else
// Static credentials: both the access key and the secret key are mandatory
CHK_ERR((pAccessKey = getenv(ACCESS_KEY_ENV_VAR)) != NULL, STATUS_INVALID_OPERATION, "AWS_ACCESS_KEY_ID must be set");
CHK_ERR((pSecretKey = getenv(SECRET_KEY_ENV_VAR)) != NULL, STATUS_INVALID_OPERATION, "AWS_SECRET_ACCESS_KEY must be set");
#endif
// The session token is optional; pSessionToken stays NULL when the variable is unset
pSessionToken = getenv(SESSION_TOKEN_ENV_VAR);
// File logging is enabled by the mere presence of the variable, whatever its value
pSampleConfiguration->enableFileLogging = FALSE;
if (NULL != getenv(ENABLE_FILE_LOGGING)) {
pSampleConfiguration->enableFileLogging = TRUE;
}
if ((pSampleConfiguration->channelInfo.pRegion = getenv(DEFAULT_REGION_ENV_VAR)) == NULL) {
pSampleConfiguration->channelInfo.pRegion = DEFAULT_AWS_REGION;
}
CHK_STATUS(lookForSslCert(&pSampleConfiguration));
// Set the logger log level
// A missing, unparsable or out-of-range value falls back to LOG_LEVEL_WARN, so the
// LOG_LEVEL_DEBUG initializer above is never the effective level
if (NULL == (pLogLevel = getenv(DEBUG_LOG_LEVEL_ENV_VAR)) || STATUS_SUCCESS != STRTOUI32(pLogLevel, NULL, 10, &logLevel) ||
logLevel < LOG_LEVEL_VERBOSE || logLevel > LOG_LEVEL_SILENT) {
logLevel = LOG_LEVEL_WARN;
}
SET_LOGGER_LOG_LEVEL(logLevel);
#ifdef IOT_CORE_ENABLE_CREDENTIALS
// NOTE(review): channelName is passed right after the role alias; presumably that
// argument is the IoT thing name - confirm against createLwsIotCredentialProvider
CHK_STATUS(createLwsIotCredentialProvider(pIotCoreCredentialEndPoint, pIotCoreCert, pIotCorePrivateKey, pSampleConfiguration->pCaCertPath,
pIotCoreRoleAlias, channelName, &pSampleConfiguration->pCredentialProvider));
#else
CHK_STATUS(
createStaticCredentialProvider(pAccessKey, 0, pSecretKey, 0, pSessionToken, 0, MAX_UINT64, &pSampleConfiguration->pCredentialProvider));
#endif
pSampleConfiguration->mediaSenderTid = INVALID_TID_VALUE;
pSampleConfiguration->signalingClientHandle = INVALID_SIGNALING_CLIENT_HANDLE_VALUE;
// The object lock is created with TRUE and the two others with FALSE; other code in this
// file relies on the object lock being re-entrant
pSampleConfiguration->sampleConfigurationObjLock = MUTEX_CREATE(TRUE);
pSampleConfiguration->cvar = CVAR_CREATE();
pSampleConfiguration->streamingSessionListReadLock = MUTEX_CREATE(FALSE);
pSampleConfiguration->signalingSendMessageLock = MUTEX_CREATE(FALSE);
/* This is ignored for master. Master can extract the info from offer. Viewer has to know if peer can trickle or
* not ahead of time. */
pSampleConfiguration->trickleIce = trickleIce;
pSampleConfiguration->useTurn = useTurn;
// Signaling channel description
pSampleConfiguration->channelInfo.version = CHANNEL_INFO_CURRENT_VERSION;
pSampleConfiguration->channelInfo.pChannelName = channelName;
pSampleConfiguration->channelInfo.pKmsKeyId = NULL;
pSampleConfiguration->channelInfo.tagCount = 0;
pSampleConfiguration->channelInfo.pTags = NULL;
pSampleConfiguration->channelInfo.channelType = SIGNALING_CHANNEL_TYPE_SINGLE_MASTER;
pSampleConfiguration->channelInfo.channelRoleType = roleType;
pSampleConfiguration->channelInfo.cachingPolicy = SIGNALING_API_CALL_CACHE_TYPE_FILE;
pSampleConfiguration->channelInfo.cachingPeriod = SIGNALING_API_CALL_CACHE_TTL_SENTINEL_VALUE;
pSampleConfiguration->channelInfo.asyncIceServerConfig = TRUE; // has no effect
pSampleConfiguration->channelInfo.retry = TRUE;
pSampleConfiguration->channelInfo.reconnect = TRUE;
pSampleConfiguration->channelInfo.pCertPath = pSampleConfiguration->pCaCertPath;
pSampleConfiguration->channelInfo.messageTtl = 0; // Default is 60 seconds
// Signaling callbacks receive this configuration as their custom data
pSampleConfiguration->signalingClientCallbacks.version = SIGNALING_CLIENT_CALLBACKS_CURRENT_VERSION;
pSampleConfiguration->signalingClientCallbacks.errorReportFn = signalingClientError;
pSampleConfiguration->signalingClientCallbacks.stateChangeFn = signalingClientStateChanged;
pSampleConfiguration->signalingClientCallbacks.customData = (UINT64) pSampleConfiguration;
pSampleConfiguration->clientInfo.version = SIGNALING_CLIENT_INFO_CURRENT_VERSION;
pSampleConfiguration->clientInfo.loggingLevel = logLevel;
pSampleConfiguration->clientInfo.cacheFilePath = NULL; // Use the default path
// MAX_UINT32 is the "no timer registered" marker tested by the free functions
pSampleConfiguration->iceCandidatePairStatsTimerId = MAX_UINT32;
pSampleConfiguration->pregenerateCertTimerId = MAX_UINT32;
ATOMIC_STORE_BOOL(&pSampleConfiguration->interrupted, FALSE);
ATOMIC_STORE_BOOL(&pSampleConfiguration->mediaThreadStarted, FALSE);
ATOMIC_STORE_BOOL(&pSampleConfiguration->appTerminateFlag, FALSE);
ATOMIC_STORE_BOOL(&pSampleConfiguration->recreateSignalingClient, FALSE);
ATOMIC_STORE_BOOL(&pSampleConfiguration->connected, FALSE);
CHK_STATUS(timerQueueCreate(&pSampleConfiguration->timerQueueHandle));
CHK_STATUS(stackQueueCreate(&pSampleConfiguration->pregeneratedCertificates));
// Start the cert pre-gen timer callback
// NOTE(review): CHK_LOG_ERR appears to log without jumping to CleanUp, and retStatus is
// reassigned by the next CHK_STATUS, so a failure to add this timer looks best-effort - confirm
if (SAMPLE_PRE_GENERATE_CERT) {
CHK_LOG_ERR(retStatus =
timerQueueAddTimer(pSampleConfiguration->timerQueueHandle, 0, SAMPLE_PRE_GENERATE_CERT_PERIOD, pregenerateCertTimerCallback,
(UINT64) pSampleConfiguration, &pSampleConfiguration->pregenerateCertTimerId));
}
pSampleConfiguration->iceUriCount = 0;
CHK_STATUS(stackQueueCreate(&pSampleConfiguration->pPendingSignalingMessageForRemoteClient));
CHK_STATUS(hashTableCreateWithParams(SAMPLE_HASH_TABLE_BUCKET_COUNT, SAMPLE_HASH_TABLE_BUCKET_LENGTH,
&pSampleConfiguration->pRtcPeerConnectionForRemoteClient));
CleanUp:
// freeSampleConfiguration is handed the address of the local pointer, so the value
// published to the caller below reflects whatever it leaves there
if (STATUS_FAILED(retStatus)) {
freeSampleConfiguration(&pSampleConfiguration);
}
if (ppSampleConfiguration != NULL) {
*ppSampleConfiguration = pSampleConfiguration;
}
return retStatus;
}
/**
 * Logs the signaling client statistics at debug level. All durations and
 * latencies are divided by HUNDREDS_OF_NANOS_IN_A_MILLISECOND and printed as
 * milliseconds.
 *
 * @param pSignalingClientMetrics - IN - metrics object to log
 *
 * @return STATUS_NULL_ARG when pSignalingClientMetrics is NULL, STATUS_SUCCESS otherwise
 */
STATUS logSignalingClientStats(PSignalingClientMetrics pSignalingClientMetrics)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS;

    CHK(pSignalingClientMetrics != NULL, STATUS_NULL_ARG);

    DLOGD("Signaling client connection duration: %" PRIu64 " ms",
          (pSignalingClientMetrics->signalingClientStats.connectionDuration / HUNDREDS_OF_NANOS_IN_A_MILLISECOND));
    DLOGD("Number of signaling client API errors: %d", pSignalingClientMetrics->signalingClientStats.numberOfErrors);
    DLOGD("Number of runtime errors in the session: %d", pSignalingClientMetrics->signalingClientStats.numberOfRuntimeErrors);

    // Uptime has its own counter; this line previously repeated connectionDuration
    DLOGD("Signaling client uptime: %" PRIu64 " ms",
          (pSignalingClientMetrics->signalingClientStats.signalingClientUptime / HUNDREDS_OF_NANOS_IN_A_MILLISECOND));

    // This gives the EMA of the createChannel, describeChannel, getChannelEndpoint and deleteChannel calls
    DLOGD("Control Plane API call latency: %" PRIu64 " ms",
          (pSignalingClientMetrics->signalingClientStats.cpApiCallLatency / HUNDREDS_OF_NANOS_IN_A_MILLISECOND));

    // This gives the EMA of the getIceConfig() call.
    DLOGD("Data Plane API call latency: %" PRIu64 " ms",
          (pSignalingClientMetrics->signalingClientStats.dpApiCallLatency / HUNDREDS_OF_NANOS_IN_A_MILLISECOND));

CleanUp:

    LEAVES();
    return retStatus;
}
/**
 * Timer callback that logs ICE candidate pair statistics for every streaming
 * session. Rates are computed against the values stored in the session's
 * rtcMetricsHistory, which is refreshed after each report. A session is only
 * reported once at least one whole second has elapsed since its last report.
 *
 * The shared rtcIceCandidatePairMetrics object is only touched while
 * sampleConfigurationObjLock is held.
 *
 * @param timerId - IN - unused
 * @param currentTime - IN - unused
 * @param customData - IN - the PSampleConfiguration registered with the timer
 *
 * @return STATUS_NULL_ARG when customData is NULL, STATUS_SUCCESS otherwise
 */
STATUS getIceCandidatePairStatsCallback(UINT32 timerId, UINT64 currentTime, UINT64 customData)
{
    UNUSED_PARAM(timerId);
    UNUSED_PARAM(currentTime);
    STATUS retStatus = STATUS_SUCCESS;
    PSampleConfiguration pSampleConfiguration = (PSampleConfiguration) customData;
    PSampleStreamingSession pSession = NULL;
    UINT32 i;
    UINT64 currentMeasureDuration = 0;
    DOUBLE averagePacketsDiscardedOnSend = 0.0;
    DOUBLE averageNumberOfPacketsSentPerSecond = 0.0;
    DOUBLE averageNumberOfPacketsReceivedPerSecond = 0.0;
    DOUBLE outgoingBitrate = 0.0;
    DOUBLE incomingBitrate = 0.0;
    BOOL locked = FALSE;

    CHK_WARN(pSampleConfiguration != NULL, STATUS_NULL_ARG, "[KVS Master] getPeriodicStats(): Passed argument is NULL");

    // We need to execute this under the object lock due to race conditions that it could pose
    MUTEX_LOCK(pSampleConfiguration->sampleConfigurationObjLock);
    locked = TRUE;

    // Part of the shared metrics object, so it is written only after the lock is taken
    pSampleConfiguration->rtcIceCandidatePairMetrics.requestedTypeOfStats = RTC_STATS_TYPE_CANDIDATE_PAIR;

    for (i = 0; i < pSampleConfiguration->streamingSessionCount; ++i) {
        pSession = pSampleConfiguration->sampleStreamingSessionList[i];

        if (STATUS_FAILED(rtcPeerConnectionGetMetrics(pSession->pPeerConnection, NULL, &pSampleConfiguration->rtcIceCandidatePairMetrics))) {
            continue;
        }

        currentMeasureDuration =
            (pSampleConfiguration->rtcIceCandidatePairMetrics.timestamp - pSession->rtcMetricsHistory.prevTs) / HUNDREDS_OF_NANOS_IN_A_SECOND;
        DLOGD("Current duration: %" PRIu64 " seconds", currentMeasureDuration);

        // Less than a full second elapsed: skip, which also avoids dividing by zero below
        if (currentMeasureDuration == 0) {
            continue;
        }

        DLOGD("Selected local candidate ID: %s",
              pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.localCandidateId);
        DLOGD("Selected remote candidate ID: %s",
              pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.remoteCandidateId);
        // TODO: Display state as a string for readability
        DLOGD("Ice Candidate Pair state: %d", pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.state);
        DLOGD("Nomination state: %s",
              pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.nominated ? "nominated" : "not nominated");

        averageNumberOfPacketsSentPerSecond =
            (DOUBLE)(pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.packetsSent -
                     pSession->rtcMetricsHistory.prevNumberOfPacketsSent) /
            (DOUBLE) currentMeasureDuration;
        DLOGD("Packet send rate: %lf pkts/sec", averageNumberOfPacketsSentPerSecond);

        averageNumberOfPacketsReceivedPerSecond =
            (DOUBLE)(pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.packetsReceived -
                     pSession->rtcMetricsHistory.prevNumberOfPacketsReceived) /
            (DOUBLE) currentMeasureDuration;
        DLOGD("Packet receive rate: %lf pkts/sec", averageNumberOfPacketsReceivedPerSecond);

        outgoingBitrate = (DOUBLE)((pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.bytesSent -
                                    pSession->rtcMetricsHistory.prevNumberOfBytesSent) *
                                   8.0) /
            currentMeasureDuration;
        DLOGD("Outgoing bit rate: %lf bps", outgoingBitrate);

        incomingBitrate = (DOUBLE)((pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.bytesReceived -
                                    pSession->rtcMetricsHistory.prevNumberOfBytesReceived) *
                                   8.0) /
            currentMeasureDuration;
        DLOGD("Incoming bit rate: %lf bps", incomingBitrate);

        averagePacketsDiscardedOnSend =
            (DOUBLE)(pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.packetsDiscardedOnSend -
                     pSession->rtcMetricsHistory.prevPacketsDiscardedOnSend) /
            (DOUBLE) currentMeasureDuration;
        DLOGD("Packet discard rate: %lf pkts/sec", averagePacketsDiscardedOnSend);

        DLOGD("Current STUN request round trip time: %lf sec",
              pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.currentRoundTripTime);
        // PRIu64 matches the other 64-bit counters logged in this file
        DLOGD("Number of STUN responses received: %" PRIu64,
              pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.responsesReceived);

        // Remember this report as the baseline for the next one
        pSession->rtcMetricsHistory.prevTs = pSampleConfiguration->rtcIceCandidatePairMetrics.timestamp;
        pSession->rtcMetricsHistory.prevNumberOfPacketsSent =
            pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.packetsSent;
        pSession->rtcMetricsHistory.prevNumberOfPacketsReceived =
            pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.packetsReceived;
        pSession->rtcMetricsHistory.prevNumberOfBytesSent =
            pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.bytesSent;
        pSession->rtcMetricsHistory.prevNumberOfBytesReceived =
            pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.bytesReceived;
        pSession->rtcMetricsHistory.prevPacketsDiscardedOnSend =
            pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.packetsDiscardedOnSend;
    }

CleanUp:

    if (locked) {
        MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock);
    }

    return retStatus;
}
/**
 * Timer callback that adds one freshly created certificate to the
 * pregeneratedCertificates queue, unless the queue already holds exactly
 * MAX_RTCCONFIGURATION_CERTIFICATES entries. The whole operation runs under
 * sampleConfigurationObjLock.
 *
 * @param timerId - IN - unused
 * @param currentTime - IN - unused
 * @param customData - IN - the PSampleConfiguration registered with the timer
 *
 * @return STATUS_NULL_ARG when customData is NULL, otherwise the status of the
 *         queue / certificate calls
 */
STATUS pregenerateCertTimerCallback(UINT32 timerId, UINT64 currentTime, UINT64 customData)
{
    UNUSED_PARAM(timerId);
    UNUSED_PARAM(currentTime);
    STATUS retStatus = STATUS_SUCCESS;
    PSampleConfiguration pSampleConfiguration = (PSampleConfiguration) customData;
    PRtcCertificate pNewCertificate = NULL;
    UINT32 queuedCount;
    BOOL holdsLock = FALSE;

    CHK_WARN(pSampleConfiguration != NULL, STATUS_NULL_ARG, "[KVS Master] pregenerateCertTimerCallback(): Passed argument is NULL");

    MUTEX_LOCK(pSampleConfiguration->sampleConfigurationObjLock);
    holdsLock = TRUE;

    // Quick check if there is anything that needs to be done.
    CHK_STATUS(stackQueueGetCount(pSampleConfiguration->pregeneratedCertificates, &queuedCount));
    CHK(queuedCount != MAX_RTCCONFIGURATION_CERTIFICATES, retStatus);

    // Generate the certificate with the keypair
    CHK_STATUS(createRtcCertificate(&pNewCertificate));

    // Add to the stack queue
    CHK_STATUS(stackQueueEnqueue(pSampleConfiguration->pregeneratedCertificates, (UINT64) pNewCertificate));

    DLOGV("New certificate has been pre-generated and added to the queue");

    // The queue owns the certificate now; clear the local so the cleanup below leaves it alone
    pNewCertificate = NULL;

CleanUp:

    // Only reached with a non-NULL certificate when it was created but not queued
    if (pNewCertificate != NULL) {
        freeRtcCertificate(pNewCertificate);
    }

    // Single unlock point for the success and the failure paths
    if (holdsLock) {
        MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock);
    }

    return retStatus;
}
/**
 * Releases everything owned by a SampleConfiguration and the object itself.
 *
 * Teardown order:
 *   1. timers are cancelled and the timer queue is freed first, while the locks
 *      still exist - both timer callbacks in this file take
 *      sampleConfigurationObjLock, so they must be stopped before it is destroyed
 *   2. pending signaling message queues and the peer-connection hash table
 *   3. media thread cancellation and streaming sessions (under the object lock)
 *   4. deinitKvsWebRtc, frame buffers, condition variable and mutexes
 *   5. credential provider and pre-generated certificates
 *
 * @param ppSampleConfiguration - IN/OUT - address of the configuration pointer;
 *        reset to NULL once the object has been freed
 *
 * @return STATUS_NULL_ARG when ppSampleConfiguration is NULL. Otherwise the status
 *         of the last best-effort step that assigned it (stats gathering or timer
 *         cancellation); a NULL configuration is a successful no-op.
 */
STATUS freeSampleConfiguration(PSampleConfiguration* ppSampleConfiguration)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS;
    PSampleConfiguration pSampleConfiguration;
    UINT32 i;
    UINT64 data;
    StackQueueIterator iterator;
    BOOL locked = FALSE;

    CHK(ppSampleConfiguration != NULL, STATUS_NULL_ARG);
    pSampleConfiguration = *ppSampleConfiguration;

    CHK(pSampleConfiguration != NULL, retStatus);

    // Stop the timers before anything their callbacks use is torn down. The object lock
    // must not be held here, otherwise a callback waiting on it could never finish.
    if (IS_VALID_TIMER_QUEUE_HANDLE(pSampleConfiguration->timerQueueHandle)) {
        if (pSampleConfiguration->iceCandidatePairStatsTimerId != MAX_UINT32) {
            retStatus = timerQueueCancelTimer(pSampleConfiguration->timerQueueHandle, pSampleConfiguration->iceCandidatePairStatsTimerId,
                                              (UINT64) pSampleConfiguration);
            if (STATUS_FAILED(retStatus)) {
                DLOGE("Failed to cancel stats timer with: 0x%08x", retStatus);
            }
            pSampleConfiguration->iceCandidatePairStatsTimerId = MAX_UINT32;
        }

        if (pSampleConfiguration->pregenerateCertTimerId != MAX_UINT32) {
            retStatus = timerQueueCancelTimer(pSampleConfiguration->timerQueueHandle, pSampleConfiguration->pregenerateCertTimerId,
                                              (UINT64) pSampleConfiguration);
            if (STATUS_FAILED(retStatus)) {
                DLOGE("Failed to cancel certificate pre-generation timer with: 0x%08x", retStatus);
            }
            pSampleConfiguration->pregenerateCertTimerId = MAX_UINT32;
        }

        timerQueueFree(&pSampleConfiguration->timerQueueHandle);
    }

    if (pSampleConfiguration->pPendingSignalingMessageForRemoteClient != NULL) {
        // Iterate and free all the pending queues
        stackQueueGetIterator(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, &iterator);
        while (IS_VALID_ITERATOR(iterator)) {
            stackQueueIteratorGetItem(iterator, &data);
            stackQueueIteratorNext(&iterator);

            freeMessageQueue((PPendingMessageQueue) data);
        }

        stackQueueClear(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, FALSE);
        stackQueueFree(pSampleConfiguration->pPendingSignalingMessageForRemoteClient);
        pSampleConfiguration->pPendingSignalingMessageForRemoteClient = NULL;
    }

    // The table may not exist yet when this runs on a partially created configuration
    if (pSampleConfiguration->pRtcPeerConnectionForRemoteClient != NULL) {
        hashTableClear(pSampleConfiguration->pRtcPeerConnectionForRemoteClient);
        hashTableFree(pSampleConfiguration->pRtcPeerConnectionForRemoteClient);
        pSampleConfiguration->pRtcPeerConnectionForRemoteClient = NULL;
    }

    if (IS_VALID_MUTEX_VALUE(pSampleConfiguration->sampleConfigurationObjLock)) {
        MUTEX_LOCK(pSampleConfiguration->sampleConfigurationObjLock);
        locked = TRUE;
    }

    // Cancel the media thread, but never issue a cancel on an invalid thread id.
    // NOTE(review): the cancel is gated on the flag being FALSE, as before - confirm that
    // "not started" rather than "started" is the intended condition.
    if (!ATOMIC_LOAD_BOOL(&pSampleConfiguration->mediaThreadStarted) && IS_VALID_TID_VALUE(pSampleConfiguration->mediaSenderTid)) {
        DLOGD("Canceling media thread");
        THREAD_CANCEL(pSampleConfiguration->mediaSenderTid);
    }

    for (i = 0; i < pSampleConfiguration->streamingSessionCount; ++i) {
        retStatus = gatherIceServerStats(pSampleConfiguration->sampleStreamingSessionList[i]);
        if (STATUS_FAILED(retStatus)) {
            DLOGW("Failed to ICE Server Stats for streaming session %d: %08x", i, retStatus);
        }
        freeSampleStreamingSession(&pSampleConfiguration->sampleStreamingSessionList[i]);
    }

    if (locked) {
        MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock);
    }

    deinitKvsWebRtc();

    SAFE_MEMFREE(pSampleConfiguration->pVideoFrameBuffer);
    SAFE_MEMFREE(pSampleConfiguration->pAudioFrameBuffer);

    if (IS_VALID_CVAR_VALUE(pSampleConfiguration->cvar) && IS_VALID_MUTEX_VALUE(pSampleConfiguration->sampleConfigurationObjLock)) {
        // Wake any waiter, then take and release the lock once before it is destroyed
        CVAR_BROADCAST(pSampleConfiguration->cvar);
        MUTEX_LOCK(pSampleConfiguration->sampleConfigurationObjLock);
        MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock);
    }

    if (IS_VALID_MUTEX_VALUE(pSampleConfiguration->sampleConfigurationObjLock)) {
        MUTEX_FREE(pSampleConfiguration->sampleConfigurationObjLock);
    }

    if (IS_VALID_MUTEX_VALUE(pSampleConfiguration->streamingSessionListReadLock)) {
        MUTEX_FREE(pSampleConfiguration->streamingSessionListReadLock);
    }

    if (IS_VALID_MUTEX_VALUE(pSampleConfiguration->signalingSendMessageLock)) {
        MUTEX_FREE(pSampleConfiguration->signalingSendMessageLock);
    }

    if (IS_VALID_CVAR_VALUE(pSampleConfiguration->cvar)) {
        CVAR_FREE(pSampleConfiguration->cvar);
    }

#ifdef IOT_CORE_ENABLE_CREDENTIALS
    freeIotCredentialProvider(&pSampleConfiguration->pCredentialProvider);
#else
    freeStaticCredentialProvider(&pSampleConfiguration->pCredentialProvider);
#endif

    if (pSampleConfiguration->pregeneratedCertificates != NULL) {
        // The queue stores certificate pointers; free each one before the queue itself
        stackQueueGetIterator(pSampleConfiguration->pregeneratedCertificates, &iterator);
        while (IS_VALID_ITERATOR(iterator)) {
            stackQueueIteratorGetItem(iterator, &data);
            stackQueueIteratorNext(&iterator);
            freeRtcCertificate((PRtcCertificate) data);
        }

        CHK_LOG_ERR(stackQueueClear(pSampleConfiguration->pregeneratedCertificates, FALSE));
        CHK_LOG_ERR(stackQueueFree(pSampleConfiguration->pregeneratedCertificates));
        pSampleConfiguration->pregeneratedCertificates = NULL;
    }

    MEMFREE(*ppSampleConfiguration);
    *ppSampleConfiguration = NULL;

CleanUp:

    LEAVES();
    return retStatus;
}
/**
 * Housekeeping loop that runs until the configuration's interrupted flag is
 * set. On every pass, under sampleConfigurationObjLock, it:
 *   - removes and frees every streaming session whose terminateFlag is set
 *   - re-creates the signaling client when recreateSignalingClient is set
 *   - connects the signaling client when it is in the READY state
 *   - removes expired pending message queues
 * and then waits on the condition variable for up to
 * SAMPLE_SESSION_CLEANUP_WAIT_PERIOD.
 *
 * @param pSampleConfiguration - IN - configuration to service
 *
 * @return STATUS_NULL_ARG when pSampleConfiguration is NULL, the failing status when
 *         a step fails (which also ends the loop), STATUS_SUCCESS once interrupted
 */
STATUS sessionCleanupWait(PSampleConfiguration pSampleConfiguration)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS;
    STATUS hashStatus = STATUS_SUCCESS;
    PSampleStreamingSession pSampleStreamingSession = NULL;
    UINT32 i, clientIdHash;
    BOOL locked = FALSE, peerConnectionFound = FALSE;
    SIGNALING_CLIENT_STATE signalingClientState;

    CHK(pSampleConfiguration != NULL, STATUS_NULL_ARG);

    while (!ATOMIC_LOAD_BOOL(&pSampleConfiguration->interrupted)) {
        // Keep the main set of operations interlocked until cvar wait which would atomically unlock
        MUTEX_LOCK(pSampleConfiguration->sampleConfigurationObjLock);
        locked = TRUE;

        // scan and cleanup terminated streaming session
        i = 0;
        while (i < pSampleConfiguration->streamingSessionCount) {
            if (!ATOMIC_LOAD_BOOL(&pSampleConfiguration->sampleStreamingSessionList[i]->terminateFlag)) {
                i++;
                continue;
            }

            pSampleStreamingSession = pSampleConfiguration->sampleStreamingSessionList[i];

            MUTEX_LOCK(pSampleConfiguration->streamingSessionListReadLock);

            // swap with last element and decrement count
            pSampleConfiguration->streamingSessionCount--;
            pSampleConfiguration->sampleStreamingSessionList[i] =
                pSampleConfiguration->sampleStreamingSessionList[pSampleConfiguration->streamingSessionCount];

            // Remove from the hash table. The status is only captured here so that the
            // list lock is always released before any jump to CleanUp.
            clientIdHash = COMPUTE_CRC32((PBYTE) pSampleStreamingSession->peerId, (UINT32) STRLEN(pSampleStreamingSession->peerId));
            hashStatus = hashTableContains(pSampleConfiguration->pRtcPeerConnectionForRemoteClient, clientIdHash, &peerConnectionFound);
            if (STATUS_SUCCEEDED(hashStatus) && peerConnectionFound) {
                hashStatus = hashTableRemove(pSampleConfiguration->pRtcPeerConnectionForRemoteClient, clientIdHash);
            }

            MUTEX_UNLOCK(pSampleConfiguration->streamingSessionListReadLock);

            CHK_STATUS(hashStatus);
            CHK_STATUS(freeSampleStreamingSession(&pSampleStreamingSession));

            // i is deliberately not advanced: slot i now holds the element swapped in
            // from the end, which still has to be examined
        }

        // Check if we need to re-create the signaling client on-the-fly
        if (ATOMIC_LOAD_BOOL(&pSampleConfiguration->recreateSignalingClient) &&
            STATUS_SUCCEEDED(freeSignalingClient(&pSampleConfiguration->signalingClientHandle)) &&
            STATUS_SUCCEEDED(createSignalingClientSync(&pSampleConfiguration->clientInfo, &pSampleConfiguration->channelInfo,
                                                       &pSampleConfiguration->signalingClientCallbacks, pSampleConfiguration->pCredentialProvider,
                                                       &pSampleConfiguration->signalingClientHandle))) {
            // Re-set the variable again
            ATOMIC_STORE_BOOL(&pSampleConfiguration->recreateSignalingClient, FALSE);
        }

        // Check the signaling client state and connect if needed
        if (IS_VALID_SIGNALING_CLIENT_HANDLE(pSampleConfiguration->signalingClientHandle)) {
            CHK_STATUS(signalingClientGetCurrentState(pSampleConfiguration->signalingClientHandle, &signalingClientState));
            if (signalingClientState == SIGNALING_CLIENT_STATE_READY) {
                // The result is intentionally ignored; the state is checked again on the next pass
                UNUSED_PARAM(signalingClientConnectSync(pSampleConfiguration->signalingClientHandle));
            }
        }

        // Check if any lingering pending message queues
        CHK_STATUS(removeExpiredMessageQueues(pSampleConfiguration->pPendingSignalingMessageForRemoteClient));

        // periodically wake up and clean up terminated streaming session
        CVAR_WAIT(pSampleConfiguration->cvar, pSampleConfiguration->sampleConfigurationObjLock, SAMPLE_SESSION_CLEANUP_WAIT_PERIOD);
        MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock);
        locked = FALSE;
    }

CleanUp:

    CHK_LOG_ERR(retStatus);

    if (locked) {
        MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock);
    }

    LEAVES();
    return retStatus;
}
/**
 * Drains a pending message queue: every dequeued ICE candidate message is
 * handed to handleRemoteCandidate, messages of any other type are dropped, and
 * each message is freed after processing. When the queue has been emptied
 * without error the queue itself is released with freeMessageQueue.
 *
 * On failure the message being processed is freed; the queue is left for the
 * caller to deal with.
 *
 * @param pPendingMessageQueue - IN - queue to drain; it and its messageQueue must not be NULL
 * @param pSampleStreamingSession - IN - session that receives the candidates
 *
 * @return STATUS_NULL_ARG on a NULL argument, STATUS_INTERNAL_ERROR when a NULL
 *         item is dequeued, otherwise the status of the first failing call
 */
STATUS submitPendingIceCandidate(PPendingMessageQueue pPendingMessageQueue, PSampleStreamingSession pSampleStreamingSession)
{
    STATUS retStatus = STATUS_SUCCESS;
    BOOL queueDrained = FALSE;
    PReceivedSignalingMessage pQueuedMessage = NULL;
    UINT64 queueItem;

    CHK(!(pPendingMessageQueue == NULL || pPendingMessageQueue->messageQueue == NULL || pSampleStreamingSession == NULL), STATUS_NULL_ARG);

    for (;;) {
        CHK_STATUS(stackQueueIsEmpty(pPendingMessageQueue->messageQueue, &queueDrained));
        if (queueDrained) {
            break;
        }

        queueItem = 0;
        CHK_STATUS(stackQueueDequeue(pPendingMessageQueue->messageQueue, &queueItem));
        pQueuedMessage = (PReceivedSignalingMessage) queueItem;
        CHK(pQueuedMessage != NULL, STATUS_INTERNAL_ERROR);

        // Only ICE candidates are forwarded; anything else is simply released
        if (pQueuedMessage->signalingMessage.messageType == SIGNALING_MESSAGE_TYPE_ICE_CANDIDATE) {
            CHK_STATUS(handleRemoteCandidate(pSampleStreamingSession, &pQueuedMessage->signalingMessage));
        }

        SAFE_MEMFREE(pQueuedMessage);
    }

    CHK_STATUS(freeMessageQueue(pPendingMessageQueue));

CleanUp:

    // Releases the in-flight message when the loop was left through a failure
    SAFE_MEMFREE(pQueuedMessage);
    CHK_LOG_ERR(retStatus);
    return retStatus;
}
/**
 * Signaling client callback: dispatches one received signaling message.
 *
 * - OFFER:         creates a new streaming session for the peer, registers it under the CRC32 hash of
 *                  the peer client id and replays any ICE candidates queued for that peer.
 * - ANSWER:        applies the answer to the first streaming session, registers it under the peer hash
 *                  and replays any queued ICE candidates.
 * - ICE_CANDIDATE: forwards the candidate to the peer's session if one is registered, otherwise stores
 *                  a copy of the message in the peer's pending queue.
 * - anything else: logged and ignored.
 *
 * @param customData                - the PSampleConfiguration, passed as an opaque UINT64
 * @param pReceivedSignalingMessage - the message to process; not retained (a copy is queued if needed)
 *
 * @return STATUS_SUCCESS on success, STATUS_NULL_ARG if either argument is NULL, otherwise the status
 *         of the first failing step
 */
STATUS signalingMessageReceived(UINT64 customData, PReceivedSignalingMessage pReceivedSignalingMessage)
{
    STATUS retStatus = STATUS_SUCCESS;
    PSampleConfiguration pSampleConfiguration = (PSampleConfiguration) customData;
    // locked has to start out FALSE: the argument check below jumps to CleanUp before the mutex is taken
    BOOL peerConnectionFound = FALSE, locked = FALSE, startStats = FALSE;
    UINT32 clientIdHash;
    UINT64 hashValue = 0;
    // Queue owned by this function; freed in CleanUp unless ownership is handed over
    PPendingMessageQueue pPendingMessageQueue = NULL;
    // Queue owned by pPendingSignalingMessageForRemoteClient; never freed here
    PPendingMessageQueue pClientMessageQueue = NULL;
    PSampleStreamingSession pSampleStreamingSession = NULL;
    PReceivedSignalingMessage pReceivedSignalingMessageCopy = NULL;

    CHK(pSampleConfiguration != NULL && pReceivedSignalingMessage != NULL, STATUS_NULL_ARG);

    MUTEX_LOCK(pSampleConfiguration->sampleConfigurationObjLock);
    locked = TRUE;

    // Sessions and pending queues are keyed by the CRC32 of the peer client id
    clientIdHash = COMPUTE_CRC32((PBYTE) pReceivedSignalingMessage->signalingMessage.peerClientId,
                                 (UINT32) STRLEN(pReceivedSignalingMessage->signalingMessage.peerClientId));
    CHK_STATUS(hashTableContains(pSampleConfiguration->pRtcPeerConnectionForRemoteClient, clientIdHash, &peerConnectionFound));
    if (peerConnectionFound) {
        CHK_STATUS(hashTableGet(pSampleConfiguration->pRtcPeerConnectionForRemoteClient, clientIdHash, &hashValue));
        pSampleStreamingSession = (PSampleStreamingSession) hashValue;
    }

    switch (pReceivedSignalingMessage->signalingMessage.messageType) {
        case SIGNALING_MESSAGE_TYPE_OFFER:
            // Check if we already have an ongoing master session with the same peer
            CHK_ERR(!peerConnectionFound, STATUS_INVALID_OPERATION, "Peer connection %s is in progress",
                    pReceivedSignalingMessage->signalingMessage.peerClientId);

            /*
             * Create new streaming session for each offer, then insert the client id and streaming session into
             * pRtcPeerConnectionForRemoteClient for subsequent ice candidate messages. Lastly check if there is
             * any ice candidate messages queued in pPendingSignalingMessageForRemoteClient. If so then submit
             * all of them.
             */
            if (pSampleConfiguration->streamingSessionCount == ARRAY_SIZE(pSampleConfiguration->sampleStreamingSessionList)) {
                DLOGW("Max simultaneous streaming session count reached.");

                // Need to remove the pending queue if any.
                // This is a simple optimization as the session cleanup will
                // handle the cleanup of pending message queue after a while
                CHK_STATUS(getPendingMessageQueueForHash(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash, TRUE,
                                                         &pPendingMessageQueue));

                // Early exit with the current (successful) status; CleanUp frees the removed queue
                CHK(FALSE, retStatus);
            }

            CHK_STATUS(createSampleStreamingSession(pSampleConfiguration, pReceivedSignalingMessage->signalingMessage.peerClientId, TRUE,
                                                    &pSampleStreamingSession));
            pSampleStreamingSession->offerReceiveTime = GETTIME();

            MUTEX_LOCK(pSampleConfiguration->streamingSessionListReadLock);
            pSampleConfiguration->sampleStreamingSessionList[pSampleConfiguration->streamingSessionCount++] = pSampleStreamingSession;
            MUTEX_UNLOCK(pSampleConfiguration->streamingSessionListReadLock);

            CHK_STATUS(handleOffer(pSampleConfiguration, pSampleStreamingSession, &pReceivedSignalingMessage->signalingMessage));
            CHK_STATUS(hashTablePut(pSampleConfiguration->pRtcPeerConnectionForRemoteClient, clientIdHash, (UINT64) pSampleStreamingSession));

            // If there are any ice candidate messages in the queue for this client id, submit them now.
            CHK_STATUS(getPendingMessageQueueForHash(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash, TRUE,
                                                     &pPendingMessageQueue));
            if (pPendingMessageQueue != NULL) {
                CHK_STATUS(submitPendingIceCandidate(pPendingMessageQueue, pSampleStreamingSession));

                // NULL the pointer to avoid it being freed in the cleanup
                pPendingMessageQueue = NULL;
            }

            // Only start the stats timer if it has not been registered yet
            startStats = pSampleConfiguration->iceCandidatePairStatsTimerId == MAX_UINT32;
            break;

        case SIGNALING_MESSAGE_TYPE_ANSWER:
            /*
             * for viewer, pSampleStreamingSession should've already been created. insert the client id and
             * streaming session into pRtcPeerConnectionForRemoteClient for subsequent ice candidate messages.
             * Lastly check if there is any ice candidate messages queued in pPendingSignalingMessageForRemoteClient.
             * If so then submit all of them.
             */
            // Without any session, slot 0 of the list does not hold a usable session
            CHK_ERR(pSampleConfiguration->streamingSessionCount > 0, STATUS_INVALID_OPERATION,
                    "Received an answer but no streaming session exists");
            pSampleStreamingSession = pSampleConfiguration->sampleStreamingSessionList[0];

            CHK_STATUS(handleAnswer(pSampleConfiguration, pSampleStreamingSession, &pReceivedSignalingMessage->signalingMessage));
            CHK_STATUS(hashTablePut(pSampleConfiguration->pRtcPeerConnectionForRemoteClient, clientIdHash, (UINT64) pSampleStreamingSession));

            // If there are any ice candidate messages in the queue for this client id, submit them now.
            CHK_STATUS(getPendingMessageQueueForHash(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash, TRUE,
                                                     &pPendingMessageQueue));
            if (pPendingMessageQueue != NULL) {
                CHK_STATUS(submitPendingIceCandidate(pPendingMessageQueue, pSampleStreamingSession));

                // NULL the pointer to avoid it being freed in the cleanup
                pPendingMessageQueue = NULL;
            }
            break;

        case SIGNALING_MESSAGE_TYPE_ICE_CANDIDATE:
            /*
             * if peer connection hasn't been created, create an queue to store the ice candidate message. Otherwise
             * submit the signaling message into the corresponding streaming session.
             */
            if (!peerConnectionFound) {
                CHK_STATUS(getPendingMessageQueueForHash(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash, FALSE,
                                                         &pPendingMessageQueue));
                if (pPendingMessageQueue == NULL) {
                    // Until the enqueue below succeeds the new queue is ours and CleanUp frees it on failure
                    CHK_STATUS(createMessageQueue(clientIdHash, &pPendingMessageQueue));
                    CHK_STATUS(stackQueueEnqueue(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, (UINT64) pPendingMessageQueue));
                }

                // The queue is now referenced by pPendingSignalingMessageForRemoteClient. Drop our ownership
                // so that a later failure cannot free it while it is still linked in that list.
                pClientMessageQueue = pPendingMessageQueue;
                pPendingMessageQueue = NULL;

                CHK(NULL != (pReceivedSignalingMessageCopy = (PReceivedSignalingMessage) MEMCALLOC(1, SIZEOF(ReceivedSignalingMessage))),
                    STATUS_NOT_ENOUGH_MEMORY);
                *pReceivedSignalingMessageCopy = *pReceivedSignalingMessage;

                CHK_STATUS(stackQueueEnqueue(pClientMessageQueue->messageQueue, (UINT64) pReceivedSignalingMessageCopy));

                // The copy now belongs to the pending queue; NULL the pointer to not free any longer
                pReceivedSignalingMessageCopy = NULL;
            } else {
                CHK_STATUS(handleRemoteCandidate(pSampleStreamingSession, &pReceivedSignalingMessage->signalingMessage));
            }
            break;

        default:
            DLOGD("Unhandled signaling message type %u", pReceivedSignalingMessage->signalingMessage.messageType);
            break;
    }

    MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock);
    locked = FALSE;

    // The timer is registered after the configuration lock has been released
    if (startStats &&
        STATUS_FAILED(retStatus = timerQueueAddTimer(pSampleConfiguration->timerQueueHandle, SAMPLE_STATS_DURATION, SAMPLE_STATS_DURATION,
                                                     getIceCandidatePairStatsCallback, (UINT64) pSampleConfiguration,
                                                     &pSampleConfiguration->iceCandidatePairStatsTimerId))) {
        DLOGW("Failed to add getIceCandidatePairStatsCallback to add to timer queue (code 0x%08x). "
              "Cannot pull ice candidate pair metrics periodically",
              retStatus);

        // Reset the returned status
        retStatus = STATUS_SUCCESS;
    }

CleanUp:

    SAFE_MEMFREE(pReceivedSignalingMessageCopy);
    if (pPendingMessageQueue != NULL) {
        freeMessageQueue(pPendingMessageQueue);
    }

    if (locked) {
        MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock);
    }

    CHK_LOG_ERR(retStatus);
    return retStatus;
}
/**
 * Allocates a pending message queue tagged with the given hash and the current time.
 *
 * @param hashValue             - value stored in the queue's hashValue field
 * @param ppPendingMessageQueue - receives the new queue, or NULL if creation failed
 *
 * @return STATUS_SUCCESS, STATUS_NULL_ARG for a NULL output pointer, STATUS_NOT_ENOUGH_MEMORY when the
 *         allocation fails, or the status returned by stackQueueCreate
 */
STATUS createMessageQueue(UINT64 hashValue, PPendingMessageQueue* ppPendingMessageQueue)
{
    STATUS retStatus = STATUS_SUCCESS;
    PPendingMessageQueue pQueue = NULL;

    CHK(ppPendingMessageQueue != NULL, STATUS_NULL_ARG);

    pQueue = (PPendingMessageQueue) MEMCALLOC(1, SIZEOF(PendingMessageQueue));
    CHK(pQueue != NULL, STATUS_NOT_ENOUGH_MEMORY);

    pQueue->createTime = GETTIME();
    pQueue->hashValue = hashValue;
    CHK_STATUS(stackQueueCreate(&pQueue->messageQueue));

CleanUp:

    // Never hand a half-built queue back to the caller
    if (pQueue != NULL && STATUS_FAILED(retStatus)) {
        freeMessageQueue(pQueue);
        pQueue = NULL;
    }

    if (ppPendingMessageQueue != NULL) {
        *ppPendingMessageQueue = pQueue;
    }

    return retStatus;
}
/**
 * Releases a pending message queue together with its inner message stack queue.
 *
 * @param pPendingMessageQueue - queue to free; NULL is accepted and treated as a no-op
 *
 * @return always STATUS_SUCCESS (the results of the stack queue calls are not inspected)
 */
STATUS freeMessageQueue(PPendingMessageQueue pPendingMessageQueue)
{
    // Nothing to release for a NULL queue, which keeps repeated calls harmless
    if (pPendingMessageQueue != NULL) {
        if (pPendingMessageQueue->messageQueue != NULL) {
            // Clear with TRUE before freeing the stack queue itself
            stackQueueClear(pPendingMessageQueue->messageQueue, TRUE);
            stackQueueFree(pPendingMessageQueue->messageQueue);
        }

        MEMFREE(pPendingMessageQueue);
    }

    return STATUS_SUCCESS;
}
/**
 * Looks up the pending message queue whose hashValue equals clientHash.
 *
 * @param pPendingQueue         - stack queue holding PPendingMessageQueue entries
 * @param clientHash            - hash to search for
 * @param remove                - when TRUE the matching entry is also removed from pPendingQueue
 * @param ppPendingMessageQueue - receives the matching queue. It is left untouched when nothing matches
 *                                or when an error occurs, so callers should initialize it to NULL.
 *
 * @return STATUS_SUCCESS (also when nothing matches), STATUS_NULL_ARG for NULL arguments, or the status
 *         of the failing stack queue call
 */
STATUS getPendingMessageQueueForHash(PStackQueue pPendingQueue, UINT64 clientHash, BOOL remove, PPendingMessageQueue* ppPendingMessageQueue)
{
    STATUS retStatus = STATUS_SUCCESS;
    PPendingMessageQueue pPendingMessageQueue = NULL;
    StackQueueIterator iterator;
    BOOL iterate = TRUE;
    UINT64 data;

    CHK(pPendingQueue != NULL && ppPendingMessageQueue != NULL, STATUS_NULL_ARG);

    CHK_STATUS(stackQueueGetIterator(pPendingQueue, &iterator));
    while (iterate && IS_VALID_ITERATOR(iterator)) {
        CHK_STATUS(stackQueueIteratorGetItem(iterator, &data));
        CHK_STATUS(stackQueueIteratorNext(&iterator));

        pPendingMessageQueue = (PPendingMessageQueue) data;
        if (clientHash == pPendingMessageQueue->hashValue) {
            iterate = FALSE;

            // Check if the item needs to be removed
            if (remove) {
                // This is OK to do as we are terminating the iterator anyway
                CHK_STATUS(stackQueueRemoveItem(pPendingQueue, data));
            }

            // Publish the result only once the removal (if requested) succeeded. Otherwise the caller
            // would receive, and possibly free, an entry that is still linked in pPendingQueue.
            *ppPendingMessageQueue = pPendingMessageQueue;
        }
    }

CleanUp:

    return retStatus;
}
/**
 * Frees every pending message queue older than SAMPLE_PENDING_MESSAGE_CLEANUP_DURATION.
 *
 * Each entry present at the time of the call is dequeued once; entries that have not expired are
 * enqueued again, so no iterator is held while items are being removed.
 *
 * @param pPendingQueue - stack queue holding PPendingMessageQueue entries
 *
 * @return STATUS_SUCCESS, STATUS_NULL_ARG for a NULL queue, or the status of the failing call
 */
STATUS removeExpiredMessageQueues(PStackQueue pPendingQueue)
{
    STATUS retStatus = STATUS_SUCCESS;
    PPendingMessageQueue pEntry = NULL;
    UINT32 remaining = 0;
    UINT64 item, now;

    CHK(pPendingQueue != NULL, STATUS_NULL_ARG);

    now = GETTIME();
    CHK_STATUS(stackQueueGetCount(pPendingQueue, &remaining));

    // Visit exactly the entries that were queued when the sweep started
    while (remaining > 0) {
        remaining--;

        CHK_STATUS(stackQueueDequeue(pPendingQueue, &item));
        pEntry = (PPendingMessageQueue) item;

        if (now <= pEntry->createTime + SAMPLE_PENDING_MESSAGE_CLEANUP_DURATION) {
            // Not expired yet: put it back into the queue
            CHK_STATUS(stackQueueEnqueue(pPendingQueue, item));
        } else {
            // Expired: release the queue and everything stored in it
            CHK_STATUS(freeMessageQueue(pEntry));
        }
    }

CleanUp:

    return retStatus;
}
#include "Samples.h"
extern PSampleConfiguration gSampleConfiguration;
/**
 * Data channel onMessage callback used by the viewer: logs the received message.
 *
 * @param customData   - unused
 * @param pDataChannel - unused
 * @param isBinary     - TRUE for a binary message, FALSE for a string message
 * @param pMessage     - message bytes; printed only for string messages
 * @param pMessageLen  - number of bytes in pMessage, used as the print precision
 */
VOID dataChannelOnMessageCallback(UINT64 customData, PRtcDataChannel pDataChannel, BOOL isBinary, PBYTE pMessage, UINT32 pMessageLen)
{
    UNUSED_PARAM(customData);
    UNUSED_PARAM(pDataChannel);

    if (!isBinary) {
        // The length bounds the print, so the payload is not read past pMessageLen bytes
        DLOGI("DataChannel String Message: %.*s\n", pMessageLen, pMessage);
        return;
    }

    DLOGI("DataChannel Binary Message");
}
/**
 * Data channel onOpen callback for a channel created by the viewer.
 *
 * Registers the message callback, bumps the open-channel counter and sends the first message to the
 * master.
 *
 * @param customData   - address of a SIZE_T counter that is atomically incremented; it is also
 *                       forwarded as the custom data of the message callback
 * @param pDataChannel - the data channel that has just been opened
 */
VOID dataChannelOnOpenCallback(UINT64 customData, PRtcDataChannel pDataChannel)
{
    STATUS sendStatus;

    DLOGI("New DataChannel has been opened %s \n", pDataChannel->name);

    dataChannelOnMessage(pDataChannel, customData, dataChannelOnMessageCallback);
    ATOMIC_INCREMENT((PSIZE_T) customData);

    // Sending first message to the master over the data channel
    sendStatus = dataChannelSend(pDataChannel, FALSE, (PBYTE) VIEWER_DATA_CHANNEL_MESSAGE, STRLEN(VIEWER_DATA_CHANNEL_MESSAGE));
    if (sendStatus != STATUS_SUCCESS) {
        DLOGI("[KVS Viewer] dataChannelSend(): operation returned status code: 0x%08x \n", sendStatus);
    }
}
/**
 * Entry point of the viewer sample.
 *
 * Creates the sample configuration in the viewer role, connects the signaling client, creates one
 * streaming session, sends an SDP offer to the master and then blocks until the sample is interrupted
 * or the session terminates.
 *
 * @param argc - argument count
 * @param argv - argv[1], when present, is the signaling channel name (ignored when
 *               IOT_CORE_ENABLE_CREDENTIALS is defined; the IoT thing name env var is used instead)
 *
 * @return EXIT_SUCCESS when both the run and the cleanup succeeded, EXIT_FAILURE otherwise
 */
INT32 main(INT32 argc, CHAR* argv[])
{
    STATUS retStatus = STATUS_SUCCESS;
    // Kept separate from retStatus so that cleanup results cannot mask a failure of the run itself
    STATUS cleanupStatus = STATUS_SUCCESS;
    RtcSessionDescriptionInit offerSessionDescriptionInit;
    UINT32 buffLen = 0;
    SignalingMessage message;
    PSampleConfiguration pSampleConfiguration = NULL;
    PSampleStreamingSession pSampleStreamingSession = NULL;
    BOOL locked = FALSE;
    PCHAR pChannelName;

    SET_INSTRUMENTED_ALLOCATORS();

#ifndef _WIN32
    signal(SIGINT, sigintHandler);
#endif

    // do trickle-ice by default
    printf("[KVS Viewer] Using trickleICE by default\n");

#ifdef IOT_CORE_ENABLE_CREDENTIALS
    CHK_ERR((pChannelName = getenv(IOT_CORE_THING_NAME)) != NULL, STATUS_INVALID_OPERATION, "AWS_IOT_CORE_THING_NAME must be set");
#else
    pChannelName = argc > 1 ? argv[1] : SAMPLE_CHANNEL_NAME;
#endif

    retStatus = createSampleConfiguration(pChannelName, SIGNALING_CHANNEL_ROLE_TYPE_VIEWER, TRUE, TRUE, &pSampleConfiguration);
    if (retStatus != STATUS_SUCCESS) {
        printf("[KVS Viewer] createSampleConfiguration(): operation returned status code: 0x%08x \n", retStatus);
        goto CleanUp;
    }

    // Published in the global so code outside this function can reach the configuration
    gSampleConfiguration = pSampleConfiguration;

    printf("[KVS Viewer] Created signaling channel %s\n", pChannelName);

    if (pSampleConfiguration->enableFileLogging) {
        retStatus =
            createFileLogger(FILE_LOGGING_BUFFER_SIZE, MAX_NUMBER_OF_LOG_FILES, (PCHAR) FILE_LOGGER_LOG_FILE_DIRECTORY_PATH, TRUE, TRUE, NULL);
        if (retStatus != STATUS_SUCCESS) {
            // File logging is optional: report the failure, turn it off and keep going
            printf("[KVS Viewer] createFileLogger(): operation returned status code: 0x%08x \n", retStatus);
            pSampleConfiguration->enableFileLogging = FALSE;
        }
    }

    // Initialize KVS WebRTC. This must be done before anything else, and must only be done once.
    retStatus = initKvsWebRtc();
    if (retStatus != STATUS_SUCCESS) {
        printf("[KVS Viewer] initKvsWebRtc(): operation returned status code: 0x%08x \n", retStatus);
        goto CleanUp;
    }

    printf("[KVS Viewer] KVS WebRTC initialization completed successfully\n");

    pSampleConfiguration->signalingClientCallbacks.messageReceivedFn = signalingMessageReceived;

    // Append a random suffix to the viewer client id
    sprintf(pSampleConfiguration->clientInfo.clientId, "%s_%u", SAMPLE_VIEWER_CLIENT_ID, RAND() % MAX_UINT32);

    retStatus = createSignalingClientSync(&pSampleConfiguration->clientInfo, &pSampleConfiguration->channelInfo,
                                          &pSampleConfiguration->signalingClientCallbacks, pSampleConfiguration->pCredentialProvider,
                                          &pSampleConfiguration->signalingClientHandle);
    if (retStatus != STATUS_SUCCESS) {
        printf("[KVS Viewer] createSignalingClientSync(): operation returned status code: 0x%08x \n", retStatus);
        goto CleanUp;
    }

    printf("[KVS Viewer] Signaling client created successfully\n");

    // Enable the processing of the messages
    retStatus = signalingClientConnectSync(pSampleConfiguration->signalingClientHandle);
    if (retStatus != STATUS_SUCCESS) {
        printf("[KVS Viewer] signalingClientConnectSync(): operation returned status code: 0x%08x \n", retStatus);
        goto CleanUp;
    }

    printf("[KVS Viewer] Signaling client connection to socket established\n");

    // Initialize streaming session
    MUTEX_LOCK(pSampleConfiguration->sampleConfigurationObjLock);
    locked = TRUE;
    retStatus = createSampleStreamingSession(pSampleConfiguration, NULL, FALSE, &pSampleStreamingSession);
    if (retStatus != STATUS_SUCCESS) {
        printf("[KVS Viewer] createSampleStreamingSession(): operation returned status code: 0x%08x \n", retStatus);
        goto CleanUp;
    }

    printf("[KVS Viewer] Creating streaming session...completed\n");
    pSampleConfiguration->sampleStreamingSessionList[pSampleConfiguration->streamingSessionCount++] = pSampleStreamingSession;

    MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock);
    locked = FALSE;

    memset(&offerSessionDescriptionInit, 0x00, SIZEOF(RtcSessionDescriptionInit));

    retStatus = setLocalDescription(pSampleStreamingSession->pPeerConnection, &offerSessionDescriptionInit);
    if (retStatus != STATUS_SUCCESS) {
        printf("[KVS Viewer] setLocalDescription(): operation returned status code: 0x%08x \n", retStatus);
        goto CleanUp;
    }

    printf("[KVS Viewer] Completed setting local description\n");

    retStatus = transceiverOnFrame(pSampleStreamingSession->pAudioRtcRtpTransceiver, (UINT64) pSampleStreamingSession, sampleFrameHandler);
    if (retStatus != STATUS_SUCCESS) {
        printf("[KVS Viewer] transceiverOnFrame(): operation returned status code: 0x%08x \n", retStatus);
        goto CleanUp;
    }

    if (!pSampleConfiguration->trickleIce) {
        printf("[KVS Viewer] Non trickle ice. Wait for Candidate collection to complete\n");
        MUTEX_LOCK(pSampleConfiguration->sampleConfigurationObjLock);
        locked = TRUE;

        // Re-check every 5 seconds at the latest; bail out if the session is being terminated
        while (!ATOMIC_LOAD_BOOL(&pSampleStreamingSession->candidateGatheringDone)) {
            CHK_WARN(!ATOMIC_LOAD_BOOL(&pSampleStreamingSession->terminateFlag), STATUS_OPERATION_TIMED_OUT,
                     "application terminated and candidate gathering still not done");
            CVAR_WAIT(pSampleConfiguration->cvar, pSampleConfiguration->sampleConfigurationObjLock, 5 * HUNDREDS_OF_NANOS_IN_A_SECOND);
        }

        MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock);
        locked = FALSE;

        printf("[KVS Viewer] Candidate collection completed\n");
    }

    retStatus = createOffer(pSampleStreamingSession->pPeerConnection, &offerSessionDescriptionInit);
    if (retStatus != STATUS_SUCCESS) {
        printf("[KVS Viewer] createOffer(): operation returned status code: 0x%08x \n", retStatus);
        goto CleanUp;
    }

    printf("[KVS Viewer] Offer creation successful\n");

    printf("[KVS Viewer] Generating JSON of session description....");

    // First call with a NULL buffer only computes the required length
    retStatus = serializeSessionDescriptionInit(&offerSessionDescriptionInit, NULL, &buffLen);
    if (retStatus != STATUS_SUCCESS) {
        printf("[KVS Viewer] serializeSessionDescriptionInit(): operation returned status code: 0x%08x \n", retStatus);
        goto CleanUp;
    }

    // Refuse an offer that would not fit into the signaling message payload
    if (buffLen >= SIZEOF(message.payload)) {
        printf("[KVS Viewer] serializeSessionDescriptionInit(): operation returned status code: 0x%08x \n", STATUS_INVALID_OPERATION);
        retStatus = STATUS_INVALID_OPERATION;
        goto CleanUp;
    }

    retStatus = serializeSessionDescriptionInit(&offerSessionDescriptionInit, message.payload, &buffLen);
    if (retStatus != STATUS_SUCCESS) {
        printf("[KVS Viewer] serializeSessionDescriptionInit(): operation returned status code: 0x%08x \n", retStatus);
        goto CleanUp;
    }

    printf("Completed\n");

    message.version = SIGNALING_MESSAGE_CURRENT_VERSION;
    message.messageType = SIGNALING_MESSAGE_TYPE_OFFER;
    STRCPY(message.peerClientId, SAMPLE_MASTER_CLIENT_ID);
    message.payloadLen = (buffLen / SIZEOF(CHAR)) - 1;
    message.correlationId[0] = '\0';

    retStatus = signalingClientSendMessageSync(pSampleConfiguration->signalingClientHandle, &message);
    if (retStatus != STATUS_SUCCESS) {
        printf("[KVS Viewer] signalingClientSendMessageSync(): operation returned status code: 0x%08x \n", retStatus);
        goto CleanUp;
    }

#ifdef ENABLE_DATA_CHANNEL
    PRtcDataChannel pDataChannel = NULL;
    PRtcPeerConnection pPeerConnection = pSampleStreamingSession->pPeerConnection;
    SIZE_T datachannelLocalOpenCount = 0;

    // Creating a new datachannel on the peer connection of the existing sample streaming session
    retStatus = createDataChannel(pPeerConnection, pChannelName, NULL, &pDataChannel);
    if (retStatus != STATUS_SUCCESS) {
        printf("[KVS Viewer] createDataChannel(): operation returned status code: 0x%08x \n", retStatus);
        goto CleanUp;
    }
    printf("[KVS Viewer] Creating data channel...completed\n");

    // Setting a callback for when the data channel is open
    retStatus = dataChannelOnOpen(pDataChannel, (UINT64) &datachannelLocalOpenCount, dataChannelOnOpenCallback);
    if (retStatus != STATUS_SUCCESS) {
        printf("[KVS Viewer] dataChannelOnOpen(): operation returned status code: 0x%08x \n", retStatus);
        goto CleanUp;
    }
    printf("[KVS Viewer] Data Channel open now...\n");
#endif

    // Block until interrupted
    while (!ATOMIC_LOAD_BOOL(&pSampleConfiguration->interrupted) && !ATOMIC_LOAD_BOOL(&pSampleStreamingSession->terminateFlag)) {
        THREAD_SLEEP(HUNDREDS_OF_NANOS_IN_A_SECOND);
    }

CleanUp:

    if (retStatus != STATUS_SUCCESS) {
        printf("[KVS Viewer] Terminated with status code 0x%08x", retStatus);
    }

    printf("[KVS Viewer] Cleaning up....\n");

    // locked is only ever TRUE after the configuration has been created
    if (locked) {
        MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock);
    }

    // Everything below touches the configuration, which is still NULL when we failed before or
    // while creating it
    if (pSampleConfiguration != NULL) {
        if (pSampleConfiguration->enableFileLogging) {
            freeFileLogger();
        }

        cleanupStatus = freeSignalingClient(&pSampleConfiguration->signalingClientHandle);
        if (cleanupStatus != STATUS_SUCCESS) {
            printf("[KVS Viewer] freeSignalingClient(): operation returned status code: 0x%08x \n", cleanupStatus);
            // Report a cleanup failure only if the run itself had succeeded
            if (retStatus == STATUS_SUCCESS) {
                retStatus = cleanupStatus;
            }
        }

        cleanupStatus = freeSampleConfiguration(&pSampleConfiguration);
        if (cleanupStatus != STATUS_SUCCESS) {
            printf("[KVS Viewer] freeSampleConfiguration(): operation returned status code: 0x%08x \n", cleanupStatus);
            if (retStatus == STATUS_SUCCESS) {
                retStatus = cleanupStatus;
            }
        }
    }

    printf("[KVS Viewer] Cleanup done\n");

    RESET_INSTRUMENTED_ALLOCATORS();

    // https://www.gnu.org/software/libc/manual/html_node/Exit-Status.html
    // We can only return with 0 - 127. Some platforms treat exit code >= 128
    // to be a success code, which might give an unintended behaviour.
    // Some platforms also treat 1 or 0 differently, so it's better to use
    // EXIT_FAILURE and EXIT_SUCCESS macros for portability.
    return STATUS_FAILED(retStatus) ? EXIT_FAILURE : EXIT_SUCCESS;
}
更多推荐
所有评论(0)