#!/usr/bin/env python
# encoding: utf-8
# Copyright (C) Alibaba Cloud Computing
# All rights reserved.
import sys
import requests
try:
import json
except ImportError:
import simplejson as json
from datetime import datetime
from sls_logs_pb2 import LogGroup
from aliyun.sls.util import Util
from aliyun.sls.slsexception import SLSException
from aliyun.sls.getlogsresponse import GetLogsResponse
from aliyun.sls.putlogsresponse import PutLogsResponse
from aliyun.sls.listtopicsresponse import ListTopicsResponse
from aliyun.sls.listlogstoresresponse import ListLogstoresResponse
from aliyun.sls.gethistogramsresponse import GetHistogramsResponse
CONNECTION_TIME_OUT = 20  # seconds; handed to requests as the per-request timeout
API_VERSION = '0.4.0'  # sent in the x-sls-apiversion request header
USER_AGENT = 'sls-python-sdk-v-0.4.6'  # sent in the User-Agent request header

"""
SLSClient class is the main class in the SDK. It can be used to communicate with
the SLS server to put/get data.
:Author: sls_dev
"""


class SLSClient(object):
    """ Construct the SLSClient with endpoint, accessKeyId, accessKey.

    :type endpoint: string
    :param endpoint: SLS host name, for example, http://ch-hangzhou.sls.aliyuncs.com

    :type accessKeyId: string
    :param accessKeyId: aliyun accessKeyId

    :type accessKey: string
    :param accessKey: aliyun accessKey
    """

    __version__ = API_VERSION
    Version = __version__

    def __init__(self, endpoint, accessKeyId, accessKey):
        endpoint = self._toAsciiStr(endpoint)
        accessKeyId = self._toAsciiStr(accessKeyId)
        accessKey = self._toAsciiStr(accessKey)
        # Defaults; _setendpoint() overwrites both from the parsed endpoint.
        self._isRowIp = True
        self._port = 80
        self._setendpoint(endpoint)
        self._accessKeyId = accessKeyId
        self._accessKey = accessKey
        self._timeout = CONNECTION_TIME_OUT
        # Default log source; put_logs() re-resolves it when it is 127.0.0.1.
        self._source = Util.get_host_ip(self._slsHost)

    @staticmethod
    def _toAsciiStr(value):
        """ Narrow Python 2 ``unicode`` text to an ascii byte ``str``.

        On Python 3, where ``str`` is already text, and for any other type,
        the value is returned unchanged.

        :param value: any object
        :return: the ascii-encoded str on Python 2 for unicode input, else value
        :raise: UnicodeEncodeError if the text is not pure ascii (Python 2)
        """
        # `str is bytes` only holds on Python 2, so the Python-2-only name
        # `unicode` is never evaluated on Python 3.
        if str is bytes and isinstance(value, unicode):  # noqa: F821
            return value.encode('ascii')
        return value

    def _setendpoint(self, endpoint):
        """ Split the endpoint into host and port and store them on self.

        Sets ``_slsHost`` (bare host), ``_port`` (only when the endpoint names
        one), ``_endpoint`` ("host:port") and ``_isRowIp``.

        :type endpoint: string
        :param endpoint: host name, optionally with scheme, port and path
        :raise: ValueError if the port part is not an integer
        """
        pos = endpoint.find('://')
        if pos != -1:
            endpoint = endpoint[pos + 3:]  # strip http://
        pos = endpoint.find('/')
        if pos != -1:
            endpoint = endpoint[:pos]  # strip any path
        pos = endpoint.find(':')
        if pos != -1:
            self._port = int(endpoint[pos + 1:])
            endpoint = endpoint[:pos]
        # NOTE(review): "row ip" presumably means "raw IP address" - confirm
        # against Util.is_row_ip.
        self._isRowIp = Util.is_row_ip(endpoint)
        self._slsHost = endpoint
        self._endpoint = endpoint + ':' + str(self._port)

    def _getGMT(self):
        """ Return the current UTC time formatted for the HTTP Date header.

        :return: string such as 'Sun, 06 Nov 1994 08:49:37 GMT'
        """
        # strftime('%a'/'%b') follows the process locale, which would yield a
        # non-English (invalid) HTTP date; formatdate() always emits English.
        from email.utils import formatdate
        return formatdate(usegmt=True)

    def _loadJson(self, respText, requestId):
        """ Parse a response body as JSON.

        :param respText: raw response body
        :param requestId: request id attached to the exception on failure
        :return: the decoded JSON value, or None when the body is empty
        :raise: SLSException ('SLSBadResponse') if the body is not valid JSON
        """
        if not respText:
            return None
        try:
            return json.loads(respText)
        except (ValueError, TypeError):
            # ValueError covers JSON syntax and decoding errors; a bare except
            # here would also swallow KeyboardInterrupt/SystemExit.
            raise SLSException('SLSBadResponse',
                               'Bad json format:\n%s' % respText,
                               requestId)

    def _getHttpResponse(self, method, url, params, body, headers):
        """ Perform the HTTP call and return its raw result.

        'get' (any letter case) is sent with requests.get; every other method
        is sent with requests.post.

        :param method: HTTP method name
        :param url: full request url
        :param params: dict of query-string parameters
        :param body: request body, or None
        :param headers: dict of request headers; 'User-Agent' is added to it
        :return: tuple (status code, body content, response headers)
        :raise: SLSException ('SLSRequestError') on any failure
        """
        try:
            headers['User-Agent'] = USER_AGENT
            # requests >= 2.11 rejects non-string header values, so integers
            # (Content-Length, x-sls-bodyrawsize) are stringified on a copy;
            # the caller's dict, already used for signing, keeps its values.
            wireHeaders = dict(
                (name, str(value) if isinstance(value, int) else value)
                for name, value in headers.items())
            if method.lower() == 'get':
                send = requests.get
            else:
                send = requests.post
            r = send(url, params=params, data=body, headers=wireHeaders,
                     timeout=self._timeout)
            return (r.status_code, r.content, r.headers)
        except Exception as ex:
            raise SLSException('SLSRequestError', str(ex))

    def _sendRequest(self, method, url, params, body, headers):
        """ Send a request and turn any non-200 reply into an SLSException.

        :return: tuple (decoded JSON body or None, dict of response headers)
        :raise: SLSException with the server's error_code/error_message when
                the reply carries them, otherwise 'SLSRequestError'
        """
        (status, respText, respHeader) = self._getHttpResponse(
            method, url, params, body, headers)
        header = dict(respHeader.items())
        # Ask the original mapping so a case-insensitive header container
        # still matches whatever letter case the server used.
        requestId = respHeader.get('x-sls-requestid', '')
        exJson = self._loadJson(respText, requestId)
        if status == 200:
            return (exJson, header)
        # exJson is None for an empty error body and may be a non-dict JSON
        # value; only a dict can carry the error fields.
        if (isinstance(exJson, dict)
                and 'error_code' in exJson and 'error_message' in exJson):
            raise SLSException(exJson['error_code'],
                               exJson['error_message'], requestId)
        detail = '. Return json is ' + str(exJson) if exJson else '.'
        raise SLSException('SLSRequestError',
                           'Request is failed. Http code is ' + str(status) + detail,
                           requestId)

    def _send(self, method, project, body, resource, params, headers):
        """ Add the common and signature headers, then send the request.

        :param method: HTTP method name
        :param project: project name; used for the Host header and, unless the
                        endpoint is flagged by _isRowIp, as url host prefix
        :param body: request body, or None
        :param resource: url path, for example '/logstores'
        :param params: dict of query-string parameters
        :param headers: dict of request headers; filled in place
        :return: tuple (decoded JSON body or None, dict of response headers)
        :raise: SLSException
        """
        if body:
            headers['Content-Length'] = len(body)
            headers['Content-MD5'] = Util.cal_md5(body)
            headers['Content-Type'] = 'application/x-protobuf'
        else:
            headers['Content-Length'] = 0
            headers["x-sls-bodyrawsize"] = 0
            headers['Content-Type'] = ''
        headers['x-sls-apiversion'] = API_VERSION
        headers['x-sls-signaturemethod'] = 'hmac-sha1'
        if self._isRowIp:
            url = "http://" + self._endpoint
        else:
            url = "http://" + project + "." + self._endpoint
        # The Host header names the project in both cases.
        headers['Host'] = project + "." + self._slsHost
        headers['Date'] = self._getGMT()
        # Every header must be in place before the signature is computed.
        signature = Util.get_request_authorization(method, resource,
                                                   self._accessKey, params, headers)
        headers['Authorization'] = "SLS " + self._accessKeyId + ':' + signature
        url = url + resource
        return self._sendRequest(method, url, params, body, headers)

    def get_unicode(self, key):
        """ Return byte strings decoded as utf-8; pass anything else through.

        :param key: a byte string, a text string or any other object
        :return: the decoded text for byte strings, otherwise key unchanged
        :raise: UnicodeDecodeError if a byte string is not valid utf-8
        """
        # On Python 2 `bytes` is `str`, so this equals unicode(key, 'utf-8');
        # on Python 3 it avoids the undefined name `unicode`.
        if isinstance(key, bytes):
            key = key.decode('utf-8')
        return key

    def put_logs(self, request):
        """ Put logs to SLS.
        Unsuccessful operation will cause an SLSException.

        :type request: PutLogsRequest
        :param request: the PutLogs request parameters class

        :return: PutLogsResponse
        :raise: SLSException
        """
        logItems = request.get_log_items()
        if len(logItems) > 4096:
            raise SLSException('InvalidLogSize',
                               "logItems' length exceeds maximum limitation: 4096 lines.")
        logGroup = LogGroup()
        logGroup.Topic = request.get_topic()
        source = request.get_source()
        if source:
            logGroup.Source = source
        else:
            if self._source == '127.0.0.1':
                # Retry the lookup with the project-qualified host name.
                self._source = Util.get_host_ip(
                    request.get_project() + '.' + self._slsHost)
            logGroup.Source = self._source
        for logItem in logItems:
            log = logGroup.Logs.add()
            log.Time = logItem.get_time()
            for key, value in logItem.get_contents():
                content = log.Contents.add()
                content.Key = self.get_unicode(key)
                content.Value = self.get_unicode(value)
        body = logGroup.SerializeToString()
        if len(body) > 3 * 1024 * 1024:  # 3 MB
            raise SLSException('InvalidLogSize',
                               "logItems' size exceeds maximum limitation: 3 MB.")
        headers = {}
        # The raw size is recorded before the body is compressed.
        headers['x-sls-bodyrawsize'] = len(body)
        headers['x-sls-compresstype'] = 'deflate'
        body = Util.compress_data(body)
        params = {}
        logstore = request.get_logstore()
        project = request.get_project()
        resource = '/logstores/' + logstore
        respHeaders = self._send('POST', project, body, resource, params, headers)
        # Only the response headers (second tuple item) are handed on.
        return PutLogsResponse(respHeaders[1])

    def list_logstores(self, request):
        """ List all logstores of requested project.
        Unsuccessful operation will cause an SLSException.

        :type request: ListLogstoresRequest
        :param request: the ListLogstores request parameters class.

        :return: ListLogstoresResponse
        :raise: SLSException
        """
        headers = {}
        params = {}
        resource = '/logstores'
        project = request.get_project()
        (resp, header) = self._send("GET", project, None, resource, params, headers)
        return ListLogstoresResponse(resp, header)

    def list_topics(self, request):
        """ List all topics in a logstore.
        Unsuccessful operation will cause an SLSException.

        :type request: ListTopicsRequest
        :param request: the ListTopics request parameters class.

        :return: ListTopicsResponse
        :raise: SLSException
        """
        headers = {}
        params = {}
        if request.get_token() is not None:
            params['token'] = request.get_token()
        if request.get_line() is not None:
            params['line'] = request.get_line()
        params['type'] = 'topic'
        logstore = request.get_logstore()
        project = request.get_project()
        resource = "/logstores/" + logstore
        (resp, header) = self._send("GET", project, None, resource, params, headers)
        return ListTopicsResponse(resp, header)

    def _queryParams(self, request):
        """ Collect the topic/from/to/query parameters that are not None. """
        params = {}
        if request.get_topic() is not None:
            params['topic'] = request.get_topic()
        if request.get_from() is not None:
            params['from'] = request.get_from()
        if request.get_to() is not None:
            params['to'] = request.get_to()
        if request.get_query() is not None:
            params['query'] = request.get_query()
        return params

    def get_histograms(self, request):
        """ Get histograms of requested query from SLS.
        Unsuccessful operation will cause an SLSException.

        :type request: GetHistogramsRequest
        :param request: the GetHistograms request parameters class.

        :return: GetHistogramsResponse
        :raise: SLSException
        """
        headers = {}
        params = self._queryParams(request)
        params['type'] = 'histogram'
        logstore = request.get_logstore()
        project = request.get_project()
        resource = "/logstores/" + logstore
        (resp, header) = self._send("GET", project, None, resource, params, headers)
        return GetHistogramsResponse(resp, header)

    def get_logs(self, request):
        """ Get logs from SLS.
        Unsuccessful operation will cause an SLSException.

        :type request: GetLogsRequest
        :param request: the GetLogs request parameters class.

        :return: GetLogsResponse
        :raise: SLSException
        """
        headers = {}
        params = self._queryParams(request)
        params['type'] = 'log'
        if request.get_line() is not None:
            params['line'] = request.get_line()
        if request.get_offset() is not None:
            params['offset'] = request.get_offset()
        if request.get_reverse() is not None:
            params['reverse'] = 'true' if request.get_reverse() else 'false'
        logstore = request.get_logstore()
        project = request.get_project()
        resource = "/logstores/" + logstore
        (resp, header) = self._send("GET", project, None, resource, params, headers)
        return GetLogsResponse(resp, header)