No description
This repository has been archived on 2026-04-20. You can view files and clone it, but you cannot make any changes to its state, such as pushing and creating new issues, pull requests or comments.
  • Python 98.6%
  • Dockerfile 1.4%
Find a file
Chris Pressland 4d36aefc86
Some checks failed
ci / tests (push) Failing after 1m5s
ci / build_release (push) Has been skipped
Heavily Nerf Copybot, needs refactoring now. But good enough for today
2022-03-18 00:35:46 +00:00
.github/workflows Initial Commit 2022-03-03 10:17:12 +00:00
copybot Heavily Nerf Copybot, needs refactoring now. But good enough for today 2022-03-18 00:35:46 +00:00
.dockerignore Initial Commit 2022-03-03 10:17:12 +00:00
.flake8 Initial Commit 2022-03-03 10:17:12 +00:00
.gitignore Initial Commit 2022-03-03 10:17:12 +00:00
.snyk Initial Commit 2022-03-03 10:17:12 +00:00
Dockerfile Initial Commit 2022-03-03 10:17:12 +00:00
main.py Initial Commit 2022-03-03 10:17:12 +00:00
Pipfile Initial Commit 2022-03-03 10:17:12 +00:00
Pipfile.lock Initial Commit 2022-03-03 10:17:12 +00:00
README.md Heavily Nerf Copybot, needs refactoring now. But good enough for today 2022-03-18 00:35:46 +00:00
settings.py Heavily Nerf Copybot, needs refactoring now. But good enough for today 2022-03-18 00:35:46 +00:00
setup.py Initial Commit 2022-03-03 10:17:12 +00:00

clickhouse-copybot

Data Classes

Copybot supports simple or complex schemas and has a basic ORM for converting python types into ClickHouse types. Example:

@dataclass(frozen=True)
class HermesMessage(Message):
    event_type: str
    origin: str
    channel: str
    event_date_time: datetime
    external_user_ref: str
    internal_user_ref: int
    email: str
requests.post(settings.clickhouse_host, params={"query": "DROP DATABASE bink;"}).text
msg_payload = {'event_type': 'event.user.created.api', 'origin': 'channel', 'channel': 'bink', 'event_date_time': '2022-03-01 17:40:53', 'external_user_ref': '239825255', 'internal_user_ref': 933, 'email': 'cpressland@bink.com'}
def _test_insert_record() -> None:
    msg_source = "clickhouse_testing"
    msg_payload = {
        "event_type": "event.user.created.api",
        "origin": "channel",
        "channel": "bink",
        "event_date_time": f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        "external_user_ref": str(randint(100000000, 999999999)),
        "internal_user_ref": randint(1, 999),
        "email": "cpressland@bink.com",
    }
    message_type = message_routing[msg_source]["class"]
    destination_database = message_routing[msg_source]["database"]
    msg = message_type(**msg_payload)
    sql, msg_params = msg.insert(destination_database)
    requests.post(settings.clickhouse_host, params={"query": sql, "database": "bink", **msg_params}).raise_for_status()