Skip to content

dispatcher

dispatcher

common

Status

Bases: Enum

Source code in src/qgis_server_light/interface/dispatcher/common.py
4
5
6
7
8
# Lifecycle states a job can carry while moving through the dispatcher
# queue. The stored values are the wire-format strings written to redis
# and deliberately differ from the member names (e.g. SUCCESS -> "succeed");
# consumers compare against ``.value``, so the strings must not change.
Status = Enum(
    "Status",
    [
        ("SUCCESS", "succeed"),
        ("FAILURE", "failed"),
        ("RUNNING", "running"),
        ("QUEUED", "queued"),
    ],
)
FAILURE = 'failed' class-attribute instance-attribute
QUEUED = 'queued' class-attribute instance-attribute
RUNNING = 'running' class-attribute instance-attribute
SUCCESS = 'succeed' class-attribute instance-attribute

redis_asio

This contains the interface definition of how job information is passed through a redis queue.

RedisQueue

Source code in src/qgis_server_light/interface/dispatcher/redis_asio.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
class RedisQueue:
    """Producer side of a redis-backed job queue.

    A job is stored as a redis hash under ``job:{job_id}``, its id is
    pushed onto the ``jobs`` list for a worker to pop, and the caller
    then blocks (up to a timeout) on the pub/sub channel
    ``notifications:{job_id}`` until the worker publishes the pickled
    :class:`JobResult`.
    """

    # Key/channel names — these form the wire contract with the worker side.
    job_queue_name: str = "jobs"  # redis list the workers pop job ids from
    job_info_key: str = "info"  # hash field holding the serialized job info
    job_info_type_key: str = "info_type"  # hash field with the info class name
    job_channel_name: str = "notifications"  # pub/sub channel prefix for results
    job_status_key: str = "status"  # hash field with the current Status value
    job_duration_key: str = "duration"  # hash field with elapsed seconds (str)
    job_timestamp_key: str = "timestamp"  # prefix for per-status timestamp fields
    job_last_update_key: str = f"{job_timestamp_key}.last_update"

    def __init__(self, redis_client: redis_aio.Redis) -> None:
        """Wrap an existing async redis client.

        Args:
            redis_client: A connected ``redis.asyncio`` client, typically
                backed by a connection pool.
        """
        # we use this to hold connections to redis in a pool, this way we are
        # event loop safe and when creating the redis client for every call of
        # post, we only instantiate a minimal wrapper object which is cheap.

        self.client = redis_client

    @classmethod
    def create(cls, url: str):
        """Alternate constructor: build the queue from a redis URL."""
        redis_client = redis_aio.Redis.from_url(url)
        return cls(redis_client)

    async def set_job_runtime_status(
        self,
        job_id,
        pipeline: Pipeline,
        status: str,
        start_time: float,
    ):
        """Record ``status`` plus timestamps and duration for ``job_id``.

        Queues four HSETs on ``pipeline`` — the current status, a
        per-status timestamp field (``timestamp.{status}``), the
        last-update timestamp and the seconds elapsed since
        ``start_time`` — then executes the pipeline.
        """
        duration = time.time() - start_time
        # NOTE(review): naive local time, not UTC — confirm all consumers
        # of these timestamp fields agree on the timezone.
        ts = datetime.datetime.now().isoformat()
        await pipeline.hset(f"job:{job_id}", self.job_status_key, status)
        await pipeline.hset(
            f"job:{job_id}",
            f"{self.job_timestamp_key}.{status}",
            ts,
        )
        await pipeline.hset(f"job:{job_id}", self.job_last_update_key, ts)
        await pipeline.hset(f"job:{job_id}", self.job_duration_key, str(duration))
        await pipeline.execute()

    async def post(
        self,
        job_parameter: (
            QslJobParameterRender
            | QslJobParameterFeatureInfo
            | QslJobParameterLegend
            | QslJobParameterFeature
        ),
        to: float = 10.0,
    ) -> tuple[JobResult, str]:
        """
        Posts a new `runner` to the runner queue and waits maximum `to` seconds to complete.
        Will return a JobResult if successful or raise an error.

        Args:
            job_parameter: The parameter for the job which should be executed.
            to: The timeout a job is expected to be waited for before canceling
                job execution.

        Returns:
            A tuple of the worker's `JobResult` and the final `Status`
            value string read from the job hash. Note the pair is
            returned for whatever status the worker wrote, not only on
            success; errors and timeouts yield a FAILURE pair instead of
            raising.
        """
        job_id = str(uuid4())
        start_time = time.time()
        # Wrap the bare parameter into the matching *JobInfo envelope that
        # carries the generated job id and a type discriminator string.
        if isinstance(job_parameter, QslJobParameterRender):
            job_info = QslJobInfoRender(
                id=job_id, type=QslJobInfoRender.__name__, job=job_parameter
            )
        elif isinstance(job_parameter, QslJobParameterFeatureInfo):
            # NOTE(review): unlike the other three branches this passes the
            # *parameter* class name (QslJobParameterFeatureInfo) as the type
            # discriminator instead of QslJobInfoFeatureInfo — confirm the
            # worker expects this, it looks like a copy/paste slip.
            job_info = QslJobInfoFeatureInfo(
                id=job_id, type=QslJobParameterFeatureInfo.__name__, job=job_parameter
            )
        elif isinstance(job_parameter, QslJobParameterLegend):
            job_info = QslJobInfoLegend(
                id=job_id, type=QslJobInfoLegend.__name__, job=job_parameter
            )
        elif isinstance(job_parameter, QslJobParameterFeature):
            job_info = QslJobInfoFeature(
                id=job_id, type=QslJobInfoFeature.__name__, job=job_parameter
            )
        else:
            # Unknown parameter type: fail fast without touching redis.
            return (
                JobResult(
                    id=job_id,
                    data=f"Unsupported runner type: {type(job_parameter)}",
                    # NOTE(review): "application/text" is not a registered MIME
                    # type — "text/plain" is presumably meant.
                    content_type="application/text",
                ),
                Status.FAILURE.value,
            )
        async with self.client.pipeline() as p:
            # Putting job info into redis
            await p.hset(
                f"job:{job_id}", self.job_info_key, JsonSerializer().render(job_info)
            )
            await p.hset(
                f"job:{job_id}", self.job_info_type_key, job_info.__class__.__name__
            )
            # Queuing the job onto the list/queue
            await p.rpush(self.job_queue_name, job_id)
            await p.execute()

            logging.info(f"{job_id} queued")

            # we inform, that the job was queued
            await self.set_job_runtime_status(
                job_id, p, Status.QUEUED.value, start_time
            )
            try:
                async with self.client.pubsub() as ps:
                    # we tell redis to let us know if a message is published
                    # for this channel `notifications:{job_id}`.
                    await ps.subscribe(f"{self.job_channel_name}:{job_id}")
                    try:
                        # this puts a timeout trigger on the subscription, after timeout
                        # an asyncio.TimeoutError or asyncio.exceptions.CancelledError
                        # is raised. See except block below.
                        async with timeout(to):
                            while True:
                                message = await ps.get_message(
                                    timeout=to, ignore_subscribe_messages=True
                                )
                                if not message:
                                    continue  # https://github.com/redis/redis-py/issues/733
                                # A result was published: read the final status
                                # the worker wrote into the job hash.
                                # NOTE(review): hardcoded "status" duplicates
                                # self.job_status_key — keep them in sync.
                                status_binary = await self.client.hget(
                                    f"job:{job_id}", "status"
                                )
                                # NOTE(review): hget returns None when the hash
                                # is missing; .decode() would then raise — this
                                # assumes the worker always writes the status
                                # before publishing. Confirm.
                                status = status_binary.decode()
                                # NOTE(review): pickle.loads fully trusts the
                                # publisher — only safe on a private redis
                                # instance shared with trusted workers.
                                result: JobResult = pickle.loads(message["data"])
                                duration = time.time() - start_time
                                if status == Status.SUCCESS.value:
                                    logging.info(
                                        f"Job id: {job_id}, status: {status}, "
                                        f"duration: {duration}"
                                    )
                                elif status == Status.FAILURE.value:
                                    logging.info(
                                        f"Job id: {job_id}, status: {status}, "
                                        f"duration: {duration}, error: {result.data}"
                                    )
                                # Return whatever status was read, even if it is
                                # neither SUCCESS nor FAILURE.
                                return result, status
                    except (asyncio.TimeoutError, asyncio.exceptions.CancelledError):
                        logging.info(f"{job_id} timeout")
                        # Re-raise so the generic handler below converts the
                        # timeout into a FAILURE JobResult.
                        raise
            except Exception as e:
                # Any error (including the timeout above) becomes a FAILURE
                # result instead of propagating to the caller.
                duration = time.time() - start_time
                logging.info(
                    f"Job id: {job_id}, status: {Status.FAILURE.value}, duration: "
                    f"{duration}",
                    exc_info=True,
                )
                return (
                    JobResult(
                        id=job_id,
                        data=str(e),
                        content_type="application/text",
                    ),
                    Status.FAILURE.value,
                )
            finally:
                # Best-effort cleanup of the job hash. The job id may still sit
                # on the jobs list if no worker picked it up — TODO confirm the
                # worker discards ids whose hash no longer exists.
                try:
                    await self.client.delete(f"job:{job_id}")
                except Exception:
                    logging.warning(
                        f"Cleanup failed for {job_id}",
                        exc_info=True,
                    )
client = redis_client instance-attribute
job_channel_name: str = 'notifications' class-attribute instance-attribute
job_duration_key: str = 'duration' class-attribute instance-attribute
job_info_key: str = 'info' class-attribute instance-attribute
job_info_type_key: str = 'info_type' class-attribute instance-attribute
job_last_update_key: str = f'{job_timestamp_key}.last_update' class-attribute instance-attribute
job_queue_name: str = 'jobs' class-attribute instance-attribute
job_status_key: str = 'status' class-attribute instance-attribute
job_timestamp_key: str = 'timestamp' class-attribute instance-attribute
__init__(redis_client: redis_aio.Redis) -> None
Source code in src/qgis_server_light/interface/dispatcher/redis_asio.py
48
49
50
51
52
53
def __init__(self, redis_client: redis_aio.Redis) -> None:
    """Wrap an existing async redis client.

    Args:
        redis_client: A connected ``redis.asyncio`` client, typically
            backed by a connection pool.
    """
    # we use this to hold connections to redis in a pool, this way we are
    # event loop safe and when creating the redis client for every call of
    # post, we only instantiate a minimal wrapper object which is cheap.

    self.client = redis_client
create(url: str) classmethod
Source code in src/qgis_server_light/interface/dispatcher/redis_asio.py
55
56
57
58
@classmethod
def create(cls, url: str):
    """Alternate constructor: build the queue from a redis URL."""
    redis_client = redis_aio.Redis.from_url(url)
    return cls(redis_client)
post(job_parameter: QslJobParameterRender | QslJobParameterFeatureInfo | QslJobParameterLegend | QslJobParameterFeature, to: float = 10.0) -> tuple[JobResult, str] async

Posts a new runner to the runner queue and waits at most `to` seconds for it to complete. Will return a JobResult if successful or raise an error.

Parameters:

Source code in src/qgis_server_light/interface/dispatcher/redis_asio.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
async def post(
    self,
    job_parameter: (
        QslJobParameterRender
        | QslJobParameterFeatureInfo
        | QslJobParameterLegend
        | QslJobParameterFeature
    ),
    to: float = 10.0,
) -> tuple[JobResult, str]:
    """
    Posts a new `runner` to the runner queue and waits maximum `to` seconds to complete.
    Will return a JobResult if successful or raise an error.

    Args:
        job_parameter: The parameter for the job which should be executed.
        to: The timeout a job is expected to be waited for before canceling
            job execution.

    Returns:
        A tuple of the worker's `JobResult` and the final `Status` value
        string read from the job hash. Note the pair is returned for
        whatever status the worker wrote, not only on success; errors
        and timeouts yield a FAILURE pair instead of raising.
    """
    job_id = str(uuid4())
    start_time = time.time()
    # Wrap the bare parameter into the matching *JobInfo envelope that
    # carries the generated job id and a type discriminator string.
    if isinstance(job_parameter, QslJobParameterRender):
        job_info = QslJobInfoRender(
            id=job_id, type=QslJobInfoRender.__name__, job=job_parameter
        )
    elif isinstance(job_parameter, QslJobParameterFeatureInfo):
        # NOTE(review): unlike the other three branches this passes the
        # *parameter* class name (QslJobParameterFeatureInfo) as the type
        # discriminator instead of QslJobInfoFeatureInfo — confirm the
        # worker expects this, it looks like a copy/paste slip.
        job_info = QslJobInfoFeatureInfo(
            id=job_id, type=QslJobParameterFeatureInfo.__name__, job=job_parameter
        )
    elif isinstance(job_parameter, QslJobParameterLegend):
        job_info = QslJobInfoLegend(
            id=job_id, type=QslJobInfoLegend.__name__, job=job_parameter
        )
    elif isinstance(job_parameter, QslJobParameterFeature):
        job_info = QslJobInfoFeature(
            id=job_id, type=QslJobInfoFeature.__name__, job=job_parameter
        )
    else:
        # Unknown parameter type: fail fast without touching redis.
        return (
            JobResult(
                id=job_id,
                data=f"Unsupported runner type: {type(job_parameter)}",
                # NOTE(review): "application/text" is not a registered MIME
                # type — "text/plain" is presumably meant.
                content_type="application/text",
            ),
            Status.FAILURE.value,
        )
    async with self.client.pipeline() as p:
        # Putting job info into redis
        await p.hset(
            f"job:{job_id}", self.job_info_key, JsonSerializer().render(job_info)
        )
        await p.hset(
            f"job:{job_id}", self.job_info_type_key, job_info.__class__.__name__
        )
        # Queuing the job onto the list/queue
        await p.rpush(self.job_queue_name, job_id)
        await p.execute()

        logging.info(f"{job_id} queued")

        # we inform, that the job was queued
        await self.set_job_runtime_status(
            job_id, p, Status.QUEUED.value, start_time
        )
        try:
            async with self.client.pubsub() as ps:
                # we tell redis to let us know if a message is published
                # for this channel `notifications:{job_id}`.
                await ps.subscribe(f"{self.job_channel_name}:{job_id}")
                try:
                    # this puts a timeout trigger on the subscription, after timeout
                    # an asyncio.TimeoutError or asyncio.exceptions.CancelledError
                    # is raised. See except block below.
                    async with timeout(to):
                        while True:
                            message = await ps.get_message(
                                timeout=to, ignore_subscribe_messages=True
                            )
                            if not message:
                                continue  # https://github.com/redis/redis-py/issues/733
                            # A result was published: read the final status
                            # the worker wrote into the job hash.
                            # NOTE(review): hardcoded "status" duplicates
                            # self.job_status_key — keep them in sync.
                            status_binary = await self.client.hget(
                                f"job:{job_id}", "status"
                            )
                            # NOTE(review): hget returns None when the hash is
                            # missing; .decode() would then raise — this assumes
                            # the worker always writes the status before
                            # publishing. Confirm.
                            status = status_binary.decode()
                            # NOTE(review): pickle.loads fully trusts the
                            # publisher — only safe on a private redis instance
                            # shared with trusted workers.
                            result: JobResult = pickle.loads(message["data"])
                            duration = time.time() - start_time
                            if status == Status.SUCCESS.value:
                                logging.info(
                                    f"Job id: {job_id}, status: {status}, "
                                    f"duration: {duration}"
                                )
                            elif status == Status.FAILURE.value:
                                logging.info(
                                    f"Job id: {job_id}, status: {status}, "
                                    f"duration: {duration}, error: {result.data}"
                                )
                            # Return whatever status was read, even if it is
                            # neither SUCCESS nor FAILURE.
                            return result, status
                except (asyncio.TimeoutError, asyncio.exceptions.CancelledError):
                    logging.info(f"{job_id} timeout")
                    # Re-raise so the generic handler below converts the
                    # timeout into a FAILURE JobResult.
                    raise
        except Exception as e:
            # Any error (including the timeout above) becomes a FAILURE
            # result instead of propagating to the caller.
            duration = time.time() - start_time
            logging.info(
                f"Job id: {job_id}, status: {Status.FAILURE.value}, duration: "
                f"{duration}",
                exc_info=True,
            )
            return (
                JobResult(
                    id=job_id,
                    data=str(e),
                    content_type="application/text",
                ),
                Status.FAILURE.value,
            )
        finally:
            # Best-effort cleanup of the job hash. The job id may still sit
            # on the jobs list if no worker picked it up — TODO confirm the
            # worker discards ids whose hash no longer exists.
            try:
                await self.client.delete(f"job:{job_id}")
            except Exception:
                logging.warning(
                    f"Cleanup failed for {job_id}",
                    exc_info=True,
                )
set_job_runtime_status(job_id, pipeline: Pipeline, status: str, start_time: float) async
Source code in src/qgis_server_light/interface/dispatcher/redis_asio.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
async def set_job_runtime_status(
    self,
    job_id,
    pipeline: Pipeline,
    status: str,
    start_time: float,
):
    """Record ``status`` plus timestamps and duration for ``job_id``.

    Queues four HSETs on ``pipeline`` — the current status, a per-status
    timestamp field (``timestamp.{status}``), the last-update timestamp
    and the seconds elapsed since ``start_time`` — then executes the
    pipeline.
    """
    duration = time.time() - start_time
    # NOTE(review): naive local time, not UTC — confirm all consumers of
    # these timestamp fields agree on the timezone.
    ts = datetime.datetime.now().isoformat()
    await pipeline.hset(f"job:{job_id}", self.job_status_key, status)
    await pipeline.hset(
        f"job:{job_id}",
        f"{self.job_timestamp_key}.{status}",
        ts,
    )
    await pipeline.hset(f"job:{job_id}", self.job_last_update_key, ts)
    await pipeline.hset(f"job:{job_id}", self.job_duration_key, str(duration))
    await pipeline.execute()