airflow analytics 源码
airflow analytics 代码
文件路径:/airflow/providers/google/marketing_platform/hooks/analytics.py
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from typing import Any
from googleapiclient.discovery import Resource, build
from googleapiclient.http import MediaFileUpload
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
class GoogleAnalyticsHook(GoogleBaseHook):
"""Hook for Google Analytics 360."""
def __init__(self, api_version: str = "v3", *args, **kwargs):
super().__init__(*args, **kwargs)
self.api_version = api_version
self._conn = None
def _paginate(self, resource: Resource, list_args: dict[str, Any] | None = None) -> list[dict]:
list_args = list_args or {}
result: list[dict] = []
while True:
# start index has value 1
request = resource.list(start_index=len(result) + 1, **list_args)
response = request.execute(num_retries=self.num_retries)
result.extend(response.get("items", []))
# result is the number of fetched links from Analytics
# when all links will be added to the result
# the loop will break
if response["totalResults"] <= len(result):
break
return result
def get_conn(self) -> Resource:
"""Retrieves connection to Google Analytics 360."""
if not self._conn:
http_authorized = self._authorize()
self._conn = build(
"analytics",
self.api_version,
http=http_authorized,
cache_discovery=False,
)
return self._conn
def list_accounts(self) -> list[dict[str, Any]]:
"""Lists accounts list from Google Analytics 360."""
self.log.info("Retrieving accounts list...")
conn = self.get_conn()
accounts = conn.management().accounts()
result = self._paginate(accounts)
return result
def get_ad_words_link(
self, account_id: str, web_property_id: str, web_property_ad_words_link_id: str
) -> dict[str, Any]:
"""
Returns a web property-Google Ads link to which the user has access.
:param account_id: ID of the account which the given web property belongs to.
:param web_property_id: Web property-Google Ads link UA-string.
:param web_property_ad_words_link_id: to retrieve the Google Ads link for.
:returns: web property-Google Ads
:rtype: Dict
"""
self.log.info("Retrieving ad words links...")
ad_words_link = (
self.get_conn()
.management()
.webPropertyAdWordsLinks()
.get(
accountId=account_id,
webPropertyId=web_property_id,
webPropertyAdWordsLinkId=web_property_ad_words_link_id,
)
.execute(num_retries=self.num_retries)
)
return ad_words_link
def list_ad_words_links(self, account_id: str, web_property_id: str) -> list[dict[str, Any]]:
"""
Lists webProperty-Google Ads links for a given web property.
:param account_id: ID of the account which the given web property belongs to.
:param web_property_id: Web property UA-string to retrieve the Google Ads links for.
:returns: list of entity Google Ads links.
:rtype: list
"""
self.log.info("Retrieving ad words list...")
conn = self.get_conn()
ads_links = conn.management().webPropertyAdWordsLinks()
list_args = {"accountId": account_id, "webPropertyId": web_property_id}
result = self._paginate(ads_links, list_args)
return result
def upload_data(
self,
file_location: str,
account_id: str,
web_property_id: str,
custom_data_source_id: str,
resumable_upload: bool = False,
) -> None:
"""
Uploads file to GA via the Data Import API
:param file_location: The path and name of the file to upload.
:param account_id: The GA account Id to which the data upload belongs.
:param web_property_id: UA-string associated with the upload.
:param custom_data_source_id: Custom Data Source Id to which this data import belongs.
:param resumable_upload: flag to upload the file in a resumable fashion, using a
series of at least two requests.
"""
media = MediaFileUpload(
file_location,
mimetype="application/octet-stream",
resumable=resumable_upload,
)
self.log.info(
"Uploading file to GA file for accountId: %s, webPropertyId:%s and customDataSourceId:%s ",
account_id,
web_property_id,
custom_data_source_id,
)
self.get_conn().management().uploads().uploadData(
accountId=account_id,
webPropertyId=web_property_id,
customDataSourceId=custom_data_source_id,
media_body=media,
).execute()
def delete_upload_data(
self,
account_id: str,
web_property_id: str,
custom_data_source_id: str,
delete_request_body: dict[str, Any],
) -> None:
"""
Deletes the uploaded data for a given account/property/dataset
:param account_id: The GA account Id to which the data upload belongs.
:param web_property_id: UA-string associated with the upload.
:param custom_data_source_id: Custom Data Source Id to which this data import belongs.
:param delete_request_body: Dict of customDataImportUids to delete.
"""
self.log.info(
"Deleting previous uploads to GA file for accountId:%s, "
"webPropertyId:%s and customDataSourceId:%s ",
account_id,
web_property_id,
custom_data_source_id,
)
self.get_conn().management().uploads().deleteUploadData(
accountId=account_id,
webPropertyId=web_property_id,
customDataSourceId=custom_data_source_id,
body=delete_request_body,
).execute()
def list_uploads(self, account_id, web_property_id, custom_data_source_id) -> list[dict[str, Any]]:
"""
Get list of data upload from GA
:param account_id: The GA account Id to which the data upload belongs.
:param web_property_id: UA-string associated with the upload.
:param custom_data_source_id: Custom Data Source Id to which this data import belongs.
"""
self.log.info(
"Getting list of uploads for accountId:%s, webPropertyId:%s and customDataSourceId:%s ",
account_id,
web_property_id,
custom_data_source_id,
)
uploads = self.get_conn().management().uploads()
list_args = {
"accountId": account_id,
"webPropertyId": web_property_id,
"customDataSourceId": custom_data_source_id,
}
result = self._paginate(uploads, list_args)
return result
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦