import { Emitter } from '@leyan/emitter';
import sleep from '../../utils/wait';
import logger from '../logger';

/**
 * Wire format of a single JSON-RPC style message.
 *
 * The same shape is used for outgoing payloads (built by
 * `WebSocketJsonRPC.createPayload`) and for incoming messages (parsed in
 * `WebSocketJsonRPC.onMessage`).
 *
 * NOTE(review): the identifier is spelled with three "s" characters; it is
 * exported, so renaming it would need a coordinated change in every importer.
 */
export interface ServerMesssage {
  /** Message id; outgoing ids come from the client's incrementing counter. */
  id: number;
  /** Method name, e.g. `'Agent.Ack'` marks a response to an earlier request. */
  fn: string;
  /** Method arguments; for `'Agent.Ack'` the code reads `args.id` and `args.code`. */
  args: Record<string, any>;
}

/**
 * Identity of the assistant using the connection. `name` and `id` are sent as
 * `assistant_name` and `assistant_id` in the authentication request.
 */
export type Assistant = { name: string; id: string };

/** Constructor options for `WebSocketJsonRPC`. */
interface WebSocketJsonRPCProps {
  /** Application identifier; stored on the instance. */
  app: string;
  /** Emitter that receives a `'request'` event for every non-Ack incoming message. */
  emitter: Emitter;
  /** URL passed to the `WebSocket` constructor. */
  endpoint: string;
  /** Shop identifier; sent as `store_id` in the authentication request. */
  shopID: number;
  /** Assistant identity sent in the authentication request. */
  assistant: Assistant;
  /**
   * Default per-request timeout handed to the `sleep` helper; defaults to
   * `5 * 1000` (presumably milliseconds — confirm against `utils/wait`).
   */
  requestTimeout?: number;
  /**
   * Maximum age, in milliseconds, of the last ping timestamp before the
   * connection is considered dead; defaults to `30 * 1000`.
   */
  pingTimeout?: number;
}

/** Value sent as `client_type` in the authentication request. */
const RUNTIME_ENV = 'web';

/**
 * JSON-RPC style client on top of a browser `WebSocket`.
 *
 * Outgoing requests are tracked in `requestPool` by id and settled when an
 * `'Agent.Ack'` message referencing that id arrives. Every other incoming
 * message is forwarded to `emitter` as a `'request'` event. A periodic
 * "babysitting" timer reconnects when the connection is dead or unauthorized.
 */
class WebSocketJsonRPC {
  /** Application identifier supplied by the caller; stored but not read by this class. */
  app: string;

  /** Receives `'request'` events (`id`, `fn`, `args`) for non-Ack incoming messages. */
  emitter: Emitter;

  /** Assistant identity sent during authentication. */
  assistant: Assistant;

  /** Default timeout passed to `sleep` for each request. */
  requestTimeout: number;

  /** Current socket, or `null` when disconnected. */
  ws: null | WebSocket;

  /** Last id handed out by `createPayload`. */
  idCounter: number;

  /** Pending requests keyed by request id; each entry holds `resolve`/`reject`. */
  requestPool: Map<number, any>;

  /** `Date.now()` of the socket open or of the last `pong()`; `null` when closed. */
  lastPingAt: null | number;

  /** Maximum age (ms) of `lastPingAt` before the connection counts as dead. */
  pingTimeout: number;

  /** True once `authorize()` succeeded on the current connection. */
  authed: boolean;

  /** Shop identifier sent as `store_id` during authentication. */
  shopID: number;

  /** True while `connect()` is in flight; prevents overlapping attempts. */
  isConnecting: boolean;

  /** URL passed to the `WebSocket` constructor. */
  endpoint: string;

  /** Handle of the babysitting interval, or `null` when it is not running. */
  babysittingInterval: null | number;

  /** When true, `connect()` returns immediately. Never set by this class itself. */
  offline: boolean;

  constructor({
    app,
    emitter,
    shopID,
    endpoint,
    assistant,
    requestTimeout = 5 * 1000,
    pingTimeout = 30 * 1000,
  }: WebSocketJsonRPCProps) {
    this.app = app;
    this.emitter = emitter;
    this.assistant = assistant;
    this.requestTimeout = requestTimeout;
    this.pingTimeout = pingTimeout;
    this.endpoint = endpoint;
    this.lastPingAt = null;
    this.babysittingInterval = null;
    this.ws = null;
    this.idCounter = 0;
    this.requestPool = new Map();
    this.authed = false;
    this.shopID = shopID;
    this.isConnecting = false;
    this.offline = false;
  }

  /** Short label used in log messages: `<WS:assistantName:shopID>`. */
  toString(): string {
    const { assistant, shopID } = this;
    // Interpolate the name, not the object (which would print "[object Object]").
    return `<WS:${assistant.name}:${shopID}>`;
  }

  /**
   * Reports whether the socket is open and the last ping is recent enough.
   *
   * @returns `true` when `ws` is OPEN and, if a ping timestamp exists, it is
   * younger than `pingTimeout`.
   */
  isAlive(): boolean {
    const { ws } = this;
    const isOpen = ws !== null && ws.readyState === WebSocket.OPEN;
    logger.debug('isAlive:', isOpen, this.lastPingAt);
    let isPingOk = true;
    if (this.lastPingAt !== null) {
      isPingOk = Date.now() - this.lastPingAt < this.pingTimeout;
    }

    return isOpen && isPingOk;
  }

  /**
   * Drops the current connection and resets per-connection state.
   *
   * Pending requests are rejected so their callers fail immediately instead
   * of waiting for their timeout. Errors thrown by `ws.close()` are logged
   * and swallowed.
   */
  close(): void {
    this.lastPingAt = null;
    this.authed = false;

    // Snapshot and clear first so a reject handler cannot observe stale entries.
    const pending = Array.from(this.requestPool.values());
    this.requestPool.clear();
    pending.forEach((context) => {
      if (context && typeof context.reject === 'function') {
        context.reject(new Error('websocket closed'));
      }
    });

    if (this.ws !== null) {
      try {
        this.ws.close();
      } catch (err) {
        logger.error('ws terminate error: %s', err);
      }
    }
    this.ws = null;
  }

  /**
   * Wires event listeners onto an open socket, adopts it as the current
   * connection, authenticates, and starts the babysitting timer.
   *
   * @param ws - an already opened socket.
   * @throws whatever `authorize()` rejects with.
   */
  async setup(ws: WebSocket): Promise<void> {
    // Only tear down shared state when the event belongs to the socket that
    // is still current; a late event from a replaced socket must not kill
    // the newer connection.
    const closeIfCurrent = () => {
      if (this.ws === ws) {
        this.close();
      }
    };
    ws.addEventListener('error', (event) => {
      logger.error('agent error: [%s]', event);
      closeIfCurrent();
    });
    ws.addEventListener('close', (event) => {
      logger.error('%s connection closed: [%s]', this, event);
      closeIfCurrent();
    });
    ws.addEventListener('message', (event: MessageEvent) => {
      // Regular traffic: log once at debug level rather than twice as an error.
      logger.debug('%s connection message: [%s]', this, event.data);
      this.onMessage(event.data);
    });
    this.ws = ws;
    await this.authorize();
    logger.info('authorize success');
    this.babysitting();
  }

  /**
   * Sends a request and waits for the matching `'Agent.Ack'`.
   *
   * @param method - remote method name.
   * @param param - method arguments.
   * @param requestTimeout - overrides the instance default when truthy.
   * @returns the Ack `args` when `args.code === 0`.
   * @throws Error when the socket is not available, when `send` fails, when
   * the Ack carries a non-zero code, or when the timeout elapses first.
   */
  sendRequest<T extends object = {}>(
    method: ServerMesssage['fn'],
    param: ServerMesssage['args'],
    requestTimeout?: number,
  ): Promise<T> {
    const { requestPool, ws } = this;

    const data = this.createPayload(method, param);
    logger.debug('sending %j', data);
    const label = `[${data.fn}#${data.id}]`;
    const request = new Promise<T>((resolve, reject) => {
      if (ws === null || !this.isAlive()) {
        reject(new Error('websocket is not available'));
        // Stop here: nothing must be registered or sent on a dead socket.
        return;
      }
      requestPool.set(data.id, {
        ts: Date.now(),
        id: data.id,
        data,
        resolve,
        reject,
      });
      const tip = `${label} transport error`;
      try {
        ws.send(JSON.stringify(data));
      } catch (e) {
        // The request never left, so no Ack will ever clean this entry up.
        requestPool.delete(data.id);
        const detail = e instanceof Error ? `${e}\n${e.stack}` : String(e);
        reject(new Error(`${tip}: ${detail}`));
      }
    });
    const timeout = sleep(requestTimeout || this.requestTimeout).then(() => {
      // Forget the request so a timed-out call does not leak its pool entry.
      requestPool.delete(data.id);
      throw new Error(`timeout ${label}`);
    });
    return Promise.race([request, timeout]) as Promise<T>;
  }

  /**
   * Answers a server-initiated request with an `'Agent.Ack'`.
   * Logs a warning instead of throwing when the socket is not alive.
   *
   * @param id - id of the request being acknowledged.
   * @param args - additional Ack fields (spread after `id`).
   */
  sendResponse(id: number, args: ServerMesssage['args']): void {
    const { ws } = this;
    if (ws !== null && this.isAlive()) {
      const response = this.createPayload('Agent.Ack', { id, ...args });
      ws.send(JSON.stringify(response));
    } else {
      logger.warn(`sendResponse ${id} faild, %s not connected`, this);
    }
  }

  /**
   * Sends a message without waiting for any response.
   *
   * @throws Error when the socket is not alive.
   */
  send(method: ServerMesssage['fn'], param: ServerMesssage['args']): void {
    const { ws } = this;
    if (ws === null || !this.isAlive()) {
      throw new Error('websocket is not available');
    }

    const data = this.createPayload(method, param);
    logger.debug('no resp sending %j', data);

    return ws.send(JSON.stringify(data));
  }

  /**
   * Handles one raw incoming frame.
   *
   * `'Agent.Ack'` messages settle the pending request named by `args.id`
   * (resolved when `args.code === 0`, rejected otherwise). Any other message
   * is emitted as a `'request'` event. Unparseable or malformed input is
   * logged and ignored.
   */
  onMessage(data: string): void {
    const { emitter, requestPool } = this;
    let message: unknown = null;
    try {
      message = JSON.parse(data);
      logger.verbose('message received %j', message);
    } catch (err) {
      logger.error('invalid json-rpc [%s] %s', err, data);
      return;
    }
    // JSON such as `null` or `5` parses fine but cannot be destructured safely.
    if (typeof message !== 'object' || message === null) {
      logger.error('malformed json-rpc %j', message);
      return;
    }
    const { id, fn, args } = message as ServerMesssage;
    // malformed
    if (id === undefined) {
      logger.error('malformed json-rpc %j', message);
      return;
    }

    if (fn !== 'Agent.Ack') {
      // request
      emitter.emit('request', id, fn, args);
      return;
    }

    // response: the pending request is identified by args.id, not by the
    // Ack message's own id.
    if (!args || !requestPool.has(args.id)) {
      logger.error('json-rpc missing context %j', message);
      return;
    }
    const requestContext = requestPool.get(args.id);
    requestPool.delete(args.id);
    if (args.code === 0) {
      requestContext.resolve(args);
    } else {
      requestContext.reject(new Error(JSON.stringify(args)));
    }
  }

  /**
   * Authenticates the current connection via `'Agent.AuthenticateByToken'`.
   * Does nothing (besides a warning) when already authorized.
   *
   * @throws whatever `sendRequest` rejects with.
   */
  async authorize(): Promise<void> {
    const { assistant, shopID } = this;
    if (this.authed) {
      logger.warn('%s already authorized', this);
      return;
    }
    logger.verbose('authenticating...');
    const payload = {
      client_type: RUNTIME_ENV,
      store_id: shopID,
      assistant_name: assistant.name,
      assistant_id: assistant.id,
      timestamp: Date.now(),
    };
    // The auth request uses a longer, 10s timeout.
    await this.sendRequest('Agent.AuthenticateByToken', payload, 10 * 1000);
    logger.info('%s authorized', this);
    this.authed = true;
  }

  /** Builds a message with the next id from the incrementing counter. */
  createPayload(fn: ServerMesssage['fn'], args: ServerMesssage['args']): ServerMesssage {
    this.idCounter += 1;
    const id = this.idCounter;
    return { id, fn, args };
  }

  /**
   * Records the current time as the last ping and sends an
   * `'Agent.PingPong'` pong when a socket exists.
   */
  pong(): void {
    const request = this.createPayload('Agent.PingPong', { type: 'pong' });
    this.lastPingAt = Date.now();
    if (this.ws !== null) {
      this.ws.send(JSON.stringify(request));
    }
  }

  /**
   * Opens and sets up a fresh connection unless offline, already connecting,
   * or already alive.
   *
   * On failure the half-open socket is closed, the babysitting timer is
   * (re)started so a retry happens later, and the error is rethrown.
   */
  async connect(): Promise<void> {
    if (this.offline) {
      return;
    }
    if (this.isConnecting) {
      logger.warn('%s is connecting, skip to setup', this);
      return;
    }

    if (this.isAlive()) {
      logger.warn('%s is alive, skip to setup', this);
      return;
    }
    this.close();
    this.isConnecting = true;

    let ws: WebSocket | null = null;
    try {
      ws = await this.getWS();
      await this.setup(ws);
    } catch (err) {
      if (ws !== null) {
        try {
          ws.close();
        } catch (closeErr) {
          logger.error('ws terminate failed: %s', closeErr);
        }
        // setup() may already have adopted the socket before failing.
        if (this.ws === ws) {
          this.close();
        }
      }
      logger.error('ws connection error: %s\n%s', err, err instanceof Error ? err.stack : '');
      this.babysitting();
      throw err;
    } finally {
      // Reset on success as well, otherwise every later reconnect attempt
      // would be skipped as "is connecting".
      this.isConnecting = false;
    }
  }

  /**
   * Creates a socket for `endpoint` and waits until it is open.
   *
   * @returns the opened socket.
   * @throws the `error` event when the socket fails before opening.
   */
  async getWS(): Promise<WebSocket> {
    const ws = new WebSocket(this.endpoint);
    const connected = new Promise<void>((resolve, reject) => {
      if (ws.readyState === WebSocket.OPEN) {
        resolve();
        return;
      }

      ws.addEventListener(
        'open',
        () => {
          this.lastPingAt = Date.now();
          resolve();
        },
        { once: true },
      );
      ws.addEventListener(
        'error',
        (error) => {
          reject(error);
        },
        { once: true },
      );
    });

    await connected;
    return ws;
  }

  /**
   * (Re)starts a 10s interval that reconnects whenever the connection is not
   * both alive and authorized.
   */
  babysitting(): void {
    this.stopBabysitting();
    this.babysittingInterval = window.setInterval(() => {
      logger.verbose('babysitting agent...');

      if (this.isAlive() && this.authed) {
        logger.verbose('%s ok', this);
        return;
      }
      this.close();

      // connect() logs its own failure and reschedules; swallow the rejection
      // here so it does not surface as an unhandled promise rejection.
      this.connect().catch(() => undefined);
    }, 10000);
  }

  /** Stops the babysitting interval, if one is running. */
  stopBabysitting(): void {
    if (this.babysittingInterval !== null) {
      clearInterval(this.babysittingInterval);
      this.babysittingInterval = null;
    }
  }
}

export default WebSocketJsonRPC;
