一、安装

下载最新版本的服务器端

wget https://dl.influxdata.com/influxdb/releases/influxdb2-2.6.1-linux-amd64.tar.gz

下载最新版本的influxDB客户端

wget https://dl.influxdata.com/influxdb/releases/influxdb2-client-2.6.1-linux-amd64.tar.gz

解压

tar xvzf ./influxdb2-2.6.1-linux-amd64.tar.gz
tar xvzf influxdb2-client-2.6.1-linux-amd64.tar.gz

二、配置

首先要启动influxDB,才进行配置。

1、进入到服务器端的安装目录:cd influxdb2_linux_amd64/

2、后台启动:./influxd &

3、切换到客户端目录:cd ../influxdb2-client-2.6.1-linux-amd64/

4、使用客户端进行配置:./influx setup

主要是配置用户名和密码,配置好之后,你可以通过WEB页面登录进行更多的配置。默认配置文件在~/.influxdbv2/configs

5、登录http://ip:8086 进行进一步的配置,主要是配置API TOKEN这样可以在程序或者第三方应用(比如Grafana里面调用数据)

三、应用

1、首先要按需要在org下面建立不同的buckets,不同的buckets是有不同的数据保存策略。我这边按常规建立了3个buckets:

1)data_5min 精度最高的buckets,每5分钟存一次数据,但只保留一天的数据。程序一般就只向这个buckets写入数据

2)data_1hour 从data_5min里面取出每小时最大的数据存入这个buckets,只保留30天。这个buckets一般都是由task来写入数据。task取代了版本1里面的cq连续查询

3)data_1day 从data_1hour取1天的汇总数据存入这个buckets,永久保存。这个buckets一般也都是由task来写入数据。

2、其次编写task来降低采样精度即downsamples。一般都是从精度高的buckets里面取数存到精度低一档的buckets。这里给出一个示例:

option task = {name: "month_data", every: 1h}
// Defines a data source
from(bucket: "data_5min")
    |> range(start: -task.every)
    // Windows and aggregates the data in to 1h averages
    |> aggregateWindow(fn: max, every: 1h)
    // Stores the aggregated data in a new bucket
    |> to(bucket: "data_1hour", org: "gldx")

这个task每隔一个小时从data_5min取出数据,汇总之后存入data_1hour这个buckets。注意新建task的时候不要包括option task = {name: "month_data", every: 1h};这个由左边的参数自动生成。

3、在程序中向buckets写入数据

这里面一定要注意tag、field和measurement这几个概念。这里简单说一下,具体的请参考官网上的内容。tag是标签,每组数据会有多个tag,tag是由名字和值组成,主要是对这组数据的描述。注意tag是有索引的。field是具体的数据,也是由名字和值组成,但field是没有索引的。measurement相当于数据库里面表的概念,主要是用于数据分组。

这里以python为例:

 from influxdb_client.client.write_api import SYNCHRONOUS
 from influxdb_client import InfluxDBClient
 # api_tocken 是你在WEB管理页面建立的
 api_token = "11111111111111" 
 org = "gldx"
 bucket = "data_5min"
 #一次存入一组数据
 def save_data(tag_dict,field_dict,measurement):
     traffic_point = {"measurement": measurement, "tags": tag_dict, "fields": field_dict}
     try:
        write_client = InfluxDBClient(url="http://127.0.0.1:8086", token=api_token, org=org)
        write_api = write_client.write_api(write_options=SYNCHRONOUS)
        write_api.write(bucket, org, traffic_point)
        write_client.close()
     except Exception as ex:
        print(ex)
        return False
     return True
 
 #一次存批量数据
 def save_bulk_data(tag_dict_list,field_dict_list,measurement):
     traffic_point_list=[]
     for tag_dict,field_dict in zip(tag_dict_list,field_dict_list)
         traffic_point = {"measurement": measurement, "tags": tag_dict, "fields": field_dict}
         traffic_point_list.append(traffic_point)
     try:
        write_client = InfluxDBClient(url="http://127.0.0.1:8086", token=api_token, org=org)
        write_api = write_client.write_api(write_options=SYNCHRONOUS)
        write_api.write(bucket, org, traffic_point_list)
        write_client.close()
     except Exception as ex:
        print(ex)
        return False
     return True
     
 def test_write():
     tag_dict = {"port": "port1", "region": "aaa", "school": "bbb"}
     field_dict = {"in": 1000, "out": 2000}
     measurement = "edu_net"
     save_data(tag_dict,field_dict,measurement)

4、几点体会

1)如果要使用表格显示数据,一定要在flux脚本最后加上group(),否则表格显示有问题

例如:

   from(bucket: "data_5min")
      |> range(start: -5m)
      |> filter(fn: (r) => r["_measurement"] == "edu_net")
      |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
      |> group()

2)influxDB的dashboard虽然也可以用,但功能稍微有点弱,最好使用Grafana的dashboard来显示图和表格

Logo

尧米是由西云算力与CSDN联合运营的AI算力和模型开源社区品牌,为基于DaModel智算平台的AI应用企业和泛AI开发者提供技术交流与成果转化平台。

更多推荐