Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,15 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
expect(inboundStream).to.have.property('writeStatus', 'writable', 'inbound stream writeStatus was incorrect')
expect(inboundStream).to.have.property('readStatus', 'readable', 'inbound stream readStatus was incorrect')
})

it('closes read only', async () => {
expect(outboundStream).to.not.have.nested.property('timeline.close')

await outboundStream.closeRead()

expect(outboundStream).to.have.property('writeStatus', 'writable')
expect(outboundStream).to.have.property('readStatus', 'closed')
})

it('aborts', async () => {
const eventPromises = Promise.all([
Expand Down Expand Up @@ -237,6 +246,19 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
expect(inboundEvent).to.have.nested.property('error.name', 'StreamResetError')
})

it('resets when remote aborts', async () => {
expect(outboundStream).to.not.have.nested.property('timeline.close')

const closePromise = pEvent(outboundStream, 'close')
inboundStream.abort(new Error('Urk!'))

await closePromise

expect(outboundStream).to.have.property('status', 'reset')
expect(isValidTick(outboundStream.timeline.close)).to.equal(true)
expect(outboundStream.timeline.close).to.be.greaterThanOrEqual(outboundStream.timeline.open)
})

it('does not send close read when remote closes write', async () => {
// @ts-expect-error internal method of AbstractMessageStream
const sendCloseReadSpy = Sinon.spy(outboundStream, 'sendCloseRead')
Expand Down
185 changes: 0 additions & 185 deletions packages/transport-webrtc/test/stream.spec.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
import { defaultLogger } from '@libp2p/logger'
import { expect } from 'aegir/chai'
import delay from 'delay'
import * as lengthPrefixed from 'it-length-prefixed'
import { bytes } from 'multiformats'
import { pEvent } from 'p-event'
import { stubInterface } from 'sinon-ts'
import { MAX_MESSAGE_SIZE, PROTOBUF_OVERHEAD } from '../src/constants.js'
import { Message } from '../src/private-to-public/pb/message.js'
import { createStream } from '../src/stream.js'
import { isFirefox } from '../src/util.ts'
import { RTCPeerConnection } from '../src/webrtc/index.js'
import { receiveFinAck, receiveRemoteCloseWrite } from './util.js'
import type { WebRTCStream } from '../src/stream.js'
import type { Stream } from '@libp2p/interface'

describe('Max message size', () => {
it(`sends messages smaller or equal to ${MAX_MESSAGE_SIZE} bytes in one`, async () => {
Expand Down Expand Up @@ -84,181 +77,3 @@ describe('Max message size', () => {
}
})
})

const TEST_MESSAGE = 'test_message'

async function setup (): Promise<{ peerConnection: RTCPeerConnection, dataChannel: RTCDataChannel, stream: WebRTCStream }> {
const peerConnection = new RTCPeerConnection()
const dataChannel = peerConnection.createDataChannel('whatever', { negotiated: true, id: 91 })

await pEvent(dataChannel, 'open', {
rejectionEvents: [
'close',
'error'
]
})

const stream = createStream({
channel: dataChannel,
direction: 'outbound',
closeTimeout: 1,
log: defaultLogger().forComponent('test')
})

return { peerConnection, dataChannel, stream }
}

function generatePbByFlag (flag?: Message.Flag): Uint8Array {
const buf = Message.encode({
flag,
message: bytes.fromString(TEST_MESSAGE)
})

return lengthPrefixed.encode.single(buf).subarray()
}

// TODO: move to transport interface compliance suite
describe.skip('Stream Stats', () => {
let stream: WebRTCStream
let peerConnection: RTCPeerConnection
let dataChannel: RTCDataChannel

beforeEach(async () => {
({ stream, peerConnection, dataChannel } = await setup())
})

afterEach(() => {
if (peerConnection != null) {
peerConnection.close()
}
})

it('can construct', () => {
expect(stream.timeline.close).to.not.exist()
})

it('close marks it closed', async () => {
expect(stream.timeline.close).to.not.exist()
expect(stream.writeStatus).to.equal('writable')

receiveFinAck(dataChannel)
receiveRemoteCloseWrite(dataChannel)

await Promise.all([
pEvent(stream, 'close'),
stream.close()
])

expect(stream.timeline.close).to.be.a('number')
expect(stream.writeStatus).to.equal('closed')
})

it('closeRead marks it read-closed only', async () => {
expect(stream.timeline.close).to.not.exist()
await stream.closeRead()

expect(stream).to.have.property('writeStatus', 'writable')
expect(stream).to.have.property('readStatus', 'closed')
})

it('closeWrite marks it write-closed only', async () => {
expect(stream.timeline.close).to.not.exist()

receiveFinAck(dataChannel)
await stream.close()

expect(stream).to.have.property('writeStatus', 'closed')
expect(stream).to.have.property('readStatus', 'readable')
})

it('abort = close', () => {
expect(stream.timeline.close).to.not.exist()
stream.abort(new Error('Oh no!'))
expect(stream.timeline.close).to.be.a('number')
})

it('reset = close', () => {
expect(stream.timeline.close).to.not.exist()
stream.onRemoteReset() // only resets the write side
expect(stream.timeline.close).to.be.a('number')
expect(stream.timeline.close).to.be.greaterThanOrEqual(stream.timeline.open)
})
})

// TODO: move to transport interface compliance suite
describe.skip('Stream Read Stats Transition By Incoming Flag', () => {
let dataChannel: RTCDataChannel
let stream: Stream
let peerConnection: RTCPeerConnection

beforeEach(async () => {
({ dataChannel, stream, peerConnection } = await setup())
})

afterEach(() => {
if (peerConnection != null) {
peerConnection.close()
}
})

it('no flag, no transition', () => {
expect(stream.timeline.close).to.not.exist()
const data = generatePbByFlag()
dataChannel.onmessage?.(new MessageEvent('message', { data }))

expect(stream.timeline.close).to.not.exist()
})

it('open to read-close by flag:FIN', async () => {
const data = generatePbByFlag(Message.Flag.FIN)
dataChannel.dispatchEvent(new MessageEvent('message', { data }))

await delay(100)

expect(stream.readStatus).to.equal('closed')
})

it('read-close to close by flag:STOP_SENDING', async () => {
const data = generatePbByFlag(Message.Flag.STOP_SENDING)
dataChannel.dispatchEvent(new MessageEvent('message', { data }))

await delay(100)

expect(stream.remoteReadStatus).to.equal('closed')
})
})

// TODO: move to transport interface compliance suite
describe.skip('Stream Write Stats Transition By Incoming Flag', () => {
let dataChannel: RTCDataChannel
let stream: Stream
let peerConnection: RTCPeerConnection

beforeEach(async () => {
({ dataChannel, stream, peerConnection } = await setup())
})

afterEach(() => {
if (peerConnection != null) {
peerConnection.close()
}
})

it('open to write-close by flag:STOP_SENDING', async () => {
const data = generatePbByFlag(Message.Flag.STOP_SENDING)
dataChannel.dispatchEvent(new MessageEvent('message', { data }))

await delay(100)

expect(stream.remoteReadStatus).to.equal('closed')
})

it('write-close to close by flag:FIN', async () => {
const data = generatePbByFlag(Message.Flag.FIN)
dataChannel.dispatchEvent(new MessageEvent('message', { data }))

await delay(100)

expect(stream.remoteWriteStatus).to.equal('closed')
})
})
Loading