Concurrent
In this section, we'll improve the connector's performance by reading multiple stream slices in parallel.

Let's update the source. The bulk of the change consists of changing its parent class to `ConcurrentSourceAdapter` and updating its `__init__` method so it's properly initialized. This requires a little bit of boilerplate:
```python
class SourceSurveyMonkeyDemo(ConcurrentSourceAdapter):
    message_repository = InMemoryMessageRepository(Level(AirbyteLogFormatter.level_mapping[_logger.level]))

    def __init__(self, config: Optional[Mapping[str, Any]], state: Optional[Mapping[str, Any]]):
        # Read the concurrency level from the config, capped at _MAX_CONCURRENCY
        if config:
            concurrency_level = min(config.get("num_workers", _DEFAULT_CONCURRENCY), _MAX_CONCURRENCY)
        else:
            concurrency_level = _DEFAULT_CONCURRENCY
        _logger.info(f"Using concurrent cdk with concurrency level {concurrency_level}")
        concurrent_source = ConcurrentSource.create(
            concurrency_level, concurrency_level // 2, _logger, self._slice_logger, self.message_repository
        )
        super().__init__(concurrent_source)
        self._config = config
        self._state = state

    def _get_slice_boundary_fields(self, stream: Stream, state_manager: ConnectorStateManager) -> Optional[Tuple[str, str]]:
        # The keys the concurrent cursor uses to write date range boundaries into each slice
        return ("start_date", "end_date")
```
We'll also need to update the `streams` method to wrap the streams in an adapter class to enable concurrency:
```python
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
    auth = TokenAuthenticator(config["access_token"])
    survey_stream = SurveyMonkeyBaseStream(name="surveys", path="/v3/surveys", primary_key="id", authenticator=auth, cursor_field="date_modified")
    synchronous_streams = [
        survey_stream,
        SurveyMonkeySubstream(name="survey_responses", path="/v3/surveys/{stream_slice[id]}/responses/", primary_key="id", authenticator=auth, parent_stream=survey_stream),
    ]
    state_manager = ConnectorStateManager(stream_instance_map={s.name: s for s in synchronous_streams}, state=self._state)

    configured_streams = []
    for stream in synchronous_streams:
        # Fetch the state up front since StreamFacade.create_from_stream needs it in both branches
        legacy_state = state_manager.get_stream_state(stream.name, stream.namespace)
        if stream.cursor_field:
            # Incremental streams get a ConcurrentCursor so state can be tracked per slice
            cursor_field = CursorField(stream.cursor_field)
            cursor = ConcurrentCursor(
                stream.name,
                stream.namespace,
                legacy_state,
                self.message_repository,
                state_manager,
                stream.state_converter,
                cursor_field,
                self._get_slice_boundary_fields(stream, state_manager),
                _START_DATE,
            )
        else:
            # Full refresh streams only emit a final state message once they're done
            cursor = FinalStateCursor(stream.name, stream.namespace, self.message_repository)
        configured_streams.append(
            StreamFacade.create_from_stream(stream, self, _logger, legacy_state, cursor)
        )
    return configured_streams
```
The most interesting piece from this block is the use of `ConcurrentCursor` to support concurrent state management. The survey responses stream does not support incremental reads, so it uses a `FinalStateCursor` instead. The rest of the code change is mostly boilerplate.
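To see how the slice boundary fields come into play: the `ConcurrentCursor` writes a `start_date` and `end_date` value into each slice it generates, and the stream can translate those into API query parameters. A minimal sketch, where the SurveyMonkey query parameter names are assumptions for illustration:

```python
from typing import Any, Mapping, MutableMapping, Optional

def request_params(
    self,
    stream_state: Optional[Mapping[str, Any]] = None,
    stream_slice: Optional[Mapping[str, Any]] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
    params: MutableMapping[str, Any] = {}
    if stream_slice:
        # "start_date"/"end_date" are the slice boundary keys returned by
        # _get_slice_boundary_fields; the SurveyMonkey parameter names below
        # are assumptions, not part of the original code.
        params["start_modified_at"] = stream_slice["start_date"]
        params["end_modified_at"] = stream_slice["end_date"]
    return params
```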
We'll also add a state converter to the `SurveyMonkeyBaseStream` to describe how the state cursor is formatted. We'll use the `EpochValueConcurrentStreamStateConverter` since the `get_updated_state` method returns the cursor as a timestamp:
```python
state_converter = EpochValueConcurrentStreamStateConverter()
```
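For illustration, with this converter the persisted cursor value is a Unix timestamp. A state blob for the surveys stream might look like the following (the timestamp value is made up):

```python
# Hypothetical state payload: the key is the stream's cursor field and the
# value is epoch seconds, which is what EpochValueConcurrentStreamStateConverter expects.
sample_state = {"date_modified": 1704067200}
```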
Next, we'll add a few missing constants:

```python
import logging

_DEFAULT_CONCURRENCY = 10
_MAX_CONCURRENCY = 10
_RATE_LIMIT_PER_MINUTE = 120

_logger = logging.getLogger("airbyte")
```
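The `streams` method above also references a `_START_DATE` constant that bounds the first slice. Its exact value depends on how far back you want to sync; a sketch, where the date itself is an assumption:

```python
import datetime

# Assumed default start date for the first incremental sync window
_START_DATE = datetime.datetime(2020, 1, 1, tzinfo=datetime.timezone.utc)
```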
The substream isn't entirely concurrent because its `stream_slices` method reads records from the parent stream sequentially:
```python
def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
    # Iterate over the parent stream's records one at a time; this loop is synchronous
    for _slice in self._parent_stream.stream_slices():
        for parent_record in self._parent_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=_slice):
            yield parent_record
```
This can be solved by implementing the connector using constructs from the concurrent CDK directly instead of wrapping synchronous streams in an adapter. This is out of scope for this tutorial because no production connectors currently implement this.
We'll now enable throttling to avoid going over the API rate limit. You can do this by configuring a moving window rate limit policy for the `SurveyMonkeyBaseStream` class:
```python
class SurveyMonkeyBaseStream(HttpStream, ABC):
    def __init__(
        self,
        name: str,
        path: str,
        primary_key: Union[str, List[str]],
        data_field: Optional[str] = None,
        cursor_field: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        self._name = name
        self._path = path
        self._primary_key = primary_key
        self._data_field = data_field
        self._cursor_field = cursor_field
        # Throttle all requests from this stream so they stay under the API rate limit
        policies = [
            MovingWindowCallRatePolicy(
                rates=[Rate(limit=_RATE_LIMIT_PER_MINUTE, interval=datetime.timedelta(minutes=1))],
                matchers=[],
            ),
        ]
        api_budget = HttpAPIBudget(policies=policies)
        super().__init__(api_budget=api_budget, **kwargs)
```
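An empty `matchers` list applies the policy to every request the stream makes. If you only wanted to throttle a specific endpoint, the CDK's `HttpRequestMatcher` can scope a policy to matching requests; a sketch, where the SurveyMonkey URL is an assumption for illustration:

```python
import datetime

from airbyte_cdk.sources.streams.call_rate import HttpRequestMatcher, MovingWindowCallRatePolicy, Rate

# Only requests to the surveys endpoint count against this policy;
# the URL below is an assumption, not part of the original code.
scoped_policy = MovingWindowCallRatePolicy(
    rates=[Rate(limit=_RATE_LIMIT_PER_MINUTE, interval=datetime.timedelta(minutes=1))],
    matchers=[HttpRequestMatcher(url="https://api.surveymonkey.com/v3/surveys")],
)
```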
Finally, update the `run.py` file to properly instantiate the class. Most of this code is boilerplate and isn't specific to the Survey Monkey connector:
```python
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import sys
import traceback
from datetime import datetime
from typing import List

from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch
from airbyte_cdk.models import AirbyteErrorTraceMessage, AirbyteMessage, AirbyteTraceMessage, TraceType, Type

from .source import SourceSurveyMonkeyDemo


def _get_source(args: List[str]):
    config_path = AirbyteEntrypoint.extract_config(args)
    state_path = AirbyteEntrypoint.extract_state(args)
    try:
        return SourceSurveyMonkeyDemo(
            SourceSurveyMonkeyDemo.read_config(config_path) if config_path else None,
            SourceSurveyMonkeyDemo.read_state(state_path) if state_path else None,
        )
    except Exception as error:
        # Emit a trace message so the platform can surface the failure to the user
        print(
            AirbyteMessage(
                type=Type.TRACE,
                trace=AirbyteTraceMessage(
                    type=TraceType.ERROR,
                    emitted_at=int(datetime.now().timestamp() * 1000),
                    error=AirbyteErrorTraceMessage(
                        message=f"Error starting the sync. This could be due to an invalid configuration or catalog. Please contact Support for assistance. Error: {error}",
                        stack_trace=traceback.format_exc(),
                    ),
                ),
            ).json()
        )
        return None


def run():
    args = sys.argv[1:]
    source = _get_source(args)
    # _get_source returns None when instantiation fails, so only launch on success
    if source:
        launch(source, args)
```
You can now run a read operation again. The connector will read multiple partitions concurrently instead of looping through all of them sequentially.
```bash
poetry run source-survey-monkey-demo read --config secrets/config.json --catalog integration_tests/configured_catalog.json
```
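Since the surveys stream is incremental, you can also pass a state file on subsequent reads so the connector only fetches records modified after the saved cursor. The state file path below is an assumption; point it at wherever you saved the emitted state:

```bash
poetry run source-survey-monkey-demo read --config secrets/config.json --catalog integration_tests/configured_catalog.json --state integration_tests/state.json
```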
We're now done! We implemented a Python connector covering many features:
- Fast and reproducible integration tests
- Authentication errors are detected and labeled as such
- One stream supports incremental reads
- One stream depends on another stream
The final code can be found here.