假设有一个这样的场景
1:调用方不关注接口响应内容
2:接口响应比较慢,会阻塞服务
典型的场景可能就是注册账号的时候邮件发送了,这种情况下一般都不会等邮件是否发送成功,因为都会提供一个重新发送的功能,而由于发送邮件一般比较耗时,所以一般都做类似任务队列的形式来实现,而现在我们就在真实接口的上游做一个有点任务队列功能的http代理服务
这个服务主要需要考虑以下几点
1:限流,限制向下游的并发以防止将下游打跨
2:队列长度,防止自己无限接受请求,把自己搞死
代码示例如下:
package main import ( "bytes" "fmt" "io/ioutil" "net/http" "net/url" "time" "github.com/gin-gonic/gin" "github.com/spf13/cobra" "github.com/spf13/viper" ) var ( ProjectName string = "proxy" //项目名称 Version string = "0.1" //版本 Port int = 1215 //监听端口 Concurrency int = 200 //向下游的并发数 ConcurrencyChan chan int //控制下游的并发chan Timeout int = 2000 //下游超时时间 BufferSize int = 10000 //任务队列最大长度,超过则丢弃 RequestContent chan interface{} //任务队列chan Upstream string = "http://www.xtgxiso.com" //下游地址 ) //任务结构,代表一个请求 type Tast struct { URL string Method string Header interface{} Body []byte } func main() { viper.AutomaticEnv() var rootCmd = &cobra.Command{ Use: ProjectName, Short: fmt.Sprintf("%s %s", ProjectName, Version), Run: run, PreRunE: preRunE, } rootCmd.Flags().Int("port", 1215, "Listen port") viper.BindPFlag("port", rootCmd.Flags().Lookup("port")) rootCmd.Flags().Int("concurrency", 200, "Concurrency") viper.BindPFlag("concurrency", rootCmd.Flags().Lookup("concurrency")) rootCmd.Flags().Int("timeout", 2000, "timeout") viper.BindPFlag("timeout", rootCmd.Flags().Lookup("timeout")) rootCmd.Flags().Int("buffersize", 10000, "buffersize") viper.BindPFlag("buffersize", rootCmd.Flags().Lookup("buffersize")) rootCmd.Flags().String("upstream", "http://www.xtgxiso.com", "upstream") viper.BindPFlag("upstream", rootCmd.Flags().Lookup("upstream")) rootCmd.Execute() } func run(cmd *cobra.Command, args []string) { r := gin.Default() r.NoRoute(product) fmt.Printf("%s %s Running on %d\n", ProjectName, Version, Port) fmt.Printf("Concurrency:%d \n", Concurrency) fmt.Printf("Timeout:%d \n", Timeout) fmt.Printf("BufferSize:%d \n", BufferSize) fmt.Printf("Upstream:%s \n", Upstream) r.Run(fmt.Sprintf(":%d", Port)) } func preRunE(cmd *cobra.Command, args []string) error { Port = viper.GetInt("port") Concurrency = viper.GetInt("concurrency") Timeout = viper.GetInt("timeout") BufferSize = viper.GetInt("bufferSize") Upstream = viper.GetString("upstream") RequestContent = make(chan interface{}, BufferSize) go consume() ConcurrencyChan = make(chan int, Concurrency) return nil } //任务生产者 func product(c *gin.Context) { body, _ := ioutil.ReadAll(c.Request.Body) s := Tast{ URL: Upstream + c.Request.URL.RequestURI(), Method: string(c.Request.Method), Header: c.Request.Header, Body: body, } select { case RequestContent <- s: c.String(http.StatusOK, "OK") case <-time.After(time.Millisecond * 10): c.String(http.StatusOK, "timeout") } } //任务消费者 func consume() { for { select { case ConcurrencyChan <- 1: r := <-RequestContent go request(r) case <-time.After(time.Millisecond * 1000): fmt.Printf("写入ConcurrencyChan超时") } } } //向下游发请求 func request(r interface{}) { c := r.(Tast) remote, _ := url.Parse(c.URL) Body := ioutil.NopCloser(bytes.NewReader(c.Body)) req := &http.Request{ URL: remote, Method: c.Method, Header: c.Header.(http.Header), Body: Body, } client := &http.Client{ Timeout: time.Duration(Timeout) * time.Millisecond, } client.Do(req) /*调试用,正常不用等请求 var ( resp *http.Response err error body []byte ) resp, err = client.Do(req) if err != nil { fmt.Println(err) } defer resp.Body.Close() body, err = ioutil.ReadAll(resp.Body) if err != nil { fmt.Println(err) } fmt.Println(string(body)) */ <-ConcurrencyChan }
在这个项目中,涉及到如下知识点,大家可以搜索相关键词来学习
1:通过chan来控制并发数
2 :github.com/gin-gonic/gin 一个不错的http api 框架开发库
3:github.com/spf13/viper 一个不错的应用配置库,支持多种格式配置文件,环境变量读取,命令行参数读取等功能
4:github.com/spf13/cobra 一个用来创建强大的现代CLI命令行的golang库,也是一个生成程序应用和命令行文件的程序