当前位置: 首页 > news >正文

golang连接influxdb的orm操作

1. 安装influxdb-client-go包

go get github.com/influxdata/influxdb-client-go/v2

2. 连接influxdb


type InfluxDBCon struct {client      influxdb2.Client // 客户端连接writeClient api.WriteAPI // 写对象queryClient api.QueryAPI // 读对象deleteAPI   api.DeleteAPI // 删除org         string // 组织bucket      string // 桶
}// 建立连接
func NewInfluxDBCon(url, token, org, bucket string) *InfluxDBCon {client := influxdb2.NewClientWithOptions(url, token,influxdb2.DefaultOptions().SetBatchSize(20))return &InfluxDBCon{client:      client,writeClient: client.WriteAPI(org, bucket),queryClient: client.QueryAPI(org),deleteAPI:   client.DeleteAPI(),org:         org,bucket:      bucket,}
}// 关闭连接
func (c *InfluxDBCon) Close() {c.client.Close()
}func (c *InfluxDBCon) WriteClient() api.WriteAPI {return c.writeClient
}func (c *InfluxDBCon) QueryClient() api.QueryAPI {return c.queryClient
}

3. 增删读改


func (c *InfluxDBCon) WritePoint(measurement string, tags map[string]string, fields map[string]interface{}, ts time.Time) {p := influxdb2.NewPoint(measurement, tags, fields, ts)c.writeClient.WritePoint(p)c.writeClient.Flush()
}func (c *InfluxDBCon) Insert(measurement string, tags map[string]string, fields map[string]interface{}, ts time.Time) error {c.WritePoint(measurement, tags, fields, ts)return nil
}func (c *InfluxDBCon) DeletePoints(measurement string, start, stop time.Time, tagEq map[string]string, extraPredicate string) error {predicate := buildDeletePredicate(measurement, tagEq, extraPredicate)// 直接用名字版本,避免去查 domain 对象return c.deleteAPI.DeleteWithName(context.Background(), c.org, c.bucket, start, stop, predicate)
}// 注意:Delete 谓词只支持按 tag / _measurement(不支持 field 值)
// 参考官方说明。:contentReference[oaicite:1]{index=1}
func buildDeletePredicate(measurement string, tagEq map[string]string, extra string) string {parts := []string{fmt.Sprintf(`_measurement="%s"`, escapePredicate(measurement))}for k, v := range tagEq {parts = append(parts, fmt.Sprintf(`%s="%s"`, k, escapePredicate(v)))}if s := strings.TrimSpace(extra); s != "" {parts = append(parts, fmt.Sprintf("(%s)", s))}return strings.Join(parts, " and ")
}func escapePredicate(s string) string {s = strings.ReplaceAll(s, `\`, `\\`)s = strings.ReplaceAll(s, `"`, `\"`)return s
}func (c *InfluxDBCon) QueryOne(flux string) (map[string]interface{}, error) {res, err := c.queryClient.Query(context.Background(), flux)if err != nil {return nil, err}defer res.Close()if res.Next() {record := res.Record()data := map[string]interface{}{"time":  record.Time(),"value": record.Value(),}for k, v := range record.Values() {data[k] = v}return data, nil}return nil, fmt.Errorf("no record found")
}func (c *InfluxDBCon) Update(measurement string, tags map[string]string, fields map[string]interface{}, ts time.Time) error {// 用极窄时间窗 + 完整 tags 来删除旧点(避免误删)start := ts.Add(-1 * time.Second)stop := ts.Add(1 * time.Second)if err := c.DeletePoints(measurement, start, stop, tags, ""); err != nil {return err}// 再写入新点p := influxdb2.NewPoint(measurement, tags, fields, ts)c.writeClient.WritePoint(p)c.writeClient.Flush()return nil
}// ==== 统计功能 ====func (c *InfluxDBCon) Count(measurement string, start time.Duration) (int64, error) {flux := fmt.Sprintf(`from(bucket:"%s")|> range(start: -%s)|> filter(fn: (r) => r._measurement == "%s")|> count()`, c.bucket, start.String(), measurement)res, err := c.queryClient.Query(context.Background(), flux)if err != nil {return 0, err}defer res.Close()if res.Next() {if v, ok := res.Record().Value().(int64); ok {return v, nil}}return 0, nil
}

4. 单元测试


// 初始化测试客户端
func initTestClient() *InfluxDBCon {token := "PjpqAXwbXwizAFieHJekR2tZUf3djVMuAcFpS1mbSjyi80NRWG11hqlc4ZPDs0SLh7oIxEZnufWLzZueERVHkw=="url := "http://192.168.20.211:8086"org := "operate"bucket := "mybuk"return NewInfluxDBCon(url, token, org, bucket)
}func TestInfluxDBCon(t *testing.T) {client := initTestClient()defer client.Close()measurement := "system_test"tags := map[string]string{"id":     "rack_1","vendor": "AWS",}fields := map[string]interface{}{"temperature": rand.Float64() * 80,"disk_free":   rand.Float64() * 1000,}now := time.Now()// 1. Inserterr := client.Insert(measurement, tags, fields, now)if err != nil {t.Fatalf("insert error: %v", err)}t.Log("insert ok")time.Sleep(1 * time.Second) // 写入的数据,落盘需要时间// 2. QueryOneflux := fmt.Sprintf(`from(bucket:"%s")|> range(start: -1h)|> filter(fn: (r) => r._measurement == "%s")|> limit(n:1)`, client.bucket, measurement)record, err := client.QueryOne(flux)if err != nil {t.Fatalf("query one error: %v", err)}t.Logf("query one result: %+v", record)// 3. Updatefields["temperature"] = 66.6err = client.Update(measurement, tags, fields, now)if err != nil {t.Fatalf("update error: %v", err)}t.Log("update ok")// 4. Countcount, err := client.Count(measurement, time.Hour)if err != nil {t.Fatalf("count error: %v", err)}t.Logf("count result: %d", count)// 8. Deleteerr = client.DeletePoints(measurement,now.Add(-1*time.Minute),now.Add(1*time.Minute),map[string]string{"id": "rack_1", "vendor": "AWS"},"",)if err != nil {t.Fatalf("delete error: %v", err)}t.Log("delete ok")
}


文章转载自:

http://PECBgiuR.mftzm.cn
http://onc5xiz7.mftzm.cn
http://QEb5gcRZ.mftzm.cn
http://6LfEUjCq.mftzm.cn
http://sYur7voL.mftzm.cn
http://hFZBfakr.mftzm.cn
http://Go54TMYL.mftzm.cn
http://a1hA7CAR.mftzm.cn
http://tGfuPttV.mftzm.cn
http://bnAqoSQD.mftzm.cn
http://qgiI4iyr.mftzm.cn
http://kFvbL6ga.mftzm.cn
http://IKgMuTq3.mftzm.cn
http://FmoaYoIt.mftzm.cn
http://VwlXRVuR.mftzm.cn
http://5PfAzYId.mftzm.cn
http://N4HN5Iit.mftzm.cn
http://q9hMrFy7.mftzm.cn
http://aApd5OXx.mftzm.cn
http://bO6CpO4I.mftzm.cn
http://I0GBk1Bc.mftzm.cn
http://rkD5KNfL.mftzm.cn
http://LWTgeZLj.mftzm.cn
http://jgGL9nk4.mftzm.cn
http://s2tkEEiS.mftzm.cn
http://WoLId0ni.mftzm.cn
http://rZM0Pkof.mftzm.cn
http://dzrzDnh2.mftzm.cn
http://Yw8WXl1v.mftzm.cn
http://QaGnaQ2D.mftzm.cn
http://www.dtcms.com/a/370823.html

相关文章:

  • C#中一段程序类比博图
  • rh134第三章复习总结
  • Spring的事件监听机制(一)
  • 【鸿蒙 NEXT】V1迁移V2状态管理
  • FRCNet
  • git 冲突,Merge
  • NAND Flash块擦除与数据状态解析
  • 分享一个基于Python+大数据的房地产一手房成交数据关联分析与可视化系统,基于机器学习的深圳房产价格走势分析与预测系统
  • 超文本的定义
  • LeetCode 2461.长度为K子数组中的最大和
  • 【机器学习入门】6.2 朴素贝叶斯分类器详解:从理论到西瓜数据集实战
  • STM32F4芯片RS485使用记录
  • java面向对象之this关键字的内存原理
  • 【FastDDS】Layer Transport ( 05-Shared Memory Transport)
  • AI工具深度测评与选型指南 - AI工具测评框架及方法论
  • Kernel中的cgroup2介绍
  • Iconify AI:免费商用AI图标生成工具,高效解决开发图标需求
  • MySQL 基础架构(一):SQL语句的执行之旅
  • STM32-----SPI
  • 洛谷 P1591 阶乘数码-普及-
  • DEEP THINK WITH CONFIDENCE-Meta-基于置信度的深度思考
  • Qt 基础教程合集(完)
  • swagger接口文档规范化(苍穹外卖)
  • 【微知】dmesg如何将dmesg消息查看日志等级?(dmesg -x; prefix)
  • 基于STM32智能阳台监控系统
  • Ubuntu 22.04.1上安装MySQL 8.0及设置root密码
  • 【混元AIGC+腾讯云智能体+首创Coze核心流思维导图MCP】:打造一个文思通-智能写作助手Agent
  • B.50.10.09-RPC核心原理与电商应用
  • C语言字符函数和字符串函数(2)
  • 基于STM32的智慧民宿环境监测系统设计