Golang: How's the semaphore timeout?

The semaphore in the Golang is implemented with the channel:

An example is the following: https://sites.google.com/site/gopatterns/concurrency/semaphores

Context:

We have several hundred servers, and there are common resources to which we want to restrict access. Therefore, for this resource we want to use a semaphore to restrict access to only 5 simultaneous accesses of these servers. For this, we plan to use a lock server. When a computer accesses a resource, it first registers with the lock server that it is accessing the resource using a key. And then, when this is done, he will send another request to the lock server to say that it is done and release the semaphore. This ensures that we restrict access to these resources to the maximum amount of simultaneous access.

Problem: Want to handle it gracefully if something goes wrong.

Question

How are you going to implement a timeout on a semaphore?

Example:

Let's say I have a semaphore of size 5. There are 10 processes at the same time trying to get a lock in the semaphore, so in this case only 5 will get it.

Sometimes processes die without reacting (the real reason is a little complicated to explain, but basically sometimes the process may not unlock it), which causes a problem, since the space in the semaphore is now permanently locked.

So, I would like to have a timeout on this. Here are some issues:

Processes will be performed anywhere from 2 seconds to 60 minutes.

We have some conditions for the race, because if time runs out, and then the process tries to unlock it, we unlock the semaphore twice, and not once. And vice versa, we first open it, and then iterate over.

How to take the proposed template published above and turn it into a thread-safe semaphore with timeouts?

+4
4

, , , , , goroutines , - . , .

1) WaitGroup : http://golang.org/pkg/sync/#example_WaitGroup

goroutine , , (, , ). wg.Wait(), , , go. : http://play.golang.org/p/wnm24TcBZg , wg.Wait() , .

2) time.Ticker -: http://golang.org/pkg/time/#Ticker

, . , . for, , , : http://play.golang.org/p/IHeqmiFBSS

, , , , , , defer wg.Done(), . , .

+1

, , , () , , goroutine . goroutine , (.. node)

, , , .

1) , 2) , ( , ) 3) , 4) " ()" ( , )

, , . , . http://play.golang.org/p/DLOX7m8m6q

package main

import "fmt"

import "time"

type Locker struct {
    ch chan int
    locked bool
}

func (l *Locker) lock(){
    l.ch <- 1
    l.locked=true
}
func (l *Locker) unlock() {
    if l.locked { // called directly or via defer, make sure we don't unlock if we don't have the lock
        l.locked = false // avoid unlocking twice if socket crashes after unlock
        <- l.ch
    }
}

func dostuff(name string, locker Locker) {
    locker.lock()
    defer locker.unlock()
    fmt.Println(name,"Doing stuff")
    time.Sleep(1 * time.Second)
}

func main() {
    ch := make(chan int, 2)
    go dostuff("1",Locker{ch,false})
    go dostuff("2",Locker{ch,false})
    go dostuff("3",Locker{ch,false})
    go dostuff("4",Locker{ch,false})
    time.Sleep(4 * time.Second)
}
+1

:

  • 5 , .
  • .

. ( ) 5 (, 75- ..) /. , . , 5 /.

:

  • , , .
  • . , .
0

, , , .

package main

import (
   "fmt"
   "time"
   "math/rand"
   "strconv"
)

type Empty interface{}

type Semaphore struct {
    dur time.Duration
    ch  chan Empty
}

func NewSemaphore(max int, dur time.Duration) (sem *Semaphore) {
    sem = new(Semaphore)
    sem.dur = dur
    sem.ch = make(chan Empty, max)
    return
}

type Timeout struct{}

type Work struct{}

var null Empty
var timeout Timeout
var work Work

var global = time.Now()

func (sem *Semaphore) StartJob(id int, job func()) {
    sem.ch <- null
    go func() {
        ch := make(chan interface{})
        go func() {
            time.Sleep(sem.dur)
            ch <- timeout
        }()
        go func() {
            fmt.Println("Job ", strconv.Itoa(id), " is started", time.Since(global))
            job()
            ch <- work
        }()
        switch (<-ch).(type) {
        case Timeout:
            fmt.Println("Timeout for job ", strconv.Itoa(id), time.Since(global))
        case Work:
            fmt.Println("Job ", strconv.Itoa(id), " is finished", time.Since(global))
        }
        <-sem.ch
    }()
}

func main() {
    rand.Seed(time.Now().Unix())
    sem := NewSemaphore(3, 3*time.Second)
    for i := 0; i < 10; i++ {
        id := i
        go sem.StartJob(i, func() {
            seconds := 2 + rand.Intn(5)
            fmt.Println("For job ", strconv.Itoa(id), " was allocated ", seconds, " secs")
            time.Sleep(time.Duration(seconds) * time.Second)
        })
    }
    time.Sleep(30 * time.Second)
}
0

All Articles