InfluxDB
데이터 쓰기
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from datetime import datetime
# InfluxDB 설정
url = "http://influxdb주소" # influx 주소
token = "토큰" # 웹 UI에서 생성한 token
org = "조직" # 조직 이름
bucket = "버킷이름" # 버킷 이름(스키마)
measurement = '메저먼트이름' #measurement(테이블)
# 클라이언트 연결
client = InfluxDBClient(url=url, token=token, org=org)
write_api = client.write_api(write_options=SYNCHRONOUS)
# 데이터 포인트 작성
point = (
Point(measurement) # measurement
.tag("tag_id", "FLOW01") # 태그(원하는키,원하는값)
.tag("datatyp", "AI")
.tag("tag_group","RCS2")
.field("datavalue", 123.45) # 실제 값(원하는키,원하는값)
.time(datetime.now(timezone.utc)) # 현재 UTC 시간
)
# 데이터 기록
write_api.write(bucket=bucket, org=org, record=point)
print("✅ InfluxDB에 데이터 기록 완료")
데이터 읽기
# pip install influxdb_client
from influxdb_client import InfluxDBClient
# 설정
url = "http://influxdb주소" # influx 주소
token = "토큰" # 웹 UI에서 생성한 token
org = "조직" # 조직 이름
bucket = "버킷이름" # 버킷 이름(스키마)
measurement = '메저먼트이름' #measurement(테이블)
client = InfluxDBClient(url=url, token=token, org=org)
#조회시점:start
#조회시점으로부터 몇초전의 데이터를 조회하겠는가?-600초전
query = f'''
from(bucket: "{bucket}")
|> range(start: -600s)
|> filter(fn: (r) => r._measurement == "{measurement}")
'''
query_api = client.query_api()
result = query_api.query(org=org, query=query)
for table in result:
for record in table.records:
print(record)
# print(f"{record.get_time()} | {record.get_field()} = {record.get_value()} | tag_id={record['tag_id']}")
웹에서 읽기
반응형
'서버&백엔드 > 🗃️ DataBase' 카테고리의 다른 글
SQL 최적화 | 효율적인 PostgreSQL 색인 설계: 등치조건·범위조건·JSONB 활용까지 (0) | 2025.06.19 |
---|---|
InfluxDB | 설치 (0) | 2025.05.14 |
PostgreSQL | Record 타입 사용해보기 (0) | 2025.04.18 |
PostgreSQL | with time zone 과 without time zone 차이 (0) | 2025.04.17 |
PostgreSQL에서 idle vs idle in transaction 상태 차이 (0) | 2025.02.13 |