方法1：使用 requests 库同步导入（Stream Load）
from requests import Session
import base64


class LoadSession(Session):
    """``requests`` Session that keeps authentication across redirects."""

    def rebuild_auth(self, prepared_request, response):
        """No code here means requests will always preserve the Authorization
        header when redirected.

        The base ``Session.rebuild_auth`` may strip the Authorization header
        when a redirect points at a different host, to avoid leaking
        credentials. This override deliberately does nothing, similar in
        spirit to curl's ``--location-trusted``.
        NOTE(review): presumably needed because the Stream Load endpoint
        redirects the PUT to another host that also checks credentials.
        """


def main():
    """Stream load Demo with Standard Lib requests

    PUTs a small inline CSV payload to the Stream Load endpoint of
    ``starrocks_demo.tb1`` and prints the server's response.
    """
    username, password = 'root', ''
    headers = {
        "Content-Type": "text/html; charset=UTF-8",
        # "Content-Type": "application/octet-stream",  # file upload
        "connection": "keep-alive",
        "max_filter_ratio": "0.2",
        "columns": "k,v",
        "column_separator": ',',
        "Expect": "100-continue",
    }
    payload = '''k1,v1\nk2,v2\nk3,v3'''
    database = 'starrocks_demo'
    tablename = 'tb1'
    api = 'http://master1:8030/api/%s/%s/_stream_load' % (database, tablename)

    # Using the session as a context manager closes its pooled connections
    # once the request is done, instead of leaking them.
    with LoadSession() as session:
        session.auth = (username, password)
        response = session.put(url=api, headers=headers, data=payload)
        # File upload variant (also switch Content-Type above):
        # with open("a.csv", "rb") as f:
        #     response = session.put(url=api, headers=headers, data=f)

    # An error response (e.g. an auth failure or a proxy page) may not be
    # JSON; print the raw body rather than crashing with a decode error.
    # requests raises a ValueError subclass when the body is not JSON.
    try:
        print(response.json())
    except ValueError:
        print(response.text)


if __name__ == '__main__':
    main()
方法2：先将数据保存为文件（或通过标准输入传入），再执行终端命令（curl）同步导入
import subprocess
import time


class StarRocksClient(object):
    """Run a StarRocks Stream Load job by shelling out to ``curl``.

    Each call to :meth:`load` PUTs ``filename`` (or stdin when ``filename``
    is ``"-"``) to ``http://<host>:<port>/api/<database>/<table>/_stream_load``.
    """

    def __init__(self, host, port, database, columns, sep,
                 username, password, filename, table, timeout):
        """Store connection settings and load options.

        :param host: host serving the Stream Load HTTP API.
        :param port: HTTP port of that host (string or int; formatted with %s).
        :param database: target database name.
        :param columns: value of the ``columns`` header, e.g. ``"k,v"``.
        :param sep: value of the ``column_separator`` header.
        :param username: user name passed to ``curl -u``.
        :param password: password passed to ``curl -u``.
        :param filename: path handed to ``curl -T``; ``"-"`` makes curl read stdin.
        :param table: target table name.
        :param timeout: value of the ``timeout`` header; must be an int
            (formatted with %d).
        """
        self.filename = filename
        self.table = table
        self.columns = columns
        self.sep = sep
        self.host = host
        self.port = port
        self.database = database
        self.user = username
        self.password = password
        self.timeout = timeout

    def get_label(self):
        """Return ``<database>_<table>_<time.time() with '.' replaced by '_'>``.

        The result is sent as the ``label`` header of the Stream Load request.
        Two calls return different values only if ``time.time()`` changed
        between them.
        """
        timestamp = str(time.time()).replace(".", "_")
        return "_".join([self.database, self.table, timestamp])

    def _curl_command(self, label):
        """Build the curl argv that uploads ``self.filename`` under ``label``."""
        url = "http://%s:%s/api/%s/%s/_stream_load" % (
            self.host, self.port, self.database, self.table)
        return [
            "curl",
            # Like --location, but keeps sending the -u credentials to any
            # host the server redirects to.
            "--location-trusted",
            "-H", "columns: %s" % self.columns,
            "-H", "column_separator: %s" % self.sep,
            "-H", "label: %s" % label,
            "-H", "timeout: %d" % self.timeout,
            "-u", "%s:%s" % (self.user, self.password),
            "-T", "%s" % self.filename,
            url,
        ]

    def load(self):
        """Run one Stream Load through curl and return the label it used.

        Prints a success/failure line based on curl's exit status only. curl
        exits 0 whenever the HTTP exchange completes, including HTTP error
        statuses unless ``--fail`` is given. So "success" here means the
        request went through. NOTE(review): the load's real outcome is
        presumably in the JSON response curl prints to stdout; check it.
        """
        # Compute the label once, so the value sent to StarRocks is the same
        # one printed and returned. Calling get_label() again would embed a
        # different timestamp.
        label = self.get_label()
        process = subprocess.Popen(self._curl_command(label))
        process.wait()
        if process.returncode != 0:
            print("\nLoad to starrocks failed! LABEL is %s" % label)
        else:
            print("\nLoad to starrocks success! \nLABEL is %s " % label)
        return label


if __name__ == '__main__':
    # Stream load Demo with Linux cmd - Curl
    #
    # StarRocks DDL:
    # CREATE TABLE `starrocks_demo`.`tb1` (
    #     `k` varchar(65533) NULL COMMENT "",
    #     `v` varchar(65533) NULL COMMENT ""
    # ) ENGINE=OLAP
    # DUPLICATE KEY(`k`)
    # COMMENT "OLAP"
    # DISTRIBUTED BY HASH(`k`) BUCKETS 1
    # PROPERTIES (
    #     "replication_num" = "1",
    #     "in_memory" = "false",
    #     "storage_format" = "DEFAULT"
    # );

    # load job 1: data from local file /tmp/test.csv
    # usage: python CurlStreamLoad.py
    client1 = StarRocksClient(
        host="master1",
        port="8030",
        database="starrocks_demo",
        username="root",
        password="",
        filename="/tmp/test.csv",
        table="tb1",
        columns='k,v',
        sep=",",
        timeout=86400,
    )
    client1.load()

    # NOTE(review): presumably spaces the jobs out so their timestamp-based
    # labels are distinct; confirm.
    time.sleep(1)

    # load job 2: data from stdin
    # usage: echo 'k1,v1\nk2,v2'| python CurlStreamLoad.py
    client2 = StarRocksClient(
        host="master1",
        port="8030",
        database="starrocks_demo",
        username="root",
        password="",
        filename="-",
        table="tb1",
        columns='k,v',
        sep=",",
        timeout=86400,
    )
    client2.load()