要点
我们发现可能出现高并发的地方有:取得服务器时间、获取页面、获取秒杀接口、执行秒杀。
然而获取服务器时间可以不用管,因为很快。获取页面用CDN,我们只作后两个。
redis缓存
用jedis Java客户端操作java对象缓存的api方法,和谷歌的protostuff序列化开源方案(比Java原生的序列化效率要高)来序列化和反序列化对象,缓存对象必须转换为byte字节数组存入redis缓存,不像memcached可以直接缓存对象。实际操作思路是初次查询redis缓存,如果没有就从数据库取出对象放入redis缓存,下次取就会直接从redis缓存取数据了。但缓存有超时时间限制。
pom.xml
<!--Redis 客户端-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.7.3</version>
</dependency>
<!--protostuff 序列化依赖-->
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.1.1</version>
</dependency>
RedisDao
在dao包下再建一个cache包,用于缓存Dao。
package com.jimo.dao.cache;
import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.runtime.RuntimeSchema;
import com.jimo.entity.KillOne;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
/**
* Created by root on 17-5-30.
*/
public class RedisDao {
Logger logger = LoggerFactory.getLogger(this.getClass());
private final JedisPool jedisPool;
public RedisDao(String ip, int port) {
this.jedisPool = new JedisPool(ip, port);
}
private RuntimeSchema<KillOne> schema = RuntimeSchema.createFrom(KillOne.class);
//查找缓存,返回对象
public KillOne getKillOne(long killId) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
String key = "killId:" + killId;
//get --> byte[] --> 反序列化 --> Object(KillOne)
byte[] bytes = jedis.get(key.getBytes());
//不为空则获取
if (bytes != null) {
//先new个空对象
KillOne killOne = schema.newMessage();
//反序列化填充数据
ProtostuffIOUtil.mergeFrom(bytes, killOne, schema);
return killOne;
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
if (jedis != null) {
jedis.close();
}
}
return null;
}
//存入缓存
public String putKillOne(KillOne killOne) {
//Object-->序列化-->bytes
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
String key = "killId:" + killOne.getKillId();
byte[] bytes = ProtostuffIOUtil.toByteArray(killOne, schema,
LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE));
//超时缓存
int exp = 60 * 60;//1 hours
return jedis.setex(key.getBytes(), exp, bytes);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
if (jedis != null) {
jedis.close();
}
}
return "no";
}
}
注入redisDao
在spring-dao.xml里:
<!--注入RedisDao的构造方法-->
<bean id="redisDao" class="com.jimo.dao.cache.RedisDao">
<constructor-arg index="0" value="localhost"/>
<constructor-arg index="1" value="6379"/>
</bean>
修改service
public Exposer exportKillUrl(long killId) {
// KillOne killOne = killOneDao.queryById(killId);
// if (killOne == null) {
// return new Exposer(false, killId);
// }
/*
* 访问缓存步骤:
* 1、从缓存获取对象
* 2、为空则从数据库获取,并存入缓存
* 3、否则从缓存获取
* */
KillOne killOne = redisDao.getKillOne(killId);
if (killOne == null) {
killOne = killOneDao.queryById(killId);
if (killOne == null) {
return new Exposer(false, killId);
} else {
redisDao.putKillOne(killOne);
}
}
Date start = killOne.getStartTime();
Date end = killOne.getEndTime();
Date now = new Date();
//不在时间范围内
if (now.getTime() - start.getTime() < 0 || now.getTime() - end.getTime() > 0) {
return new Exposer(false, now.getTime(), start.getTime(), end.getTime(), killId);
}
String md5 = getMD5(killId);//TODO
return new Exposer(true, md5, killId);
}
测试
public class RedisDaoTest {
@Autowired
private RedisDao redisDao;
@Autowired
private KillOneDao killOneDao;
@Test
public void testLogic() throws Exception {
KillOne killOne = redisDao.getKillOne(1000);
System.out.println(killOne);
if (killOne == null) {
killOne = killOneDao.queryById(1000);
if (killOne == null) {
System.err.println("error");
} else {
System.out.println(redisDao.putKillOne(killOne));
}
}
}
}
优化1
降低网络延迟和GC的影响,行级锁的持有时间
交换插入和更新的顺序
* 先插入数据,再更新数据,可以减少行级锁的时间
* 因为默认mysql 的insert没有行级锁,update是有的
* 如果插入成功,但更新失败了还是会回滚
@Transactional
/*
* 使用声明式事务:
* 1、开发团队风格明确
* 2、保证事务执行时间尽可能短,不要穿插耗时操作(RPC或其他网络操作),否则剥离出去
* 3、不是所有方法都需要事务
* */
public KillExecution executeKill(long killId, long userPhone, String md5)
throws KillException, RepeatKillException, KillCloseException {
if (md5 == null || !md5.equals(getMD5(killId))) {
throw new KillException("数据篡改");
}
try {
/**old
//减库存
int reduceNumber = killOneDao.reduceNumber(killId, new Date());
if (reduceNumber <= 0) {
throw new KillCloseException("秒杀结束");
} else {
//记录购买行为
int insertCount = successKilledDao.insertSuccessKilled(killId, userPhone);
if (insertCount <= 0) {
throw new KillException("重复秒杀");
} else {
SuccessKilled successKilled = successKilledDao.queryByIdWithKillOne(killId, userPhone);
return new KillExecution(killId, KillOneStateEnum.SUCCESS, successKilled);
}
}*/
/**new
* 先插入数据,再更新数据,可以减少行级锁的时间
* 因为默认mysql 的insert没有行级锁,update是有的
* 如果插入成功,但更新失败了还是会回滚
* */
//记录购买行为
int insertCount = successKilledDao.insertSuccessKilled(killId, userPhone);
if (insertCount <= 0) {
throw new KillException("重复秒杀");
} else {
//减库存
int reduceNumber = killOneDao.reduceNumber(killId, new Date());
if (reduceNumber <= 0) {
throw new KillCloseException("秒杀结束");
} else {
SuccessKilled successKilled = successKilledDao.queryByIdWithKillOne(killId, userPhone);
return new KillExecution(killId, KillOneStateEnum.SUCCESS, successKilled);
}
}
//异常会执行回滚
} catch (KillCloseException e) {
throw e;
} catch (RepeatKillException e2) {
throw e2;
} catch (Exception e3) {
logger.error(e3.getMessage(), e3);
//所有编译期异常转为运行时异常
throw new KillException("内部错误:" + e3.getMessage());
}
}
优化二
使用存储过程降低行级锁的时间,前面的执行过程都是在客户端执行,现在使用存储过程让这个过程在数据库端执行。
新建killone.sql
delimiter $$ --在命令行使用$$替代;
CREATE PROCEDURE `killone`.`exec_killone`
(in p_kill_id bigint ,in p_user_phone bigint,
in p_kill_time TIMESTAMP ,out result int)
BEGIN
DECLARE insert_count int DEFAULT 0;
start TRANSACTION;
INSERT ignore into success_killed(kill_id,user_phone,create_time)
VALUES (p_kill_id,p_user_phone,p_kill_time);
SELECT ROW_COUNT() INTO insert_count;
if (insert_count = 0)THEN
ROLLBACK ;
SET result = -2;
elseif (insert_count < 0)THEN
ROLLBACK ;
SET result = -1;
ELSE
UPDATE killone SET
number = number - 1
WHERE kill_id=p_kill_id
AND start_time<p_kill_time
AND end_time>p_kill_time
and number>0;
select row_count() into insert_count;
if(insert_count=0)THEN
ROLLBACK ;
SET result = 0;
elseif(insert_count<0)THEN
ROLLBACK ;
set result = -1;
ELSE
COMMIT ;
set result = 1;
end if;
END if;
END;
$$
--存储过程结束
delimiter ; --又定义回来
set @result = -2;
--执行
call exec_killone(1001,11011011011,now(),@result);
select @result;
在mysql里执行测试
MariaDB [killone]> select*from success_killed\G;
*************************** 1. row ***************************
kill_id: 1000
user_phone: 13300000000
state: 0
create_time: 2017-05-21 16:46:26
1 row in set (0.00 sec)
ERROR: No query specified
MariaDB [killone]> call exec_killone(1001,11011011011,now(),@result);
Query OK, 0 rows affected (0.01 sec)
MariaDB [killone]> select @result;
+---------+
| @result |
+---------+
| 0 |
+---------+
1 row in set (0.00 sec)
优化3
这一步将存储过程嵌入到代码里
在service里:
/**
* 使用存储过程执行秒杀
*
* @param killId
* @param userPhone
* @param md5
* @return
*/
public KillExecution killByProcedure(long killId, long userPhone, String md5) {
if (md5 == null || !md5.equals(getMD5(killId))) {
return new KillExecution(killId, KillOneStateEnum.DATA_REWRITE);
}
Date killTime = new Date();
Map<String, Object> map = new HashMap<String, Object>();
map.put("killId", killId);
map.put("phone", userPhone);
map.put("killTime", killTime);
map.put("result", null);
try {
killOneDao.killByProcedure(map);
Integer result = MapUtils.getInteger(map, "result", -2);
if (result == 1) {
return new KillExecution(killId, KillOneStateEnum.SUCCESS);
} else {
return new KillExecution(killId, KillOneStateEnum.stateOf(result));
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return new KillExecution(killId, KillOneStateEnum.INNER_ERROR);
}
上面使用了一个集合工具类:MapUtils
<!--集合工具-->
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.1</version>
</dependency>
在KillOneDao里操作数据库:(参数用map)
/**
* 使用存储过程
*
* @param params
*/
public void killByProcedure(Map<String, Object> params);
对应xml里的实现
<!--调用存储过程-->
<select id="killByProcedure" statementType="CALLABLE">
call exec_killone(
#{killId,jdbcType=BIGINT,mode=IN},
#{phone,jdbcType=BIGINT,mode=IN},
#{killTime,jdbcType=TIMESTAMP,mode=IN},
#{result,jdbcType=INTEGER,mode=OUT}
)
</select>
然后是测试
@Test
public void killByProcedure() {
Exposer exposer = killOneService.exportKillUrl(1000);
logger.info("exposor:{}", exposer);
if (exposer.isExposed()) {
long phone = 11011111111L;
KillExecution killExecution = killOneService.killByProcedure(1000, phone, exposer.getMd5());
logger.info("result:{}", killExecution);
} else {
System.out.println("not ok");
}
}