aboutsummaryrefslogtreecommitdiff
path: root/ts/transport/web_serial_transport.ts
blob: f4f91807293e75398027178012b05d073992421b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

/* eslint-env browser */
import { BehaviorSubject, Observable, Subject, Subscription } from 'rxjs';
import DeviceTransport from './device_transport';
import type {
  SerialPort,
  Serial,
  SerialOptions,
  Navigator,
  SerialPortFilter,
} from '../types/serial';

const DEFAULT_SERIAL_OPTIONS: SerialOptions & { baudRate: number } = {
  // Some versions of chrome use `baudrate` (linux)
  baudrate: 115200,
  // Some versions use `baudRate` (chromebook)
  baudRate: 115200,
  databits: 8,
  parity: 'none',
  stopbits: 1,
};

interface PortReadConnection {
  chunks: Observable<Uint8Array>;
  errors: Observable<Error>;
}

interface PortConnection extends PortReadConnection {
  sendChunk: (chunk: Uint8Array) => Promise<void>;
}

export class DeviceLostError extends Error {
  override message = 'The device has been lost';
}

export class DeviceLockedError extends Error {
  override message =
    "The device's port is locked. Try unplugging it" +
    ' and plugging it back in.';
}

/**
 * WebSerialTransport sends and receives UInt8Arrays to and
 * from a serial device connected over USB.
 */
export class WebSerialTransport implements DeviceTransport {
  chunks = new Subject<Uint8Array>();
  errors = new Subject<Error>();
  connected = new BehaviorSubject<boolean>(false);
  private portConnections: Map<SerialPort, PortConnection> = new Map();
  private activePortConnectionConnection: PortConnection | undefined;
  private rxSubscriptions: Subscription[] = [];
  private writer: WritableStreamDefaultWriter<Uint8Array> | undefined;
  private abortController: AbortController | undefined;

  constructor(
    private serial: Serial = (navigator as unknown as Navigator).serial,
    private filters: SerialPortFilter[] = [],
    private serialOptions = DEFAULT_SERIAL_OPTIONS,
  ) {}

  /**
   * Send a UInt8Array chunk of data to the connected device.
   * @param {Uint8Array} chunk The chunk to send
   */
  async sendChunk(chunk: Uint8Array): Promise<void> {
    if (this.activePortConnectionConnection) {
      return this.activePortConnectionConnection.sendChunk(chunk);
    }
    throw new Error('Device not connected');
  }

  /**
   * Attempt to open a connection to a device. This includes
   * asking the user to select a serial port and should only
   * be called in response to user interaction.
   */
  async connect(): Promise<void> {
    const port = await this.serial.requestPort({ filters: this.filters });
    await this.connectPort(port);
  }

  async disconnect() {
    for (const subscription of this.rxSubscriptions) {
      subscription.unsubscribe();
    }
    this.rxSubscriptions = [];

    this.activePortConnectionConnection = undefined;
    this.portConnections.clear();
    this.abortController?.abort();

    try {
      await this.writer?.close();
    } catch (err) {
      this.errors.next(err as Error);
    }
    this.connected.next(false);
  }

  /**
   * Connect to a given SerialPort. This involves no user interaction.
   * and can be called whenever a port is available.
   */
  async connectPort(port: SerialPort): Promise<void> {
    this.activePortConnectionConnection =
      this.portConnections.get(port) ?? (await this.connectNewPort(port));

    this.connected.next(true);

    this.rxSubscriptions.push(
      this.activePortConnectionConnection.chunks.subscribe(
        (chunk: any) => {
          this.chunks.next(chunk);
        },
        (err: any) => {
          throw new Error(`Chunks observable had an unexpected error ${err}`);
        },
        () => {
          this.connected.next(false);
          this.portConnections.delete(port);
          // Don't complete the chunks observable because then it would not
          // be able to forward any future chunks.
        },
      ),
    );

    this.rxSubscriptions.push(
      this.activePortConnectionConnection.errors.subscribe((error: any) => {
        this.errors.next(error);
        if (error instanceof DeviceLostError) {
          // The device has been lost
          this.connected.next(false);
        }
      }),
    );
  }

  private async connectNewPort(port: SerialPort): Promise<PortConnection> {
    await port.open(this.serialOptions);
    const writer = port.writable.getWriter();
    this.writer = writer;

    async function sendChunk(chunk: Uint8Array) {
      await writer.ready;
      await writer.write(chunk);
    }

    const { chunks, errors } = this.getChunks(port);

    const connection: PortConnection = { sendChunk, chunks, errors };
    this.portConnections.set(port, connection);
    return connection;
  }

  private getChunks(port: SerialPort): PortReadConnection {
    const chunks = new Subject<Uint8Array>();
    const errors = new Subject<Error>();
    const abortController = new AbortController();
    this.abortController = abortController;

    async function read() {
      if (!port.readable) {
        throw new DeviceLostError();
      }
      if (port.readable.locked) {
        throw new DeviceLockedError();
      }
      await port.readable.pipeTo(
        new WritableStream({
          write: (chunk) => {
            chunks.next(chunk);
          },
          close: () => {
            chunks.complete();
            errors.complete();
          },
        }),
        { signal: abortController.signal },
      );
    }

    function connect() {
      read().catch((err) => {
        // Don't error the chunks observable since that stops it from
        // reading any more packets, and we often want to continue
        // despite an error. Instead, push errors to the 'errors'
        // observable.
        errors.next(err);
      });
    }

    connect();

    return { chunks, errors };
  }
}