使用NestJS+Redis+Kafka实现简单秒杀系统

时间:2021-2-20 作者:admin

使用NestJS + Redis + Kafka实现简单秒杀系统

技术栈:我们的老伙计NestJS,以及iorediskafka-node

最近在研究kafka消息队列,所以想写个秒杀来试试手,看了好几篇博客都没有具体的项目示例,所以参考了一下各种实现用nestjs写了一个可运行的项目。

第一步,创建项目

这里使用了nest cli命令快速生成项目模板;

  1. 安装@nest/cli脚手架用于生成项目;

npm i -g @nest/cli   #安装nest-cli

  1. 生成项目

nest new nest-seckill   #使用nest cli生成项目
cd ./nest-seckill
yarn                    #安装依赖
yarn add -S redis ioredis nestjs-redis kafka-node mysql2 typeorm uuid-random    #添加依赖

第二步,生成seckill模块

这里使用了nest cli命令快速生成模板代码;了解详情可以查看官方文档:nest-cli文档

  1. 生成 seckill.module.ts文件;

    用于创建kafka消费者,接收kafka消息,写入订单信息;

    nest generate module seckill 
    # 可以简写为 `nest g mo seckill`
    

  1. 生成 seckill.controller.ts

    用于实现秒杀的RESTful接口;

    nest g co seckill
    

  1. 生成 seckill.service.ts;

    service里使用redis乐观锁(watch)事务(mult)实现秒杀逻辑,
    再使用kafka的Producer生产一条消费数据;

    nest g service seckill
    

  1. 生成 redis.service.ts;

    用于连接redis;

    nest g service redis
    

    修改内容:

     import { Injectable } from '@nestjs/common'
     import { RedisService } from 'nestjs-redis'
    
     @Injectable()
     export class RedisClientService {
       constructor(private readonly redisService: RedisService) {}
    
       // 连接配置已在app.module设置
       async getSeckillRedisClient() {
         return await this.redisService.getClient('seckill')
       }
     }
    

第三步,编写秒杀逻辑;

  1. 定义秒杀接口:

    seckill.controller.ts里新增一个Post接口:

     import { Body, Controller, Post } from '@nestjs/common'
     import * as uuid from 'uuid-random'                   // 使用uuid生成订单号
     import { CreateOrderDTO } from '../order/order.dto'   // 新增订单字段定义
     import { SeckillService } from './seckill.service'   // 秒杀逻辑具体实现
     import { awaitWrap } from '@/utils/index'            // async返回值简化方法
    
    @Controller('seckill')
     export class SeckillController {
      constructor(private readonly seckillService: SeckillService) {}
    
       @Post('/add')
       async addOrder(@Body() order: CreateOrderDTO) {
         const params: CreateOrderDTO = {
           ...order,
           openid: `${uuid()}-${new Date().valueOf()}`,
         }
    
         // 调用service的secKill方法,并等待完成
         const [error, result] = await awaitWrap(this.seckillService.secKill(params))
         return error || result
       }
     }
    

  1. 实现秒杀逻辑:

    seckill.service.ts里新增一个secKill方法;

    使用redis乐观锁(watch)事务(mult),实现并发下修改数据,详情可参考node redis文档

    import { Injectable, Logger } from '@nestjs/common'
    import * as kafka from 'kafka-node'
    import * as Redis from 'ioredis'
    import { RedisClientService } from '../redis/redis.service'
    import { getConfig } from '@root/config/index' // redis和 kafka的连接配置
    import { awaitWrap } from '@/utils'
    
    const { redisSeckill, kafkaConfig } = getConfig()
    
    // 创建kafka Client
    const kafkaClient = new kafka.KafkaClient({ kafkaHost: kafkaConfig.kafkaHost })
    // 创建kafka生产者
    const producer = new kafka.Producer(kafkaClient, {
      // Configuration for when to consider a message as acknowledged, default 1
      requireAcks: 1,
      // The amount of time in milliseconds to wait for all acks before considered, default 100ms
      ackTimeoutMs: 100,
      // Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0
      partitionerType: 2,
    })
    
    @Injectable()
    export class SeckillService {
      logger = new Logger('SeckillService') // 创建nest自带的日志实例
      seckillRedisClient!: Redis.Redis // redis连接实例
      count = 0 // 当前请求的次数
    
      constructor(private readonly redisClientService: RedisClientService) {
        // service 创建时异步初始化redis连接
        this.redisClientService.getSeckillRedisClient().then(client => {
          this.seckillRedisClient = client
        })
      }
    
      /*
       * ***********************
       * @desc 秒杀具体实现
       * ***********************
       */
      async secKill(params) {
        const { seckillCounterKey } = redisSeckill
        this.logger.log(`当前请求count:${this.count++}`)
    
        // tips:使用乐观锁解决并发
        const [watchError] = await awaitWrap(this.seckillRedisClient.watch(seckillCounterKey)) //监听'counter'字段更改
        watchError && this.logger.error(watchError)
        if (watchError) return watchError
    
        // 获取当前当前订单剩余数量
        const [getError, reply] = await awaitWrap(this.seckillRedisClient.get(seckillCounterKey))
        getError && this.logger.error(getError)
        if (getError) return getError
        if (parseInt(reply) <= 0) {
          this.logger.warn('已经卖光了')
          return '已经卖光了'
        }
    
        //tips: 使用redis事务修改redis的counter数量减一
        const [execError, replies] = await awaitWrap(this.seckillRedisClient.multi().decr(seckillCounterKey).exec())
        execError && this.logger.error(execError)
        if (execError) return execError
    
        // counter字段正在操作中,等待counter被其他释放
        if (!replies) {
          this.logger.warn('counter被使用')
          this.secKill(params) // 自动重试
          return
        }
    
        // kafka消费数据的内容
        const payload = [
          {
            topic: kafkaConfig.topic,
            partition: 0,
            messages: [JSON.stringify(params)],
          },
        ]
    
        this.logger.log('生产数据payload:')
        this.logger.verbose(payload)
    
        // 异步等待发送kafka消费数据
        return new Promise((resolve, reject) => {
          producer.send(payload, (err, kafkaProducerResponse) => {
            if (err) {
              this.logger.error(err)
              reject(err)
              return err
            }
    
            this.logger.verbose(kafkaProducerResponse)
            resolve({ payload, kafkaProducerResponse })
          })
        })
      }
    
    }
    

  1. 监听kafka消息,消费订单队列消息;

    seckill.module.ts内新增handleListenerKafkaMessage()方法,用于处理kafka消息;

    同时需要在seckill模块挂载(onApplicationBootstrap)时调用此方法,开始订阅kafka消息;

    import { Logger, Module, OnApplicationBootstrap } from '@nestjs/common'
    import * as Redis from 'ioredis'
    import { awaitWrap } from '@/utils'
    import { CreateOrderDTO } from '../order/order.dto'
    import { OrderModule } from '../order/order.module'
    import { OrderService } from '../order/order.service'
    import { RedisClientService } from '../redis/redis.service'
    import { getKafkaConsumer } from './kafka-utils'
    import { SeckillController } from './seckill.controller'
    import { SeckillService } from './seckill.service'
    import { getConfig } from '@root/config'
    
    const { kafkaConfig } = getConfig()
    
    @Module({
      imports: [OrderModule],
      providers: [RedisClientService, SeckillService],
      controllers: [SeckillController],
    })
    export class SeckillModule implements OnApplicationBootstrap {
      logger = new Logger('SeckillModule')
      seckillRedisClient!: Redis.Redis
    
      constructor(
        private readonly orderService: OrderService, //处理订单的Service
        private readonly seckillService: SeckillService, //秒杀相关实现
        private readonly redisClientService: RedisClientService //redis连接
      ) {
        this.redisClientService.getSeckillRedisClient().then(client => {
          this.seckillRedisClient = client
        })
      }
    
      async handleListenerKafkaMessage() {
        const kafkaConsumer = getKafkaConsumer() //抽取出创建消费者实现方法为函数
    
        kafkaConsumer.on('message', async message => {
          this.logger.log('得到的生产者的数据为:')
          this.logger.verbose(message)
    
          let order!: CreateOrderDTO // 从kafka队列得到的订单数据,即service里producer.send的messages内容
    
          if (typeof message.value === 'string') {
            order = JSON.parse(message.value)
          } else {
            order = JSON.parse(message.value.toString())
          }
    
          // 写入数据库,完成订单创建
          const [err, order] = await awaitWrap(this.orderService.saveOne(value))
          if (err) {
            this.logger.error(err)
            return
          }
          this.logger.log(`订单【${order.id}】信息已存入数据库`)
        })
      }
    
      async onApplicationBootstrap() {
        this.logger.log('onApplicationBootstrap: ')
        await this.seckillService.initCount()         //重置redis里商品剩余库存数
        this.handleListenerKafkaMessage()
      }
    }
    
    

  1. kafka消费者getKafkaConsumer方法实现如下:

    在seckill模块文件夹下新增kafka-utils.ts文件:

    import * as kafka from 'kafka-node'
    import * as Redis from 'ioredis'
    import { getConfig } from '@root/config/index'
    import { awaitWrap } from '@/utils'
    
    const { kafkaConfig } = getConfig()
    let kafkaConsumer!: kafka.Consumer
    
    // 获取kafka client
    function getKafkaClient() {
      let kafkaClient!: kafka.KafkaClient
    
      return () => {
        if (!kafkaClient) {
          kafkaClient = new kafka.KafkaClient({
            kafkaHost: kafkaConfig.kafkaHost,
          })
        }
    
        return kafkaClient
      }
    }
    
        /**
       * @desc 获取消费者实例
       */
      export function getKafkaConsumer() {
        // consumer要订阅的topics配置
        const topics = [
          {
            topic: kafkaConfig.topic,
            partition: 0,
            offset: 0,
          },
        ]
    
        const options = {
          //  自动提交配置   (false 不会提交偏移量,每次都从头开始读取)
          autoCommit: true,
          autoCommitIntervalMs: 5000,
          //  如果设置为true,则consumer将从有效负载中的给定偏移量中获取消息
          fromOffset: false,
        }
        const kafkaClient = getKafkaClient()()
    
        if (!kafkaConsumer) {
          kafkaConsumer = new kafka.Consumer(kafkaClient, topics, options)
        }
    
        return kafkaConsumer
      }
    

最后我们得到的文件结构大概是这样:

运行项目:

yarn dev

一些说明

  1. 如果需要并发测试秒杀接口,可以使用postmanrunner多开;简单测试接口逻辑的话,可以打开项目默认配置的swagger-ui页面http://localhost:3000/api-docs

  2. 至此我们的主要秒杀逻辑就写的差不多了。由于我们主要为了实现秒杀逻辑,所有订单模块的代码就没有在这里展开了。我们只需要像第二步那样几行命令就可以简单创建Order模块,用于订单curd;

  3. 关于redis,mysql,kafka等服务的话可以编写docker-compose.yaml快速启动起来,具体可以参考本项目代码;

    kafka容器可能会由于centos的防火墙导致启动失败,解决办法是:先关闭宿主机防火墙再重启docker;
  4. kafka容器创建后,需要我们在打开浏览器访问kafka-manager容器映射的9000端口上kafka管理页面,创建cluster和我们的Topic,具体初始化操作较为简单,可自行搜索kafka-manager

    例如Kafka集群管理工具kafka-manager的安装使用

项目github地址: github.com/wenqieqiu/n…

声明:本文内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎进行举报,并提供相关证据,工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。