Atlas - SDL_asyncio_generic.c
Home / ext / SDL / src / io / generic Lines: 1 | Size: 15895 bytes [Download] [Show on GitHub] [Search similar files] [Raw] [Raw (proxy)][FILE BEGIN]1/* 2 Simple DirectMedia Layer 3 Copyright (C) 1997-2025 Sam Lantinga <[email protected]> 4 5 This software is provided 'as-is', without any express or implied 6 warranty. In no event will the authors be held liable for any damages 7 arising from the use of this software. 8 9 Permission is granted to anyone to use this software for any purpose, 10 including commercial applications, and to alter it and redistribute it 11 freely, subject to the following restrictions: 12 13 1. The origin of this software must not be misrepresented; you must not 14 claim that you wrote the original software. If you use this software 15 in a product, an acknowledgment in the product documentation would be 16 appreciated but is not required. 17 2. Altered source versions must be plainly marked as such, and must not be 18 misrepresented as being the original software. 19 3. This notice may not be removed or altered from any source distribution. 20*/ 21 22// The generic backend uses a threadpool to block on synchronous i/o. 23// This is not ideal, it's meant to be used if there isn't a platform-specific 24// backend that can do something more efficient! 25 26#include "SDL_internal.h" 27#include "../SDL_sysasyncio.h" 28 29// on Emscripten without threads, async i/o is synchronous. Sorry. Almost 30// everything is MEMFS, so it's just a memcpy anyhow, and the Emscripten 31// filesystem APIs don't offer async. In theory, directly accessing 32// persistent storage _does_ offer async APIs at the browser level, but 33// that's not exposed in Emscripten's filesystem abstraction. 34#if defined(SDL_PLATFORM_EMSCRIPTEN) && !defined(__EMSCRIPTEN_PTHREADS__) 35#define SDL_ASYNCIO_USE_THREADPOOL 0 36#else 37#define SDL_ASYNCIO_USE_THREADPOOL 1 38#endif 39 40typedef struct GenericAsyncIOQueueData 41{ 42 SDL_Mutex *lock; 43 SDL_Condition *condition; 44 SDL_AsyncIOTask completed_tasks; 45} GenericAsyncIOQueueData; 46 47typedef struct GenericAsyncIOData 48{ 49 SDL_Mutex *lock; // !!! FIXME: we can skip this lock if we have an equivalent of pread/pwrite 50 SDL_IOStream *io; 51} GenericAsyncIOData; 52 53static void AsyncIOTaskComplete(SDL_AsyncIOTask *task) 54{ 55 SDL_assert(task->queue); 56 GenericAsyncIOQueueData *data = (GenericAsyncIOQueueData *) task->queue->userdata; 57 SDL_LockMutex(data->lock); 58 LINKED_LIST_PREPEND(task, data->completed_tasks, queue); 59 SDL_SignalCondition(data->condition); // wake a thread waiting on the queue. 60 SDL_UnlockMutex(data->lock); 61} 62 63// synchronous i/o is offloaded onto the threadpool. This function does the threaded work. 64// This is called directly, without a threadpool, if !SDL_ASYNCIO_USE_THREADPOOL. 65static void SynchronousIO(SDL_AsyncIOTask *task) 66{ 67 SDL_assert(task->result != SDL_ASYNCIO_CANCELED); // shouldn't have gotten in here if canceled! 68 69 GenericAsyncIOData *data = (GenericAsyncIOData *) task->asyncio->userdata; 70 SDL_IOStream *io = data->io; 71 const size_t size = (size_t) task->requested_size; 72 void *ptr = task->buffer; 73 74 // this seek won't work if two tasks are reading from the same file at the same time, 75 // so we lock here. This makes multiple reads from a single file serialize, but different 76 // files will still run in parallel. An app can also open the same file twice to avoid this. 77 SDL_LockMutex(data->lock); 78 if (task->type == SDL_ASYNCIO_TASK_CLOSE) { 79 bool okay = true; 80 if (task->flush) { 81 okay = SDL_FlushIO(data->io); 82 } 83 okay = SDL_CloseIO(data->io) && okay; 84 task->result = okay ? SDL_ASYNCIO_COMPLETE : SDL_ASYNCIO_FAILURE; 85 } else if (SDL_SeekIO(io, (Sint64) task->offset, SDL_IO_SEEK_SET) < 0) { 86 task->result = SDL_ASYNCIO_FAILURE; 87 } else { 88 const bool writing = (task->type == SDL_ASYNCIO_TASK_WRITE); 89 task->result_size = (Uint64) (writing ? SDL_WriteIO(io, ptr, size) : SDL_ReadIO(io, ptr, size)); 90 if (task->result_size == task->requested_size) { 91 task->result = SDL_ASYNCIO_COMPLETE; 92 } else { 93 if (writing) { 94 task->result = SDL_ASYNCIO_FAILURE; // it's always a failure on short writes. 95 } else { 96 const SDL_IOStatus status = SDL_GetIOStatus(io); 97 SDL_assert(status != SDL_IO_STATUS_READY); // this should have either failed or been EOF. 98 SDL_assert(status != SDL_IO_STATUS_NOT_READY); // these should not be non-blocking reads! 99 task->result = (status == SDL_IO_STATUS_EOF) ? SDL_ASYNCIO_COMPLETE : SDL_ASYNCIO_FAILURE; 100 } 101 } 102 } 103 SDL_UnlockMutex(data->lock); 104 105 AsyncIOTaskComplete(task); 106} 107 108#if SDL_ASYNCIO_USE_THREADPOOL 109static SDL_InitState threadpool_init; 110static SDL_Mutex *threadpool_lock = NULL; 111static bool stop_threadpool = false; 112static SDL_AsyncIOTask threadpool_tasks; 113static SDL_Condition *threadpool_condition = NULL; 114static int max_threadpool_threads = 0; 115static int running_threadpool_threads = 0; 116static int idle_threadpool_threads = 0; 117static int threadpool_threads_spun = 0; 118 119static int SDLCALL AsyncIOThreadpoolWorker(void *data) 120{ 121 SDL_LockMutex(threadpool_lock); 122 123 while (!stop_threadpool) { 124 SDL_AsyncIOTask *task = LINKED_LIST_START(threadpool_tasks, threadpool); 125 if (!task) { 126 // if we go 30 seconds without a new task, terminate unless we're the only thread left. 127 idle_threadpool_threads++; 128 const bool rc = SDL_WaitConditionTimeout(threadpool_condition, threadpool_lock, 30000); 129 idle_threadpool_threads--; 130 131 if (!rc) { 132 // decide if we have too many idle threads, and if so, quit to let thread pool shrink when not busy. 133 if (idle_threadpool_threads) { 134 break; 135 } 136 } 137 138 continue; 139 } 140 141 LINKED_LIST_UNLINK(task, threadpool); 142 143 SDL_UnlockMutex(threadpool_lock); 144 145 // bookkeeping is done, so we drop the mutex and fire the work. 146 SynchronousIO(task); 147 148 SDL_LockMutex(threadpool_lock); // take the lock again and see if there's another task (if not, we'll wait on the Condition). 149 } 150 151 running_threadpool_threads--; 152 153 // this is kind of a hack, but this lets us reuse threadpool_condition to block on shutdown until all threads have exited. 154 if (stop_threadpool) { 155 SDL_BroadcastCondition(threadpool_condition); 156 } 157 158 SDL_UnlockMutex(threadpool_lock); 159 160 return 0; 161} 162 163static bool MaybeSpinNewWorkerThread(void) 164{ 165 // if all existing threads are busy and the pool of threads isn't maxed out, make a new one. 166 if ((idle_threadpool_threads == 0) && (running_threadpool_threads < max_threadpool_threads)) { 167 char threadname[32]; 168 SDL_snprintf(threadname, sizeof (threadname), "SDLasyncio%d", threadpool_threads_spun); 169 SDL_Thread *thread = SDL_CreateThread(AsyncIOThreadpoolWorker, threadname, NULL); 170 if (thread == NULL) { 171 return false; 172 } 173 SDL_DetachThread(thread); // these terminate themselves when idle too long, so we never WaitThread. 174 running_threadpool_threads++; 175 threadpool_threads_spun++; 176 } 177 return true; 178} 179 180static void QueueAsyncIOTask(SDL_AsyncIOTask *task) 181{ 182 SDL_assert(task != NULL); 183 184 SDL_LockMutex(threadpool_lock); 185 186 if (stop_threadpool) { // just in case. 187 task->result = SDL_ASYNCIO_CANCELED; 188 AsyncIOTaskComplete(task); 189 } else { 190 LINKED_LIST_PREPEND(task, threadpool_tasks, threadpool); 191 MaybeSpinNewWorkerThread(); // okay if this fails or the thread pool is maxed out. Something will get there eventually. 192 193 // tell idle threads to get to work. 194 // This is a broadcast because we want someone from the thread pool to wake up, but 195 // also shutdown might also be blocking on this. One of the threads will grab 196 // it, the others will go back to sleep. 197 SDL_BroadcastCondition(threadpool_condition); 198 } 199 200 SDL_UnlockMutex(threadpool_lock); 201} 202 203// We don't initialize async i/o at all until it's used, so 204// JUST IN CASE two things try to start at the same time, 205// this will make sure everything gets the same mutex. 206static bool PrepareThreadpool(void) 207{ 208 bool okay = true; 209 if (SDL_ShouldInit(&threadpool_init)) { 210 max_threadpool_threads = (SDL_GetNumLogicalCPUCores() * 2) + 1; // !!! FIXME: this should probably have a hint to override. 211 max_threadpool_threads = SDL_clamp(max_threadpool_threads, 1, 8); // 8 is probably more than enough. 212 213 okay = (okay && ((threadpool_lock = SDL_CreateMutex()) != NULL)); 214 okay = (okay && ((threadpool_condition = SDL_CreateCondition()) != NULL)); 215 okay = (okay && MaybeSpinNewWorkerThread()); // make sure at least one thread is going, since we'll need it. 216 217 if (!okay) { 218 if (threadpool_condition) { 219 SDL_DestroyCondition(threadpool_condition); 220 threadpool_condition = NULL; 221 } 222 if (threadpool_lock) { 223 SDL_DestroyMutex(threadpool_lock); 224 threadpool_lock = NULL; 225 } 226 } 227 228 SDL_SetInitialized(&threadpool_init, okay); 229 } 230 return okay; 231} 232 233static void ShutdownThreadpool(void) 234{ 235 if (SDL_ShouldQuit(&threadpool_init)) { 236 SDL_LockMutex(threadpool_lock); 237 238 // cancel anything that's still pending. 239 SDL_AsyncIOTask *task; 240 while ((task = LINKED_LIST_START(threadpool_tasks, threadpool)) != NULL) { 241 LINKED_LIST_UNLINK(task, threadpool); 242 task->result = SDL_ASYNCIO_CANCELED; 243 AsyncIOTaskComplete(task); 244 } 245 246 stop_threadpool = true; 247 SDL_BroadcastCondition(threadpool_condition); // tell the whole threadpool to wake up and quit. 248 249 while (running_threadpool_threads > 0) { 250 // each threadpool thread will broadcast this condition before it terminates if stop_threadpool is set. 251 // we can't just join the threads because they are detached, so the thread pool can automatically shrink as necessary. 252 SDL_WaitCondition(threadpool_condition, threadpool_lock); 253 } 254 255 SDL_UnlockMutex(threadpool_lock); 256 257 SDL_DestroyMutex(threadpool_lock); 258 threadpool_lock = NULL; 259 SDL_DestroyCondition(threadpool_condition); 260 threadpool_condition = NULL; 261 262 max_threadpool_threads = running_threadpool_threads = idle_threadpool_threads = threadpool_threads_spun = 0; 263 264 stop_threadpool = false; 265 SDL_SetInitialized(&threadpool_init, false); 266 } 267} 268#endif 269 270 271static Sint64 generic_asyncio_size(void *userdata) 272{ 273 GenericAsyncIOData *data = (GenericAsyncIOData *) userdata; 274 return SDL_GetIOSize(data->io); 275} 276 277static bool generic_asyncio_io(void *userdata, SDL_AsyncIOTask *task) 278{ 279 return task->queue->iface.queue_task(task->queue->userdata, task); 280} 281 282static void generic_asyncio_destroy(void *userdata) 283{ 284 GenericAsyncIOData *data = (GenericAsyncIOData *) userdata; 285 SDL_DestroyMutex(data->lock); 286 SDL_free(data); 287} 288 289 290static bool generic_asyncioqueue_queue_task(void *userdata, SDL_AsyncIOTask *task) 291{ 292 #if SDL_ASYNCIO_USE_THREADPOOL 293 QueueAsyncIOTask(task); 294 #else 295 SynchronousIO(task); // oh well. Get a better platform. 296 #endif 297 return true; 298} 299 300static void generic_asyncioqueue_cancel_task(void *userdata, SDL_AsyncIOTask *task) 301{ 302 #if !SDL_ASYNCIO_USE_THREADPOOL // in theory, this was all synchronous and should never call this, but just in case. 303 task->result = SDL_ASYNCIO_CANCELED; 304 AsyncIOTaskComplete(task); 305 #else 306 // we can't stop i/o that's in-flight, but we _can_ just refuse to start it if the threadpool hadn't picked it up yet. 307 SDL_LockMutex(threadpool_lock); 308 if (LINKED_LIST_PREV(task, threadpool) != NULL) { // still in the queue waiting to be run? Take it out. 309 LINKED_LIST_UNLINK(task, threadpool); 310 task->result = SDL_ASYNCIO_CANCELED; 311 AsyncIOTaskComplete(task); 312 } 313 SDL_UnlockMutex(threadpool_lock); 314 #endif 315} 316 317static SDL_AsyncIOTask *generic_asyncioqueue_get_results(void *userdata) 318{ 319 GenericAsyncIOQueueData *data = (GenericAsyncIOQueueData *) userdata; 320 SDL_LockMutex(data->lock); 321 SDL_AsyncIOTask *task = LINKED_LIST_START(data->completed_tasks, queue); 322 if (task) { 323 LINKED_LIST_UNLINK(task, queue); 324 } 325 SDL_UnlockMutex(data->lock); 326 return task; 327} 328 329static SDL_AsyncIOTask *generic_asyncioqueue_wait_results(void *userdata, Sint32 timeoutMS) 330{ 331 GenericAsyncIOQueueData *data = (GenericAsyncIOQueueData *) userdata; 332 SDL_LockMutex(data->lock); 333 SDL_AsyncIOTask *task = LINKED_LIST_START(data->completed_tasks, queue); 334 if (!task) { 335 SDL_WaitConditionTimeout(data->condition, data->lock, timeoutMS); 336 task = LINKED_LIST_START(data->completed_tasks, queue); 337 } 338 if (task) { 339 LINKED_LIST_UNLINK(task, queue); 340 } 341 SDL_UnlockMutex(data->lock); 342 return task; 343} 344 345static void generic_asyncioqueue_signal(void *userdata) 346{ 347 GenericAsyncIOQueueData *data = (GenericAsyncIOQueueData *) userdata; 348 SDL_LockMutex(data->lock); 349 SDL_BroadcastCondition(data->condition); 350 SDL_UnlockMutex(data->lock); 351} 352 353static void generic_asyncioqueue_destroy(void *userdata) 354{ 355 GenericAsyncIOQueueData *data = (GenericAsyncIOQueueData *) userdata; 356 SDL_DestroyMutex(data->lock); 357 SDL_DestroyCondition(data->condition); 358 SDL_free(data); 359} 360 361bool SDL_SYS_CreateAsyncIOQueue_Generic(SDL_AsyncIOQueue *queue) 362{ 363 #if SDL_ASYNCIO_USE_THREADPOOL 364 if (!PrepareThreadpool()) { 365 return false; 366 } 367 #endif 368 369 GenericAsyncIOQueueData *data = (GenericAsyncIOQueueData *) SDL_calloc(1, sizeof (*data)); 370 if (!data) { 371 return false; 372 } 373 374 data->lock = SDL_CreateMutex(); 375 if (!data->lock) { 376 SDL_free(data); 377 return false; 378 } 379 380 data->condition = SDL_CreateCondition(); 381 if (!data->condition) { 382 SDL_DestroyMutex(data->lock); 383 SDL_free(data); 384 return false; 385 } 386 387 static const SDL_AsyncIOQueueInterface SDL_AsyncIOQueue_Generic = { 388 generic_asyncioqueue_queue_task, 389 generic_asyncioqueue_cancel_task, 390 generic_asyncioqueue_get_results, 391 generic_asyncioqueue_wait_results, 392 generic_asyncioqueue_signal, 393 generic_asyncioqueue_destroy 394 }; 395 396 SDL_copyp(&queue->iface, &SDL_AsyncIOQueue_Generic); 397 queue->userdata = data; 398 return true; 399} 400 401 402bool SDL_SYS_AsyncIOFromFile_Generic(const char *file, const char *mode, SDL_AsyncIO *asyncio) 403{ 404 #if SDL_ASYNCIO_USE_THREADPOOL 405 if (!PrepareThreadpool()) { 406 return false; 407 } 408 #endif 409 410 GenericAsyncIOData *data = (GenericAsyncIOData *) SDL_calloc(1, sizeof (*data)); 411 if (!data) { 412 return false; 413 } 414 415 data->lock = SDL_CreateMutex(); 416 if (!data->lock) { 417 SDL_free(data); 418 return false; 419 } 420 421 data->io = SDL_IOFromFile(file, mode); 422 if (!data->io) { 423 SDL_DestroyMutex(data->lock); 424 SDL_free(data); 425 return false; 426 } 427 428 static const SDL_AsyncIOInterface SDL_AsyncIOFile_Generic = { 429 generic_asyncio_size, 430 generic_asyncio_io, 431 generic_asyncio_io, 432 generic_asyncio_io, 433 generic_asyncio_destroy 434 }; 435 436 SDL_copyp(&asyncio->iface, &SDL_AsyncIOFile_Generic); 437 asyncio->userdata = data; 438 return true; 439} 440 441void SDL_SYS_QuitAsyncIO_Generic(void) 442{ 443 #if SDL_ASYNCIO_USE_THREADPOOL 444 ShutdownThreadpool(); 445 #endif 446} 447 448 449#if SDL_ASYNCIO_ONLY_HAVE_GENERIC 450bool SDL_SYS_AsyncIOFromFile(const char *file, const char *mode, SDL_AsyncIO *asyncio) 451{ 452 return SDL_SYS_AsyncIOFromFile_Generic(file, mode, asyncio); 453} 454 455bool SDL_SYS_CreateAsyncIOQueue(SDL_AsyncIOQueue *queue) 456{ 457 return SDL_SYS_CreateAsyncIOQueue_Generic(queue); 458} 459 460void SDL_SYS_QuitAsyncIO(void) 461{ 462 SDL_SYS_QuitAsyncIO_Generic(); 463} 464#endif 465 466[FILE END](C) 2025 0x4248 (C) 2025 4248 Media and 4248 Systems, All part of 0x4248 See LICENCE files for more information. Not all files are by 0x4248 always check Licencing.