假设有一个这样的场景
1:调用方不关注接口响应内容
2:接口响应比较慢,会阻塞服务
典型的场景可能就是注册账号的时候邮件发送了,这种情况下一般都不会等邮件是否发送成功,因为都会提供一个重新发送的功能,而由于发送邮件一般比较耗时,所以一般都做类似任务队列的形式来实现,而现在我们就在真实接口的上游做一个有点任务队列功能的http代理服务
这个服务主要需要考虑以下几点
1:限流,限制向下游的并发以防止将下游打跨
2:队列长度,防止自己无限接受请求,把自己搞死
代码示例如下:
package main
import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"time"
"github.com/gin-gonic/gin"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
var (
ProjectName string = "proxy" //项目名称
Version string = "0.1" //版本
Port int = 1215 //监听端口
Concurrency int = 200 //向下游的并发数
ConcurrencyChan chan int //控制下游的并发chan
Timeout int = 2000 //下游超时时间
BufferSize int = 10000 //任务队列最大长度,超过则丢弃
RequestContent chan interface{} //任务队列chan
Upstream string = "http://www.xtgxiso.com" //下游地址
)
//任务结构,代表一个请求
type Tast struct {
URL string
Method string
Header interface{}
Body []byte
}
func main() {
viper.AutomaticEnv()
var rootCmd = &cobra.Command{
Use: ProjectName,
Short: fmt.Sprintf("%s %s", ProjectName, Version),
Run: run,
PreRunE: preRunE,
}
rootCmd.Flags().Int("port", 1215, "Listen port")
viper.BindPFlag("port", rootCmd.Flags().Lookup("port"))
rootCmd.Flags().Int("concurrency", 200, "Concurrency")
viper.BindPFlag("concurrency", rootCmd.Flags().Lookup("concurrency"))
rootCmd.Flags().Int("timeout", 2000, "timeout")
viper.BindPFlag("timeout", rootCmd.Flags().Lookup("timeout"))
rootCmd.Flags().Int("buffersize", 10000, "buffersize")
viper.BindPFlag("buffersize", rootCmd.Flags().Lookup("buffersize"))
rootCmd.Flags().String("upstream", "http://www.xtgxiso.com", "upstream")
viper.BindPFlag("upstream", rootCmd.Flags().Lookup("upstream"))
rootCmd.Execute()
}
func run(cmd *cobra.Command, args []string) {
r := gin.Default()
r.NoRoute(product)
fmt.Printf("%s %s Running on %d\n", ProjectName, Version, Port)
fmt.Printf("Concurrency:%d \n", Concurrency)
fmt.Printf("Timeout:%d \n", Timeout)
fmt.Printf("BufferSize:%d \n", BufferSize)
fmt.Printf("Upstream:%s \n", Upstream)
r.Run(fmt.Sprintf(":%d", Port))
}
func preRunE(cmd *cobra.Command, args []string) error {
Port = viper.GetInt("port")
Concurrency = viper.GetInt("concurrency")
Timeout = viper.GetInt("timeout")
BufferSize = viper.GetInt("bufferSize")
Upstream = viper.GetString("upstream")
RequestContent = make(chan interface{}, BufferSize)
go consume()
ConcurrencyChan = make(chan int, Concurrency)
return nil
}
//任务生产者
func product(c *gin.Context) {
body, _ := ioutil.ReadAll(c.Request.Body)
s := Tast{
URL: Upstream + c.Request.URL.RequestURI(),
Method: string(c.Request.Method),
Header: c.Request.Header,
Body: body,
}
select {
case RequestContent <- s:
c.String(http.StatusOK, "OK")
case <-time.After(time.Millisecond * 10):
c.String(http.StatusOK, "timeout")
}
}
//任务消费者
func consume() {
for {
select {
case ConcurrencyChan <- 1:
r := <-RequestContent
go request(r)
case <-time.After(time.Millisecond * 1000):
fmt.Printf("写入ConcurrencyChan超时")
}
}
}
//向下游发请求
func request(r interface{}) {
c := r.(Tast)
remote, _ := url.Parse(c.URL)
Body := ioutil.NopCloser(bytes.NewReader(c.Body))
req := &http.Request{
URL: remote,
Method: c.Method,
Header: c.Header.(http.Header),
Body: Body,
}
client := &http.Client{
Timeout: time.Duration(Timeout) * time.Millisecond,
}
client.Do(req)
/*调试用,正常不用等请求
var (
resp *http.Response
err error
body []byte
)
resp, err = client.Do(req)
if err != nil {
fmt.Println(err)
}
defer resp.Body.Close()
body, err = ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Println(err)
}
fmt.Println(string(body))
*/
<-ConcurrencyChan
}
在这个项目中,涉及到如下知识点,大家可以搜索相关键词来学习
1:通过chan来控制并发数
2 :github.com/gin-gonic/gin 一个不错的http api 框架开发库
3:github.com/spf13/viper 一个不错的应用配置库,支持多种格式配置文件,环境变量读取,命令行参数读取等功能
4:github.com/spf13/cobra 一个用来创建强大的现代CLI命令行的golang库,也是一个生成程序应用和命令行文件的程序
