Go如何优雅地错误处理(Error Handling and Go 1)

Go的错误处理一直被吐槽太繁琐, 作为主要用GO的攻城狮, 经常写 if err!=nil, 但是如果想偷懒, 少带了上下文信息, 直接写 if err!=nil { return err} 或者 fmt.Errorf 携带的上下文信息太少了的话, 看到错误日志也会一脸懵逼, 难以定位问题.
官方在 2011 年就发过一篇博客教大家如何在Go中处理error https://blog.golang.org/error-handling-and-go , error 是一个内建的 interface, 鼓励大家用好自定义错误类型, 常用的范式有三种:

  • 一是用 errors.New(str string) 定义错误常量, 让调用方去判断返回的 err 是否等于这个常量, 来进行区分处理;
  • 二是用 fmt.Errorf(fmt string, args... interface{}) 增加一些上下文信息, 用文字的方式告诉调用方哪里出错了, 让调用方打错误日志出来;
  • 三是自定义 struct type , 实现 error 接口, 调用方用类型断言转成特定的 struct type , 拿到更结构化的错误信息.

我最开始最常用的做法是, fmt.Errorf 时写上 此函数函数名、调用出错的函数名、参数是什么、err , 代码十分啰嗦, 而且通常打日志是在上层函数打的, 看到错误日志还需要用函数名去代码中搜索看看在哪里出错. 业务代码调用层级一多,非常麻烦. 很多情况下我既想带上下文信息, 又想在上层调用方取得最里层出错的函数返回的error常量或自定义的 struct type, 最好还能自动带上行号函数名信息, 减少每次写 fmt.Errof 的手动写上函数名的痛苦. 于是开始在 github 找包, star 数最高的是 pkg/errorsjuju/errors.

  • pkg/errors 解决了一些问题, 核心函数是 Wrapf 和 Cause: Wrapf包装错误附加上下文信息并带上调用栈, 但是每次去包装错误的时候都去取一次调用栈, 完全没有必要啊, 因为最早出错的函数里就能拿到完整的调用栈的, 并且调用栈打出来的信息也不好看, 而且通常HTTP服务会用框架, 用了框架的话调用栈就会肿起来, 这些框架的固定调用栈信息打印出来毫无帮助. Cause 去递归拿到最里层的 error, 用于和error常量比较或类型断言成自定义 struct type.
// Wrapf returns an error annotating err with a stack trace
// at the point Wrapf is call, and the format specifier.
// If err is nil, Wrapf returns nil.
func Wrapf(err error, format string, args ...interface{}) error {
if err == nil {
return nil
}
err = &withMessage{
cause: err,
msg: fmt.Sprintf(format, args...),
}
return &withStack{
err,
callers(),
}
}
// Cause returns the underlying cause of the error, if possible.
// An error value has a cause if it implements the following
// interface:
//
// type causer interface {
// Cause() error
// }
//
// If the error does not implement Cause, the original error will
// be returned. If the error is nil, nil will be returned without further
// investigation.
func Cause(err error) error {
type causer interface {
Cause() error
}
for err != nil {
cause, ok := err.(causer)
if !ok {
break
}
err = cause.Cause()
}
return err
}
  • juju/errors API非常复杂, 包装的error的函数就有三个 func Annotatef(other error, format string, args ...interface{}) errorfunc Maskf(other error, format string, args ...interface{}) errorfunc Wrapf(other, newDescriptive error, format string, args ...interface{}) error … , 每次包装时都会SetLocation, 消耗更大, 即时有时不需要打印error string 只需要判断, 它也去用runtime.Caller去拿文件名, 行号; 调用栈打出来的信息也不好看.
// SetLocation records the source location of the error at callDepth stack
// frames above the call.
func (e *Err) SetLocation(callDepth int) {
_, file, line, _ := runtime.Caller(callDepth + 1)
e.file = trimGoPath(file)
e.line = line
}

以上包不满足要求, 只能造轮子了. 两个思想. API要设计的简单, 调用栈要好看 https://github.com/hanjm/errors

  • API简单: 定义error常量只有 errors.New 函数, 兼容标准库的函数; 包装error的只有 errors.Errorf 函数, 只在最早出错的时候取调用栈, 调用方再包装时无需取调用栈, 此时只需要pc, 不需要这时就把文件名行号取出来; 取最里层的 error 只有 errors.GetInnerMost, 用于和 error 常量比较或类型断言成自定义 struct type分类处理.
  • 调用栈好看: 去掉标准包的调用栈, 去掉框架固定的调用栈信息(通常是github.com的包), 只保留业务逻辑的调用栈. 按[ 文件名:行号 函数名:message]分行格式化输出, 把调用栈和附加的message对应起来. (第一版格式是[文件名:行号 函数名:message], 没有空格, 后面有个同事说在Goland IDE里看panic信息时可以点击定位到源码, 你的包能不能加这个功能, 所以去研究了下, 写了几个print的demo试了下发现如果输出中的文件名前后带空格的话, intellij IDE会自动识别输出中的文件名变成超链接, 所以给 “文件名:行号” 前后加了空格, 就能在IDE中直接点击定位到源码对应的行, 非常地方便, 感谢这位同事)

在IDE中加个live template, 写errf回车就补全到

if err!=nil {
err = errors.Errorf(err,"{{光标}}")
return
}

然后补充必要的注释和参数就行了, 在本地环境调试时看到错误日志点击就可以定位到源码, 在非本地环境跑看到错误日志相比之前也能更好地知道发生了什么, 复制文件名:行号到IDE中就能定位到源码, 大大减轻了错误处理的繁琐.

分享到 评论

深入理解NATS & NATS Streaming (踩坑记)

简介

NATS Server是一个高性能的, cloud native的, 基于发布订阅机制的消息系统, 没有消息持久化功能.
NATS Streaming Server是基于NATS Server的, 增加消息持久化功能的消息系统.

NATS Streaming 持久化特性踩坑记

官网的文档并不详细, 很多重要的技术细节没说, 看了官网的文档之后发现用法很简单, 然后直接去写代码, 写publisher代码没什么问题, 写subscriber代码也能正常工作. 但是subscriber一重启, 重启后重启期间publisher发的消息不会继续收到, 说好的持久化呢? 我把官网的文档翻了遍也没找到答案. 最后在项目的readme.md中找到了答案: 要让subscriber重启后能继续收到重启期间发过来的消息且不重复消息, 必须在调用Subscribe(subject string, cb MsgHandler, opts ...SubscriptionOption) (Subscription, error)订阅时设置一样的durableName, 且重启后连接时Connect(stanClusterID, clientID string, options ...Option) (Conn, error)ClusterID、clientID不能变.

要想理解NATS和NATS Streaming的特性, server和client的readme文档都需要仔细阅读, 特别是nats-streaming服务端的readme. 代码也值得阅读研究.

重要特性说明

  1. 当subject没有被订阅时, 消息会被直接丢弃, 所以重启订阅者会丢消息, 解决办法: 要么开2个以上客户端实例, 组成队列订阅QueueSubscribe, 要么换NATS Streaming.
  2. clientID和durableName对于NATS Streaming非常重要. 要让subscriber重启后能继续收到重启期间发过来的消息且不重复消息, 必须在调用Subscribe(subject string, cb MsgHandler, opts ...SubscriptionOption) (Subscription, error)订阅时设置一样的durableName, 调用Connect(stanClusterID, clientID string, options ...Option) (Conn, error)连接时ClusterID、clientID不能变. 程序关闭时应该使用Close而不是Unsubscribe, Unsubscribe()会删除在server端删除该持久化订阅.

    This client ID links a given connection to its published messages, subscriptions, especially durable subscriptions. Indeed, durable subscriptions are stored as a combination of the client ID and durable name.
    If an application wishes to resume message consumption from where it previously stopped, it needs to create a durable subscription. It does so by providing a durable name, which is combined with the client ID provided when the client created its connection. The server then maintain the state for this subscription even after the client connection is closed.
    Note: The starting position given by the client when restarting a durable subscription is ignored.
    When the application wants to stop receiving messages on a durable subscription, it should close - but not unsubscribe- this subscription. If a given client library does not have the option to close a subscription, the application should close the connection instead.
    When the application wants to delete the subscription, it must unsubscribe it. Once unsubscribed, the state is removed and it is then possible to re-use the durable name, but it will be considered a brand new durable subscription, with the start position being the one given by the client when creating the durable subscription.

  1. NATS连接时可以设置客户端的名字, 这样在monitor界面中的/connz就能方便地看到各个客户端的统计数据.

    // Options that can be passed to Connect. // Name is an Option to set the client name. func Name(name string) Option {
    return func(o *Options) error {
    o.Name = name
    return nil
    }
    }
    type ConnInfo struct {
    Cid uint64 `json:"cid"`
    IP string `json:"ip"`
    Port int `json:"port"`
    Start time.Time `json:"start"`
    LastActivity time.Time `json:"last_activity"`
    Uptime string `json:"uptime"`
    Idle string `json:"idle"`
    Pending int `json:"pending_bytes"`
    InMsgs int64 `json:"in_msgs"`
    OutMsgs int64 `json:"out_msgs"`
    InBytes int64 `json:"in_bytes"`
    OutBytes int64 `json:"out_bytes"`
    NumSubs uint32 `json:"subscriptions"`
    Name string `json:"name,omitempty"`
    Lang string `json:"lang,omitempty"`
    Version string `json:"version,omitempty"`
    TLSVersion string `json:"tls_version,omitempty"`
    TLSCipher string `json:"tls_cipher_suite,omitempty"`
    AuthorizedUser string `json:"authorized_user,omitempty"`
    Subs []string `json:"subscriptions_list,omitempty"`
    }
  2. 使用.来分隔subject的级别. NATS允许subject包含斜杠/符号, 但NATS Streaming不允许, 因为NATS Streaming持久化时会使用subject名字来作为文件夹名,

    • NATS的subject可以为任意不为空的字符串, 具体的subject不能包含通配符’*’和’>’.
    • NATS Streaming的subject不能为空, 首尾不能为点’.’, 不能包含两个连续的点’.’, 由于暂时不支持通配符订阅功能, 所以不能包含’*’和’>’.
  3. NATS Streaming Server实际上是内嵌了一个NATS Server, 自己作为NATS的客户端. NATS Streaming的客户端实际上没有和NATS Streaming Server直接连接, 而是连接内嵌的NATS Server, NATS Streaming Server通过订阅客户端的心跳来知道NATS Streaming客户端连接有没有断开. 所以它强烈建议客户端退出程序时主动Close.
  4. NATS可以热重新加载配置, 发送SIGHUP信号或gnatsd -sl reload即可.
  5. 开发环境可以加-V参数了解NATS, 生产环境就没必要了, 否则会把发过来的消息全打在日志里.
  6. 你甚至可以用NATS的client包publish消息到NATS Streaming, NATS的client可以subscribe, 但NATS Streaming的client无法subscribe, 因为内部的subject变了. 最好不用混用, 容易出问题.
  7. NATS Streaming客户端连接时提供的ClusterID和服务端启动配置的ClusterID不一致时会报, 有人表示费解吐槽过, https://github.com/nats-io/nats-streaming-server/issues/309, 但官方解释说没有问题, Timeout也说的通.

    If you provide a cluster ID not used by any of the servers in the network, no server will respond to the client, hence the timeout error message from the client library. If anything, this is an error message that needs to be updated in the client libraries, not in the server.

  8. ChanSubscribe方式的客户端优雅关闭, 等待消息处理完成.

    package main
    import (
    "fmt"
    "os" "syscall" "os/signal" "github.com/nats-io/go-nats" "sync" )
    func main() {
    n, err := nats.Connect("nats://127.0.0.1:7222",
    nats.Name("test_client"),
    nats.UserInfo("", ""))
    if err != nil {
    panic(err)
    }
    subject := "test"
    msgCh := make(chan *nats.Msg, nats.DefaultMaxChanLen)
    _, err = n.ChanSubscribe(subject, msgCh)
    if err != nil {
    panic(err)
    }
    wg := sync.WaitGroup{}
    for i := 0; i < 2; i++ {
    wg.Add(1)
    go func() {
    defer wg.Done()
    // msg handler
    for msg := range msgCh {
    fmt.Printf("%s\n", msg.Data)
    }
    }()
    }
    quit := make(chan os.Signal)
    signal.Notify(quit, syscall.SIGQUIT,
    syscall.SIGTERM,
    syscall.SIGINT,
    syscall.SIGUSR1,
    syscall.SIGUSR2)
    select {
    case <-quit:
    defer wg.Wait()
    // close msgCh and wait process ok
    close(msgCh)
    n.Flush()
    n.Close()
    }
    }

NATS代码中的技巧

  1. 很有用的Go风格的可选参数设计模式, 很多地方见过.

    // Option is a function on the options for a connection.
    type Option func(*Options) error
    // Options can be used to create a customized connection.
    type Options struct {
    Url string
    ...
    User string
    Password string
    }
    var DefaultOptions = Options{
    AllowReconnect: true,
    MaxReconnect: DefaultMaxReconnect,
    ReconnectWait: DefaultReconnectWait,
    Timeout: DefaultTimeout,
    PingInterval: DefaultPingInterval,
    MaxPingsOut: DefaultMaxPingOut,
    SubChanLen: DefaultMaxChanLen,
    ReconnectBufSize: DefaultReconnectBufSize,
    Dialer: &net.Dialer{
    Timeout: DefaultTimeout,
    }, }
    // Connect will attempt to connect to the NATS system.
    // The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
    // Comma separated arrays are also supported, e.g. urlA, urlB.
    // Options start with the defaults but can be overridden.
    func Connect(url string, options ...Option) (*Conn, error) {
    opts := DefaultOptions
    opts.Servers = processUrlString(url)
    for _, opt := range options {
    if err := opt(&opts); err != nil {
    return nil, err
    }
    }
    return opts.Connect()
    }
    // Options that can be passed to Connect. // Name is an Option to set the client name. func Name(name string) Option {
    return func(o *Options) error {
    o.Name = name
    return nil
    }
    }
  2. 使用ringBuffer限制消息数量

    You can view a message log as a ring buffer. Messages are appended to the end of the log. If a limit is set globally for all channels, or specifically for this channel, when the limit is reached, older messages are removed to make room for the new ones.
    
  3. 用reflect来绑定任意类型的chan

    chVal := reflect.ValueOf(channel)
    if chVal.Kind() != reflect.Chan {
    return ErrChanArg
    }
    val, ok := chVal.Recv()
    if !ok {
    // Channel has most likely been closed.
    return
    }

TODO cluster研究

分享到 评论