Webook¶
约 6553 个字 546 行代码 预计阅读时间 29 分钟
Webook小微书(仿小红书)
-
DDD 框架:Domin-Drive Design
项目启动:
- 前端:在 webook-fe 目录下,执行
npm run dev
- 后端:在 webook 目录下,执行
go run . --config config/dev.yaml
- 配置文件:config/dev.yaml
go run .
:在当前目录下运行,包含 wire 生成的代码- 第三方依赖:在 webook 目录下,执行
docker compose up
- 执行
docker compose down
会删除数据库,结束docker compose up
进程不会 - 包含 mysql 数据库,redis,viper
- mock: 在 Webook 目录下,执行
make mock
注册功能¶
- Bind 绑定请求参数,绑定到结构体 UserSignUpReq
- 用正则表达式校验邮箱和密码格式
- 确认密码和密码一致
- 调用 service 层进行注册
- 返回注册成功
跨域请求: 项目是前后端分离的,前端是 Axios,后端是Go,所以需要跨域请求。
- 跨域请求:协议、域名、端口有一个不同,就叫跨域
- Request Header 和 Response Header 中的字段要对应上
- 采用 middleware 中间件进行跨域请求
docker compose 安装数据库
- 静默启动;
docker compose up
初始化 docker compose 并启动docker compose down
删除 docker compose 里面创建的各种容器,数据库- 只要不 down 数据库一直都在
DDD 框架:Domin-Drive Design
- Domain: 领域,存储对象
- Repository: 数据存储
- Service: 业务逻辑
登录功能¶
登录功能分为两件事:
- 实现登录功能
- 登录状态的校验
登录功能:
- 绑定请求参数,绑定到结构体 UserLoginReq
- 在 service 层中,根据邮箱查询用户是否存在,密码是否正确
- 返回登录结果
登录状态的校验:
- 利用 Gin 的 session 插件,从 cookie 中获取 sessionID,校验登录状态
- 采用 Cookie 和 Session 进行登录状态的保持
- 接入 JWT 后,采用 JWT Token 和 Token Refresh 进行登录状态的保持
Cookie:
- Domain:Cookie 可以在什么域名下使用
- Path:Cookie 可以在什么路径下使用
- Expires/Max-Age:Cookie 的过期时间
- HttpOnly:Cookie 是否可以通过 JS 访问
- Secure:Cookie 是否只能通过 HTTPS 访问
- SameSite:Cookie 是否只能在同一个站点下使
Session:
- 存储在服务器端
- 通过 SessionID 来识别用户
- 一般通过 Cookie 来传递 SessionID
Redis:
- 用户数据存储在 Redis 中
LoginMiddlewareBuilder:
- 登录中间件,用于校验登录状态
- 通过 IgnorePaths 方法,设置不校验登录状态的路径
- 通过 Build 方法,构建中间件: 链式调用
Debug 定位问题:
- 倒排确定:http 发送请求,中间件,业务逻辑,数据库
- F12 查看错误信息
- 后端看日志
Session 的过期时间:
- 通过中间件 LoginMiddlewareBuilder 设置,当访问不在 IgnorePaths 的路径时,会更新 Session 的 update_time 字段
- 同时更新 Session 的过期时间 MaxAge
- 但每次访问都要从 Redis 中获取 Session,性能较差(所以后面引入 JWT)
接入 JWT:
- 在 Login 方法中,生成 JWT Token,并返回给前端 x-jwt-token
- 跨域中间件 设置 x-jwt-token 为 ExposeHeaders
- Middleware 中,解析 JWT Token,验证 signature
- 前端要携带 x-jwt-token 请求
- 实现 JWT Token 的刷新,长短 token 的过期时间不同,多实例部署时,需要考虑 token 的过期时间
登录安全
- 限流,采用滑动窗口算法:一分钟内最多 100 次请求-
- 检查 userAgent 是否一致
Kubernets 入门¶
Pod: 实例 Service: 服务 Deployment: 管理 Pod
准备 Kubernetes 容器镜像:
- 创建可执行文件
GOOS=linux GOARCH=arm go build -o webook .
- 创建 Dockerfile,将可执行文件复制到容器中,并设置入口点
- 在命令行中登录 Docker Hub,
docker login
- 构建容器镜像:
docker build -t techselfknow/webook:v0.0.1 .
删除工作负载 deployment, 服务 service, 和 pods:
- 删除s Deployment:
kubectl delete deployment webook
- 删除 Service:
kubectl delete service webook
- 删除 Pod:
kubectl delete pod webook
Deployment 配置:
- 创建 k8s-webook-deployment.yaml 文件
- 在命令行中执行
kubectl apply -f k8s-webook-deployment.yaml
- 查看 Deployment 状态:
kubectl get deployment
- 查看 Pod 状态:
kubectl get pod
- 查看 Service 状态:
kubectl get service
- 查看 Node 状态:
kubectl get node
Deployment 配置:
- replicas: 副本数,有多少个 pod
- selector: 选择器
- matchLabels: 根据 label 选择哪些 pod 属于这个 deployment
- matchExpressions: 根据表达式选择哪些 pod 属于这个 deployment
- template: 模板,定义 pod 的模板
- metadata: 元数据,定义 pod 的元数据
- spec: 规格,定义 pod 的规格
- containers: 容器,定义 pod 的容器
- name: 容器名称
- image: 容器镜像
- ports: 容器端口
- containerPort: 容器端口
Service 配置:
- 创建 k8s-webook-service.yaml 文件,采用 LoadBalancer 类型
- 在命令行中执行
kubectl apply -f k8s-webook-service.yaml
- 查看 Service 状态:
kubectl get service
Service 中的端口(
spec.ports.targetPort
)和 Deployment 中的端口(spec.containers.ports.containerPort
)对应关系, main.go 中配置的端口(server.Run(":8080")
) 要保持一致.
k8s 中 mysql 配置:
webook main* ❯ kubectl get services
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 38h
webook-mysql LoadBalancer 10.101.251.206 localhost 3309:32695/TCP 18s
区分服务端口和容器端口:
- 服务端口 port:外部访问的端口
- 容器端口 targetPort:容器内部监听的端口
- ```yaml
ports:
- protocol: TCP port: 3309 targetPort: 3306 ```
k8s 中 mysql 持久化存储配置:
- 创建 k8s-mysql-deployment.yaml 文件
- 创建 k8s-mysql-pv.yaml 文件
- 创建 k8s-mysql-pvc.yaml 文件
- 在命令行中执行
kubectl apply -f k8s-mysql-pv.yaml
- 在命令行中执行
kubectl apply -f k8s-mysql-pvc.yaml
- 在命令行中执行
kubectl apply -f k8s-mysql-deployment.yaml
持久化之后,mysql 数据存储在 /mnt/data
目录下,而不是在容器中。
删除 Deployment 后,mysql 数据不会丢失,因为数据存储在 PV 中。
重新创建 Deployment 后,mysql 数据会从 PV 中恢复。
持久化存储:
- PV: 持久化卷,物理存储
- PVC: 持久化卷声明,逻辑存储
- 持久化存储的挂载路径:/var/lib/mysql (mysql 数据存储路径)
配置 mysql 的 k8s 环境
spec:
selector:
app: webook-mysql
ports:
- protocol: TCP
# 服务端口, 外部访问的端口
port: 11309
# 容器端口, 容器内部监听的端口
targetPort: 3306
# type 为 NodePort 时, 需要指定 nodePort
# 指定 nodePort 后, 可以通过 nodeIP:nodePort 访问服务
nodePort: 30002
type: NodePort
port (Service 端口):
- 这是 Service 暴露给 Kubernetes 集群内部其他 Pod 或 Service 的端口。
- 当集群内部的 Pod 需要访问这个 Service 时,它们会使用这个端口。
- 在上面的 YAML 示例中,port: 11309 表示 Service 会在 11309 端口上监听连接请求。
- 客户端(在集群内部)访问 Service 时,会使用这个端口进行连接。
- 注意: 这个端口仅在 Kubernetes 集群内部使用。
targetPort (Pod 端口):
- 这是 Service 将请求转发到的目标 Pod 的端口。
- targetPort 通常与 Pod 中运行的容器监听的端口一致。
- 在上面的 YAML 示例中,targetPort: 3306 表示 Service 会将连接请求转发到目标 Pod 的 3306 端口,即你的 MySQL 容器内部监听的端口。
- 通常,你的 MySQL 服务(或者其他应用程序)在容器内部会监听这个端口。
- 注意: 在 Kubernetes 中,Pod 内部的端口号是相对于 Pod 内部的网络命名空间而言的。
nodePort (Node 端口):
- 这是当你的 Service type 设置为 NodePort 时,Kubernetes 集群中每个节点的 IP 地址上都会暴露的端口。
- 当你需要从 Kubernetes 集群外部访问你的 Service 时,可以使用节点的 IP 地址和这个 nodePort 进行访问。
- 在上面的 YAML 示例中,nodePort: 30002 表示 Kubernetes 会在所有节点的 IP 地址上开启 30002 端口,并将发送到这个端口的流量转发到 Service。
- 客户端(在集群外部)可以通过节点的 IP 地址和 nodePort 连接到服务。
- 注意: NodePort 的端口号通常在 30000-32767 之间,并且必须是唯一的。
- 在上面的 YAML 示例中,nodePort: 30002 表示 Kubernetes 会在所有节点的 IP 地址上开启 30002 端口,并将发送到这个端口的流量转发到 Service。
- 客户端(在集群外部)可以通过节点的 IP 地址和 nodePort 连接到服务。
- 注意: NodePort 的端口号通常在 30000-32767 之间,并且必须是唯一的。
- 注意: 使用 NodePort 时,你仍然需要访问 Kubernetes 集群节点来访问服务。它并不直接将端口暴露到互联网上。
WRK 压测¶
- 安装 wrk:
brew install wrk
- 压测:
wrk -t4 -c100 -d10s -s ./scripts/signup.lua http://localhost:8080/users/signup
- -t4:4 个线程
- -c100:100 个连接
- -d10s:10 秒
- -s ./scripts/signup.lua:lua 脚本
- http://localhost:8080/users/signup:请求路径
如何在测试中维护登录状态:
- 在初始化中模拟登录,拿到对应的登录态的 cookie
- 手动登录,复制对应的 cookie,在测试中使用
Redis 缓存优化¶
查询用户时,先从 Redis 缓存中查询,如果缓存中没有,则从数据库中查询,并将查询结果缓存到 Redis 中。 - 缓存中的 user 是 domain.User,数据库中的 user 是 dao.User,从数据库查询到 user 后,需要将 dao.User 转换为 domain.User - 数据库限流:数据库限流,防止缓存击穿后,数据库压力过大 - 缓存失败:属于偶发事件,从数据库中查询到用户,但缓存失败,此时我们打日志,做监控,不返回错误。
短信验证码登录¶
需求分析¶
参考竞品:参考别人的实现
从不同角度分析:
- 功能角度:具体做到哪些功能,不同功能的优先级
- 非功能角度:
- 安全性:保证系统不会被人恶意搞崩
- 扩展性:应对未来的需求变化,这很关键
- 性能:优化用户体验
- 从正常和异常流程两个角度思考
系统设计¶
手机验证码登录有两件事:验证码,登录
- 两个是强耦合吗?
- 其他业务会用到吗?
模块划分:
- 一个独立的短信发送服务
- 在独立的短信发送服务的基础上,封装一个验证码功能
- 在验证码功能的基础上,封装一个登录功能
设计原则:
- 类 A 需要使用类 B 的功能,那么 A 应该依赖于一个接口(例如 BInterface),而不是直接依赖于类 B 本身。
- 如果 A 需要使用 B,那么 B 应该作为 A 的字段(成员变量)存在,而不是作为包变量或包方法。
- A 用到了 B,A 绝对不初始化 B,而是外面注入 => 保持依赖注入(DI) 和 依赖反转(IoC)
cache/dao 中的 err 定义( var ErrCodeNotFound = errors.New("code not found")
),在 repository 中使用时,要再次定义 (var ErrCodeNotFound = cache.ErrCodeNotFound
),在 Service 中用 repo.ErrCodeNotFound
来使用。
- 解耦层级依赖,每个层级都知道自己可能抛出的错误,并处理这些错误
- 通过将错误变量定义在相关层级中,可以更清晰地了解每个层级的行为和可能发生的错误。
发验证码的并发问题,引入 lua 脚本,解决并发问题
引入手机号登录后,需要修改 dao 层,添加 phone 字段
- 在邮箱登录时,phone 字段为空
- 在手机号登录时,email 字段为空
- 但是 email 和 phone 字段都是唯一索引
解决方法,采用 sql.NullString
类型,允许空值
- 在邮箱登录时,phone 字段为空
- 在手机号登录时,email 字段为空
- 但是 email 和 phone 字段都是唯一索引
sms 登录校验¶
- 通过手机号查询用户是否存在
- 用户不存在,创建用户
- 创建一个用户
- 根据手机号查询刚创建的用户
- 存在主从延迟的问题,可能查询不到 3. 用户存在,直接返回 4. 返回用户信息
面向接口编程¶
面向接口编程,是为了 扩展性,而不是为了提高性能或者可靠性。
从结构体到接口:
- 左侧的代码使用 struct 定义 UserService,这意味着它是一个具体类型。任何使用 UserService 的地方都直接依赖于这个具体的实现。
- 右侧的代码定义了一个 UserService 接口,它定义了 SignUp、Login、Profile 和 FindOrCreate 这几个方法,而 UserServiceStruct 则是一个实现了这个接口的具体结构体。
构造函数的变化:
- 左侧构造函数返回 *UserService,即 UserService 结构体的指针。
- 右侧构造函数返回 UserService 接口,而不是具体的结构体。
Profile 接口¶
Web 层:
- 获取 JWT 中的用户信息
- 调用 Service 层获取用户信息
- 返回用户信息
Service 层:
- 调用 Repository 层的 FindById 方法获取用户信息
- 返回用户信息
Repository 层:
- 从缓存中获取用户信息
- 缓存中没有,从数据库中获取
- 将 dao.User 转换为 domain.User :添加个人信息字段
- 将 domain.User 转换为 cache.User :添加个人信息字段
- 将 domain.User 缓存到 Redis 中
- 返回用户信息
Edit 接口与 Profile 接口的类似,但 Edit 接口在 repo 层需要更新缓存(先删除,再创建)。
单元测试¶
单元测试:针对每一个方法进行的测试,单独验证每一个方法的正确性。 - 理论上来说,你不能依赖任何第三方组件,包括数据库、缓存、外部服务等
集成测试:多个组件合并在一起的测试,验证各个方法、组件之间的配合无误。
测试用例定义:
使用 mock
- 安装 mockgen 工具:
go install github.com/golang/mock/mockgen@latest
- 为 UserHandler 依赖的 UserService 生成 mock 实现:
mockgen -source=./webook/internal/service/user.go -destination=./webook/internal/service/mocks/user.go -package=svcmocks
- 测试用例匿名结构体定义。
- 执行测试用例的整体代码
- 设计具体测试用例并运行:
- 最开始考虑正常流程
- 在正常流程的基础上考虑异常流程。
测试用例定义示例:
testCases := []struct {
name string
mock func(ctrl *gomock.Controller) service.UserService
reqBody string
wantCode int
wantBody string
}{
// 注册成功
{
name: "注册成功",
// 模拟依赖:返回一个 mock 的 UserService
mock: func(ctrl *gomock.Controller) service.UserService {
userSvc := svcmocks.NewMockUserService(ctrl)
// 期待调用 SignUp 方法,传入任意 context 和匹配的 domain.User 对象,返回 nil
userSvc.EXPECT().SignUp(gomock.Any(), domain.User{
Email: "123@qq.com",
Password: "1234#qwe",
}).Return(nil)
return userSvc
},
// 请求参数
reqBody: `{"email":"123@qq.com","password":"1234#qwe","confirmPassword":"1234#qwe"}`,
// 期望响应
wantCode: http.StatusOK,
wantBody: "注册成功",
},
}
执行测试用例:
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
// 创建 userHandler 及所需的依赖 userService
server := gin.Default()
userSvc := tc.mock(ctrl)
userHandler := NewUserHandler(userSvc, nil)
userHandler.RegisterRoutes(server.Group("/users"))
// 创建请求
req, err := http.NewRequest(http.MethodPost, "/users/signup", bytes.NewBuffer([]byte(tc.reqBody)))
req.Header.Set("Content-Type", "application/json")
assert.Nil(t, err)
// 执行请求
resp := httptest.NewRecorder()
server.ServeHTTP(resp, req)
// 检查响应
assert.Equal(t, tc.wantCode, resp.Code)
assert.Equal(t, tc.wantBody, resp.Body.String())
})
}
高可用的短信服务¶
抽象接口:定义了统一的短信服务接口
客户端限流¶
//go:embed slide_window.lua
var luaSlideWindow string
// RedisSlideWindowLimiter 基于 Redis 的滑动窗口限流器
type RedisSlideWindowLimiter struct {
cmd redis.Cmdable
// 窗口大小
interval time.Duration
// 阈值
rate int
}
func (r *RedisSlideWindowLimiter) Limit(ctx context.Context, key string) (bool, error) {
return r.cmd.Eval(ctx, luaSlideWindow, []string{key},
r.interval.Milliseconds(), r.rate, time.Now().UnixMilli()).Bool()
}
采用装饰器模式,对短信服务进行限流。
- 装饰器模式:不改变原有实现而增加新特性的一种设计模式
failover 策略¶
自动切换短信服务商
- 轮询:出现错误,就换一个服务商进行重试
- 基于超时相应的判定:连续超过 N 个请求超时,切换服务商。
提高安全性¶
增加了 JWT Token 的验证。
type AuthSMSService struct {
svc sms.Service
key []byte
}
type AuthSMSClaims struct {
jwt.RegisteredClaims
Tpl string
}
func (s *AuthSMSService) Send(ctx context.Context, tplToken string, args []string, numbers ...string) error {
var claims AuthSMSClaims
_, err := jwt.ParseWithClaims(tplToken, &claims, func(t *jwt.Token) (interface{}, error) {
return s.key, nil
})
if err != nil {
return err
}
return s.svc.Send(ctx, claims.Tpl, args, numbers...)
}
微信扫码登录¶
微信扫码登录是采用微信 OAuth2.0 授权登录。
请求 Code(临时授权码)
- 发起授权: 用户点击“微信登录”按钮,应用程序将用户重定向至微信的授权页面。
- 构造授权链接: 重定向 URL 包含微信授权页面的地址,以及应用程序的 appid、redirect_uri(回调地址)和 state(状态参数)。 redirect_uri 必须进行 URL 编码。 state 用于防止 CSRF 攻击,应生成随机唯一的值。
- 用户授权: 用户在微信授权页面完成扫码登录并授权。
- 微信回调: 微信将用户重定向回 redirect_uri,并在 URL 参数中包含 code(临时授权码)和 state。
- 后端处理: 后端服务器接收到回调请求 callback,验证 state 参数,然后提取 code。 后端使用此 code 向微信服务器请求 Access Token,OpenID 和 UnionID, 然后通过 OpenID 和 UnionID 查询用户信息。
长短 token 设计¶
短 token:access_token, 用于访问资源,有效期短。
长 token:refresh_token, 用于刷新 access_token,有效期长。
用户登录¶
- 在 setJWTToken 后,生成 refresh_token
- 让前端保存 refresh_token
- 在 CORSConfig 的 ExposeHeaders 中添加
x-refresh-token
- 在 CORSConfig 的 ExposeHeaders 中添加
- 前端每次请求时候,都携带 token
- 请求资源时,在 Authorization 中携带 access_token
- 调用 RefreshToken 的时候,在 Authorization 中携带 refresh_token,用来生成新的 access_token
- 如果 access_token 过期,则调用 RefreshToken 生成新的 access_token, 前端再用新的 access_token 请求资源
用户退出¶
JWT token 本身是无状态的,在这里在 Redis 用 Ssid 来记录 token 的状态。
- 用户登录的时候,生成一个标识 ssid,并置为有效,放到长短 token 中
- 在
jwt.go
中,生成 token 时,加入 ssid 字段。
- 在
- 用户登录校验的时候,检查 ssid 是否有效
- 用户更新长 token 的时候,也要检查 ssid 是否有效
- 用户在退出登录的时候,把 ssid 置为无效
配置¶
配置来源:启动参数,环境变量,配置文件,远程配置中心
-
启动参数:某一次运行的参数,可以考虑在这里提供。最为典型的就是命令行工具,会要求你传入各 种参数,例如在 mockgen 中传递的 source、destination。
-
环境变量:和具体的实例有关的参数都放在这里。比如说在实例的权重,或者实例的分组信息。
-
配置文件:一些当下环境中所需要的通用的配置,比如说我们的数据库连接信息等。
-
远程配置中心:它和配置文件可以说是互相补充的,除了启动程序所需的最少配置,剩下的配置都可 以放在远程配置中心。
viper¶
viper 是一个配置管理工具。
- viper.SetConfigName() 和 viper.SetConfigType(): 指定配置文件名和类型。
- viper.AddConfigPath(): 添加配置文件的查找路径,Viper 会按照添加的顺序查找。
- viper.AutomaticEnv(): 自动读取环境变量。
- viper.SetEnvPrefix(): 设置环境变量的前缀,避免与其他环境变量冲突。
- viper.BindEnv(): 将配置项和环境变量绑定,方便使用环境变量覆盖配置文件中的值。
- viper.SetDefault(): 设置默认值,防止程序因缺少配置而崩溃。
- viper.WatchConfig() 和 viper.OnConfigChange(): 监听配置文件变化,实现动态配置更新。
- viper.Get...() 方法: 用于获取配置值,注意类型转换。
读取配置文件:
func InitViper() {
viper.SetConfigName("dev")
viper.SetConfigType("yaml")
viper.AddConfigPath("./config")
err := viper.ReadInConfig()
if err != nil {
panic(err)
}
}
为了让 viper 在不同环境下加载不同的配置文件
- 在启动的时候,传入一个启动参数:
--config=config/dev.yaml
etcd¶
配置文件的缺点是不够灵活,难以实现权限控制,实时更新,所以考虑使用远程配置中心:etcd
在 etcd 中添加配置:
- 从文件中写入:
etcdctl --endpoints=127.0.0.1:12379 put /webook "$(<dev.yaml)"
- 读取配置:
etcdctl --endpoints=127.0.0.1:12379 get /webook
viper 同时支持监听配置变更
viper.WatchConfig()
viper.OnConfigChange(func(e fsnotify.Event) {
println("Config file changed:", e.Name)
})
扩展:
TDD¶
第一步:根据对需求的理解,初步定义接口。在这 个步骤,不要害怕定义的接口不合适,必然会不合适。
第二步:根据接口定义测试,也就是参考我给出的 测试模板,先把测试的框架写出来。
第三步:执行核心循环。
- 增加测试用例。
- 提供/修改实现。
- 执行测试用例。
发帖功能(TDD)¶
需求分析¶
创作者:
- 撰写文章
- 修改文章
- 发表文章
- 删除文章
- 文章列表
读者:
- 文章列表
- 阅读文章
状态图:
PublishWithTwoRepo¶
Service¶
这里我写一下在 Service 层中的思考过程,用 Table-Driven Test 的方式来组织测试用例。
PublishWithTwoRepo 依靠两个不同的 repository 来解决跨表,或者跨库的问题。
-
一个 Service 控制两个 repo:读者库和写者库
-
go type ArticleAuthorRepository interface { Create(ctx context.Context, article domain.Article) (int64, error) Update(ctx context.Context, article domain.Article) (int64, error) }
-
go type ArticleReaderRepository interface { // Save 读者只有保存写者创建或修改的文章,即只能被动更新 Save(ctx context.Context, article domain.Article) (int64, error) }
测试模版:包含测试参数和如何运行
func TestArticleService_Publish(t *testing.T) {
testCases := []struct {
name string
mock func(ctrl *gomock.Controller) (article.ArticleAuthorRepository, article.ArticleReaderRepository)
// service 的参数
article domain.Article
// service 的期待返回值
wantId int64
wantErr error
}{
{},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
authorRepo, readerRepo := tc.mock(ctrl)
svc := NewArticleServiceWithTwoRepo(authorRepo, readerRepo)
resId, err := svc.PublishWithTwoRepo(context.Background(), tc.article)
assert.Equal(t, tc.wantErr, err)
assert.Equal(t, tc.wantId, resId)
})
}
}
在 Service 中的实现,先调用写者库写入文章,然后再调用读者库返回 id 和 err。
func (a *articleService) PublishWithTwoRepo(ctx context.Context, article domain.Article) (int64, error) {
// 写者库发表文章
var id = article.Id
var err error
if article.Id > 0 {
id, err = a.authorRepo.Update(ctx, article)
} else {
id, err = a.authorRepo.Create(ctx, article)
}
if err != nil {
return 0, err
}
// 确保写者库和读者库的 id 一致
article.Id = id
return a.readerRepo.Save(ctx, article)
}
所以在测试用例中,要分别模拟两个 Create 的 Expect。
authorRepo.EXPECT().Create(gomock.Any(), ...)
: 期望 authorRepo 的 Create 方法被调用,并使用 gomock.Any() 作为 context.Context 的参数。 只有匹配了 Article 对象的内容,确保只有在传入的文章与期望完全一致时,Mock 才会匹配成功。- readerRepo.EXPECT().Create(gomock.Any(), ...): 类似地,配置了 readerRepo 的 Save 方法。 注意到 readerRepo 期望接收的 Article 对象的 Id 是 1,这是从 authorRepo 返回的 ID。
{
name: "创建文章,并发布成功",
mock: func(ctrl *gomock.Controller) (article.ArticleAuthorRepository, article.ArticleReaderRepository) {
authorRepo := repomocks.NewMockArticleAuthorRepository(ctrl)
readerRepo := repomocks.NewMockArticleReaderRepository(ctrl)
// 模拟写者库创建文章的过程,要求入参为 Id 为 0 的 Article
// 返回 1,nil
authorRepo.EXPECT().Create(gomock.Any(), domain.Article{
Id: 0, // 默认是 0,不写这行也行
Title: "create article and publish",
Content: "this is content",
Author: domain.Author{
Id: 666,
},
}).Return(int64(1), nil)
// 模拟读者库创建文章的过程,要求入参为 Id 为 1 的 Article
// 返回 1,nil
readerRepo.EXPECT().Save(gomock.Any(), domain.Article{
Id: 1, // 用写者库的 id
Title: "create article and publish",
Content: "this is content",
Author: domain.Author{
Id: 666,
},
}).Return(int64(1), nil)
return authorRepo, readerRepo
},
article: domain.Article{
Title: "create article and publish",
Content: "this is content",
Author: domain.Author{
Id: 666,
},
},
wantId: 1,
wantErr: nil,
},
考虑部分失败的问题:写者库写入成功,但是读者库保存失败了。
- 解决方案:增加重试机制。
// 读者库保存文章,如果失败,则重试, 重试至多 3 次
for i := 0; i < 3; i++ {
time.Sleep(time.Second * time.Duration(i))
id, err = a.readerRepo.Save(ctx, article)
if err == nil {
break
}
a.logger.Error("save article to reader repo failed, try again",
logger.Int64("article id: ", article.Id),
logger.Error(err),
)
}
if err != nil {
// 重试 3 次仍然失败,则返回错误
a.logger.Error("reader repo save article failed",
logger.Int64("article id: ", article.Id),
logger.Error(err),
)
}
DAO¶
两个 Repo 对应的就该有两个 GORM 的 Entity 和他对应,相应的,数据库中也有两个 Table 与 Entity 联系起来。
// 作者库:author 进行写入和更新,删除。。
type Article struct {
Id int64 `gorm:"primaryKey,autoIncrement"`
Title string `gorm:"type=varchar(1024)"`
Content string `gorm:"type=BLOB"`
// 作者 id, 在 author_id 上建立索引
AuthorId int64 `gorm:"index"`
// 创建和修改时间,毫秒时间戳
Ctime int64
Utime int64
}
// 线上库:reader 进行被动更新
type PublishedArticle struct {
Article
}
fix bug¶
这次的改动是修复了发布到线上库的 id 判断,之前的 id 以为和发布在 写者库一样(新创建的是 0,其他大于零),实际上是 第一次创建就是 1,这个 1 是写者库传过来的id,所以要先查询线上库中有没有该 id,再判断是创建还是更新操作。
// 线上库更新
// 类似于 FindOrCreate 中的实现,先查询线上库是否存在,不存在则创建,存在则更新
res, err := a.readerRepo.FindById(ctx, art.Id)
if err != nil {
a.logger.Error("find article by id failed",
logger.Int64("article id: ", art.Id),
logger.Error(err),
)
return 0, err
}
// 线上库的最小 id 是 1,则说明文章不存在,创建文章
if res.Id < 1 {
return a.readerRepo.Create(ctx, art)
}
// 线上库存在,则更新
return a.readerRepo.Update(ctx, art)
improve: 在 dao 上用事务¶
之前的写法有一种问题,无法保证作者库和线上库的一致性。例如在作者库创建成功,在线上库创建失败,重试多次失败。
所以考虑用 事务 来保证两者的一致性。
- 一种方法是:在 repository 层面上,创建两个 dao 来管理两个库
- 另一种方法是:在 dao 层面上,用一个事务来管理两张表(同库不同表)
第一种方法,会导致在 repo 层面上直接操作数据库,跨层依赖,没有坚持面向接口的原则。所以这里我采用了第二种方法。
利用 GORM 的闭包,实现一个执行 事务 的操作。
func (dao *GormArticleDAO) Sync(ctx context.Context, art Article) (int64, error) {
var id = art.Id
// 使用事务,保证写者库和读者库的一致性
err := dao.db.WithContext(ctx).Transaction(func(txDb *gorm.DB) error {
var err error
now := time.Now().UnixMilli()
// 写者库,
dao := NewArticleDAO(txDb)
if id > 0 {
id, err = dao.UpdateById(ctx, art)
} else {
id, err = dao.Insert(ctx, art)
}
if err != nil {
return err
}
// 读者库:Upsert 即 update or insert
art.Id = id
pubArt := PublishedArticle{
Article: art,
}
pubArt.Ctime = now
pubArt.Utime = now
err = txDb.Clauses(clause.OnConflict{
// id 冲突的时候执行 update,否则执行 insert
Columns: []clause.Column{{Name: "id"}},
// update 的时候,只更新 title 和 content, utime
DoUpdates: clause.Assignments(map[string]interface{}{
"title": art.Title,
"content": art.Content,
"utime": now,
}),
}).Create(&pubArt).Error
return err
})
return id, err
}
维护文章状态¶
可能存在状态变更的地方:
- 新建文章,还未发表:
ArticleStatusUnknown
->ArticleStatusUnpublished
- 发表文章:
ArticleStatusUnpublished
->ArticleStatusPublished
- 编辑文章:->
ArticleStatusPublished
- 撤回文章:->
ArticleStatusPrivate
- 删除文章:->
ArticleStatusArchive
添加对应的两个接口实现。
Withdraw(ctx context.Context, art domain.Article) (int64, error) // 撤回,仅自己可见
Delete(ctx context.Context, art domain.Article) (int64, error) // 删除,软删除
MongoDB¶
MongoDB 是 NOSQL 的一种。NOSQL 是指 Not Only SQL ,不仅仅是 SQL。
MongoDB 是文档数据库。
- 面向集合存储:集合中存放很多文档。
- 模式自由:不需要预先定义文档模型,且可以灵活修改。
- 支持分片
EDMA 是一种用来理解 BSON 文档结构的简化模型,它将 BSON 文档分解为四种基本类型:键值对 (E)、有序文档 (D)、无序文档 (M) 和数组 (A)。 这种模型可能有助于新手更容易理解 BSON 的结构。
-
E (Element): 一个简单的键值对结构体 (key-value pair)。 Key 是字符串类型,Value 可以是其他三种类型之一。 在 Go 代码中表示为:
-
D (Document): 本质上是 E 的一个切片 (slice)。 它可以理解为一个有序的键值对列表,对应 BSON 中的文档。 在 Go 代码中表示为:
示例:
-
M (Map): 本质是一个 Go 中的
map[string]interface{}
。 key 必须是字符串, value 可以是任何类型,包括其他三种类型。 对应 BSON 中的文档,但与 D 不同,Map 是无序的。 在 Go 代码中表示为:示例:
-
A (Array): 是一个切片
[]interface{}
,表示 BSON 中的数组。 元素可以是其他三种类型或者任意类型。 在 Go 代码中表示为:示例:
雪花算法¶
- 41 比特位的时间戳
- 10 比特位的机器位
- 12 比特的自增序号
- 1 比特保留位
因为 MongoDB 中没有自增主键,所以我们用雪花算法来生成一个 GUID (Global Unify ID) 全局唯一 ID。
- GUID 是业务上的层面,UUID 是技术上的层面,也可以用 UUID 来生成 GUID
用 MongoDB 实现 ArticleRepository 接口¶
这里没有事务来保证写者库和线上库的一致性了。
func (dao *MongoDBArticleDAO) Upsert(ctx context.Context, art Article) (int64, error) {
var (
id = art.Id
err error
)
// 写者库
if id > 0 {
id, err = dao.UpdateById(ctx, art)
} else {
id, err = dao.Insert(ctx, art)
}
if err != nil {
return 0, err
}
// 线上库
art.Id = id
now := time.Now().UnixMilli()
art.Utime = now
filter := bson.D{
bson.E{
Key: "id",
Value: id,
},
}
set := bson.D{
bson.E{
Key: "$set",
Value: art,
},
bson.E{
Key: "$setOnInsert",
Value: bson.D{
bson.E{
Key: "ctime",
Value: now,
},
},
},
}
opt := options.Update().SetUpsert(true)
_, err = dao.liveCol.UpdateOne(ctx, filter, set, opt)
return id, err
}
set 和 setOnInsert:
- set 用于更新字段
- setOnInsert 用于仅在插入时设置字段。
Upsert
- MySQL 的 Upsert 操作通常使用 INSERT ... ON DUPLICATE KEY UPDATE 语句实现。
- MongoDB 的 Upsert 操作通过 update() 方法,并设置
upsert: true
选项来实现。
优化:如何实现写者库和线上库的一致性
-
2PC:两阶段提交 (分布式事务协议)
-
最终一致性:采用消息队列(Kafaak,RabbitMQ)
-
开启一个事务(MongoDB 4.x+ 支持多文档事务)
OSS¶
OSS 是指对象存储,Object Storage Service。
- Bucket:是一种逻辑上的分组关系。
- Object:对象,存储的东西。
在 Bucket 和 Object 上,都可以做权限控制(ACL,Access Control List)
绝大部分 OSS服务器都兼容了 S3(Amazon 的 Simple Storage Service)api。S3 api 是一个 RESTful 风格的 api。
查询接口¶
查询接口包括:
作者的查询接口
-
列表接口:在自己的创作中心能够看到自己发表的所有的文章,这是一个分页接口。
-
详情接口:当作者在选中了某篇文章的时候,能看到文章的全部内容。
读者的查询接口
-
搜索接口:这个需要我们接入搜索模块之后再设计。
-
推荐接口:当读者打开首页的时候,系统可以针对用户的喜好来推荐一些文章,这个也需要我们将来再 设计。
-
阅读文章接口:当选中某篇文章的时候,可以看到文章的全部内容。
作者的列表接口¶
设计为分页接口,分页包含两个参数:偏移量 offset 和 数据量 limit
- offset = 20 limit = 10:从 21 条开始,往后的十条数据。
缓存设计¶
只缓存第一页¶
缓存时,可能存在读写不一致的情况。这里我们采用在数据变更,删除对应的缓存。
// 如果是第一页,从缓存中获取
if offset == 0 && limit <= 100 {
cachedArts, err := c.cache.GetFirstPage(ctx, userId)
if err == nil {
// 缓存命中
c.logger.Info("缓存命中",
logger.Int64("userId", userId),
)
return cachedArts[:limit], nil
}
}
这里我犯了一个错误,当缓存的文章数量小于 limit 的时候,cachedArts[:limit]
会越界 panic ,所以直接返回 cachedArts
即可。
在查询 List 的时候,进行回写缓存。
// 异步缓存第一页的数据
go func() {
// 设置缓存超时时间
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// 缓存第一页的数据
if offset == 0 && limit <= 100 {
err := c.cache.SetFirstPage(ctx, userId, result)
if err != nil {
c.logger.Error("缓存第一页的数据失败",
logger.Int64("userId", userId),
logger.Int64("limit", int64(limit)),
logger.Int64("offset", int64(offset)),
logger.Error(err),
)
}
}
}()
缓存预加载¶
在返回列表查询的数据之后,可以把部分数据放进缓存里面
- 这里我们取列表中的第一篇文章,放进缓存
- 预缓存的过期时间设置的短一些(预测效果越不好,就越要短)
func (c *RedisArticleCache) Set(ctx context.Context, art domain.Article) error {
jsonData, err := json.Marshal(art)
if err != nil {
return err
}
return c.client.Set(ctx, c.detailKey(art.Id), jsonData, time.Minute).Err()
}
改进:不缓存大文章
- 检测文章的内容大小,内容过大不缓存。
func (c *CachedArticleRepository) preCache(ctx context.Context, arts []domain.Article) {
const detailExpire = time.Minute
const contentSizeThreshold = 1024 * 1024 // 1MB
if len(arts) > 0 && len(arts[0].Content) < contentSizeThreshold {
art := arts[0]
if err := c.cache.Set(ctx, art); err != nil {
c.logger.Error("预缓存第一篇文章失败", logger.Error(err))
}
}
}
改进:在 Publish 接口调用的时候,直接设置好对应的线上库的文章缓存(Public Article)
- 当一个新帖子发布之后,一般作者会先去线上库访问检查一下。
改进:缓存过期时间设置
- 过期时间越长,命中率越高,但是数据一致性会变差;过期时间越短,命中率越低,但是数 据一致性会更好。
改进:淘汰策略
- 采用 LRU(最近最少使用)策略。
- 优先淘汰访问量少的文章,尽可能留下访问量多的文章
阅读、点赞、收藏¶
阅读、点赞和收藏是否是一个通用的 功能?
-
这一点你从前面我说过的竞品分析里面就可以看出来,比如 说大部分支持评论的网站,评论本身也允许点赞、收藏。
-
又比如说支持多种内容类型的平台,不同的照片、视频也有 阅读、点赞和收藏的功能。
-
也就是说:你不是设计一个专门给文章用的,你是要设计一 个可以给任何资源用的阅读、点赞和收藏功能。
这里我们在 Web 层中的 ArticleHandler 中聚合一下 interactive Service(交互服务)
IncreaseReadCnt¶
当用户访问线上库的文章的时候,即调用 PublicDetail
接口。
-
增加一个阅读计数的功能。
-
go // 增加阅读计数 go func() { err := a.interSvc.IncreaseReadCnt(ctx, a.biz, article.Id) if err != nil { a.logger.Error("增加阅读计数失败", logger.Int64("id", article.Id), logger.Error(err), ) } }()
-
biz + bizId 来唯一标识某个特定业务。
-
先操作数据库,在更新缓存:相当于在 update 时候,更新下缓存。
点赞、收藏、阅读共用一个服务:interactive Service,共用一张表:Interactive
// 交互表:点赞、收藏、阅读,区分业务(biz, bizid)
type Interactive struct {
Id int64 `gorm:"primaryKey,autoIncrement"`
// <bizid, biz>
BizId int64 `gorm:"uniqueIndex:biz_type_id"`
// WHERE biz = ?
Biz string `gorm:"uniqueIndex:biz_type_id"`
ReadCnt int64
LikeCnt int64
CollectCnt int64
Utime int64
Ctime int64
}
阅读量是高频访问的数据,所以应该做好缓存,防止超高的 QPS 压垮数据库。
-
这里我采用 Redis 的 map 结构,也就是一个 key 对应一个 map。而一个 map 里面放着:read_cnt、like_cnt、collect_cnt
-
redis 中的数据更新:
-- redis 的 map 结构对应的 key local key = KEYS[1] -- cntKey:map 中的操作字段 local cntKey = ARGV[1] -- delta:表示增量(-1 、+1) local delta = tonumber(ARGV[2]) -- 检查 key 是否存在 local exists = redis.call("EXISTS", key) if exists == 1 then -- HINCRBY 命令会自动处理字段不存在的情况, -- 如果字段不存在,它会先将字段设置为 0,然后再递增。 redis.call("HINCRBY", key, cntKey, delta) return 1 else -- Key 不存在或缓存过期 return 0 end
由于用户对阅读量不敏感,当 redis 更新失败的时候,可以容忍这种 redis 和 数据库不一致。
go func (r *interactiveRepository) IncreaseReadCnt(ctx context.Context, biz string, bizId int64) error { if err := r.dao.IncreaseReadCnt(ctx, biz, bizId); err != nil { return err } return r.cache.IncreaseReadCntIfPresent(ctx, biz, bizId) }
如何解决
答案是:不需要解决。
- 从业务上来说,没有必要要求阅读数、点赞数或者收藏数量一定是严格准确的,因为即便不准确,也不会对用户 产生什么不好的影响。
- 更进一步说,只有高并发的文章才会有并发问题。而高并发的文章,阅读数、点赞数或者收藏数本身就很多,你少一点点无所谓。
- 而低并发的文章,极小概率会遇上这种不一致的场景。
操作成功
[GIN] 2025/03/05 - 21:36:25 | 200 | 169.841708ms | ::1 | GET "/pub/1"
2025-03-05T21:36:25.631+0800 DEBUG logger/zap_logger.go:16 %s %s
[%.3fms] [rows:%v] %s {"args": ["/Volumes/kioxia/Program/Webook/webook/internal/repository/dao/interactive.go:33","SLOW SQL >= 10ms",202.936792,1,"INSERT INTO `interactives` (`biz_id`,`biz`,`read_cnt`,`like_cnt`,`collect_cnt`,`ctime`,`utime`) VALUES (1,'article',1,0,0,1741181785427,1741181785427) ON DUPLICATE KEY UPDATE `read_cnt`=`read_cnt` + 1,`utime`=1741181785427"]}
点赞业务¶
点赞是与用户绑定的,而且点赞的 qps 要比阅读的 qps 低几个量级,在这里我们创建一张 UserLikBiz
数据表,来记录用户的点赞。
- 采用软删除,用
Status
作为标识位
¶
这种写法会在 eg.go 的大括号内部,新建立一个 article 的变量,而不是复用括号外面的 article,所以应该添加注释的内容。
所以完整的访问流程是:
1. 浏览器访问 http://localhost:3000/articles/view?id=1
2. 前端通过 API 调用 http://localhost:8080/articles/pub/1
3. 后端返回文章详情和互动数据
4. 前端渲染文章内容和互动按钮
分布式任务调度:热点榜单¶
热点模型¶
综合考虑用户的各种行为:阅读量,点赞,收藏
综合考虑时间的衰减特性:包括内容本身的 发布时间、用户点赞、收藏的时间
权重因子:网站有意识地控制某些内容是否是热点(降热搜等)
这里我们采用的热点模型为 Hacknews 模型
如何计算¶
榜单数据并不需要实时计算。
- 扫描全表,找出所有文章的点赞数和发表时间
- 计算每个文章的 score 并且全局排序
采用异步定时计算
- 每隔一段时间,就计算一次热榜。
- 在异步的情况下,计算的时间可以比较长,但是依旧不能太长。
用 小根堆 来维护 Score前 100 的文章。
本地缓存¶
引入本地缓存,采用 本地缓存 + redis 缓存。
-
查找的时候,先查找本地缓存,在查找 Redis
-
更新的时候,先更新本地缓存,再更新 Redis
func NewCompositeRankingCache(local *RankingLocalCache, redis *RankingRedisCache) RankingCache {
return &CompositeRankingCache{
local: local,
redis: redis,
}
}
func (c *CompositeRankingCache) GetTop100(ctx context.Context) ([]domain.Article, error) {
// 先尝试从本地缓存获取
arts, err := c.local.Get(ctx)
if err == nil {
return arts, nil
}
// 本地缓存失效,从Redis获取
arts, err = c.redis.Get(ctx)
if err != nil {
return nil, err
}
// 设置到本地缓存
_ = c.local.Set(ctx, arts)
return arts, nil
}
bug:撤销文章后,缓存未删除¶
修改 dao 中的查询语句,限定只能查询已经发表的文章。
// FindPublicById 从线上库获取文章,只能查询状态为 ArticleStatusPublished:2 的文章
func (dao *GormArticleDAO) FindPublicById(ctx context.Context, id int64) (PublishedArticle, error) {
var art PublishedArticle
err := dao.db.WithContext(ctx).Where("id = ? and status = ? ", id, 2).First(&art).Error
return art, err
}
修改 repository 中的语句,添加为查询到的判断
// 缓存未命中,从数据库中获取
artPublic_published, err := c.dao.FindPublicById(ctx, id)
if err == gorm.ErrRecordNotFound {
return domain.Article{}, errors.New("文章不存在或未发表")
}
if err != nil {
return domain.Article{}, err
}
在撤回文章时,修改状态完后,删除与之相关的所有缓存。
// 数据修改后删除缓存
defer func() {
if err := c.cache.DelFirstPage(ctx, art.Author.Id); err != nil {
c.logger.Error("SyncStatus Article 后删除缓存 FirstPage 失败",
logger.Int64("userId", art.Author.Id),
logger.Error(err),
)
}
if err := c.cache.DelPublic(ctx, art.Id); err != nil {
c.logger.Error("SyncStatus Article 后删除缓存 Public Article 失败",
logger.Int64("articleId", art.Id),
logger.Error(err),
)
}
if err := c.cache.Del(ctx, art.Id); err != nil {
c.logger.Error("SyncStatus Article 后删除缓存 article 失败",
logger.Int64("articleId", art.Id),
logger.Error(err),
)
}
}()
Created: February 24, 2025