我们可以很简单实现HTTP流式数据返回。

框架版本 < v2.4

如果当前使用的框架版本小于v2.4正式版(不是beta版本),使用以下方式返回(标准库原生写法)。

package main

import (
	"fmt"
	"net/http"
	"time"

	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/net/ghttp"
)

func main() {
	s := g.Server()
	s.BindHandler("/", func(r *ghttp.Request) {
		rw := r.Response.RawWriter()
		flusher, ok := rw.(http.Flusher)
		if !ok {
			http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)
			return
		}
		r.Response.Header().Set("Content-Type", "text/event-stream")
		r.Response.Header().Set("Cache-Control", "no-cache")
		r.Response.Header().Set("Connection", "keep-alive")

		for i := 0; i < 100; i++ {
			_, err := fmt.Fprintf(rw, "data: %d\n", i)
			if err != nil {
				panic(err)
			}
			flusher.Flush()
			time.Sleep(time.Millisecond * 200)
		}
	})
	s.SetPort(8999)
	s.Run()
}

框架版本 >= v2.4

由于以上操作有点繁琐,因此在该版本以后做了一些操作上的改进,如果当前使用的框架版本在v2.4正式版(不是beta版本)以上,那么可以使用以下方式快速实现流式数据返回。

package main

import (
	"time"

	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/net/ghttp"
)

func main() {
	s := g.Server()
	s.BindHandler("/", func(r *ghttp.Request) {
		r.Response.Header().Set("Content-Type", "text/event-stream")
		r.Response.Header().Set("Cache-Control", "no-cache")
		r.Response.Header().Set("Connection", "keep-alive")

		for i := 0; i < 100; i++ {
			r.Response.Writefln("data: %d", i)
			r.Response.Flush()
			time.Sleep(time.Millisecond * 200)
		}
	})
	s.SetPort(8999)
	s.Run()
}

示例结果

执行后访问 http://127.0.0.1:8999/ 可以看到数据通过流式方式不断地将数据返回给调用端。

注意事项

  • 如果是在控制器中使用,Request对象的获取可以通过g.RequestFromCtx(ctx)方法。
  • 如果有前置统一输入输出处理的中间件,请将该方法放置于中间件作用域之外,或者使用r.ExitAll()方法跳出中间件控制。


资料

Server-Sent Events (SSE)




Content Menu

  • No labels

3 Comments

  1. 郭强  2.4 之后改进后的代码应该是这样

    1. 每一个消息块前面,要加 "data: "前缀
    2. 每一个消息块后面,要用两个换行(\n) 结束
    package main
    
    import (
    	"time"
    
    	"github.com/gogf/gf/v2/encoding/gjson"
    	"github.com/gogf/gf/v2/frame/g"
    	"github.com/gogf/gf/v2/net/ghttp"
    	"github.com/gogf/gf/v2/util/gconv"
    )
    
    type TestMessage struct {
    	Name  string
    	Value string
    }
    
    func main() {
    	s := g.Server()
    	s.BindHandler("/stream", func(r *ghttp.Request) {
    		r.Response.Header().Set("Content-Type", "text/event-stream")
    		r.Response.Header().Set("Cache-Control", "no-cache")
    		r.Response.Header().Set("Connection", "keep-alive")
    
    		for i := 0; i < 100; i++ {
    			r.Response.Writefln("data: " + gjson.MustEncodeString(TestMessage{
    				Name:  "Name" + gconv.String(i),
    				Value: "Value" + gconv.String(i),
    			}) + "\n")
    			r.Response.Flush()
    			time.Sleep(time.Millisecond * 200)
    		}
    	})
    	s.SetPort(8999)
    	s.Run()
    }
    
    
  2. 2.4版本,如果有前置统一输入输出处理的中间件,请将该方法放置于中间件作用域之外,或者使用r.ExitAll()方法跳出中间件控制。

    • 放置于中间作用域之外,可以正常流式响应 
    • 放置于中间作用域
      • 不使用r.ExitAll() ,流式响应生效,但是最终会返回统一结构体 

        data: 97 data: 98 data: 99 {"code":0,"message":"","data":null}
      • 使用r.ExitAll()或者r.ExitA() 不能使用流式响应,直接返回统一响应,