要点

我们发现可能出现高并发的地方有:取得服务器时间、获取页面、获取秒杀接口、执行秒杀。

然而获取服务器时间可以不用管,因为很快。获取页面用CDN,我们只作后两个。

redis缓存

用jedis Java客户端操作java对象缓存的api方法,和谷歌的protostuff序列化开源方案(比Java原生的序列化效率要高)来序列化和反序列化对象,缓存对象必须转换为byte字节数组存入redis缓存,不像memcached可以直接缓存对象。实际操作思路是初次查询redis缓存,如果没有就从数据库取出对象放入redis缓存,下次取就会直接从redis缓存取数据了。但缓存有超时时间限制。

pom.xml

<!--Redis 客户端-->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.7.3</version>
        </dependency>

        <!--protostuff 序列化依赖-->
        <dependency>
            <groupId>com.dyuproject.protostuff</groupId>
            <artifactId>protostuff-core</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>com.dyuproject.protostuff</groupId>
            <artifactId>protostuff-runtime</artifactId>
            <version>1.1.1</version>
        </dependency>

RedisDao

在dao包下再建一个cache包,用于缓存Dao。

package com.jimo.dao.cache;

import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.runtime.RuntimeSchema;
import com.jimo.entity.KillOne;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

/**
 * Created by root on 17-5-30.
 */
public class RedisDao {
    Logger logger = LoggerFactory.getLogger(this.getClass());
    private final JedisPool jedisPool;

    public RedisDao(String ip, int port) {
        this.jedisPool = new JedisPool(ip, port);
    }

    private RuntimeSchema<KillOne> schema = RuntimeSchema.createFrom(KillOne.class);

    //查找缓存,返回对象
    public KillOne getKillOne(long killId) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            String key = "killId:" + killId;
            //get --> byte[] --> 反序列化 --> Object(KillOne)
            byte[] bytes = jedis.get(key.getBytes());
            //不为空则获取
            if (bytes != null) {
                //先new个空对象
                KillOne killOne = schema.newMessage();
                //反序列化填充数据
                ProtostuffIOUtil.mergeFrom(bytes, killOne, schema);
                return killOne;
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
        return null;
    }

    //存入缓存
    public String putKillOne(KillOne killOne) {
        //Object-->序列化-->bytes
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            String key = "killId:" + killOne.getKillId();
            byte[] bytes = ProtostuffIOUtil.toByteArray(killOne, schema,
                    LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE));
            //超时缓存
            int exp = 60 * 60;//1 hours
            return jedis.setex(key.getBytes(), exp, bytes);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
        return "no";
    }
}

注入redisDao

在spring-dao.xml里:

<!--注入RedisDao的构造方法-->
    <bean id="redisDao" class="com.jimo.dao.cache.RedisDao">
        <constructor-arg index="0" value="localhost"/>
        <constructor-arg index="1" value="6379"/>
    </bean>

修改service

public Exposer exportKillUrl(long killId) {
//        KillOne killOne = killOneDao.queryById(killId);
//        if (killOne == null) {
//            return new Exposer(false, killId);
//        }
        /*
        * 访问缓存步骤:
        * 1、从缓存获取对象
        * 2、为空则从数据库获取,并存入缓存
        * 3、否则从缓存获取
        * */
        KillOne killOne = redisDao.getKillOne(killId);
        if (killOne == null) {
            killOne = killOneDao.queryById(killId);
            if (killOne == null) {
                return new Exposer(false, killId);
            } else {
                redisDao.putKillOne(killOne);
            }
        }

        Date start = killOne.getStartTime();
        Date end = killOne.getEndTime();
        Date now = new Date();
        //不在时间范围内
        if (now.getTime() - start.getTime() < 0 || now.getTime() - end.getTime() > 0) {
            return new Exposer(false, now.getTime(), start.getTime(), end.getTime(), killId);
        }
        String md5 = getMD5(killId);//TODO
        return new Exposer(true, md5, killId);
    }

测试

public class RedisDaoTest {

    @Autowired
    private RedisDao redisDao;
    @Autowired
    private KillOneDao killOneDao;

    @Test
    public void testLogic() throws Exception {
        KillOne killOne = redisDao.getKillOne(1000);
        System.out.println(killOne);
        if (killOne == null) {
            killOne = killOneDao.queryById(1000);
            if (killOne == null) {
                System.err.println("error");
            } else {
                System.out.println(redisDao.putKillOne(killOne));
            }
        }
    }

}

优化1

降低网络延迟和GC的影响,行级锁的持有时间

交换插入和更新的顺序

   * 先插入数据,再更新数据,可以减少行级锁的时间
   * 因为默认mysql 的insert没有行级锁,update是有的
   * 如果插入成功,但更新失败了还是会回滚
@Transactional
    /*
    * 使用声明式事务:
    * 1、开发团队风格明确
    * 2、保证事务执行时间尽可能短,不要穿插耗时操作(RPC或其他网络操作),否则剥离出去
    * 3、不是所有方法都需要事务
    * */
    public KillExecution executeKill(long killId, long userPhone, String md5)
            throws KillException, RepeatKillException, KillCloseException {
        if (md5 == null || !md5.equals(getMD5(killId))) {
            throw new KillException("数据篡改");
        }
        try {
            /**old
             //减库存
             int reduceNumber = killOneDao.reduceNumber(killId, new Date());
             if (reduceNumber <= 0) {
             throw new KillCloseException("秒杀结束");
             } else {
             //记录购买行为
             int insertCount = successKilledDao.insertSuccessKilled(killId, userPhone);
             if (insertCount <= 0) {
             throw new KillException("重复秒杀");
             } else {
             SuccessKilled successKilled = successKilledDao.queryByIdWithKillOne(killId, userPhone);
             return new KillExecution(killId, KillOneStateEnum.SUCCESS, successKilled);
             }
             }*/
            /**new 
             * 先插入数据,再更新数据,可以减少行级锁的时间
             * 因为默认mysql 的insert没有行级锁,update是有的
             * 如果插入成功,但更新失败了还是会回滚
             * */
            //记录购买行为
            int insertCount = successKilledDao.insertSuccessKilled(killId, userPhone);
            if (insertCount <= 0) {
                throw new KillException("重复秒杀");
            } else {
                //减库存
                int reduceNumber = killOneDao.reduceNumber(killId, new Date());
                if (reduceNumber <= 0) {
                    throw new KillCloseException("秒杀结束");
                } else {
                    SuccessKilled successKilled = successKilledDao.queryByIdWithKillOne(killId, userPhone);
                    return new KillExecution(killId, KillOneStateEnum.SUCCESS, successKilled);
                }
            }
            //异常会执行回滚
        } catch (KillCloseException e) {
            throw e;
        } catch (RepeatKillException e2) {
            throw e2;
        } catch (Exception e3) {
            logger.error(e3.getMessage(), e3);
            //所有编译期异常转为运行时异常
            throw new KillException("内部错误:" + e3.getMessage());
        }
    }

优化二

使用存储过程降低行级锁的时间,前面的执行过程都是在客户端执行,现在使用存储过程让这个过程在数据库端执行。

新建killone.sql

delimiter $$ --在命令行使用$$替代;

CREATE PROCEDURE `killone`.`exec_killone`
(in p_kill_id bigint ,in p_user_phone bigint,
in p_kill_time TIMESTAMP ,out result int)
BEGIN
  DECLARE insert_count int DEFAULT 0;
  start TRANSACTION;
  INSERT ignore into success_killed(kill_id,user_phone,create_time)
  VALUES (p_kill_id,p_user_phone,p_kill_time);
  SELECT ROW_COUNT() INTO insert_count;
  if (insert_count = 0)THEN
    ROLLBACK ;
    SET result = -2;
  elseif (insert_count < 0)THEN
    ROLLBACK ;
    SET result = -1;
  ELSE
    UPDATE killone SET
    number = number - 1
    WHERE kill_id=p_kill_id
    AND start_time<p_kill_time
    AND end_time>p_kill_time
    and number>0;
    select row_count() into insert_count;
    if(insert_count=0)THEN
      ROLLBACK ;
      SET result = 0;
    elseif(insert_count<0)THEN
      ROLLBACK ;
      set result = -1;
    ELSE
      COMMIT ;
      set result = 1;
    end if;
  END if;
END;
$$
--存储过程结束

delimiter ; --又定义回来

set @result = -2;
--执行
call exec_killone(1001,11011011011,now(),@result);

select @result;

在mysql里执行测试

MariaDB [killone]> select*from success_killed\G;
*************************** 1. row ***************************
    kill_id: 1000
 user_phone: 13300000000
      state: 0
create_time: 2017-05-21 16:46:26
1 row in set (0.00 sec)

ERROR: No query specified

MariaDB [killone]> call exec_killone(1001,11011011011,now(),@result);
Query OK, 0 rows affected (0.01 sec)

MariaDB [killone]> select @result;
+---------+
| @result |
+---------+
|       0 |
+---------+
1 row in set (0.00 sec)

优化3

这一步将存储过程嵌入到代码里

在service里:

/**
     * 使用存储过程执行秒杀
     *
     * @param killId
     * @param userPhone
     * @param md5
     * @return
     */
    public KillExecution killByProcedure(long killId, long userPhone, String md5) {
        if (md5 == null || !md5.equals(getMD5(killId))) {
            return new KillExecution(killId, KillOneStateEnum.DATA_REWRITE);
        }
        Date killTime = new Date();
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("killId", killId);
        map.put("phone", userPhone);
        map.put("killTime", killTime);
        map.put("result", null);
        try {
            killOneDao.killByProcedure(map);
            Integer result = MapUtils.getInteger(map, "result", -2);
            if (result == 1) {
                return new KillExecution(killId, KillOneStateEnum.SUCCESS);
            } else {
                return new KillExecution(killId, KillOneStateEnum.stateOf(result));
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return new KillExecution(killId, KillOneStateEnum.INNER_ERROR);
    }

上面使用了一个集合工具类:MapUtils

<!--集合工具-->
        <dependency>
            <groupId>commons-collections</groupId>
            <artifactId>commons-collections</artifactId>
            <version>3.2.1</version>
        </dependency>

在KillOneDao里操作数据库:(参数用map)

/**
     * 使用存储过程
     *
     * @param params
     */
    public void killByProcedure(Map<String, Object> params);

对应xml里的实现

<!--调用存储过程-->
    <select id="killByProcedure" statementType="CALLABLE">
        call exec_killone(
        #{killId,jdbcType=BIGINT,mode=IN},
        #{phone,jdbcType=BIGINT,mode=IN},
        #{killTime,jdbcType=TIMESTAMP,mode=IN},
        #{result,jdbcType=INTEGER,mode=OUT}
        )
    </select>

然后是测试

@Test
    public void killByProcedure() {
        Exposer exposer = killOneService.exportKillUrl(1000);
        logger.info("exposor:{}", exposer);
        if (exposer.isExposed()) {
            long phone = 11011111111L;
            KillExecution killExecution = killOneService.killByProcedure(1000, phone, exposer.getMd5());
            logger.info("result:{}", killExecution);
        } else {
            System.out.println("not ok");
        }
    }

results matching ""

    No results matching ""