如何在 Kubernetes 上使用 InfluxDB 及其 Python 客户端

导航至

Saiyam Pathak本文最初发表于 The New Stack,由 Saiyam Pathak 撰写,他是 InfluxAce 和 Civo Cloud 的技术布道总监。

在本教程中,我们将讨论 InfluxDB 及其 Python 客户端。我们将 在 Kubernetes 集群内部 部署 InfluxDB,然后使用 InfluxDB Python 客户端向 InfluxDB 发送数据。

您将学到什么?

  • 如何将 InfluxDB 部署到 Kubernetes 集群(快速入门方式)
  • 如何使用 InfluxDB Python 客户端

先决条件

  • 您控制的 Kubernetes 集群。我们将利用 Civo 超快的托管 K3s 服务 快速进行实验。如果您还没有帐户,请立即注册 beta 版,以享受快速部署时间和每月 70 美元的免费额度。或者,您也可以使用任何其他 Kubernetes 集群。
  • 安装并设置 kubectl,并下载集群的 kubeconfig 文件。

InfluxDB Python Kubernetes

通过运行以下命令确保您可以连接到您的 Kubernetes 集群

kubectl get nodes
NAME               STATUS   ROLES    AGE   VERSION
kube-master-18e1   Ready    master   8h    v1.18.6+k3s1
kube-node-4e70     Ready    <none>   8h    v1.18.6+k3s1
kube-node-d58a     Ready    <none>   8h    v1.18.6+k3s1

您应该看到集群中节点的名称显示出来。

开始使用 InfluxDB

克隆存储库:https://github.com/saiyam1814/pyconf.git

git clone https://github.com/saiyam1814/pyconf.git
cd pyconf/deploy
# Apply the influx.yaml file
kubectl create -f influx.yaml
namespace/influxdb created
statefulset.apps/influxdb created
service/influxdb created

上述脚本为 InfluxDB 2.0.1 版本创建了命名空间、StatefulSet 和服务。

现在我们将创建 ingress,为此,我们将修改 ing.yaml 文件并输入创建的集群的 DNS 名称。您可以从仪表板获取 DNS 名称。

influxdb create ingress

在 ing.yaml 的 host 部分,指向 influx.{DNS 名称}。

apiVersion: extensions/v1beta1
kind: Ingress
metadata:
 name: influxdb
 namespace: influxdb
spec:
 rules:
 - host: influxdb.360dfac7-2adc-4e60-bdcb-0b7a283ac3c8.k8s.civo.com
   http:
     paths:
     - backend:
         serviceName: influxdb
         servicePort: 8086
kubectl create -f ing.yml
ingress.extensions/influxdb created
kubectl get all -n influxdb
NAME             READY   STATUS    RESTARTS   AGE
pod/influxdb-0   1/1     Running   0          6m45s
NAME               TYPE        CLUSTER-IP        EXTERNAL-IP   PORT(S)    AGE
service/influxdb   ClusterIP   192.168.145.255   <none>        8086/TCP   6m45s
NAME                        READY   AGE
statefulset.apps/influxdb   1/1     6m46s

kubectl get ing -n influxdb
'NAME       CLASS    HOSTS                                                        ADDRESS         PORTS   AGE
influxdb   <none>   influxdb.360dfac7-2adc-4e60-bdcb-0b7a283ac3c8.k8s.civo.com   91.211.153.65   80      55s

导航到 HOST ADDRESS

influxdb 2 welcome page

influxdb setup initial user

influxdb collect data

现在,在 Token 的高级部分,选择并复制生成的令牌 — 因为这将是通过 Python 客户端连接到 InfluxDB 所必需的。

influxdb copy token

influxdb load data

下一步

从令牌创建 Kubernetes Secret

kubectl create secret generic influx --from-literal=token="qWpu90WO31r02miedaP7-BXG9hmrtfBCPgncqu3PsU-PzZZ3jrg7eC9RE2yZzLurhNlk8tr_maKYE3zpk1GJ2A=="
secret/influx created

为 Ingress HOST Address 创建配置映射

kubectl create configmap host --from-literal=host="influxdb.360dfac7-2adc-4e60-bdcb-0b7a283ac3c8.k8s.civo.com"
configmap/host created

从 Deploy 文件夹创建 Daemonset

kubectl create -f ds.yaml
daemonset.apps/pyconf-demo created
kubectl get pods
NAME                READY   STATUS    RESTARTS   AGE
pyconf-demo-dq5c9   1/1     Running   0          40m
pyconf-demo-lbxww   1/1     Running   0          40m
pyconf-demo-q65ps   1/1     Running   0          86s

从 InfluxDB UI 查看

viewing influxdb ui

了解通过 Python 客户端连接和写入数据点到 InfluxDB

from influxdb_client import InfluxDBClient, Point, WritePrecision
from datetime import datetime
from influxdb_client.client.write_api import SYNCHRONOUS
import schedule
import time
import socket
import os
def influx():
   print("starting")
   org = "demo"
   bucket = "demo"
   client = InfluxDBClient(url="{}".format(os.environ.get('host')), token="{}".format(os.environ.get('token')))
   meminfo = dict((i.split()[0].rstrip(':'),int(i.split()[1])) for i in open('/proc/meminfo').readlines())
   freemem = meminfo['MemFree'] / 1024 /1024
   write_api = client.write_api(write_options=SYNCHRONOUS)
   point = Point("free_mem").tag("host", socket.gethostname()).field("free_memory_Gb", freemem ).time(datetime.utcnow(), WritePrecision.NS)
   write_api.write(bucket, org, point)
schedule.every(5).seconds.do(influx)  
while 1:
   schedule.run_pending()
   time.sleep(1)
  • 导入 influxdb-client
  • Org:对应于数据需要推送到的组织。
  • Bucket:对应于数据需要推送到的存储桶。
  • Client:通过提供主机和令牌创建的连接。
  • Freemem:获取主机可用内存(以 Gb 为单位)。
  • client.write_api 用于写入 InfluxDB。
  • Point:将创建 measurement free_mem、tag host 和 field free_mem_Gb 的实际点。
  • write_api.write(bucket,org,point) 用于插入到数据库。
  • 最后一部分使其每 5 秒运行一次。

用例

您可能有某些脚本会捕获一些数据,或进行一些级别的处理,然后创建数据,这些数据随后可以通过 Python 客户端发送到 InfluxDB(对于此特定用例)。有很多客户端库受支持,可以用来发送数据。

总结

我们已经了解了如何在现有的 K3s Kubernetes 集群上部署 InfluxDB。使用 Python 客户端将数据写入 InfluxDB,然后从 InfluxDB UI 查看数据。