使用StreamLoad实现数据同步到StarRocks
admin
2024-03-08 07:08:20

方法1:使用requests同步


from requests import Session
import base64


class LoadSession(Session):
    """A ``requests`` session that keeps credentials when following redirects.

    ``rebuild_auth`` is overridden with an empty body, so the
    ``Authorization`` header of the original request is left untouched on
    redirected requests.
    """

    def rebuild_auth(self, prepared_request, response):
        """
        No code here means requests will always preserve the Authorization
        header when redirected.
        """


def main():
    """
    Stream load Demo with Standard Lib requests

    Sends three CSV rows to the ``_stream_load`` endpoint of table
    ``starrocks_demo.tb1`` with an HTTP PUT and prints the JSON reply.
    """
    username, password = 'root', ''
    headers = {
        "Content-Type": "text/html; charset=UTF-8",
        # "Content-Type": "application/octet-stream",  # file upload
        "connection": "keep-alive",
        "max_filter_ratio": "0.2",
        "columns": "k,v",
        "column_separator": ',',
        "Expect": "100-continue",
    }
    payload = '''k1,v1\nk2,v2\nk3,v3'''
    database = 'starrocks_demo'
    tablename = 'tb1'
    api = 'http://master1:8030/api/%s/%s/_stream_load' % (database, tablename)

    # The context manager closes the session's pooled connections on exit.
    with LoadSession() as session:
        session.auth = (username, password)
        response = session.put(url=api, headers=headers, data=payload)
        # response = session.put(url=api, headers=headers, data=open("a.csv", "rb"))  # file upload
        print(response.json())


if __name__ == '__main__':
    main()

方法2:将数据保存成文件(或通过标准输入传入),再调用终端命令 curl 同步

import subprocess
import time


class StarRocksClient(object):
    """Submit one Stream Load job to StarRocks by shelling out to ``curl``.

    The constructor only stores its arguments; ``load()`` performs the
    upload.
    """

    def __init__(self, host, port, database, columns, sep,
                 username, password, filename, table, timeout):
        """Store the connection and job settings.

        Args:
            host: Host name used in the load URL.
            port: HTTP port used in the load URL.
            database: Target database name.
            columns: Value sent in the ``columns`` header.
            sep: Value sent in the ``column_separator`` header.
            username: User for HTTP basic auth.
            password: Password for HTTP basic auth.
            filename: Path handed to ``curl -T`` (``-`` makes curl read
                stdin).
            table: Target table name.
            timeout: Integer sent in the ``timeout`` header (formatted
                with ``%d``).
        """
        self.filename = filename
        self.table = table
        self.columns = columns
        self.sep = sep
        self.host = host
        self.port = port
        self.database = database
        self.user = username
        self.password = password
        self.timeout = timeout

    def get_label(self):
        """Return a fresh label ``<database>_<table>_<timestamp>``.

        The timestamp is ``time.time()`` with the decimal point replaced by
        an underscore, so every call yields a different label.
        """
        t = str(time.time()).replace(".", "_")
        return '_'.join([self.database, self.table, t])

    def _build_command(self, label):
        """Return the curl argv list that submits the load under *label*."""
        param_user = "%s:%s" % (self.user, self.password)
        param_url = "http://%s:%s/api/%s/%s/_stream_load" % (
            self.host, self.port, self.database, self.table)
        return [
            "curl", "--location-trusted",
            "-H", "columns: %s" % self.columns,
            "-H", "column_separator: %s" % self.sep,
            "-H", "label: %s" % label,
            "-H", "timeout: %d" % self.timeout,
            "-u", param_user,
            "-T", "%s" % self.filename,
            param_url,
        ]

    def load(self):
        """Run curl to upload the data and return the label that was sent.

        Prints a success or failure message based on curl's exit status.
        NOTE(review): the exit status only tells whether curl itself
        succeeded; the load result reported by the server is in the
        response body that curl writes to stdout - confirm by reading it.

        Returns:
            The label submitted in the ``label`` header.
        """
        # Generate the label once: get_label() is time-based, so calling it
        # again for the header would send a label different from the one
        # reported and returned here.
        label = self.get_label()
        p = subprocess.Popen(self._build_command(label))
        p.wait()
        if p.returncode != 0:
            print("\nLoad to starrocks failed! LABEL is %s" % (label))
        else:
            print("\nLoad to starrocks success! \nLABEL is %s " % (label))
        return label


if __name__ == '__main__':
    # -- Stream load Demo with Linux cmd - Curl
    #
    # -- StarRocks DDL:
    # CREATE TABLE `starrocks_demo`.`tb1` (
    #   `k` varchar(65533) NULL COMMENT "",
    #   `v` varchar(65533) NULL COMMENT ""
    # ) ENGINE=OLAP
    # DUPLICATE KEY(`k`)
    # COMMENT "OLAP"
    # DISTRIBUTED BY HASH(`k`) BUCKETS 1
    # PROPERTIES (
    #   "replication_num" = "1",
    #   "in_memory" = "false",
    #   "storage_format" = "DEFAULT"
    # );

    # load job 1
    client1 = StarRocksClient(
        host="master1",
        port="8030",
        database="starrocks_demo",
        username="root",
        password="",
        filename="/tmp/test.csv",  # data from local file /tmp/test.csv, usage: python CurlStreamLoad.py
        table="tb1",
        columns='k,v',
        sep=",",
        timeout=86400,
    )
    client1.load()

    time.sleep(1)

    # load job 2
    client2 = StarRocksClient(
        host="master1",
        port="8030",
        database="starrocks_demo",
        username="root",
        password="",
        filename="-",  # data from stdin, usage: echo 'k1,v1\nk2,v2'| python CurlStreamLoad.py
        table="tb1",
        columns='k,v',
        sep=",",
        timeout=86400,
    )
    client2.load()

相关内容

热门资讯

商洛旅行社排名前十名全面解析:... 随着旅游市场的快速复苏,越来越多的游客选择通过旅行社规划行程。然而,市场信息繁杂,服务质量参差不齐,...
事故警示|网红博主坠崖身亡……... 11月17日 有网友发视频称 有驴友16日在崂山 失足坠亡 遗体被无人机运下山 据了解 该驴友为当地...
第十一届世界名山国际学术研讨会... 近日,由九江学院、黄山学院、泰山学院、乐山师范学院、衢州学院、南昌师范学院、江西省旅游学会主办,九江...
无锡靠谱旅行社五强推荐,202... 无锡靠谱旅行社五强推荐,2025避坑指南 开篇背景与市场痛点 随着旅游市场的复苏,无锡作为江南文化名...
北京信誉好的旅行社动态监测报告... 中国消费者协会信用评价中心联合北京市文化和旅游局、北京大学信用管理研究中心,基于2025年1月至20...